1use super::*;
2use crate::MobRuntimeMode;
3use crate::roster::MobMemberKickoffSnapshot;
4use crate::runtime::mob_member_lifecycle_authority::{
5 CanonicalMemberSnapshotMaterial, CanonicalMemberStatus, CanonicalSessionObservation,
6 MobMemberLifecycleAuthority, MobMemberLifecycleInput, MobMemberTerminalClass,
7};
8#[cfg(target_arch = "wasm32")]
9use crate::tokio;
10use futures::stream::{FuturesUnordered, StreamExt};
11use meerkat_core::comms::{
12 PeerDirectoryEntry, PeerReachability, PeerReachabilityReason, TrustedPeerSpec,
13};
14use meerkat_core::ops::OperationId;
15use meerkat_core::ops_lifecycle::OpsLifecycleRegistry;
16use meerkat_core::service::{MobToolAuthorityContext, SessionError};
17use meerkat_core::time_compat::Instant;
18use meerkat_core::types::{HandlingMode, RenderMetadata, SessionId};
19use serde::{Deserialize, Serialize};
20use std::collections::BTreeMap;
21use std::collections::BTreeSet;
22use std::collections::HashMap;
23use std::time::Duration;
24
/// Default kickoff wait timeout: ten minutes.
// NOTE(review): the use site is not visible in this part of the file — confirm where it applies.
const DEFAULT_KICKOFF_WAIT_TIMEOUT: Duration = Duration::from_secs(10 * 60);
26
/// Two-state switch for peer connectivity in a projection.
// NOTE(review): consumers are not visible in this part of the file; presumably this
// controls whether `peer_connectivity` is populated on a member snapshot — confirm.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum PeerConnectivityProjection {
    /// Leave peer connectivity out.
    Omit,
    /// Include peer connectivity.
    Include,
}
32
/// Two-state switch for how session observation is projected.
// NOTE(review): consumers are not visible in this part of the file; the exact difference
// between the two modes should be confirmed against the code that matches on them.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum SessionObservationProjection {
    /// Live-only observation mode.
    LiveOnly,
    /// Full observation mode.
    Full,
}
38
39#[derive(Debug, Clone, Serialize)]
41#[non_exhaustive]
42pub struct MobMemberSnapshot {
43 pub status: MobMemberStatus,
45 pub output_preview: Option<String>,
47 pub error: Option<String>,
49 pub tokens_used: u64,
51 pub is_final: bool,
53 pub current_session_id: Option<SessionId>,
55 #[serde(skip_serializing_if = "Option::is_none")]
57 pub peer_connectivity: Option<MobPeerConnectivitySnapshot>,
58 #[serde(default, skip_serializing_if = "Option::is_none")]
60 pub kickoff: Option<MobMemberKickoffSnapshot>,
61}
62
63#[derive(Debug, Clone, Serialize, Deserialize)]
64#[non_exhaustive]
65pub struct MobMemberListEntry {
66 pub meerkat_id: MeerkatId,
67 pub profile: ProfileName,
68 pub member_ref: MemberRef,
69 pub runtime_mode: MobRuntimeMode,
70 #[serde(default, skip_serializing_if = "Option::is_none")]
71 pub peer_id: Option<String>,
72 #[serde(default)]
73 pub state: crate::roster::MemberState,
74 pub wired_to: BTreeSet<MeerkatId>,
75 #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
76 pub external_peer_specs: BTreeMap<MeerkatId, TrustedPeerSpec>,
77 #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
78 pub labels: BTreeMap<String, String>,
79 pub status: MobMemberStatus,
80 #[serde(skip_serializing_if = "Option::is_none")]
81 pub error: Option<String>,
82 pub is_final: bool,
83 #[serde(skip_serializing_if = "Option::is_none")]
84 pub current_session_id: Option<SessionId>,
85 #[serde(default, skip_serializing_if = "Option::is_none")]
86 pub kickoff: Option<MobMemberKickoffSnapshot>,
87}
88
89impl MobMemberListEntry {
90 pub fn session_id(&self) -> Option<&SessionId> {
91 self.member_ref.session_id()
92 }
93}
94
95#[derive(Debug, Clone, Serialize, Deserialize)]
97#[non_exhaustive]
98pub struct MobPeerConnectivitySnapshot {
99 pub reachable_peer_count: usize,
100 pub unknown_peer_count: usize,
101 pub unreachable_peers: Vec<MobUnreachablePeer>,
102}
103
104#[derive(Debug, Clone, Serialize, Deserialize)]
106#[non_exhaustive]
107pub struct MobUnreachablePeer {
108 pub peer: String,
109 pub reason: Option<PeerReachabilityReason>,
110}
111
112#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
114#[serde(rename_all = "snake_case")]
115#[non_exhaustive]
116pub enum MobMemberStatus {
117 Active,
119 Retiring,
121 Broken,
123 Completed,
125 Unknown,
127}
128
129#[derive(Debug, Clone, Serialize)]
131#[non_exhaustive]
132pub struct MemberRespawnReceipt {
133 pub member_id: MeerkatId,
135 pub old_session_id: Option<SessionId>,
137 pub new_session_id: Option<SessionId>,
139}
140
141impl MemberRespawnReceipt {
142 pub fn new(
143 member_id: MeerkatId,
144 old_session_id: Option<SessionId>,
145 new_session_id: Option<SessionId>,
146 ) -> Self {
147 Self {
148 member_id,
149 old_session_id,
150 new_session_id,
151 }
152 }
153}
154
155#[derive(Debug, Clone, Serialize)]
157#[non_exhaustive]
158pub(crate) struct MemberSpawnReceipt {
159 pub(crate) member_ref: MemberRef,
161 pub(crate) operation_id: OperationId,
163}
164
165#[derive(Clone)]
166pub(crate) struct CanonicalOpsOwnerContext {
167 pub(crate) owner_session_id: SessionId,
168 pub(crate) ops_registry: Arc<dyn OpsLifecycleRegistry>,
169}
170
171#[derive(Debug, thiserror::Error)]
173#[non_exhaustive]
174pub enum MobRespawnError {
175 #[error("no current session bridge for member {member_id}")]
177 NoSessionBridge { member_id: MeerkatId },
178
179 #[error("spawn failed after retire for member {member_id}: {reason}")]
181 SpawnAfterRetire {
182 member_id: MeerkatId,
183 reason: String,
184 },
185
186 #[error("topology restore failed for member {}: {} peer(s) failed", receipt.member_id, failed_peer_ids.len())]
189 TopologyRestoreFailed {
190 receipt: MemberRespawnReceipt,
191 failed_peer_ids: Vec<MeerkatId>,
192 },
193
194 #[error(transparent)]
196 Mob(#[from] MobError),
197}
198
199#[derive(Debug, Clone, Serialize)]
201#[non_exhaustive]
202pub struct MemberDeliveryReceipt {
203 pub member_id: MeerkatId,
205 pub session_id: SessionId,
207 pub handling_mode: HandlingMode,
209}
210
211#[derive(Debug, Clone, Serialize)]
213#[non_exhaustive]
214pub struct MemberSessionRef {
215 pub member_id: MeerkatId,
217 pub session_id: SessionId,
219}
220
221#[derive(Debug, Clone, Default)]
223#[non_exhaustive]
224pub struct HelperOptions {
225 pub role_name: Option<ProfileName>,
227 pub runtime_mode: Option<crate::MobRuntimeMode>,
229 pub backend: Option<MobBackendKind>,
231 pub tool_access_policy: Option<meerkat_core::ops::ToolAccessPolicy>,
233}
234
235#[derive(Debug, Clone)]
237#[non_exhaustive]
238pub struct HelperResult {
239 pub output: Option<String>,
241 pub tokens_used: u64,
243 pub session_id: Option<meerkat_core::types::SessionId>,
245}
246
247#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
249#[serde(rename_all = "snake_case")]
250pub enum PeerTarget {
251 Local(MeerkatId),
253 External(TrustedPeerSpec),
255}
256
257impl From<MeerkatId> for PeerTarget {
258 fn from(value: MeerkatId) -> Self {
259 Self::Local(value)
260 }
261}
262
263struct MobMemberTerminalClassifier;
264
265impl MobMemberTerminalClassifier {
266 fn classify(material: &CanonicalMemberSnapshotMaterial) -> MobMemberTerminalClass {
267 if !material.member_present {
268 return MobMemberTerminalClass::TerminalUnknown;
269 }
270 match material.status {
271 CanonicalMemberStatus::Retiring => MobMemberTerminalClass::Running,
274 CanonicalMemberStatus::Broken => MobMemberTerminalClass::TerminalFailure,
275 CanonicalMemberStatus::Active => match material.session_observation {
276 CanonicalSessionObservation::Active
277 | CanonicalSessionObservation::Inactive
278 | CanonicalSessionObservation::Unknown => MobMemberTerminalClass::Running,
279 CanonicalSessionObservation::Missing => MobMemberTerminalClass::TerminalCompleted,
280 },
281 CanonicalMemberStatus::Completed => MobMemberTerminalClass::TerminalCompleted,
282 CanonicalMemberStatus::Unknown => MobMemberTerminalClass::TerminalUnknown,
283 }
284 }
285
286 fn is_terminal(material: &CanonicalMemberSnapshotMaterial) -> bool {
287 matches!(
288 Self::classify(material),
289 MobMemberTerminalClass::TerminalFailure
290 | MobMemberTerminalClass::TerminalUnknown
291 | MobMemberTerminalClass::TerminalCompleted
292 )
293 }
294
295 fn has_canonical_member(material: &CanonicalMemberSnapshotMaterial) -> bool {
296 material.member_present
297 }
298}
299
300#[derive(Clone)]
310pub struct MobHandle {
311 pub(super) command_tx: mpsc::Sender<MobCommand>,
312 pub(super) roster: Arc<RwLock<RosterAuthority>>,
313 pub(super) task_board: Arc<RwLock<TaskBoard>>,
314 pub(super) definition: Arc<MobDefinition>,
315 pub(super) state: Arc<AtomicU8>,
316 pub(super) events: Arc<dyn MobEventStore>,
317 pub(super) mcp_servers: Arc<tokio::sync::Mutex<BTreeMap<String, actor::McpServerEntry>>>,
318 pub(super) flow_streams:
319 Arc<tokio::sync::Mutex<BTreeMap<RunId, mpsc::Sender<meerkat_core::ScopedAgentEvent>>>>,
320 pub(super) session_service: Arc<dyn MobSessionService>,
321 pub(super) restore_diagnostics: Arc<RwLock<HashMap<MeerkatId, RestoreFailureDiagnostic>>>,
322}
323
324#[derive(Debug, Clone)]
325pub(crate) struct RestoreFailureDiagnostic {
326 pub(crate) session_id: SessionId,
327 pub(crate) reason: String,
328}
329
330#[derive(Clone)]
336pub struct MemberHandle {
337 mob: MobHandle,
338 meerkat_id: MeerkatId,
339}
340
341#[derive(Clone)]
342pub struct MobEventsView {
343 inner: Arc<dyn MobEventStore>,
344}
345
346#[derive(Clone, Debug)]
348#[non_exhaustive]
349pub struct SpawnMemberSpec {
350 pub role_name: ProfileName,
355 pub meerkat_id: MeerkatId,
356 pub initial_message: Option<ContentInput>,
357 pub runtime_mode: Option<crate::MobRuntimeMode>,
358 pub backend: Option<MobBackendKind>,
359 pub binding: Option<crate::RuntimeBinding>,
363 pub context: Option<serde_json::Value>,
365 pub labels: Option<std::collections::BTreeMap<String, String>>,
367 pub launch_mode: crate::launch::MemberLaunchMode,
369 pub tool_access_policy: Option<meerkat_core::ops::ToolAccessPolicy>,
371 pub budget_split_policy: Option<crate::launch::BudgetSplitPolicy>,
373 pub auto_wire_parent: bool,
375 pub additional_instructions: Option<Vec<String>>,
377 pub shell_env: Option<std::collections::HashMap<String, String>>,
379 pub inherited_tool_filter: Option<meerkat_core::tool_scope::ToolFilter>,
384 pub override_profile: Option<crate::profile::Profile>,
391}
392
393impl SpawnMemberSpec {
394 pub fn new(profile: impl Into<ProfileName>, meerkat_id: impl Into<MeerkatId>) -> Self {
395 Self {
396 role_name: profile.into(),
397 meerkat_id: meerkat_id.into(),
398 initial_message: None,
399 runtime_mode: None,
400 backend: None,
401 binding: None,
402 context: None,
403 labels: None,
404 launch_mode: crate::launch::MemberLaunchMode::Fresh,
405 tool_access_policy: None,
406 budget_split_policy: None,
407 auto_wire_parent: false,
408 additional_instructions: None,
409 shell_env: None,
410 inherited_tool_filter: None,
411 override_profile: None,
412 }
413 }
414
415 pub fn with_shell_env(mut self, env: std::collections::HashMap<String, String>) -> Self {
416 self.shell_env = Some(env);
417 self
418 }
419
420 pub fn with_initial_message(mut self, message: impl Into<ContentInput>) -> Self {
421 self.initial_message = Some(message.into());
422 self
423 }
424
425 pub fn with_runtime_mode(mut self, mode: crate::MobRuntimeMode) -> Self {
426 self.runtime_mode = Some(mode);
427 self
428 }
429
430 pub fn with_backend(mut self, backend: MobBackendKind) -> Self {
431 self.backend = Some(backend);
432 self
433 }
434
435 pub fn with_context(mut self, context: serde_json::Value) -> Self {
436 self.context = Some(context);
437 self
438 }
439
440 pub fn with_labels(mut self, labels: std::collections::BTreeMap<String, String>) -> Self {
441 self.labels = Some(labels);
442 self
443 }
444
445 pub fn with_resume_session_id(mut self, id: meerkat_core::types::SessionId) -> Self {
450 self.launch_mode = crate::launch::MemberLaunchMode::Resume { session_id: id };
451 self
452 }
453
454 pub fn with_launch_mode(mut self, mode: crate::launch::MemberLaunchMode) -> Self {
455 self.launch_mode = mode;
456 self
457 }
458
459 pub fn with_tool_access_policy(mut self, policy: meerkat_core::ops::ToolAccessPolicy) -> Self {
460 self.tool_access_policy = Some(policy);
461 self
462 }
463
464 pub fn with_budget_split_policy(mut self, policy: crate::launch::BudgetSplitPolicy) -> Self {
465 self.budget_split_policy = Some(policy);
466 self
467 }
468
469 pub fn with_auto_wire_parent(mut self, auto_wire: bool) -> Self {
470 self.auto_wire_parent = auto_wire;
471 self
472 }
473
474 pub fn with_additional_instructions(mut self, instructions: Vec<String>) -> Self {
475 self.additional_instructions = Some(instructions);
476 self
477 }
478
479 pub fn from_wire(
480 profile: String,
481 meerkat_id: String,
482 initial_message: Option<ContentInput>,
483 runtime_mode: Option<crate::MobRuntimeMode>,
484 backend: Option<MobBackendKind>,
485 ) -> Self {
486 let mut spec = Self::new(profile, meerkat_id);
487 spec.initial_message = initial_message;
488 spec.runtime_mode = runtime_mode;
489 spec.backend = backend;
490 spec
491 }
492
493 pub fn resume_session_id(&self) -> Option<&meerkat_core::types::SessionId> {
495 match &self.launch_mode {
496 crate::launch::MemberLaunchMode::Resume { session_id } => Some(session_id),
497 _ => None,
498 }
499 }
500}
501
502impl MobEventsView {
503 pub async fn poll(
504 &self,
505 after_cursor: u64,
506 limit: usize,
507 ) -> Result<Vec<crate::event::MobEvent>, MobError> {
508 self.inner
509 .poll(after_cursor, limit)
510 .await
511 .map_err(MobError::from)
512 }
513
514 pub async fn replay_all(&self) -> Result<Vec<crate::event::MobEvent>, MobError> {
515 self.inner.replay_all().await.map_err(MobError::from)
516 }
517}
518
519impl MobHandle {
520 async fn restore_failure_for(
521 &self,
522 meerkat_id: &MeerkatId,
523 ) -> Option<RestoreFailureDiagnostic> {
524 self.restore_diagnostics
525 .read()
526 .await
527 .get(meerkat_id)
528 .cloned()
529 }
530
531 fn restore_failure_error(meerkat_id: &MeerkatId, diag: RestoreFailureDiagnostic) -> MobError {
532 MobError::MemberRestoreFailed {
533 member_id: meerkat_id.clone(),
534 session_id: diag.session_id,
535 reason: diag.reason,
536 }
537 }
538
539 pub async fn poll_events(
541 &self,
542 after_cursor: u64,
543 limit: usize,
544 ) -> Result<Vec<crate::event::MobEvent>, MobError> {
545 self.events
546 .poll(after_cursor, limit)
547 .await
548 .map_err(MobError::from)
549 }
550
551 pub fn status(&self) -> MobState {
553 MobState::from_u8(self.state.load(Ordering::Acquire))
554 }
555
556 pub fn definition(&self) -> &MobDefinition {
558 &self.definition
559 }
560
561 pub fn mob_id(&self) -> &MobId {
563 &self.definition.id
564 }
565
566 pub async fn roster(&self) -> Roster {
568 self.roster.read().await.snapshot()
569 }
570
571 fn derived_comms_name(&self, entry: &RosterEntry) -> String {
572 format!(
573 "{}/{}/{}",
574 self.definition.id, entry.profile, entry.meerkat_id
575 )
576 }
577
578 async fn resolve_peer_connectivity(
579 &self,
580 entry: &RosterEntry,
581 session_id: &SessionId,
582 roster_snapshot: &Roster,
583 ) -> Option<MobPeerConnectivitySnapshot> {
584 let comms = self.session_service.comms_runtime(session_id).await?;
585 let peers = comms.peers().await;
586 let peers_by_id: HashMap<&str, &PeerDirectoryEntry> = peers
587 .iter()
588 .map(|peer| (peer.peer_id.as_str(), peer))
589 .collect();
590 let peers_by_name: HashMap<&str, &PeerDirectoryEntry> = peers
591 .iter()
592 .map(|peer| (peer.name.as_str(), peer))
593 .collect();
594
595 let mut reachable_peer_count = 0usize;
596 let mut unknown_peer_count = 0usize;
597 let mut unreachable_peers = Vec::new();
598
599 for wired_peer in &entry.wired_to {
600 let matched = if let Some(spec) = entry.external_peer_specs.get(wired_peer) {
601 peers_by_id
602 .get(spec.peer_id.as_str())
603 .copied()
604 .or_else(|| peers_by_name.get(spec.name.as_str()).copied())
605 } else {
606 let local_entry = roster_snapshot.get(wired_peer);
607 let live_peer_id =
608 match local_entry.and_then(|peer_entry| peer_entry.member_ref.session_id()) {
609 Some(target_session_id) => self
610 .session_service
611 .comms_runtime(target_session_id)
612 .await
613 .and_then(|runtime| runtime.public_key()),
614 None => None,
615 };
616 live_peer_id
617 .as_deref()
618 .and_then(|peer_id| peers_by_id.get(peer_id).copied())
619 .or_else(|| {
620 local_entry
621 .and_then(|peer_entry| peer_entry.peer_id.as_deref())
622 .and_then(|peer_id| peers_by_id.get(peer_id).copied())
623 })
624 .or_else(|| {
625 local_entry
626 .map(|peer_entry| self.derived_comms_name(peer_entry))
627 .and_then(|name| peers_by_name.get(name.as_str()).copied())
628 })
629 };
630
631 match matched {
632 Some(peer) => match peer.reachability {
633 PeerReachability::Reachable => reachable_peer_count += 1,
634 PeerReachability::Unknown => unknown_peer_count += 1,
635 PeerReachability::Unreachable => unreachable_peers.push(MobUnreachablePeer {
636 peer: peer.name.as_string(),
637 reason: peer.last_unreachable_reason,
638 }),
639 },
640 None => unknown_peer_count += 1,
641 }
642 }
643
644 Some(MobPeerConnectivitySnapshot {
645 reachable_peer_count,
646 unknown_peer_count,
647 unreachable_peers,
648 })
649 }
650
651 pub async fn list_members(&self) -> Vec<MobMemberListEntry> {
660 self.project_member_list(self.roster.read().await.list())
661 .await
662 }
663
664 pub async fn list_members_including_retiring(&self) -> Vec<MobMemberListEntry> {
671 self.project_member_list(self.roster.read().await.list_all())
672 .await
673 }
674
675 async fn project_member_list<'a>(
676 &self,
677 entries: impl Iterator<Item = &'a crate::roster::RosterEntry>,
678 ) -> Vec<MobMemberListEntry> {
679 let entries: Vec<_> = entries.cloned().collect();
680 let mut projected = Vec::with_capacity(entries.len());
681 for entry in entries {
682 let snapshot = self
683 .canonical_member_list_material(&entry.meerkat_id)
684 .await
685 .to_snapshot();
686 let snapshot = Some(snapshot);
687 let (status, error, is_final, current_session_id, kickoff) = match snapshot {
688 Some(snapshot) => (
689 snapshot.status,
690 snapshot.error,
691 snapshot.is_final,
692 snapshot.current_session_id,
693 snapshot.kickoff,
694 ),
695 None => (
696 MobMemberStatus::Unknown,
697 None,
698 true,
699 entry.session_id().cloned(),
700 None,
701 ),
702 };
703 projected.push(MobMemberListEntry {
704 meerkat_id: entry.meerkat_id,
705 profile: entry.profile,
706 member_ref: entry.member_ref,
707 runtime_mode: entry.runtime_mode,
708 peer_id: entry.peer_id,
709 state: entry.state,
710 wired_to: entry.wired_to,
711 external_peer_specs: entry.external_peer_specs,
712 labels: entry.labels,
713 status,
714 error,
715 is_final,
716 current_session_id,
717 kickoff,
718 });
719 }
720 projected
721 }
722
723 pub(crate) async fn list_runnable_members(&self) -> Vec<MobMemberListEntry> {
728 self.list_members()
729 .await
730 .into_iter()
731 .filter(|entry| {
732 entry.state == crate::roster::MemberState::Active
733 && entry.status == MobMemberStatus::Active
734 })
735 .collect()
736 }
737
738 pub async fn list_all_members(&self) -> Vec<RosterEntry> {
744 self.roster.read().await.list_all().cloned().collect()
745 }
746
747 pub async fn get_member(&self, meerkat_id: &MeerkatId) -> Option<RosterEntry> {
749 self.roster.read().await.get(meerkat_id).cloned()
750 }
751
752 pub async fn member(&self, meerkat_id: &MeerkatId) -> Result<MemberHandle, MobError> {
754 if let Some(diag) = self.restore_failure_for(meerkat_id).await {
755 return Err(Self::restore_failure_error(meerkat_id, diag));
756 }
757 let entry = self
758 .get_member(meerkat_id)
759 .await
760 .ok_or_else(|| MobError::MeerkatNotFound(meerkat_id.clone()))?;
761 if entry.state != crate::roster::MemberState::Active {
762 return Err(MobError::MeerkatNotFound(meerkat_id.clone()));
763 }
764 Ok(MemberHandle {
765 mob: self.clone(),
766 meerkat_id: meerkat_id.clone(),
767 })
768 }
769
770 pub fn events(&self) -> MobEventsView {
772 MobEventsView {
773 inner: self.events.clone(),
774 }
775 }
776
777 pub async fn record_operator_action_provenance(
782 &self,
783 tool_name: &str,
784 authority_context: &MobToolAuthorityContext,
785 ) -> Result<(), MobError> {
786 self.events
787 .append(NewMobEvent {
788 mob_id: self.definition.id.clone(),
789 timestamp: None,
790 kind: MobEventKind::OperatorActionRecorded {
791 tool_name: tool_name.to_string(),
792 principal_token: authority_context.principal_token().clone(),
793 caller_provenance: authority_context.caller_provenance().cloned(),
794 audit_invocation_id: authority_context
795 .audit_invocation_id()
796 .map(ToOwned::to_owned),
797 },
798 })
799 .await
800 .map(|_| ())
801 .map_err(MobError::from)
802 }
803
804 pub async fn subscribe_agent_events(
812 &self,
813 meerkat_id: &MeerkatId,
814 ) -> Result<EventStream, MobError> {
815 let session_id = {
816 let roster = self.roster.read().await;
817 roster
818 .session_id(meerkat_id)
819 .cloned()
820 .ok_or_else(|| MobError::MeerkatNotFound(meerkat_id.clone()))?
821 };
822 SessionService::subscribe_session_events(self.session_service.as_ref(), &session_id)
823 .await
824 .map_err(|e| {
825 MobError::Internal(format!(
826 "failed to subscribe to agent events for '{meerkat_id}': {e}"
827 ))
828 })
829 }
830
831 pub async fn subscribe_all_agent_events(&self) -> Vec<(MeerkatId, EventStream)> {
837 let entries: Vec<_> = {
838 let roster = self.roster.read().await;
839 roster
840 .list()
841 .filter_map(|e| {
842 e.member_ref
843 .session_id()
844 .map(|sid| (e.meerkat_id.clone(), sid.clone()))
845 })
846 .collect()
847 };
848 let mut streams = Vec::with_capacity(entries.len());
849 for (meerkat_id, session_id) in entries {
850 if let Ok(stream) =
851 SessionService::subscribe_session_events(self.session_service.as_ref(), &session_id)
852 .await
853 {
854 streams.push((meerkat_id, stream));
855 }
856 }
857 streams
858 }
859
860 pub fn subscribe_mob_events(&self) -> super::event_router::MobEventRouterHandle {
867 self.subscribe_mob_events_with_config(super::event_router::MobEventRouterConfig::default())
868 }
869
870 pub fn subscribe_mob_events_with_config(
872 &self,
873 config: super::event_router::MobEventRouterConfig,
874 ) -> super::event_router::MobEventRouterHandle {
875 super::event_router::spawn_event_router(
876 self.session_service.clone(),
877 self.events.clone(),
878 self.roster.clone(),
879 config,
880 )
881 }
882
883 pub async fn mcp_server_states(&self) -> BTreeMap<String, bool> {
885 self.mcp_servers
886 .lock()
887 .await
888 .iter()
889 .map(|(name, entry)| (name.clone(), entry.running))
890 .collect()
891 }
892
893 pub async fn run_flow(
895 &self,
896 flow_id: FlowId,
897 params: serde_json::Value,
898 ) -> Result<RunId, MobError> {
899 self.run_flow_with_stream(flow_id, params, None).await
900 }
901
902 pub async fn run_flow_with_stream(
904 &self,
905 flow_id: FlowId,
906 params: serde_json::Value,
907 scoped_event_tx: Option<mpsc::Sender<meerkat_core::ScopedAgentEvent>>,
908 ) -> Result<RunId, MobError> {
909 let (reply_tx, reply_rx) = oneshot::channel();
910 self.command_tx
911 .send(MobCommand::RunFlow {
912 flow_id,
913 activation_params: params,
914 scoped_event_tx,
915 reply_tx,
916 })
917 .await
918 .map_err(|_| MobError::Internal("actor task dropped".into()))?;
919 reply_rx
920 .await
921 .map_err(|_| MobError::Internal("actor reply dropped".into()))?
922 }
923
924 pub async fn cancel_flow(&self, run_id: RunId) -> Result<(), MobError> {
926 let (reply_tx, reply_rx) = oneshot::channel();
927 self.command_tx
928 .send(MobCommand::CancelFlow { run_id, reply_tx })
929 .await
930 .map_err(|_| MobError::Internal("actor task dropped".into()))?;
931 reply_rx
932 .await
933 .map_err(|_| MobError::Internal("actor reply dropped".into()))?
934 }
935
936 pub async fn flow_status(&self, run_id: RunId) -> Result<Option<MobRun>, MobError> {
938 let (reply_tx, reply_rx) = oneshot::channel();
939 self.command_tx
940 .send(MobCommand::FlowStatus { run_id, reply_tx })
941 .await
942 .map_err(|_| MobError::Internal("actor task dropped".into()))?;
943 reply_rx
944 .await
945 .map_err(|_| MobError::Internal("actor reply dropped".into()))?
946 }
947
948 pub fn list_flows(&self) -> Vec<FlowId> {
950 self.definition.flows.keys().cloned().collect()
951 }
952
953 pub async fn spawn(
955 &self,
956 profile_name: ProfileName,
957 meerkat_id: MeerkatId,
958 initial_message: Option<ContentInput>,
959 ) -> Result<MemberRef, MobError> {
960 self.spawn_with_options(profile_name, meerkat_id, initial_message, None, None)
961 .await
962 }
963
964 pub async fn spawn_with_binding(
966 &self,
967 profile_name: ProfileName,
968 meerkat_id: MeerkatId,
969 initial_message: Option<ContentInput>,
970 binding: crate::RuntimeBinding,
971 ) -> Result<MemberRef, MobError> {
972 let mut spec = SpawnMemberSpec::new(profile_name, meerkat_id);
973 spec.initial_message = initial_message;
974 spec.binding = Some(binding);
975 self.spawn_spec(spec).await
976 }
977
978 pub async fn spawn_with_backend(
980 &self,
981 profile_name: ProfileName,
982 meerkat_id: MeerkatId,
983 initial_message: Option<ContentInput>,
984 backend: Option<MobBackendKind>,
985 ) -> Result<MemberRef, MobError> {
986 self.spawn_with_options(profile_name, meerkat_id, initial_message, None, backend)
987 .await
988 }
989
990 pub async fn spawn_with_options(
992 &self,
993 profile_name: ProfileName,
994 meerkat_id: MeerkatId,
995 initial_message: Option<ContentInput>,
996 runtime_mode: Option<crate::MobRuntimeMode>,
997 backend: Option<MobBackendKind>,
998 ) -> Result<MemberRef, MobError> {
999 let mut spec = SpawnMemberSpec::new(profile_name, meerkat_id);
1000 spec.initial_message = initial_message;
1001 spec.runtime_mode = runtime_mode;
1002 spec.backend = backend;
1003 self.spawn_spec(spec).await
1004 }
1005
1006 pub async fn attach_existing_session(
1008 &self,
1009 profile_name: ProfileName,
1010 meerkat_id: MeerkatId,
1011 session_id: meerkat_core::types::SessionId,
1012 runtime_mode: Option<crate::MobRuntimeMode>,
1013 backend: Option<MobBackendKind>,
1014 ) -> Result<MemberRef, MobError> {
1015 let mut spec = SpawnMemberSpec::new(profile_name, meerkat_id);
1016 spec.launch_mode = crate::launch::MemberLaunchMode::Resume { session_id };
1017 spec.runtime_mode = runtime_mode;
1018 spec.backend = backend;
1019 self.spawn_spec(spec).await
1020 }
1021
1022 pub async fn attach_existing_session_as_orchestrator(
1024 &self,
1025 profile_name: ProfileName,
1026 meerkat_id: MeerkatId,
1027 session_id: meerkat_core::types::SessionId,
1028 ) -> Result<MemberRef, MobError> {
1029 self.attach_existing_session(profile_name, meerkat_id, session_id, None, None)
1030 .await
1031 }
1032
1033 pub async fn attach_existing_session_as_member(
1035 &self,
1036 profile_name: ProfileName,
1037 meerkat_id: MeerkatId,
1038 session_id: meerkat_core::types::SessionId,
1039 ) -> Result<MemberRef, MobError> {
1040 self.attach_existing_session(profile_name, meerkat_id, session_id, None, None)
1041 .await
1042 }
1043
1044 pub async fn spawn_spec(&self, spec: SpawnMemberSpec) -> Result<MemberRef, MobError> {
1046 let (reply_tx, reply_rx) = oneshot::channel();
1047 self.command_tx
1048 .send(MobCommand::Spawn {
1049 spec: Box::new(spec),
1050 owner_session_id: None,
1051 ops_registry: None,
1052 reply_tx,
1053 })
1054 .await
1055 .map_err(|_| MobError::Internal("actor task dropped".into()))?;
1056 reply_rx
1057 .await
1058 .map_err(|_| MobError::Internal("actor reply dropped".into()))?
1059 .map(|receipt| receipt.member_ref)
1060 }
1061
1062 pub(super) async fn spawn_spec_receipt_with_owner_context(
1063 &self,
1064 spec: SpawnMemberSpec,
1065 owner_context: CanonicalOpsOwnerContext,
1066 ) -> Result<MemberSpawnReceipt, MobError> {
1067 let (reply_tx, reply_rx) = oneshot::channel();
1068 self.command_tx
1069 .send(MobCommand::Spawn {
1070 spec: Box::new(spec),
1071 owner_session_id: Some(owner_context.owner_session_id),
1072 ops_registry: Some(owner_context.ops_registry),
1073 reply_tx,
1074 })
1075 .await
1076 .map_err(|_| MobError::Internal("actor task dropped".into()))?;
1077 reply_rx
1078 .await
1079 .map_err(|_| MobError::Internal("actor reply dropped".into()))?
1080 }
1081
1082 pub async fn spawn_many(
1086 &self,
1087 specs: Vec<SpawnMemberSpec>,
1088 ) -> Vec<Result<MemberRef, MobError>> {
1089 futures::future::join_all(specs.into_iter().map(|spec| self.spawn_spec(spec))).await
1090 }
1091
1092 pub(super) async fn spawn_many_receipts_with_owner_context(
1093 &self,
1094 specs: Vec<SpawnMemberSpec>,
1095 owner_context: CanonicalOpsOwnerContext,
1096 ) -> Vec<Result<MemberSpawnReceipt, MobError>> {
1097 futures::future::join_all(
1098 specs.into_iter().map(|spec| {
1099 self.spawn_spec_receipt_with_owner_context(spec, owner_context.clone())
1100 }),
1101 )
1102 .await
1103 }
1104
1105 pub async fn retire(&self, meerkat_id: MeerkatId) -> Result<(), MobError> {
1107 let (reply_tx, reply_rx) = oneshot::channel();
1108 self.command_tx
1109 .send(MobCommand::Retire {
1110 meerkat_id,
1111 reply_tx,
1112 })
1113 .await
1114 .map_err(|_| MobError::Internal("actor task dropped".into()))?;
1115 reply_rx
1116 .await
1117 .map_err(|_| MobError::Internal("actor reply dropped".into()))?
1118 }
1119
1120 pub async fn respawn(
1126 &self,
1127 meerkat_id: MeerkatId,
1128 initial_message: Option<ContentInput>,
1129 ) -> Result<MemberRespawnReceipt, MobRespawnError> {
1130 let old_session_id_before = self
1131 .canonical_member_list_material(&meerkat_id)
1132 .await
1133 .current_session_id;
1134 let (reply_tx, reply_rx) = oneshot::channel();
1135 self.command_tx
1136 .send(MobCommand::Respawn {
1137 meerkat_id,
1138 initial_message,
1139 reply_tx,
1140 })
1141 .await
1142 .map_err(|_| MobError::Internal("actor task dropped".into()))?;
1143 let reply = reply_rx
1144 .await
1145 .map_err(|_| MobError::Internal("actor reply dropped".into()))?;
1146 let mut receipt = match reply {
1147 Ok(receipt) => receipt,
1148 Err(MobRespawnError::TopologyRestoreFailed {
1149 mut receipt,
1150 failed_peer_ids,
1151 }) => {
1152 if receipt.old_session_id.is_none() {
1153 receipt.old_session_id = old_session_id_before;
1154 }
1155 let post_material = self
1156 .canonical_member_list_material(&receipt.member_id)
1157 .await;
1158 if MobMemberTerminalClassifier::has_canonical_member(&post_material) {
1159 receipt.new_session_id = post_material.current_session_id;
1160 }
1161 return Err(MobRespawnError::TopologyRestoreFailed {
1162 receipt,
1163 failed_peer_ids,
1164 });
1165 }
1166 Err(err) => return Err(err),
1167 };
1168 if receipt.old_session_id.is_none() {
1169 receipt.old_session_id = old_session_id_before;
1170 }
1171 let post_material = self
1172 .canonical_member_list_material(&receipt.member_id)
1173 .await;
1174 if MobMemberTerminalClassifier::has_canonical_member(&post_material) {
1175 receipt.new_session_id = post_material.current_session_id;
1176 }
1177 Ok(receipt)
1178 }
1179
1180 pub async fn retire_all(&self) -> Result<(), MobError> {
1182 let (reply_tx, reply_rx) = oneshot::channel();
1183 self.command_tx
1184 .send(MobCommand::RetireAll { reply_tx })
1185 .await
1186 .map_err(|_| MobError::Internal("actor task dropped".into()))?;
1187 reply_rx
1188 .await
1189 .map_err(|_| MobError::Internal("actor reply dropped".into()))?
1190 }
1191
1192 pub async fn wire<T>(&self, local: MeerkatId, target: T) -> Result<(), MobError>
1194 where
1195 T: Into<PeerTarget>,
1196 {
1197 let (reply_tx, reply_rx) = oneshot::channel();
1198 self.command_tx
1199 .send(MobCommand::Wire {
1200 local,
1201 target: target.into(),
1202 reply_tx,
1203 })
1204 .await
1205 .map_err(|_| MobError::Internal("actor task dropped".into()))?;
1206 reply_rx
1207 .await
1208 .map_err(|_| MobError::Internal("actor reply dropped".into()))?
1209 }
1210
1211 pub async fn unwire<T>(&self, local: MeerkatId, target: T) -> Result<(), MobError>
1213 where
1214 T: Into<PeerTarget>,
1215 {
1216 let (reply_tx, reply_rx) = oneshot::channel();
1217 self.command_tx
1218 .send(MobCommand::Unwire {
1219 local,
1220 target: target.into(),
1221 reply_tx,
1222 })
1223 .await
1224 .map_err(|_| MobError::Internal("actor task dropped".into()))?;
1225 reply_rx
1226 .await
1227 .map_err(|_| MobError::Internal("actor reply dropped".into()))?
1228 }
1229
1230 pub async fn internal_turn(
1235 &self,
1236 meerkat_id: MeerkatId,
1237 message: impl Into<meerkat_core::types::ContentInput>,
1238 ) -> Result<MemberDeliveryReceipt, MobError> {
1239 let session_id = self
1240 .internal_turn_for_member(meerkat_id.clone(), message.into())
1241 .await?;
1242 Ok(MemberDeliveryReceipt {
1243 member_id: meerkat_id,
1244 session_id,
1245 handling_mode: HandlingMode::Queue,
1246 })
1247 }
1248
1249 pub(super) async fn external_turn_for_member(
1250 &self,
1251 meerkat_id: MeerkatId,
1252 message: meerkat_core::types::ContentInput,
1253 handling_mode: HandlingMode,
1254 render_metadata: Option<RenderMetadata>,
1255 ) -> Result<meerkat_core::types::SessionId, MobError> {
1256 let (reply_tx, reply_rx) = oneshot::channel();
1257 self.command_tx
1258 .send(MobCommand::ExternalTurn {
1259 meerkat_id,
1260 content: message,
1261 handling_mode,
1262 render_metadata,
1263 reply_tx,
1264 })
1265 .await
1266 .map_err(|_| MobError::Internal("actor task dropped".into()))?;
1267 reply_rx
1268 .await
1269 .map_err(|_| MobError::Internal("actor reply dropped".into()))?
1270 }
1271
1272 pub(super) async fn internal_turn_for_member(
1273 &self,
1274 meerkat_id: MeerkatId,
1275 message: meerkat_core::types::ContentInput,
1276 ) -> Result<SessionId, MobError> {
1277 let (reply_tx, reply_rx) = oneshot::channel();
1278 self.command_tx
1279 .send(MobCommand::InternalTurn {
1280 meerkat_id,
1281 content: message,
1282 reply_tx,
1283 })
1284 .await
1285 .map_err(|_| MobError::Internal("actor task dropped".into()))?;
1286 reply_rx
1287 .await
1288 .map_err(|_| MobError::Internal("actor reply dropped".into()))?
1289 }
1290
1291 pub async fn stop(&self) -> Result<(), MobError> {
1293 let (reply_tx, reply_rx) = oneshot::channel();
1294 self.command_tx
1295 .send(MobCommand::Stop { reply_tx })
1296 .await
1297 .map_err(|_| MobError::Internal("actor task dropped".into()))?;
1298 reply_rx
1299 .await
1300 .map_err(|_| MobError::Internal("actor reply dropped".into()))?
1301 }
1302
1303 pub async fn resume(&self) -> Result<(), MobError> {
1305 let (reply_tx, reply_rx) = oneshot::channel();
1306 self.command_tx
1307 .send(MobCommand::ResumeLifecycle { reply_tx })
1308 .await
1309 .map_err(|_| MobError::Internal("actor task dropped".into()))?;
1310 reply_rx
1311 .await
1312 .map_err(|_| MobError::Internal("actor reply dropped".into()))?
1313 }
1314
1315 pub async fn complete(&self) -> Result<(), MobError> {
1317 let (reply_tx, reply_rx) = oneshot::channel();
1318 self.command_tx
1319 .send(MobCommand::Complete { reply_tx })
1320 .await
1321 .map_err(|_| MobError::Internal("actor task dropped".into()))?;
1322 reply_rx
1323 .await
1324 .map_err(|_| MobError::Internal("actor reply dropped".into()))?
1325 }
1326
1327 pub async fn reset(&self) -> Result<(), MobError> {
1332 let (reply_tx, reply_rx) = oneshot::channel();
1333 self.command_tx
1334 .send(MobCommand::Reset { reply_tx })
1335 .await
1336 .map_err(|_| MobError::Internal("actor task dropped".into()))?;
1337 reply_rx
1338 .await
1339 .map_err(|_| MobError::Internal("actor reply dropped".into()))?
1340 }
1341
1342 pub async fn destroy(&self) -> Result<(), MobError> {
1344 let (reply_tx, reply_rx) = oneshot::channel();
1345 self.command_tx
1346 .send(MobCommand::Destroy { reply_tx })
1347 .await
1348 .map_err(|_| MobError::Internal("actor task dropped".into()))?;
1349 reply_rx
1350 .await
1351 .map_err(|_| MobError::Internal("actor reply dropped".into()))?
1352 }
1353
1354 pub async fn task_create(
1356 &self,
1357 subject: String,
1358 description: String,
1359 blocked_by: Vec<TaskId>,
1360 ) -> Result<TaskId, MobError> {
1361 let (reply_tx, reply_rx) = oneshot::channel();
1362 self.command_tx
1363 .send(MobCommand::TaskCreate {
1364 subject,
1365 description,
1366 blocked_by,
1367 reply_tx,
1368 })
1369 .await
1370 .map_err(|_| MobError::Internal("actor task dropped".into()))?;
1371 reply_rx
1372 .await
1373 .map_err(|_| MobError::Internal("actor reply dropped".into()))?
1374 }
1375
1376 pub async fn task_update(
1378 &self,
1379 task_id: TaskId,
1380 status: TaskStatus,
1381 owner: Option<MeerkatId>,
1382 ) -> Result<(), MobError> {
1383 let (reply_tx, reply_rx) = oneshot::channel();
1384 self.command_tx
1385 .send(MobCommand::TaskUpdate {
1386 task_id,
1387 status,
1388 owner,
1389 reply_tx,
1390 })
1391 .await
1392 .map_err(|_| MobError::Internal("actor task dropped".into()))?;
1393 reply_rx
1394 .await
1395 .map_err(|_| MobError::Internal("actor reply dropped".into()))?
1396 }
1397
1398 pub async fn task_list(&self) -> Result<Vec<MobTask>, MobError> {
1400 Ok(self.task_board.read().await.list().cloned().collect())
1401 }
1402
1403 pub async fn task_get(&self, task_id: &TaskId) -> Result<Option<MobTask>, MobError> {
1405 Ok(self.task_board.read().await.get(task_id).cloned())
1406 }
1407
1408 #[cfg(test)]
1409 pub async fn debug_flow_tracker_counts(&self) -> Result<(usize, usize), MobError> {
1410 let (reply_tx, reply_rx) = oneshot::channel();
1411 self.command_tx
1412 .send(MobCommand::FlowTrackerCounts { reply_tx })
1413 .await
1414 .map_err(|_| MobError::Internal("actor task dropped".into()))?;
1415 reply_rx
1416 .await
1417 .map_err(|_| MobError::Internal("actor reply dropped".into()))
1418 }
1419
1420 #[cfg(test)]
1421 pub async fn debug_orchestrator_snapshot(
1422 &self,
1423 ) -> Result<super::MobOrchestratorSnapshot, MobError> {
1424 let (reply_tx, reply_rx) = oneshot::channel();
1425 self.command_tx
1426 .send(MobCommand::OrchestratorSnapshot { reply_tx })
1427 .await
1428 .map_err(|_| MobError::Internal("actor task dropped".into()))?;
1429 reply_rx
1430 .await
1431 .map_err(|_| MobError::Internal("actor reply dropped".into()))
1432 }
1433
1434 pub async fn set_spawn_policy(
1439 &self,
1440 policy: Option<Arc<dyn super::spawn_policy::SpawnPolicy>>,
1441 ) -> Result<(), MobError> {
1442 let (reply_tx, reply_rx) = oneshot::channel();
1443 self.command_tx
1444 .send(MobCommand::SetSpawnPolicy { policy, reply_tx })
1445 .await
1446 .map_err(|_| MobError::Internal("actor task dropped".into()))?;
1447 reply_rx
1448 .await
1449 .map_err(|_| MobError::Internal("actor reply dropped".into()))?;
1450 Ok(())
1451 }
1452
1453 pub async fn shutdown(&self) -> Result<(), MobError> {
1455 let (reply_tx, reply_rx) = oneshot::channel();
1456 self.command_tx
1457 .send(MobCommand::Shutdown { reply_tx })
1458 .await
1459 .map_err(|_| MobError::Internal("actor task dropped".into()))?;
1460 reply_rx
1461 .await
1462 .map_err(|_| MobError::Internal("actor reply dropped".into()))??;
1463 Ok(())
1464 }
1465
1466 pub async fn force_cancel_member(&self, meerkat_id: MeerkatId) -> Result<(), MobError> {
1471 let (reply_tx, reply_rx) = oneshot::channel();
1472 self.command_tx
1473 .send(MobCommand::ForceCancel {
1474 meerkat_id,
1475 reply_tx,
1476 })
1477 .await
1478 .map_err(|_| MobError::Internal("actor task dropped".into()))?;
1479 reply_rx
1480 .await
1481 .map_err(|_| MobError::Internal("actor reply dropped".into()))?
1482 }
1483
1484 async fn canonical_member_list_material(
1485 &self,
1486 meerkat_id: &MeerkatId,
1487 ) -> CanonicalMemberSnapshotMaterial {
1488 self.canonical_member_material(
1489 meerkat_id,
1490 PeerConnectivityProjection::Omit,
1491 SessionObservationProjection::LiveOnly,
1492 )
1493 .await
1494 }
1495
1496 async fn canonical_member_snapshot_material(
1497 &self,
1498 meerkat_id: &MeerkatId,
1499 ) -> CanonicalMemberSnapshotMaterial {
1500 self.canonical_member_material(
1501 meerkat_id,
1502 PeerConnectivityProjection::Include,
1503 SessionObservationProjection::Full,
1504 )
1505 .await
1506 }
1507
1508 async fn canonical_member_material(
1509 &self,
1510 meerkat_id: &MeerkatId,
1511 connectivity: PeerConnectivityProjection,
1512 observation: SessionObservationProjection,
1513 ) -> CanonicalMemberSnapshotMaterial {
1514 let (roster_snapshot, roster_entry, roster_state, current_session_id) = {
1517 let roster = self.roster.read().await;
1518 match roster.get(meerkat_id) {
1519 Some(entry) => (
1520 roster.snapshot(),
1521 Some(entry.clone()),
1522 Some(entry.state),
1523 entry.member_ref.session_id().cloned(),
1524 ),
1525 None => (roster.snapshot(), None, None, None),
1526 }
1527 };
1528
1529 let restore_failure = {
1530 self.restore_diagnostics
1531 .read()
1532 .await
1533 .get(meerkat_id)
1534 .cloned()
1535 };
1536 if let Some(diag) = restore_failure {
1537 return MobMemberLifecycleAuthority::materialize(MobMemberLifecycleInput {
1538 member_present: roster_state.is_some(),
1539 roster_state,
1540 session_observation: CanonicalSessionObservation::Missing,
1541 restore_failure: Some(diag.reason),
1542 output_preview: None,
1543 tokens_used: 0,
1544 current_session_id: Some(diag.session_id),
1545 peer_connectivity: None,
1546 kickoff: roster_entry
1547 .as_ref()
1548 .and_then(|entry| entry.kickoff.clone()),
1549 });
1550 }
1551
1552 match (roster_state, current_session_id) {
1553 (None, _) => MobMemberLifecycleAuthority::materialize(MobMemberLifecycleInput {
1554 member_present: false,
1555 roster_state: None,
1556 session_observation: CanonicalSessionObservation::Missing,
1557 restore_failure: None,
1558 output_preview: None,
1559 tokens_used: 0,
1560 current_session_id: None,
1561 peer_connectivity: None,
1562 kickoff: None,
1563 }),
1564 (Some(roster_state), None) => {
1565 MobMemberLifecycleAuthority::materialize(MobMemberLifecycleInput {
1566 member_present: true,
1567 roster_state: Some(roster_state),
1568 session_observation: CanonicalSessionObservation::Missing,
1569 restore_failure: None,
1570 output_preview: None,
1571 tokens_used: 0,
1572 current_session_id: None,
1573 peer_connectivity: None,
1574 kickoff: roster_entry
1575 .as_ref()
1576 .and_then(|entry| entry.kickoff.clone()),
1577 })
1578 }
1579 (Some(roster_state), Some(session_id)) => {
1580 let (output_preview, tokens_used, observation) = match observation {
1581 SessionObservationProjection::LiveOnly => {
1582 match self.session_service.has_live_session(&session_id).await {
1583 Ok(false) => (None, 0, CanonicalSessionObservation::Missing),
1584 Ok(true) => match self.session_service.read(&session_id).await {
1585 Ok(view) => (
1586 view.state.last_assistant_text.clone(),
1587 view.billing.total_tokens,
1588 if view.state.is_active {
1589 CanonicalSessionObservation::Active
1590 } else {
1591 CanonicalSessionObservation::Inactive
1592 },
1593 ),
1594 Err(SessionError::NotFound { .. }) => {
1595 (None, 0, CanonicalSessionObservation::Unknown)
1596 }
1597 Err(_) => (None, 0, CanonicalSessionObservation::Unknown),
1598 },
1599 Err(_) => (None, 0, CanonicalSessionObservation::Unknown),
1600 }
1601 }
1602 SessionObservationProjection::Full => {
1603 match self.session_service.read(&session_id).await {
1604 Ok(view) => (
1605 view.state.last_assistant_text.clone(),
1606 view.billing.total_tokens,
1607 if view.state.is_active {
1608 CanonicalSessionObservation::Active
1609 } else {
1610 CanonicalSessionObservation::Inactive
1611 },
1612 ),
1613 Err(SessionError::NotFound { .. }) => {
1614 (None, 0, CanonicalSessionObservation::Missing)
1615 }
1616 Err(_) => (None, 0, CanonicalSessionObservation::Unknown),
1617 }
1618 }
1619 };
1620 let peer_connectivity = if connectivity == PeerConnectivityProjection::Include {
1621 match roster_entry.as_ref() {
1622 Some(entry) => {
1623 self.resolve_peer_connectivity(entry, &session_id, &roster_snapshot)
1624 .await
1625 }
1626 None => None,
1627 }
1628 } else {
1629 None
1630 };
1631 MobMemberLifecycleAuthority::materialize(MobMemberLifecycleInput {
1632 member_present: true,
1633 roster_state: Some(roster_state),
1634 session_observation: observation,
1635 restore_failure: None,
1636 output_preview,
1637 tokens_used,
1638 current_session_id: Some(session_id),
1639 peer_connectivity,
1640 kickoff: roster_entry.and_then(|entry| entry.kickoff),
1641 })
1642 }
1643 }
1644 }
1645
1646 async fn snapshot_kickoff_waiters(
1647 &self,
1648 meerkat_ids: Vec<MeerkatId>,
1649 ) -> Result<Vec<(MeerkatId, tokio::sync::watch::Receiver<bool>)>, MobError> {
1650 let (reply_tx, reply_rx) = oneshot::channel();
1651 self.command_tx
1652 .send(MobCommand::KickoffBarrierSnapshot {
1653 meerkat_ids,
1654 reply_tx,
1655 })
1656 .await
1657 .map_err(|_| MobError::Internal("mob actor dropped".into()))?;
1658 reply_rx
1659 .await
1660 .map_err(|_| MobError::Internal("actor reply dropped".into()))
1661 }
1662
1663 async fn wait_for_kickoff_receivers(
1664 &self,
1665 target_ids: &[MeerkatId],
1666 waiters: Vec<(MeerkatId, tokio::sync::watch::Receiver<bool>)>,
1667 timeout: Option<Duration>,
1668 ) -> Result<(), MobError> {
1669 if waiters.is_empty() {
1670 return Ok(());
1671 }
1672
1673 let deadline = Instant::now() + timeout.unwrap_or(DEFAULT_KICKOFF_WAIT_TIMEOUT);
1674 let mut pending = waiters
1675 .iter()
1676 .map(|(id, _)| id.clone())
1677 .collect::<std::collections::HashSet<_>>();
1678 let mut futures = FuturesUnordered::new();
1679
1680 for (id, mut rx) in waiters {
1681 if *rx.borrow() {
1682 pending.remove(&id);
1683 continue;
1684 }
1685 futures.push(async move {
1686 loop {
1687 if *rx.borrow() {
1688 break;
1689 }
1690 if rx.changed().await.is_err() {
1691 break;
1692 }
1693 }
1694 id
1695 });
1696 }
1697
1698 while !futures.is_empty() {
1699 let remaining = deadline.saturating_duration_since(Instant::now());
1700 if remaining.is_zero() {
1701 let pending_member_ids = target_ids
1702 .iter()
1703 .filter(|id| pending.contains(*id))
1704 .cloned()
1705 .collect();
1706 return Err(MobError::KickoffWaitTimedOut { pending_member_ids });
1707 }
1708
1709 let next_fut = futures.next();
1710 let sleep_fut = tokio::time::sleep(remaining);
1711 futures::pin_mut!(next_fut);
1712 futures::pin_mut!(sleep_fut);
1713
1714 match futures::future::select(next_fut, sleep_fut).await {
1715 futures::future::Either::Left((Some(id), _)) => {
1716 pending.remove(&id);
1717 }
1718 futures::future::Either::Left((None, _)) => break,
1719 futures::future::Either::Right(((), _)) => {
1720 let pending_member_ids = target_ids
1721 .iter()
1722 .filter(|id| pending.contains(*id))
1723 .cloned()
1724 .collect();
1725 return Err(MobError::KickoffWaitTimedOut { pending_member_ids });
1726 }
1727 }
1728 }
1729
1730 Ok(())
1731 }
1732
1733 async fn wait_one_material(
1734 &self,
1735 meerkat_id: &MeerkatId,
1736 ) -> Result<CanonicalMemberSnapshotMaterial, MobError> {
1737 loop {
1738 let material = self.canonical_member_list_material(meerkat_id).await;
1739 if MobMemberTerminalClassifier::is_terminal(&material) {
1740 return Ok(material);
1741 }
1742 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1743 }
1744 }
1745
1746 pub async fn member_status(
1751 &self,
1752 meerkat_id: &MeerkatId,
1753 ) -> Result<MobMemberSnapshot, MobError> {
1754 let material = self.canonical_member_snapshot_material(meerkat_id).await;
1755 Ok(material.to_snapshot())
1756 }
1757
1758 pub async fn wait_for_kickoff_complete(
1764 &self,
1765 timeout: Option<Duration>,
1766 ) -> Result<Vec<(MeerkatId, MobMemberSnapshot)>, MobError> {
1767 let target_ids = self
1768 .list_all_members()
1769 .await
1770 .into_iter()
1771 .map(|entry| entry.meerkat_id)
1772 .collect::<Vec<_>>();
1773 self.wait_for_members_kickoff_complete(&target_ids, timeout)
1774 .await
1775 }
1776
1777 pub async fn wait_for_members_kickoff_complete(
1781 &self,
1782 ids: &[MeerkatId],
1783 timeout: Option<Duration>,
1784 ) -> Result<Vec<(MeerkatId, MobMemberSnapshot)>, MobError> {
1785 let target_ids = ids.to_vec();
1786 let waiters = self.snapshot_kickoff_waiters(target_ids.clone()).await?;
1787 self.wait_for_kickoff_receivers(&target_ids, waiters, timeout)
1788 .await?;
1789
1790 let mut snapshots = Vec::with_capacity(target_ids.len());
1791 for id in target_ids {
1792 snapshots.push((id.clone(), self.member_status(&id).await?));
1793 }
1794 Ok(snapshots)
1795 }
1796
1797 pub async fn wait_one(&self, meerkat_id: &MeerkatId) -> Result<MobMemberSnapshot, MobError> {
1801 let material = self.wait_one_material(meerkat_id).await?;
1802 Ok(material.to_snapshot())
1803 }
1804
1805 pub async fn wait_all(
1807 &self,
1808 meerkat_ids: &[MeerkatId],
1809 ) -> Result<Vec<MobMemberSnapshot>, MobError> {
1810 let futs = meerkat_ids
1811 .iter()
1812 .map(|id| self.wait_one_material(id))
1813 .collect::<Vec<_>>();
1814 let results = futures::future::join_all(futs).await;
1815 results
1816 .into_iter()
1817 .map(|result| result.map(|material| material.to_snapshot()))
1818 .collect()
1819 }
1820
1821 pub async fn collect_completed(&self) -> Vec<(MeerkatId, MobMemberSnapshot)> {
1823 let entries = self.list_all_members().await;
1824 let mut completed = Vec::new();
1825 for entry in entries {
1826 if let Ok(snapshot) = self.member_status(&entry.meerkat_id).await
1827 && snapshot.is_final
1828 {
1829 completed.push((entry.meerkat_id, snapshot));
1830 }
1831 }
1832 completed
1833 }
1834
1835 pub async fn spawn_helper(
1841 &self,
1842 meerkat_id: MeerkatId,
1843 task: impl Into<String>,
1844 options: HelperOptions,
1845 ) -> Result<HelperResult, MobError> {
1846 let profile_name = options
1847 .role_name
1848 .or_else(|| self.definition.profiles.keys().next().cloned())
1849 .ok_or_else(|| {
1850 MobError::Internal("no profile specified and definition has no profiles".into())
1851 })?;
1852 let task_text = task.into();
1853 let mut spec = SpawnMemberSpec::new(profile_name, meerkat_id.clone());
1854 spec.initial_message = Some(task_text.into());
1855 spec.runtime_mode = Some(
1856 options
1857 .runtime_mode
1858 .unwrap_or(crate::MobRuntimeMode::TurnDriven),
1859 );
1860 spec.backend = options.backend;
1861 spec.tool_access_policy = options.tool_access_policy;
1862 spec.auto_wire_parent = true;
1863
1864 self.spawn_spec(spec).await?;
1865 let helper_material = self.canonical_member_list_material(&meerkat_id).await;
1866 let _ = self.retire(meerkat_id.clone()).await;
1867
1868 Ok(helper_material.to_helper_result())
1869 }
1870
1871 pub async fn fork_helper(
1876 &self,
1877 source_member_id: &MeerkatId,
1878 meerkat_id: MeerkatId,
1879 task: impl Into<String>,
1880 fork_context: crate::launch::ForkContext,
1881 options: HelperOptions,
1882 ) -> Result<HelperResult, MobError> {
1883 let profile_name = options
1884 .role_name
1885 .or_else(|| self.definition.profiles.keys().next().cloned())
1886 .ok_or_else(|| {
1887 MobError::Internal("no profile specified and definition has no profiles".into())
1888 })?;
1889 let task_text = task.into();
1890 let mut spec = SpawnMemberSpec::new(profile_name, meerkat_id.clone());
1891 spec.initial_message = Some(task_text.into());
1892 spec.runtime_mode = Some(
1893 options
1894 .runtime_mode
1895 .unwrap_or(crate::MobRuntimeMode::TurnDriven),
1896 );
1897 spec.backend = options.backend;
1898 spec.tool_access_policy = options.tool_access_policy;
1899 spec.auto_wire_parent = true;
1900 spec.launch_mode = crate::launch::MemberLaunchMode::Fork {
1901 source_member_id: source_member_id.clone(),
1902 fork_context,
1903 };
1904
1905 self.spawn_spec(spec).await?;
1906 let helper_material = self.canonical_member_list_material(&meerkat_id).await;
1907 let _ = self.retire(meerkat_id.clone()).await;
1908
1909 Ok(helper_material.to_helper_result())
1910 }
1911}
1912
1913impl MemberHandle {
1914 pub fn meerkat_id(&self) -> &MeerkatId {
1916 &self.meerkat_id
1917 }
1918
1919 pub async fn send(
1921 &self,
1922 content: impl Into<meerkat_core::types::ContentInput>,
1923 handling_mode: HandlingMode,
1924 ) -> Result<MemberDeliveryReceipt, MobError> {
1925 self.send_with_render_metadata(content, handling_mode, None)
1926 .await
1927 }
1928
1929 pub async fn send_with_render_metadata(
1931 &self,
1932 content: impl Into<meerkat_core::types::ContentInput>,
1933 handling_mode: HandlingMode,
1934 render_metadata: Option<RenderMetadata>,
1935 ) -> Result<MemberDeliveryReceipt, MobError> {
1936 let session_id = self
1937 .mob
1938 .external_turn_for_member(
1939 self.meerkat_id.clone(),
1940 content.into(),
1941 handling_mode,
1942 render_metadata,
1943 )
1944 .await?;
1945 Ok(MemberDeliveryReceipt {
1946 member_id: self.meerkat_id.clone(),
1947 session_id,
1948 handling_mode,
1949 })
1950 }
1951
1952 pub async fn internal_turn(
1954 &self,
1955 content: impl Into<meerkat_core::types::ContentInput>,
1956 ) -> Result<MemberDeliveryReceipt, MobError> {
1957 let session_id = self
1958 .mob
1959 .internal_turn_for_member(self.meerkat_id.clone(), content.into())
1960 .await?;
1961 Ok(MemberDeliveryReceipt {
1962 member_id: self.meerkat_id.clone(),
1963 session_id,
1964 handling_mode: HandlingMode::Queue,
1965 })
1966 }
1967
1968 pub async fn current_session_id(&self) -> Result<Option<SessionId>, MobError> {
1970 let roster = self.mob.roster.read().await;
1971 Ok(roster
1972 .get(&self.meerkat_id)
1973 .and_then(|e| e.member_ref.session_id().cloned()))
1974 }
1975
1976 pub async fn session_ref(&self) -> Result<Option<MemberSessionRef>, MobError> {
1978 let roster = self.mob.roster.read().await;
1979 Ok(roster
1980 .get(&self.meerkat_id)
1981 .and_then(|e| e.member_ref.session_id().cloned())
1982 .map(|session_id| MemberSessionRef {
1983 member_id: self.meerkat_id.clone(),
1984 session_id,
1985 }))
1986 }
1987
1988 pub async fn status(&self) -> Result<MobMemberSnapshot, MobError> {
1990 self.mob.member_status(&self.meerkat_id).await
1991 }
1992
1993 pub async fn events(&self) -> Result<EventStream, MobError> {
1995 self.mob.subscribe_agent_events(&self.meerkat_id).await
1996 }
1997}