Skip to main content

meerkat_mob/runtime/
handle.rs

1use super::*;
2use crate::MobRuntimeMode;
3use crate::machines::mob_machine as mob_dsl;
4use crate::mob_machine::{MobMachineCommand, MobMachineCommandResult};
5use crate::roster::MobMemberKickoffSnapshot;
6use crate::run::{MobMachineFlowRunCommand, flow_run};
7#[cfg(test)]
8use crate::runtime::MobLifecycleSnapshot;
9use crate::runtime::flow_frame_engine::FlowFrameLoopStorePlan;
10#[cfg(test)]
11use crate::runtime::mob_member_lifecycle_projection::{
12    CanonicalMemberSnapshotMaterial, CanonicalMemberStatus,
13};
14use crate::runtime::mob_member_lifecycle_projection::{
15    MobMemberLifecycleInput, MobMemberLifecycleProjection,
16};
17use crate::runtime::reconcile::{
18    EnsureMemberOutcome, MemberFilter, ReconcileFailure, ReconcileOptions, ReconcileReport,
19    ReconcileStage,
20};
21use crate::runtime::terminalization::{TerminalizationOutcome, TerminalizationTarget};
22#[cfg(target_arch = "wasm32")]
23use crate::tokio;
24use meerkat_core::comms::{
25    CommsCommand, PeerDirectoryEntry, PeerId, PeerReachability, PeerReachabilityReason,
26    SendReceipt, TrustedPeerDescriptor,
27};
28use meerkat_core::ops::OperationId;
29use meerkat_core::ops_lifecycle::OpsLifecycleRegistry;
30use meerkat_core::service::{MobToolAuthorityContext, SessionError};
31use meerkat_core::time_compat::Instant;
32use meerkat_core::types::{HandlingMode, RenderMetadata, SessionId};
33use serde::{Deserialize, Serialize};
34use std::collections::BTreeMap;
35use std::collections::BTreeSet;
36use std::collections::HashMap;
37use std::time::Duration;
38use tokio_util::sync::CancellationToken;
39
/// Default kickoff wait timeout: ten minutes.
const DEFAULT_KICKOFF_WAIT_TIMEOUT: Duration = Duration::from_secs(10 * 60);
/// Default ready wait timeout: ten minutes.
const DEFAULT_READY_WAIT_TIMEOUT: Duration = Duration::from_secs(10 * 60);
42
43#[derive(Debug, Clone, Serialize)]
44#[non_exhaustive]
45pub struct MobMemberSnapshot {
46    /// Current lifecycle status.
47    pub status: MobMemberStatus,
48    /// Identity-native runtime ID for this incarnation.
49    ///
50    /// Binding-era atom: bridge-internal, `pub(crate)` + `#[serde(skip)]`
51    /// so external consumers use `agent_identity()` (derived from
52    /// `agent_runtime_id.identity`) as the public identity contract.
53    #[serde(skip)]
54    pub(crate) agent_runtime_id: AgentRuntimeId,
55    /// Fence token for the current incarnation.
56    ///
57    /// Binding-era atom used by the bridge for stale-command rejection.
58    /// `pub(crate)` + `#[serde(skip)]` so it does not leak into
59    /// app-facing payloads.
60    #[serde(skip)]
61    pub(crate) fence_token: FenceToken,
62    /// Preview of the current bridge session's last committed assistant text.
63    pub output_preview: Option<String>,
64    /// Error description (if the member errored).
65    pub error: Option<String>,
66    /// Cumulative token usage.
67    pub tokens_used: u64,
68    /// Whether the member has reached a terminal state.
69    pub is_final: bool,
70    /// Diagnostic session id for the member's current bridge session.
71    /// Observable for status/continuity diagnostics only.
72    #[serde(default, skip_serializing_if = "Option::is_none")]
73    pub current_session_id: Option<SessionId>,
74    /// Bridge-internal session binding — not part of the public identity contract.
75    #[serde(skip)]
76    pub(crate) current_bridge_session_id: Option<SessionId>,
77    /// Live comms connectivity for currently wired peers, when available.
78    #[serde(skip_serializing_if = "Option::is_none")]
79    pub peer_connectivity: Option<MobPeerConnectivitySnapshot>,
80    /// Initial autonomous-turn kickoff state, when this member has one.
81    #[serde(default, skip_serializing_if = "Option::is_none")]
82    pub kickoff: Option<MobMemberKickoffSnapshot>,
83    /// External-member observation projection, when this member is backed by
84    /// an external peer rather than a local session.
85    ///
86    /// This is a read-only projection over the roster member ref and restore
87    /// diagnostics. It intentionally avoids peer ids, transport addresses,
88    /// bootstrap tokens, runtime incarnation ids, and fence tokens; those are
89    /// binding mechanics, not app-facing authority.
90    #[serde(default, skip_serializing_if = "Option::is_none")]
91    pub external_member: Option<ExternalMemberObservationSnapshot>,
92    /// Runtime-owned resolved LLM capability projection for the member's
93    /// current bridge session.
94    #[serde(default, skip_serializing_if = "Option::is_none")]
95    pub resolved_capabilities: Option<meerkat_contracts::WireResolvedModelCapabilities>,
96}
97
98impl MobMemberSnapshot {
99    pub(crate) fn with_current_bridge_session_id(
100        mut self,
101        current_bridge_session_id: Option<SessionId>,
102    ) -> Self {
103        self.current_session_id = current_bridge_session_id.clone();
104        self.current_bridge_session_id = current_bridge_session_id;
105        self
106    }
107
108    pub(crate) fn current_bridge_session_id(&self) -> Option<&SessionId> {
109        self.current_bridge_session_id.as_ref()
110    }
111
112    /// Convenience accessor for the canonical member identity. Equivalent to
113    /// `&self.agent_runtime_id.identity` but saves every consumer from
114    /// reaching through the runtime-id wrapper.
115    #[must_use]
116    pub fn agent_identity(&self) -> &AgentIdentity {
117        &self.agent_runtime_id.identity
118    }
119
120    /// Runtime incarnation identity for diagnostic/control projections.
121    ///
122    /// These atoms stay out of generic `Serialize` output so app-facing
123    /// receipts do not couple callers to bridge internals. Surfaces that own a
124    /// control contract, such as `mob/member_status`, must opt in through this
125    /// accessor and project the fields explicitly.
126    #[must_use]
127    pub fn runtime_identity_fields(&self) -> (&AgentRuntimeId, FenceToken) {
128        (&self.agent_runtime_id, self.fence_token)
129    }
130}
131
132#[derive(Debug, Clone, Serialize)]
133pub struct MobMemberListEntry {
134    /// Canonical member identity.
135    pub agent_identity: AgentIdentity,
136    /// Member role (profile name).
137    pub role: ProfileName,
138    pub runtime_mode: MobRuntimeMode,
139    #[serde(default)]
140    pub state: crate::roster::MemberState,
141    pub wired_to: BTreeSet<AgentIdentity>,
142    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
143    pub labels: BTreeMap<String, String>,
144    pub status: MobMemberStatus,
145    #[serde(skip_serializing_if = "Option::is_none")]
146    pub error: Option<String>,
147    pub is_final: bool,
148    #[serde(default, skip_serializing_if = "Option::is_none")]
149    pub kickoff: Option<MobMemberKickoffSnapshot>,
150    // --- Bridge internals (pub(crate)) ---
151    // `list_members` stays the lightweight roster view: no session_id
152    // in the wire shape (see `tests.rs::test_identity_first_list_members_returns_identity_native_entries`
153    // which regression-asserts that). `current_session_id` is kept as
154    // bridge-internal projection state; public realtime callers use the
155    // stable mob-member realtime target instead of routing through this id.
156    //
157    // `agent_runtime_id` and `fence_token` are binding-era atoms used
158    // by the bridge for wiring and stale-command rejection. They are
159    // `pub(crate)` and `#[serde(skip)]` so they do not leak to
160    // app-facing payloads (wire contract: app tools receive
161    // `agent_identity`; bridge session ids stay diagnostic/control
162    // projection data). External consumers that legitimately need these
163    // atoms (mob-mcp, mob-pack verify paths) route through a typed
164    // helper; they never reach into the field directly.
165    #[serde(skip)]
166    pub(crate) agent_runtime_id: AgentRuntimeId,
167    #[serde(skip)]
168    pub(crate) fence_token: FenceToken,
169    /// Canonical comms routing ID for bridge-internal peer lookup.
170    #[serde(default, skip_serializing_if = "Option::is_none")]
171    pub(crate) peer_id: Option<PeerId>,
172    /// Transport/auth public key material, separate from canonical `peer_id`.
173    #[serde(default, skip_serializing_if = "Option::is_none")]
174    pub(crate) transport_public_key: Option<String>,
175    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
176    pub(crate) external_peer_specs: BTreeMap<AgentIdentity, TrustedPeerDescriptor>,
177    #[serde(skip)]
178    pub(crate) current_session_id: Option<SessionId>,
179    #[serde(skip)]
180    pub(crate) current_bridge_session_id: Option<SessionId>,
181}
182
183impl MobMemberListEntry {
184    pub(crate) fn with_current_bridge_session_id(
185        mut self,
186        current_bridge_session_id: Option<SessionId>,
187    ) -> Self {
188        self.current_session_id = current_bridge_session_id.clone();
189        self.current_bridge_session_id = current_bridge_session_id;
190        self
191    }
192
193    /// Typed helper for server-side control dispatch that resolves an
194    /// app-facing `WireMemberRef` into the current incarnation before it
195    /// enters the work lane. Keeps the fields `pub(crate)` + `#[serde(skip)]`
196    /// so they never leak through Serialize/Debug-derived paths.
197    pub fn binding_atoms(&self) -> (AgentRuntimeId, FenceToken) {
198        (self.agent_runtime_id.clone(), self.fence_token)
199    }
200}
201
202impl WorkDeliveryReceipt {
203    /// Typed accessor for the submitting runtime identity. See
204    /// `MobMemberListEntry::binding_atoms` for rationale.
205    pub fn runtime_id(&self) -> &AgentRuntimeId {
206        &self.runtime_id
207    }
208}
209
210/// Live connectivity summary for a member's currently wired peers.
211#[derive(Debug, Clone, Serialize, Deserialize)]
212#[non_exhaustive]
213pub struct MobPeerConnectivitySnapshot {
214    pub reachable_peer_count: usize,
215    pub unknown_peer_count: usize,
216    pub unreachable_peers: Vec<MobUnreachablePeer>,
217}
218
219/// One currently wired peer that is known to be unreachable.
220#[derive(Debug, Clone, Serialize, Deserialize)]
221#[non_exhaustive]
222pub struct MobUnreachablePeer {
223    pub peer: String,
224    pub reason: Option<PeerReachabilityReason>,
225}
226
227/// Execution status for a mob member.
228#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
229#[serde(rename_all = "snake_case")]
230#[non_exhaustive]
231pub enum MobMemberStatus {
232    /// Member is active and potentially running.
233    Active,
234    /// Member is in the process of retiring.
235    Retiring,
236    /// Member failed to restore durable session state and needs repair.
237    Broken,
238    /// Member has completed (session archived or not found).
239    Completed,
240    /// Member is not in the roster.
241    Unknown,
242}
243
244/// Identity-native owner reference for external-member observation and hooks.
245#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
246#[serde(rename_all = "snake_case")]
247#[non_exhaustive]
248pub struct ExternalMemberOwnerRef {
249    pub mob_id: MobId,
250    pub agent_identity: AgentIdentity,
251}
252
253/// Whether an external member is still bridged through a local session or is
254/// a peer-only binding.
255#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
256#[serde(rename_all = "snake_case")]
257#[non_exhaustive]
258pub enum ExternalMemberBindingMode {
259    BridgeSessionBacked,
260    PeerOnly,
261}
262
263/// Current external-member reachability observation.
264#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
265#[serde(tag = "status", rename_all = "snake_case")]
266#[non_exhaustive]
267pub enum ExternalMemberReachability {
268    /// No live probe has been executed as part of this snapshot.
269    Unknown,
270    /// The member is known unavailable because canonical restore/binding
271    /// projection has failed.
272    Unavailable { reason: String },
273}
274
275/// Whether the supervisor has enough durable proof to rebind this external
276/// member after reconnect.
277#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
278#[serde(tag = "status", rename_all = "snake_case")]
279#[non_exhaustive]
280pub enum ExternalMemberRebindStatus {
281    /// A bridge session is still present; peer-only rebind is not required.
282    NotRequired,
283    /// A peer-only binding carries the typed bootstrap proof needed for
284    /// supervisor bind fallback.
285    Available,
286    /// The peer-only binding is missing rebind proof.
287    Unavailable { reason: String },
288    /// Restore/binding normalization failed.
289    Failed { reason: String },
290}
291
292/// Declared external-member forwarding hook status.
293#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
294#[serde(rename_all = "snake_case")]
295#[non_exhaustive]
296pub enum ExternalMemberForwardingStatus {
297    /// The hook owner is typed and can be used by future artifact/approval
298    /// forwarding code; this slice does not execute forwarding.
299    Declared,
300}
301
302/// Stable forwarding hook reference for a mob-owned external member.
303#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
304#[serde(rename_all = "snake_case")]
305#[non_exhaustive]
306pub struct ExternalMemberForwardingHookRef {
307    pub owner: ExternalMemberOwnerRef,
308    pub status: ExternalMemberForwardingStatus,
309}
310
311/// External-member artifact/approval forwarding hook projection.
312#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
313#[serde(rename_all = "snake_case")]
314#[non_exhaustive]
315pub struct ExternalMemberForwardingHooks {
316    pub artifacts: ExternalMemberForwardingHookRef,
317    pub approvals: ExternalMemberForwardingHookRef,
318}
319
320/// Read-only observation block for an external mob member.
321#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
322#[serde(rename_all = "snake_case")]
323#[non_exhaustive]
324pub struct ExternalMemberObservationSnapshot {
325    pub owner: ExternalMemberOwnerRef,
326    pub binding_mode: ExternalMemberBindingMode,
327    pub bridge_session_present: bool,
328    pub reachability: ExternalMemberReachability,
329    pub rebind: ExternalMemberRebindStatus,
330    pub forwarding: ExternalMemberForwardingHooks,
331}
332
333impl ExternalMemberObservationSnapshot {
334    fn hook(owner: &ExternalMemberOwnerRef) -> ExternalMemberForwardingHookRef {
335        ExternalMemberForwardingHookRef {
336            owner: owner.clone(),
337            status: ExternalMemberForwardingStatus::Declared,
338        }
339    }
340
341    fn forwarding(owner: &ExternalMemberOwnerRef) -> ExternalMemberForwardingHooks {
342        ExternalMemberForwardingHooks {
343            artifacts: Self::hook(owner),
344            approvals: Self::hook(owner),
345        }
346    }
347}
348
349/// Receipt returned by a successful member respawn.
350#[derive(Debug, Clone, Serialize)]
351#[non_exhaustive]
352pub struct MemberRespawnReceipt {
353    /// The member identity that was respawned.
354    pub identity: AgentIdentity,
355    /// Binding-era atom: bridge-internal, `pub(crate)` + `#[serde(skip)]`.
356    #[serde(skip)]
357    pub(crate) agent_runtime_id: AgentRuntimeId,
358    /// Binding-era atom: bridge-internal, `pub(crate)` + `#[serde(skip)]`.
359    #[serde(skip)]
360    pub(crate) previous_fence_token: FenceToken,
361    /// Binding-era atom: bridge-internal, `pub(crate)` + `#[serde(skip)]`.
362    #[serde(skip)]
363    pub(crate) fence_token: FenceToken,
364}
365
366impl MemberRespawnReceipt {
367    pub fn new(
368        identity: AgentIdentity,
369        agent_runtime_id: AgentRuntimeId,
370        previous_fence_token: FenceToken,
371        fence_token: FenceToken,
372    ) -> Self {
373        Self {
374            identity,
375            agent_runtime_id,
376            previous_fence_token,
377            fence_token,
378        }
379    }
380}
381
382/// Report returned after rotating a mob-owned supervisor authority.
383#[derive(Debug, Clone, Serialize)]
384#[non_exhaustive]
385pub struct SupervisorRotationReport {
386    /// Supervisor epoch before rotation.
387    pub previous_epoch: u64,
388    /// Supervisor epoch after rotation.
389    pub current_epoch: u64,
390    /// Public peer id for the new supervisor keypair.
391    pub public_peer_id: String,
392}
393
394/// Structured report returned from mob destroy.
395#[derive(Debug, Clone, Default, Serialize)]
396#[non_exhaustive]
397pub struct MobDestroyReport {
398    /// Members that required force-destroy semantics during cleanup.
399    #[serde(default, skip_serializing_if = "Vec::is_empty")]
400    pub force_destroyed_members: Vec<AgentIdentity>,
401    /// Remote members whose cleanup could not be completed before destroy ended.
402    #[serde(default, skip_serializing_if = "Vec::is_empty")]
403    pub orphaned_remote_members: Vec<AgentIdentity>,
404    /// Whether aggregate remote cleanup exceeded its deadline.
405    #[serde(default)]
406    pub remote_cleanup_deadline_exceeded: bool,
407    /// Whether runtime metadata was scrubbed.
408    #[serde(default)]
409    pub metadata_scrubbed: bool,
410    /// Whether persisted mob events were cleared.
411    #[serde(default)]
412    pub events_cleared: bool,
413    /// Whether namespace cleanup completed.
414    #[serde(default)]
415    pub namespace_cleaned: bool,
416    /// Human-readable cleanup errors captured while destroying.
417    #[serde(default, skip_serializing_if = "Vec::is_empty")]
418    pub errors: Vec<String>,
419}
420
421impl MobDestroyReport {
422    pub(crate) fn push_error(&mut self, error: impl Into<String>) {
423        self.errors.push(error.into());
424    }
425
426    fn error_summary(&self) -> String {
427        if self.errors.is_empty() {
428            "destroy cleanup did not complete".to_string()
429        } else {
430            self.errors.join("; ")
431        }
432    }
433}
434
435/// Structured error returned by mob destroy.
436#[derive(Debug, thiserror::Error)]
437#[non_exhaustive]
438pub enum MobDestroyError {
439    /// Destroy performed partial cleanup but could not finish the full contract.
440    #[error("destroy incomplete: {}", report.error_summary())]
441    Incomplete { report: MobDestroyReport },
442
443    /// A preflight or actor-level mob error occurred before partial reporting.
444    #[error(transparent)]
445    Mob(#[from] MobError),
446}
447
448/// Structured evidence captured when respawn cannot prove the old member is gone.
449#[derive(Debug, Clone, Serialize)]
450#[non_exhaustive]
451pub struct PreviousMemberCleanupReport {
452    /// Stable member identity.
453    pub identity: AgentIdentity,
454    /// Binding-era atom: bridge-internal, `pub(crate)` + `#[serde(skip)]`.
455    #[serde(skip)]
456    pub(crate) agent_runtime_id: AgentRuntimeId,
457    /// Binding-era atom: bridge-internal, `pub(crate)` + `#[serde(skip)]`.
458    #[serde(skip)]
459    pub(crate) fence_token: FenceToken,
460    /// Whether graceful retire was attempted.
461    pub retire_attempted: bool,
462    /// Error returned from the graceful retire attempt, when any.
463    #[serde(default, skip_serializing_if = "Option::is_none")]
464    pub retire_error: Option<String>,
465    /// Whether a confirmatory observation probe was attempted.
466    #[serde(default)]
467    pub confirmatory_observation_attempted: bool,
468    /// Observation probe detail, when any.
469    #[serde(default, skip_serializing_if = "Option::is_none")]
470    pub confirmatory_observation: Option<String>,
471    /// Whether force-destroy was attempted.
472    #[serde(default)]
473    pub destroy_attempted: bool,
474    /// Error returned from the force-destroy attempt, when any.
475    #[serde(default, skip_serializing_if = "Option::is_none")]
476    pub destroy_error: Option<String>,
477}
478
479/// Receipt returned by a successful member spawn.
480#[derive(Debug, Clone, Serialize)]
481#[non_exhaustive]
482pub(crate) struct MemberSpawnReceipt {
483    /// The member identity that was provisioned and committed into the roster.
484    pub(crate) member_ref: MemberRef,
485    /// Canonical mob child operation for the spawned member lifecycle.
486    pub(crate) operation_id: OperationId,
487}
488
489/// Public result from a successful member spawn.
490///
491/// The identity-native `agent_identity` is the public contract — it is
492/// what app-facing payloads surface. `agent_runtime_id` and `fence_token`
493/// are carried for crate-internal bridging (provisioning, wiring) but
494/// `pub(crate)` so external consumers must route through a `MobMemberView`
495/// or the identity seam rather than reading these binding-era atoms
496/// directly.
497#[derive(Debug, Clone, Serialize)]
498#[non_exhaustive]
499pub struct SpawnResult {
500    /// Stable member identity — the one app-facing identity atom.
501    pub agent_identity: AgentIdentity,
502    /// Composite runtime id. `pub(crate)` — binding-era detail, not
503    /// an app-facing identity.
504    #[serde(skip)]
505    pub(crate) agent_runtime_id: AgentRuntimeId,
506    /// Fence token for stale-command rejection. `pub(crate)` — the
507    /// bridge uses it; app-facing payloads do not surface it.
508    #[serde(skip)]
509    pub(crate) fence_token: FenceToken,
510}
511
512impl SpawnResult {
513    /// Create a new spawn result from identity-native fields.
514    pub fn new(
515        agent_identity: AgentIdentity,
516        agent_runtime_id: AgentRuntimeId,
517        fence_token: FenceToken,
518    ) -> Self {
519        Self {
520            agent_identity,
521            agent_runtime_id,
522            fence_token,
523        }
524    }
525}
526
527#[derive(Clone)]
528pub(crate) struct CanonicalOpsOwnerContext {
529    pub(crate) owner_bridge_session_id: SessionId,
530    pub(crate) ops_registry: Arc<dyn OpsLifecycleRegistry>,
531}
532
533/// Structured error for direct-Rust respawn failures.
534#[derive(Debug, thiserror::Error)]
535#[non_exhaustive]
536pub enum MobRespawnError {
537    /// Member has no runtime control channel for replacement.
538    #[error("no runtime control channel for member {identity}")]
539    NoRuntimeControl { identity: AgentIdentity },
540
541    /// Spawn failed after the old member was retired.
542    #[error("spawn failed after retire for member {identity}: {reason}")]
543    SpawnAfterRetire {
544        identity: AgentIdentity,
545        reason: String,
546    },
547
548    /// Topology restore failed after replacement spawn.
549    /// The replacement receipt is carried so callers can still use the new session.
550    #[error("topology restore failed for member {}: {} peer(s) failed", receipt.identity, failed_peer_ids.len())]
551    TopologyRestoreFailed {
552        receipt: MemberRespawnReceipt,
553        failed_peer_ids: Vec<AgentIdentity>,
554    },
555
556    /// Retire cleanup progressed far enough that the old member may still exist,
557    /// but respawn could not prove it was fully cleaned up.
558    #[error("previous member cleanup ambiguous for member {}", report.identity)]
559    PreviousMemberCleanupAmbiguous { report: PreviousMemberCleanupReport },
560
561    /// An underlying mob error occurred before mutation.
562    #[error(transparent)]
563    Mob(#[from] MobError),
564}
565
566/// Receipt returned by member message delivery.
567#[derive(Debug, Clone, Serialize)]
568#[non_exhaustive]
569pub struct MemberDeliveryReceipt {
570    /// The member identity.
571    pub identity: AgentIdentity,
572    /// How the message was handled.
573    pub handling_mode: HandlingMode,
574    /// Binding-era atom: bridge-internal, `pub(crate)` + `#[serde(skip)]`.
575    #[serde(skip)]
576    pub(crate) agent_runtime_id: AgentRuntimeId,
577    /// Binding-era atom: bridge-internal, `pub(crate)` + `#[serde(skip)]`.
578    #[serde(skip)]
579    pub(crate) fence_token: FenceToken,
580}
581
582/// Receipt returned by sender-aware mob peer-message delivery.
583#[derive(Debug, Clone, Serialize)]
584#[non_exhaustive]
585pub struct PeerMessageReceipt {
586    /// Sender mob-member identity.
587    pub from: AgentIdentity,
588    /// Recipient mob-member identity.
589    pub to: AgentIdentity,
590    /// Transport envelope id for the typed peer message.
591    pub envelope_id: uuid::Uuid,
592    /// Whether the transport reported an acknowledgement.
593    pub acked: bool,
594    /// How the recipient should handle the peer message.
595    pub handling_mode: HandlingMode,
596}
597
598/// Receipt confirming that a unit of work was accepted by the work lane.
599#[derive(Debug, Clone, Serialize)]
600#[non_exhaustive]
601pub struct WorkDeliveryReceipt {
602    /// The work reference for the submitted unit.
603    pub work_ref: WorkRef,
604    /// Binding-era atom: bridge-internal, `pub(crate)` + `#[serde(skip)]`.
605    #[serde(skip)]
606    pub(crate) runtime_id: AgentRuntimeId,
607}
608
609/// Options for helper convenience spawns.
610#[derive(Debug, Clone, Default)]
611#[non_exhaustive]
612pub struct HelperOptions {
613    /// Role name (profile key) to use. If None, requires a default profile in the definition.
614    pub role_name: Option<ProfileName>,
615    /// Runtime mode override.
616    pub runtime_mode: Option<crate::MobRuntimeMode>,
617    /// Backend override.
618    pub backend: Option<MobBackendKind>,
619    /// Tool access policy for the helper.
620    pub tool_access_policy: Option<meerkat_core::ops::ToolAccessPolicy>,
621    /// Explicit auth binding used for the helper member's agent build.
622    pub auth_binding: Option<meerkat_core::AuthBindingRef>,
623    /// Pre-resolved inherited tool filter from scheduled or agent-owned tooling resolution.
624    pub inherited_tool_filter: Option<meerkat_core::WitnessedToolFilter>,
625    /// Override profile resolved from scheduled or agent-owned tooling resolution.
626    pub override_profile: Option<crate::profile::Profile>,
627    /// Model override resolved from scheduled helper tooling.
628    pub model_override: Option<String>,
629    /// Provider params override resolved from scheduled helper tooling.
630    pub provider_params_override: Option<serde_json::Value>,
631}
632
633/// Result from a helper spawn-and-wait operation.
634#[derive(Debug, Clone, Serialize)]
635#[non_exhaustive]
636pub struct HelperResult {
637    /// The member's final output text.
638    pub output: Option<String>,
639    /// Total tokens used by the helper.
640    pub tokens_used: u64,
641    /// Stable member identity for the helper run.
642    pub agent_identity: AgentIdentity,
643    /// Identity-native runtime ID for this incarnation.
644    ///
645    /// Binding-era atom: bridge-internal, `pub(crate)` + `#[serde(skip)]`.
646    #[serde(skip)]
647    pub(crate) agent_runtime_id: AgentRuntimeId,
648    /// Fence token for the current incarnation.
649    ///
650    /// Binding-era atom: bridge-internal, `pub(crate)` + `#[serde(skip)]`.
651    #[serde(skip)]
652    pub(crate) fence_token: FenceToken,
653}
654
655/// Target for a wire operation from a local mob member.
656#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
657#[serde(rename_all = "snake_case")]
658pub enum PeerTarget {
659    /// Another member in the same mob roster.
660    Local(AgentIdentity),
661    /// External peer handle for operations that only need the mob-owned edge.
662    ExternalName(meerkat_core::comms::PeerName),
663    /// A typed external binding request resolved by the mob actor before trust install.
664    ExternalBinding(ExternalPeerBindingSpec),
665    /// A trusted peer that lives outside the local mob roster.
666    External(TrustedPeerDescriptor),
667}
668
669/// Summary for one dense local-member topology materialization pass.
670#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
671pub struct MobWireMembersBatchReport {
672    /// Number of edges requested before normalization/deduplication.
673    pub requested: usize,
674    /// Normalized unique edges already present in the MobMachine graph.
675    pub already_wired: Vec<crate::event::MemberWireEdge>,
676    /// Normalized unique edges newly admitted by the MobMachine graph.
677    pub wired: Vec<crate::event::MemberWireEdge>,
678}
679
680/// Typed request to bind a local member to an external peer.
681///
682/// App-facing surfaces provide this shape instead of comms-owned `peer_id` /
683/// `pubkey` atoms. The mob actor resolves the evidence into a
684/// `TrustedPeerDescriptor` immediately before the machine admits the external
685/// edge and before any trust is installed.
686#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
687pub struct ExternalPeerBindingSpec {
688    pub name: String,
689    pub address: String,
690    pub identity: meerkat_contracts::WireTrustedPeerIdentity,
691}
692
693impl ExternalPeerBindingSpec {
694    pub fn new(
695        name: impl Into<String>,
696        address: impl Into<String>,
697        identity: meerkat_contracts::WireTrustedPeerIdentity,
698    ) -> Self {
699        Self {
700            name: name.into(),
701            address: address.into(),
702            identity,
703        }
704    }
705}
706
707// DELETE_ME A5 DSL-schema migration: `MeerkatId` is now a type alias
708// for `AgentIdentity`, so the two `From<...> for PeerTarget` impls
709// that used to exist collapse into one.
710impl From<AgentIdentity> for PeerTarget {
711    fn from(value: AgentIdentity) -> Self {
712        Self::Local(value)
713    }
714}
715
716// ---------------------------------------------------------------------------
717// MobHandle
718// ---------------------------------------------------------------------------
719
720/// Clone-cheap, thread-safe handle for interacting with a running mob.
721///
722/// All mutation commands are sent through an mpsc channel to the actor.
723/// Public orchestration, event, and task surfaces are routed through the
724/// top-level machine command seam. A few immutable/shared projections still
725/// read canonical shared state directly inside that seam's implementation.
726/// The persisted event ledger is also retained here for terminal read-only
727/// fallback after `Destroy`, when the actor has exited by contract.
#[derive(Clone)]
pub struct MobHandle {
    /// Command channel into the mob actor task. Every actor-routed request
    /// goes through it via `send_actor_command`; a closed channel surfaces
    /// as `MobError::Internal("actor task dropped")`.
    pub(super) command_tx: mpsc::Sender<MobCommand>,
    /// Shared roster authority (`RosterAuthority`) behind an async `RwLock`.
    pub(super) roster: Arc<RwLock<RosterAuthority>>,
    /// The mob definition, shared immutably behind an `Arc`.
    pub(super) definition: Arc<MobDefinition>,
    /// Persisted mob event ledger. `MobEventsView` reads cursors and opens
    /// live subscriptions directly on it; it is also the source for the
    /// terminal read-only fallback after `Destroy`.
    pub(super) events: Arc<dyn MobEventStore>,
    /// Run store. NOTE(review): presumably persists flow run records; its
    /// uses are outside this section — confirm.
    pub(super) run_store: Arc<dyn MobRunStore>,
    /// Per-run senders for scoped agent events, keyed by [`RunId`].
    pub(super) flow_streams:
        Arc<tokio::sync::Mutex<BTreeMap<RunId, mpsc::Sender<meerkat_core::ScopedAgentEvent>>>>,
    /// Session service used by the mob. NOTE(review): exact responsibilities
    /// are not visible in this section.
    pub(super) session_service: Arc<dyn MobSessionService>,
    /// Optional runtime adapter; the field only exists when the
    /// `runtime-adapter` feature is enabled.
    #[cfg(feature = "runtime-adapter")]
    pub(super) runtime_adapter: Option<Arc<meerkat_runtime::MeerkatMachine>>,
    /// Restore-failure diagnostics keyed by member id. Read (via a cloned
    /// entry) by `restore_failure_for`.
    pub(super) restore_diagnostics: Arc<RwLock<HashMap<MeerkatId, RestoreFailureDiagnostic>>>,
    /// Read-only projection of the actor-owned MobMachine state. The actor is
    /// the sole writer; handles use this only for non-blocking status/list
    /// surfaces that must remain observable while a mutating command is
    /// awaiting shell cleanup.
    pub(super) machine_state_watch_rx: tokio::sync::watch::Receiver<mob_dsl::MobMachineState>,
    /// Read-only receiver for the actor's terminal-phase projection. The
    /// actor (sole writer) publishes the current DSL phase after every
    /// phase-changing transition and once more before exiting. Used by
    /// `status()` as the fallback when the command channel has closed
    /// (actor has exited). Dogma-#13 projection: source truth is the DSL
    /// authority inside the actor; this seam is rebuildable (replay) and
    /// read-only on the handle side.
    pub(super) phase_watch_rx: tokio::sync::watch::Receiver<MobState>,
    /// Optional realtime session factory injected via
    /// [`super::MobBuilder::with_realtime_session_factory`] (W2-E / issue
    /// #264). Test harnesses retrieve it via
    /// [`MobHandle::realtime_session_factory`] so a `RealtimeWsHost`
    /// bound to the same runtime can be configured against a
    /// deterministic in-process mock. `None` when no factory was
    /// provided (production mob paths typically wire the factory at the
    /// surface layer directly).
    pub(super) realtime_session_factory: Option<Arc<dyn meerkat_client::RealtimeSessionFactory>>,
}
764
765impl MobHandle {
766    /// Accessor for the realtime session factory carried from
767    /// [`super::MobBuilder::with_realtime_session_factory`] (W2-E).
768    pub fn realtime_session_factory(
769        &self,
770    ) -> Option<Arc<dyn meerkat_client::RealtimeSessionFactory>> {
771        self.realtime_session_factory.as_ref().map(Arc::clone)
772    }
773
774    async fn member_machine_projection(
775        &self,
776        agent_identity: &MeerkatId,
777    ) -> super::state::MobMemberMachineProjection {
778        self.send_actor_command(
779            |reply_tx| super::state::MobCommand::MemberMachineProjection {
780                agent_identity: AgentIdentity::from(agent_identity.as_str()),
781                reply_tx,
782            },
783        )
784        .await
785        .unwrap_or_default()
786    }
787}
788
/// Diagnostic recorded when a mob member could not be restored.
///
/// Held in `MobHandle::restore_diagnostics` (keyed by member id) and turned
/// into `MobError::MemberRestoreFailed` by `MobHandle::restore_failure_error`.
#[derive(Debug, Clone)]
pub(crate) struct RestoreFailureDiagnostic {
    /// Bridge session involved in the failed restore, when one is known.
    pub(crate) bridge_session_id: Option<SessionId>,
    /// Failure reason text, carried through to the resulting error.
    pub(crate) reason: String,
}
794
/// Clone-cheap, capability-bearing handle for interacting with one mob member.
///
/// This is the target 0.5 API surface for message/turn submission. The mob
/// handle remains orchestration/control-plane oriented, while member-directed
/// delivery goes through this narrower capability.
#[derive(Clone)]
pub struct MemberHandle {
    /// Handle of the mob this member belongs to.
    mob: MobHandle,
    /// Identity of the single member this handle is scoped to.
    agent_identity: MeerkatId,
}
805
/// View over the mob's structural event ledger.
///
/// Wraps a [`MobHandle`] and exposes latest-cursor lookup, live
/// subscriptions, cursor-based polling, and full replay of persisted
/// [`crate::event::MobEvent`] records.
#[derive(Clone)]
pub struct MobEventsView {
    handle: MobHandle,
}
810
/// Configuration for a structural mob event subscription.
///
/// `MobEventsView::subscribe_with_config` raises a `batch_limit` or
/// `channel_capacity` of zero to one before using them.
#[derive(Debug, Clone, Copy)]
#[non_exhaustive]
pub struct MobEventsSubscriptionConfig {
    /// Cursor to resume after. `None` starts at the ledger's current latest
    /// cursor without catch-up; `Some` first replays persisted events after
    /// it, and is rejected if it lies beyond the latest cursor.
    pub after_cursor: Option<u64>,
    /// Maximum number of persisted events requested per catch-up poll.
    pub batch_limit: usize,
    /// Buffer size of the channel that delivers events to the subscriber.
    pub channel_capacity: usize,
}

impl Default for MobEventsSubscriptionConfig {
    /// Start at the latest cursor, catch up 128 events per poll, and buffer
    /// up to 256 events for the subscriber.
    fn default() -> Self {
        const BATCH_LIMIT: usize = 128;
        const CHANNEL_CAPACITY: usize = 256;
        Self {
            channel_capacity: CHANNEL_CAPACITY,
            batch_limit: BATCH_LIMIT,
            after_cursor: None,
        }
    }
}
832
833/// Handle for a structural mob event subscription.
834///
835/// Receives persisted [`crate::event::MobEvent`] records from the mob event
836/// ledger. Drop the handle, or call [`Self::cancel`], to stop the background
837/// forwarding task.
838pub struct MobEventsSubscription {
839    pub event_rx: mpsc::Receiver<crate::event::MobEvent>,
840    cancel: CancellationToken,
841}
842
843impl MobEventsSubscription {
844    pub fn cancel(&self) {
845        self.cancel.cancel();
846    }
847}
848
849impl Drop for MobEventsSubscription {
850    fn drop(&mut self) {
851        self.cancel.cancel();
852    }
853}
854
855/// Typed source of a mob member spawn request.
856#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
857#[serde(rename_all = "snake_case")]
858#[non_exhaustive]
859pub enum SpawnSource {
860    Consumer,
861    AgentSpawnMember,
862    HelperSpawn,
863    BatchItem,
864    PolicySpawn,
865    Respawn,
866    Resume,
867    Fork,
868}
869
870impl SpawnSource {
871    #[must_use]
872    pub fn as_str(self) -> &'static str {
873        match self {
874            Self::Consumer => "consumer",
875            Self::AgentSpawnMember => "agent_spawn_member",
876            Self::HelperSpawn => "helper_spawn",
877            Self::BatchItem => "batch_item",
878            Self::PolicySpawn => "policy_spawn",
879            Self::Respawn => "respawn",
880            Self::Resume => "resume",
881            Self::Fork => "fork",
882        }
883    }
884
885    #[must_use]
886    fn for_launch_mode(base: Self, launch_mode: &crate::launch::MemberLaunchMode) -> Self {
887        match launch_mode {
888            crate::launch::MemberLaunchMode::Resume { .. } => Self::Resume,
889            crate::launch::MemberLaunchMode::Fork { .. } => Self::Fork,
890            crate::launch::MemberLaunchMode::Fresh => base,
891        }
892    }
893}
894
/// Typed system prompt replacement for a single spawn.
///
/// Serialized with serde's default external tagging and snake_case variant
/// names, e.g. `{"replace": "..."}`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum SpawnSystemPromptOverride {
    /// Use this text in place of the system prompt for this spawn.
    Replace(String),
}
902
/// Durable identity continuity intent attached to a spawn.
///
/// Serialized as an internally tagged object, e.g. `{"type": "ephemeral"}` or
/// `{"type": "durable_identity", "continuity_key": "..."}`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(tag = "type", rename_all = "snake_case")]
#[non_exhaustive]
pub enum SpawnContinuityIntent {
    /// No durable identity continuity requested. This is the `Default`, and
    /// what `SpawnMemberSpec::new` uses.
    #[default]
    Ephemeral,
    /// Request durable identity continuity under `continuity_key`.
    /// NOTE(review): how the key is matched across spawns is decided
    /// elsewhere — confirm at the consumer.
    DurableIdentity {
        continuity_key: String,
    },
}
914
/// Build-boundary context supplied to [`SpawnMemberCustomizer`].
///
/// Describes where a spawn came from and what it asked for, so a customizer
/// can decide how to adjust the [`SpawnMemberSpec`] before the member is built.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub struct SpawnCustomizationContext {
    /// Mob the member is being spawned into.
    pub mob_id: MobId,
    /// Which path initiated the spawn.
    pub spawn_source: SpawnSource,
    /// Identity of the spawning agent, when the spawn has one.
    pub spawner_identity: Option<AgentIdentity>,
    /// Runtime id of the spawning agent, when the spawn has one.
    pub spawner_runtime_id: Option<AgentRuntimeId>,
    /// Profile (role) name requested for the new member.
    pub requested_profile: ProfileName,
}
925
/// Narrow pre-build mutator for per-spawn construction inputs.
///
/// Implementations get read-only context about the spawn and may edit the
/// spec in place. Must be `Send + Sync`.
pub trait SpawnMemberCustomizer: Send + Sync {
    /// Adjusts `spec` for the spawn described by `ctx`.
    ///
    /// # Errors
    /// Returns a [`MobError`] when the spec cannot be customized.
    /// NOTE(review): presumably an error aborts the spawn — confirm at the
    /// call site.
    fn customize_spawn(
        &self,
        ctx: &SpawnCustomizationContext,
        spec: &mut SpawnMemberSpec,
    ) -> Result<(), MobError>;
}
934
/// Spawn request for first-class batch member provisioning.
///
/// Start from [`Self::new`] (or [`Self::from_wire`]) and refine with the
/// `with_*` builder methods. Because the struct is `#[non_exhaustive]`, code
/// outside this crate cannot build it with a struct literal; it must start
/// from a constructor and then use builders or assign the public fields.
#[derive(Clone)]
#[non_exhaustive]
pub struct SpawnMemberSpec {
    /// The role name (profile key) for this member in the mob roster.
    ///
    /// When `tooling` is present it controls model/tool resolution;
    /// `role_name` remains a roster/topology label.
    pub role_name: ProfileName,
    /// Agent identity of the member being spawned.
    pub identity: AgentIdentity,
    /// Optional initial message for the new member.
    pub initial_message: Option<ContentInput>,
    /// Optional runtime mode for this member.
    pub runtime_mode: Option<crate::MobRuntimeMode>,
    /// Optional backend kind. When `binding` is also set, `binding` takes
    /// precedence.
    pub backend: Option<MobBackendKind>,
    /// Runtime binding for this member. When set, takes precedence over
    /// `backend` and carries concrete binding details (e.g., external process
    /// comms identity). First step toward identity-first mobs.
    pub binding: Option<crate::RuntimeBinding>,
    /// Opaque application context passed through to the agent build pipeline.
    pub context: Option<serde_json::Value>,
    /// Application-defined labels for this member.
    pub labels: Option<std::collections::BTreeMap<String, String>>,
    /// How this member should be launched (fresh, resume, or fork).
    ///
    /// Public spawn-policy seam (DELETE_ME A3 + C1): external consumers
    /// use [`Self::with_launch_mode`] /
    /// [`Self::with_resume_bridge_session_id`] to configure session
    /// adoption. See [`crate::launch::MemberLaunchMode`] for the
    /// variants and [`crate::launch::ForkContext`] for fork
    /// configuration.
    pub launch_mode: crate::launch::MemberLaunchMode,
    /// Tool access policy for this member.
    pub tool_access_policy: Option<meerkat_core::ops::ToolAccessPolicy>,
    /// How to split budget from the orchestrator to this member.
    pub budget_split_policy: Option<crate::launch::BudgetSplitPolicy>,
    /// When true, automatically wire this member to its spawner.
    pub auto_wire_parent: bool,
    /// Additional instruction sections appended to the system prompt for this member.
    pub additional_instructions: Option<Vec<String>>,
    /// Per-agent environment variables injected into shell tool subprocesses.
    pub shell_env: Option<std::collections::HashMap<String, String>>,
    /// Pre-resolved inherited tool filter from spawn tooling resolution.
    ///
    /// When set, stored in canonical session visibility metadata with filter
    /// witnesses so the runtime-backed core build restores it as witnessed
    /// machine-owned visibility.
    pub inherited_tool_filter: Option<meerkat_core::WitnessedToolFilter>,
    /// Override profile resolved from `SpawnTooling::Profile` source.
    ///
    /// When set, the spawn path uses this profile instead of looking up by
    /// `role_name` from the mob definition. This allows agent-owned spawn
    /// tooling to specify a different model/skills/tools via inline or
    /// realm-scoped profiles.
    pub override_profile: Option<crate::profile::Profile>,
    /// Model override resolved outside the mob runtime while keeping the selected role profile.
    pub model_override: Option<String>,
    /// Provider params override resolved outside the mob runtime while keeping the selected role profile.
    pub provider_params_override: Option<serde_json::Value>,
    /// Per-member auth binding. When set, this member's agent builds with
    /// `AgentBuildConfig.auth_binding = Some(this)`, scoping credential
    /// resolution to the named realm + binding. `None` means the caller did not
    /// provide binding authority; build paths that require a binding must reject
    /// the spawn instead of promoting an ambient fallback.
    pub auth_binding: Option<meerkat_core::AuthBindingRef>,
    /// Per-spawn external tool overlay. In-process only and not persisted.
    /// The `Debug` impl prints only whether it is present.
    pub external_tools: Option<Arc<dyn AgentToolDispatcher>>,
    /// Typed prompt replacement for this spawn.
    pub system_prompt_override: Option<SpawnSystemPromptOverride>,
    /// Explicit helper/member continuity intent.
    pub continuity_intent: SpawnContinuityIntent,
}
1005
1006impl std::fmt::Debug for SpawnMemberSpec {
1007    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1008        f.debug_struct("SpawnMemberSpec")
1009            .field("role_name", &self.role_name)
1010            .field("identity", &self.identity)
1011            .field("initial_message", &self.initial_message)
1012            .field("runtime_mode", &self.runtime_mode)
1013            .field("backend", &self.backend)
1014            .field("binding", &self.binding)
1015            .field("context", &self.context)
1016            .field("labels", &self.labels)
1017            .field("launch_mode", &self.launch_mode)
1018            .field("tool_access_policy", &self.tool_access_policy)
1019            .field("budget_split_policy", &self.budget_split_policy)
1020            .field("auto_wire_parent", &self.auto_wire_parent)
1021            .field("additional_instructions", &self.additional_instructions)
1022            .field("shell_env", &self.shell_env)
1023            .field("inherited_tool_filter", &self.inherited_tool_filter)
1024            .field("override_profile", &self.override_profile)
1025            .field("model_override", &self.model_override)
1026            .field("provider_params_override", &self.provider_params_override)
1027            .field("auth_binding", &self.auth_binding)
1028            .field("external_tools", &self.external_tools.is_some())
1029            .field("system_prompt_override", &self.system_prompt_override)
1030            .field("continuity_intent", &self.continuity_intent)
1031            .finish()
1032    }
1033}
1034
1035impl SpawnMemberSpec {
1036    pub fn new(profile: impl Into<ProfileName>, identity: impl Into<AgentIdentity>) -> Self {
1037        Self {
1038            role_name: profile.into(),
1039            identity: identity.into(),
1040            initial_message: None,
1041            runtime_mode: None,
1042            backend: None,
1043            binding: None,
1044            context: None,
1045            labels: None,
1046            launch_mode: crate::launch::MemberLaunchMode::Fresh,
1047            tool_access_policy: None,
1048            budget_split_policy: None,
1049            auto_wire_parent: false,
1050            additional_instructions: None,
1051            shell_env: None,
1052            inherited_tool_filter: None,
1053            override_profile: None,
1054            model_override: None,
1055            provider_params_override: None,
1056            auth_binding: None,
1057            external_tools: None,
1058            system_prompt_override: None,
1059            continuity_intent: SpawnContinuityIntent::Ephemeral,
1060        }
1061    }
1062
1063    /// Set the per-member auth binding (deferral §1).
1064    pub fn with_auth_binding(mut self, conn_ref: meerkat_core::AuthBindingRef) -> Self {
1065        self.auth_binding = Some(conn_ref);
1066        self
1067    }
1068
1069    pub fn with_shell_env(mut self, env: std::collections::HashMap<String, String>) -> Self {
1070        self.shell_env = Some(env);
1071        self
1072    }
1073
1074    pub fn with_initial_message(mut self, message: impl Into<ContentInput>) -> Self {
1075        self.initial_message = Some(message.into());
1076        self
1077    }
1078
1079    pub fn with_runtime_mode(mut self, mode: crate::MobRuntimeMode) -> Self {
1080        self.runtime_mode = Some(mode);
1081        self
1082    }
1083
1084    pub fn with_backend(mut self, backend: MobBackendKind) -> Self {
1085        self.backend = Some(backend);
1086        self
1087    }
1088
1089    pub fn with_context(mut self, context: serde_json::Value) -> Self {
1090        self.context = Some(context);
1091        self
1092    }
1093
1094    pub fn with_labels(mut self, labels: std::collections::BTreeMap<String, String>) -> Self {
1095        self.labels = Some(labels);
1096        self
1097    }
1098
1099    /// Set launch mode to resume an existing bridge session.
1100    ///
1101    /// DELETE_ME A3 + C1: public session-adoption seam. Callers holding
1102    /// a bridge session id (for example from a prior
1103    /// [`crate::runtime::MobHandle::resolve_bridge_session_id`] lookup
1104    /// or from durable mob-event replay) use this builder method to
1105    /// spawn a member whose backing session continues that binding
1106    /// instead of starting fresh.
1107    pub fn with_resume_bridge_session_id(mut self, id: meerkat_core::types::SessionId) -> Self {
1108        self.launch_mode = crate::launch::MemberLaunchMode::Resume {
1109            bridge_session_id: id,
1110        };
1111        self
1112    }
1113
1114    /// Set an explicit [`crate::launch::MemberLaunchMode`].
1115    ///
1116    /// DELETE_ME A3 + C1: public session-adoption seam for callers that
1117    /// construct their own `MemberLaunchMode` value (e.g. fork-from-
1118    /// sibling with a caller-chosen [`crate::launch::ForkContext`]).
1119    /// For the common "resume a specific bridge session" case prefer
1120    /// [`Self::with_resume_bridge_session_id`].
1121    pub fn with_launch_mode(mut self, mode: crate::launch::MemberLaunchMode) -> Self {
1122        self.launch_mode = mode;
1123        self
1124    }
1125
1126    pub fn with_tool_access_policy(mut self, policy: meerkat_core::ops::ToolAccessPolicy) -> Self {
1127        self.tool_access_policy = Some(policy);
1128        self
1129    }
1130
1131    pub fn with_budget_split_policy(mut self, policy: crate::launch::BudgetSplitPolicy) -> Self {
1132        self.budget_split_policy = Some(policy);
1133        self
1134    }
1135
1136    pub fn with_auto_wire_parent(mut self, auto_wire: bool) -> Self {
1137        self.auto_wire_parent = auto_wire;
1138        self
1139    }
1140
1141    pub fn with_additional_instructions(mut self, instructions: Vec<String>) -> Self {
1142        self.additional_instructions = Some(instructions);
1143        self
1144    }
1145
1146    pub fn from_wire(
1147        profile: String,
1148        agent_identity: String,
1149        initial_message: Option<ContentInput>,
1150        runtime_mode: Option<crate::MobRuntimeMode>,
1151        backend: Option<MobBackendKind>,
1152    ) -> Self {
1153        let mut spec = Self::new(profile, agent_identity);
1154        spec.initial_message = initial_message;
1155        spec.runtime_mode = runtime_mode;
1156        spec.backend = backend;
1157        spec
1158    }
1159}
1160
1161impl MobEventsView {
1162    pub async fn latest_cursor(&self) -> Result<u64, MobError> {
1163        self.handle
1164            .events
1165            .latest_cursor()
1166            .await
1167            .map_err(MobError::from)
1168    }
1169
1170    /// Subscribe to structural mob events recorded in the mob event ledger.
1171    ///
1172    /// This is distinct from [`MobHandle::subscribe_mob_events`], which routes
1173    /// member-agent events. The returned stream yields [`crate::event::MobEvent`]
1174    /// records and starts after the current latest cursor.
1175    pub async fn subscribe(&self) -> Result<MobEventsSubscription, MobError> {
1176        self.subscribe_with_config(MobEventsSubscriptionConfig::default())
1177            .await
1178    }
1179
1180    /// Subscribe to structural mob events after an explicit cursor.
1181    pub async fn subscribe_after(
1182        &self,
1183        after_cursor: u64,
1184    ) -> Result<MobEventsSubscription, MobError> {
1185        self.subscribe_with_config(MobEventsSubscriptionConfig {
1186            after_cursor: Some(after_cursor),
1187            ..MobEventsSubscriptionConfig::default()
1188        })
1189        .await
1190    }
1191
1192    /// Like [`Self::subscribe`] with explicit catch-up and channel settings.
1193    pub async fn subscribe_with_config(
1194        &self,
1195        config: MobEventsSubscriptionConfig,
1196    ) -> Result<MobEventsSubscription, MobError> {
1197        let config = MobEventsSubscriptionConfig {
1198            batch_limit: config.batch_limit.max(1),
1199            channel_capacity: config.channel_capacity.max(1),
1200            ..config
1201        };
1202        let explicit_after_cursor = config.after_cursor.is_some();
1203        let source_rx = self.handle.events.subscribe().map_err(MobError::from)?;
1204        let after_cursor = match config.after_cursor {
1205            Some(cursor) => {
1206                let latest_cursor = self.latest_cursor().await?;
1207                if cursor > latest_cursor {
1208                    return Err(MobError::StaleEventCursor {
1209                        after_cursor: cursor,
1210                        latest_cursor,
1211                    });
1212                }
1213                cursor
1214            }
1215            None => self.latest_cursor().await?,
1216        };
1217        Ok(spawn_structural_event_subscription(
1218            self.clone(),
1219            source_rx,
1220            after_cursor,
1221            explicit_after_cursor,
1222            config,
1223        ))
1224    }
1225
1226    pub async fn poll(
1227        &self,
1228        after_cursor: u64,
1229        limit: usize,
1230    ) -> Result<Vec<crate::event::MobEvent>, MobError> {
1231        match self
1232            .handle
1233            .execute_machine_command(MobMachineCommand::PollEvents {
1234                after_cursor,
1235                limit,
1236            })
1237            .await?
1238        {
1239            MobMachineCommandResult::MobEvents(events) => Ok(events),
1240            _ => Err(MobError::Internal(
1241                "unexpected command result variant".into(),
1242            )),
1243        }
1244    }
1245
1246    pub async fn poll_strict(
1247        &self,
1248        after_cursor: u64,
1249        limit: usize,
1250    ) -> Result<Vec<crate::event::MobEvent>, MobError> {
1251        let latest_cursor = self.latest_cursor().await?;
1252        if after_cursor > latest_cursor {
1253            return Err(MobError::StaleEventCursor {
1254                after_cursor,
1255                latest_cursor,
1256            });
1257        }
1258        self.poll(after_cursor, limit).await
1259    }
1260
1261    pub async fn replay_all(&self) -> Result<Vec<crate::event::MobEvent>, MobError> {
1262        match self
1263            .handle
1264            .execute_machine_command(MobMachineCommand::ReplayAllEvents)
1265            .await?
1266        {
1267            MobMachineCommandResult::MobEvents(events) => Ok(events),
1268            _ => Err(MobError::Internal(
1269                "unexpected command result variant".into(),
1270            )),
1271        }
1272    }
1273}
1274
1275#[allow(clippy::ignored_unit_patterns)]
1276fn spawn_structural_event_subscription(
1277    events: MobEventsView,
1278    mut source_rx: crate::store::MobEventReceiver,
1279    mut cursor: u64,
1280    catch_up_on_start: bool,
1281    config: MobEventsSubscriptionConfig,
1282) -> MobEventsSubscription {
1283    let (event_tx, event_rx) = mpsc::channel(config.channel_capacity);
1284    let cancel = CancellationToken::new();
1285    let cancel_clone = cancel.clone();
1286
1287    tokio::spawn(async move {
1288        if catch_up_on_start
1289            && !catch_up_structural_events(&events, &event_tx, &mut cursor, config.batch_limit)
1290                .await
1291        {
1292            return;
1293        }
1294
1295        loop {
1296            tokio::select! {
1297                () = cancel_clone.cancelled() => break,
1298                received = source_rx.recv() => {
1299                    match received {
1300                        Ok(event) => {
1301                            if event.cursor > cursor.saturating_add(1)
1302                                && !catch_up_structural_events(
1303                                    &events,
1304                                    &event_tx,
1305                                    &mut cursor,
1306                                    config.batch_limit,
1307                                )
1308                                .await
1309                            {
1310                                return;
1311                            }
1312                            if event.cursor > cursor {
1313                                cursor = event.cursor;
1314                                if event_tx.send(event).await.is_err() {
1315                                    return;
1316                                }
1317                            }
1318                        }
1319                        Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
1320                            if !catch_up_structural_events(
1321                                &events,
1322                                &event_tx,
1323                                &mut cursor,
1324                                config.batch_limit,
1325                            )
1326                            .await
1327                            {
1328                                return;
1329                            }
1330                        }
1331                        Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
1332                    }
1333                }
1334            }
1335        }
1336    });
1337
1338    MobEventsSubscription { event_rx, cancel }
1339}
1340
1341async fn catch_up_structural_events(
1342    events: &MobEventsView,
1343    event_tx: &mpsc::Sender<crate::event::MobEvent>,
1344    cursor: &mut u64,
1345    batch_limit: usize,
1346) -> bool {
1347    loop {
1348        let batch = match events.poll(*cursor, batch_limit).await {
1349            Ok(batch) => batch,
1350            Err(error) => {
1351                tracing::warn!(
1352                    error = %error,
1353                    "mob structural event subscription stopped after catch-up failure",
1354                );
1355                return false;
1356            }
1357        };
1358        if batch.is_empty() {
1359            return true;
1360        }
1361
1362        let is_complete = batch.len() < batch_limit;
1363        for event in batch {
1364            if event.cursor <= *cursor {
1365                continue;
1366            }
1367            *cursor = event.cursor;
1368            if event_tx.send(event).await.is_err() {
1369                return false;
1370            }
1371        }
1372
1373        if is_complete {
1374            return true;
1375        }
1376    }
1377}
1378
1379impl MobHandle {
1380    async fn restore_failure_for(
1381        &self,
1382        agent_identity: &MeerkatId,
1383    ) -> Option<RestoreFailureDiagnostic> {
1384        self.restore_diagnostics
1385            .read()
1386            .await
1387            .get(agent_identity)
1388            .cloned()
1389    }
1390
1391    fn restore_failure_error(
1392        agent_identity: &MeerkatId,
1393        diag: RestoreFailureDiagnostic,
1394    ) -> MobError {
1395        MobError::MemberRestoreFailed {
1396            member_id: agent_identity.clone(),
1397            session_id: diag.bridge_session_id,
1398            reason: diag.reason,
1399        }
1400    }
1401
1402    async fn send_actor_command<R>(
1403        &self,
1404        build: impl FnOnce(oneshot::Sender<R>) -> MobCommand,
1405    ) -> Result<R, MobError> {
1406        let (reply_tx, reply_rx) = oneshot::channel();
1407        self.command_tx
1408            .send(build(reply_tx))
1409            .await
1410            .map_err(|_| MobError::Internal("actor task dropped".into()))?;
1411        reply_rx
1412            .await
1413            .map_err(|_| MobError::Internal("actor reply dropped".into()))
1414    }
1415
1416    async fn execute_machine_command(
1417        &self,
1418        command: MobMachineCommand,
1419    ) -> Result<MobMachineCommandResult, MobError> {
1420        match command {
1421            MobMachineCommand::RunFlow {
1422                flow_id,
1423                activation_params,
1424                scoped_event_tx,
1425            } => {
1426                let run_id = self
1427                    .send_actor_command(|reply_tx| MobCommand::RunFlow {
1428                        flow_id,
1429                        activation_params,
1430                        scoped_event_tx,
1431                        reply_tx,
1432                    })
1433                    .await??;
1434                Ok(MobMachineCommandResult::RunId(run_id))
1435            }
1436            MobMachineCommand::CancelFlow { run_id } => {
1437                self.send_actor_command(|reply_tx| MobCommand::CancelFlow { run_id, reply_tx })
1438                    .await??;
1439                Ok(MobMachineCommandResult::Unit)
1440            }
1441            MobMachineCommand::FlowStatus { run_id } => {
1442                let status = self
1443                    .send_actor_command(|reply_tx| MobCommand::FlowStatus { run_id, reply_tx })
1444                    .await??;
1445                Ok(MobMachineCommandResult::FlowStatus(status))
1446            }
1447            MobMachineCommand::Spawn {
1448                spec,
1449                spawn_source,
1450                owner_context,
1451            } => {
1452                let (owner_bridge_session_id, ops_registry) = match owner_context {
1453                    Some(ctx) => (Some(ctx.owner_bridge_session_id), Some(ctx.ops_registry)),
1454                    None => (None, None),
1455                };
1456                let receipt = self
1457                    .send_actor_command(|reply_tx| MobCommand::Spawn {
1458                        spec,
1459                        spawn_source,
1460                        owner_bridge_session_id,
1461                        ops_registry,
1462                        reply_tx,
1463                    })
1464                    .await??;
1465                Ok(MobMachineCommandResult::SpawnReceipt(receipt))
1466            }
1467            MobMachineCommand::EnsureMember { spec } => {
1468                let outcome = self.handle_ensure_member(*spec).await?;
1469                Ok(MobMachineCommandResult::EnsureMember(outcome))
1470            }
1471            MobMachineCommand::Reconcile { desired, options } => {
1472                let report = self.handle_reconcile(desired, options).await?;
1473                Ok(MobMachineCommandResult::Reconcile(Box::new(report)))
1474            }
1475            MobMachineCommand::ListMembersMatching { filter } => {
1476                let members = self.handle_list_members_matching(*filter).await;
1477                Ok(MobMachineCommandResult::ListMembers(members))
1478            }
1479            MobMachineCommand::Retire { agent_identity } => {
1480                self.send_actor_command(|reply_tx| MobCommand::Retire {
1481                    agent_identity,
1482                    reply_tx,
1483                })
1484                .await??;
1485                Ok(MobMachineCommandResult::Unit)
1486            }
1487            MobMachineCommand::Respawn {
1488                agent_identity,
1489                initial_message,
1490            } => {
1491                let receipt = self
1492                    .send_actor_command(|reply_tx| MobCommand::Respawn {
1493                        agent_identity,
1494                        initial_message,
1495                        reply_tx,
1496                    })
1497                    .await?;
1498                Ok(MobMachineCommandResult::Respawn(receipt))
1499            }
1500            MobMachineCommand::RetireAll => {
1501                self.send_actor_command(|reply_tx| MobCommand::RetireAll { reply_tx })
1502                    .await??;
1503                Ok(MobMachineCommandResult::Unit)
1504            }
1505            MobMachineCommand::SubmitWork(cmd) => {
1506                // Shell dispatch is a thin forward: the mob actor owns
1507                // work-origin legality via the MobMachine DSL. There is no
1508                // origin re-decision here — `spec.origin` is forwarded
1509                // verbatim and the DSL accepts or rejects.
1510                let crate::mob_machine::SubmitWorkCommand {
1511                    runtime_id,
1512                    fence_token,
1513                    work_ref,
1514                    spec,
1515                    handling_mode,
1516                    render_metadata,
1517                    ack_mode,
1518                } = *cmd;
1519                let receipt_work_ref = work_ref.clone();
1520                let payload = Box::new(super::state::SubmitWorkPayload {
1521                    runtime_id,
1522                    fence_token,
1523                    work_ref,
1524                    content: spec.content,
1525                    origin: spec.origin,
1526                    handling_mode,
1527                    render_metadata,
1528                    ack_mode,
1529                });
1530                self.send_actor_command(|reply_tx| MobCommand::SubmitWork { payload, reply_tx })
1531                    .await??;
1532                Ok(MobMachineCommandResult::WorkReceipt {
1533                    work_ref: receipt_work_ref,
1534                })
1535            }
1536            MobMachineCommand::CancelWork { work_ref } => {
1537                // Work tracking ledger is introduced in C7. Until then,
1538                // individual work cancellation is not supported.
1539                Err(MobError::WorkNotFound(work_ref))
1540            }
1541            MobMachineCommand::CancelAllWork {
1542                runtime_id,
1543                fence_token,
1544            } => {
1545                // Identity derivation is a projection, not a decision: the
1546                // MobMachine DSL CancelAllWork guards own live-runtime
1547                // membership legality; the fence check is a shell-level
1548                // concurrency freshness invariant. The actor's unified
1549                // `handle_cancel_all_work` forwards both to the DSL and
1550                // then dispatches the interrupt when the machine accepts.
1551                self.send_actor_command(|reply_tx| MobCommand::CancelAllWork {
1552                    runtime_id,
1553                    fence_token,
1554                    reply_tx,
1555                })
1556                .await??;
1557                Ok(MobMachineCommandResult::Unit)
1558            }
1559            MobMachineCommand::Stop => {
1560                self.send_actor_command(|reply_tx| MobCommand::Stop { reply_tx })
1561                    .await??;
1562                Ok(MobMachineCommandResult::Unit)
1563            }
1564            MobMachineCommand::Resume => {
1565                self.send_actor_command(|reply_tx| MobCommand::ResumeLifecycle { reply_tx })
1566                    .await??;
1567                Ok(MobMachineCommandResult::Unit)
1568            }
1569            MobMachineCommand::Complete => {
1570                self.send_actor_command(|reply_tx| MobCommand::Complete { reply_tx })
1571                    .await??;
1572                Ok(MobMachineCommandResult::Unit)
1573            }
1574            MobMachineCommand::Reset => {
1575                self.send_actor_command(|reply_tx| MobCommand::Reset { reply_tx })
1576                    .await??;
1577                Ok(MobMachineCommandResult::Unit)
1578            }
1579            MobMachineCommand::Destroy => {
1580                let reply = self
1581                    .send_actor_command(|reply_tx| MobCommand::Destroy { reply_tx })
1582                    .await?;
1583                match reply {
1584                    Ok(report) => Ok(MobMachineCommandResult::DestroyReport(report)),
1585                    Err(MobDestroyError::Mob(error)) => Err(error),
1586                    Err(MobDestroyError::Incomplete { report }) => Err(MobError::Internal(
1587                        format!("destroy incomplete: {}", report.error_summary()),
1588                    )),
1589                }
1590            }
1591            MobMachineCommand::RosterSnapshot => {
1592                let roster = self.roster.read().await.snapshot();
1593                Ok(MobMachineCommandResult::RosterSnapshot(roster))
1594            }
1595            MobMachineCommand::ListMembers => {
1596                let members = self
1597                    .send_actor_command(|reply_tx| MobCommand::ProjectMemberList {
1598                        include_retiring: false,
1599                        reply_tx,
1600                    })
1601                    .await?;
1602                Ok(MobMachineCommandResult::ListMembers(members))
1603            }
1604            MobMachineCommand::ListMembersIncludingRetiring => {
1605                let members = self
1606                    .send_actor_command(|reply_tx| MobCommand::ProjectMemberList {
1607                        include_retiring: true,
1608                        reply_tx,
1609                    })
1610                    .await?;
1611                Ok(MobMachineCommandResult::ListMembersIncludingRetiring(
1612                    members,
1613                ))
1614            }
1615            MobMachineCommand::ListAllMembers => {
1616                let members = self.roster.read().await.list_all().cloned().collect();
1617                Ok(MobMachineCommandResult::ListAllMembers(members))
1618            }
1619            MobMachineCommand::MemberStatus { agent_identity } => {
1620                let snapshot = self
1621                    .send_actor_command(|reply_tx| MobCommand::ProjectMemberStatus {
1622                        agent_identity: AgentIdentity::from(agent_identity.as_str()),
1623                        reply_tx,
1624                    })
1625                    .await?;
1626                Ok(MobMachineCommandResult::MemberStatus(snapshot))
1627            }
1628            MobMachineCommand::SubscribeAgentEvents { agent_identity } => {
1629                let stream = self
1630                    .send_actor_command(|reply_tx| MobCommand::SubscribeAgentEvents {
1631                        agent_identity,
1632                        reply_tx,
1633                    })
1634                    .await??;
1635                Ok(MobMachineCommandResult::EventStream(stream))
1636            }
1637            MobMachineCommand::SubscribeAllAgentEvents => {
1638                let streams = self
1639                    .send_actor_command(|reply_tx| MobCommand::SubscribeAllAgentEvents { reply_tx })
1640                    .await??;
1641                Ok(MobMachineCommandResult::AllAgentEventStreams(streams))
1642            }
1643            MobMachineCommand::SubscribeMobEvents { config } => {
1644                Ok(MobMachineCommandResult::MobEventRouter(
1645                    super::event_router::spawn_event_router(self.clone(), config),
1646                ))
1647            }
1648            MobMachineCommand::PollEvents {
1649                after_cursor,
1650                limit,
1651            } => {
1652                let events = if self.status().await? == MobState::Destroyed {
1653                    self.events
1654                        .poll(after_cursor, limit)
1655                        .await
1656                        .map_err(MobError::from)?
1657                } else {
1658                    self.send_actor_command(|reply_tx| MobCommand::PollEvents {
1659                        after_cursor,
1660                        limit,
1661                        reply_tx,
1662                    })
1663                    .await??
1664                };
1665                Ok(MobMachineCommandResult::MobEvents(events))
1666            }
1667            MobMachineCommand::ReplayAllEvents => {
1668                let events = if self.status().await? == MobState::Destroyed {
1669                    self.events.replay_all().await.map_err(MobError::from)?
1670                } else {
1671                    self.send_actor_command(|reply_tx| MobCommand::ReplayAllEvents { reply_tx })
1672                        .await??
1673                };
1674                Ok(MobMachineCommandResult::MobEvents(events))
1675            }
1676            MobMachineCommand::RecordOperatorActionProvenance {
1677                tool_name,
1678                authority_context,
1679            } => {
1680                self.send_actor_command(|reply_tx| MobCommand::RecordOperatorActionProvenance {
1681                    tool_name,
1682                    authority_context,
1683                    reply_tx,
1684                })
1685                .await??;
1686                Ok(MobMachineCommandResult::Unit)
1687            }
1688            MobMachineCommand::GetMember { agent_identity } => {
1689                let member = self.roster.read().await.entry(&agent_identity);
1690                Ok(MobMachineCommandResult::GetMember(member))
1691            }
1692            #[cfg(test)]
1693            MobMachineCommand::FlowTrackerCounts => {
1694                let counts = self
1695                    .send_actor_command(|reply_tx| MobCommand::FlowTrackerCounts { reply_tx })
1696                    .await?;
1697                Ok(MobMachineCommandResult::FlowTrackerCounts(counts))
1698            }
1699            #[cfg(test)]
1700            MobMachineCommand::OrchestratorSnapshot => {
1701                let snapshot = self
1702                    .send_actor_command(|reply_tx| MobCommand::OrchestratorSnapshot { reply_tx })
1703                    .await?;
1704                Ok(MobMachineCommandResult::OrchestratorSnapshot(snapshot))
1705            }
1706            #[cfg(test)]
1707            MobMachineCommand::LifecycleSnapshot => {
1708                let snapshot = self
1709                    .send_actor_command(|reply_tx| MobCommand::LifecycleSnapshot { reply_tx })
1710                    .await?;
1711                Ok(MobMachineCommandResult::LifecycleSnapshot(snapshot))
1712            }
1713            #[cfg(test)]
1714            MobMachineCommand::LifecycleNotificationBurst { count, message } => {
1715                self.send_actor_command(|reply_tx| MobCommand::LifecycleNotificationBurst {
1716                    count,
1717                    message,
1718                    reply_tx,
1719                })
1720                .await??;
1721                Ok(MobMachineCommandResult::LifecycleNotificationBurst)
1722            }
1723            #[cfg(test)]
1724            MobMachineCommand::DslT2Snapshot => {
1725                let snapshot = self
1726                    .send_actor_command(|reply_tx| MobCommand::DslT2Snapshot { reply_tx })
1727                    .await?;
1728                Ok(MobMachineCommandResult::DslT2Snapshot(snapshot))
1729            }
1730            MobMachineCommand::SetSpawnPolicy { policy } => {
1731                self.send_actor_command(|reply_tx| MobCommand::SetSpawnPolicy { policy, reply_tx })
1732                    .await??;
1733                Ok(MobMachineCommandResult::Unit)
1734            }
1735            MobMachineCommand::Shutdown => {
1736                self.send_actor_command(|reply_tx| MobCommand::Shutdown { reply_tx })
1737                    .await??;
1738                Ok(MobMachineCommandResult::Unit)
1739            }
1740            MobMachineCommand::ForceCancel { agent_identity } => {
1741                self.send_actor_command(|reply_tx| MobCommand::ForceCancel {
1742                    agent_identity,
1743                    reply_tx,
1744                })
1745                .await??;
1746                Ok(MobMachineCommandResult::Unit)
1747            }
1748            MobMachineCommand::Wire { local, target } => {
1749                self.send_actor_command(|reply_tx| MobCommand::Wire {
1750                    local,
1751                    target,
1752                    reply_tx,
1753                })
1754                .await??;
1755                Ok(MobMachineCommandResult::Unit)
1756            }
1757            MobMachineCommand::WireMembersBatch { edges } => {
1758                let report = self
1759                    .send_actor_command(|reply_tx| MobCommand::WireMembersBatch { edges, reply_tx })
1760                    .await??;
1761                Ok(MobMachineCommandResult::WireMembersBatchReport(report))
1762            }
1763            MobMachineCommand::Unwire { local, target } => {
1764                self.send_actor_command(|reply_tx| MobCommand::Unwire {
1765                    local,
1766                    target,
1767                    reply_tx,
1768                })
1769                .await??;
1770                Ok(MobMachineCommandResult::Unit)
1771            }
1772        }
1773    }
1774
1775    async fn execute_destroy_machine_command(
1776        &self,
1777        command: MobMachineCommand,
1778    ) -> Result<MobMachineCommandResult, MobDestroyError> {
1779        match command {
1780            MobMachineCommand::Destroy => {
1781                let reply = self
1782                    .send_actor_command(|reply_tx| MobCommand::Destroy { reply_tx })
1783                    .await
1784                    .map_err(MobDestroyError::from)?;
1785                match reply {
1786                    Ok(report) => Ok(MobMachineCommandResult::DestroyReport(report)),
1787                    Err(error) => Err(error),
1788                }
1789            }
1790            _ => Err(MobDestroyError::from(MobError::Internal(
1791                "unsupported destroy machine command".into(),
1792            ))),
1793        }
1794    }
1795
1796    /// Poll mob events from the underlying store.
1797    pub async fn poll_events(
1798        &self,
1799        after_cursor: u64,
1800        limit: usize,
1801    ) -> Result<Vec<crate::event::MobEvent>, MobError> {
1802        match self
1803            .execute_machine_command(MobMachineCommand::PollEvents {
1804                after_cursor,
1805                limit,
1806            })
1807            .await?
1808        {
1809            MobMachineCommandResult::MobEvents(events) => Ok(events),
1810            _ => Err(MobError::Internal(
1811                "unexpected command result variant".into(),
1812            )),
1813        }
1814    }
1815
1816    /// Current mob lifecycle state, read directly from the DSL authority
1817    /// via the actor command channel. There is no atomic shadow — the DSL
1818    /// authority is the single source of truth (dogma #1, #13, #17).
1819    ///
1820    /// After `Destroy` has terminated the actor, the command channel is
1821    /// closed and this returns `Ok(MobState::Destroyed)`; callers that need
1822    /// post-destroy event replay should go through the `MobEventsView`.
1823    pub async fn status(&self) -> Result<MobState, MobError> {
1824        match self
1825            .send_actor_command(|reply_tx| MobCommand::QueryPhase { reply_tx })
1826            .await
1827        {
1828            Ok(state) => Ok(state),
1829            // If the actor task has exited (after Shutdown / Destroy) the
1830            // command channel send or receive fails. Fall back to the
1831            // terminal-phase watch, which the actor updates after every
1832            // DSL phase transition so its last observed value is the
1833            // authoritative terminal phase.
1834            Err(MobError::Internal(_)) => Ok(*self.phase_watch_rx.borrow()),
1835            Err(other) => Err(other),
1836        }
1837    }
1838
    /// Access the mob definition.
    ///
    /// Borrows the [`MobDefinition`] stored on this handle; no actor round-trip
    /// is involved.
    pub fn definition(&self) -> &MobDefinition {
        &self.definition
    }
1843
1844    /// Mob ID.
1845    pub fn mob_id(&self) -> &MobId {
1846        &self.definition.id
1847    }
1848
1849    /// Snapshot of the current roster.
1850    pub async fn roster(&self) -> Roster {
1851        match self
1852            .execute_machine_command(MobMachineCommand::RosterSnapshot)
1853            .await
1854        {
1855            Ok(MobMachineCommandResult::RosterSnapshot(roster)) => roster,
1856            Ok(_) => {
1857                tracing::error!("unexpected command result variant");
1858                Default::default()
1859            }
1860            Err(_) => Roster::new(),
1861        }
1862    }
1863
1864    fn derived_comms_name(&self, entry: &RosterEntry) -> String {
1865        format!(
1866            "{}/{}/{}",
1867            self.definition.id, entry.role, entry.agent_identity
1868        )
1869    }
1870
    /// Summarise comms reachability of every peer `entry` is wired to, as seen
    /// from the peer directory of `bridge_session_id`'s comms runtime.
    ///
    /// Returns `None` when the session service has no comms runtime for
    /// `bridge_session_id`. Otherwise every element of `entry.wired_to` is
    /// counted exactly once:
    /// - as reachable;
    /// - as unknown (this includes wired peers that match no directory entry);
    /// - or as unreachable, recorded together with its last unreachable reason.
    async fn resolve_peer_connectivity(
        &self,
        entry: &RosterEntry,
        bridge_session_id: &SessionId,
        roster_snapshot: &Roster,
    ) -> Option<MobPeerConnectivitySnapshot> {
        let comms = self
            .session_service
            .comms_runtime(bridge_session_id)
            .await?;
        let peers = comms.peers().await;
        // Index the directory by peer id and by peer name so each wired peer
        // can be matched through the fallbacks below.
        let peers_by_id: HashMap<PeerId, &PeerDirectoryEntry> =
            peers.iter().map(|peer| (peer.peer_id, peer)).collect();
        let peers_by_name: HashMap<&str, &PeerDirectoryEntry> = peers
            .iter()
            .map(|peer| (peer.name.as_str(), peer))
            .collect();

        let mut reachable_peer_count = 0usize;
        let mut unknown_peer_count = 0usize;
        let mut unreachable_peers = Vec::new();

        for wired_peer in &entry.wired_to {
            let wired_peer_meerkat = MeerkatId::from(wired_peer);
            // External peers: match by the spec's peer id, then by its name.
            let matched = if let Some(spec) = entry.external_peer_specs.get(&wired_peer_meerkat) {
                peers_by_id
                    .get(&spec.peer_id)
                    .copied()
                    .or_else(|| peers_by_name.get(spec.name.as_str()).copied())
            } else {
                // Local roster peers. First choice is the peer id currently
                // reported by the comms runtime of the target member's bridge
                // session (only looked up when that member has one).
                let local_entry = roster_snapshot.get(&wired_peer_meerkat);
                let live_peer_id = match local_entry
                    .and_then(|peer_entry| peer_entry.member_ref.bridge_session_id())
                {
                    Some(target_session_id) => self
                        .session_service
                        .comms_runtime(target_session_id)
                        .await
                        .and_then(|runtime| runtime.peer_id()),
                    None => None,
                };
                live_peer_id
                    .and_then(|peer_id| peers_by_id.get(&peer_id).copied())
                    // Second choice: the peer id recorded on the roster entry.
                    .or_else(|| {
                        local_entry
                            .and_then(|peer_entry| peer_entry.peer_id.as_ref())
                            .and_then(|peer_id| peers_by_id.get(peer_id).copied())
                    })
                    // Last resort: the derived "<mob>/<role>/<identity>" name.
                    .or_else(|| {
                        local_entry
                            .map(|peer_entry| self.derived_comms_name(peer_entry))
                            .and_then(|name| peers_by_name.get(name.as_str()).copied())
                    })
            };

            match matched {
                Some(peer) => match peer.reachability {
                    PeerReachability::Reachable => reachable_peer_count += 1,
                    PeerReachability::Unknown => unknown_peer_count += 1,
                    PeerReachability::Unreachable => unreachable_peers.push(MobUnreachablePeer {
                        peer: peer.name.as_string(),
                        reason: peer.last_unreachable_reason,
                    }),
                },
                // A wired peer missing from the directory counts as unknown.
                None => unknown_peer_count += 1,
            }
        }

        Some(MobPeerConnectivitySnapshot {
            reachable_peer_count,
            unknown_peer_count,
            unreachable_peers,
        })
    }
1945
1946    /// List members as an operational projection surface.
1947    ///
1948    /// This includes structural roster fields plus current runtime status,
1949    /// error/finality state, and the current session binding when known.
1950    /// It intentionally skips live peer-connectivity fanout so ordinary
1951    /// membership polling cannot stall on comms reachability lookups.
1952    /// For low-level structural roster visibility without runtime projection,
1953    /// use [`list_all_members`](Self::list_all_members).
1954    pub async fn list_members(&self) -> Vec<MobMemberListEntry> {
1955        match self
1956            .execute_machine_command(MobMachineCommand::ListMembers)
1957            .await
1958        {
1959            Ok(MobMachineCommandResult::ListMembers(entries)) => entries,
1960            Ok(_) => {
1961                tracing::error!("unexpected command result variant");
1962                Default::default()
1963            }
1964            Err(_) => Vec::new(),
1965        }
1966    }
1967
1968    /// List all members including those in `Retiring` state, with canonical
1969    /// lifecycle/session projection.
1970    ///
1971    /// Like [`list_members`](Self::list_members), this intentionally avoids
1972    /// live peer-connectivity fanout. Use [`member_status`](Self::member_status)
1973    /// for deep per-member inspection including live comms reachability.
1974    pub async fn list_members_including_retiring(&self) -> Vec<MobMemberListEntry> {
1975        if let Some(entries) = self.inflight_retiring_member_list().await {
1976            return entries;
1977        }
1978        match self
1979            .execute_machine_command(MobMachineCommand::ListMembersIncludingRetiring)
1980            .await
1981        {
1982            Ok(MobMachineCommandResult::ListMembersIncludingRetiring(entries)) => entries,
1983            Ok(_) => {
1984                tracing::error!("unexpected command result variant");
1985                Default::default()
1986            }
1987            Err(_) => Vec::new(),
1988        }
1989    }
1990
1991    async fn inflight_retiring_member_list(&self) -> Option<Vec<MobMemberListEntry>> {
1992        let entries: Vec<_> = {
1993            let roster = self.roster.read().await;
1994            let entries: Vec<_> = roster.list_all().cloned().collect();
1995            if !entries
1996                .iter()
1997                .any(|entry| entry.state == crate::roster::MemberState::Retiring)
1998            {
1999                return None;
2000            }
2001            entries
2002        };
2003        let machine_state = self.machine_state_watch_rx.borrow().clone();
2004        Some(self.project_member_list_entries_from_machine_state(entries, &machine_state))
2005    }
2006
    /// Project roster entries into [`MobMemberListEntry`] values using a
    /// snapshot of the mob machine state, without an actor round-trip.
    ///
    /// For each entry:
    /// - Structural fields are copied from the roster entry.
    /// - The bridge session recorded on the roster entry wins. The machine's
    ///   `member_session_bindings` value is only a fallback, and only when it
    ///   parses as a [`SessionId`].
    /// - Status, error, finality and kickoff come from the
    ///   [`MobMemberLifecycleProjection`] snapshot. This path supplies no
    ///   output preview, zero tokens used, and no peer connectivity.
    fn project_member_list_entries_from_machine_state(
        &self,
        entries: Vec<RosterEntry>,
        machine_state: &mob_dsl::MobMachineState,
    ) -> Vec<MobMemberListEntry> {
        entries
            .into_iter()
            .map(|entry| {
                let domain_identity =
                    crate::ids::AgentIdentity::from(entry.agent_identity.as_str());
                let dsl_identity = mob_dsl::AgentIdentity::from_domain(&domain_identity);
                // Machine-side binding; session ids that fail to parse are dropped.
                let machine_bridge_session_id = machine_state
                    .member_session_bindings
                    .get(&dsl_identity)
                    .and_then(|dsl_session_id| SessionId::parse(&dsl_session_id.0).ok());
                // The roster's own binding takes precedence over the machine's.
                let current_bridge_session_id = entry
                    .member_ref
                    .bridge_session_id()
                    .cloned()
                    .or(machine_bridge_session_id);
                let material = MobMemberLifecycleProjection::materialize(MobMemberLifecycleInput {
                    member_present: true,
                    machine_lifecycle: machine_state
                        .member_lifecycle_for_identity(&dsl_identity, true),
                    output_preview: None,
                    tokens_used: 0,
                    agent_runtime_id: entry.agent_runtime_id.clone(),
                    fence_token: entry.fence_token,
                    current_bridge_session_id,
                    peer_connectivity: None,
                    kickoff: entry.kickoff.clone(),
                });
                let snapshot = material.to_snapshot();
                // Report the session binding the materialized snapshot settled on.
                let current_bridge_session_id = snapshot.current_bridge_session_id().cloned();
                MobMemberListEntry {
                    agent_identity: entry.agent_identity,
                    agent_runtime_id: entry.agent_runtime_id,
                    fence_token: entry.fence_token,
                    role: entry.role,
                    runtime_mode: entry.runtime_mode,
                    peer_id: entry.peer_id,
                    transport_public_key: entry.transport_public_key,
                    state: entry.state,
                    wired_to: entry.wired_to,
                    external_peer_specs: entry.external_peer_specs,
                    labels: entry.labels,
                    status: snapshot.status,
                    error: snapshot.error,
                    is_final: snapshot.is_final,
                    // Session fields are left `None` here; the projected
                    // binding is applied via `with_current_bridge_session_id`.
                    current_session_id: None,
                    current_bridge_session_id: None,
                    kickoff: snapshot.kickoff,
                }
                .with_current_bridge_session_id(current_bridge_session_id)
            })
            .collect()
    }
2064
2065    /// List members currently eligible for runtime work dispatch.
2066    ///
2067    /// Excludes retiring, completed, broken, or unknown members even if they
2068    /// still appear in the public operational projection.
2069    pub(crate) async fn list_runnable_members(&self) -> Vec<MobMemberListEntry> {
2070        self.list_members()
2071            .await
2072            .into_iter()
2073            .filter(|entry| {
2074                entry.state == crate::roster::MemberState::Active
2075                    && entry.status == MobMemberStatus::Active
2076            })
2077            .collect()
2078    }
2079
2080    /// List all members including those in `Retiring` state.
2081    ///
2082    /// The `state` field on each [`RosterEntry`] distinguishes `Active` from
2083    /// `Retiring`. Use this for observability and membership inspection where
2084    /// in-flight retires should be visible.
2085    pub async fn list_all_members(&self) -> Vec<RosterEntry> {
2086        self.roster.read().await.list_all().cloned().collect()
2087    }
2088
2089    /// Get a specific member entry by identity.
2090    pub async fn get_member(&self, identity: &AgentIdentity) -> Option<RosterEntry> {
2091        let meerkat_id = MeerkatId::from(identity);
2092        match self
2093            .execute_machine_command(MobMachineCommand::GetMember {
2094                agent_identity: meerkat_id,
2095            })
2096            .await
2097        {
2098            Ok(MobMachineCommandResult::GetMember(entry)) => entry,
2099            Ok(_) => {
2100                tracing::error!("unexpected command result variant");
2101                Default::default()
2102            }
2103            Err(_) => None,
2104        }
2105    }
2106
2107    /// Get a specific member entry by legacy MeerkatId (bridge helper).
2108    pub(crate) async fn get_member_by_meerkat_id(
2109        &self,
2110        agent_identity: &MeerkatId,
2111    ) -> Option<RosterEntry> {
2112        self.get_member(&AgentIdentity::from(agent_identity.as_str()))
2113            .await
2114    }
2115
2116    /// Resolve the backing bridge session ID for a member by identity.
2117    ///
2118    /// # When to use this
2119    ///
2120    /// This is the canonical identity → bridge session mapping used by
2121    /// **surface implementations** (RPC/MCP/REST handlers, web-runtime
2122    /// wrappers) that must delegate a mob-identity action to a
2123    /// session-scoped canonical API — e.g. `mob/turn_start` delegating to
2124    /// the runtime's `turn/start`, or a delegation tool projecting
2125    /// assistant output from a helper's backing session. Returns `None` if
2126    /// the member is not found or has no bridge session binding.
2127    ///
2128    /// # When not to use it
2129    ///
2130    /// Application code acting on a mob should prefer the identity-native
2131    /// [`MobHandle`] APIs: [`MobHandle::member`] to acquire a
2132    /// capability-bearing handle, [`MobHandle::internal_turn`] to deliver
2133    /// content without the RPC turn-start dance, [`MobHandle::peer_send`]
2134    /// / [`MobHandle::member_send`] for peer comms, etc. Those hide the
2135    /// session_id entirely.
2136    ///
2137    /// # Dogma fit (A8)
2138    ///
2139    /// DELETE_ME finding A8 flagged this method as contradicting the
2140    /// "hide session_id from callers" principle of identity-first mobs.
2141    /// The apparent contradiction was a scoping confusion: identity-first
2142    /// hides session_id from **consumers of the public mob surface**
2143    /// (application code, end-users, SDK clients). Surface implementations
2144    /// must still bridge identity to session when delegating to the
2145    /// canonical session-scoped runtime APIs they don't own themselves —
2146    /// that delegation is explicitly permitted by
2147    /// `docs/architecture/meerkat-runtime-dogma.md` principle #3
2148    /// ("shell owns mechanics, not meaning"). The resolver reads the
2149    /// roster's canonical identity→bridge mapping; no parallel truth is
2150    /// introduced. Regression
2151    /// `resolve_bridge_session_id_is_lookup_not_mutation` proves this is
2152    /// a pure read against the single owner (the mob roster).
2153    pub async fn resolve_bridge_session_id(&self, identity: &AgentIdentity) -> Option<SessionId> {
2154        self.get_member(identity)
2155            .await
2156            .and_then(|entry| entry.member_ref.bridge_session_id().cloned())
2157    }
2158
2159    /// Acquire a capability-bearing handle for a specific active member.
2160    pub async fn member(&self, identity: &AgentIdentity) -> Result<MemberHandle, MobError> {
2161        let meerkat_id = MeerkatId::from(identity);
2162        if let Some(diag) = self.restore_failure_for(&meerkat_id).await {
2163            return Err(Self::restore_failure_error(&meerkat_id, diag));
2164        }
2165        let entry = self
2166            .get_member(identity)
2167            .await
2168            .ok_or_else(|| MobError::MemberNotFound(meerkat_id.clone()))?;
2169        if entry.state != crate::roster::MemberState::Active {
2170            return Err(MobError::MemberNotFound(meerkat_id.clone()));
2171        }
2172        Ok(MemberHandle {
2173            mob: self.clone(),
2174            agent_identity: meerkat_id,
2175        })
2176    }
2177
2178    /// Access a read-only events view for polling, replay, and subscription.
2179    pub fn events(&self) -> MobEventsView {
2180        MobEventsView {
2181            handle: self.clone(),
2182        }
2183    }
2184
2185    /// Append a dispatcher-owned operator provenance projection.
2186    ///
2187    /// This is audit/projection data only. It must never become
2188    /// authorization truth.
2189    pub async fn record_operator_action_provenance(
2190        &self,
2191        tool_name: &str,
2192        authority_context: &MobToolAuthorityContext,
2193    ) -> Result<(), MobError> {
2194        match self
2195            .execute_machine_command(MobMachineCommand::RecordOperatorActionProvenance {
2196                tool_name: tool_name.to_string(),
2197                authority_context: authority_context.clone(),
2198            })
2199            .await?
2200        {
2201            MobMachineCommandResult::Unit => Ok(()),
2202            _ => Err(MobError::Internal(
2203                "unexpected command result variant".into(),
2204            )),
2205        }
2206    }
2207
2208    /// Subscribe to agent-level events for a specific member.
2209    ///
2210    /// Looks up the member's backing bridge session from the roster, then
2211    /// subscribes to the session-level event stream via [`MobSessionService`].
2212    ///
2213    /// Returns `MobError::MemberNotFound` if the member is not in the
2214    /// roster or has no backing bridge session.
2215    pub async fn subscribe_agent_events(
2216        &self,
2217        identity: &AgentIdentity,
2218    ) -> Result<EventStream, MobError> {
2219        match self
2220            .execute_machine_command(MobMachineCommand::SubscribeAgentEvents {
2221                agent_identity: MeerkatId::from(identity),
2222            })
2223            .await?
2224        {
2225            MobMachineCommandResult::EventStream(stream) => Ok(stream),
2226            _ => Err(MobError::Internal(
2227                "unexpected command result variant".into(),
2228            )),
2229        }
2230    }
2231
2232    /// Subscribe to agent events for all active members (point-in-time snapshot).
2233    ///
2234    /// Returns one stream per active member that has a live bridge binding. Members
2235    /// spawned after this call are not included — use [`subscribe_mob_events`]
2236    /// for a continuously updated view.
2237    pub async fn subscribe_all_agent_events(
2238        &self,
2239    ) -> Result<Vec<(AgentIdentity, EventStream)>, MobError> {
2240        match self
2241            .execute_machine_command(MobMachineCommand::SubscribeAllAgentEvents)
2242            .await
2243        {
2244            Ok(MobMachineCommandResult::AllAgentEventStreams(streams)) => Ok(streams
2245                .into_iter()
2246                .map(|(mid, stream)| (AgentIdentity::from(mid.as_str()), stream))
2247                .collect()),
2248            Ok(_) => {
2249                tracing::error!("unexpected command result variant");
2250                Err(MobError::Internal(
2251                    "unexpected command result variant".into(),
2252                ))
2253            }
2254            Err(error) => Err(error),
2255        }
2256    }
2257
2258    /// Subscribe to a continuously-updated, mob-level event bus.
2259    ///
2260    /// Spawns an independent task that merges per-member session streams,
2261    /// tags each event with [`AttributedEvent`], and tracks roster changes
2262    /// (spawns/retires) automatically. Drop the returned handle to stop
2263    /// the router.
2264    pub async fn subscribe_mob_events(&self) -> super::event_router::MobEventRouterHandle {
2265        self.subscribe_mob_events_with_config(super::event_router::MobEventRouterConfig::default())
2266            .await
2267    }
2268
2269    /// Like [`subscribe_mob_events`](Self::subscribe_mob_events) with explicit config.
2270    pub async fn subscribe_mob_events_with_config(
2271        &self,
2272        config: super::event_router::MobEventRouterConfig,
2273    ) -> super::event_router::MobEventRouterHandle {
2274        match self
2275            .execute_machine_command(MobMachineCommand::SubscribeMobEvents { config })
2276            .await
2277        {
2278            Ok(MobMachineCommandResult::MobEventRouter(handle)) => handle,
2279            Ok(_) => {
2280                tracing::error!("unexpected command result variant for subscribe_mob_events");
2281                super::event_router::spawn_event_router(self.clone(), config)
2282            }
2283            Err(_) => super::event_router::spawn_event_router(self.clone(), config),
2284        }
2285    }
2286
2287    /// Start a flow run and return its run ID.
2288    pub async fn run_flow(
2289        &self,
2290        flow_id: FlowId,
2291        params: serde_json::Value,
2292    ) -> Result<RunId, MobError> {
2293        self.run_flow_with_stream(flow_id, params, None).await
2294    }
2295
2296    /// Start a flow run with an optional scoped stream sink.
2297    pub async fn run_flow_with_stream(
2298        &self,
2299        flow_id: FlowId,
2300        params: serde_json::Value,
2301        scoped_event_tx: Option<mpsc::Sender<meerkat_core::ScopedAgentEvent>>,
2302    ) -> Result<RunId, MobError> {
2303        match self
2304            .execute_machine_command(MobMachineCommand::RunFlow {
2305                flow_id,
2306                activation_params: params,
2307                scoped_event_tx,
2308            })
2309            .await?
2310        {
2311            MobMachineCommandResult::RunId(run_id) => Ok(run_id),
2312            _ => Err(MobError::Internal(
2313                "unexpected command result variant".into(),
2314            )),
2315        }
2316    }
2317
2318    /// Request cancellation of an in-flight flow run.
2319    pub async fn cancel_flow(&self, run_id: RunId) -> Result<(), MobError> {
2320        match self
2321            .execute_machine_command(MobMachineCommand::CancelFlow { run_id })
2322            .await?
2323        {
2324            MobMachineCommandResult::Unit => Ok(()),
2325            _ => Err(MobError::Internal(
2326                "unexpected command result variant".into(),
2327            )),
2328        }
2329    }
2330
2331    /// Fetch a flow run snapshot from the run store.
2332    pub async fn flow_status(&self, run_id: RunId) -> Result<Option<MobRun>, MobError> {
2333        match self
2334            .execute_machine_command(MobMachineCommand::FlowStatus { run_id })
2335            .await?
2336        {
2337            MobMachineCommandResult::FlowStatus(status) => Ok(status),
2338            _ => Err(MobError::Internal(
2339                "unexpected command result variant".into(),
2340            )),
2341        }
2342    }
2343
2344    /// List flow runs for this mob, optionally filtered to one flow ID.
2345    pub async fn list_runs(&self, flow_id: Option<&FlowId>) -> Result<Vec<MobRun>, MobError> {
2346        self.run_store
2347            .list_runs(&self.definition.id, flow_id)
2348            .await
2349            .map_err(MobError::from)
2350    }
2351
2352    /// List all configured flow IDs in this mob definition.
2353    pub fn list_flows(&self) -> Vec<FlowId> {
2354        self.definition.flows.keys().cloned().collect()
2355    }
2356
2357    /// Spawn a new member from a profile and return its member reference.
2358    #[cfg(test)]
2359    pub(crate) async fn spawn(
2360        &self,
2361        profile_name: ProfileName,
2362        agent_identity: MeerkatId,
2363        initial_message: Option<ContentInput>,
2364    ) -> Result<MemberRef, MobError> {
2365        self.spawn_with_options(profile_name, agent_identity, initial_message, None, None)
2366            .await
2367    }
2368
2369    /// Spawn a new member with an explicit runtime binding.
2370    #[cfg(test)]
2371    pub(crate) async fn spawn_with_binding(
2372        &self,
2373        profile_name: ProfileName,
2374        agent_identity: MeerkatId,
2375        initial_message: Option<ContentInput>,
2376        binding: crate::RuntimeBinding,
2377    ) -> Result<MemberRef, MobError> {
2378        let mut spec = SpawnMemberSpec::new(profile_name, agent_identity);
2379        spec.initial_message = initial_message;
2380        spec.binding = Some(binding);
2381        self.spawn_spec_internal(spec).await
2382    }
2383
2384    /// Spawn a new member from a profile with explicit backend override.
2385    #[cfg(test)]
2386    pub(crate) async fn spawn_with_backend(
2387        &self,
2388        profile_name: ProfileName,
2389        agent_identity: MeerkatId,
2390        initial_message: Option<ContentInput>,
2391        backend: Option<MobBackendKind>,
2392    ) -> Result<MemberRef, MobError> {
2393        self.spawn_with_options(profile_name, agent_identity, initial_message, None, backend)
2394            .await
2395    }
2396
2397    /// Spawn a new member from a profile with explicit runtime mode/backend overrides.
2398    #[cfg(test)]
2399    pub(crate) async fn spawn_with_options(
2400        &self,
2401        profile_name: ProfileName,
2402        agent_identity: MeerkatId,
2403        initial_message: Option<ContentInput>,
2404        runtime_mode: Option<crate::MobRuntimeMode>,
2405        backend: Option<MobBackendKind>,
2406    ) -> Result<MemberRef, MobError> {
2407        let mut spec = SpawnMemberSpec::new(profile_name, agent_identity);
2408        spec.initial_message = initial_message;
2409        spec.runtime_mode = runtime_mode;
2410        spec.backend = backend;
2411        self.spawn_spec_internal(spec).await
2412    }
2413
2414    /// Attach an existing session by reusing the mob spawn control-plane path.
2415    #[cfg(test)]
2416    pub(crate) async fn attach_existing_session(
2417        &self,
2418        profile_name: ProfileName,
2419        agent_identity: MeerkatId,
2420        session_id: meerkat_core::types::SessionId,
2421        runtime_mode: Option<crate::MobRuntimeMode>,
2422        backend: Option<MobBackendKind>,
2423    ) -> Result<MemberRef, MobError> {
2424        let mut spec = SpawnMemberSpec::new(profile_name, agent_identity);
2425        spec.launch_mode = crate::launch::MemberLaunchMode::Resume {
2426            bridge_session_id: session_id,
2427        };
2428        spec.runtime_mode = runtime_mode;
2429        spec.backend = backend;
2430        self.spawn_spec_internal(spec).await
2431    }
2432
2433    /// Attach an existing session as a regular mob member.
2434    #[cfg(test)]
2435    pub(crate) async fn attach_existing_session_as_member(
2436        &self,
2437        profile_name: ProfileName,
2438        agent_identity: MeerkatId,
2439        session_id: meerkat_core::types::SessionId,
2440    ) -> Result<MemberRef, MobError> {
2441        self.attach_existing_session(profile_name, agent_identity, session_id, None, None)
2442            .await
2443    }
2444
2445    /// Spawn a member from a fully-specified [`SpawnMemberSpec`].
2446    pub async fn spawn_spec(&self, spec: SpawnMemberSpec) -> Result<SpawnResult, MobError> {
2447        let identity = spec.identity.clone();
2448        self.spawn_spec_internal(spec).await?;
2449        // The roster is updated synchronously during spawn finalization,
2450        // so the entry is guaranteed to be present by the time the reply
2451        // arrives.
2452        let entry = self.get_member(&identity).await.ok_or_else(|| {
2453            MobError::Internal(format!(
2454                "spawn succeeded but roster entry missing for '{identity}'"
2455            ))
2456        })?;
2457        Ok(SpawnResult {
2458            agent_identity: entry.agent_identity,
2459            agent_runtime_id: entry.agent_runtime_id,
2460            fence_token: entry.fence_token,
2461        })
2462    }
2463
2464    /// Internal spawn that returns the raw `MemberRef` for crate-internal callers.
2465    pub(crate) async fn spawn_spec_internal(
2466        &self,
2467        spec: SpawnMemberSpec,
2468    ) -> Result<MemberRef, MobError> {
2469        self.spawn_spec_internal_with_source(spec, SpawnSource::Consumer)
2470            .await
2471    }
2472
2473    pub(crate) async fn spawn_spec_internal_with_source(
2474        &self,
2475        spec: SpawnMemberSpec,
2476        spawn_source: SpawnSource,
2477    ) -> Result<MemberRef, MobError> {
2478        let spawn_source = SpawnSource::for_launch_mode(spawn_source, &spec.launch_mode);
2479        match self
2480            .execute_machine_command(MobMachineCommand::Spawn {
2481                spec: Box::new(spec),
2482                spawn_source,
2483                owner_context: None,
2484            })
2485            .await?
2486        {
2487            MobMachineCommandResult::SpawnReceipt(receipt) => Ok(receipt.member_ref),
2488            _ => Err(MobError::Internal(
2489                "unexpected command result variant".into(),
2490            )),
2491        }
2492    }
2493
2494    pub(super) async fn spawn_spec_receipt_with_owner_context(
2495        &self,
2496        spec: SpawnMemberSpec,
2497        owner_context: CanonicalOpsOwnerContext,
2498    ) -> Result<MemberSpawnReceipt, MobError> {
2499        self.spawn_spec_receipt_with_owner_context_and_source(
2500            spec,
2501            owner_context,
2502            SpawnSource::AgentSpawnMember,
2503        )
2504        .await
2505    }
2506
2507    pub(super) async fn spawn_spec_receipt_with_owner_context_and_source(
2508        &self,
2509        spec: SpawnMemberSpec,
2510        owner_context: CanonicalOpsOwnerContext,
2511        spawn_source: SpawnSource,
2512    ) -> Result<MemberSpawnReceipt, MobError> {
2513        match self
2514            .execute_machine_command(MobMachineCommand::Spawn {
2515                spawn_source: SpawnSource::for_launch_mode(spawn_source, &spec.launch_mode),
2516                spec: Box::new(spec),
2517                owner_context: Some(owner_context),
2518            })
2519            .await?
2520        {
2521            MobMachineCommandResult::SpawnReceipt(receipt) => Ok(receipt),
2522            _ => Err(MobError::Internal(
2523                "unexpected command result variant".into(),
2524            )),
2525        }
2526    }
2527
2528    /// Spawn multiple members in parallel.
2529    ///
2530    /// Results preserve input order.
2531    pub async fn spawn_many(
2532        &self,
2533        specs: Vec<SpawnMemberSpec>,
2534    ) -> Vec<Result<SpawnResult, MobError>> {
2535        futures::future::join_all(specs.into_iter().map(|spec| async move {
2536            let identity = spec.identity.clone();
2537            self.spawn_spec_internal_with_source(spec, SpawnSource::BatchItem)
2538                .await?;
2539            let entry = self.get_member(&identity).await.ok_or_else(|| {
2540                MobError::Internal(format!(
2541                    "spawn succeeded but roster entry missing for '{identity}'"
2542                ))
2543            })?;
2544            Ok(SpawnResult {
2545                agent_identity: entry.agent_identity,
2546                agent_runtime_id: entry.agent_runtime_id,
2547                fence_token: entry.fence_token,
2548            })
2549        }))
2550        .await
2551    }
2552
2553    pub(super) async fn spawn_many_receipts_with_owner_context(
2554        &self,
2555        specs: Vec<SpawnMemberSpec>,
2556        owner_context: CanonicalOpsOwnerContext,
2557    ) -> Vec<Result<MemberSpawnReceipt, MobError>> {
2558        futures::future::join_all(specs.into_iter().map(|spec| {
2559            self.spawn_spec_receipt_with_owner_context_and_source(
2560                spec,
2561                owner_context.clone(),
2562                SpawnSource::BatchItem,
2563            )
2564        }))
2565        .await
2566    }
2567
2568    /// Retire a member, archiving its session and removing trust.
2569    pub async fn retire(&self, identity: AgentIdentity) -> Result<(), MobError> {
2570        let meerkat_id = MeerkatId::from(&identity);
2571        match self
2572            .execute_machine_command(MobMachineCommand::Retire {
2573                agent_identity: meerkat_id,
2574            })
2575            .await?
2576        {
2577            MobMachineCommandResult::Unit => Ok(()),
2578            _ => Err(MobError::Internal(
2579                "unexpected command result variant".into(),
2580            )),
2581        }
2582    }
2583
2584    /// Retire a member and respawn with the same profile, labels, wiring, and mode.
2585    ///
2586    /// This is a helper convenience over primitive mob behavior, not a
2587    /// machine-owned primitive. Returns a receipt on full success, or a
2588    /// structured error on failure. No rollback is attempted after retire.
2589    pub async fn respawn(
2590        &self,
2591        identity: AgentIdentity,
2592        initial_message: Option<ContentInput>,
2593    ) -> Result<MemberRespawnReceipt, MobRespawnError> {
2594        let meerkat_id = MeerkatId::from(&identity);
2595        let reply = match self
2596            .execute_machine_command(MobMachineCommand::Respawn {
2597                agent_identity: meerkat_id,
2598                initial_message,
2599            })
2600            .await?
2601        {
2602            MobMachineCommandResult::Respawn(reply) => reply,
2603            _ => {
2604                return Err(MobRespawnError::from(MobError::Internal(
2605                    "unexpected command result variant".into(),
2606                )));
2607            }
2608        };
2609        match reply {
2610            Ok(receipt) => Ok(receipt),
2611            Err(err) => Err(err),
2612        }
2613    }
2614
2615    /// Retire all roster members concurrently in a single actor command.
2616    pub async fn retire_all(&self) -> Result<(), MobError> {
2617        match self
2618            .execute_machine_command(MobMachineCommand::RetireAll)
2619            .await?
2620        {
2621            MobMachineCommandResult::Unit => Ok(()),
2622            _ => Err(MobError::Internal(
2623                "unexpected command result variant".into(),
2624            )),
2625        }
2626    }
2627
2628    /// Core `ensure_member` worker invoked by `execute_machine_command`.
2629    ///
2630    /// Tries to spawn the member; on [`MobError::MemberAlreadyExists`],
2631    /// resolves the existing member via [`list_members`] and wraps it as
2632    /// [`EnsureMemberOutcome::Existed`]. Other spawn errors propagate
2633    /// unchanged.
2634    async fn handle_ensure_member(
2635        &self,
2636        spec: SpawnMemberSpec,
2637    ) -> Result<EnsureMemberOutcome, MobError> {
2638        let identity = spec.identity.clone();
2639        self.project_machine_input(mob_dsl::MobMachineInput::EnsureMember {
2640            agent_identity: mob_dsl::AgentIdentity::from_domain(&identity),
2641        })
2642        .await?;
2643        // `Box::pin` breaks the compiler-visible recursion:
2644        // handle_ensure_member -> spawn_spec -> execute_machine_command ->
2645        // (MobMachineCommand::Spawn arm, which never re-enters this fn).
2646        match Box::pin(self.spawn_spec(spec)).await {
2647            Ok(spawn_result) => Ok(EnsureMemberOutcome::Spawned(spawn_result)),
2648            Err(MobError::MemberAlreadyExists(_)) => {
2649                let existing = Box::pin(self.list_members())
2650                    .await
2651                    .into_iter()
2652                    .find(|entry| entry.agent_identity == identity)
2653                    .ok_or_else(|| {
2654                        MobError::Internal(format!(
2655                            "ensure_member: member '{identity}' reported existing but not found in roster"
2656                        ))
2657                    })?;
2658                Ok(EnsureMemberOutcome::Existed(Box::new(existing)))
2659            }
2660            Err(other) => Err(other),
2661        }
2662    }
2663
2664    /// Core `reconcile` worker invoked by `execute_machine_command`.
2665    ///
2666    /// Compares `desired` against the current roster:
2667    /// * Desired identities present in the roster become `retained`.
2668    /// * Desired identities absent are spawned; successes land in
2669    ///   `spawned`, per-identity failures land in `failures` tagged with
2670    ///   [`ReconcileStage::Spawn`].
2671    /// * When [`ReconcileOptions::retire_stale`] is set, identities in the
2672    ///   roster that are not in `desired` are retired; failures land in
2673    ///   `failures` tagged with [`ReconcileStage::Retire`].
2674    async fn handle_reconcile(
2675        &self,
2676        desired: Vec<SpawnMemberSpec>,
2677        options: ReconcileOptions,
2678    ) -> Result<ReconcileReport, MobError> {
2679        self.project_machine_input(mob_dsl::MobMachineInput::Reconcile {
2680            desired: desired
2681                .iter()
2682                .map(|spec| mob_dsl::AgentIdentity::from_domain(&spec.identity))
2683                .collect(),
2684            retire_stale: options.retire_stale,
2685        })
2686        .await?;
2687
2688        let mut report = ReconcileReport {
2689            desired: desired.iter().map(|spec| spec.identity.clone()).collect(),
2690            ..ReconcileReport::default()
2691        };
2692
2693        let current: std::collections::BTreeSet<AgentIdentity> = Box::pin(self.list_members())
2694            .await
2695            .into_iter()
2696            .map(|entry| entry.agent_identity)
2697            .collect();
2698        let desired_ids: std::collections::BTreeSet<AgentIdentity> =
2699            desired.iter().map(|spec| spec.identity.clone()).collect();
2700
2701        for spec in desired {
2702            let identity = spec.identity.clone();
2703            if current.contains(&identity) {
2704                report.retained.push(identity);
2705                continue;
2706            }
2707            match Box::pin(self.spawn_spec(spec)).await {
2708                Ok(spawn_result) => report.spawned.push(spawn_result),
2709                Err(error) => report.failures.push(ReconcileFailure {
2710                    agent_identity: identity,
2711                    error,
2712                    stage: ReconcileStage::Spawn,
2713                }),
2714            }
2715        }
2716
2717        if options.retire_stale {
2718            for identity in current.difference(&desired_ids).cloned() {
2719                match Box::pin(self.retire(identity.clone())).await {
2720                    Ok(()) => report.retired.push(identity),
2721                    Err(error) => report.failures.push(ReconcileFailure {
2722                        agent_identity: identity,
2723                        error,
2724                        stage: ReconcileStage::Retire,
2725                    }),
2726                }
2727            }
2728        }
2729
2730        Ok(report)
2731    }
2732
2733    /// Core `list_members_matching` worker invoked by
2734    /// `execute_machine_command`. Composition over
2735    /// [`list_members`](Self::list_members) with each constraint applied
2736    /// conjunctively. An empty filter matches every member.
2737    async fn handle_list_members_matching(&self, filter: MemberFilter) -> Vec<MobMemberListEntry> {
2738        Box::pin(self.list_members())
2739            .await
2740            .into_iter()
2741            .filter(|entry| {
2742                if let Some(role) = &filter.role
2743                    && entry.role != *role
2744                {
2745                    return false;
2746                }
2747                if let Some(state) = filter.state
2748                    && entry.state != state
2749                {
2750                    return false;
2751                }
2752                for (key, value) in &filter.labels {
2753                    if entry.labels.get(key).is_none_or(|v| v != value) {
2754                        return false;
2755                    }
2756                }
2757                true
2758            })
2759            .collect()
2760    }
2761
2762    /// Declarative: spawn the member described by `spec` if absent; otherwise
2763    /// return the existing roster entry unchanged.
2764    ///
2765    /// Composition over [`spawn_spec`](Self::spawn_spec) +
2766    /// [`get_member`](Self::get_member). Idempotent with respect to
2767    /// [`SpawnMemberSpec::identity`]. The spec's `initial_message`, launch
2768    /// mode, and other per-spawn options are applied only when a new member
2769    /// is created.
2770    pub async fn ensure_member(
2771        &self,
2772        spec: SpawnMemberSpec,
2773    ) -> Result<EnsureMemberOutcome, MobError> {
2774        match self
2775            .execute_machine_command(MobMachineCommand::EnsureMember {
2776                spec: Box::new(spec),
2777            })
2778            .await?
2779        {
2780            MobMachineCommandResult::EnsureMember(outcome) => Ok(outcome),
2781            _ => Err(MobError::Internal(
2782                "unexpected command result variant".into(),
2783            )),
2784        }
2785    }
2786
2787    /// Declarative: drive the roster toward the `desired` set of specs.
2788    ///
2789    /// For each desired spec, spawn if absent or retain if present. When
2790    /// [`ReconcileOptions::retire_stale`] is set, members whose identity is
2791    /// not in the desired set are retired. Failures are collected per-
2792    /// identity in [`ReconcileReport::failures`] rather than short-circuiting.
2793    ///
2794    /// Composition over spawn + retire + list_members; no new lifecycle.
2795    pub async fn reconcile(
2796        &self,
2797        desired: Vec<SpawnMemberSpec>,
2798        options: ReconcileOptions,
2799    ) -> Result<ReconcileReport, MobError> {
2800        match self
2801            .execute_machine_command(MobMachineCommand::Reconcile { desired, options })
2802            .await?
2803        {
2804            MobMachineCommandResult::Reconcile(report) => Ok(*report),
2805            _ => Err(MobError::Internal(
2806                "unexpected command result variant".into(),
2807            )),
2808        }
2809    }
2810
2811    /// Declarative: list members matching every constraint in `filter`.
2812    ///
2813    /// Composition over [`list_members`](Self::list_members) followed by
2814    /// in-process filtering. An empty filter matches every currently active
2815    /// member. Only the `labels` pairs in `filter` must match (extra labels
2816    /// on the member are allowed); `role`, `state`, and `has_realtime_intent`
2817    /// each apply only when set.
2818    pub async fn list_members_matching(&self, filter: MemberFilter) -> Vec<MobMemberListEntry> {
2819        match self
2820            .execute_machine_command(MobMachineCommand::ListMembersMatching {
2821                filter: Box::new(filter),
2822            })
2823            .await
2824        {
2825            Ok(MobMachineCommandResult::ListMembers(entries)) => entries,
2826            Ok(_) => {
2827                tracing::error!("unexpected command result variant");
2828                Default::default()
2829            }
2830            Err(_) => Vec::new(),
2831        }
2832    }
2833
2834    /// Rotate the persisted mob supervisor authority.
2835    ///
2836    /// # Scope: mob-wide
2837    ///
2838    /// The supervisor authority is a **single per-mob fact** persisted in
2839    /// [`SupervisorAuthorityRecord`](crate::store::SupervisorAuthorityRecord) keyed by
2840    /// `mob_id`. Rotation generates a fresh authority (new public peer id,
2841    /// incremented epoch) and broadcasts
2842    /// [`BridgeCommand::AuthorizeSupervisor`](meerkat_contracts::wire::supervisor_bridge::BridgeCommand)
2843    /// to **every** remote member binding currently on the roster, then
2844    /// advances the persisted local authority only after every remote binding
2845    /// has confirmed the next authority.
2846    ///
2847    /// There is no per-member scope here, and no scoping parameter is
2848    /// missing. Per-member [`BridgeBootstrapToken`](meerkat_contracts::wire::supervisor_bridge::BridgeBootstrapToken)s
2849    /// carried on `MemberRef::BackendPeer` are the **bootstrap proof** that
2850    /// authorizes a specific member's bridge to (re)establish under the
2851    /// current supervisor — they are not a separate supervisor identity.
2852    /// One supervisor, many bootstrap tokens.
2853    ///
2854    /// # Incomplete-rotation semantics
2855    ///
2856    /// If some remote bindings accept the attempted next authority and a later
2857    /// remote rejects it, the rotation fails closed: the persisted current
2858    /// supervisor authority remains at the pre-rotation epoch and callers
2859    /// receive [`MobError::SupervisorRotationIncomplete`]. The attempted
2860    /// authority is retained as explicit pending rotation metadata when any
2861    /// remote remains bound to it. A retry verifies recorded accepted peers
2862    /// with the attempted authority before skipping them, then rotates the
2863    /// remaining peers with the still-current supervisor before committing the
2864    /// attempted authority. If a pending-accepted peer later rebinds to current
2865    /// authority, that stale accepted membership is cleared so retry cannot
2866    /// skip a current-bound peer. Rollback failure is reported on the typed
2867    /// error rather than treated as permission to advance current local
2868    /// authority. If the pending metadata write or stale-accepted clear fails
2869    /// after a remote has already accepted the attempt, the actor keeps a
2870    /// process-local pending authority override so a same-process retry can
2871    /// still reconcile without advancing current authority; restart retry still
2872    /// probes durable accepted peers before trusting them.
2873    ///
2874    /// # Dogma fit (B4)
2875    ///
2876    /// DELETE_ME finding B4 flagged the `&self`-only signature as
2877    /// potentially missing a scoping parameter. After audit the
2878    /// supervisor is unambiguously mob-wide (one
2879    /// `SupervisorAuthorityRecord` per `mob_id`, one persistence key,
2880    /// one rotation broadcast), so a scoping parameter would be
2881    /// fictional. Per dogma principle #1 ("one semantic fact, one
2882    /// owner") the signature already matches the data model.
2883    /// Regression coverage lives in `meerkat-mob/src/runtime/tests.rs`:
2884    /// `test_rotate_supervisor_updates_runtime_metadata`,
2885    /// `test_rotate_supervisor_reauthorizes_live_remote_members_and_rejects_stale_epoch`,
2886    /// `test_rotate_supervisor_bind_fallback_binds_next_authority`, and
2887    /// `test_rotate_supervisor_fails_closed_when_remote_rollback_fails`.
2888    pub async fn rotate_supervisor(&self) -> Result<SupervisorRotationReport, MobError> {
2889        self.send_actor_command(|reply_tx| MobCommand::RotateSupervisor { reply_tx })
2890            .await?
2891    }
2892
2893    /// Wire a local member to either another local member or an external peer.
2894    pub async fn wire<T>(&self, local: AgentIdentity, target: T) -> Result<(), MobError>
2895    where
2896        T: Into<PeerTarget>,
2897    {
2898        match self
2899            .execute_machine_command(MobMachineCommand::Wire {
2900                local: MeerkatId::from(&local),
2901                target: target.into(),
2902            })
2903            .await?
2904        {
2905            MobMachineCommandResult::Unit => Ok(()),
2906            _ => Err(MobError::Internal(
2907                "unexpected command result variant".into(),
2908            )),
2909        }
2910    }
2911
2912    /// Materialize many local-member wiring edges in one actor command.
2913    ///
2914    /// This is intended for initial topology reconciliation, where callers
2915    /// already have a graph snapshot. It only accepts local mob-member
2916    /// identities; external peer wiring stays on the single-edge path because
2917    /// those mutations carry descriptor/rollback semantics per peer.
2918    pub async fn wire_members_batch<I, A, B>(
2919        &self,
2920        edges: I,
2921    ) -> Result<MobWireMembersBatchReport, MobError>
2922    where
2923        I: IntoIterator<Item = (A, B)>,
2924        A: Into<AgentIdentity>,
2925        B: Into<AgentIdentity>,
2926    {
2927        let edges = edges
2928            .into_iter()
2929            .map(|(a, b)| (a.into(), b.into()))
2930            .collect();
2931        match self
2932            .execute_machine_command(MobMachineCommand::WireMembersBatch { edges })
2933            .await?
2934        {
2935            MobMachineCommandResult::WireMembersBatchReport(report) => Ok(report),
2936            _ => Err(MobError::Internal(
2937                "unexpected command result variant".into(),
2938            )),
2939        }
2940    }
2941
2942    /// Send typed peer communication from one mob member to another.
2943    ///
2944    /// This uses the sender member's comms runtime and the mob's installed
2945    /// wiring/trust state. It is deliberately distinct from
2946    /// [`MemberHandle::send`], which submits anonymous external work.
2947    pub async fn send_peer_message(
2948        &self,
2949        from: AgentIdentity,
2950        to: AgentIdentity,
2951        content: impl Into<meerkat_core::types::ContentInput>,
2952        handling_mode: HandlingMode,
2953    ) -> Result<PeerMessageReceipt, MobError> {
2954        let receipt = self
2955            .send_actor_command(|reply_tx| MobCommand::SendPeerMessage {
2956                from: MeerkatId::from(&from),
2957                to: MeerkatId::from(&to),
2958                content: content.into(),
2959                handling_mode,
2960                reply_tx,
2961            })
2962            .await??;
2963        match receipt {
2964            SendReceipt::PeerMessageSent { envelope_id, acked } => Ok(PeerMessageReceipt {
2965                from,
2966                to,
2967                envelope_id,
2968                acked,
2969                handling_mode,
2970            }),
2971            other => Err(MobError::Internal(format!(
2972                "unexpected peer-message receipt variant: {other:?}"
2973            ))),
2974        }
2975    }
2976
2977    /// Unwire a local member from either another local member or an external peer.
2978    pub async fn unwire<T>(&self, local: AgentIdentity, target: T) -> Result<(), MobError>
2979    where
2980        T: Into<PeerTarget>,
2981    {
2982        match self
2983            .execute_machine_command(MobMachineCommand::Unwire {
2984                local: MeerkatId::from(&local),
2985                target: target.into(),
2986            })
2987            .await?
2988        {
2989            MobMachineCommandResult::Unit => Ok(()),
2990            _ => Err(MobError::Internal(
2991                "unexpected command result variant".into(),
2992            )),
2993        }
2994    }
2995
2996    /// Compatibility wrapper for internal-turn submission.
2997    ///
2998    /// Prefer [`MobHandle::member`] plus [`MemberHandle::internal_turn`] for
2999    /// the target 0.5 API shape.
3000    ///
3001    /// DELETE_ME B5: three operations that sometimes get mistaken for the
3002    /// same thing are actually three distinct slices of "deliver content to
3003    /// a member". [`MobHandle::internal_turn`] / [`MemberHandle::internal_turn`]
3004    /// (this) is Rust in-process direct write into the member's pending
3005    /// turn slot — no peer comms, no handling-mode selection. `mob/turn_start`
3006    /// (RPC) resolves the identity to the bridge session and delegates to
3007    /// the canonical `turn/start` handler with turn-level overrides.
3008    /// `mob/member_send` (RPC) is peer-delivery shape over comms with
3009    /// `HandlingMode` + `RenderMetadata`; it lands in the member's comms
3010    /// inbox, not as a new turn. The three surfaces share a name fragment
3011    /// but diverge on who authorizes the delivery, what the member's
3012    /// runtime does with it, and what the caller gets back. Keep them
3013    /// separate — collapsing them would erase real policy distinctions.
3014    pub async fn internal_turn(
3015        &self,
3016        identity: AgentIdentity,
3017        message: impl Into<meerkat_core::types::ContentInput>,
3018    ) -> Result<MemberDeliveryReceipt, MobError> {
3019        let meerkat_id = MeerkatId::from(&identity);
3020        self.internal_turn_for_member(meerkat_id.clone(), message.into())
3021            .await?;
3022        let snapshot = self.member_status(&identity).await?;
3023        Ok(MemberDeliveryReceipt {
3024            identity,
3025            agent_runtime_id: snapshot.agent_runtime_id,
3026            fence_token: snapshot.fence_token,
3027            handling_mode: HandlingMode::Queue,
3028        })
3029    }
3030
3031    pub(super) async fn external_turn_for_member(
3032        &self,
3033        agent_identity: MeerkatId,
3034        message: meerkat_core::types::ContentInput,
3035        handling_mode: HandlingMode,
3036        render_metadata: Option<RenderMetadata>,
3037    ) -> Result<(), MobError> {
3038        let snapshot = self
3039            .member_status(&AgentIdentity::from(agent_identity.as_str()))
3040            .await?;
3041        let cmd = Box::new(crate::mob_machine::SubmitWorkCommand {
3042            runtime_id: snapshot.agent_runtime_id,
3043            fence_token: snapshot.fence_token,
3044            work_ref: WorkRef::new(),
3045            spec: WorkSpec::new(message, WorkOrigin::External),
3046            handling_mode,
3047            render_metadata,
3048            ack_mode: crate::mob_machine::SubmitWorkAckMode::IngressAccepted,
3049        });
3050        self.execute_machine_command(MobMachineCommand::SubmitWork(cmd))
3051            .await?;
3052        Ok(())
3053    }
3054
3055    pub(super) async fn internal_turn_for_member(
3056        &self,
3057        agent_identity: MeerkatId,
3058        message: meerkat_core::types::ContentInput,
3059    ) -> Result<(), MobError> {
3060        // #31 Wave D: retiring members reject new internal work. This
3061        // matches the `member()` gate for external turns so the observable
3062        // contract is symmetric across the retire window.
3063        {
3064            let roster = self.roster.read().await;
3065            match roster.get(&agent_identity) {
3066                None => return Err(MobError::MemberNotFound(agent_identity)),
3067                Some(entry) if entry.state != crate::roster::MemberState::Active => {
3068                    return Err(MobError::MemberNotFound(agent_identity));
3069                }
3070                _ => {}
3071            }
3072        }
3073        let snapshot = self
3074            .member_status(&AgentIdentity::from(agent_identity.as_str()))
3075            .await?;
3076        let cmd = Box::new(crate::mob_machine::SubmitWorkCommand {
3077            runtime_id: snapshot.agent_runtime_id,
3078            fence_token: snapshot.fence_token,
3079            work_ref: WorkRef::new(),
3080            spec: WorkSpec::new(message, WorkOrigin::Internal),
3081            handling_mode: HandlingMode::Queue,
3082            render_metadata: None,
3083            ack_mode: crate::mob_machine::SubmitWorkAckMode::TurnCompleted,
3084        });
3085        self.execute_machine_command(MobMachineCommand::SubmitWork(cmd))
3086            .await?;
3087        Ok(())
3088    }
3089
3090    // -----------------------------------------------------------------
3091    // Work lane
3092    // -----------------------------------------------------------------
3093
3094    /// Submit a unit of work to a mob member.
3095    ///
3096    /// The fence token is validated against the member's current incarnation at
3097    /// the dispatch boundary. If the token is stale (i.e., the member has been
3098    /// respawned or reset since the caller obtained the token), the submission
3099    /// is rejected with [`MobError::StaleFenceToken`].
3100    pub async fn submit_work(
3101        &self,
3102        runtime_id: AgentRuntimeId,
3103        fence_token: FenceToken,
3104        work_ref: WorkRef,
3105        spec: WorkSpec,
3106    ) -> Result<WorkDeliveryReceipt, MobError> {
3107        let cmd = Box::new(crate::mob_machine::SubmitWorkCommand {
3108            runtime_id: runtime_id.clone(),
3109            fence_token,
3110            work_ref: work_ref.clone(),
3111            spec,
3112            handling_mode: HandlingMode::Queue,
3113            render_metadata: None,
3114            ack_mode: crate::mob_machine::SubmitWorkAckMode::IngressAccepted,
3115        });
3116        match self
3117            .execute_machine_command(MobMachineCommand::SubmitWork(cmd))
3118            .await?
3119        {
3120            MobMachineCommandResult::WorkReceipt { work_ref: ref_out } => Ok(WorkDeliveryReceipt {
3121                work_ref: ref_out,
3122                runtime_id,
3123            }),
3124            _ => Err(MobError::Internal(
3125                "unexpected command result variant".into(),
3126            )),
3127        }
3128    }
3129
3130    /// Cancel a previously submitted unit of work.
3131    ///
3132    /// Returns `Ok(())` if the work was found and cancellation was initiated.
3133    /// Returns [`MobError::WorkNotFound`] if no in-flight work with the given
3134    /// reference exists.
3135    pub async fn cancel_work(&self, work_ref: WorkRef) -> Result<(), MobError> {
3136        match self
3137            .execute_machine_command(MobMachineCommand::CancelWork { work_ref })
3138            .await?
3139        {
3140            MobMachineCommandResult::Unit => Ok(()),
3141            _ => Err(MobError::Internal(
3142                "unexpected command result variant".into(),
3143            )),
3144        }
3145    }
3146
3147    /// Cancel all in-flight work for a mob member.
3148    ///
3149    /// The fence token is validated before cancellation proceeds.
3150    pub async fn cancel_all_work(
3151        &self,
3152        runtime_id: AgentRuntimeId,
3153        fence_token: FenceToken,
3154    ) -> Result<(), MobError> {
3155        match self
3156            .execute_machine_command(MobMachineCommand::CancelAllWork {
3157                runtime_id,
3158                fence_token,
3159            })
3160            .await?
3161        {
3162            MobMachineCommandResult::Unit => Ok(()),
3163            _ => Err(MobError::Internal(
3164                "unexpected command result variant".into(),
3165            )),
3166        }
3167    }
3168
3169    /// Transition Running -> Stopped. Mutation commands are rejected while stopped.
3170    pub async fn stop(&self) -> Result<(), MobError> {
3171        match self
3172            .execute_machine_command(MobMachineCommand::Stop)
3173            .await?
3174        {
3175            MobMachineCommandResult::Unit => Ok(()),
3176            _ => Err(MobError::Internal(
3177                "unexpected command result variant".into(),
3178            )),
3179        }
3180    }
3181
3182    /// Transition Stopped -> Running.
3183    pub async fn resume(&self) -> Result<(), MobError> {
3184        match self
3185            .execute_machine_command(MobMachineCommand::Resume)
3186            .await?
3187        {
3188            MobMachineCommandResult::Unit => Ok(()),
3189            _ => Err(MobError::Internal(
3190                "unexpected command result variant".into(),
3191            )),
3192        }
3193    }
3194
3195    /// Archive all members, emit MobCompleted, and transition to Completed.
3196    pub async fn complete(&self) -> Result<(), MobError> {
3197        match self
3198            .execute_machine_command(MobMachineCommand::Complete)
3199            .await?
3200        {
3201            MobMachineCommandResult::Unit => Ok(()),
3202            _ => Err(MobError::Internal(
3203                "unexpected command result variant".into(),
3204            )),
3205        }
3206    }
3207
3208    /// Wipe all runtime state and transition back to `Running`.
3209    ///
3210    /// # Scope vs `destroy`
3211    ///
3212    /// `reset` and [`Self::destroy`] look similar (both wipe runtime
3213    /// state, both teardown MCP servers, both append epoch-marker
3214    /// events) but they have **deliberately different semantics**:
3215    ///
3216    /// | aspect               | `reset()`                                          | `destroy()`                                                 |
3217    /// |----------------------|----------------------------------------------------|-------------------------------------------------------------|
3218    /// | actor                | **stays alive**, transitions to `Running`          | terminates, transitions to `Destroyed`                      |
3219    /// | member teardown      | `retire_all_members` (idempotent, all-or-retry)    | `destroy_all_members_for_destroy` (force-fallback, atomic)  |
3220    /// | return               | `Result<(), MobError>` — clean or retry            | [`Result<MobDestroyReport, MobDestroyError>`]               |
3221    /// | partial outcomes     | retire-idempotent → reissuing reset retries safely | structured report carries force-destroyed / orphaned / errs |
3222    /// | event marker         | `MobCreated` + `MobReset` (new epoch, replayable)  | `MobDestroying` until successful storage clear              |
3223    /// | handle usable after? | yes                                                | no                                                          ||
3224    ///
3225    /// The `()` return is not hiding partial-state information: retire
3226    /// is idempotent by construction (see `handle_retire` in
3227    /// `actor.rs` — "cleanup errors are best-effort. If any member
3228    /// fails to retire the operation is aborted — the caller can retry
3229    /// since already-retired members are idempotent"), so on error
3230    /// the contract is "retry `reset()`" rather than "read the partial
3231    /// outcome from the report." `destroy`'s richer return exists
3232    /// because force-fallback produces **genuinely new state**
3233    /// (force-destroyed members, orphaned remote bindings that
3234    /// couldn't be cleanly dismantled) that the caller needs to see;
3235    /// `reset` by design avoids that regime and so has no equivalent
3236    /// data to surface.
3237    ///
3238    /// # Dogma fit (B3)
3239    ///
3240    /// DELETE_ME finding B3 flagged the divergent return types as an
3241    /// API asymmetry. After audit the asymmetry is load-bearing: the
3242    /// return types match the underlying member-teardown shape
3243    /// (idempotent retire vs force-fallback destroy). Per dogma
3244    /// principle #5 ("typed truth, never string folklore") the reset
3245    /// return does not need to pretend to carry a report it cannot
3246    /// produce; and per principle #1 ("one semantic fact, one
3247    /// owner") this matches the single underlying model: the
3248    /// teardown path authors the outcome shape, the handle signature
3249    /// reflects it. Regression coverage lives in
3250    /// `test_reset_clears_roster_events_and_returns_to_running`,
3251    /// `test_reset_allows_spawn_after_reset`, and the
3252    /// supervisor-escalation reset tests in
3253    /// `meerkat-mob/src/runtime/tests.rs`.
3254    pub async fn reset(&self) -> Result<(), MobError> {
3255        match self
3256            .execute_machine_command(MobMachineCommand::Reset)
3257            .await?
3258        {
3259            MobMachineCommandResult::Unit => Ok(()),
3260            _ => Err(MobError::Internal(
3261                "unexpected command result variant".into(),
3262            )),
3263        }
3264    }
3265
3266    /// Retire active members, clear persisted mob storage, and terminate the actor.
3267    pub async fn destroy(&self) -> Result<MobDestroyReport, MobDestroyError> {
3268        match self
3269            .execute_destroy_machine_command(MobMachineCommand::Destroy)
3270            .await?
3271        {
3272            MobMachineCommandResult::DestroyReport(report) => Ok(report),
3273            _ => Err(MobDestroyError::from(MobError::Internal(
3274                "unexpected command result variant".into(),
3275            ))),
3276        }
3277    }
3278
3279    #[cfg(test)]
3280    pub async fn debug_flow_tracker_counts(&self) -> Result<(usize, usize), MobError> {
3281        match self
3282            .execute_machine_command(MobMachineCommand::FlowTrackerCounts)
3283            .await?
3284        {
3285            MobMachineCommandResult::FlowTrackerCounts(counts) => Ok(counts),
3286            _ => Err(MobError::Internal(
3287                "unexpected command result variant".into(),
3288            )),
3289        }
3290    }
3291
3292    #[cfg(test)]
3293    pub(crate) async fn debug_orchestrator_snapshot(
3294        &self,
3295    ) -> Result<super::MobOrchestratorSnapshot, MobError> {
3296        match self
3297            .execute_machine_command(MobMachineCommand::OrchestratorSnapshot)
3298            .await?
3299        {
3300            MobMachineCommandResult::OrchestratorSnapshot(snapshot) => Ok(snapshot),
3301            _ => Err(MobError::Internal(
3302                "unexpected command result variant".into(),
3303            )),
3304        }
3305    }
3306
3307    #[cfg(test)]
3308    pub(crate) async fn debug_lifecycle_snapshot(&self) -> Result<MobLifecycleSnapshot, MobError> {
3309        match self
3310            .execute_machine_command(MobMachineCommand::LifecycleSnapshot)
3311            .await?
3312        {
3313            MobMachineCommandResult::LifecycleSnapshot(snapshot) => Ok(snapshot),
3314            _ => Err(MobError::Internal(
3315                "unexpected command result variant".into(),
3316            )),
3317        }
3318    }
3319
3320    #[cfg(test)]
3321    pub(crate) async fn debug_lifecycle_notification_burst(
3322        &self,
3323        count: usize,
3324        message: impl Into<String>,
3325    ) -> Result<(), MobError> {
3326        match self
3327            .execute_machine_command(MobMachineCommand::LifecycleNotificationBurst {
3328                count,
3329                message: message.into(),
3330            })
3331            .await?
3332        {
3333            MobMachineCommandResult::LifecycleNotificationBurst => Ok(()),
3334            _ => Err(MobError::Internal(
3335                "unexpected command result variant".into(),
3336            )),
3337        }
3338    }
3339
3340    #[cfg(test)]
3341    pub(crate) async fn debug_dsl_t2_snapshot(&self) -> Result<super::MobDslT2Snapshot, MobError> {
3342        match self
3343            .execute_machine_command(MobMachineCommand::DslT2Snapshot)
3344            .await?
3345        {
3346            MobMachineCommandResult::DslT2Snapshot(snapshot) => Ok(snapshot),
3347            _ => Err(MobError::Internal(
3348                "unexpected command result variant".into(),
3349            )),
3350        }
3351    }
3352
3353    /// Set or clear the spawn policy for automatic member provisioning.
3354    ///
3355    /// When set, external turns targeting an unknown member identity will
3356    /// consult the policy before returning `MeerkatNotFound`.
3357    pub async fn set_spawn_policy(
3358        &self,
3359        policy: Option<Arc<dyn super::spawn_policy::SpawnPolicy>>,
3360    ) -> Result<(), MobError> {
3361        match self
3362            .execute_machine_command(MobMachineCommand::SetSpawnPolicy { policy })
3363            .await?
3364        {
3365            MobMachineCommandResult::Unit => Ok(()),
3366            _ => Err(MobError::Internal(
3367                "unexpected command result variant".into(),
3368            )),
3369        }
3370    }
3371
3372    /// Shut down the actor. After this, no more commands are accepted.
3373    pub async fn shutdown(&self) -> Result<(), MobError> {
3374        match self
3375            .execute_machine_command(MobMachineCommand::Shutdown)
3376            .await?
3377        {
3378            MobMachineCommandResult::Unit => Ok(()),
3379            _ => Err(MobError::Internal(
3380                "unexpected command result variant".into(),
3381            )),
3382        }
3383    }
3384
3385    /// Force-cancel a member's in-flight turn via the user interrupt path.
3386    ///
3387    /// Unlike [`retire`](Self::retire), this does not archive the session or
3388    /// remove the member from the roster — it only cancels the current turn.
3389    pub async fn force_cancel_member(&self, identity: AgentIdentity) -> Result<(), MobError> {
3390        match self
3391            .execute_machine_command(MobMachineCommand::ForceCancel {
3392                agent_identity: MeerkatId::from(&identity),
3393            })
3394            .await?
3395        {
3396            MobMachineCommandResult::Unit => Ok(()),
3397            _ => Err(MobError::Internal(
3398                "unexpected command result variant".into(),
3399            )),
3400        }
3401    }
3402
3403    async fn startup_kickoff_snapshot(
3404        &self,
3405    ) -> Result<super::state::MobStartupKickoffSnapshot, MobError> {
3406        self.send_actor_command(|reply_tx| MobCommand::StartupKickoffSnapshot { reply_tx })
3407            .await
3408    }
3409
3410    fn kickoff_wait_is_satisfied(
3411        entry: &RosterEntry,
3412        snapshot: &MobMemberSnapshot,
3413        pending_kickoff_member_ids: &BTreeSet<String>,
3414    ) -> bool {
3415        if entry.runtime_mode != crate::MobRuntimeMode::AutonomousHost {
3416            return true;
3417        }
3418        match snapshot.status {
3419            MobMemberStatus::Unknown => false,
3420            MobMemberStatus::Active => {
3421                !pending_kickoff_member_ids.contains(entry.agent_identity.as_str())
3422            }
3423            MobMemberStatus::Retiring | MobMemberStatus::Broken | MobMemberStatus::Completed => {
3424                true
3425            }
3426        }
3427    }
3428
3429    fn ready_wait_is_satisfied(
3430        entry: &RosterEntry,
3431        snapshot: &MobMemberSnapshot,
3432        ready_runtime_ids: &BTreeSet<String>,
3433    ) -> bool {
3434        if entry.runtime_mode != crate::MobRuntimeMode::AutonomousHost {
3435            return true;
3436        }
3437        match snapshot.status {
3438            MobMemberStatus::Unknown => false,
3439            MobMemberStatus::Active => {
3440                ready_runtime_ids.contains(&entry.agent_runtime_id.to_string())
3441            }
3442            MobMemberStatus::Retiring | MobMemberStatus::Broken | MobMemberStatus::Completed => {
3443                true
3444            }
3445        }
3446    }
3447
3448    async fn wait_for_kickoff_resolution(
3449        &self,
3450        target_ids: &[MeerkatId],
3451        timeout: Option<Duration>,
3452    ) -> Result<(), MobError> {
3453        if target_ids.is_empty() {
3454            return Ok(());
3455        }
3456
3457        let deadline = Instant::now() + timeout.unwrap_or(DEFAULT_KICKOFF_WAIT_TIMEOUT);
3458        loop {
3459            let kickoff_snapshot = self.startup_kickoff_snapshot().await?;
3460            let entries = self
3461                .list_all_members()
3462                .await
3463                .into_iter()
3464                .map(|entry| (entry.agent_identity.clone(), entry))
3465                .collect::<HashMap<_, _>>();
3466
3467            let mut pending_member_ids = Vec::new();
3468            for id in target_ids {
3469                let Some(entry) = entries.get(id) else {
3470                    continue;
3471                };
3472                let member_snapshot = self
3473                    .member_status(&AgentIdentity::from(id.as_str()))
3474                    .await?;
3475                if !Self::kickoff_wait_is_satisfied(
3476                    entry,
3477                    &member_snapshot,
3478                    &kickoff_snapshot.pending_kickoff_member_ids,
3479                ) {
3480                    pending_member_ids.push(id.clone());
3481                }
3482            }
3483
3484            if pending_member_ids.is_empty() {
3485                return Ok(());
3486            }
3487
3488            let remaining = deadline.saturating_duration_since(Instant::now());
3489            if remaining.is_zero() {
3490                return Err(MobError::KickoffWaitTimedOut { pending_member_ids });
3491            }
3492
3493            tokio::time::sleep(std::cmp::min(remaining, Duration::from_millis(50))).await;
3494        }
3495    }
3496
3497    async fn wait_for_ready_resolution(
3498        &self,
3499        target_ids: &[MeerkatId],
3500        timeout: Option<Duration>,
3501    ) -> Result<(), MobError> {
3502        if target_ids.is_empty() {
3503            return Ok(());
3504        }
3505
3506        let deadline = Instant::now() + timeout.unwrap_or(DEFAULT_READY_WAIT_TIMEOUT);
3507        loop {
3508            let snapshot = self.startup_kickoff_snapshot().await?;
3509            let entries = self
3510                .list_all_members()
3511                .await
3512                .into_iter()
3513                .map(|entry| (entry.agent_identity.clone(), entry))
3514                .collect::<HashMap<_, _>>();
3515
3516            let mut pending_member_ids = Vec::new();
3517            for id in target_ids {
3518                let Some(entry) = entries.get(id) else {
3519                    continue;
3520                };
3521                let member_snapshot = self
3522                    .member_status(&AgentIdentity::from(id.as_str()))
3523                    .await?;
3524                if !Self::ready_wait_is_satisfied(
3525                    entry,
3526                    &member_snapshot,
3527                    &snapshot.ready_runtime_ids,
3528                ) {
3529                    pending_member_ids.push(id.clone());
3530                }
3531            }
3532
3533            if pending_member_ids.is_empty() {
3534                return Ok(());
3535            }
3536
3537            let remaining = deadline.saturating_duration_since(Instant::now());
3538            if remaining.is_zero() {
3539                return Err(MobError::ReadyWaitTimedOut { pending_member_ids });
3540            }
3541
3542            tokio::time::sleep(std::cmp::min(remaining, Duration::from_millis(50))).await;
3543        }
3544    }
3545
3546    async fn wait_one_snapshot(
3547        &self,
3548        agent_identity: &MeerkatId,
3549    ) -> Result<MobMemberSnapshot, MobError> {
3550        loop {
3551            let snapshot = self
3552                .member_status(&AgentIdentity::from(agent_identity.as_str()))
3553                .await?;
3554            if snapshot.is_final {
3555                return Ok(snapshot);
3556            }
3557            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
3558        }
3559    }
3560
3561    /// Get a point-in-time execution snapshot for a member.
3562    ///
3563    /// This is the deep inspection surface. Unlike list projections, it
3564    /// resolves live peer connectivity when a comms runtime is available,
3565    /// and projects the current realtime attachment status from the
3566    /// MeerkatMachine (when the runtime adapter is available).
3567    pub async fn member_status(
3568        &self,
3569        identity: &AgentIdentity,
3570    ) -> Result<MobMemberSnapshot, MobError> {
3571        if let Some(snapshot) = self.inflight_retiring_snapshot(identity).await {
3572            return Ok(snapshot);
3573        }
3574        let mut snapshot = match self
3575            .execute_machine_command(MobMachineCommand::MemberStatus {
3576                agent_identity: MeerkatId::from(identity),
3577            })
3578            .await?
3579        {
3580            MobMachineCommandResult::MemberStatus(snapshot) => snapshot,
3581            _ => {
3582                return Err(MobError::Internal(
3583                    "unexpected command result variant".into(),
3584                ));
3585            }
3586        };
3587        self.apply_inflight_member_projection(identity, &mut snapshot)
3588            .await;
3589        snapshot.peer_connectivity = match tokio::time::timeout(
3590            Duration::from_secs(2),
3591            self.project_member_peer_connectivity(identity, &snapshot),
3592        )
3593        .await
3594        {
3595            Ok(connectivity) => connectivity,
3596            Err(_) => {
3597                tracing::warn!(
3598                    agent_identity = %identity,
3599                    "mob member status peer-connectivity projection timed out"
3600                );
3601                None
3602            }
3603        };
3604        snapshot.resolved_capabilities = self.project_resolved_capabilities(&snapshot).await;
3605        snapshot.external_member = self
3606            .project_external_member_observation(identity, &snapshot)
3607            .await;
3608        Ok(snapshot)
3609    }
3610
3611    async fn inflight_retiring_snapshot(
3612        &self,
3613        identity: &AgentIdentity,
3614    ) -> Option<MobMemberSnapshot> {
3615        let entry = {
3616            let roster = self.roster.read().await;
3617            roster.get(&MeerkatId::from(identity)).cloned()
3618        }?;
3619        if entry.state != crate::roster::MemberState::Retiring {
3620            return None;
3621        }
3622        Some(
3623            MobMemberSnapshot {
3624                status: MobMemberStatus::Retiring,
3625                agent_runtime_id: entry.agent_runtime_id,
3626                fence_token: entry.fence_token,
3627                output_preview: None,
3628                error: None,
3629                tokens_used: 0,
3630                is_final: false,
3631                current_session_id: None,
3632                current_bridge_session_id: None,
3633                peer_connectivity: None,
3634                kickoff: entry.kickoff,
3635                external_member: None,
3636                resolved_capabilities: None,
3637            }
3638            .with_current_bridge_session_id(entry.member_ref.bridge_session_id().cloned()),
3639        )
3640    }
3641
3642    async fn apply_inflight_member_projection(
3643        &self,
3644        identity: &AgentIdentity,
3645        snapshot: &mut MobMemberSnapshot,
3646    ) {
3647        if snapshot.status != MobMemberStatus::Unknown {
3648            return;
3649        }
3650        let is_retiring = {
3651            let roster = self.roster.read().await;
3652            roster
3653                .get(&MeerkatId::from(identity))
3654                .is_some_and(|entry| entry.state == crate::roster::MemberState::Retiring)
3655        };
3656        if is_retiring {
3657            snapshot.status = MobMemberStatus::Retiring;
3658            snapshot.is_final = false;
3659        }
3660    }
3661
3662    async fn project_member_peer_connectivity(
3663        &self,
3664        identity: &AgentIdentity,
3665        snapshot: &MobMemberSnapshot,
3666    ) -> Option<MobPeerConnectivitySnapshot> {
3667        let bridge_session_id = snapshot.current_bridge_session_id().cloned()?;
3668        let (entry, roster_snapshot) = {
3669            let roster = self.roster.read().await;
3670            (
3671                roster.get(&MeerkatId::from(identity)).cloned()?,
3672                roster.snapshot(),
3673            )
3674        };
3675        self.resolve_peer_connectivity(&entry, &bridge_session_id, &roster_snapshot)
3676            .await
3677    }
3678
3679    /// Project the current realtime attachment status for the given member
3680    /// snapshot by consulting the MeerkatMachine runtime adapter. Returns
3681    async fn project_resolved_capabilities(
3682        &self,
3683        snapshot: &MobMemberSnapshot,
3684    ) -> Option<meerkat_contracts::WireResolvedModelCapabilities> {
3685        #[cfg(feature = "runtime-adapter")]
3686        {
3687            use meerkat_runtime::service_ext::SessionServiceRuntimeExt as _;
3688            let session_id = snapshot.current_bridge_session_id().cloned()?;
3689            let runtime = self.runtime_adapter.as_ref()?.as_ref();
3690            runtime
3691                .resolved_session_llm_capabilities(&session_id)
3692                .await
3693                .ok()
3694                .flatten()
3695                .map(|surface| surface.to_wire_resolved())
3696        }
3697        #[cfg(not(feature = "runtime-adapter"))]
3698        {
3699            let _ = snapshot;
3700            None
3701        }
3702    }
3703
3704    async fn project_external_member_observation(
3705        &self,
3706        identity: &AgentIdentity,
3707        snapshot: &MobMemberSnapshot,
3708    ) -> Option<ExternalMemberObservationSnapshot> {
3709        let entry = {
3710            let roster = self.roster.read().await;
3711            roster.get(identity).cloned()
3712        }?;
3713        let MemberRef::BackendPeer {
3714            session_id,
3715            bootstrap_token,
3716            ..
3717        } = &entry.member_ref
3718        else {
3719            return None;
3720        };
3721
3722        let owner = ExternalMemberOwnerRef {
3723            mob_id: self.definition.id.clone(),
3724            agent_identity: identity.clone(),
3725        };
3726        let bridge_session_present = session_id.is_some();
3727        let has_bootstrap_token = bootstrap_token
3728            .as_ref()
3729            .is_some_and(|token| !token.is_empty());
3730        let binding_mode = if bridge_session_present {
3731            ExternalMemberBindingMode::BridgeSessionBacked
3732        } else {
3733            ExternalMemberBindingMode::PeerOnly
3734        };
3735        let reachability = match snapshot.status {
3736            MobMemberStatus::Broken => ExternalMemberReachability::Unavailable {
3737                reason: snapshot
3738                    .error
3739                    .clone()
3740                    .unwrap_or_else(|| "external member restore failed".to_string()),
3741            },
3742            _ => ExternalMemberReachability::Unknown,
3743        };
3744        let rebind = match snapshot.status {
3745            MobMemberStatus::Broken => ExternalMemberRebindStatus::Failed {
3746                reason: snapshot
3747                    .error
3748                    .clone()
3749                    .unwrap_or_else(|| "external member restore failed".to_string()),
3750            },
3751            _ if bridge_session_present => ExternalMemberRebindStatus::NotRequired,
3752            _ if has_bootstrap_token => ExternalMemberRebindStatus::Available,
3753            _ => ExternalMemberRebindStatus::Unavailable {
3754                reason: "missing bootstrap_token for supervisor rebind".to_string(),
3755            },
3756        };
3757
3758        Some(ExternalMemberObservationSnapshot {
3759            owner: owner.clone(),
3760            binding_mode,
3761            bridge_session_present,
3762            reachability,
3763            rebind,
3764            forwarding: ExternalMemberObservationSnapshot::forwarding(&owner),
3765        })
3766    }
3767
3768    /// Wait until all current autonomous members resolve their initial kickoff.
3769    ///
3770    /// In 0.6 autonomous members no longer run a synthetic second kickoff turn,
3771    /// but their initial prompt still resolves asynchronously through the
3772    /// runtime-backed input path. This barrier is satisfied once each targeted
3773    /// autonomous member leaves `pending` / `starting` / `callback_pending`
3774    /// and reaches a terminal kickoff phase.
3775    pub async fn wait_for_kickoff_complete(
3776        &self,
3777        timeout: Option<Duration>,
3778    ) -> Result<Vec<(AgentIdentity, MobMemberSnapshot)>, MobError> {
3779        let target_ids = self
3780            .list_all_members()
3781            .await
3782            .into_iter()
3783            .map(|entry| entry.agent_identity)
3784            .collect::<Vec<_>>();
3785        let identities: Vec<AgentIdentity> = target_ids.clone();
3786        self.wait_for_kickoff_resolution(&target_ids, timeout)
3787            .await?;
3788
3789        let mut snapshots = Vec::with_capacity(identities.len());
3790        for identity in identities {
3791            snapshots.push((identity.clone(), self.member_status(&identity).await?));
3792        }
3793        Ok(snapshots)
3794    }
3795
3796    /// Wait until the given members resolve their initial kickoff.
3797    ///
3798    /// See [`wait_for_kickoff_complete`](Self::wait_for_kickoff_complete) for details.
3799    pub async fn wait_for_members_kickoff_complete(
3800        &self,
3801        ids: &[AgentIdentity],
3802        timeout: Option<Duration>,
3803    ) -> Result<Vec<(AgentIdentity, MobMemberSnapshot)>, MobError> {
3804        let target_meerkat_ids: Vec<MeerkatId> = ids.iter().map(MeerkatId::from).collect();
3805        self.wait_for_kickoff_resolution(&target_meerkat_ids, timeout)
3806            .await?;
3807
3808        let mut snapshots = Vec::with_capacity(ids.len());
3809        for identity in ids {
3810            snapshots.push((identity.clone(), self.member_status(identity).await?));
3811        }
3812        Ok(snapshots)
3813    }
3814
3815    /// Wait until all current members are startup-ready for orchestration.
3816    pub async fn wait_for_ready(
3817        &self,
3818        timeout: Option<Duration>,
3819    ) -> Result<Vec<(AgentIdentity, MobMemberSnapshot)>, MobError> {
3820        let target_ids = self
3821            .list_all_members()
3822            .await
3823            .into_iter()
3824            .map(|entry| entry.agent_identity)
3825            .collect::<Vec<_>>();
3826        let identities: Vec<AgentIdentity> = target_ids.clone();
3827        self.wait_for_ready_resolution(&target_ids, timeout).await?;
3828
3829        let mut snapshots = Vec::with_capacity(identities.len());
3830        for identity in identities {
3831            snapshots.push((identity.clone(), self.member_status(&identity).await?));
3832        }
3833        Ok(snapshots)
3834    }
3835
3836    /// Wait until the given members are startup-ready for orchestration.
3837    pub async fn wait_for_members_ready(
3838        &self,
3839        ids: &[AgentIdentity],
3840        timeout: Option<Duration>,
3841    ) -> Result<Vec<(AgentIdentity, MobMemberSnapshot)>, MobError> {
3842        let target_meerkat_ids: Vec<MeerkatId> = ids.iter().map(MeerkatId::from).collect();
3843        self.wait_for_ready_resolution(&target_meerkat_ids, timeout)
3844            .await?;
3845
3846        let mut snapshots = Vec::with_capacity(ids.len());
3847        for identity in ids {
3848            snapshots.push((identity.clone(), self.member_status(identity).await?));
3849        }
3850        Ok(snapshots)
3851    }
3852
3853    /// Wait for a specific member to reach a terminal state, then return its snapshot.
3854    ///
3855    /// Polls canonical member classification until terminal.
3856    pub async fn wait_one(&self, identity: &AgentIdentity) -> Result<MobMemberSnapshot, MobError> {
3857        let meerkat_id = MeerkatId::from(identity);
3858        self.wait_one_snapshot(&meerkat_id).await
3859    }
3860
3861    /// Wait for all specified members to reach terminal states.
3862    pub async fn wait_all(
3863        &self,
3864        identities: &[AgentIdentity],
3865    ) -> Result<Vec<MobMemberSnapshot>, MobError> {
3866        let meerkat_ids: Vec<MeerkatId> = identities.iter().map(MeerkatId::from).collect();
3867        let futs = meerkat_ids
3868            .iter()
3869            .map(|mid| self.wait_one_snapshot(mid))
3870            .collect::<Vec<_>>();
3871        let results = futures::future::join_all(futs).await;
3872        results.into_iter().collect()
3873    }
3874
3875    /// Collect snapshots for all members that have reached terminal states.
3876    pub async fn collect_completed(&self) -> Vec<(AgentIdentity, MobMemberSnapshot)> {
3877        let entries = self.list_all_members().await;
3878        let mut completed = Vec::new();
3879        for entry in entries {
3880            if let Ok(snapshot) = self.member_status(&entry.agent_identity).await
3881                && snapshot.is_final
3882            {
3883                completed.push((entry.agent_identity, snapshot));
3884            }
3885        }
3886        completed
3887    }
3888
3889    /// Spawn a fresh helper, wait for it to complete, retire it, and return its result.
3890    ///
3891    /// Helpers are short-lived TurnDriven tasks by default. Their completion
3892    /// truth is the spawn/create boundary plus the canonical post-spawn member
3893    /// snapshot, not full member terminality in the mob lifecycle.
3894    pub async fn spawn_helper(
3895        &self,
3896        identity: AgentIdentity,
3897        task: impl Into<String>,
3898        options: HelperOptions,
3899    ) -> Result<HelperResult, MobError> {
3900        let profile_name = options
3901            .role_name
3902            .or_else(|| self.definition.profiles.keys().next().cloned())
3903            .ok_or_else(|| {
3904                MobError::Internal("no profile specified and definition has no profiles".into())
3905            })?;
3906        let task_text = task.into();
3907        let meerkat_id = MeerkatId::from(&identity);
3908        let mut spec = SpawnMemberSpec::new(profile_name, identity.clone());
3909        spec.initial_message = Some(task_text.into());
3910        spec.runtime_mode = Some(
3911            options
3912                .runtime_mode
3913                .unwrap_or(crate::MobRuntimeMode::TurnDriven),
3914        );
3915        spec.backend = options.backend;
3916        spec.tool_access_policy = options.tool_access_policy;
3917        spec.auth_binding = options.auth_binding;
3918        spec.inherited_tool_filter = options.inherited_tool_filter;
3919        spec.override_profile = options.override_profile;
3920        spec.model_override = options.model_override;
3921        spec.provider_params_override = options.provider_params_override;
3922        spec.auto_wire_parent = true;
3923
3924        self.spawn_spec_internal_with_source(spec, SpawnSource::HelperSpawn)
3925            .await?;
3926        let helper_snapshot = self.member_status(&identity).await?;
3927        let _ = self.retire(identity).await;
3928
3929        Ok(HelperResult {
3930            output: helper_snapshot.output_preview,
3931            tokens_used: helper_snapshot.tokens_used,
3932            agent_identity: helper_snapshot.agent_runtime_id.identity.clone(),
3933            agent_runtime_id: helper_snapshot.agent_runtime_id,
3934            fence_token: helper_snapshot.fence_token,
3935        })
3936    }
3937
3938    /// Fork from an existing member's context, wait for completion, retire, and return.
3939    ///
3940    /// Like `spawn_helper` but uses `MemberLaunchMode::Fork` to share
3941    /// conversation context with the source member.
3942    pub async fn fork_helper(
3943        &self,
3944        source_identity: &AgentIdentity,
3945        identity: AgentIdentity,
3946        task: impl Into<String>,
3947        fork_context: crate::launch::ForkContext,
3948        options: HelperOptions,
3949    ) -> Result<HelperResult, MobError> {
3950        let profile_name = options
3951            .role_name
3952            .or_else(|| self.definition.profiles.keys().next().cloned())
3953            .ok_or_else(|| {
3954                MobError::Internal("no profile specified and definition has no profiles".into())
3955            })?;
3956        let task_text = task.into();
3957        let meerkat_id = MeerkatId::from(&identity);
3958        let source_member_id = MeerkatId::from(source_identity);
3959        let mut spec = SpawnMemberSpec::new(profile_name, identity.clone());
3960        spec.initial_message = Some(task_text.into());
3961        spec.runtime_mode = Some(
3962            options
3963                .runtime_mode
3964                .unwrap_or(crate::MobRuntimeMode::TurnDriven),
3965        );
3966        spec.backend = options.backend;
3967        spec.tool_access_policy = options.tool_access_policy;
3968        spec.auth_binding = options.auth_binding;
3969        spec.inherited_tool_filter = options.inherited_tool_filter;
3970        spec.override_profile = options.override_profile;
3971        spec.model_override = options.model_override;
3972        spec.provider_params_override = options.provider_params_override;
3973        spec.auto_wire_parent = true;
3974        spec.launch_mode = crate::launch::MemberLaunchMode::Fork {
3975            source_member_id,
3976            fork_context,
3977        };
3978
3979        self.spawn_spec_internal_with_source(spec, SpawnSource::Fork)
3980            .await?;
3981        let helper_snapshot = self.member_status(&identity).await?;
3982        let _ = self.retire(identity).await;
3983
3984        Ok(HelperResult {
3985            output: helper_snapshot.output_preview,
3986            tokens_used: helper_snapshot.tokens_used,
3987            agent_identity: helper_snapshot.agent_runtime_id.identity.clone(),
3988            agent_runtime_id: helper_snapshot.agent_runtime_id,
3989            fence_token: helper_snapshot.fence_token,
3990        })
3991    }
3992
3993    pub(crate) async fn project_machine_input(
3994        &self,
3995        input: crate::machines::mob_machine::MobMachineInput,
3996    ) -> Result<crate::machines::mob_machine::MobMachineState, MobError> {
3997        self.send_actor_command(|reply_tx| MobCommand::ProjectMachineInput {
3998            input: Box::new(input),
3999            reply_tx,
4000        })
4001        .await?
4002    }
4003
4004    pub(super) async fn commit_flow_run_command(
4005        &self,
4006        run_id: &RunId,
4007        command: MobMachineFlowRunCommand,
4008        context: &'static str,
4009    ) -> Result<Option<Vec<flow_run::Effect>>, MobError> {
4010        self.send_actor_command(|reply_tx| MobCommand::CommitFlowRunCommand {
4011            run_id: run_id.clone(),
4012            command: Box::new(command),
4013            context,
4014            reply_tx,
4015        })
4016        .await?
4017    }
4018
4019    pub(super) async fn commit_flow_terminalization(
4020        &self,
4021        run_id: RunId,
4022        flow_id: FlowId,
4023        target: TerminalizationTarget,
4024        command: MobMachineFlowRunCommand,
4025        context: &'static str,
4026    ) -> Result<TerminalizationOutcome, MobError> {
4027        self.send_actor_command(|reply_tx| MobCommand::CommitFlowTerminalization {
4028            run_id,
4029            flow_id,
4030            target,
4031            command: Box::new(command),
4032            context,
4033            reply_tx,
4034        })
4035        .await?
4036    }
4037
4038    pub(super) async fn commit_flow_frame_store_plan(
4039        &self,
4040        run_id: &RunId,
4041        plan: FlowFrameLoopStorePlan,
4042    ) -> Result<bool, MobError> {
4043        self.send_actor_command(|reply_tx| MobCommand::CommitFlowFrameStorePlan {
4044            run_id: run_id.clone(),
4045            plan: Box::new(plan),
4046            reply_tx,
4047        })
4048        .await?
4049    }
4050
4051    pub(crate) async fn preview_machine_input(
4052        &self,
4053        input: crate::machines::mob_machine::MobMachineInput,
4054    ) -> Result<crate::machines::mob_machine::MobMachineState, MobError> {
4055        self.send_actor_command(|reply_tx| MobCommand::PreviewMachineInput {
4056            input: Box::new(input),
4057            reply_tx,
4058        })
4059        .await?
4060    }
4061
4062    pub(crate) async fn query_machine_state(
4063        &self,
4064    ) -> Result<crate::machines::mob_machine::MobMachineState, MobError> {
4065        self.send_actor_command(|reply_tx| MobCommand::QueryMachineState { reply_tx })
4066            .await
4067    }
4068}
4069
4070impl MemberHandle {
4071    /// Target member identity.
4072    pub fn identity(&self) -> AgentIdentity {
4073        AgentIdentity::from(self.agent_identity.as_str())
4074    }
4075
4076    /// Submit external work to this member through the canonical runtime path.
4077    pub async fn send(
4078        &self,
4079        content: impl Into<meerkat_core::types::ContentInput>,
4080        handling_mode: HandlingMode,
4081    ) -> Result<MemberDeliveryReceipt, MobError> {
4082        self.send_with_render_metadata(content, handling_mode, None)
4083            .await
4084    }
4085
4086    /// Submit external work with explicit normalized render metadata.
4087    pub async fn send_with_render_metadata(
4088        &self,
4089        content: impl Into<meerkat_core::types::ContentInput>,
4090        handling_mode: HandlingMode,
4091        render_metadata: Option<RenderMetadata>,
4092    ) -> Result<MemberDeliveryReceipt, MobError> {
4093        self.mob
4094            .external_turn_for_member(
4095                self.agent_identity.clone(),
4096                content.into(),
4097                handling_mode,
4098                render_metadata,
4099            )
4100            .await?;
4101        let snapshot = self
4102            .mob
4103            .member_status(&AgentIdentity::from(self.agent_identity.as_str()))
4104            .await?;
4105        Ok(MemberDeliveryReceipt {
4106            identity: self.identity(),
4107            agent_runtime_id: snapshot.agent_runtime_id,
4108            fence_token: snapshot.fence_token,
4109            handling_mode,
4110        })
4111    }
4112
4113    /// Send typed peer communication from this member to another mob member.
4114    ///
4115    /// Unlike [`Self::send`], this preserves sender/recipient attribution by
4116    /// routing through this member's comms runtime.
4117    pub async fn send_peer_message(
4118        &self,
4119        to: AgentIdentity,
4120        content: impl Into<meerkat_core::types::ContentInput>,
4121        handling_mode: HandlingMode,
4122    ) -> Result<PeerMessageReceipt, MobError> {
4123        self.mob
4124            .send_peer_message(self.identity(), to, content, handling_mode)
4125            .await
4126    }
4127
4128    /// Submit internal work to this member without external addressability checks.
4129    pub async fn internal_turn(
4130        &self,
4131        content: impl Into<meerkat_core::types::ContentInput>,
4132    ) -> Result<MemberDeliveryReceipt, MobError> {
4133        self.mob
4134            .internal_turn_for_member(self.agent_identity.clone(), content.into())
4135            .await?;
4136        let snapshot = self
4137            .mob
4138            .member_status(&AgentIdentity::from(self.agent_identity.as_str()))
4139            .await?;
4140        Ok(MemberDeliveryReceipt {
4141            identity: self.identity(),
4142            agent_runtime_id: snapshot.agent_runtime_id,
4143            fence_token: snapshot.fence_token,
4144            handling_mode: HandlingMode::Queue,
4145        })
4146    }
4147
4148    /// Current bridge session ID for this member, if a session bridge exists.
4149    #[cfg(test)]
4150    pub(crate) async fn current_bridge_session_id(&self) -> Result<Option<SessionId>, MobError> {
4151        let status = self.status().await?;
4152        Ok(status.current_bridge_session_id().cloned())
4153    }
4154
4155    /// Get a point-in-time execution snapshot for this member.
4156    pub async fn status(&self) -> Result<MobMemberSnapshot, MobError> {
4157        self.mob.member_status(&self.identity()).await
4158    }
4159
4160    /// Subscribe to this member's agent events.
4161    pub async fn events(&self) -> Result<EventStream, MobError> {
4162        self.mob.subscribe_agent_events(&self.identity()).await
4163    }
4164}
4165
#[cfg(test)]
// Tests use `expect` to state serialization preconditions, so the
// `clippy::expect_used` lint is allowed for this module.
#[allow(clippy::expect_used)]
mod tests {
    // Unit tests for the serialized shape of member snapshots/receipts and for
    // spawn-source classification.
    use super::*;
    use crate::ids::Generation;

    /// A snapshot carrying a bridge session id must not expose it, or the
    /// binding-era atoms, in its JSON form.
    #[test]
    fn member_projection_types_omit_bridge_session_fields_in_serialized_output() {
        let sid = SessionId::new();

        let snapshot = MobMemberSnapshot {
            status: MobMemberStatus::Active,
            agent_runtime_id: AgentRuntimeId::initial(AgentIdentity::from("worker")),
            fence_token: FenceToken::new(0),
            output_preview: None,
            error: None,
            tokens_used: 0,
            is_final: false,
            current_session_id: None,
            current_bridge_session_id: None,
            peer_connectivity: None,
            kickoff: None,
            external_member: None,
            resolved_capabilities: None,
        }
        .with_current_bridge_session_id(Some(sid.clone()));
        let snapshot_value =
            serde_json::to_value(&snapshot).expect("snapshot should serialize to json");
        // 0.6 clean break: session fields are #[serde(skip)] and must not appear
        assert!(snapshot_value.get("current_bridge_session_id").is_none());
        // `agent_runtime_id` and `fence_token` are binding-era atoms marked
        // `pub(crate)` + `#[serde(skip)]` per the struct definition — they
        // are bridge-internal and must NOT leak into app-facing serialized
        // payloads. The public identity contract is `agent_identity()`
        // (derived from `agent_runtime_id.identity`).
        assert!(snapshot_value.get("agent_runtime_id").is_none());
        assert!(snapshot_value.get("fence_token").is_none());
    }

    /// Runtime id and fence token are hidden from JSON but still reachable
    /// through `runtime_identity_fields()`.
    #[test]
    fn mob_member_snapshot_exposes_runtime_identity_only_by_accessor() {
        let runtime_id = AgentRuntimeId::new(AgentIdentity::from("worker"), Generation::new(3));
        let snapshot = MobMemberSnapshot {
            status: MobMemberStatus::Active,
            agent_runtime_id: runtime_id.clone(),
            fence_token: FenceToken::new(9),
            output_preview: None,
            error: None,
            tokens_used: 0,
            is_final: false,
            current_session_id: None,
            current_bridge_session_id: None,
            peer_connectivity: None,
            kickoff: None,
            external_member: None,
            resolved_capabilities: None,
        };

        let snapshot_value =
            serde_json::to_value(&snapshot).expect("snapshot should serialize to json");
        assert!(snapshot_value.get("agent_runtime_id").is_none());
        assert!(snapshot_value.get("fence_token").is_none());

        let (projected_runtime_id, projected_fence_token) = snapshot.runtime_identity_fields();
        assert_eq!(projected_runtime_id, &runtime_id);
        assert_eq!(projected_fence_token, FenceToken::new(9));
    }

    /// `agent_identity()` returns the identity embedded in `agent_runtime_id`.
    #[test]
    fn mob_member_snapshot_exposes_agent_identity_convenience() {
        // Regression for DELETE_ME C9: every consumer used to reach through
        // `snapshot.agent_runtime_id.identity`; the snapshot now exposes a
        // direct accessor.
        let snapshot = MobMemberSnapshot {
            status: MobMemberStatus::Active,
            agent_runtime_id: AgentRuntimeId::initial(AgentIdentity::from("singer")),
            fence_token: FenceToken::new(0),
            output_preview: None,
            error: None,
            tokens_used: 0,
            is_final: false,
            current_session_id: None,
            current_bridge_session_id: None,
            peer_connectivity: None,
            kickoff: None,
            external_member: None,
            resolved_capabilities: None,
        };
        assert_eq!(
            snapshot.agent_identity(),
            &AgentIdentity::from("singer"),
            "agent_identity() must return the canonical identity without requiring callers to reach through agent_runtime_id",
        );
    }

    /// `CanonicalMemberSnapshotMaterial::to_snapshot` carries the bridge
    /// session id into both the accessor and the field of the snapshot.
    #[test]
    fn canonical_member_material_populates_bridge_binding_from_canonical_state() {
        let sid = SessionId::new();
        let snapshot = CanonicalMemberSnapshotMaterial {
            member_present: true,
            status: CanonicalMemberStatus::Active,
            is_terminal: false,
            error: None,
            output_preview: None,
            tokens_used: 0,
            agent_runtime_id: AgentRuntimeId::initial(AgentIdentity::from("worker")),
            fence_token: FenceToken::new(0),
            current_bridge_session_id: Some(sid.clone()),
            peer_connectivity: None,
            kickoff: None,
        }
        .to_snapshot();

        assert_eq!(snapshot.current_bridge_session_id(), Some(&sid));
        assert_eq!(snapshot.current_bridge_session_id, Some(sid));
    }

    /// Respawn and delivery receipts serialize `identity` but not the
    /// binding-era runtime id / fence token fields.
    #[test]
    fn member_receipt_types_omit_bridge_session_fields_in_serialized_output() {
        let runtime_id = AgentRuntimeId::new(AgentIdentity::from("worker"), Generation::new(1));
        let receipt = MemberRespawnReceipt::new(
            AgentIdentity::from("worker"),
            runtime_id.clone(),
            FenceToken::new(7),
            FenceToken::new(8),
        );
        let receipt_value =
            serde_json::to_value(&receipt).expect("respawn receipt should serialize to json");
        // Public contract: `identity` is the only identity field that
        // surfaces in app-facing serialized output. The binding-era atoms
        // (`agent_runtime_id`, `previous_fence_token`, `fence_token`) are
        // `pub(crate)` + `#[serde(skip)]` on the struct definition — they
        // are bridge-internal and must not leak.
        assert_eq!(receipt_value["identity"], "worker");
        assert!(receipt_value.get("agent_runtime_id").is_none());
        assert!(receipt_value.get("previous_fence_token").is_none());
        assert!(receipt_value.get("fence_token").is_none());

        let delivery = MemberDeliveryReceipt {
            identity: AgentIdentity::from("worker"),
            agent_runtime_id: runtime_id,
            fence_token: FenceToken::new(8),
            handling_mode: HandlingMode::Queue,
        };
        let delivery_value =
            serde_json::to_value(&delivery).expect("delivery receipt should serialize to json");
        assert_eq!(delivery_value["identity"], "worker");
        assert!(delivery_value.get("agent_runtime_id").is_none());
        assert!(delivery_value.get("fence_token").is_none());
    }

    /// `HelperResult` serializes identity/output/tokens but not the
    /// binding-era atoms or any session fields.
    #[test]
    fn helper_result_omits_binding_era_atoms_in_serialized_output() {
        let runtime_id = AgentRuntimeId::new(AgentIdentity::from("worker"), Generation::new(2));
        let result = HelperResult {
            output: Some("done".to_string()),
            tokens_used: 7,
            agent_identity: AgentIdentity::from("worker"),
            agent_runtime_id: runtime_id.clone(),
            fence_token: FenceToken::new(9),
        };

        let value = serde_json::to_value(&result).expect("helper result should serialize to json");
        // Public contract: `agent_identity`, `output`, `tokens_used` surface
        // in app-facing output. The binding-era atoms (`agent_runtime_id`,
        // `fence_token`) are `pub(crate)` + `#[serde(skip)]` per the struct
        // definition — bridge-internal and must not leak. Session fields
        // were never present.
        assert_eq!(value["agent_identity"], "worker");
        assert_eq!(value["tokens_used"], 7);
        assert!(value.get("agent_runtime_id").is_none());
        assert!(value.get("fence_token").is_none());
        assert!(value.get("session_id").is_none());
        assert!(value.get("bridge_session_id").is_none());
    }

    /// `with_resume_bridge_session_id` is reflected by the launch mode's
    /// `resume_bridge_session_id()` accessor.
    #[test]
    fn spawn_member_spec_resume_bridge_session_accessors_stay_additive() {
        let sid = SessionId::new();
        let spec =
            SpawnMemberSpec::new("worker", "worker-1").with_resume_bridge_session_id(sid.clone());

        // NOTE(review): the two assertions below are identical. The test name
        // suggests a second accessor was once checked here; confirm whether
        // one should be removed or pointed at a different accessor.
        assert_eq!(spec.launch_mode.resume_bridge_session_id(), Some(&sid));
        assert_eq!(spec.launch_mode.resume_bridge_session_id(), Some(&sid));
    }

    /// Resume/Fork launch modes override the caller's spawn source; Fresh
    /// keeps the caller's source unchanged.
    #[test]
    fn spawn_source_launch_mode_classification_is_surface_independent() {
        let sid = SessionId::new();
        let resume = crate::launch::MemberLaunchMode::Resume {
            bridge_session_id: sid,
        };
        let fork = crate::launch::MemberLaunchMode::Fork {
            source_member_id: MeerkatId::from("lead-1"),
            fork_context: crate::launch::ForkContext::LastMessages { count: 1 },
        };

        assert_eq!(
            SpawnSource::for_launch_mode(SpawnSource::Consumer, &resume),
            SpawnSource::Resume
        );
        assert_eq!(
            SpawnSource::for_launch_mode(SpawnSource::AgentSpawnMember, &resume),
            SpawnSource::Resume
        );
        assert_eq!(
            SpawnSource::for_launch_mode(SpawnSource::BatchItem, &fork),
            SpawnSource::Fork
        );
        assert_eq!(
            SpawnSource::for_launch_mode(
                SpawnSource::AgentSpawnMember,
                &crate::launch::MemberLaunchMode::Fresh,
            ),
            SpawnSource::AgentSpawnMember
        );
    }
}