1use super::*;
2use crate::MobRuntimeMode;
3use crate::machines::mob_machine as mob_dsl;
4use crate::mob_machine::{MobMachineCommand, MobMachineCommandResult};
5use crate::roster::MobMemberKickoffSnapshot;
6use crate::run::{MobMachineFlowRunCommand, flow_run};
7#[cfg(test)]
8use crate::runtime::MobLifecycleSnapshot;
9use crate::runtime::flow_frame_engine::FlowFrameLoopStorePlan;
10#[cfg(test)]
11use crate::runtime::mob_member_lifecycle_projection::{
12 CanonicalMemberSnapshotMaterial, CanonicalMemberStatus,
13};
14use crate::runtime::mob_member_lifecycle_projection::{
15 MobMemberLifecycleInput, MobMemberLifecycleProjection,
16};
17use crate::runtime::reconcile::{
18 EnsureMemberOutcome, MemberFilter, ReconcileFailure, ReconcileOptions, ReconcileReport,
19 ReconcileStage,
20};
21use crate::runtime::terminalization::{TerminalizationOutcome, TerminalizationTarget};
22#[cfg(target_arch = "wasm32")]
23use crate::tokio;
24use meerkat_core::comms::{
25 CommsCommand, PeerDirectoryEntry, PeerId, PeerReachability, PeerReachabilityReason,
26 SendReceipt, TrustedPeerDescriptor,
27};
28use meerkat_core::ops::OperationId;
29use meerkat_core::ops_lifecycle::OpsLifecycleRegistry;
30use meerkat_core::service::{MobToolAuthorityContext, SessionError};
31use meerkat_core::time_compat::Instant;
32use meerkat_core::types::{HandlingMode, RenderMetadata, SessionId};
33use serde::{Deserialize, Serialize};
34use std::collections::BTreeMap;
35use std::collections::BTreeSet;
36use std::collections::HashMap;
37use std::time::Duration;
38use tokio_util::sync::CancellationToken;
39
40const DEFAULT_KICKOFF_WAIT_TIMEOUT: Duration = Duration::from_secs(600);
41const DEFAULT_READY_WAIT_TIMEOUT: Duration = Duration::from_secs(600);
42
43#[derive(Debug, Clone, Serialize)]
44#[non_exhaustive]
45pub struct MobMemberSnapshot {
46 pub status: MobMemberStatus,
48 #[serde(skip)]
54 pub(crate) agent_runtime_id: AgentRuntimeId,
55 #[serde(skip)]
61 pub(crate) fence_token: FenceToken,
62 pub output_preview: Option<String>,
64 pub error: Option<String>,
66 pub tokens_used: u64,
68 pub is_final: bool,
70 #[serde(default, skip_serializing_if = "Option::is_none")]
73 pub current_session_id: Option<SessionId>,
74 #[serde(skip)]
76 pub(crate) current_bridge_session_id: Option<SessionId>,
77 #[serde(skip_serializing_if = "Option::is_none")]
79 pub peer_connectivity: Option<MobPeerConnectivitySnapshot>,
80 #[serde(default, skip_serializing_if = "Option::is_none")]
82 pub kickoff: Option<MobMemberKickoffSnapshot>,
83 #[serde(default, skip_serializing_if = "Option::is_none")]
91 pub external_member: Option<ExternalMemberObservationSnapshot>,
92 #[serde(default, skip_serializing_if = "Option::is_none")]
95 pub resolved_capabilities: Option<meerkat_contracts::WireResolvedModelCapabilities>,
96}
97
98impl MobMemberSnapshot {
99 pub(crate) fn with_current_bridge_session_id(
100 mut self,
101 current_bridge_session_id: Option<SessionId>,
102 ) -> Self {
103 self.current_session_id = current_bridge_session_id.clone();
104 self.current_bridge_session_id = current_bridge_session_id;
105 self
106 }
107
108 pub(crate) fn current_bridge_session_id(&self) -> Option<&SessionId> {
109 self.current_bridge_session_id.as_ref()
110 }
111
112 #[must_use]
116 pub fn agent_identity(&self) -> &AgentIdentity {
117 &self.agent_runtime_id.identity
118 }
119
120 #[must_use]
127 pub fn runtime_identity_fields(&self) -> (&AgentRuntimeId, FenceToken) {
128 (&self.agent_runtime_id, self.fence_token)
129 }
130}
131
132#[derive(Debug, Clone, Serialize)]
133pub struct MobMemberListEntry {
134 pub agent_identity: AgentIdentity,
136 pub role: ProfileName,
138 pub runtime_mode: MobRuntimeMode,
139 #[serde(default)]
140 pub state: crate::roster::MemberState,
141 pub wired_to: BTreeSet<AgentIdentity>,
142 #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
143 pub labels: BTreeMap<String, String>,
144 pub status: MobMemberStatus,
145 #[serde(skip_serializing_if = "Option::is_none")]
146 pub error: Option<String>,
147 pub is_final: bool,
148 #[serde(default, skip_serializing_if = "Option::is_none")]
149 pub kickoff: Option<MobMemberKickoffSnapshot>,
150 #[serde(skip)]
166 pub(crate) agent_runtime_id: AgentRuntimeId,
167 #[serde(skip)]
168 pub(crate) fence_token: FenceToken,
169 #[serde(default, skip_serializing_if = "Option::is_none")]
171 pub(crate) peer_id: Option<PeerId>,
172 #[serde(default, skip_serializing_if = "Option::is_none")]
174 pub(crate) transport_public_key: Option<String>,
175 #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
176 pub(crate) external_peer_specs: BTreeMap<AgentIdentity, TrustedPeerDescriptor>,
177 #[serde(skip)]
178 pub(crate) current_session_id: Option<SessionId>,
179 #[serde(skip)]
180 pub(crate) current_bridge_session_id: Option<SessionId>,
181}
182
183impl MobMemberListEntry {
184 pub(crate) fn with_current_bridge_session_id(
185 mut self,
186 current_bridge_session_id: Option<SessionId>,
187 ) -> Self {
188 self.current_session_id = current_bridge_session_id.clone();
189 self.current_bridge_session_id = current_bridge_session_id;
190 self
191 }
192
193 pub fn binding_atoms(&self) -> (AgentRuntimeId, FenceToken) {
198 (self.agent_runtime_id.clone(), self.fence_token)
199 }
200}
201
202impl WorkDeliveryReceipt {
203 pub fn runtime_id(&self) -> &AgentRuntimeId {
206 &self.runtime_id
207 }
208}
209
210#[derive(Debug, Clone, Serialize, Deserialize)]
212#[non_exhaustive]
213pub struct MobPeerConnectivitySnapshot {
214 pub reachable_peer_count: usize,
215 pub unknown_peer_count: usize,
216 pub unreachable_peers: Vec<MobUnreachablePeer>,
217}
218
219#[derive(Debug, Clone, Serialize, Deserialize)]
221#[non_exhaustive]
222pub struct MobUnreachablePeer {
223 pub peer: String,
224 pub reason: Option<PeerReachabilityReason>,
225}
226
227#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
229#[serde(rename_all = "snake_case")]
230#[non_exhaustive]
231pub enum MobMemberStatus {
232 Active,
234 Retiring,
236 Broken,
238 Completed,
240 Unknown,
242}
243
244#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
246#[serde(rename_all = "snake_case")]
247#[non_exhaustive]
248pub struct ExternalMemberOwnerRef {
249 pub mob_id: MobId,
250 pub agent_identity: AgentIdentity,
251}
252
253#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
256#[serde(rename_all = "snake_case")]
257#[non_exhaustive]
258pub enum ExternalMemberBindingMode {
259 BridgeSessionBacked,
260 PeerOnly,
261}
262
263#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
265#[serde(tag = "status", rename_all = "snake_case")]
266#[non_exhaustive]
267pub enum ExternalMemberReachability {
268 Unknown,
270 Unavailable { reason: String },
273}
274
275#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
278#[serde(tag = "status", rename_all = "snake_case")]
279#[non_exhaustive]
280pub enum ExternalMemberRebindStatus {
281 NotRequired,
283 Available,
286 Unavailable { reason: String },
288 Failed { reason: String },
290}
291
292#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
294#[serde(rename_all = "snake_case")]
295#[non_exhaustive]
296pub enum ExternalMemberForwardingStatus {
297 Declared,
300}
301
302#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
304#[serde(rename_all = "snake_case")]
305#[non_exhaustive]
306pub struct ExternalMemberForwardingHookRef {
307 pub owner: ExternalMemberOwnerRef,
308 pub status: ExternalMemberForwardingStatus,
309}
310
311#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
313#[serde(rename_all = "snake_case")]
314#[non_exhaustive]
315pub struct ExternalMemberForwardingHooks {
316 pub artifacts: ExternalMemberForwardingHookRef,
317 pub approvals: ExternalMemberForwardingHookRef,
318}
319
320#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
322#[serde(rename_all = "snake_case")]
323#[non_exhaustive]
324pub struct ExternalMemberObservationSnapshot {
325 pub owner: ExternalMemberOwnerRef,
326 pub binding_mode: ExternalMemberBindingMode,
327 pub bridge_session_present: bool,
328 pub reachability: ExternalMemberReachability,
329 pub rebind: ExternalMemberRebindStatus,
330 pub forwarding: ExternalMemberForwardingHooks,
331}
332
333impl ExternalMemberObservationSnapshot {
334 fn hook(owner: &ExternalMemberOwnerRef) -> ExternalMemberForwardingHookRef {
335 ExternalMemberForwardingHookRef {
336 owner: owner.clone(),
337 status: ExternalMemberForwardingStatus::Declared,
338 }
339 }
340
341 fn forwarding(owner: &ExternalMemberOwnerRef) -> ExternalMemberForwardingHooks {
342 ExternalMemberForwardingHooks {
343 artifacts: Self::hook(owner),
344 approvals: Self::hook(owner),
345 }
346 }
347}
348
349#[derive(Debug, Clone, Serialize)]
351#[non_exhaustive]
352pub struct MemberRespawnReceipt {
353 pub identity: AgentIdentity,
355 #[serde(skip)]
357 pub(crate) agent_runtime_id: AgentRuntimeId,
358 #[serde(skip)]
360 pub(crate) previous_fence_token: FenceToken,
361 #[serde(skip)]
363 pub(crate) fence_token: FenceToken,
364}
365
366impl MemberRespawnReceipt {
367 pub fn new(
368 identity: AgentIdentity,
369 agent_runtime_id: AgentRuntimeId,
370 previous_fence_token: FenceToken,
371 fence_token: FenceToken,
372 ) -> Self {
373 Self {
374 identity,
375 agent_runtime_id,
376 previous_fence_token,
377 fence_token,
378 }
379 }
380}
381
382#[derive(Debug, Clone, Serialize)]
384#[non_exhaustive]
385pub struct SupervisorRotationReport {
386 pub previous_epoch: u64,
388 pub current_epoch: u64,
390 pub public_peer_id: String,
392}
393
394#[derive(Debug, Clone, Default, Serialize)]
396#[non_exhaustive]
397pub struct MobDestroyReport {
398 #[serde(default, skip_serializing_if = "Vec::is_empty")]
400 pub force_destroyed_members: Vec<AgentIdentity>,
401 #[serde(default, skip_serializing_if = "Vec::is_empty")]
403 pub orphaned_remote_members: Vec<AgentIdentity>,
404 #[serde(default)]
406 pub remote_cleanup_deadline_exceeded: bool,
407 #[serde(default)]
409 pub metadata_scrubbed: bool,
410 #[serde(default)]
412 pub events_cleared: bool,
413 #[serde(default)]
415 pub namespace_cleaned: bool,
416 #[serde(default, skip_serializing_if = "Vec::is_empty")]
418 pub errors: Vec<String>,
419}
420
421impl MobDestroyReport {
422 pub(crate) fn push_error(&mut self, error: impl Into<String>) {
423 self.errors.push(error.into());
424 }
425
426 fn error_summary(&self) -> String {
427 if self.errors.is_empty() {
428 "destroy cleanup did not complete".to_string()
429 } else {
430 self.errors.join("; ")
431 }
432 }
433}
434
435#[derive(Debug, thiserror::Error)]
437#[non_exhaustive]
438pub enum MobDestroyError {
439 #[error("destroy incomplete: {}", report.error_summary())]
441 Incomplete { report: MobDestroyReport },
442
443 #[error(transparent)]
445 Mob(#[from] MobError),
446}
447
448#[derive(Debug, Clone, Serialize)]
450#[non_exhaustive]
451pub struct PreviousMemberCleanupReport {
452 pub identity: AgentIdentity,
454 #[serde(skip)]
456 pub(crate) agent_runtime_id: AgentRuntimeId,
457 #[serde(skip)]
459 pub(crate) fence_token: FenceToken,
460 pub retire_attempted: bool,
462 #[serde(default, skip_serializing_if = "Option::is_none")]
464 pub retire_error: Option<String>,
465 #[serde(default)]
467 pub confirmatory_observation_attempted: bool,
468 #[serde(default, skip_serializing_if = "Option::is_none")]
470 pub confirmatory_observation: Option<String>,
471 #[serde(default)]
473 pub destroy_attempted: bool,
474 #[serde(default, skip_serializing_if = "Option::is_none")]
476 pub destroy_error: Option<String>,
477}
478
479#[derive(Debug, Clone, Serialize)]
481#[non_exhaustive]
482pub(crate) struct MemberSpawnReceipt {
483 pub(crate) member_ref: MemberRef,
485 pub(crate) operation_id: OperationId,
487}
488
489#[derive(Debug, Clone, Serialize)]
498#[non_exhaustive]
499pub struct SpawnResult {
500 pub agent_identity: AgentIdentity,
502 #[serde(skip)]
505 pub(crate) agent_runtime_id: AgentRuntimeId,
506 #[serde(skip)]
509 pub(crate) fence_token: FenceToken,
510}
511
512impl SpawnResult {
513 pub fn new(
515 agent_identity: AgentIdentity,
516 agent_runtime_id: AgentRuntimeId,
517 fence_token: FenceToken,
518 ) -> Self {
519 Self {
520 agent_identity,
521 agent_runtime_id,
522 fence_token,
523 }
524 }
525}
526
527#[derive(Clone)]
528pub(crate) struct CanonicalOpsOwnerContext {
529 pub(crate) owner_bridge_session_id: SessionId,
530 pub(crate) ops_registry: Arc<dyn OpsLifecycleRegistry>,
531}
532
533#[derive(Debug, thiserror::Error)]
535#[non_exhaustive]
536pub enum MobRespawnError {
537 #[error("no runtime control channel for member {identity}")]
539 NoRuntimeControl { identity: AgentIdentity },
540
541 #[error("spawn failed after retire for member {identity}: {reason}")]
543 SpawnAfterRetire {
544 identity: AgentIdentity,
545 reason: String,
546 },
547
548 #[error("topology restore failed for member {}: {} peer(s) failed", receipt.identity, failed_peer_ids.len())]
551 TopologyRestoreFailed {
552 receipt: MemberRespawnReceipt,
553 failed_peer_ids: Vec<AgentIdentity>,
554 },
555
556 #[error("previous member cleanup ambiguous for member {}", report.identity)]
559 PreviousMemberCleanupAmbiguous { report: PreviousMemberCleanupReport },
560
561 #[error(transparent)]
563 Mob(#[from] MobError),
564}
565
566#[derive(Debug, Clone, Serialize)]
568#[non_exhaustive]
569pub struct MemberDeliveryReceipt {
570 pub identity: AgentIdentity,
572 pub handling_mode: HandlingMode,
574 #[serde(skip)]
576 pub(crate) agent_runtime_id: AgentRuntimeId,
577 #[serde(skip)]
579 pub(crate) fence_token: FenceToken,
580}
581
582#[derive(Debug, Clone, Serialize)]
584#[non_exhaustive]
585pub struct PeerMessageReceipt {
586 pub from: AgentIdentity,
588 pub to: AgentIdentity,
590 pub envelope_id: uuid::Uuid,
592 pub acked: bool,
594 pub handling_mode: HandlingMode,
596}
597
598#[derive(Debug, Clone, Serialize)]
600#[non_exhaustive]
601pub struct WorkDeliveryReceipt {
602 pub work_ref: WorkRef,
604 #[serde(skip)]
606 pub(crate) runtime_id: AgentRuntimeId,
607}
608
609#[derive(Debug, Clone, Default)]
611#[non_exhaustive]
612pub struct HelperOptions {
613 pub role_name: Option<ProfileName>,
615 pub runtime_mode: Option<crate::MobRuntimeMode>,
617 pub backend: Option<MobBackendKind>,
619 pub tool_access_policy: Option<meerkat_core::ops::ToolAccessPolicy>,
621 pub auth_binding: Option<meerkat_core::AuthBindingRef>,
623 pub inherited_tool_filter: Option<meerkat_core::WitnessedToolFilter>,
625 pub override_profile: Option<crate::profile::Profile>,
627 pub model_override: Option<String>,
629 pub provider_params_override: Option<serde_json::Value>,
631}
632
633#[derive(Debug, Clone, Serialize)]
635#[non_exhaustive]
636pub struct HelperResult {
637 pub output: Option<String>,
639 pub tokens_used: u64,
641 pub agent_identity: AgentIdentity,
643 #[serde(skip)]
647 pub(crate) agent_runtime_id: AgentRuntimeId,
648 #[serde(skip)]
652 pub(crate) fence_token: FenceToken,
653}
654
655#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
657#[serde(rename_all = "snake_case")]
658pub enum PeerTarget {
659 Local(AgentIdentity),
661 ExternalName(meerkat_core::comms::PeerName),
663 ExternalBinding(ExternalPeerBindingSpec),
665 External(TrustedPeerDescriptor),
667}
668
669#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
671pub struct MobWireMembersBatchReport {
672 pub requested: usize,
674 pub already_wired: Vec<crate::event::MemberWireEdge>,
676 pub wired: Vec<crate::event::MemberWireEdge>,
678}
679
680#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
687pub struct ExternalPeerBindingSpec {
688 pub name: String,
689 pub address: String,
690 pub identity: meerkat_contracts::WireTrustedPeerIdentity,
691}
692
693impl ExternalPeerBindingSpec {
694 pub fn new(
695 name: impl Into<String>,
696 address: impl Into<String>,
697 identity: meerkat_contracts::WireTrustedPeerIdentity,
698 ) -> Self {
699 Self {
700 name: name.into(),
701 address: address.into(),
702 identity,
703 }
704 }
705}
706
707impl From<AgentIdentity> for PeerTarget {
711 fn from(value: AgentIdentity) -> Self {
712 Self::Local(value)
713 }
714}
715
716#[derive(Clone)]
729pub struct MobHandle {
730 pub(super) command_tx: mpsc::Sender<MobCommand>,
731 pub(super) roster: Arc<RwLock<RosterAuthority>>,
732 pub(super) definition: Arc<MobDefinition>,
733 pub(super) events: Arc<dyn MobEventStore>,
734 pub(super) run_store: Arc<dyn MobRunStore>,
735 pub(super) flow_streams:
736 Arc<tokio::sync::Mutex<BTreeMap<RunId, mpsc::Sender<meerkat_core::ScopedAgentEvent>>>>,
737 pub(super) session_service: Arc<dyn MobSessionService>,
738 #[cfg(feature = "runtime-adapter")]
739 pub(super) runtime_adapter: Option<Arc<meerkat_runtime::MeerkatMachine>>,
740 pub(super) restore_diagnostics: Arc<RwLock<HashMap<MeerkatId, RestoreFailureDiagnostic>>>,
741 pub(super) machine_state_watch_rx: tokio::sync::watch::Receiver<mob_dsl::MobMachineState>,
746 pub(super) phase_watch_rx: tokio::sync::watch::Receiver<MobState>,
754 pub(super) realtime_session_factory: Option<Arc<dyn meerkat_client::RealtimeSessionFactory>>,
763}
764
765impl MobHandle {
766 pub fn realtime_session_factory(
769 &self,
770 ) -> Option<Arc<dyn meerkat_client::RealtimeSessionFactory>> {
771 self.realtime_session_factory.as_ref().map(Arc::clone)
772 }
773
774 async fn member_machine_projection(
775 &self,
776 agent_identity: &MeerkatId,
777 ) -> super::state::MobMemberMachineProjection {
778 self.send_actor_command(
779 |reply_tx| super::state::MobCommand::MemberMachineProjection {
780 agent_identity: AgentIdentity::from(agent_identity.as_str()),
781 reply_tx,
782 },
783 )
784 .await
785 .unwrap_or_default()
786 }
787}
788
789#[derive(Debug, Clone)]
790pub(crate) struct RestoreFailureDiagnostic {
791 pub(crate) bridge_session_id: Option<SessionId>,
792 pub(crate) reason: String,
793}
794
795#[derive(Clone)]
801pub struct MemberHandle {
802 mob: MobHandle,
803 agent_identity: MeerkatId,
804}
805
806#[derive(Clone)]
807pub struct MobEventsView {
808 handle: MobHandle,
809}
810
811#[derive(Debug, Clone, Copy)]
813#[non_exhaustive]
814pub struct MobEventsSubscriptionConfig {
815 pub after_cursor: Option<u64>,
817 pub batch_limit: usize,
819 pub channel_capacity: usize,
821}
822
823impl Default for MobEventsSubscriptionConfig {
824 fn default() -> Self {
825 Self {
826 after_cursor: None,
827 batch_limit: 128,
828 channel_capacity: 256,
829 }
830 }
831}
832
833pub struct MobEventsSubscription {
839 pub event_rx: mpsc::Receiver<crate::event::MobEvent>,
840 cancel: CancellationToken,
841}
842
843impl MobEventsSubscription {
844 pub fn cancel(&self) {
845 self.cancel.cancel();
846 }
847}
848
849impl Drop for MobEventsSubscription {
850 fn drop(&mut self) {
851 self.cancel.cancel();
852 }
853}
854
855#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
857#[serde(rename_all = "snake_case")]
858#[non_exhaustive]
859pub enum SpawnSource {
860 Consumer,
861 AgentSpawnMember,
862 HelperSpawn,
863 BatchItem,
864 PolicySpawn,
865 Respawn,
866 Resume,
867 Fork,
868}
869
870impl SpawnSource {
871 #[must_use]
872 pub fn as_str(self) -> &'static str {
873 match self {
874 Self::Consumer => "consumer",
875 Self::AgentSpawnMember => "agent_spawn_member",
876 Self::HelperSpawn => "helper_spawn",
877 Self::BatchItem => "batch_item",
878 Self::PolicySpawn => "policy_spawn",
879 Self::Respawn => "respawn",
880 Self::Resume => "resume",
881 Self::Fork => "fork",
882 }
883 }
884
885 #[must_use]
886 fn for_launch_mode(base: Self, launch_mode: &crate::launch::MemberLaunchMode) -> Self {
887 match launch_mode {
888 crate::launch::MemberLaunchMode::Resume { .. } => Self::Resume,
889 crate::launch::MemberLaunchMode::Fork { .. } => Self::Fork,
890 crate::launch::MemberLaunchMode::Fresh => base,
891 }
892 }
893}
894
895#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
897#[serde(rename_all = "snake_case")]
898#[non_exhaustive]
899pub enum SpawnSystemPromptOverride {
900 Replace(String),
901}
902
903#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
905#[serde(tag = "type", rename_all = "snake_case")]
906#[non_exhaustive]
907pub enum SpawnContinuityIntent {
908 #[default]
909 Ephemeral,
910 DurableIdentity {
911 continuity_key: String,
912 },
913}
914
915#[derive(Debug, Clone, PartialEq, Eq)]
917#[non_exhaustive]
918pub struct SpawnCustomizationContext {
919 pub mob_id: MobId,
920 pub spawn_source: SpawnSource,
921 pub spawner_identity: Option<AgentIdentity>,
922 pub spawner_runtime_id: Option<AgentRuntimeId>,
923 pub requested_profile: ProfileName,
924}
925
926pub trait SpawnMemberCustomizer: Send + Sync {
928 fn customize_spawn(
929 &self,
930 ctx: &SpawnCustomizationContext,
931 spec: &mut SpawnMemberSpec,
932 ) -> Result<(), MobError>;
933}
934
935#[derive(Clone)]
937#[non_exhaustive]
938pub struct SpawnMemberSpec {
939 pub role_name: ProfileName,
944 pub identity: AgentIdentity,
945 pub initial_message: Option<ContentInput>,
946 pub runtime_mode: Option<crate::MobRuntimeMode>,
947 pub backend: Option<MobBackendKind>,
948 pub binding: Option<crate::RuntimeBinding>,
952 pub context: Option<serde_json::Value>,
954 pub labels: Option<std::collections::BTreeMap<String, String>>,
956 pub launch_mode: crate::launch::MemberLaunchMode,
965 pub tool_access_policy: Option<meerkat_core::ops::ToolAccessPolicy>,
967 pub budget_split_policy: Option<crate::launch::BudgetSplitPolicy>,
969 pub auto_wire_parent: bool,
971 pub additional_instructions: Option<Vec<String>>,
973 pub shell_env: Option<std::collections::HashMap<String, String>>,
975 pub inherited_tool_filter: Option<meerkat_core::WitnessedToolFilter>,
981 pub override_profile: Option<crate::profile::Profile>,
988 pub model_override: Option<String>,
990 pub provider_params_override: Option<serde_json::Value>,
992 pub auth_binding: Option<meerkat_core::AuthBindingRef>,
998 pub external_tools: Option<Arc<dyn AgentToolDispatcher>>,
1000 pub system_prompt_override: Option<SpawnSystemPromptOverride>,
1002 pub continuity_intent: SpawnContinuityIntent,
1004}
1005
1006impl std::fmt::Debug for SpawnMemberSpec {
1007 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1008 f.debug_struct("SpawnMemberSpec")
1009 .field("role_name", &self.role_name)
1010 .field("identity", &self.identity)
1011 .field("initial_message", &self.initial_message)
1012 .field("runtime_mode", &self.runtime_mode)
1013 .field("backend", &self.backend)
1014 .field("binding", &self.binding)
1015 .field("context", &self.context)
1016 .field("labels", &self.labels)
1017 .field("launch_mode", &self.launch_mode)
1018 .field("tool_access_policy", &self.tool_access_policy)
1019 .field("budget_split_policy", &self.budget_split_policy)
1020 .field("auto_wire_parent", &self.auto_wire_parent)
1021 .field("additional_instructions", &self.additional_instructions)
1022 .field("shell_env", &self.shell_env)
1023 .field("inherited_tool_filter", &self.inherited_tool_filter)
1024 .field("override_profile", &self.override_profile)
1025 .field("model_override", &self.model_override)
1026 .field("provider_params_override", &self.provider_params_override)
1027 .field("auth_binding", &self.auth_binding)
1028 .field("external_tools", &self.external_tools.is_some())
1029 .field("system_prompt_override", &self.system_prompt_override)
1030 .field("continuity_intent", &self.continuity_intent)
1031 .finish()
1032 }
1033}
1034
1035impl SpawnMemberSpec {
1036 pub fn new(profile: impl Into<ProfileName>, identity: impl Into<AgentIdentity>) -> Self {
1037 Self {
1038 role_name: profile.into(),
1039 identity: identity.into(),
1040 initial_message: None,
1041 runtime_mode: None,
1042 backend: None,
1043 binding: None,
1044 context: None,
1045 labels: None,
1046 launch_mode: crate::launch::MemberLaunchMode::Fresh,
1047 tool_access_policy: None,
1048 budget_split_policy: None,
1049 auto_wire_parent: false,
1050 additional_instructions: None,
1051 shell_env: None,
1052 inherited_tool_filter: None,
1053 override_profile: None,
1054 model_override: None,
1055 provider_params_override: None,
1056 auth_binding: None,
1057 external_tools: None,
1058 system_prompt_override: None,
1059 continuity_intent: SpawnContinuityIntent::Ephemeral,
1060 }
1061 }
1062
1063 pub fn with_auth_binding(mut self, conn_ref: meerkat_core::AuthBindingRef) -> Self {
1065 self.auth_binding = Some(conn_ref);
1066 self
1067 }
1068
1069 pub fn with_shell_env(mut self, env: std::collections::HashMap<String, String>) -> Self {
1070 self.shell_env = Some(env);
1071 self
1072 }
1073
1074 pub fn with_initial_message(mut self, message: impl Into<ContentInput>) -> Self {
1075 self.initial_message = Some(message.into());
1076 self
1077 }
1078
1079 pub fn with_runtime_mode(mut self, mode: crate::MobRuntimeMode) -> Self {
1080 self.runtime_mode = Some(mode);
1081 self
1082 }
1083
1084 pub fn with_backend(mut self, backend: MobBackendKind) -> Self {
1085 self.backend = Some(backend);
1086 self
1087 }
1088
1089 pub fn with_context(mut self, context: serde_json::Value) -> Self {
1090 self.context = Some(context);
1091 self
1092 }
1093
1094 pub fn with_labels(mut self, labels: std::collections::BTreeMap<String, String>) -> Self {
1095 self.labels = Some(labels);
1096 self
1097 }
1098
1099 pub fn with_resume_bridge_session_id(mut self, id: meerkat_core::types::SessionId) -> Self {
1108 self.launch_mode = crate::launch::MemberLaunchMode::Resume {
1109 bridge_session_id: id,
1110 };
1111 self
1112 }
1113
1114 pub fn with_launch_mode(mut self, mode: crate::launch::MemberLaunchMode) -> Self {
1122 self.launch_mode = mode;
1123 self
1124 }
1125
1126 pub fn with_tool_access_policy(mut self, policy: meerkat_core::ops::ToolAccessPolicy) -> Self {
1127 self.tool_access_policy = Some(policy);
1128 self
1129 }
1130
1131 pub fn with_budget_split_policy(mut self, policy: crate::launch::BudgetSplitPolicy) -> Self {
1132 self.budget_split_policy = Some(policy);
1133 self
1134 }
1135
1136 pub fn with_auto_wire_parent(mut self, auto_wire: bool) -> Self {
1137 self.auto_wire_parent = auto_wire;
1138 self
1139 }
1140
1141 pub fn with_additional_instructions(mut self, instructions: Vec<String>) -> Self {
1142 self.additional_instructions = Some(instructions);
1143 self
1144 }
1145
1146 pub fn from_wire(
1147 profile: String,
1148 agent_identity: String,
1149 initial_message: Option<ContentInput>,
1150 runtime_mode: Option<crate::MobRuntimeMode>,
1151 backend: Option<MobBackendKind>,
1152 ) -> Self {
1153 let mut spec = Self::new(profile, agent_identity);
1154 spec.initial_message = initial_message;
1155 spec.runtime_mode = runtime_mode;
1156 spec.backend = backend;
1157 spec
1158 }
1159}
1160
1161impl MobEventsView {
1162 pub async fn latest_cursor(&self) -> Result<u64, MobError> {
1163 self.handle
1164 .events
1165 .latest_cursor()
1166 .await
1167 .map_err(MobError::from)
1168 }
1169
1170 pub async fn subscribe(&self) -> Result<MobEventsSubscription, MobError> {
1176 self.subscribe_with_config(MobEventsSubscriptionConfig::default())
1177 .await
1178 }
1179
1180 pub async fn subscribe_after(
1182 &self,
1183 after_cursor: u64,
1184 ) -> Result<MobEventsSubscription, MobError> {
1185 self.subscribe_with_config(MobEventsSubscriptionConfig {
1186 after_cursor: Some(after_cursor),
1187 ..MobEventsSubscriptionConfig::default()
1188 })
1189 .await
1190 }
1191
1192 pub async fn subscribe_with_config(
1194 &self,
1195 config: MobEventsSubscriptionConfig,
1196 ) -> Result<MobEventsSubscription, MobError> {
1197 let config = MobEventsSubscriptionConfig {
1198 batch_limit: config.batch_limit.max(1),
1199 channel_capacity: config.channel_capacity.max(1),
1200 ..config
1201 };
1202 let explicit_after_cursor = config.after_cursor.is_some();
1203 let source_rx = self.handle.events.subscribe().map_err(MobError::from)?;
1204 let after_cursor = match config.after_cursor {
1205 Some(cursor) => {
1206 let latest_cursor = self.latest_cursor().await?;
1207 if cursor > latest_cursor {
1208 return Err(MobError::StaleEventCursor {
1209 after_cursor: cursor,
1210 latest_cursor,
1211 });
1212 }
1213 cursor
1214 }
1215 None => self.latest_cursor().await?,
1216 };
1217 Ok(spawn_structural_event_subscription(
1218 self.clone(),
1219 source_rx,
1220 after_cursor,
1221 explicit_after_cursor,
1222 config,
1223 ))
1224 }
1225
1226 pub async fn poll(
1227 &self,
1228 after_cursor: u64,
1229 limit: usize,
1230 ) -> Result<Vec<crate::event::MobEvent>, MobError> {
1231 match self
1232 .handle
1233 .execute_machine_command(MobMachineCommand::PollEvents {
1234 after_cursor,
1235 limit,
1236 })
1237 .await?
1238 {
1239 MobMachineCommandResult::MobEvents(events) => Ok(events),
1240 _ => Err(MobError::Internal(
1241 "unexpected command result variant".into(),
1242 )),
1243 }
1244 }
1245
1246 pub async fn poll_strict(
1247 &self,
1248 after_cursor: u64,
1249 limit: usize,
1250 ) -> Result<Vec<crate::event::MobEvent>, MobError> {
1251 let latest_cursor = self.latest_cursor().await?;
1252 if after_cursor > latest_cursor {
1253 return Err(MobError::StaleEventCursor {
1254 after_cursor,
1255 latest_cursor,
1256 });
1257 }
1258 self.poll(after_cursor, limit).await
1259 }
1260
1261 pub async fn replay_all(&self) -> Result<Vec<crate::event::MobEvent>, MobError> {
1262 match self
1263 .handle
1264 .execute_machine_command(MobMachineCommand::ReplayAllEvents)
1265 .await?
1266 {
1267 MobMachineCommandResult::MobEvents(events) => Ok(events),
1268 _ => Err(MobError::Internal(
1269 "unexpected command result variant".into(),
1270 )),
1271 }
1272 }
1273}
1274
1275#[allow(clippy::ignored_unit_patterns)]
1276fn spawn_structural_event_subscription(
1277 events: MobEventsView,
1278 mut source_rx: crate::store::MobEventReceiver,
1279 mut cursor: u64,
1280 catch_up_on_start: bool,
1281 config: MobEventsSubscriptionConfig,
1282) -> MobEventsSubscription {
1283 let (event_tx, event_rx) = mpsc::channel(config.channel_capacity);
1284 let cancel = CancellationToken::new();
1285 let cancel_clone = cancel.clone();
1286
1287 tokio::spawn(async move {
1288 if catch_up_on_start
1289 && !catch_up_structural_events(&events, &event_tx, &mut cursor, config.batch_limit)
1290 .await
1291 {
1292 return;
1293 }
1294
1295 loop {
1296 tokio::select! {
1297 () = cancel_clone.cancelled() => break,
1298 received = source_rx.recv() => {
1299 match received {
1300 Ok(event) => {
1301 if event.cursor > cursor.saturating_add(1)
1302 && !catch_up_structural_events(
1303 &events,
1304 &event_tx,
1305 &mut cursor,
1306 config.batch_limit,
1307 )
1308 .await
1309 {
1310 return;
1311 }
1312 if event.cursor > cursor {
1313 cursor = event.cursor;
1314 if event_tx.send(event).await.is_err() {
1315 return;
1316 }
1317 }
1318 }
1319 Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
1320 if !catch_up_structural_events(
1321 &events,
1322 &event_tx,
1323 &mut cursor,
1324 config.batch_limit,
1325 )
1326 .await
1327 {
1328 return;
1329 }
1330 }
1331 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
1332 }
1333 }
1334 }
1335 }
1336 });
1337
1338 MobEventsSubscription { event_rx, cancel }
1339}
1340
1341async fn catch_up_structural_events(
1342 events: &MobEventsView,
1343 event_tx: &mpsc::Sender<crate::event::MobEvent>,
1344 cursor: &mut u64,
1345 batch_limit: usize,
1346) -> bool {
1347 loop {
1348 let batch = match events.poll(*cursor, batch_limit).await {
1349 Ok(batch) => batch,
1350 Err(error) => {
1351 tracing::warn!(
1352 error = %error,
1353 "mob structural event subscription stopped after catch-up failure",
1354 );
1355 return false;
1356 }
1357 };
1358 if batch.is_empty() {
1359 return true;
1360 }
1361
1362 let is_complete = batch.len() < batch_limit;
1363 for event in batch {
1364 if event.cursor <= *cursor {
1365 continue;
1366 }
1367 *cursor = event.cursor;
1368 if event_tx.send(event).await.is_err() {
1369 return false;
1370 }
1371 }
1372
1373 if is_complete {
1374 return true;
1375 }
1376 }
1377}
1378
1379impl MobHandle {
1380 async fn restore_failure_for(
1381 &self,
1382 agent_identity: &MeerkatId,
1383 ) -> Option<RestoreFailureDiagnostic> {
1384 self.restore_diagnostics
1385 .read()
1386 .await
1387 .get(agent_identity)
1388 .cloned()
1389 }
1390
1391 fn restore_failure_error(
1392 agent_identity: &MeerkatId,
1393 diag: RestoreFailureDiagnostic,
1394 ) -> MobError {
1395 MobError::MemberRestoreFailed {
1396 member_id: agent_identity.clone(),
1397 session_id: diag.bridge_session_id,
1398 reason: diag.reason,
1399 }
1400 }
1401
1402 async fn send_actor_command<R>(
1403 &self,
1404 build: impl FnOnce(oneshot::Sender<R>) -> MobCommand,
1405 ) -> Result<R, MobError> {
1406 let (reply_tx, reply_rx) = oneshot::channel();
1407 self.command_tx
1408 .send(build(reply_tx))
1409 .await
1410 .map_err(|_| MobError::Internal("actor task dropped".into()))?;
1411 reply_rx
1412 .await
1413 .map_err(|_| MobError::Internal("actor reply dropped".into()))
1414 }
1415
1416 async fn execute_machine_command(
1417 &self,
1418 command: MobMachineCommand,
1419 ) -> Result<MobMachineCommandResult, MobError> {
1420 match command {
1421 MobMachineCommand::RunFlow {
1422 flow_id,
1423 activation_params,
1424 scoped_event_tx,
1425 } => {
1426 let run_id = self
1427 .send_actor_command(|reply_tx| MobCommand::RunFlow {
1428 flow_id,
1429 activation_params,
1430 scoped_event_tx,
1431 reply_tx,
1432 })
1433 .await??;
1434 Ok(MobMachineCommandResult::RunId(run_id))
1435 }
1436 MobMachineCommand::CancelFlow { run_id } => {
1437 self.send_actor_command(|reply_tx| MobCommand::CancelFlow { run_id, reply_tx })
1438 .await??;
1439 Ok(MobMachineCommandResult::Unit)
1440 }
1441 MobMachineCommand::FlowStatus { run_id } => {
1442 let status = self
1443 .send_actor_command(|reply_tx| MobCommand::FlowStatus { run_id, reply_tx })
1444 .await??;
1445 Ok(MobMachineCommandResult::FlowStatus(status))
1446 }
1447 MobMachineCommand::Spawn {
1448 spec,
1449 spawn_source,
1450 owner_context,
1451 } => {
1452 let (owner_bridge_session_id, ops_registry) = match owner_context {
1453 Some(ctx) => (Some(ctx.owner_bridge_session_id), Some(ctx.ops_registry)),
1454 None => (None, None),
1455 };
1456 let receipt = self
1457 .send_actor_command(|reply_tx| MobCommand::Spawn {
1458 spec,
1459 spawn_source,
1460 owner_bridge_session_id,
1461 ops_registry,
1462 reply_tx,
1463 })
1464 .await??;
1465 Ok(MobMachineCommandResult::SpawnReceipt(receipt))
1466 }
1467 MobMachineCommand::EnsureMember { spec } => {
1468 let outcome = self.handle_ensure_member(*spec).await?;
1469 Ok(MobMachineCommandResult::EnsureMember(outcome))
1470 }
1471 MobMachineCommand::Reconcile { desired, options } => {
1472 let report = self.handle_reconcile(desired, options).await?;
1473 Ok(MobMachineCommandResult::Reconcile(Box::new(report)))
1474 }
1475 MobMachineCommand::ListMembersMatching { filter } => {
1476 let members = self.handle_list_members_matching(*filter).await;
1477 Ok(MobMachineCommandResult::ListMembers(members))
1478 }
1479 MobMachineCommand::Retire { agent_identity } => {
1480 self.send_actor_command(|reply_tx| MobCommand::Retire {
1481 agent_identity,
1482 reply_tx,
1483 })
1484 .await??;
1485 Ok(MobMachineCommandResult::Unit)
1486 }
1487 MobMachineCommand::Respawn {
1488 agent_identity,
1489 initial_message,
1490 } => {
1491 let receipt = self
1492 .send_actor_command(|reply_tx| MobCommand::Respawn {
1493 agent_identity,
1494 initial_message,
1495 reply_tx,
1496 })
1497 .await?;
1498 Ok(MobMachineCommandResult::Respawn(receipt))
1499 }
1500 MobMachineCommand::RetireAll => {
1501 self.send_actor_command(|reply_tx| MobCommand::RetireAll { reply_tx })
1502 .await??;
1503 Ok(MobMachineCommandResult::Unit)
1504 }
1505 MobMachineCommand::SubmitWork(cmd) => {
1506 let crate::mob_machine::SubmitWorkCommand {
1511 runtime_id,
1512 fence_token,
1513 work_ref,
1514 spec,
1515 handling_mode,
1516 render_metadata,
1517 ack_mode,
1518 } = *cmd;
1519 let receipt_work_ref = work_ref.clone();
1520 let payload = Box::new(super::state::SubmitWorkPayload {
1521 runtime_id,
1522 fence_token,
1523 work_ref,
1524 content: spec.content,
1525 origin: spec.origin,
1526 handling_mode,
1527 render_metadata,
1528 ack_mode,
1529 });
1530 self.send_actor_command(|reply_tx| MobCommand::SubmitWork { payload, reply_tx })
1531 .await??;
1532 Ok(MobMachineCommandResult::WorkReceipt {
1533 work_ref: receipt_work_ref,
1534 })
1535 }
1536 MobMachineCommand::CancelWork { work_ref } => {
1537 Err(MobError::WorkNotFound(work_ref))
1540 }
1541 MobMachineCommand::CancelAllWork {
1542 runtime_id,
1543 fence_token,
1544 } => {
1545 self.send_actor_command(|reply_tx| MobCommand::CancelAllWork {
1552 runtime_id,
1553 fence_token,
1554 reply_tx,
1555 })
1556 .await??;
1557 Ok(MobMachineCommandResult::Unit)
1558 }
1559 MobMachineCommand::Stop => {
1560 self.send_actor_command(|reply_tx| MobCommand::Stop { reply_tx })
1561 .await??;
1562 Ok(MobMachineCommandResult::Unit)
1563 }
1564 MobMachineCommand::Resume => {
1565 self.send_actor_command(|reply_tx| MobCommand::ResumeLifecycle { reply_tx })
1566 .await??;
1567 Ok(MobMachineCommandResult::Unit)
1568 }
1569 MobMachineCommand::Complete => {
1570 self.send_actor_command(|reply_tx| MobCommand::Complete { reply_tx })
1571 .await??;
1572 Ok(MobMachineCommandResult::Unit)
1573 }
1574 MobMachineCommand::Reset => {
1575 self.send_actor_command(|reply_tx| MobCommand::Reset { reply_tx })
1576 .await??;
1577 Ok(MobMachineCommandResult::Unit)
1578 }
1579 MobMachineCommand::Destroy => {
1580 let reply = self
1581 .send_actor_command(|reply_tx| MobCommand::Destroy { reply_tx })
1582 .await?;
1583 match reply {
1584 Ok(report) => Ok(MobMachineCommandResult::DestroyReport(report)),
1585 Err(MobDestroyError::Mob(error)) => Err(error),
1586 Err(MobDestroyError::Incomplete { report }) => Err(MobError::Internal(
1587 format!("destroy incomplete: {}", report.error_summary()),
1588 )),
1589 }
1590 }
1591 MobMachineCommand::RosterSnapshot => {
1592 let roster = self.roster.read().await.snapshot();
1593 Ok(MobMachineCommandResult::RosterSnapshot(roster))
1594 }
1595 MobMachineCommand::ListMembers => {
1596 let members = self
1597 .send_actor_command(|reply_tx| MobCommand::ProjectMemberList {
1598 include_retiring: false,
1599 reply_tx,
1600 })
1601 .await?;
1602 Ok(MobMachineCommandResult::ListMembers(members))
1603 }
1604 MobMachineCommand::ListMembersIncludingRetiring => {
1605 let members = self
1606 .send_actor_command(|reply_tx| MobCommand::ProjectMemberList {
1607 include_retiring: true,
1608 reply_tx,
1609 })
1610 .await?;
1611 Ok(MobMachineCommandResult::ListMembersIncludingRetiring(
1612 members,
1613 ))
1614 }
1615 MobMachineCommand::ListAllMembers => {
1616 let members = self.roster.read().await.list_all().cloned().collect();
1617 Ok(MobMachineCommandResult::ListAllMembers(members))
1618 }
1619 MobMachineCommand::MemberStatus { agent_identity } => {
1620 let snapshot = self
1621 .send_actor_command(|reply_tx| MobCommand::ProjectMemberStatus {
1622 agent_identity: AgentIdentity::from(agent_identity.as_str()),
1623 reply_tx,
1624 })
1625 .await?;
1626 Ok(MobMachineCommandResult::MemberStatus(snapshot))
1627 }
1628 MobMachineCommand::SubscribeAgentEvents { agent_identity } => {
1629 let stream = self
1630 .send_actor_command(|reply_tx| MobCommand::SubscribeAgentEvents {
1631 agent_identity,
1632 reply_tx,
1633 })
1634 .await??;
1635 Ok(MobMachineCommandResult::EventStream(stream))
1636 }
1637 MobMachineCommand::SubscribeAllAgentEvents => {
1638 let streams = self
1639 .send_actor_command(|reply_tx| MobCommand::SubscribeAllAgentEvents { reply_tx })
1640 .await??;
1641 Ok(MobMachineCommandResult::AllAgentEventStreams(streams))
1642 }
1643 MobMachineCommand::SubscribeMobEvents { config } => {
1644 Ok(MobMachineCommandResult::MobEventRouter(
1645 super::event_router::spawn_event_router(self.clone(), config),
1646 ))
1647 }
1648 MobMachineCommand::PollEvents {
1649 after_cursor,
1650 limit,
1651 } => {
1652 let events = if self.status().await? == MobState::Destroyed {
1653 self.events
1654 .poll(after_cursor, limit)
1655 .await
1656 .map_err(MobError::from)?
1657 } else {
1658 self.send_actor_command(|reply_tx| MobCommand::PollEvents {
1659 after_cursor,
1660 limit,
1661 reply_tx,
1662 })
1663 .await??
1664 };
1665 Ok(MobMachineCommandResult::MobEvents(events))
1666 }
1667 MobMachineCommand::ReplayAllEvents => {
1668 let events = if self.status().await? == MobState::Destroyed {
1669 self.events.replay_all().await.map_err(MobError::from)?
1670 } else {
1671 self.send_actor_command(|reply_tx| MobCommand::ReplayAllEvents { reply_tx })
1672 .await??
1673 };
1674 Ok(MobMachineCommandResult::MobEvents(events))
1675 }
1676 MobMachineCommand::RecordOperatorActionProvenance {
1677 tool_name,
1678 authority_context,
1679 } => {
1680 self.send_actor_command(|reply_tx| MobCommand::RecordOperatorActionProvenance {
1681 tool_name,
1682 authority_context,
1683 reply_tx,
1684 })
1685 .await??;
1686 Ok(MobMachineCommandResult::Unit)
1687 }
1688 MobMachineCommand::GetMember { agent_identity } => {
1689 let member = self.roster.read().await.entry(&agent_identity);
1690 Ok(MobMachineCommandResult::GetMember(member))
1691 }
1692 #[cfg(test)]
1693 MobMachineCommand::FlowTrackerCounts => {
1694 let counts = self
1695 .send_actor_command(|reply_tx| MobCommand::FlowTrackerCounts { reply_tx })
1696 .await?;
1697 Ok(MobMachineCommandResult::FlowTrackerCounts(counts))
1698 }
1699 #[cfg(test)]
1700 MobMachineCommand::OrchestratorSnapshot => {
1701 let snapshot = self
1702 .send_actor_command(|reply_tx| MobCommand::OrchestratorSnapshot { reply_tx })
1703 .await?;
1704 Ok(MobMachineCommandResult::OrchestratorSnapshot(snapshot))
1705 }
1706 #[cfg(test)]
1707 MobMachineCommand::LifecycleSnapshot => {
1708 let snapshot = self
1709 .send_actor_command(|reply_tx| MobCommand::LifecycleSnapshot { reply_tx })
1710 .await?;
1711 Ok(MobMachineCommandResult::LifecycleSnapshot(snapshot))
1712 }
1713 #[cfg(test)]
1714 MobMachineCommand::LifecycleNotificationBurst { count, message } => {
1715 self.send_actor_command(|reply_tx| MobCommand::LifecycleNotificationBurst {
1716 count,
1717 message,
1718 reply_tx,
1719 })
1720 .await??;
1721 Ok(MobMachineCommandResult::LifecycleNotificationBurst)
1722 }
1723 #[cfg(test)]
1724 MobMachineCommand::DslT2Snapshot => {
1725 let snapshot = self
1726 .send_actor_command(|reply_tx| MobCommand::DslT2Snapshot { reply_tx })
1727 .await?;
1728 Ok(MobMachineCommandResult::DslT2Snapshot(snapshot))
1729 }
1730 MobMachineCommand::SetSpawnPolicy { policy } => {
1731 self.send_actor_command(|reply_tx| MobCommand::SetSpawnPolicy { policy, reply_tx })
1732 .await??;
1733 Ok(MobMachineCommandResult::Unit)
1734 }
1735 MobMachineCommand::Shutdown => {
1736 self.send_actor_command(|reply_tx| MobCommand::Shutdown { reply_tx })
1737 .await??;
1738 Ok(MobMachineCommandResult::Unit)
1739 }
1740 MobMachineCommand::ForceCancel { agent_identity } => {
1741 self.send_actor_command(|reply_tx| MobCommand::ForceCancel {
1742 agent_identity,
1743 reply_tx,
1744 })
1745 .await??;
1746 Ok(MobMachineCommandResult::Unit)
1747 }
1748 MobMachineCommand::Wire { local, target } => {
1749 self.send_actor_command(|reply_tx| MobCommand::Wire {
1750 local,
1751 target,
1752 reply_tx,
1753 })
1754 .await??;
1755 Ok(MobMachineCommandResult::Unit)
1756 }
1757 MobMachineCommand::WireMembersBatch { edges } => {
1758 let report = self
1759 .send_actor_command(|reply_tx| MobCommand::WireMembersBatch { edges, reply_tx })
1760 .await??;
1761 Ok(MobMachineCommandResult::WireMembersBatchReport(report))
1762 }
1763 MobMachineCommand::Unwire { local, target } => {
1764 self.send_actor_command(|reply_tx| MobCommand::Unwire {
1765 local,
1766 target,
1767 reply_tx,
1768 })
1769 .await??;
1770 Ok(MobMachineCommandResult::Unit)
1771 }
1772 }
1773 }
1774
1775 async fn execute_destroy_machine_command(
1776 &self,
1777 command: MobMachineCommand,
1778 ) -> Result<MobMachineCommandResult, MobDestroyError> {
1779 match command {
1780 MobMachineCommand::Destroy => {
1781 let reply = self
1782 .send_actor_command(|reply_tx| MobCommand::Destroy { reply_tx })
1783 .await
1784 .map_err(MobDestroyError::from)?;
1785 match reply {
1786 Ok(report) => Ok(MobMachineCommandResult::DestroyReport(report)),
1787 Err(error) => Err(error),
1788 }
1789 }
1790 _ => Err(MobDestroyError::from(MobError::Internal(
1791 "unsupported destroy machine command".into(),
1792 ))),
1793 }
1794 }
1795
1796 pub async fn poll_events(
1798 &self,
1799 after_cursor: u64,
1800 limit: usize,
1801 ) -> Result<Vec<crate::event::MobEvent>, MobError> {
1802 match self
1803 .execute_machine_command(MobMachineCommand::PollEvents {
1804 after_cursor,
1805 limit,
1806 })
1807 .await?
1808 {
1809 MobMachineCommandResult::MobEvents(events) => Ok(events),
1810 _ => Err(MobError::Internal(
1811 "unexpected command result variant".into(),
1812 )),
1813 }
1814 }
1815
1816 pub async fn status(&self) -> Result<MobState, MobError> {
1824 match self
1825 .send_actor_command(|reply_tx| MobCommand::QueryPhase { reply_tx })
1826 .await
1827 {
1828 Ok(state) => Ok(state),
1829 Err(MobError::Internal(_)) => Ok(*self.phase_watch_rx.borrow()),
1835 Err(other) => Err(other),
1836 }
1837 }
1838
1839 pub fn definition(&self) -> &MobDefinition {
1841 &self.definition
1842 }
1843
1844 pub fn mob_id(&self) -> &MobId {
1846 &self.definition.id
1847 }
1848
1849 pub async fn roster(&self) -> Roster {
1851 match self
1852 .execute_machine_command(MobMachineCommand::RosterSnapshot)
1853 .await
1854 {
1855 Ok(MobMachineCommandResult::RosterSnapshot(roster)) => roster,
1856 Ok(_) => {
1857 tracing::error!("unexpected command result variant");
1858 Default::default()
1859 }
1860 Err(_) => Roster::new(),
1861 }
1862 }
1863
1864 fn derived_comms_name(&self, entry: &RosterEntry) -> String {
1865 format!(
1866 "{}/{}/{}",
1867 self.definition.id, entry.role, entry.agent_identity
1868 )
1869 }
1870
1871 async fn resolve_peer_connectivity(
1872 &self,
1873 entry: &RosterEntry,
1874 bridge_session_id: &SessionId,
1875 roster_snapshot: &Roster,
1876 ) -> Option<MobPeerConnectivitySnapshot> {
1877 let comms = self
1878 .session_service
1879 .comms_runtime(bridge_session_id)
1880 .await?;
1881 let peers = comms.peers().await;
1882 let peers_by_id: HashMap<PeerId, &PeerDirectoryEntry> =
1883 peers.iter().map(|peer| (peer.peer_id, peer)).collect();
1884 let peers_by_name: HashMap<&str, &PeerDirectoryEntry> = peers
1885 .iter()
1886 .map(|peer| (peer.name.as_str(), peer))
1887 .collect();
1888
1889 let mut reachable_peer_count = 0usize;
1890 let mut unknown_peer_count = 0usize;
1891 let mut unreachable_peers = Vec::new();
1892
1893 for wired_peer in &entry.wired_to {
1894 let wired_peer_meerkat = MeerkatId::from(wired_peer);
1895 let matched = if let Some(spec) = entry.external_peer_specs.get(&wired_peer_meerkat) {
1896 peers_by_id
1897 .get(&spec.peer_id)
1898 .copied()
1899 .or_else(|| peers_by_name.get(spec.name.as_str()).copied())
1900 } else {
1901 let local_entry = roster_snapshot.get(&wired_peer_meerkat);
1902 let live_peer_id = match local_entry
1903 .and_then(|peer_entry| peer_entry.member_ref.bridge_session_id())
1904 {
1905 Some(target_session_id) => self
1906 .session_service
1907 .comms_runtime(target_session_id)
1908 .await
1909 .and_then(|runtime| runtime.peer_id()),
1910 None => None,
1911 };
1912 live_peer_id
1913 .and_then(|peer_id| peers_by_id.get(&peer_id).copied())
1914 .or_else(|| {
1915 local_entry
1916 .and_then(|peer_entry| peer_entry.peer_id.as_ref())
1917 .and_then(|peer_id| peers_by_id.get(peer_id).copied())
1918 })
1919 .or_else(|| {
1920 local_entry
1921 .map(|peer_entry| self.derived_comms_name(peer_entry))
1922 .and_then(|name| peers_by_name.get(name.as_str()).copied())
1923 })
1924 };
1925
1926 match matched {
1927 Some(peer) => match peer.reachability {
1928 PeerReachability::Reachable => reachable_peer_count += 1,
1929 PeerReachability::Unknown => unknown_peer_count += 1,
1930 PeerReachability::Unreachable => unreachable_peers.push(MobUnreachablePeer {
1931 peer: peer.name.as_string(),
1932 reason: peer.last_unreachable_reason,
1933 }),
1934 },
1935 None => unknown_peer_count += 1,
1936 }
1937 }
1938
1939 Some(MobPeerConnectivitySnapshot {
1940 reachable_peer_count,
1941 unknown_peer_count,
1942 unreachable_peers,
1943 })
1944 }
1945
1946 pub async fn list_members(&self) -> Vec<MobMemberListEntry> {
1955 match self
1956 .execute_machine_command(MobMachineCommand::ListMembers)
1957 .await
1958 {
1959 Ok(MobMachineCommandResult::ListMembers(entries)) => entries,
1960 Ok(_) => {
1961 tracing::error!("unexpected command result variant");
1962 Default::default()
1963 }
1964 Err(_) => Vec::new(),
1965 }
1966 }
1967
1968 pub async fn list_members_including_retiring(&self) -> Vec<MobMemberListEntry> {
1975 if let Some(entries) = self.inflight_retiring_member_list().await {
1976 return entries;
1977 }
1978 match self
1979 .execute_machine_command(MobMachineCommand::ListMembersIncludingRetiring)
1980 .await
1981 {
1982 Ok(MobMachineCommandResult::ListMembersIncludingRetiring(entries)) => entries,
1983 Ok(_) => {
1984 tracing::error!("unexpected command result variant");
1985 Default::default()
1986 }
1987 Err(_) => Vec::new(),
1988 }
1989 }
1990
1991 async fn inflight_retiring_member_list(&self) -> Option<Vec<MobMemberListEntry>> {
1992 let entries: Vec<_> = {
1993 let roster = self.roster.read().await;
1994 let entries: Vec<_> = roster.list_all().cloned().collect();
1995 if !entries
1996 .iter()
1997 .any(|entry| entry.state == crate::roster::MemberState::Retiring)
1998 {
1999 return None;
2000 }
2001 entries
2002 };
2003 let machine_state = self.machine_state_watch_rx.borrow().clone();
2004 Some(self.project_member_list_entries_from_machine_state(entries, &machine_state))
2005 }
2006
2007 fn project_member_list_entries_from_machine_state(
2008 &self,
2009 entries: Vec<RosterEntry>,
2010 machine_state: &mob_dsl::MobMachineState,
2011 ) -> Vec<MobMemberListEntry> {
2012 entries
2013 .into_iter()
2014 .map(|entry| {
2015 let domain_identity =
2016 crate::ids::AgentIdentity::from(entry.agent_identity.as_str());
2017 let dsl_identity = mob_dsl::AgentIdentity::from_domain(&domain_identity);
2018 let machine_bridge_session_id = machine_state
2019 .member_session_bindings
2020 .get(&dsl_identity)
2021 .and_then(|dsl_session_id| SessionId::parse(&dsl_session_id.0).ok());
2022 let current_bridge_session_id = entry
2023 .member_ref
2024 .bridge_session_id()
2025 .cloned()
2026 .or(machine_bridge_session_id);
2027 let material = MobMemberLifecycleProjection::materialize(MobMemberLifecycleInput {
2028 member_present: true,
2029 machine_lifecycle: machine_state
2030 .member_lifecycle_for_identity(&dsl_identity, true),
2031 output_preview: None,
2032 tokens_used: 0,
2033 agent_runtime_id: entry.agent_runtime_id.clone(),
2034 fence_token: entry.fence_token,
2035 current_bridge_session_id,
2036 peer_connectivity: None,
2037 kickoff: entry.kickoff.clone(),
2038 });
2039 let snapshot = material.to_snapshot();
2040 let current_bridge_session_id = snapshot.current_bridge_session_id().cloned();
2041 MobMemberListEntry {
2042 agent_identity: entry.agent_identity,
2043 agent_runtime_id: entry.agent_runtime_id,
2044 fence_token: entry.fence_token,
2045 role: entry.role,
2046 runtime_mode: entry.runtime_mode,
2047 peer_id: entry.peer_id,
2048 transport_public_key: entry.transport_public_key,
2049 state: entry.state,
2050 wired_to: entry.wired_to,
2051 external_peer_specs: entry.external_peer_specs,
2052 labels: entry.labels,
2053 status: snapshot.status,
2054 error: snapshot.error,
2055 is_final: snapshot.is_final,
2056 current_session_id: None,
2057 current_bridge_session_id: None,
2058 kickoff: snapshot.kickoff,
2059 }
2060 .with_current_bridge_session_id(current_bridge_session_id)
2061 })
2062 .collect()
2063 }
2064
2065 pub(crate) async fn list_runnable_members(&self) -> Vec<MobMemberListEntry> {
2070 self.list_members()
2071 .await
2072 .into_iter()
2073 .filter(|entry| {
2074 entry.state == crate::roster::MemberState::Active
2075 && entry.status == MobMemberStatus::Active
2076 })
2077 .collect()
2078 }
2079
2080 pub async fn list_all_members(&self) -> Vec<RosterEntry> {
2086 self.roster.read().await.list_all().cloned().collect()
2087 }
2088
2089 pub async fn get_member(&self, identity: &AgentIdentity) -> Option<RosterEntry> {
2091 let meerkat_id = MeerkatId::from(identity);
2092 match self
2093 .execute_machine_command(MobMachineCommand::GetMember {
2094 agent_identity: meerkat_id,
2095 })
2096 .await
2097 {
2098 Ok(MobMachineCommandResult::GetMember(entry)) => entry,
2099 Ok(_) => {
2100 tracing::error!("unexpected command result variant");
2101 Default::default()
2102 }
2103 Err(_) => None,
2104 }
2105 }
2106
2107 pub(crate) async fn get_member_by_meerkat_id(
2109 &self,
2110 agent_identity: &MeerkatId,
2111 ) -> Option<RosterEntry> {
2112 self.get_member(&AgentIdentity::from(agent_identity.as_str()))
2113 .await
2114 }
2115
2116 pub async fn resolve_bridge_session_id(&self, identity: &AgentIdentity) -> Option<SessionId> {
2154 self.get_member(identity)
2155 .await
2156 .and_then(|entry| entry.member_ref.bridge_session_id().cloned())
2157 }
2158
2159 pub async fn member(&self, identity: &AgentIdentity) -> Result<MemberHandle, MobError> {
2161 let meerkat_id = MeerkatId::from(identity);
2162 if let Some(diag) = self.restore_failure_for(&meerkat_id).await {
2163 return Err(Self::restore_failure_error(&meerkat_id, diag));
2164 }
2165 let entry = self
2166 .get_member(identity)
2167 .await
2168 .ok_or_else(|| MobError::MemberNotFound(meerkat_id.clone()))?;
2169 if entry.state != crate::roster::MemberState::Active {
2170 return Err(MobError::MemberNotFound(meerkat_id.clone()));
2171 }
2172 Ok(MemberHandle {
2173 mob: self.clone(),
2174 agent_identity: meerkat_id,
2175 })
2176 }
2177
2178 pub fn events(&self) -> MobEventsView {
2180 MobEventsView {
2181 handle: self.clone(),
2182 }
2183 }
2184
2185 pub async fn record_operator_action_provenance(
2190 &self,
2191 tool_name: &str,
2192 authority_context: &MobToolAuthorityContext,
2193 ) -> Result<(), MobError> {
2194 match self
2195 .execute_machine_command(MobMachineCommand::RecordOperatorActionProvenance {
2196 tool_name: tool_name.to_string(),
2197 authority_context: authority_context.clone(),
2198 })
2199 .await?
2200 {
2201 MobMachineCommandResult::Unit => Ok(()),
2202 _ => Err(MobError::Internal(
2203 "unexpected command result variant".into(),
2204 )),
2205 }
2206 }
2207
2208 pub async fn subscribe_agent_events(
2216 &self,
2217 identity: &AgentIdentity,
2218 ) -> Result<EventStream, MobError> {
2219 match self
2220 .execute_machine_command(MobMachineCommand::SubscribeAgentEvents {
2221 agent_identity: MeerkatId::from(identity),
2222 })
2223 .await?
2224 {
2225 MobMachineCommandResult::EventStream(stream) => Ok(stream),
2226 _ => Err(MobError::Internal(
2227 "unexpected command result variant".into(),
2228 )),
2229 }
2230 }
2231
2232 pub async fn subscribe_all_agent_events(
2238 &self,
2239 ) -> Result<Vec<(AgentIdentity, EventStream)>, MobError> {
2240 match self
2241 .execute_machine_command(MobMachineCommand::SubscribeAllAgentEvents)
2242 .await
2243 {
2244 Ok(MobMachineCommandResult::AllAgentEventStreams(streams)) => Ok(streams
2245 .into_iter()
2246 .map(|(mid, stream)| (AgentIdentity::from(mid.as_str()), stream))
2247 .collect()),
2248 Ok(_) => {
2249 tracing::error!("unexpected command result variant");
2250 Err(MobError::Internal(
2251 "unexpected command result variant".into(),
2252 ))
2253 }
2254 Err(error) => Err(error),
2255 }
2256 }
2257
2258 pub async fn subscribe_mob_events(&self) -> super::event_router::MobEventRouterHandle {
2265 self.subscribe_mob_events_with_config(super::event_router::MobEventRouterConfig::default())
2266 .await
2267 }
2268
2269 pub async fn subscribe_mob_events_with_config(
2271 &self,
2272 config: super::event_router::MobEventRouterConfig,
2273 ) -> super::event_router::MobEventRouterHandle {
2274 match self
2275 .execute_machine_command(MobMachineCommand::SubscribeMobEvents { config })
2276 .await
2277 {
2278 Ok(MobMachineCommandResult::MobEventRouter(handle)) => handle,
2279 Ok(_) => {
2280 tracing::error!("unexpected command result variant for subscribe_mob_events");
2281 super::event_router::spawn_event_router(self.clone(), config)
2282 }
2283 Err(_) => super::event_router::spawn_event_router(self.clone(), config),
2284 }
2285 }
2286
2287 pub async fn run_flow(
2289 &self,
2290 flow_id: FlowId,
2291 params: serde_json::Value,
2292 ) -> Result<RunId, MobError> {
2293 self.run_flow_with_stream(flow_id, params, None).await
2294 }
2295
2296 pub async fn run_flow_with_stream(
2298 &self,
2299 flow_id: FlowId,
2300 params: serde_json::Value,
2301 scoped_event_tx: Option<mpsc::Sender<meerkat_core::ScopedAgentEvent>>,
2302 ) -> Result<RunId, MobError> {
2303 match self
2304 .execute_machine_command(MobMachineCommand::RunFlow {
2305 flow_id,
2306 activation_params: params,
2307 scoped_event_tx,
2308 })
2309 .await?
2310 {
2311 MobMachineCommandResult::RunId(run_id) => Ok(run_id),
2312 _ => Err(MobError::Internal(
2313 "unexpected command result variant".into(),
2314 )),
2315 }
2316 }
2317
2318 pub async fn cancel_flow(&self, run_id: RunId) -> Result<(), MobError> {
2320 match self
2321 .execute_machine_command(MobMachineCommand::CancelFlow { run_id })
2322 .await?
2323 {
2324 MobMachineCommandResult::Unit => Ok(()),
2325 _ => Err(MobError::Internal(
2326 "unexpected command result variant".into(),
2327 )),
2328 }
2329 }
2330
2331 pub async fn flow_status(&self, run_id: RunId) -> Result<Option<MobRun>, MobError> {
2333 match self
2334 .execute_machine_command(MobMachineCommand::FlowStatus { run_id })
2335 .await?
2336 {
2337 MobMachineCommandResult::FlowStatus(status) => Ok(status),
2338 _ => Err(MobError::Internal(
2339 "unexpected command result variant".into(),
2340 )),
2341 }
2342 }
2343
2344 pub async fn list_runs(&self, flow_id: Option<&FlowId>) -> Result<Vec<MobRun>, MobError> {
2346 self.run_store
2347 .list_runs(&self.definition.id, flow_id)
2348 .await
2349 .map_err(MobError::from)
2350 }
2351
2352 pub fn list_flows(&self) -> Vec<FlowId> {
2354 self.definition.flows.keys().cloned().collect()
2355 }
2356
2357 #[cfg(test)]
2359 pub(crate) async fn spawn(
2360 &self,
2361 profile_name: ProfileName,
2362 agent_identity: MeerkatId,
2363 initial_message: Option<ContentInput>,
2364 ) -> Result<MemberRef, MobError> {
2365 self.spawn_with_options(profile_name, agent_identity, initial_message, None, None)
2366 .await
2367 }
2368
2369 #[cfg(test)]
2371 pub(crate) async fn spawn_with_binding(
2372 &self,
2373 profile_name: ProfileName,
2374 agent_identity: MeerkatId,
2375 initial_message: Option<ContentInput>,
2376 binding: crate::RuntimeBinding,
2377 ) -> Result<MemberRef, MobError> {
2378 let mut spec = SpawnMemberSpec::new(profile_name, agent_identity);
2379 spec.initial_message = initial_message;
2380 spec.binding = Some(binding);
2381 self.spawn_spec_internal(spec).await
2382 }
2383
2384 #[cfg(test)]
2386 pub(crate) async fn spawn_with_backend(
2387 &self,
2388 profile_name: ProfileName,
2389 agent_identity: MeerkatId,
2390 initial_message: Option<ContentInput>,
2391 backend: Option<MobBackendKind>,
2392 ) -> Result<MemberRef, MobError> {
2393 self.spawn_with_options(profile_name, agent_identity, initial_message, None, backend)
2394 .await
2395 }
2396
2397 #[cfg(test)]
2399 pub(crate) async fn spawn_with_options(
2400 &self,
2401 profile_name: ProfileName,
2402 agent_identity: MeerkatId,
2403 initial_message: Option<ContentInput>,
2404 runtime_mode: Option<crate::MobRuntimeMode>,
2405 backend: Option<MobBackendKind>,
2406 ) -> Result<MemberRef, MobError> {
2407 let mut spec = SpawnMemberSpec::new(profile_name, agent_identity);
2408 spec.initial_message = initial_message;
2409 spec.runtime_mode = runtime_mode;
2410 spec.backend = backend;
2411 self.spawn_spec_internal(spec).await
2412 }
2413
2414 #[cfg(test)]
2416 pub(crate) async fn attach_existing_session(
2417 &self,
2418 profile_name: ProfileName,
2419 agent_identity: MeerkatId,
2420 session_id: meerkat_core::types::SessionId,
2421 runtime_mode: Option<crate::MobRuntimeMode>,
2422 backend: Option<MobBackendKind>,
2423 ) -> Result<MemberRef, MobError> {
2424 let mut spec = SpawnMemberSpec::new(profile_name, agent_identity);
2425 spec.launch_mode = crate::launch::MemberLaunchMode::Resume {
2426 bridge_session_id: session_id,
2427 };
2428 spec.runtime_mode = runtime_mode;
2429 spec.backend = backend;
2430 self.spawn_spec_internal(spec).await
2431 }
2432
2433 #[cfg(test)]
2435 pub(crate) async fn attach_existing_session_as_member(
2436 &self,
2437 profile_name: ProfileName,
2438 agent_identity: MeerkatId,
2439 session_id: meerkat_core::types::SessionId,
2440 ) -> Result<MemberRef, MobError> {
2441 self.attach_existing_session(profile_name, agent_identity, session_id, None, None)
2442 .await
2443 }
2444
2445 pub async fn spawn_spec(&self, spec: SpawnMemberSpec) -> Result<SpawnResult, MobError> {
2447 let identity = spec.identity.clone();
2448 self.spawn_spec_internal(spec).await?;
2449 let entry = self.get_member(&identity).await.ok_or_else(|| {
2453 MobError::Internal(format!(
2454 "spawn succeeded but roster entry missing for '{identity}'"
2455 ))
2456 })?;
2457 Ok(SpawnResult {
2458 agent_identity: entry.agent_identity,
2459 agent_runtime_id: entry.agent_runtime_id,
2460 fence_token: entry.fence_token,
2461 })
2462 }
2463
2464 pub(crate) async fn spawn_spec_internal(
2466 &self,
2467 spec: SpawnMemberSpec,
2468 ) -> Result<MemberRef, MobError> {
2469 self.spawn_spec_internal_with_source(spec, SpawnSource::Consumer)
2470 .await
2471 }
2472
2473 pub(crate) async fn spawn_spec_internal_with_source(
2474 &self,
2475 spec: SpawnMemberSpec,
2476 spawn_source: SpawnSource,
2477 ) -> Result<MemberRef, MobError> {
2478 let spawn_source = SpawnSource::for_launch_mode(spawn_source, &spec.launch_mode);
2479 match self
2480 .execute_machine_command(MobMachineCommand::Spawn {
2481 spec: Box::new(spec),
2482 spawn_source,
2483 owner_context: None,
2484 })
2485 .await?
2486 {
2487 MobMachineCommandResult::SpawnReceipt(receipt) => Ok(receipt.member_ref),
2488 _ => Err(MobError::Internal(
2489 "unexpected command result variant".into(),
2490 )),
2491 }
2492 }
2493
2494 pub(super) async fn spawn_spec_receipt_with_owner_context(
2495 &self,
2496 spec: SpawnMemberSpec,
2497 owner_context: CanonicalOpsOwnerContext,
2498 ) -> Result<MemberSpawnReceipt, MobError> {
2499 self.spawn_spec_receipt_with_owner_context_and_source(
2500 spec,
2501 owner_context,
2502 SpawnSource::AgentSpawnMember,
2503 )
2504 .await
2505 }
2506
2507 pub(super) async fn spawn_spec_receipt_with_owner_context_and_source(
2508 &self,
2509 spec: SpawnMemberSpec,
2510 owner_context: CanonicalOpsOwnerContext,
2511 spawn_source: SpawnSource,
2512 ) -> Result<MemberSpawnReceipt, MobError> {
2513 match self
2514 .execute_machine_command(MobMachineCommand::Spawn {
2515 spawn_source: SpawnSource::for_launch_mode(spawn_source, &spec.launch_mode),
2516 spec: Box::new(spec),
2517 owner_context: Some(owner_context),
2518 })
2519 .await?
2520 {
2521 MobMachineCommandResult::SpawnReceipt(receipt) => Ok(receipt),
2522 _ => Err(MobError::Internal(
2523 "unexpected command result variant".into(),
2524 )),
2525 }
2526 }
2527
2528 pub async fn spawn_many(
2532 &self,
2533 specs: Vec<SpawnMemberSpec>,
2534 ) -> Vec<Result<SpawnResult, MobError>> {
2535 futures::future::join_all(specs.into_iter().map(|spec| async move {
2536 let identity = spec.identity.clone();
2537 self.spawn_spec_internal_with_source(spec, SpawnSource::BatchItem)
2538 .await?;
2539 let entry = self.get_member(&identity).await.ok_or_else(|| {
2540 MobError::Internal(format!(
2541 "spawn succeeded but roster entry missing for '{identity}'"
2542 ))
2543 })?;
2544 Ok(SpawnResult {
2545 agent_identity: entry.agent_identity,
2546 agent_runtime_id: entry.agent_runtime_id,
2547 fence_token: entry.fence_token,
2548 })
2549 }))
2550 .await
2551 }
2552
2553 pub(super) async fn spawn_many_receipts_with_owner_context(
2554 &self,
2555 specs: Vec<SpawnMemberSpec>,
2556 owner_context: CanonicalOpsOwnerContext,
2557 ) -> Vec<Result<MemberSpawnReceipt, MobError>> {
2558 futures::future::join_all(specs.into_iter().map(|spec| {
2559 self.spawn_spec_receipt_with_owner_context_and_source(
2560 spec,
2561 owner_context.clone(),
2562 SpawnSource::BatchItem,
2563 )
2564 }))
2565 .await
2566 }
2567
2568 pub async fn retire(&self, identity: AgentIdentity) -> Result<(), MobError> {
2570 let meerkat_id = MeerkatId::from(&identity);
2571 match self
2572 .execute_machine_command(MobMachineCommand::Retire {
2573 agent_identity: meerkat_id,
2574 })
2575 .await?
2576 {
2577 MobMachineCommandResult::Unit => Ok(()),
2578 _ => Err(MobError::Internal(
2579 "unexpected command result variant".into(),
2580 )),
2581 }
2582 }
2583
2584 pub async fn respawn(
2590 &self,
2591 identity: AgentIdentity,
2592 initial_message: Option<ContentInput>,
2593 ) -> Result<MemberRespawnReceipt, MobRespawnError> {
2594 let meerkat_id = MeerkatId::from(&identity);
2595 let reply = match self
2596 .execute_machine_command(MobMachineCommand::Respawn {
2597 agent_identity: meerkat_id,
2598 initial_message,
2599 })
2600 .await?
2601 {
2602 MobMachineCommandResult::Respawn(reply) => reply,
2603 _ => {
2604 return Err(MobRespawnError::from(MobError::Internal(
2605 "unexpected command result variant".into(),
2606 )));
2607 }
2608 };
2609 match reply {
2610 Ok(receipt) => Ok(receipt),
2611 Err(err) => Err(err),
2612 }
2613 }
2614
2615 pub async fn retire_all(&self) -> Result<(), MobError> {
2617 match self
2618 .execute_machine_command(MobMachineCommand::RetireAll)
2619 .await?
2620 {
2621 MobMachineCommandResult::Unit => Ok(()),
2622 _ => Err(MobError::Internal(
2623 "unexpected command result variant".into(),
2624 )),
2625 }
2626 }
2627
2628 async fn handle_ensure_member(
2635 &self,
2636 spec: SpawnMemberSpec,
2637 ) -> Result<EnsureMemberOutcome, MobError> {
2638 let identity = spec.identity.clone();
2639 self.project_machine_input(mob_dsl::MobMachineInput::EnsureMember {
2640 agent_identity: mob_dsl::AgentIdentity::from_domain(&identity),
2641 })
2642 .await?;
2643 match Box::pin(self.spawn_spec(spec)).await {
2647 Ok(spawn_result) => Ok(EnsureMemberOutcome::Spawned(spawn_result)),
2648 Err(MobError::MemberAlreadyExists(_)) => {
2649 let existing = Box::pin(self.list_members())
2650 .await
2651 .into_iter()
2652 .find(|entry| entry.agent_identity == identity)
2653 .ok_or_else(|| {
2654 MobError::Internal(format!(
2655 "ensure_member: member '{identity}' reported existing but not found in roster"
2656 ))
2657 })?;
2658 Ok(EnsureMemberOutcome::Existed(Box::new(existing)))
2659 }
2660 Err(other) => Err(other),
2661 }
2662 }
2663
2664 async fn handle_reconcile(
2675 &self,
2676 desired: Vec<SpawnMemberSpec>,
2677 options: ReconcileOptions,
2678 ) -> Result<ReconcileReport, MobError> {
2679 self.project_machine_input(mob_dsl::MobMachineInput::Reconcile {
2680 desired: desired
2681 .iter()
2682 .map(|spec| mob_dsl::AgentIdentity::from_domain(&spec.identity))
2683 .collect(),
2684 retire_stale: options.retire_stale,
2685 })
2686 .await?;
2687
2688 let mut report = ReconcileReport {
2689 desired: desired.iter().map(|spec| spec.identity.clone()).collect(),
2690 ..ReconcileReport::default()
2691 };
2692
2693 let current: std::collections::BTreeSet<AgentIdentity> = Box::pin(self.list_members())
2694 .await
2695 .into_iter()
2696 .map(|entry| entry.agent_identity)
2697 .collect();
2698 let desired_ids: std::collections::BTreeSet<AgentIdentity> =
2699 desired.iter().map(|spec| spec.identity.clone()).collect();
2700
2701 for spec in desired {
2702 let identity = spec.identity.clone();
2703 if current.contains(&identity) {
2704 report.retained.push(identity);
2705 continue;
2706 }
2707 match Box::pin(self.spawn_spec(spec)).await {
2708 Ok(spawn_result) => report.spawned.push(spawn_result),
2709 Err(error) => report.failures.push(ReconcileFailure {
2710 agent_identity: identity,
2711 error,
2712 stage: ReconcileStage::Spawn,
2713 }),
2714 }
2715 }
2716
2717 if options.retire_stale {
2718 for identity in current.difference(&desired_ids).cloned() {
2719 match Box::pin(self.retire(identity.clone())).await {
2720 Ok(()) => report.retired.push(identity),
2721 Err(error) => report.failures.push(ReconcileFailure {
2722 agent_identity: identity,
2723 error,
2724 stage: ReconcileStage::Retire,
2725 }),
2726 }
2727 }
2728 }
2729
2730 Ok(report)
2731 }
2732
2733 async fn handle_list_members_matching(&self, filter: MemberFilter) -> Vec<MobMemberListEntry> {
2738 Box::pin(self.list_members())
2739 .await
2740 .into_iter()
2741 .filter(|entry| {
2742 if let Some(role) = &filter.role
2743 && entry.role != *role
2744 {
2745 return false;
2746 }
2747 if let Some(state) = filter.state
2748 && entry.state != state
2749 {
2750 return false;
2751 }
2752 for (key, value) in &filter.labels {
2753 if entry.labels.get(key).is_none_or(|v| v != value) {
2754 return false;
2755 }
2756 }
2757 true
2758 })
2759 .collect()
2760 }
2761
2762 pub async fn ensure_member(
2771 &self,
2772 spec: SpawnMemberSpec,
2773 ) -> Result<EnsureMemberOutcome, MobError> {
2774 match self
2775 .execute_machine_command(MobMachineCommand::EnsureMember {
2776 spec: Box::new(spec),
2777 })
2778 .await?
2779 {
2780 MobMachineCommandResult::EnsureMember(outcome) => Ok(outcome),
2781 _ => Err(MobError::Internal(
2782 "unexpected command result variant".into(),
2783 )),
2784 }
2785 }
2786
2787 pub async fn reconcile(
2796 &self,
2797 desired: Vec<SpawnMemberSpec>,
2798 options: ReconcileOptions,
2799 ) -> Result<ReconcileReport, MobError> {
2800 match self
2801 .execute_machine_command(MobMachineCommand::Reconcile { desired, options })
2802 .await?
2803 {
2804 MobMachineCommandResult::Reconcile(report) => Ok(*report),
2805 _ => Err(MobError::Internal(
2806 "unexpected command result variant".into(),
2807 )),
2808 }
2809 }
2810
2811 pub async fn list_members_matching(&self, filter: MemberFilter) -> Vec<MobMemberListEntry> {
2819 match self
2820 .execute_machine_command(MobMachineCommand::ListMembersMatching {
2821 filter: Box::new(filter),
2822 })
2823 .await
2824 {
2825 Ok(MobMachineCommandResult::ListMembers(entries)) => entries,
2826 Ok(_) => {
2827 tracing::error!("unexpected command result variant");
2828 Default::default()
2829 }
2830 Err(_) => Vec::new(),
2831 }
2832 }
2833
2834 pub async fn rotate_supervisor(&self) -> Result<SupervisorRotationReport, MobError> {
2889 self.send_actor_command(|reply_tx| MobCommand::RotateSupervisor { reply_tx })
2890 .await?
2891 }
2892
2893 pub async fn wire<T>(&self, local: AgentIdentity, target: T) -> Result<(), MobError>
2895 where
2896 T: Into<PeerTarget>,
2897 {
2898 match self
2899 .execute_machine_command(MobMachineCommand::Wire {
2900 local: MeerkatId::from(&local),
2901 target: target.into(),
2902 })
2903 .await?
2904 {
2905 MobMachineCommandResult::Unit => Ok(()),
2906 _ => Err(MobError::Internal(
2907 "unexpected command result variant".into(),
2908 )),
2909 }
2910 }
2911
2912 pub async fn wire_members_batch<I, A, B>(
2919 &self,
2920 edges: I,
2921 ) -> Result<MobWireMembersBatchReport, MobError>
2922 where
2923 I: IntoIterator<Item = (A, B)>,
2924 A: Into<AgentIdentity>,
2925 B: Into<AgentIdentity>,
2926 {
2927 let edges = edges
2928 .into_iter()
2929 .map(|(a, b)| (a.into(), b.into()))
2930 .collect();
2931 match self
2932 .execute_machine_command(MobMachineCommand::WireMembersBatch { edges })
2933 .await?
2934 {
2935 MobMachineCommandResult::WireMembersBatchReport(report) => Ok(report),
2936 _ => Err(MobError::Internal(
2937 "unexpected command result variant".into(),
2938 )),
2939 }
2940 }
2941
2942 pub async fn send_peer_message(
2948 &self,
2949 from: AgentIdentity,
2950 to: AgentIdentity,
2951 content: impl Into<meerkat_core::types::ContentInput>,
2952 handling_mode: HandlingMode,
2953 ) -> Result<PeerMessageReceipt, MobError> {
2954 let receipt = self
2955 .send_actor_command(|reply_tx| MobCommand::SendPeerMessage {
2956 from: MeerkatId::from(&from),
2957 to: MeerkatId::from(&to),
2958 content: content.into(),
2959 handling_mode,
2960 reply_tx,
2961 })
2962 .await??;
2963 match receipt {
2964 SendReceipt::PeerMessageSent { envelope_id, acked } => Ok(PeerMessageReceipt {
2965 from,
2966 to,
2967 envelope_id,
2968 acked,
2969 handling_mode,
2970 }),
2971 other => Err(MobError::Internal(format!(
2972 "unexpected peer-message receipt variant: {other:?}"
2973 ))),
2974 }
2975 }
2976
2977 pub async fn unwire<T>(&self, local: AgentIdentity, target: T) -> Result<(), MobError>
2979 where
2980 T: Into<PeerTarget>,
2981 {
2982 match self
2983 .execute_machine_command(MobMachineCommand::Unwire {
2984 local: MeerkatId::from(&local),
2985 target: target.into(),
2986 })
2987 .await?
2988 {
2989 MobMachineCommandResult::Unit => Ok(()),
2990 _ => Err(MobError::Internal(
2991 "unexpected command result variant".into(),
2992 )),
2993 }
2994 }
2995
2996 pub async fn internal_turn(
3015 &self,
3016 identity: AgentIdentity,
3017 message: impl Into<meerkat_core::types::ContentInput>,
3018 ) -> Result<MemberDeliveryReceipt, MobError> {
3019 let meerkat_id = MeerkatId::from(&identity);
3020 self.internal_turn_for_member(meerkat_id.clone(), message.into())
3021 .await?;
3022 let snapshot = self.member_status(&identity).await?;
3023 Ok(MemberDeliveryReceipt {
3024 identity,
3025 agent_runtime_id: snapshot.agent_runtime_id,
3026 fence_token: snapshot.fence_token,
3027 handling_mode: HandlingMode::Queue,
3028 })
3029 }
3030
3031 pub(super) async fn external_turn_for_member(
3032 &self,
3033 agent_identity: MeerkatId,
3034 message: meerkat_core::types::ContentInput,
3035 handling_mode: HandlingMode,
3036 render_metadata: Option<RenderMetadata>,
3037 ) -> Result<(), MobError> {
3038 let snapshot = self
3039 .member_status(&AgentIdentity::from(agent_identity.as_str()))
3040 .await?;
3041 let cmd = Box::new(crate::mob_machine::SubmitWorkCommand {
3042 runtime_id: snapshot.agent_runtime_id,
3043 fence_token: snapshot.fence_token,
3044 work_ref: WorkRef::new(),
3045 spec: WorkSpec::new(message, WorkOrigin::External),
3046 handling_mode,
3047 render_metadata,
3048 ack_mode: crate::mob_machine::SubmitWorkAckMode::IngressAccepted,
3049 });
3050 self.execute_machine_command(MobMachineCommand::SubmitWork(cmd))
3051 .await?;
3052 Ok(())
3053 }
3054
3055 pub(super) async fn internal_turn_for_member(
3056 &self,
3057 agent_identity: MeerkatId,
3058 message: meerkat_core::types::ContentInput,
3059 ) -> Result<(), MobError> {
3060 {
3064 let roster = self.roster.read().await;
3065 match roster.get(&agent_identity) {
3066 None => return Err(MobError::MemberNotFound(agent_identity)),
3067 Some(entry) if entry.state != crate::roster::MemberState::Active => {
3068 return Err(MobError::MemberNotFound(agent_identity));
3069 }
3070 _ => {}
3071 }
3072 }
3073 let snapshot = self
3074 .member_status(&AgentIdentity::from(agent_identity.as_str()))
3075 .await?;
3076 let cmd = Box::new(crate::mob_machine::SubmitWorkCommand {
3077 runtime_id: snapshot.agent_runtime_id,
3078 fence_token: snapshot.fence_token,
3079 work_ref: WorkRef::new(),
3080 spec: WorkSpec::new(message, WorkOrigin::Internal),
3081 handling_mode: HandlingMode::Queue,
3082 render_metadata: None,
3083 ack_mode: crate::mob_machine::SubmitWorkAckMode::TurnCompleted,
3084 });
3085 self.execute_machine_command(MobMachineCommand::SubmitWork(cmd))
3086 .await?;
3087 Ok(())
3088 }
3089
3090 pub async fn submit_work(
3101 &self,
3102 runtime_id: AgentRuntimeId,
3103 fence_token: FenceToken,
3104 work_ref: WorkRef,
3105 spec: WorkSpec,
3106 ) -> Result<WorkDeliveryReceipt, MobError> {
3107 let cmd = Box::new(crate::mob_machine::SubmitWorkCommand {
3108 runtime_id: runtime_id.clone(),
3109 fence_token,
3110 work_ref: work_ref.clone(),
3111 spec,
3112 handling_mode: HandlingMode::Queue,
3113 render_metadata: None,
3114 ack_mode: crate::mob_machine::SubmitWorkAckMode::IngressAccepted,
3115 });
3116 match self
3117 .execute_machine_command(MobMachineCommand::SubmitWork(cmd))
3118 .await?
3119 {
3120 MobMachineCommandResult::WorkReceipt { work_ref: ref_out } => Ok(WorkDeliveryReceipt {
3121 work_ref: ref_out,
3122 runtime_id,
3123 }),
3124 _ => Err(MobError::Internal(
3125 "unexpected command result variant".into(),
3126 )),
3127 }
3128 }
3129
3130 pub async fn cancel_work(&self, work_ref: WorkRef) -> Result<(), MobError> {
3136 match self
3137 .execute_machine_command(MobMachineCommand::CancelWork { work_ref })
3138 .await?
3139 {
3140 MobMachineCommandResult::Unit => Ok(()),
3141 _ => Err(MobError::Internal(
3142 "unexpected command result variant".into(),
3143 )),
3144 }
3145 }
3146
3147 pub async fn cancel_all_work(
3151 &self,
3152 runtime_id: AgentRuntimeId,
3153 fence_token: FenceToken,
3154 ) -> Result<(), MobError> {
3155 match self
3156 .execute_machine_command(MobMachineCommand::CancelAllWork {
3157 runtime_id,
3158 fence_token,
3159 })
3160 .await?
3161 {
3162 MobMachineCommandResult::Unit => Ok(()),
3163 _ => Err(MobError::Internal(
3164 "unexpected command result variant".into(),
3165 )),
3166 }
3167 }
3168
3169 pub async fn stop(&self) -> Result<(), MobError> {
3171 match self
3172 .execute_machine_command(MobMachineCommand::Stop)
3173 .await?
3174 {
3175 MobMachineCommandResult::Unit => Ok(()),
3176 _ => Err(MobError::Internal(
3177 "unexpected command result variant".into(),
3178 )),
3179 }
3180 }
3181
3182 pub async fn resume(&self) -> Result<(), MobError> {
3184 match self
3185 .execute_machine_command(MobMachineCommand::Resume)
3186 .await?
3187 {
3188 MobMachineCommandResult::Unit => Ok(()),
3189 _ => Err(MobError::Internal(
3190 "unexpected command result variant".into(),
3191 )),
3192 }
3193 }
3194
3195 pub async fn complete(&self) -> Result<(), MobError> {
3197 match self
3198 .execute_machine_command(MobMachineCommand::Complete)
3199 .await?
3200 {
3201 MobMachineCommandResult::Unit => Ok(()),
3202 _ => Err(MobError::Internal(
3203 "unexpected command result variant".into(),
3204 )),
3205 }
3206 }
3207
3208 pub async fn reset(&self) -> Result<(), MobError> {
3255 match self
3256 .execute_machine_command(MobMachineCommand::Reset)
3257 .await?
3258 {
3259 MobMachineCommandResult::Unit => Ok(()),
3260 _ => Err(MobError::Internal(
3261 "unexpected command result variant".into(),
3262 )),
3263 }
3264 }
3265
3266 pub async fn destroy(&self) -> Result<MobDestroyReport, MobDestroyError> {
3268 match self
3269 .execute_destroy_machine_command(MobMachineCommand::Destroy)
3270 .await?
3271 {
3272 MobMachineCommandResult::DestroyReport(report) => Ok(report),
3273 _ => Err(MobDestroyError::from(MobError::Internal(
3274 "unexpected command result variant".into(),
3275 ))),
3276 }
3277 }
3278
3279 #[cfg(test)]
3280 pub async fn debug_flow_tracker_counts(&self) -> Result<(usize, usize), MobError> {
3281 match self
3282 .execute_machine_command(MobMachineCommand::FlowTrackerCounts)
3283 .await?
3284 {
3285 MobMachineCommandResult::FlowTrackerCounts(counts) => Ok(counts),
3286 _ => Err(MobError::Internal(
3287 "unexpected command result variant".into(),
3288 )),
3289 }
3290 }
3291
3292 #[cfg(test)]
3293 pub(crate) async fn debug_orchestrator_snapshot(
3294 &self,
3295 ) -> Result<super::MobOrchestratorSnapshot, MobError> {
3296 match self
3297 .execute_machine_command(MobMachineCommand::OrchestratorSnapshot)
3298 .await?
3299 {
3300 MobMachineCommandResult::OrchestratorSnapshot(snapshot) => Ok(snapshot),
3301 _ => Err(MobError::Internal(
3302 "unexpected command result variant".into(),
3303 )),
3304 }
3305 }
3306
3307 #[cfg(test)]
3308 pub(crate) async fn debug_lifecycle_snapshot(&self) -> Result<MobLifecycleSnapshot, MobError> {
3309 match self
3310 .execute_machine_command(MobMachineCommand::LifecycleSnapshot)
3311 .await?
3312 {
3313 MobMachineCommandResult::LifecycleSnapshot(snapshot) => Ok(snapshot),
3314 _ => Err(MobError::Internal(
3315 "unexpected command result variant".into(),
3316 )),
3317 }
3318 }
3319
3320 #[cfg(test)]
3321 pub(crate) async fn debug_lifecycle_notification_burst(
3322 &self,
3323 count: usize,
3324 message: impl Into<String>,
3325 ) -> Result<(), MobError> {
3326 match self
3327 .execute_machine_command(MobMachineCommand::LifecycleNotificationBurst {
3328 count,
3329 message: message.into(),
3330 })
3331 .await?
3332 {
3333 MobMachineCommandResult::LifecycleNotificationBurst => Ok(()),
3334 _ => Err(MobError::Internal(
3335 "unexpected command result variant".into(),
3336 )),
3337 }
3338 }
3339
3340 #[cfg(test)]
3341 pub(crate) async fn debug_dsl_t2_snapshot(&self) -> Result<super::MobDslT2Snapshot, MobError> {
3342 match self
3343 .execute_machine_command(MobMachineCommand::DslT2Snapshot)
3344 .await?
3345 {
3346 MobMachineCommandResult::DslT2Snapshot(snapshot) => Ok(snapshot),
3347 _ => Err(MobError::Internal(
3348 "unexpected command result variant".into(),
3349 )),
3350 }
3351 }
3352
3353 pub async fn set_spawn_policy(
3358 &self,
3359 policy: Option<Arc<dyn super::spawn_policy::SpawnPolicy>>,
3360 ) -> Result<(), MobError> {
3361 match self
3362 .execute_machine_command(MobMachineCommand::SetSpawnPolicy { policy })
3363 .await?
3364 {
3365 MobMachineCommandResult::Unit => Ok(()),
3366 _ => Err(MobError::Internal(
3367 "unexpected command result variant".into(),
3368 )),
3369 }
3370 }
3371
3372 pub async fn shutdown(&self) -> Result<(), MobError> {
3374 match self
3375 .execute_machine_command(MobMachineCommand::Shutdown)
3376 .await?
3377 {
3378 MobMachineCommandResult::Unit => Ok(()),
3379 _ => Err(MobError::Internal(
3380 "unexpected command result variant".into(),
3381 )),
3382 }
3383 }
3384
3385 pub async fn force_cancel_member(&self, identity: AgentIdentity) -> Result<(), MobError> {
3390 match self
3391 .execute_machine_command(MobMachineCommand::ForceCancel {
3392 agent_identity: MeerkatId::from(&identity),
3393 })
3394 .await?
3395 {
3396 MobMachineCommandResult::Unit => Ok(()),
3397 _ => Err(MobError::Internal(
3398 "unexpected command result variant".into(),
3399 )),
3400 }
3401 }
3402
3403 async fn startup_kickoff_snapshot(
3404 &self,
3405 ) -> Result<super::state::MobStartupKickoffSnapshot, MobError> {
3406 self.send_actor_command(|reply_tx| MobCommand::StartupKickoffSnapshot { reply_tx })
3407 .await
3408 }
3409
3410 fn kickoff_wait_is_satisfied(
3411 entry: &RosterEntry,
3412 snapshot: &MobMemberSnapshot,
3413 pending_kickoff_member_ids: &BTreeSet<String>,
3414 ) -> bool {
3415 if entry.runtime_mode != crate::MobRuntimeMode::AutonomousHost {
3416 return true;
3417 }
3418 match snapshot.status {
3419 MobMemberStatus::Unknown => false,
3420 MobMemberStatus::Active => {
3421 !pending_kickoff_member_ids.contains(entry.agent_identity.as_str())
3422 }
3423 MobMemberStatus::Retiring | MobMemberStatus::Broken | MobMemberStatus::Completed => {
3424 true
3425 }
3426 }
3427 }
3428
3429 fn ready_wait_is_satisfied(
3430 entry: &RosterEntry,
3431 snapshot: &MobMemberSnapshot,
3432 ready_runtime_ids: &BTreeSet<String>,
3433 ) -> bool {
3434 if entry.runtime_mode != crate::MobRuntimeMode::AutonomousHost {
3435 return true;
3436 }
3437 match snapshot.status {
3438 MobMemberStatus::Unknown => false,
3439 MobMemberStatus::Active => {
3440 ready_runtime_ids.contains(&entry.agent_runtime_id.to_string())
3441 }
3442 MobMemberStatus::Retiring | MobMemberStatus::Broken | MobMemberStatus::Completed => {
3443 true
3444 }
3445 }
3446 }
3447
3448 async fn wait_for_kickoff_resolution(
3449 &self,
3450 target_ids: &[MeerkatId],
3451 timeout: Option<Duration>,
3452 ) -> Result<(), MobError> {
3453 if target_ids.is_empty() {
3454 return Ok(());
3455 }
3456
3457 let deadline = Instant::now() + timeout.unwrap_or(DEFAULT_KICKOFF_WAIT_TIMEOUT);
3458 loop {
3459 let kickoff_snapshot = self.startup_kickoff_snapshot().await?;
3460 let entries = self
3461 .list_all_members()
3462 .await
3463 .into_iter()
3464 .map(|entry| (entry.agent_identity.clone(), entry))
3465 .collect::<HashMap<_, _>>();
3466
3467 let mut pending_member_ids = Vec::new();
3468 for id in target_ids {
3469 let Some(entry) = entries.get(id) else {
3470 continue;
3471 };
3472 let member_snapshot = self
3473 .member_status(&AgentIdentity::from(id.as_str()))
3474 .await?;
3475 if !Self::kickoff_wait_is_satisfied(
3476 entry,
3477 &member_snapshot,
3478 &kickoff_snapshot.pending_kickoff_member_ids,
3479 ) {
3480 pending_member_ids.push(id.clone());
3481 }
3482 }
3483
3484 if pending_member_ids.is_empty() {
3485 return Ok(());
3486 }
3487
3488 let remaining = deadline.saturating_duration_since(Instant::now());
3489 if remaining.is_zero() {
3490 return Err(MobError::KickoffWaitTimedOut { pending_member_ids });
3491 }
3492
3493 tokio::time::sleep(std::cmp::min(remaining, Duration::from_millis(50))).await;
3494 }
3495 }
3496
3497 async fn wait_for_ready_resolution(
3498 &self,
3499 target_ids: &[MeerkatId],
3500 timeout: Option<Duration>,
3501 ) -> Result<(), MobError> {
3502 if target_ids.is_empty() {
3503 return Ok(());
3504 }
3505
3506 let deadline = Instant::now() + timeout.unwrap_or(DEFAULT_READY_WAIT_TIMEOUT);
3507 loop {
3508 let snapshot = self.startup_kickoff_snapshot().await?;
3509 let entries = self
3510 .list_all_members()
3511 .await
3512 .into_iter()
3513 .map(|entry| (entry.agent_identity.clone(), entry))
3514 .collect::<HashMap<_, _>>();
3515
3516 let mut pending_member_ids = Vec::new();
3517 for id in target_ids {
3518 let Some(entry) = entries.get(id) else {
3519 continue;
3520 };
3521 let member_snapshot = self
3522 .member_status(&AgentIdentity::from(id.as_str()))
3523 .await?;
3524 if !Self::ready_wait_is_satisfied(
3525 entry,
3526 &member_snapshot,
3527 &snapshot.ready_runtime_ids,
3528 ) {
3529 pending_member_ids.push(id.clone());
3530 }
3531 }
3532
3533 if pending_member_ids.is_empty() {
3534 return Ok(());
3535 }
3536
3537 let remaining = deadline.saturating_duration_since(Instant::now());
3538 if remaining.is_zero() {
3539 return Err(MobError::ReadyWaitTimedOut { pending_member_ids });
3540 }
3541
3542 tokio::time::sleep(std::cmp::min(remaining, Duration::from_millis(50))).await;
3543 }
3544 }
3545
3546 async fn wait_one_snapshot(
3547 &self,
3548 agent_identity: &MeerkatId,
3549 ) -> Result<MobMemberSnapshot, MobError> {
3550 loop {
3551 let snapshot = self
3552 .member_status(&AgentIdentity::from(agent_identity.as_str()))
3553 .await?;
3554 if snapshot.is_final {
3555 return Ok(snapshot);
3556 }
3557 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
3558 }
3559 }
3560
3561 pub async fn member_status(
3568 &self,
3569 identity: &AgentIdentity,
3570 ) -> Result<MobMemberSnapshot, MobError> {
3571 if let Some(snapshot) = self.inflight_retiring_snapshot(identity).await {
3572 return Ok(snapshot);
3573 }
3574 let mut snapshot = match self
3575 .execute_machine_command(MobMachineCommand::MemberStatus {
3576 agent_identity: MeerkatId::from(identity),
3577 })
3578 .await?
3579 {
3580 MobMachineCommandResult::MemberStatus(snapshot) => snapshot,
3581 _ => {
3582 return Err(MobError::Internal(
3583 "unexpected command result variant".into(),
3584 ));
3585 }
3586 };
3587 self.apply_inflight_member_projection(identity, &mut snapshot)
3588 .await;
3589 snapshot.peer_connectivity = match tokio::time::timeout(
3590 Duration::from_secs(2),
3591 self.project_member_peer_connectivity(identity, &snapshot),
3592 )
3593 .await
3594 {
3595 Ok(connectivity) => connectivity,
3596 Err(_) => {
3597 tracing::warn!(
3598 agent_identity = %identity,
3599 "mob member status peer-connectivity projection timed out"
3600 );
3601 None
3602 }
3603 };
3604 snapshot.resolved_capabilities = self.project_resolved_capabilities(&snapshot).await;
3605 snapshot.external_member = self
3606 .project_external_member_observation(identity, &snapshot)
3607 .await;
3608 Ok(snapshot)
3609 }
3610
3611 async fn inflight_retiring_snapshot(
3612 &self,
3613 identity: &AgentIdentity,
3614 ) -> Option<MobMemberSnapshot> {
3615 let entry = {
3616 let roster = self.roster.read().await;
3617 roster.get(&MeerkatId::from(identity)).cloned()
3618 }?;
3619 if entry.state != crate::roster::MemberState::Retiring {
3620 return None;
3621 }
3622 Some(
3623 MobMemberSnapshot {
3624 status: MobMemberStatus::Retiring,
3625 agent_runtime_id: entry.agent_runtime_id,
3626 fence_token: entry.fence_token,
3627 output_preview: None,
3628 error: None,
3629 tokens_used: 0,
3630 is_final: false,
3631 current_session_id: None,
3632 current_bridge_session_id: None,
3633 peer_connectivity: None,
3634 kickoff: entry.kickoff,
3635 external_member: None,
3636 resolved_capabilities: None,
3637 }
3638 .with_current_bridge_session_id(entry.member_ref.bridge_session_id().cloned()),
3639 )
3640 }
3641
3642 async fn apply_inflight_member_projection(
3643 &self,
3644 identity: &AgentIdentity,
3645 snapshot: &mut MobMemberSnapshot,
3646 ) {
3647 if snapshot.status != MobMemberStatus::Unknown {
3648 return;
3649 }
3650 let is_retiring = {
3651 let roster = self.roster.read().await;
3652 roster
3653 .get(&MeerkatId::from(identity))
3654 .is_some_and(|entry| entry.state == crate::roster::MemberState::Retiring)
3655 };
3656 if is_retiring {
3657 snapshot.status = MobMemberStatus::Retiring;
3658 snapshot.is_final = false;
3659 }
3660 }
3661
3662 async fn project_member_peer_connectivity(
3663 &self,
3664 identity: &AgentIdentity,
3665 snapshot: &MobMemberSnapshot,
3666 ) -> Option<MobPeerConnectivitySnapshot> {
3667 let bridge_session_id = snapshot.current_bridge_session_id().cloned()?;
3668 let (entry, roster_snapshot) = {
3669 let roster = self.roster.read().await;
3670 (
3671 roster.get(&MeerkatId::from(identity)).cloned()?,
3672 roster.snapshot(),
3673 )
3674 };
3675 self.resolve_peer_connectivity(&entry, &bridge_session_id, &roster_snapshot)
3676 .await
3677 }
3678
3679 async fn project_resolved_capabilities(
3682 &self,
3683 snapshot: &MobMemberSnapshot,
3684 ) -> Option<meerkat_contracts::WireResolvedModelCapabilities> {
3685 #[cfg(feature = "runtime-adapter")]
3686 {
3687 use meerkat_runtime::service_ext::SessionServiceRuntimeExt as _;
3688 let session_id = snapshot.current_bridge_session_id().cloned()?;
3689 let runtime = self.runtime_adapter.as_ref()?.as_ref();
3690 runtime
3691 .resolved_session_llm_capabilities(&session_id)
3692 .await
3693 .ok()
3694 .flatten()
3695 .map(|surface| surface.to_wire_resolved())
3696 }
3697 #[cfg(not(feature = "runtime-adapter"))]
3698 {
3699 let _ = snapshot;
3700 None
3701 }
3702 }
3703
3704 async fn project_external_member_observation(
3705 &self,
3706 identity: &AgentIdentity,
3707 snapshot: &MobMemberSnapshot,
3708 ) -> Option<ExternalMemberObservationSnapshot> {
3709 let entry = {
3710 let roster = self.roster.read().await;
3711 roster.get(identity).cloned()
3712 }?;
3713 let MemberRef::BackendPeer {
3714 session_id,
3715 bootstrap_token,
3716 ..
3717 } = &entry.member_ref
3718 else {
3719 return None;
3720 };
3721
3722 let owner = ExternalMemberOwnerRef {
3723 mob_id: self.definition.id.clone(),
3724 agent_identity: identity.clone(),
3725 };
3726 let bridge_session_present = session_id.is_some();
3727 let has_bootstrap_token = bootstrap_token
3728 .as_ref()
3729 .is_some_and(|token| !token.is_empty());
3730 let binding_mode = if bridge_session_present {
3731 ExternalMemberBindingMode::BridgeSessionBacked
3732 } else {
3733 ExternalMemberBindingMode::PeerOnly
3734 };
3735 let reachability = match snapshot.status {
3736 MobMemberStatus::Broken => ExternalMemberReachability::Unavailable {
3737 reason: snapshot
3738 .error
3739 .clone()
3740 .unwrap_or_else(|| "external member restore failed".to_string()),
3741 },
3742 _ => ExternalMemberReachability::Unknown,
3743 };
3744 let rebind = match snapshot.status {
3745 MobMemberStatus::Broken => ExternalMemberRebindStatus::Failed {
3746 reason: snapshot
3747 .error
3748 .clone()
3749 .unwrap_or_else(|| "external member restore failed".to_string()),
3750 },
3751 _ if bridge_session_present => ExternalMemberRebindStatus::NotRequired,
3752 _ if has_bootstrap_token => ExternalMemberRebindStatus::Available,
3753 _ => ExternalMemberRebindStatus::Unavailable {
3754 reason: "missing bootstrap_token for supervisor rebind".to_string(),
3755 },
3756 };
3757
3758 Some(ExternalMemberObservationSnapshot {
3759 owner: owner.clone(),
3760 binding_mode,
3761 bridge_session_present,
3762 reachability,
3763 rebind,
3764 forwarding: ExternalMemberObservationSnapshot::forwarding(&owner),
3765 })
3766 }
3767
3768 pub async fn wait_for_kickoff_complete(
3776 &self,
3777 timeout: Option<Duration>,
3778 ) -> Result<Vec<(AgentIdentity, MobMemberSnapshot)>, MobError> {
3779 let target_ids = self
3780 .list_all_members()
3781 .await
3782 .into_iter()
3783 .map(|entry| entry.agent_identity)
3784 .collect::<Vec<_>>();
3785 let identities: Vec<AgentIdentity> = target_ids.clone();
3786 self.wait_for_kickoff_resolution(&target_ids, timeout)
3787 .await?;
3788
3789 let mut snapshots = Vec::with_capacity(identities.len());
3790 for identity in identities {
3791 snapshots.push((identity.clone(), self.member_status(&identity).await?));
3792 }
3793 Ok(snapshots)
3794 }
3795
3796 pub async fn wait_for_members_kickoff_complete(
3800 &self,
3801 ids: &[AgentIdentity],
3802 timeout: Option<Duration>,
3803 ) -> Result<Vec<(AgentIdentity, MobMemberSnapshot)>, MobError> {
3804 let target_meerkat_ids: Vec<MeerkatId> = ids.iter().map(MeerkatId::from).collect();
3805 self.wait_for_kickoff_resolution(&target_meerkat_ids, timeout)
3806 .await?;
3807
3808 let mut snapshots = Vec::with_capacity(ids.len());
3809 for identity in ids {
3810 snapshots.push((identity.clone(), self.member_status(identity).await?));
3811 }
3812 Ok(snapshots)
3813 }
3814
3815 pub async fn wait_for_ready(
3817 &self,
3818 timeout: Option<Duration>,
3819 ) -> Result<Vec<(AgentIdentity, MobMemberSnapshot)>, MobError> {
3820 let target_ids = self
3821 .list_all_members()
3822 .await
3823 .into_iter()
3824 .map(|entry| entry.agent_identity)
3825 .collect::<Vec<_>>();
3826 let identities: Vec<AgentIdentity> = target_ids.clone();
3827 self.wait_for_ready_resolution(&target_ids, timeout).await?;
3828
3829 let mut snapshots = Vec::with_capacity(identities.len());
3830 for identity in identities {
3831 snapshots.push((identity.clone(), self.member_status(&identity).await?));
3832 }
3833 Ok(snapshots)
3834 }
3835
3836 pub async fn wait_for_members_ready(
3838 &self,
3839 ids: &[AgentIdentity],
3840 timeout: Option<Duration>,
3841 ) -> Result<Vec<(AgentIdentity, MobMemberSnapshot)>, MobError> {
3842 let target_meerkat_ids: Vec<MeerkatId> = ids.iter().map(MeerkatId::from).collect();
3843 self.wait_for_ready_resolution(&target_meerkat_ids, timeout)
3844 .await?;
3845
3846 let mut snapshots = Vec::with_capacity(ids.len());
3847 for identity in ids {
3848 snapshots.push((identity.clone(), self.member_status(identity).await?));
3849 }
3850 Ok(snapshots)
3851 }
3852
3853 pub async fn wait_one(&self, identity: &AgentIdentity) -> Result<MobMemberSnapshot, MobError> {
3857 let meerkat_id = MeerkatId::from(identity);
3858 self.wait_one_snapshot(&meerkat_id).await
3859 }
3860
3861 pub async fn wait_all(
3863 &self,
3864 identities: &[AgentIdentity],
3865 ) -> Result<Vec<MobMemberSnapshot>, MobError> {
3866 let meerkat_ids: Vec<MeerkatId> = identities.iter().map(MeerkatId::from).collect();
3867 let futs = meerkat_ids
3868 .iter()
3869 .map(|mid| self.wait_one_snapshot(mid))
3870 .collect::<Vec<_>>();
3871 let results = futures::future::join_all(futs).await;
3872 results.into_iter().collect()
3873 }
3874
3875 pub async fn collect_completed(&self) -> Vec<(AgentIdentity, MobMemberSnapshot)> {
3877 let entries = self.list_all_members().await;
3878 let mut completed = Vec::new();
3879 for entry in entries {
3880 if let Ok(snapshot) = self.member_status(&entry.agent_identity).await
3881 && snapshot.is_final
3882 {
3883 completed.push((entry.agent_identity, snapshot));
3884 }
3885 }
3886 completed
3887 }
3888
3889 pub async fn spawn_helper(
3895 &self,
3896 identity: AgentIdentity,
3897 task: impl Into<String>,
3898 options: HelperOptions,
3899 ) -> Result<HelperResult, MobError> {
3900 let profile_name = options
3901 .role_name
3902 .or_else(|| self.definition.profiles.keys().next().cloned())
3903 .ok_or_else(|| {
3904 MobError::Internal("no profile specified and definition has no profiles".into())
3905 })?;
3906 let task_text = task.into();
3907 let meerkat_id = MeerkatId::from(&identity);
3908 let mut spec = SpawnMemberSpec::new(profile_name, identity.clone());
3909 spec.initial_message = Some(task_text.into());
3910 spec.runtime_mode = Some(
3911 options
3912 .runtime_mode
3913 .unwrap_or(crate::MobRuntimeMode::TurnDriven),
3914 );
3915 spec.backend = options.backend;
3916 spec.tool_access_policy = options.tool_access_policy;
3917 spec.auth_binding = options.auth_binding;
3918 spec.inherited_tool_filter = options.inherited_tool_filter;
3919 spec.override_profile = options.override_profile;
3920 spec.model_override = options.model_override;
3921 spec.provider_params_override = options.provider_params_override;
3922 spec.auto_wire_parent = true;
3923
3924 self.spawn_spec_internal_with_source(spec, SpawnSource::HelperSpawn)
3925 .await?;
3926 let helper_snapshot = self.member_status(&identity).await?;
3927 let _ = self.retire(identity).await;
3928
3929 Ok(HelperResult {
3930 output: helper_snapshot.output_preview,
3931 tokens_used: helper_snapshot.tokens_used,
3932 agent_identity: helper_snapshot.agent_runtime_id.identity.clone(),
3933 agent_runtime_id: helper_snapshot.agent_runtime_id,
3934 fence_token: helper_snapshot.fence_token,
3935 })
3936 }
3937
3938 pub async fn fork_helper(
3943 &self,
3944 source_identity: &AgentIdentity,
3945 identity: AgentIdentity,
3946 task: impl Into<String>,
3947 fork_context: crate::launch::ForkContext,
3948 options: HelperOptions,
3949 ) -> Result<HelperResult, MobError> {
3950 let profile_name = options
3951 .role_name
3952 .or_else(|| self.definition.profiles.keys().next().cloned())
3953 .ok_or_else(|| {
3954 MobError::Internal("no profile specified and definition has no profiles".into())
3955 })?;
3956 let task_text = task.into();
3957 let meerkat_id = MeerkatId::from(&identity);
3958 let source_member_id = MeerkatId::from(source_identity);
3959 let mut spec = SpawnMemberSpec::new(profile_name, identity.clone());
3960 spec.initial_message = Some(task_text.into());
3961 spec.runtime_mode = Some(
3962 options
3963 .runtime_mode
3964 .unwrap_or(crate::MobRuntimeMode::TurnDriven),
3965 );
3966 spec.backend = options.backend;
3967 spec.tool_access_policy = options.tool_access_policy;
3968 spec.auth_binding = options.auth_binding;
3969 spec.inherited_tool_filter = options.inherited_tool_filter;
3970 spec.override_profile = options.override_profile;
3971 spec.model_override = options.model_override;
3972 spec.provider_params_override = options.provider_params_override;
3973 spec.auto_wire_parent = true;
3974 spec.launch_mode = crate::launch::MemberLaunchMode::Fork {
3975 source_member_id,
3976 fork_context,
3977 };
3978
3979 self.spawn_spec_internal_with_source(spec, SpawnSource::Fork)
3980 .await?;
3981 let helper_snapshot = self.member_status(&identity).await?;
3982 let _ = self.retire(identity).await;
3983
3984 Ok(HelperResult {
3985 output: helper_snapshot.output_preview,
3986 tokens_used: helper_snapshot.tokens_used,
3987 agent_identity: helper_snapshot.agent_runtime_id.identity.clone(),
3988 agent_runtime_id: helper_snapshot.agent_runtime_id,
3989 fence_token: helper_snapshot.fence_token,
3990 })
3991 }
3992
3993 pub(crate) async fn project_machine_input(
3994 &self,
3995 input: crate::machines::mob_machine::MobMachineInput,
3996 ) -> Result<crate::machines::mob_machine::MobMachineState, MobError> {
3997 self.send_actor_command(|reply_tx| MobCommand::ProjectMachineInput {
3998 input: Box::new(input),
3999 reply_tx,
4000 })
4001 .await?
4002 }
4003
4004 pub(super) async fn commit_flow_run_command(
4005 &self,
4006 run_id: &RunId,
4007 command: MobMachineFlowRunCommand,
4008 context: &'static str,
4009 ) -> Result<Option<Vec<flow_run::Effect>>, MobError> {
4010 self.send_actor_command(|reply_tx| MobCommand::CommitFlowRunCommand {
4011 run_id: run_id.clone(),
4012 command: Box::new(command),
4013 context,
4014 reply_tx,
4015 })
4016 .await?
4017 }
4018
4019 pub(super) async fn commit_flow_terminalization(
4020 &self,
4021 run_id: RunId,
4022 flow_id: FlowId,
4023 target: TerminalizationTarget,
4024 command: MobMachineFlowRunCommand,
4025 context: &'static str,
4026 ) -> Result<TerminalizationOutcome, MobError> {
4027 self.send_actor_command(|reply_tx| MobCommand::CommitFlowTerminalization {
4028 run_id,
4029 flow_id,
4030 target,
4031 command: Box::new(command),
4032 context,
4033 reply_tx,
4034 })
4035 .await?
4036 }
4037
4038 pub(super) async fn commit_flow_frame_store_plan(
4039 &self,
4040 run_id: &RunId,
4041 plan: FlowFrameLoopStorePlan,
4042 ) -> Result<bool, MobError> {
4043 self.send_actor_command(|reply_tx| MobCommand::CommitFlowFrameStorePlan {
4044 run_id: run_id.clone(),
4045 plan: Box::new(plan),
4046 reply_tx,
4047 })
4048 .await?
4049 }
4050
4051 pub(crate) async fn preview_machine_input(
4052 &self,
4053 input: crate::machines::mob_machine::MobMachineInput,
4054 ) -> Result<crate::machines::mob_machine::MobMachineState, MobError> {
4055 self.send_actor_command(|reply_tx| MobCommand::PreviewMachineInput {
4056 input: Box::new(input),
4057 reply_tx,
4058 })
4059 .await?
4060 }
4061
4062 pub(crate) async fn query_machine_state(
4063 &self,
4064 ) -> Result<crate::machines::mob_machine::MobMachineState, MobError> {
4065 self.send_actor_command(|reply_tx| MobCommand::QueryMachineState { reply_tx })
4066 .await
4067 }
4068}
4069
4070impl MemberHandle {
4071 pub fn identity(&self) -> AgentIdentity {
4073 AgentIdentity::from(self.agent_identity.as_str())
4074 }
4075
4076 pub async fn send(
4078 &self,
4079 content: impl Into<meerkat_core::types::ContentInput>,
4080 handling_mode: HandlingMode,
4081 ) -> Result<MemberDeliveryReceipt, MobError> {
4082 self.send_with_render_metadata(content, handling_mode, None)
4083 .await
4084 }
4085
4086 pub async fn send_with_render_metadata(
4088 &self,
4089 content: impl Into<meerkat_core::types::ContentInput>,
4090 handling_mode: HandlingMode,
4091 render_metadata: Option<RenderMetadata>,
4092 ) -> Result<MemberDeliveryReceipt, MobError> {
4093 self.mob
4094 .external_turn_for_member(
4095 self.agent_identity.clone(),
4096 content.into(),
4097 handling_mode,
4098 render_metadata,
4099 )
4100 .await?;
4101 let snapshot = self
4102 .mob
4103 .member_status(&AgentIdentity::from(self.agent_identity.as_str()))
4104 .await?;
4105 Ok(MemberDeliveryReceipt {
4106 identity: self.identity(),
4107 agent_runtime_id: snapshot.agent_runtime_id,
4108 fence_token: snapshot.fence_token,
4109 handling_mode,
4110 })
4111 }
4112
4113 pub async fn send_peer_message(
4118 &self,
4119 to: AgentIdentity,
4120 content: impl Into<meerkat_core::types::ContentInput>,
4121 handling_mode: HandlingMode,
4122 ) -> Result<PeerMessageReceipt, MobError> {
4123 self.mob
4124 .send_peer_message(self.identity(), to, content, handling_mode)
4125 .await
4126 }
4127
4128 pub async fn internal_turn(
4130 &self,
4131 content: impl Into<meerkat_core::types::ContentInput>,
4132 ) -> Result<MemberDeliveryReceipt, MobError> {
4133 self.mob
4134 .internal_turn_for_member(self.agent_identity.clone(), content.into())
4135 .await?;
4136 let snapshot = self
4137 .mob
4138 .member_status(&AgentIdentity::from(self.agent_identity.as_str()))
4139 .await?;
4140 Ok(MemberDeliveryReceipt {
4141 identity: self.identity(),
4142 agent_runtime_id: snapshot.agent_runtime_id,
4143 fence_token: snapshot.fence_token,
4144 handling_mode: HandlingMode::Queue,
4145 })
4146 }
4147
4148 #[cfg(test)]
4150 pub(crate) async fn current_bridge_session_id(&self) -> Result<Option<SessionId>, MobError> {
4151 let status = self.status().await?;
4152 Ok(status.current_bridge_session_id().cloned())
4153 }
4154
4155 pub async fn status(&self) -> Result<MobMemberSnapshot, MobError> {
4157 self.mob.member_status(&self.identity()).await
4158 }
4159
4160 pub async fn events(&self) -> Result<EventStream, MobError> {
4162 self.mob.subscribe_agent_events(&self.identity()).await
4163 }
4164}
4165
4166#[cfg(test)]
4167#[allow(clippy::expect_used)]
4168mod tests {
4169 use super::*;
4170 use crate::ids::Generation;
4171
4172 #[test]
4173 fn member_projection_types_omit_bridge_session_fields_in_serialized_output() {
4174 let sid = SessionId::new();
4175
4176 let snapshot = MobMemberSnapshot {
4177 status: MobMemberStatus::Active,
4178 agent_runtime_id: AgentRuntimeId::initial(AgentIdentity::from("worker")),
4179 fence_token: FenceToken::new(0),
4180 output_preview: None,
4181 error: None,
4182 tokens_used: 0,
4183 is_final: false,
4184 current_session_id: None,
4185 current_bridge_session_id: None,
4186 peer_connectivity: None,
4187 kickoff: None,
4188 external_member: None,
4189 resolved_capabilities: None,
4190 }
4191 .with_current_bridge_session_id(Some(sid.clone()));
4192 let snapshot_value =
4193 serde_json::to_value(&snapshot).expect("snapshot should serialize to json");
4194 assert!(snapshot_value.get("current_bridge_session_id").is_none());
4196 assert!(snapshot_value.get("agent_runtime_id").is_none());
4202 assert!(snapshot_value.get("fence_token").is_none());
4203 }
4204
4205 #[test]
4206 fn mob_member_snapshot_exposes_runtime_identity_only_by_accessor() {
4207 let runtime_id = AgentRuntimeId::new(AgentIdentity::from("worker"), Generation::new(3));
4208 let snapshot = MobMemberSnapshot {
4209 status: MobMemberStatus::Active,
4210 agent_runtime_id: runtime_id.clone(),
4211 fence_token: FenceToken::new(9),
4212 output_preview: None,
4213 error: None,
4214 tokens_used: 0,
4215 is_final: false,
4216 current_session_id: None,
4217 current_bridge_session_id: None,
4218 peer_connectivity: None,
4219 kickoff: None,
4220 external_member: None,
4221 resolved_capabilities: None,
4222 };
4223
4224 let snapshot_value =
4225 serde_json::to_value(&snapshot).expect("snapshot should serialize to json");
4226 assert!(snapshot_value.get("agent_runtime_id").is_none());
4227 assert!(snapshot_value.get("fence_token").is_none());
4228
4229 let (projected_runtime_id, projected_fence_token) = snapshot.runtime_identity_fields();
4230 assert_eq!(projected_runtime_id, &runtime_id);
4231 assert_eq!(projected_fence_token, FenceToken::new(9));
4232 }
4233
4234 #[test]
4235 fn mob_member_snapshot_exposes_agent_identity_convenience() {
4236 let snapshot = MobMemberSnapshot {
4240 status: MobMemberStatus::Active,
4241 agent_runtime_id: AgentRuntimeId::initial(AgentIdentity::from("singer")),
4242 fence_token: FenceToken::new(0),
4243 output_preview: None,
4244 error: None,
4245 tokens_used: 0,
4246 is_final: false,
4247 current_session_id: None,
4248 current_bridge_session_id: None,
4249 peer_connectivity: None,
4250 kickoff: None,
4251 external_member: None,
4252 resolved_capabilities: None,
4253 };
4254 assert_eq!(
4255 snapshot.agent_identity(),
4256 &AgentIdentity::from("singer"),
4257 "agent_identity() must return the canonical identity without requiring callers to reach through agent_runtime_id",
4258 );
4259 }
4260
4261 #[test]
4262 fn canonical_member_material_populates_bridge_binding_from_canonical_state() {
4263 let sid = SessionId::new();
4264 let snapshot = CanonicalMemberSnapshotMaterial {
4265 member_present: true,
4266 status: CanonicalMemberStatus::Active,
4267 is_terminal: false,
4268 error: None,
4269 output_preview: None,
4270 tokens_used: 0,
4271 agent_runtime_id: AgentRuntimeId::initial(AgentIdentity::from("worker")),
4272 fence_token: FenceToken::new(0),
4273 current_bridge_session_id: Some(sid.clone()),
4274 peer_connectivity: None,
4275 kickoff: None,
4276 }
4277 .to_snapshot();
4278
4279 assert_eq!(snapshot.current_bridge_session_id(), Some(&sid));
4280 assert_eq!(snapshot.current_bridge_session_id, Some(sid));
4281 }
4282
4283 #[test]
4284 fn member_receipt_types_omit_bridge_session_fields_in_serialized_output() {
4285 let runtime_id = AgentRuntimeId::new(AgentIdentity::from("worker"), Generation::new(1));
4286 let receipt = MemberRespawnReceipt::new(
4287 AgentIdentity::from("worker"),
4288 runtime_id.clone(),
4289 FenceToken::new(7),
4290 FenceToken::new(8),
4291 );
4292 let receipt_value =
4293 serde_json::to_value(&receipt).expect("respawn receipt should serialize to json");
4294 assert_eq!(receipt_value["identity"], "worker");
4300 assert!(receipt_value.get("agent_runtime_id").is_none());
4301 assert!(receipt_value.get("previous_fence_token").is_none());
4302 assert!(receipt_value.get("fence_token").is_none());
4303
4304 let delivery = MemberDeliveryReceipt {
4305 identity: AgentIdentity::from("worker"),
4306 agent_runtime_id: runtime_id,
4307 fence_token: FenceToken::new(8),
4308 handling_mode: HandlingMode::Queue,
4309 };
4310 let delivery_value =
4311 serde_json::to_value(&delivery).expect("delivery receipt should serialize to json");
4312 assert_eq!(delivery_value["identity"], "worker");
4313 assert!(delivery_value.get("agent_runtime_id").is_none());
4314 assert!(delivery_value.get("fence_token").is_none());
4315 }
4316
4317 #[test]
4318 fn helper_result_omits_binding_era_atoms_in_serialized_output() {
4319 let runtime_id = AgentRuntimeId::new(AgentIdentity::from("worker"), Generation::new(2));
4320 let result = HelperResult {
4321 output: Some("done".to_string()),
4322 tokens_used: 7,
4323 agent_identity: AgentIdentity::from("worker"),
4324 agent_runtime_id: runtime_id.clone(),
4325 fence_token: FenceToken::new(9),
4326 };
4327
4328 let value = serde_json::to_value(&result).expect("helper result should serialize to json");
4329 assert_eq!(value["agent_identity"], "worker");
4335 assert_eq!(value["tokens_used"], 7);
4336 assert!(value.get("agent_runtime_id").is_none());
4337 assert!(value.get("fence_token").is_none());
4338 assert!(value.get("session_id").is_none());
4339 assert!(value.get("bridge_session_id").is_none());
4340 }
4341
4342 #[test]
4343 fn spawn_member_spec_resume_bridge_session_accessors_stay_additive() {
4344 let sid = SessionId::new();
4345 let spec =
4346 SpawnMemberSpec::new("worker", "worker-1").with_resume_bridge_session_id(sid.clone());
4347
4348 assert_eq!(spec.launch_mode.resume_bridge_session_id(), Some(&sid));
4349 assert_eq!(spec.launch_mode.resume_bridge_session_id(), Some(&sid));
4350 }
4351
4352 #[test]
4353 fn spawn_source_launch_mode_classification_is_surface_independent() {
4354 let sid = SessionId::new();
4355 let resume = crate::launch::MemberLaunchMode::Resume {
4356 bridge_session_id: sid,
4357 };
4358 let fork = crate::launch::MemberLaunchMode::Fork {
4359 source_member_id: MeerkatId::from("lead-1"),
4360 fork_context: crate::launch::ForkContext::LastMessages { count: 1 },
4361 };
4362
4363 assert_eq!(
4364 SpawnSource::for_launch_mode(SpawnSource::Consumer, &resume),
4365 SpawnSource::Resume
4366 );
4367 assert_eq!(
4368 SpawnSource::for_launch_mode(SpawnSource::AgentSpawnMember, &resume),
4369 SpawnSource::Resume
4370 );
4371 assert_eq!(
4372 SpawnSource::for_launch_mode(SpawnSource::BatchItem, &fork),
4373 SpawnSource::Fork
4374 );
4375 assert_eq!(
4376 SpawnSource::for_launch_mode(
4377 SpawnSource::AgentSpawnMember,
4378 &crate::launch::MemberLaunchMode::Fresh,
4379 ),
4380 SpawnSource::AgentSpawnMember
4381 );
4382 }
4383}