Skip to main content

meerkat_mob/runtime/
handle.rs

1use super::*;
2use crate::MobRuntimeMode;
3use crate::roster::MobMemberKickoffSnapshot;
4use crate::runtime::mob_member_lifecycle_authority::{
5    CanonicalMemberSnapshotMaterial, CanonicalMemberStatus, CanonicalSessionObservation,
6    MobMemberLifecycleAuthority, MobMemberLifecycleInput, MobMemberTerminalClass,
7};
8#[cfg(target_arch = "wasm32")]
9use crate::tokio;
10use futures::stream::{FuturesUnordered, StreamExt};
11use meerkat_core::comms::{
12    PeerDirectoryEntry, PeerReachability, PeerReachabilityReason, TrustedPeerSpec,
13};
14use meerkat_core::ops::OperationId;
15use meerkat_core::ops_lifecycle::OpsLifecycleRegistry;
16use meerkat_core::service::{MobToolAuthorityContext, SessionError};
17use meerkat_core::time_compat::Instant;
18use meerkat_core::types::{HandlingMode, RenderMetadata, SessionId};
19use serde::{Deserialize, Serialize};
20use std::collections::BTreeMap;
21use std::collections::BTreeSet;
22use std::collections::HashMap;
23use std::time::Duration;
24
/// Default kickoff wait timeout: ten minutes (600 seconds).
const DEFAULT_KICKOFF_WAIT_TIMEOUT: Duration = Duration::from_secs(10 * 60);
26
/// Two-way switch for peer-connectivity projection.
///
/// NOTE(review): the consumers of this enum are outside this part of the file;
/// the variant meanings below are inferred from the names — confirm at the
/// call sites.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PeerConnectivityProjection {
    /// Presumably: leave peer connectivity out of the projection.
    Omit,
    /// Presumably: include peer connectivity in the projection.
    Include,
}
32
/// Two-way switch for how much session observation a projection performs.
///
/// NOTE(review): the consumers of this enum are outside this part of the file;
/// the variant meanings below are inferred from the names — confirm at the
/// call sites.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SessionObservationProjection {
    /// Presumably: observe live session state only.
    LiveOnly,
    /// Presumably: perform the full session observation.
    Full,
}
38
/// Point-in-time snapshot of a mob member's execution state.
///
/// Serialize-only. The two trailing optional fields are left out of the
/// serialized form entirely when they are `None`.
#[derive(Debug, Clone, Serialize)]
#[non_exhaustive]
pub struct MobMemberSnapshot {
    /// Current lifecycle status.
    pub status: MobMemberStatus,
    /// Preview of the last assistant output (if any).
    pub output_preview: Option<String>,
    /// Error description (if the member errored).
    pub error: Option<String>,
    /// Cumulative token usage.
    pub tokens_used: u64,
    /// Whether the member has reached a terminal state.
    pub is_final: bool,
    /// Current session ID (if a session bridge exists).
    pub current_session_id: Option<SessionId>,
    /// Live comms connectivity for currently wired peers, when available.
    // Omitted from the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub peer_connectivity: Option<MobPeerConnectivitySnapshot>,
    /// Initial autonomous-turn kickoff state, when this member has one.
    // Omitted from the serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub kickoff: Option<MobMemberKickoffSnapshot>,
}
62
/// One row of the operational member listing.
///
/// Built by `MobHandle::project_member_list`, which copies the structural
/// fields from the member's roster entry and takes `status`, `error`,
/// `is_final`, `current_session_id` and `kickoff` from the member's snapshot.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[non_exhaustive]
pub struct MobMemberListEntry {
    /// Member identity, copied from the roster entry.
    pub meerkat_id: MeerkatId,
    /// Profile name, copied from the roster entry.
    pub profile: ProfileName,
    /// Roster member reference; [`session_id`](Self::session_id) reads from it.
    pub member_ref: MemberRef,
    /// Runtime mode, copied from the roster entry.
    pub runtime_mode: MobRuntimeMode,
    /// Peer id recorded on the roster entry, if any. Omitted when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub peer_id: Option<String>,
    /// Roster membership state, copied from the roster entry.
    #[serde(default)]
    pub state: crate::roster::MemberState,
    /// Identities of the peers this member is wired to.
    pub wired_to: BTreeSet<MeerkatId>,
    /// Trusted-peer specs keyed by wired peer identity. Omitted when empty.
    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
    pub external_peer_specs: BTreeMap<MeerkatId, TrustedPeerSpec>,
    /// Labels copied from the roster entry. Omitted when empty.
    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
    pub labels: BTreeMap<String, String>,
    /// Lifecycle status taken from the member snapshot.
    pub status: MobMemberStatus,
    /// Error description taken from the member snapshot. Omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
    /// Finality flag taken from the member snapshot.
    pub is_final: bool,
    /// Session id taken from the member snapshot. Omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub current_session_id: Option<SessionId>,
    /// Kickoff state taken from the member snapshot. Omitted when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub kickoff: Option<MobMemberKickoffSnapshot>,
}
88
89impl MobMemberListEntry {
90    pub fn session_id(&self) -> Option<&SessionId> {
91        self.member_ref.session_id()
92    }
93}
94
/// Live connectivity summary for a member's currently wired peers.
///
/// Produced by `MobHandle::resolve_peer_connectivity`, which tallies one
/// outcome per wired peer.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[non_exhaustive]
pub struct MobPeerConnectivitySnapshot {
    /// Number of wired peers whose directory entry reports `Reachable`.
    pub reachable_peer_count: usize,
    /// Number of wired peers that either report `Unknown` reachability or
    /// could not be matched to any peer-directory entry.
    pub unknown_peer_count: usize,
    /// Wired peers whose directory entry reports `Unreachable`.
    pub unreachable_peers: Vec<MobUnreachablePeer>,
}
103
/// One currently wired peer that is known to be unreachable.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[non_exhaustive]
pub struct MobUnreachablePeer {
    /// Name of the peer as listed in the peer directory.
    pub peer: String,
    /// Last unreachable reason recorded on the directory entry, if any.
    pub reason: Option<PeerReachabilityReason>,
}
111
/// Execution status for a mob member.
///
/// Variants serialize in `snake_case` (for example `Active` as `"active"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum MobMemberStatus {
    /// Member is active and potentially running.
    Active,
    /// Member is in the process of retiring.
    Retiring,
    /// Member failed to restore durable session state and needs repair.
    Broken,
    /// Member has completed (session archived or not found).
    Completed,
    /// Member is not in the roster.
    Unknown,
}
128
/// Receipt returned by a successful member respawn.
///
/// Also carried inside `MobRespawnError::TopologyRestoreFailed`, where the
/// replacement was spawned but peer wiring could not be fully restored.
#[derive(Debug, Clone, Serialize)]
#[non_exhaustive]
pub struct MemberRespawnReceipt {
    /// The member identity that was respawned.
    pub member_id: MeerkatId,
    /// Session ID of the retired (old) session.
    pub old_session_id: Option<SessionId>,
    /// Session ID of the newly spawned session.
    pub new_session_id: Option<SessionId>,
}
140
141impl MemberRespawnReceipt {
142    pub fn new(
143        member_id: MeerkatId,
144        old_session_id: Option<SessionId>,
145        new_session_id: Option<SessionId>,
146    ) -> Self {
147        Self {
148            member_id,
149            old_session_id,
150            new_session_id,
151        }
152    }
153}
154
/// Receipt returned by a successful member spawn.
///
/// Crate-internal counterpart to the public receipts in this module.
#[derive(Debug, Clone, Serialize)]
#[non_exhaustive]
pub(crate) struct MemberSpawnReceipt {
    /// The member identity that was provisioned and committed into the roster.
    pub(crate) member_ref: MemberRef,
    /// Canonical mob child operation for the spawned member lifecycle.
    pub(crate) operation_id: OperationId,
}
164
/// Pairs an owning session with the ops-lifecycle registry handle.
///
/// NOTE(review): no use of this type is visible in this part of the file;
/// presumably it identifies which session owns operations registered for mob
/// members — confirm at the call sites.
#[derive(Clone)]
pub(crate) struct CanonicalOpsOwnerContext {
    /// Session that owns the operations.
    pub(crate) owner_session_id: SessionId,
    /// Shared handle to the ops-lifecycle registry.
    pub(crate) ops_registry: Arc<dyn OpsLifecycleRegistry>,
}
170
/// Structured error for direct-Rust respawn failures.
///
/// The variants separate failures by how far the respawn had progressed, so
/// callers can tell whether the old member is gone and whether a replacement
/// exists.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum MobRespawnError {
    /// Member has no current session bridge to retire.
    #[error("no current session bridge for member {member_id}")]
    NoSessionBridge { member_id: MeerkatId },

    /// Spawn failed after the old member was retired.
    #[error("spawn failed after retire for member {member_id}: {reason}")]
    SpawnAfterRetire {
        member_id: MeerkatId,
        reason: String,
    },

    /// Topology restore failed after replacement spawn.
    /// The replacement receipt is carried so callers can still use the new session.
    // The message reports only how many peers failed; the ids themselves are
    // available in `failed_peer_ids`.
    #[error("topology restore failed for member {}: {} peer(s) failed", receipt.member_id, failed_peer_ids.len())]
    TopologyRestoreFailed {
        receipt: MemberRespawnReceipt,
        failed_peer_ids: Vec<MeerkatId>,
    },

    /// An underlying mob error occurred before mutation.
    // `transparent` forwards display and source to the wrapped `MobError`;
    // `#[from]` lets `?` convert a `MobError` into this variant.
    #[error(transparent)]
    Mob(#[from] MobError),
}
198
/// Receipt returned by member message delivery.
///
/// Serialize-only and `#[non_exhaustive]`.
#[derive(Debug, Clone, Serialize)]
#[non_exhaustive]
pub struct MemberDeliveryReceipt {
    /// The member that received the message.
    pub member_id: MeerkatId,
    /// The session ID the message was delivered to.
    pub session_id: SessionId,
    /// How the message was handled.
    pub handling_mode: HandlingMode,
}
210
/// Reference to a member's current session bridge.
///
/// Unlike the `Option<SessionId>` fields elsewhere in this file, the session
/// id here is always present.
#[derive(Debug, Clone, Serialize)]
#[non_exhaustive]
pub struct MemberSessionRef {
    /// The member identity.
    pub member_id: MeerkatId,
    /// The current session ID.
    pub session_id: SessionId,
}
220
/// Options for helper convenience spawns.
///
/// Every field is optional; `HelperOptions::default()` leaves them all `None`.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct HelperOptions {
    /// Role name (profile key) to use. If None, requires a default profile in the definition.
    pub role_name: Option<ProfileName>,
    /// Runtime mode override.
    pub runtime_mode: Option<crate::MobRuntimeMode>,
    /// Backend override.
    pub backend: Option<MobBackendKind>,
    /// Tool access policy for the helper.
    pub tool_access_policy: Option<meerkat_core::ops::ToolAccessPolicy>,
}
234
/// Result from a helper spawn-and-wait operation.
///
/// `output` and `session_id` are optional; `tokens_used` is always present.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct HelperResult {
    /// The member's final output text.
    pub output: Option<String>,
    /// Total tokens used by the helper.
    pub tokens_used: u64,
    /// The session ID that was used.
    pub session_id: Option<meerkat_core::types::SessionId>,
}
246
/// Target for a wire operation from a local mob member.
///
/// Variant names serialize in `snake_case` (`local`, `external`). A bare
/// `MeerkatId` converts into [`PeerTarget::Local`] through `From`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PeerTarget {
    /// Another member in the same mob roster.
    Local(MeerkatId),
    /// A trusted peer that lives outside the local mob roster.
    External(TrustedPeerSpec),
}
256
257impl From<MeerkatId> for PeerTarget {
258    fn from(value: MeerkatId) -> Self {
259        Self::Local(value)
260    }
261}
262
263struct MobMemberTerminalClassifier;
264
265impl MobMemberTerminalClassifier {
266    fn classify(material: &CanonicalMemberSnapshotMaterial) -> MobMemberTerminalClass {
267        if !material.member_present {
268            return MobMemberTerminalClass::TerminalUnknown;
269        }
270        match material.status {
271            // Retiring remains non-terminal while canonical roster membership
272            // still exists, even if the session read is already inactive/missing.
273            CanonicalMemberStatus::Retiring => MobMemberTerminalClass::Running,
274            CanonicalMemberStatus::Broken => MobMemberTerminalClass::TerminalFailure,
275            CanonicalMemberStatus::Active => match material.session_observation {
276                CanonicalSessionObservation::Active
277                | CanonicalSessionObservation::Inactive
278                | CanonicalSessionObservation::Unknown => MobMemberTerminalClass::Running,
279                CanonicalSessionObservation::Missing => MobMemberTerminalClass::TerminalCompleted,
280            },
281            CanonicalMemberStatus::Completed => MobMemberTerminalClass::TerminalCompleted,
282            CanonicalMemberStatus::Unknown => MobMemberTerminalClass::TerminalUnknown,
283        }
284    }
285
286    fn is_terminal(material: &CanonicalMemberSnapshotMaterial) -> bool {
287        matches!(
288            Self::classify(material),
289            MobMemberTerminalClass::TerminalFailure
290                | MobMemberTerminalClass::TerminalUnknown
291                | MobMemberTerminalClass::TerminalCompleted
292        )
293    }
294
295    fn has_canonical_member(material: &CanonicalMemberSnapshotMaterial) -> bool {
296        material.member_present
297    }
298}
299
// ---------------------------------------------------------------------------
// MobHandle
// ---------------------------------------------------------------------------

/// Clone-cheap, thread-safe handle for interacting with a running mob.
///
/// All mutation commands are sent through an mpsc channel to the actor.
/// Read-only operations (roster, state) bypass the actor and read from
/// shared `Arc` state directly.
#[derive(Clone)]
pub struct MobHandle {
    /// Sending half of the channel that carries `MobCommand`s to the actor.
    pub(super) command_tx: mpsc::Sender<MobCommand>,
    /// Shared roster authority, read directly by the read-only accessors.
    pub(super) roster: Arc<RwLock<RosterAuthority>>,
    /// Shared task board state.
    pub(super) task_board: Arc<RwLock<TaskBoard>>,
    /// Mob definition; also the source of the mob id.
    pub(super) definition: Arc<MobDefinition>,
    /// Lifecycle state stored as a `u8`; `status` decodes it with
    /// `MobState::from_u8`.
    pub(super) state: Arc<AtomicU8>,
    /// Event store behind `poll_events`, `events` and provenance appends.
    pub(super) events: Arc<dyn MobEventStore>,
    /// MCP server entries keyed by string — presumably the server name;
    /// confirm in the actor.
    pub(super) mcp_servers: Arc<tokio::sync::Mutex<BTreeMap<String, actor::McpServerEntry>>>,
    /// Senders for scoped agent events, keyed by `RunId`.
    pub(super) flow_streams:
        Arc<tokio::sync::Mutex<BTreeMap<RunId, mpsc::Sender<meerkat_core::ScopedAgentEvent>>>>,
    /// Session service used for comms-runtime lookups and session event
    /// subscriptions.
    pub(super) session_service: Arc<dyn MobSessionService>,
    /// Restore failures recorded per member; `member` returns an error for a
    /// member that has an entry here.
    pub(super) restore_diagnostics: Arc<RwLock<HashMap<MeerkatId, RestoreFailureDiagnostic>>>,
}
323
/// Details of a failed member restore.
///
/// `MobHandle::restore_failure_error` turns one of these into
/// `MobError::MemberRestoreFailed`.
#[derive(Debug, Clone)]
pub(crate) struct RestoreFailureDiagnostic {
    /// Session reported in the resulting `MemberRestoreFailed` error.
    pub(crate) session_id: SessionId,
    /// Reason text reported in the resulting `MemberRestoreFailed` error.
    pub(crate) reason: String,
}
329
/// Clone-cheap, capability-bearing handle for interacting with one mob member.
///
/// This is the target 0.5 API surface for message/turn submission. The mob
/// handle remains orchestration/control-plane oriented, while member-directed
/// delivery goes through this narrower capability.
///
/// Obtained from `MobHandle::member`, which only hands one out for a member
/// whose roster state is `Active` and that has no recorded restore failure.
#[derive(Clone)]
pub struct MemberHandle {
    /// Handle to the owning mob.
    mob: MobHandle,
    /// Identity of the member this handle addresses.
    meerkat_id: MeerkatId,
}
340
/// Read-only view over the mob event store, exposing `poll` and `replay_all`.
///
/// Created by `MobHandle::events`; cloning shares the same underlying store.
#[derive(Clone)]
pub struct MobEventsView {
    /// Shared event store that the view's methods delegate to.
    inner: Arc<dyn MobEventStore>,
}
345
/// Spawn request for first-class batch member provisioning.
///
/// Build one with [`SpawnMemberSpec::new`] and refine it with the `with_*`
/// builder methods. Fields without a builder method can be assigned directly.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct SpawnMemberSpec {
    /// The role name (profile key) for this member in the mob roster.
    ///
    /// When `tooling` is present it controls model/tool resolution;
    /// `role_name` remains a roster/topology label.
    pub role_name: ProfileName,
    /// Identity to give the new member.
    pub meerkat_id: MeerkatId,
    /// Optional initial message for the member.
    pub initial_message: Option<ContentInput>,
    /// Optional runtime mode for this member.
    pub runtime_mode: Option<crate::MobRuntimeMode>,
    /// Optional backend kind for this member; see `binding` for precedence.
    pub backend: Option<MobBackendKind>,
    /// Runtime binding for this member. When set, takes precedence over
    /// `backend` and carries concrete binding details (e.g., external process
    /// comms identity). First step toward identity-first mobs.
    pub binding: Option<crate::RuntimeBinding>,
    /// Opaque application context passed through to the agent build pipeline.
    pub context: Option<serde_json::Value>,
    /// Application-defined labels for this member.
    pub labels: Option<std::collections::BTreeMap<String, String>>,
    /// How this member should be launched (fresh, resume, or fork).
    pub launch_mode: crate::launch::MemberLaunchMode,
    /// Tool access policy for this member.
    pub tool_access_policy: Option<meerkat_core::ops::ToolAccessPolicy>,
    /// How to split budget from the orchestrator to this member.
    pub budget_split_policy: Option<crate::launch::BudgetSplitPolicy>,
    /// When true, automatically wire this member to its spawner.
    pub auto_wire_parent: bool,
    /// Additional instruction sections appended to the system prompt for this member.
    pub additional_instructions: Option<Vec<String>>,
    /// Per-agent environment variables injected into shell tool subprocesses.
    pub shell_env: Option<std::collections::HashMap<String, String>>,
    /// Pre-resolved inherited tool filter from spawn tooling resolution.
    ///
    /// When set, stored as `INHERITED_TOOL_FILTER_METADATA_KEY` on the child
    /// session metadata so `AgentBuilder::build()` recovers it as a base filter.
    pub inherited_tool_filter: Option<meerkat_core::tool_scope::ToolFilter>,
    /// Override profile resolved from `SpawnTooling::Profile` source.
    ///
    /// When set, the spawn path uses this profile instead of looking up by
    /// `role_name` from the mob definition. This allows agent-owned spawn
    /// tooling to specify a different model/skills/tools via inline or
    /// realm-scoped profiles.
    pub override_profile: Option<crate::profile::Profile>,
}
392
393impl SpawnMemberSpec {
394    pub fn new(profile: impl Into<ProfileName>, meerkat_id: impl Into<MeerkatId>) -> Self {
395        Self {
396            role_name: profile.into(),
397            meerkat_id: meerkat_id.into(),
398            initial_message: None,
399            runtime_mode: None,
400            backend: None,
401            binding: None,
402            context: None,
403            labels: None,
404            launch_mode: crate::launch::MemberLaunchMode::Fresh,
405            tool_access_policy: None,
406            budget_split_policy: None,
407            auto_wire_parent: false,
408            additional_instructions: None,
409            shell_env: None,
410            inherited_tool_filter: None,
411            override_profile: None,
412        }
413    }
414
415    pub fn with_shell_env(mut self, env: std::collections::HashMap<String, String>) -> Self {
416        self.shell_env = Some(env);
417        self
418    }
419
420    pub fn with_initial_message(mut self, message: impl Into<ContentInput>) -> Self {
421        self.initial_message = Some(message.into());
422        self
423    }
424
425    pub fn with_runtime_mode(mut self, mode: crate::MobRuntimeMode) -> Self {
426        self.runtime_mode = Some(mode);
427        self
428    }
429
430    pub fn with_backend(mut self, backend: MobBackendKind) -> Self {
431        self.backend = Some(backend);
432        self
433    }
434
435    pub fn with_context(mut self, context: serde_json::Value) -> Self {
436        self.context = Some(context);
437        self
438    }
439
440    pub fn with_labels(mut self, labels: std::collections::BTreeMap<String, String>) -> Self {
441        self.labels = Some(labels);
442        self
443    }
444
445    /// Set launch mode to resume an existing session.
446    ///
447    /// This is a convenience method equivalent to setting
448    /// `launch_mode = MemberLaunchMode::Resume { session_id }`.
449    pub fn with_resume_session_id(mut self, id: meerkat_core::types::SessionId) -> Self {
450        self.launch_mode = crate::launch::MemberLaunchMode::Resume { session_id: id };
451        self
452    }
453
454    pub fn with_launch_mode(mut self, mode: crate::launch::MemberLaunchMode) -> Self {
455        self.launch_mode = mode;
456        self
457    }
458
459    pub fn with_tool_access_policy(mut self, policy: meerkat_core::ops::ToolAccessPolicy) -> Self {
460        self.tool_access_policy = Some(policy);
461        self
462    }
463
464    pub fn with_budget_split_policy(mut self, policy: crate::launch::BudgetSplitPolicy) -> Self {
465        self.budget_split_policy = Some(policy);
466        self
467    }
468
469    pub fn with_auto_wire_parent(mut self, auto_wire: bool) -> Self {
470        self.auto_wire_parent = auto_wire;
471        self
472    }
473
474    pub fn with_additional_instructions(mut self, instructions: Vec<String>) -> Self {
475        self.additional_instructions = Some(instructions);
476        self
477    }
478
479    pub fn from_wire(
480        profile: String,
481        meerkat_id: String,
482        initial_message: Option<ContentInput>,
483        runtime_mode: Option<crate::MobRuntimeMode>,
484        backend: Option<MobBackendKind>,
485    ) -> Self {
486        let mut spec = Self::new(profile, meerkat_id);
487        spec.initial_message = initial_message;
488        spec.runtime_mode = runtime_mode;
489        spec.backend = backend;
490        spec
491    }
492
493    /// Extract the resume session ID if the launch mode is `Resume`.
494    pub fn resume_session_id(&self) -> Option<&meerkat_core::types::SessionId> {
495        match &self.launch_mode {
496            crate::launch::MemberLaunchMode::Resume { session_id } => Some(session_id),
497            _ => None,
498        }
499    }
500}
501
502impl MobEventsView {
503    pub async fn poll(
504        &self,
505        after_cursor: u64,
506        limit: usize,
507    ) -> Result<Vec<crate::event::MobEvent>, MobError> {
508        self.inner
509            .poll(after_cursor, limit)
510            .await
511            .map_err(MobError::from)
512    }
513
514    pub async fn replay_all(&self) -> Result<Vec<crate::event::MobEvent>, MobError> {
515        self.inner.replay_all().await.map_err(MobError::from)
516    }
517}
518
519impl MobHandle {
520    async fn restore_failure_for(
521        &self,
522        meerkat_id: &MeerkatId,
523    ) -> Option<RestoreFailureDiagnostic> {
524        self.restore_diagnostics
525            .read()
526            .await
527            .get(meerkat_id)
528            .cloned()
529    }
530
531    fn restore_failure_error(meerkat_id: &MeerkatId, diag: RestoreFailureDiagnostic) -> MobError {
532        MobError::MemberRestoreFailed {
533            member_id: meerkat_id.clone(),
534            session_id: diag.session_id,
535            reason: diag.reason,
536        }
537    }
538
539    /// Poll mob events from the underlying store.
540    pub async fn poll_events(
541        &self,
542        after_cursor: u64,
543        limit: usize,
544    ) -> Result<Vec<crate::event::MobEvent>, MobError> {
545        self.events
546            .poll(after_cursor, limit)
547            .await
548            .map_err(MobError::from)
549    }
550
551    /// Current mob lifecycle state (lock-free read).
552    pub fn status(&self) -> MobState {
553        MobState::from_u8(self.state.load(Ordering::Acquire))
554    }
555
556    /// Access the mob definition.
557    pub fn definition(&self) -> &MobDefinition {
558        &self.definition
559    }
560
561    /// Mob ID.
562    pub fn mob_id(&self) -> &MobId {
563        &self.definition.id
564    }
565
566    /// Snapshot of the current roster.
567    pub async fn roster(&self) -> Roster {
568        self.roster.read().await.snapshot()
569    }
570
571    fn derived_comms_name(&self, entry: &RosterEntry) -> String {
572        format!(
573            "{}/{}/{}",
574            self.definition.id, entry.profile, entry.meerkat_id
575        )
576    }
577
578    async fn resolve_peer_connectivity(
579        &self,
580        entry: &RosterEntry,
581        session_id: &SessionId,
582        roster_snapshot: &Roster,
583    ) -> Option<MobPeerConnectivitySnapshot> {
584        let comms = self.session_service.comms_runtime(session_id).await?;
585        let peers = comms.peers().await;
586        let peers_by_id: HashMap<&str, &PeerDirectoryEntry> = peers
587            .iter()
588            .map(|peer| (peer.peer_id.as_str(), peer))
589            .collect();
590        let peers_by_name: HashMap<&str, &PeerDirectoryEntry> = peers
591            .iter()
592            .map(|peer| (peer.name.as_str(), peer))
593            .collect();
594
595        let mut reachable_peer_count = 0usize;
596        let mut unknown_peer_count = 0usize;
597        let mut unreachable_peers = Vec::new();
598
599        for wired_peer in &entry.wired_to {
600            let matched = if let Some(spec) = entry.external_peer_specs.get(wired_peer) {
601                peers_by_id
602                    .get(spec.peer_id.as_str())
603                    .copied()
604                    .or_else(|| peers_by_name.get(spec.name.as_str()).copied())
605            } else {
606                let local_entry = roster_snapshot.get(wired_peer);
607                let live_peer_id =
608                    match local_entry.and_then(|peer_entry| peer_entry.member_ref.session_id()) {
609                        Some(target_session_id) => self
610                            .session_service
611                            .comms_runtime(target_session_id)
612                            .await
613                            .and_then(|runtime| runtime.public_key()),
614                        None => None,
615                    };
616                live_peer_id
617                    .as_deref()
618                    .and_then(|peer_id| peers_by_id.get(peer_id).copied())
619                    .or_else(|| {
620                        local_entry
621                            .and_then(|peer_entry| peer_entry.peer_id.as_deref())
622                            .and_then(|peer_id| peers_by_id.get(peer_id).copied())
623                    })
624                    .or_else(|| {
625                        local_entry
626                            .map(|peer_entry| self.derived_comms_name(peer_entry))
627                            .and_then(|name| peers_by_name.get(name.as_str()).copied())
628                    })
629            };
630
631            match matched {
632                Some(peer) => match peer.reachability {
633                    PeerReachability::Reachable => reachable_peer_count += 1,
634                    PeerReachability::Unknown => unknown_peer_count += 1,
635                    PeerReachability::Unreachable => unreachable_peers.push(MobUnreachablePeer {
636                        peer: peer.name.as_string(),
637                        reason: peer.last_unreachable_reason,
638                    }),
639                },
640                None => unknown_peer_count += 1,
641            }
642        }
643
644        Some(MobPeerConnectivitySnapshot {
645            reachable_peer_count,
646            unknown_peer_count,
647            unreachable_peers,
648        })
649    }
650
651    /// List members as an operational projection surface.
652    ///
653    /// This includes structural roster fields plus current runtime status,
654    /// error/finality state, and the current session binding when known.
655    /// It intentionally skips live peer-connectivity fanout so ordinary
656    /// membership polling cannot stall on comms reachability lookups.
657    /// For low-level structural roster visibility without runtime projection,
658    /// use [`list_all_members`](Self::list_all_members).
659    pub async fn list_members(&self) -> Vec<MobMemberListEntry> {
660        self.project_member_list(self.roster.read().await.list())
661            .await
662    }
663
664    /// List all members including those in `Retiring` state, with canonical
665    /// lifecycle/session projection.
666    ///
667    /// Like [`list_members`](Self::list_members), this intentionally avoids
668    /// live peer-connectivity fanout. Use [`member_status`](Self::member_status)
669    /// for deep per-member inspection including live comms reachability.
670    pub async fn list_members_including_retiring(&self) -> Vec<MobMemberListEntry> {
671        self.project_member_list(self.roster.read().await.list_all())
672            .await
673    }
674
675    async fn project_member_list<'a>(
676        &self,
677        entries: impl Iterator<Item = &'a crate::roster::RosterEntry>,
678    ) -> Vec<MobMemberListEntry> {
679        let entries: Vec<_> = entries.cloned().collect();
680        let mut projected = Vec::with_capacity(entries.len());
681        for entry in entries {
682            let snapshot = self
683                .canonical_member_list_material(&entry.meerkat_id)
684                .await
685                .to_snapshot();
686            let snapshot = Some(snapshot);
687            let (status, error, is_final, current_session_id, kickoff) = match snapshot {
688                Some(snapshot) => (
689                    snapshot.status,
690                    snapshot.error,
691                    snapshot.is_final,
692                    snapshot.current_session_id,
693                    snapshot.kickoff,
694                ),
695                None => (
696                    MobMemberStatus::Unknown,
697                    None,
698                    true,
699                    entry.session_id().cloned(),
700                    None,
701                ),
702            };
703            projected.push(MobMemberListEntry {
704                meerkat_id: entry.meerkat_id,
705                profile: entry.profile,
706                member_ref: entry.member_ref,
707                runtime_mode: entry.runtime_mode,
708                peer_id: entry.peer_id,
709                state: entry.state,
710                wired_to: entry.wired_to,
711                external_peer_specs: entry.external_peer_specs,
712                labels: entry.labels,
713                status,
714                error,
715                is_final,
716                current_session_id,
717                kickoff,
718            });
719        }
720        projected
721    }
722
723    /// List members currently eligible for runtime work dispatch.
724    ///
725    /// Excludes retiring, completed, broken, or unknown members even if they
726    /// still appear in the public operational projection.
727    pub(crate) async fn list_runnable_members(&self) -> Vec<MobMemberListEntry> {
728        self.list_members()
729            .await
730            .into_iter()
731            .filter(|entry| {
732                entry.state == crate::roster::MemberState::Active
733                    && entry.status == MobMemberStatus::Active
734            })
735            .collect()
736    }
737
738    /// List all members including those in `Retiring` state.
739    ///
740    /// The `state` field on each [`RosterEntry`] distinguishes `Active` from
741    /// `Retiring`. Use this for observability and membership inspection where
742    /// in-flight retires should be visible.
743    pub async fn list_all_members(&self) -> Vec<RosterEntry> {
744        self.roster.read().await.list_all().cloned().collect()
745    }
746
747    /// Get a specific member entry.
748    pub async fn get_member(&self, meerkat_id: &MeerkatId) -> Option<RosterEntry> {
749        self.roster.read().await.get(meerkat_id).cloned()
750    }
751
752    /// Acquire a capability-bearing handle for a specific active member.
753    pub async fn member(&self, meerkat_id: &MeerkatId) -> Result<MemberHandle, MobError> {
754        if let Some(diag) = self.restore_failure_for(meerkat_id).await {
755            return Err(Self::restore_failure_error(meerkat_id, diag));
756        }
757        let entry = self
758            .get_member(meerkat_id)
759            .await
760            .ok_or_else(|| MobError::MeerkatNotFound(meerkat_id.clone()))?;
761        if entry.state != crate::roster::MemberState::Active {
762            return Err(MobError::MeerkatNotFound(meerkat_id.clone()));
763        }
764        Ok(MemberHandle {
765            mob: self.clone(),
766            meerkat_id: meerkat_id.clone(),
767        })
768    }
769
770    /// Access a read-only events view for polling/replay.
771    pub fn events(&self) -> MobEventsView {
772        MobEventsView {
773            inner: self.events.clone(),
774        }
775    }
776
777    /// Append a dispatcher-owned operator provenance projection.
778    ///
779    /// This is audit/projection data only. It must never become
780    /// authorization truth.
781    pub async fn record_operator_action_provenance(
782        &self,
783        tool_name: &str,
784        authority_context: &MobToolAuthorityContext,
785    ) -> Result<(), MobError> {
786        self.events
787            .append(NewMobEvent {
788                mob_id: self.definition.id.clone(),
789                timestamp: None,
790                kind: MobEventKind::OperatorActionRecorded {
791                    tool_name: tool_name.to_string(),
792                    principal_token: authority_context.principal_token().clone(),
793                    caller_provenance: authority_context.caller_provenance().cloned(),
794                    audit_invocation_id: authority_context
795                        .audit_invocation_id()
796                        .map(ToOwned::to_owned),
797                },
798            })
799            .await
800            .map(|_| ())
801            .map_err(MobError::from)
802    }
803
804    /// Subscribe to agent-level events for a specific meerkat.
805    ///
806    /// Looks up the meerkat's session ID from the roster, then subscribes
807    /// to the session-level event stream via [`MobSessionService`].
808    ///
809    /// Returns `MobError::MeerkatNotFound` if the meerkat is not in the
810    /// roster or has no session ID.
811    pub async fn subscribe_agent_events(
812        &self,
813        meerkat_id: &MeerkatId,
814    ) -> Result<EventStream, MobError> {
815        let session_id = {
816            let roster = self.roster.read().await;
817            roster
818                .session_id(meerkat_id)
819                .cloned()
820                .ok_or_else(|| MobError::MeerkatNotFound(meerkat_id.clone()))?
821        };
822        SessionService::subscribe_session_events(self.session_service.as_ref(), &session_id)
823            .await
824            .map_err(|e| {
825                MobError::Internal(format!(
826                    "failed to subscribe to agent events for '{meerkat_id}': {e}"
827                ))
828            })
829    }
830
831    /// Subscribe to agent events for all active members (point-in-time snapshot).
832    ///
833    /// Returns one stream per active member that has a session ID. Members
834    /// spawned after this call are not included — use [`subscribe_mob_events`]
835    /// for a continuously updated view.
836    pub async fn subscribe_all_agent_events(&self) -> Vec<(MeerkatId, EventStream)> {
837        let entries: Vec<_> = {
838            let roster = self.roster.read().await;
839            roster
840                .list()
841                .filter_map(|e| {
842                    e.member_ref
843                        .session_id()
844                        .map(|sid| (e.meerkat_id.clone(), sid.clone()))
845                })
846                .collect()
847        };
848        let mut streams = Vec::with_capacity(entries.len());
849        for (meerkat_id, session_id) in entries {
850            if let Ok(stream) =
851                SessionService::subscribe_session_events(self.session_service.as_ref(), &session_id)
852                    .await
853            {
854                streams.push((meerkat_id, stream));
855            }
856        }
857        streams
858    }
859
860    /// Subscribe to a continuously-updated, mob-level event bus.
861    ///
862    /// Spawns an independent task that merges per-member session streams,
863    /// tags each event with [`AttributedEvent`], and tracks roster changes
864    /// (spawns/retires) automatically. Drop the returned handle to stop
865    /// the router.
866    pub fn subscribe_mob_events(&self) -> super::event_router::MobEventRouterHandle {
867        self.subscribe_mob_events_with_config(super::event_router::MobEventRouterConfig::default())
868    }
869
870    /// Like [`subscribe_mob_events`](Self::subscribe_mob_events) with explicit config.
871    pub fn subscribe_mob_events_with_config(
872        &self,
873        config: super::event_router::MobEventRouterConfig,
874    ) -> super::event_router::MobEventRouterHandle {
875        super::event_router::spawn_event_router(
876            self.session_service.clone(),
877            self.events.clone(),
878            self.roster.clone(),
879            config,
880        )
881    }
882
883    /// Snapshot of MCP server lifecycle state tracked by this runtime.
884    pub async fn mcp_server_states(&self) -> BTreeMap<String, bool> {
885        self.mcp_servers
886            .lock()
887            .await
888            .iter()
889            .map(|(name, entry)| (name.clone(), entry.running))
890            .collect()
891    }
892
893    /// Start a flow run and return its run ID.
894    pub async fn run_flow(
895        &self,
896        flow_id: FlowId,
897        params: serde_json::Value,
898    ) -> Result<RunId, MobError> {
899        self.run_flow_with_stream(flow_id, params, None).await
900    }
901
902    /// Start a flow run with an optional scoped stream sink.
903    pub async fn run_flow_with_stream(
904        &self,
905        flow_id: FlowId,
906        params: serde_json::Value,
907        scoped_event_tx: Option<mpsc::Sender<meerkat_core::ScopedAgentEvent>>,
908    ) -> Result<RunId, MobError> {
909        let (reply_tx, reply_rx) = oneshot::channel();
910        self.command_tx
911            .send(MobCommand::RunFlow {
912                flow_id,
913                activation_params: params,
914                scoped_event_tx,
915                reply_tx,
916            })
917            .await
918            .map_err(|_| MobError::Internal("actor task dropped".into()))?;
919        reply_rx
920            .await
921            .map_err(|_| MobError::Internal("actor reply dropped".into()))?
922    }
923
924    /// Request cancellation of an in-flight flow run.
925    pub async fn cancel_flow(&self, run_id: RunId) -> Result<(), MobError> {
926        let (reply_tx, reply_rx) = oneshot::channel();
927        self.command_tx
928            .send(MobCommand::CancelFlow { run_id, reply_tx })
929            .await
930            .map_err(|_| MobError::Internal("actor task dropped".into()))?;
931        reply_rx
932            .await
933            .map_err(|_| MobError::Internal("actor reply dropped".into()))?
934    }
935
936    /// Fetch a flow run snapshot from the run store.
937    pub async fn flow_status(&self, run_id: RunId) -> Result<Option<MobRun>, MobError> {
938        let (reply_tx, reply_rx) = oneshot::channel();
939        self.command_tx
940            .send(MobCommand::FlowStatus { run_id, reply_tx })
941            .await
942            .map_err(|_| MobError::Internal("actor task dropped".into()))?;
943        reply_rx
944            .await
945            .map_err(|_| MobError::Internal("actor reply dropped".into()))?
946    }
947
948    /// List all configured flow IDs in this mob definition.
949    pub fn list_flows(&self) -> Vec<FlowId> {
950        self.definition.flows.keys().cloned().collect()
951    }
952
953    /// Spawn a new member from a profile and return its member reference.
954    pub async fn spawn(
955        &self,
956        profile_name: ProfileName,
957        meerkat_id: MeerkatId,
958        initial_message: Option<ContentInput>,
959    ) -> Result<MemberRef, MobError> {
960        self.spawn_with_options(profile_name, meerkat_id, initial_message, None, None)
961            .await
962    }
963
964    /// Spawn a new member with an explicit runtime binding.
965    pub async fn spawn_with_binding(
966        &self,
967        profile_name: ProfileName,
968        meerkat_id: MeerkatId,
969        initial_message: Option<ContentInput>,
970        binding: crate::RuntimeBinding,
971    ) -> Result<MemberRef, MobError> {
972        let mut spec = SpawnMemberSpec::new(profile_name, meerkat_id);
973        spec.initial_message = initial_message;
974        spec.binding = Some(binding);
975        self.spawn_spec(spec).await
976    }
977
978    /// Spawn a new member from a profile with explicit backend override.
979    pub async fn spawn_with_backend(
980        &self,
981        profile_name: ProfileName,
982        meerkat_id: MeerkatId,
983        initial_message: Option<ContentInput>,
984        backend: Option<MobBackendKind>,
985    ) -> Result<MemberRef, MobError> {
986        self.spawn_with_options(profile_name, meerkat_id, initial_message, None, backend)
987            .await
988    }
989
990    /// Spawn a new member from a profile with explicit runtime mode/backend overrides.
991    pub async fn spawn_with_options(
992        &self,
993        profile_name: ProfileName,
994        meerkat_id: MeerkatId,
995        initial_message: Option<ContentInput>,
996        runtime_mode: Option<crate::MobRuntimeMode>,
997        backend: Option<MobBackendKind>,
998    ) -> Result<MemberRef, MobError> {
999        let mut spec = SpawnMemberSpec::new(profile_name, meerkat_id);
1000        spec.initial_message = initial_message;
1001        spec.runtime_mode = runtime_mode;
1002        spec.backend = backend;
1003        self.spawn_spec(spec).await
1004    }
1005
1006    /// Attach an existing session by reusing the mob spawn control-plane path.
1007    pub async fn attach_existing_session(
1008        &self,
1009        profile_name: ProfileName,
1010        meerkat_id: MeerkatId,
1011        session_id: meerkat_core::types::SessionId,
1012        runtime_mode: Option<crate::MobRuntimeMode>,
1013        backend: Option<MobBackendKind>,
1014    ) -> Result<MemberRef, MobError> {
1015        let mut spec = SpawnMemberSpec::new(profile_name, meerkat_id);
1016        spec.launch_mode = crate::launch::MemberLaunchMode::Resume { session_id };
1017        spec.runtime_mode = runtime_mode;
1018        spec.backend = backend;
1019        self.spawn_spec(spec).await
1020    }
1021
1022    /// Attach an existing session as the mob orchestrator.
1023    pub async fn attach_existing_session_as_orchestrator(
1024        &self,
1025        profile_name: ProfileName,
1026        meerkat_id: MeerkatId,
1027        session_id: meerkat_core::types::SessionId,
1028    ) -> Result<MemberRef, MobError> {
1029        self.attach_existing_session(profile_name, meerkat_id, session_id, None, None)
1030            .await
1031    }
1032
1033    /// Attach an existing session as a regular mob member.
1034    pub async fn attach_existing_session_as_member(
1035        &self,
1036        profile_name: ProfileName,
1037        meerkat_id: MeerkatId,
1038        session_id: meerkat_core::types::SessionId,
1039    ) -> Result<MemberRef, MobError> {
1040        self.attach_existing_session(profile_name, meerkat_id, session_id, None, None)
1041            .await
1042    }
1043
1044    /// Spawn a member from a fully-specified [`SpawnMemberSpec`].
1045    pub async fn spawn_spec(&self, spec: SpawnMemberSpec) -> Result<MemberRef, MobError> {
1046        let (reply_tx, reply_rx) = oneshot::channel();
1047        self.command_tx
1048            .send(MobCommand::Spawn {
1049                spec: Box::new(spec),
1050                owner_session_id: None,
1051                ops_registry: None,
1052                reply_tx,
1053            })
1054            .await
1055            .map_err(|_| MobError::Internal("actor task dropped".into()))?;
1056        reply_rx
1057            .await
1058            .map_err(|_| MobError::Internal("actor reply dropped".into()))?
1059            .map(|receipt| receipt.member_ref)
1060    }
1061
1062    pub(super) async fn spawn_spec_receipt_with_owner_context(
1063        &self,
1064        spec: SpawnMemberSpec,
1065        owner_context: CanonicalOpsOwnerContext,
1066    ) -> Result<MemberSpawnReceipt, MobError> {
1067        let (reply_tx, reply_rx) = oneshot::channel();
1068        self.command_tx
1069            .send(MobCommand::Spawn {
1070                spec: Box::new(spec),
1071                owner_session_id: Some(owner_context.owner_session_id),
1072                ops_registry: Some(owner_context.ops_registry),
1073                reply_tx,
1074            })
1075            .await
1076            .map_err(|_| MobError::Internal("actor task dropped".into()))?;
1077        reply_rx
1078            .await
1079            .map_err(|_| MobError::Internal("actor reply dropped".into()))?
1080    }
1081
1082    /// Spawn multiple members in parallel.
1083    ///
1084    /// Results preserve input order.
1085    pub async fn spawn_many(
1086        &self,
1087        specs: Vec<SpawnMemberSpec>,
1088    ) -> Vec<Result<MemberRef, MobError>> {
1089        futures::future::join_all(specs.into_iter().map(|spec| self.spawn_spec(spec))).await
1090    }
1091
1092    pub(super) async fn spawn_many_receipts_with_owner_context(
1093        &self,
1094        specs: Vec<SpawnMemberSpec>,
1095        owner_context: CanonicalOpsOwnerContext,
1096    ) -> Vec<Result<MemberSpawnReceipt, MobError>> {
1097        futures::future::join_all(
1098            specs.into_iter().map(|spec| {
1099                self.spawn_spec_receipt_with_owner_context(spec, owner_context.clone())
1100            }),
1101        )
1102        .await
1103    }
1104
1105    /// Retire a member, archiving its session and removing trust.
1106    pub async fn retire(&self, meerkat_id: MeerkatId) -> Result<(), MobError> {
1107        let (reply_tx, reply_rx) = oneshot::channel();
1108        self.command_tx
1109            .send(MobCommand::Retire {
1110                meerkat_id,
1111                reply_tx,
1112            })
1113            .await
1114            .map_err(|_| MobError::Internal("actor task dropped".into()))?;
1115        reply_rx
1116            .await
1117            .map_err(|_| MobError::Internal("actor reply dropped".into()))?
1118    }
1119
1120    /// Retire a member and respawn with the same profile, labels, wiring, and mode.
1121    ///
1122    /// This is a helper convenience over primitive mob behavior, not a
1123    /// machine-owned primitive. Returns a receipt on full success, or a
1124    /// structured error on failure. No rollback is attempted after retire.
1125    pub async fn respawn(
1126        &self,
1127        meerkat_id: MeerkatId,
1128        initial_message: Option<ContentInput>,
1129    ) -> Result<MemberRespawnReceipt, MobRespawnError> {
1130        let old_session_id_before = self
1131            .canonical_member_list_material(&meerkat_id)
1132            .await
1133            .current_session_id;
1134        let (reply_tx, reply_rx) = oneshot::channel();
1135        self.command_tx
1136            .send(MobCommand::Respawn {
1137                meerkat_id,
1138                initial_message,
1139                reply_tx,
1140            })
1141            .await
1142            .map_err(|_| MobError::Internal("actor task dropped".into()))?;
1143        let reply = reply_rx
1144            .await
1145            .map_err(|_| MobError::Internal("actor reply dropped".into()))?;
1146        let mut receipt = match reply {
1147            Ok(receipt) => receipt,
1148            Err(MobRespawnError::TopologyRestoreFailed {
1149                mut receipt,
1150                failed_peer_ids,
1151            }) => {
1152                if receipt.old_session_id.is_none() {
1153                    receipt.old_session_id = old_session_id_before;
1154                }
1155                let post_material = self
1156                    .canonical_member_list_material(&receipt.member_id)
1157                    .await;
1158                if MobMemberTerminalClassifier::has_canonical_member(&post_material) {
1159                    receipt.new_session_id = post_material.current_session_id;
1160                }
1161                return Err(MobRespawnError::TopologyRestoreFailed {
1162                    receipt,
1163                    failed_peer_ids,
1164                });
1165            }
1166            Err(err) => return Err(err),
1167        };
1168        if receipt.old_session_id.is_none() {
1169            receipt.old_session_id = old_session_id_before;
1170        }
1171        let post_material = self
1172            .canonical_member_list_material(&receipt.member_id)
1173            .await;
1174        if MobMemberTerminalClassifier::has_canonical_member(&post_material) {
1175            receipt.new_session_id = post_material.current_session_id;
1176        }
1177        Ok(receipt)
1178    }
1179
1180    /// Retire all roster members concurrently in a single actor command.
1181    pub async fn retire_all(&self) -> Result<(), MobError> {
1182        let (reply_tx, reply_rx) = oneshot::channel();
1183        self.command_tx
1184            .send(MobCommand::RetireAll { reply_tx })
1185            .await
1186            .map_err(|_| MobError::Internal("actor task dropped".into()))?;
1187        reply_rx
1188            .await
1189            .map_err(|_| MobError::Internal("actor reply dropped".into()))?
1190    }
1191
1192    /// Wire a local member to either another local member or an external peer.
1193    pub async fn wire<T>(&self, local: MeerkatId, target: T) -> Result<(), MobError>
1194    where
1195        T: Into<PeerTarget>,
1196    {
1197        let (reply_tx, reply_rx) = oneshot::channel();
1198        self.command_tx
1199            .send(MobCommand::Wire {
1200                local,
1201                target: target.into(),
1202                reply_tx,
1203            })
1204            .await
1205            .map_err(|_| MobError::Internal("actor task dropped".into()))?;
1206        reply_rx
1207            .await
1208            .map_err(|_| MobError::Internal("actor reply dropped".into()))?
1209    }
1210
1211    /// Unwire a local member from either another local member or an external peer.
1212    pub async fn unwire<T>(&self, local: MeerkatId, target: T) -> Result<(), MobError>
1213    where
1214        T: Into<PeerTarget>,
1215    {
1216        let (reply_tx, reply_rx) = oneshot::channel();
1217        self.command_tx
1218            .send(MobCommand::Unwire {
1219                local,
1220                target: target.into(),
1221                reply_tx,
1222            })
1223            .await
1224            .map_err(|_| MobError::Internal("actor task dropped".into()))?;
1225        reply_rx
1226            .await
1227            .map_err(|_| MobError::Internal("actor reply dropped".into()))?
1228    }
1229
1230    /// Compatibility wrapper for internal-turn submission.
1231    ///
1232    /// Prefer [`MobHandle::member`] plus [`MemberHandle::internal_turn`] for
1233    /// the target 0.5 API shape.
1234    pub async fn internal_turn(
1235        &self,
1236        meerkat_id: MeerkatId,
1237        message: impl Into<meerkat_core::types::ContentInput>,
1238    ) -> Result<MemberDeliveryReceipt, MobError> {
1239        let session_id = self
1240            .internal_turn_for_member(meerkat_id.clone(), message.into())
1241            .await?;
1242        Ok(MemberDeliveryReceipt {
1243            member_id: meerkat_id,
1244            session_id,
1245            handling_mode: HandlingMode::Queue,
1246        })
1247    }
1248
1249    pub(super) async fn external_turn_for_member(
1250        &self,
1251        meerkat_id: MeerkatId,
1252        message: meerkat_core::types::ContentInput,
1253        handling_mode: HandlingMode,
1254        render_metadata: Option<RenderMetadata>,
1255    ) -> Result<meerkat_core::types::SessionId, MobError> {
1256        let (reply_tx, reply_rx) = oneshot::channel();
1257        self.command_tx
1258            .send(MobCommand::ExternalTurn {
1259                meerkat_id,
1260                content: message,
1261                handling_mode,
1262                render_metadata,
1263                reply_tx,
1264            })
1265            .await
1266            .map_err(|_| MobError::Internal("actor task dropped".into()))?;
1267        reply_rx
1268            .await
1269            .map_err(|_| MobError::Internal("actor reply dropped".into()))?
1270    }
1271
1272    pub(super) async fn internal_turn_for_member(
1273        &self,
1274        meerkat_id: MeerkatId,
1275        message: meerkat_core::types::ContentInput,
1276    ) -> Result<SessionId, MobError> {
1277        let (reply_tx, reply_rx) = oneshot::channel();
1278        self.command_tx
1279            .send(MobCommand::InternalTurn {
1280                meerkat_id,
1281                content: message,
1282                reply_tx,
1283            })
1284            .await
1285            .map_err(|_| MobError::Internal("actor task dropped".into()))?;
1286        reply_rx
1287            .await
1288            .map_err(|_| MobError::Internal("actor reply dropped".into()))?
1289    }
1290
1291    /// Transition Running -> Stopped. Mutation commands are rejected while stopped.
1292    pub async fn stop(&self) -> Result<(), MobError> {
1293        let (reply_tx, reply_rx) = oneshot::channel();
1294        self.command_tx
1295            .send(MobCommand::Stop { reply_tx })
1296            .await
1297            .map_err(|_| MobError::Internal("actor task dropped".into()))?;
1298        reply_rx
1299            .await
1300            .map_err(|_| MobError::Internal("actor reply dropped".into()))?
1301    }
1302
1303    /// Transition Stopped -> Running.
1304    pub async fn resume(&self) -> Result<(), MobError> {
1305        let (reply_tx, reply_rx) = oneshot::channel();
1306        self.command_tx
1307            .send(MobCommand::ResumeLifecycle { reply_tx })
1308            .await
1309            .map_err(|_| MobError::Internal("actor task dropped".into()))?;
1310        reply_rx
1311            .await
1312            .map_err(|_| MobError::Internal("actor reply dropped".into()))?
1313    }
1314
1315    /// Archive all members, emit MobCompleted, and transition to Completed.
1316    pub async fn complete(&self) -> Result<(), MobError> {
1317        let (reply_tx, reply_rx) = oneshot::channel();
1318        self.command_tx
1319            .send(MobCommand::Complete { reply_tx })
1320            .await
1321            .map_err(|_| MobError::Internal("actor task dropped".into()))?;
1322        reply_rx
1323            .await
1324            .map_err(|_| MobError::Internal("actor reply dropped".into()))?
1325    }
1326
1327    /// Wipe all runtime state and transition back to `Running`.
1328    ///
1329    /// Like `destroy()` but keeps the actor alive and transitions to `Running`
1330    /// instead of `Destroyed`. The handle remains usable after reset.
1331    pub async fn reset(&self) -> Result<(), MobError> {
1332        let (reply_tx, reply_rx) = oneshot::channel();
1333        self.command_tx
1334            .send(MobCommand::Reset { reply_tx })
1335            .await
1336            .map_err(|_| MobError::Internal("actor task dropped".into()))?;
1337        reply_rx
1338            .await
1339            .map_err(|_| MobError::Internal("actor reply dropped".into()))?
1340    }
1341
1342    /// Retire active members, clear persisted mob storage, and terminate the actor.
1343    pub async fn destroy(&self) -> Result<(), MobError> {
1344        let (reply_tx, reply_rx) = oneshot::channel();
1345        self.command_tx
1346            .send(MobCommand::Destroy { reply_tx })
1347            .await
1348            .map_err(|_| MobError::Internal("actor task dropped".into()))?;
1349        reply_rx
1350            .await
1351            .map_err(|_| MobError::Internal("actor reply dropped".into()))?
1352    }
1353
1354    /// Create a task in the shared mob task board.
1355    pub async fn task_create(
1356        &self,
1357        subject: String,
1358        description: String,
1359        blocked_by: Vec<TaskId>,
1360    ) -> Result<TaskId, MobError> {
1361        let (reply_tx, reply_rx) = oneshot::channel();
1362        self.command_tx
1363            .send(MobCommand::TaskCreate {
1364                subject,
1365                description,
1366                blocked_by,
1367                reply_tx,
1368            })
1369            .await
1370            .map_err(|_| MobError::Internal("actor task dropped".into()))?;
1371        reply_rx
1372            .await
1373            .map_err(|_| MobError::Internal("actor reply dropped".into()))?
1374    }
1375
1376    /// Update task status/owner in the shared mob task board.
1377    pub async fn task_update(
1378        &self,
1379        task_id: TaskId,
1380        status: TaskStatus,
1381        owner: Option<MeerkatId>,
1382    ) -> Result<(), MobError> {
1383        let (reply_tx, reply_rx) = oneshot::channel();
1384        self.command_tx
1385            .send(MobCommand::TaskUpdate {
1386                task_id,
1387                status,
1388                owner,
1389                reply_tx,
1390            })
1391            .await
1392            .map_err(|_| MobError::Internal("actor task dropped".into()))?;
1393        reply_rx
1394            .await
1395            .map_err(|_| MobError::Internal("actor reply dropped".into()))?
1396    }
1397
1398    /// List tasks from the in-memory task board projection.
1399    pub async fn task_list(&self) -> Result<Vec<MobTask>, MobError> {
1400        Ok(self.task_board.read().await.list().cloned().collect())
1401    }
1402
1403    /// Get a task by ID from the in-memory task board projection.
1404    pub async fn task_get(&self, task_id: &TaskId) -> Result<Option<MobTask>, MobError> {
1405        Ok(self.task_board.read().await.get(task_id).cloned())
1406    }
1407
1408    #[cfg(test)]
1409    pub async fn debug_flow_tracker_counts(&self) -> Result<(usize, usize), MobError> {
1410        let (reply_tx, reply_rx) = oneshot::channel();
1411        self.command_tx
1412            .send(MobCommand::FlowTrackerCounts { reply_tx })
1413            .await
1414            .map_err(|_| MobError::Internal("actor task dropped".into()))?;
1415        reply_rx
1416            .await
1417            .map_err(|_| MobError::Internal("actor reply dropped".into()))
1418    }
1419
1420    #[cfg(test)]
1421    pub async fn debug_orchestrator_snapshot(
1422        &self,
1423    ) -> Result<super::MobOrchestratorSnapshot, MobError> {
1424        let (reply_tx, reply_rx) = oneshot::channel();
1425        self.command_tx
1426            .send(MobCommand::OrchestratorSnapshot { reply_tx })
1427            .await
1428            .map_err(|_| MobError::Internal("actor task dropped".into()))?;
1429        reply_rx
1430            .await
1431            .map_err(|_| MobError::Internal("actor reply dropped".into()))
1432    }
1433
1434    /// Set or clear the spawn policy for automatic member provisioning.
1435    ///
1436    /// When set, external turns targeting an unknown meerkat ID will
1437    /// consult the policy before returning `MeerkatNotFound`.
1438    pub async fn set_spawn_policy(
1439        &self,
1440        policy: Option<Arc<dyn super::spawn_policy::SpawnPolicy>>,
1441    ) -> Result<(), MobError> {
1442        let (reply_tx, reply_rx) = oneshot::channel();
1443        self.command_tx
1444            .send(MobCommand::SetSpawnPolicy { policy, reply_tx })
1445            .await
1446            .map_err(|_| MobError::Internal("actor task dropped".into()))?;
1447        reply_rx
1448            .await
1449            .map_err(|_| MobError::Internal("actor reply dropped".into()))?;
1450        Ok(())
1451    }
1452
1453    /// Shut down the actor. After this, no more commands are accepted.
1454    pub async fn shutdown(&self) -> Result<(), MobError> {
1455        let (reply_tx, reply_rx) = oneshot::channel();
1456        self.command_tx
1457            .send(MobCommand::Shutdown { reply_tx })
1458            .await
1459            .map_err(|_| MobError::Internal("actor task dropped".into()))?;
1460        reply_rx
1461            .await
1462            .map_err(|_| MobError::Internal("actor reply dropped".into()))??;
1463        Ok(())
1464    }
1465
1466    /// Force-cancel a member's in-flight turn via session interrupt.
1467    ///
1468    /// Unlike [`retire`](Self::retire), this does not archive the session or
1469    /// remove the member from the roster — it only cancels the current turn.
1470    pub async fn force_cancel_member(&self, meerkat_id: MeerkatId) -> Result<(), MobError> {
1471        let (reply_tx, reply_rx) = oneshot::channel();
1472        self.command_tx
1473            .send(MobCommand::ForceCancel {
1474                meerkat_id,
1475                reply_tx,
1476            })
1477            .await
1478            .map_err(|_| MobError::Internal("actor task dropped".into()))?;
1479        reply_rx
1480            .await
1481            .map_err(|_| MobError::Internal("actor reply dropped".into()))?
1482    }
1483
1484    async fn canonical_member_list_material(
1485        &self,
1486        meerkat_id: &MeerkatId,
1487    ) -> CanonicalMemberSnapshotMaterial {
1488        self.canonical_member_material(
1489            meerkat_id,
1490            PeerConnectivityProjection::Omit,
1491            SessionObservationProjection::LiveOnly,
1492        )
1493        .await
1494    }
1495
1496    async fn canonical_member_snapshot_material(
1497        &self,
1498        meerkat_id: &MeerkatId,
1499    ) -> CanonicalMemberSnapshotMaterial {
1500        self.canonical_member_material(
1501            meerkat_id,
1502            PeerConnectivityProjection::Include,
1503            SessionObservationProjection::Full,
1504        )
1505        .await
1506    }
1507
1508    async fn canonical_member_material(
1509        &self,
1510        meerkat_id: &MeerkatId,
1511        connectivity: PeerConnectivityProjection,
1512        observation: SessionObservationProjection,
1513    ) -> CanonicalMemberSnapshotMaterial {
1514        // Canonical helper-surface classification is derived only from roster
1515        // membership/state plus session-service activity, never side tables.
1516        let (roster_snapshot, roster_entry, roster_state, current_session_id) = {
1517            let roster = self.roster.read().await;
1518            match roster.get(meerkat_id) {
1519                Some(entry) => (
1520                    roster.snapshot(),
1521                    Some(entry.clone()),
1522                    Some(entry.state),
1523                    entry.member_ref.session_id().cloned(),
1524                ),
1525                None => (roster.snapshot(), None, None, None),
1526            }
1527        };
1528
1529        let restore_failure = {
1530            self.restore_diagnostics
1531                .read()
1532                .await
1533                .get(meerkat_id)
1534                .cloned()
1535        };
1536        if let Some(diag) = restore_failure {
1537            return MobMemberLifecycleAuthority::materialize(MobMemberLifecycleInput {
1538                member_present: roster_state.is_some(),
1539                roster_state,
1540                session_observation: CanonicalSessionObservation::Missing,
1541                restore_failure: Some(diag.reason),
1542                output_preview: None,
1543                tokens_used: 0,
1544                current_session_id: Some(diag.session_id),
1545                peer_connectivity: None,
1546                kickoff: roster_entry
1547                    .as_ref()
1548                    .and_then(|entry| entry.kickoff.clone()),
1549            });
1550        }
1551
1552        match (roster_state, current_session_id) {
1553            (None, _) => MobMemberLifecycleAuthority::materialize(MobMemberLifecycleInput {
1554                member_present: false,
1555                roster_state: None,
1556                session_observation: CanonicalSessionObservation::Missing,
1557                restore_failure: None,
1558                output_preview: None,
1559                tokens_used: 0,
1560                current_session_id: None,
1561                peer_connectivity: None,
1562                kickoff: None,
1563            }),
1564            (Some(roster_state), None) => {
1565                MobMemberLifecycleAuthority::materialize(MobMemberLifecycleInput {
1566                    member_present: true,
1567                    roster_state: Some(roster_state),
1568                    session_observation: CanonicalSessionObservation::Missing,
1569                    restore_failure: None,
1570                    output_preview: None,
1571                    tokens_used: 0,
1572                    current_session_id: None,
1573                    peer_connectivity: None,
1574                    kickoff: roster_entry
1575                        .as_ref()
1576                        .and_then(|entry| entry.kickoff.clone()),
1577                })
1578            }
1579            (Some(roster_state), Some(session_id)) => {
1580                let (output_preview, tokens_used, observation) = match observation {
1581                    SessionObservationProjection::LiveOnly => {
1582                        match self.session_service.has_live_session(&session_id).await {
1583                            Ok(false) => (None, 0, CanonicalSessionObservation::Missing),
1584                            Ok(true) => match self.session_service.read(&session_id).await {
1585                                Ok(view) => (
1586                                    view.state.last_assistant_text.clone(),
1587                                    view.billing.total_tokens,
1588                                    if view.state.is_active {
1589                                        CanonicalSessionObservation::Active
1590                                    } else {
1591                                        CanonicalSessionObservation::Inactive
1592                                    },
1593                                ),
1594                                Err(SessionError::NotFound { .. }) => {
1595                                    (None, 0, CanonicalSessionObservation::Unknown)
1596                                }
1597                                Err(_) => (None, 0, CanonicalSessionObservation::Unknown),
1598                            },
1599                            Err(_) => (None, 0, CanonicalSessionObservation::Unknown),
1600                        }
1601                    }
1602                    SessionObservationProjection::Full => {
1603                        match self.session_service.read(&session_id).await {
1604                            Ok(view) => (
1605                                view.state.last_assistant_text.clone(),
1606                                view.billing.total_tokens,
1607                                if view.state.is_active {
1608                                    CanonicalSessionObservation::Active
1609                                } else {
1610                                    CanonicalSessionObservation::Inactive
1611                                },
1612                            ),
1613                            Err(SessionError::NotFound { .. }) => {
1614                                (None, 0, CanonicalSessionObservation::Missing)
1615                            }
1616                            Err(_) => (None, 0, CanonicalSessionObservation::Unknown),
1617                        }
1618                    }
1619                };
1620                let peer_connectivity = if connectivity == PeerConnectivityProjection::Include {
1621                    match roster_entry.as_ref() {
1622                        Some(entry) => {
1623                            self.resolve_peer_connectivity(entry, &session_id, &roster_snapshot)
1624                                .await
1625                        }
1626                        None => None,
1627                    }
1628                } else {
1629                    None
1630                };
1631                MobMemberLifecycleAuthority::materialize(MobMemberLifecycleInput {
1632                    member_present: true,
1633                    roster_state: Some(roster_state),
1634                    session_observation: observation,
1635                    restore_failure: None,
1636                    output_preview,
1637                    tokens_used,
1638                    current_session_id: Some(session_id),
1639                    peer_connectivity,
1640                    kickoff: roster_entry.and_then(|entry| entry.kickoff),
1641                })
1642            }
1643        }
1644    }
1645
1646    async fn snapshot_kickoff_waiters(
1647        &self,
1648        meerkat_ids: Vec<MeerkatId>,
1649    ) -> Result<Vec<(MeerkatId, tokio::sync::watch::Receiver<bool>)>, MobError> {
1650        let (reply_tx, reply_rx) = oneshot::channel();
1651        self.command_tx
1652            .send(MobCommand::KickoffBarrierSnapshot {
1653                meerkat_ids,
1654                reply_tx,
1655            })
1656            .await
1657            .map_err(|_| MobError::Internal("mob actor dropped".into()))?;
1658        reply_rx
1659            .await
1660            .map_err(|_| MobError::Internal("actor reply dropped".into()))
1661    }
1662
1663    async fn wait_for_kickoff_receivers(
1664        &self,
1665        target_ids: &[MeerkatId],
1666        waiters: Vec<(MeerkatId, tokio::sync::watch::Receiver<bool>)>,
1667        timeout: Option<Duration>,
1668    ) -> Result<(), MobError> {
1669        if waiters.is_empty() {
1670            return Ok(());
1671        }
1672
1673        let deadline = Instant::now() + timeout.unwrap_or(DEFAULT_KICKOFF_WAIT_TIMEOUT);
1674        let mut pending = waiters
1675            .iter()
1676            .map(|(id, _)| id.clone())
1677            .collect::<std::collections::HashSet<_>>();
1678        let mut futures = FuturesUnordered::new();
1679
1680        for (id, mut rx) in waiters {
1681            if *rx.borrow() {
1682                pending.remove(&id);
1683                continue;
1684            }
1685            futures.push(async move {
1686                loop {
1687                    if *rx.borrow() {
1688                        break;
1689                    }
1690                    if rx.changed().await.is_err() {
1691                        break;
1692                    }
1693                }
1694                id
1695            });
1696        }
1697
1698        while !futures.is_empty() {
1699            let remaining = deadline.saturating_duration_since(Instant::now());
1700            if remaining.is_zero() {
1701                let pending_member_ids = target_ids
1702                    .iter()
1703                    .filter(|id| pending.contains(*id))
1704                    .cloned()
1705                    .collect();
1706                return Err(MobError::KickoffWaitTimedOut { pending_member_ids });
1707            }
1708
1709            let next_fut = futures.next();
1710            let sleep_fut = tokio::time::sleep(remaining);
1711            futures::pin_mut!(next_fut);
1712            futures::pin_mut!(sleep_fut);
1713
1714            match futures::future::select(next_fut, sleep_fut).await {
1715                futures::future::Either::Left((Some(id), _)) => {
1716                    pending.remove(&id);
1717                }
1718                futures::future::Either::Left((None, _)) => break,
1719                futures::future::Either::Right(((), _)) => {
1720                    let pending_member_ids = target_ids
1721                        .iter()
1722                        .filter(|id| pending.contains(*id))
1723                        .cloned()
1724                        .collect();
1725                    return Err(MobError::KickoffWaitTimedOut { pending_member_ids });
1726                }
1727            }
1728        }
1729
1730        Ok(())
1731    }
1732
1733    async fn wait_one_material(
1734        &self,
1735        meerkat_id: &MeerkatId,
1736    ) -> Result<CanonicalMemberSnapshotMaterial, MobError> {
1737        loop {
1738            let material = self.canonical_member_list_material(meerkat_id).await;
1739            if MobMemberTerminalClassifier::is_terminal(&material) {
1740                return Ok(material);
1741            }
1742            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1743        }
1744    }
1745
1746    /// Get a point-in-time execution snapshot for a member.
1747    ///
1748    /// This is the deep inspection surface. Unlike list projections, it
1749    /// resolves live peer connectivity when a comms runtime is available.
1750    pub async fn member_status(
1751        &self,
1752        meerkat_id: &MeerkatId,
1753    ) -> Result<MobMemberSnapshot, MobError> {
1754        let material = self.canonical_member_snapshot_material(meerkat_id).await;
1755        Ok(material.to_snapshot())
1756    }
1757
1758    /// Wait until all autonomous members have been admitted to the runtime.
1759    ///
1760    /// Autonomous members no longer run a separate kickoff turn — their initial
1761    /// prompt is injected through the runtime input path at spawn time. This
1762    /// method returns member snapshots immediately since admission is synchronous.
1763    pub async fn wait_for_kickoff_complete(
1764        &self,
1765        timeout: Option<Duration>,
1766    ) -> Result<Vec<(MeerkatId, MobMemberSnapshot)>, MobError> {
1767        let target_ids = self
1768            .list_all_members()
1769            .await
1770            .into_iter()
1771            .map(|entry| entry.meerkat_id)
1772            .collect::<Vec<_>>();
1773        self.wait_for_members_kickoff_complete(&target_ids, timeout)
1774            .await
1775    }
1776
1777    /// Wait until the given autonomous members have been admitted to the runtime.
1778    ///
1779    /// See [`wait_for_kickoff_complete`](Self::wait_for_kickoff_complete) for details.
1780    pub async fn wait_for_members_kickoff_complete(
1781        &self,
1782        ids: &[MeerkatId],
1783        timeout: Option<Duration>,
1784    ) -> Result<Vec<(MeerkatId, MobMemberSnapshot)>, MobError> {
1785        let target_ids = ids.to_vec();
1786        let waiters = self.snapshot_kickoff_waiters(target_ids.clone()).await?;
1787        self.wait_for_kickoff_receivers(&target_ids, waiters, timeout)
1788            .await?;
1789
1790        let mut snapshots = Vec::with_capacity(target_ids.len());
1791        for id in target_ids {
1792            snapshots.push((id.clone(), self.member_status(&id).await?));
1793        }
1794        Ok(snapshots)
1795    }
1796
1797    /// Wait for a specific member to reach a terminal state, then return its snapshot.
1798    ///
1799    /// Polls canonical member classification until terminal.
1800    pub async fn wait_one(&self, meerkat_id: &MeerkatId) -> Result<MobMemberSnapshot, MobError> {
1801        let material = self.wait_one_material(meerkat_id).await?;
1802        Ok(material.to_snapshot())
1803    }
1804
1805    /// Wait for all specified members to reach terminal states.
1806    pub async fn wait_all(
1807        &self,
1808        meerkat_ids: &[MeerkatId],
1809    ) -> Result<Vec<MobMemberSnapshot>, MobError> {
1810        let futs = meerkat_ids
1811            .iter()
1812            .map(|id| self.wait_one_material(id))
1813            .collect::<Vec<_>>();
1814        let results = futures::future::join_all(futs).await;
1815        results
1816            .into_iter()
1817            .map(|result| result.map(|material| material.to_snapshot()))
1818            .collect()
1819    }
1820
1821    /// Collect snapshots for all members that have reached terminal states.
1822    pub async fn collect_completed(&self) -> Vec<(MeerkatId, MobMemberSnapshot)> {
1823        let entries = self.list_all_members().await;
1824        let mut completed = Vec::new();
1825        for entry in entries {
1826            if let Ok(snapshot) = self.member_status(&entry.meerkat_id).await
1827                && snapshot.is_final
1828            {
1829                completed.push((entry.meerkat_id, snapshot));
1830            }
1831        }
1832        completed
1833    }
1834
1835    /// Spawn a fresh helper, wait for it to complete, retire it, and return its result.
1836    ///
1837    /// Helpers are short-lived TurnDriven tasks by default. Their completion
1838    /// truth is the spawn/create boundary plus the canonical post-spawn member
1839    /// snapshot, not full member terminality in the mob lifecycle.
1840    pub async fn spawn_helper(
1841        &self,
1842        meerkat_id: MeerkatId,
1843        task: impl Into<String>,
1844        options: HelperOptions,
1845    ) -> Result<HelperResult, MobError> {
1846        let profile_name = options
1847            .role_name
1848            .or_else(|| self.definition.profiles.keys().next().cloned())
1849            .ok_or_else(|| {
1850                MobError::Internal("no profile specified and definition has no profiles".into())
1851            })?;
1852        let task_text = task.into();
1853        let mut spec = SpawnMemberSpec::new(profile_name, meerkat_id.clone());
1854        spec.initial_message = Some(task_text.into());
1855        spec.runtime_mode = Some(
1856            options
1857                .runtime_mode
1858                .unwrap_or(crate::MobRuntimeMode::TurnDriven),
1859        );
1860        spec.backend = options.backend;
1861        spec.tool_access_policy = options.tool_access_policy;
1862        spec.auto_wire_parent = true;
1863
1864        self.spawn_spec(spec).await?;
1865        let helper_material = self.canonical_member_list_material(&meerkat_id).await;
1866        let _ = self.retire(meerkat_id.clone()).await;
1867
1868        Ok(helper_material.to_helper_result())
1869    }
1870
1871    /// Fork from an existing member's context, wait for completion, retire, and return.
1872    ///
1873    /// Like `spawn_helper` but uses `MemberLaunchMode::Fork` to share
1874    /// conversation context with the source member.
1875    pub async fn fork_helper(
1876        &self,
1877        source_member_id: &MeerkatId,
1878        meerkat_id: MeerkatId,
1879        task: impl Into<String>,
1880        fork_context: crate::launch::ForkContext,
1881        options: HelperOptions,
1882    ) -> Result<HelperResult, MobError> {
1883        let profile_name = options
1884            .role_name
1885            .or_else(|| self.definition.profiles.keys().next().cloned())
1886            .ok_or_else(|| {
1887                MobError::Internal("no profile specified and definition has no profiles".into())
1888            })?;
1889        let task_text = task.into();
1890        let mut spec = SpawnMemberSpec::new(profile_name, meerkat_id.clone());
1891        spec.initial_message = Some(task_text.into());
1892        spec.runtime_mode = Some(
1893            options
1894                .runtime_mode
1895                .unwrap_or(crate::MobRuntimeMode::TurnDriven),
1896        );
1897        spec.backend = options.backend;
1898        spec.tool_access_policy = options.tool_access_policy;
1899        spec.auto_wire_parent = true;
1900        spec.launch_mode = crate::launch::MemberLaunchMode::Fork {
1901            source_member_id: source_member_id.clone(),
1902            fork_context,
1903        };
1904
1905        self.spawn_spec(spec).await?;
1906        let helper_material = self.canonical_member_list_material(&meerkat_id).await;
1907        let _ = self.retire(meerkat_id.clone()).await;
1908
1909        Ok(helper_material.to_helper_result())
1910    }
1911}
1912
1913impl MemberHandle {
1914    /// Target member id for this capability.
1915    pub fn meerkat_id(&self) -> &MeerkatId {
1916        &self.meerkat_id
1917    }
1918
1919    /// Submit external work to this member through the canonical runtime path.
1920    pub async fn send(
1921        &self,
1922        content: impl Into<meerkat_core::types::ContentInput>,
1923        handling_mode: HandlingMode,
1924    ) -> Result<MemberDeliveryReceipt, MobError> {
1925        self.send_with_render_metadata(content, handling_mode, None)
1926            .await
1927    }
1928
1929    /// Submit external work with explicit normalized render metadata.
1930    pub async fn send_with_render_metadata(
1931        &self,
1932        content: impl Into<meerkat_core::types::ContentInput>,
1933        handling_mode: HandlingMode,
1934        render_metadata: Option<RenderMetadata>,
1935    ) -> Result<MemberDeliveryReceipt, MobError> {
1936        let session_id = self
1937            .mob
1938            .external_turn_for_member(
1939                self.meerkat_id.clone(),
1940                content.into(),
1941                handling_mode,
1942                render_metadata,
1943            )
1944            .await?;
1945        Ok(MemberDeliveryReceipt {
1946            member_id: self.meerkat_id.clone(),
1947            session_id,
1948            handling_mode,
1949        })
1950    }
1951
1952    /// Submit internal work to this member without external addressability checks.
1953    pub async fn internal_turn(
1954        &self,
1955        content: impl Into<meerkat_core::types::ContentInput>,
1956    ) -> Result<MemberDeliveryReceipt, MobError> {
1957        let session_id = self
1958            .mob
1959            .internal_turn_for_member(self.meerkat_id.clone(), content.into())
1960            .await?;
1961        Ok(MemberDeliveryReceipt {
1962            member_id: self.meerkat_id.clone(),
1963            session_id,
1964            handling_mode: HandlingMode::Queue,
1965        })
1966    }
1967
1968    /// Current session ID for this member, if a session bridge exists.
1969    pub async fn current_session_id(&self) -> Result<Option<SessionId>, MobError> {
1970        let roster = self.mob.roster.read().await;
1971        Ok(roster
1972            .get(&self.meerkat_id)
1973            .and_then(|e| e.member_ref.session_id().cloned()))
1974    }
1975
1976    /// Session reference for this member, if a session bridge exists.
1977    pub async fn session_ref(&self) -> Result<Option<MemberSessionRef>, MobError> {
1978        let roster = self.mob.roster.read().await;
1979        Ok(roster
1980            .get(&self.meerkat_id)
1981            .and_then(|e| e.member_ref.session_id().cloned())
1982            .map(|session_id| MemberSessionRef {
1983                member_id: self.meerkat_id.clone(),
1984                session_id,
1985            }))
1986    }
1987
1988    /// Get a point-in-time execution snapshot for this member.
1989    pub async fn status(&self) -> Result<MobMemberSnapshot, MobError> {
1990        self.mob.member_status(&self.meerkat_id).await
1991    }
1992
1993    /// Subscribe to this member's agent events.
1994    pub async fn events(&self) -> Result<EventStream, MobError> {
1995        self.mob.subscribe_agent_events(&self.meerkat_id).await
1996    }
1997}