use super::*;
#[cfg(target_arch = "wasm32")]
use crate::tokio;
#[derive(Clone)]
pub struct MobHandle {
pub(super) command_tx: mpsc::Sender<MobCommand>,
pub(super) roster: Arc<RwLock<Roster>>,
pub(super) task_board: Arc<RwLock<TaskBoard>>,
pub(super) definition: Arc<MobDefinition>,
pub(super) state: Arc<AtomicU8>,
pub(super) events: Arc<dyn MobEventStore>,
pub(super) mcp_servers: Arc<tokio::sync::Mutex<BTreeMap<String, actor::McpServerEntry>>>,
pub(super) flow_streams:
Arc<tokio::sync::Mutex<BTreeMap<RunId, mpsc::Sender<meerkat_core::ScopedAgentEvent>>>>,
pub(super) session_service: Arc<dyn MobSessionService>,
}
#[derive(Clone)]
pub struct MobEventsView {
inner: Arc<dyn MobEventStore>,
}
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct SpawnMemberSpec {
pub profile_name: ProfileName,
pub meerkat_id: MeerkatId,
pub initial_message: Option<String>,
pub runtime_mode: Option<crate::MobRuntimeMode>,
pub backend: Option<MobBackendKind>,
pub context: Option<serde_json::Value>,
pub labels: Option<std::collections::BTreeMap<String, String>>,
pub resume_session_id: Option<meerkat_core::types::SessionId>,
pub additional_instructions: Option<Vec<String>>,
pub shell_env: Option<std::collections::HashMap<String, String>>,
}
impl SpawnMemberSpec {
pub fn new(profile: impl Into<ProfileName>, meerkat_id: impl Into<MeerkatId>) -> Self {
Self {
profile_name: profile.into(),
meerkat_id: meerkat_id.into(),
initial_message: None,
runtime_mode: None,
backend: None,
context: None,
labels: None,
resume_session_id: None,
additional_instructions: None,
shell_env: None,
}
}
pub fn with_shell_env(mut self, env: std::collections::HashMap<String, String>) -> Self {
self.shell_env = Some(env);
self
}
pub fn with_initial_message(mut self, message: String) -> Self {
self.initial_message = Some(message);
self
}
pub fn with_runtime_mode(mut self, mode: crate::MobRuntimeMode) -> Self {
self.runtime_mode = Some(mode);
self
}
pub fn with_backend(mut self, backend: MobBackendKind) -> Self {
self.backend = Some(backend);
self
}
pub fn with_context(mut self, context: serde_json::Value) -> Self {
self.context = Some(context);
self
}
pub fn with_labels(mut self, labels: std::collections::BTreeMap<String, String>) -> Self {
self.labels = Some(labels);
self
}
pub fn with_resume_session_id(mut self, id: meerkat_core::types::SessionId) -> Self {
self.resume_session_id = Some(id);
self
}
pub fn with_additional_instructions(mut self, instructions: Vec<String>) -> Self {
self.additional_instructions = Some(instructions);
self
}
pub fn from_wire(
profile: String,
meerkat_id: String,
initial_message: Option<String>,
runtime_mode: Option<crate::MobRuntimeMode>,
backend: Option<MobBackendKind>,
) -> Self {
let mut spec = Self::new(profile, meerkat_id);
spec.initial_message = initial_message;
spec.runtime_mode = runtime_mode;
spec.backend = backend;
spec
}
}
impl MobEventsView {
pub async fn poll(
&self,
after_cursor: u64,
limit: usize,
) -> Result<Vec<crate::event::MobEvent>, MobError> {
self.inner.poll(after_cursor, limit).await
}
pub async fn replay_all(&self) -> Result<Vec<crate::event::MobEvent>, MobError> {
self.inner.replay_all().await
}
}
impl MobHandle {
pub async fn poll_events(
&self,
after_cursor: u64,
limit: usize,
) -> Result<Vec<crate::event::MobEvent>, MobError> {
self.events.poll(after_cursor, limit).await
}
pub fn status(&self) -> MobState {
MobState::from_u8(self.state.load(Ordering::Acquire))
}
pub fn definition(&self) -> &MobDefinition {
&self.definition
}
pub fn mob_id(&self) -> &MobId {
&self.definition.id
}
pub async fn roster(&self) -> Roster {
self.roster.read().await.clone()
}
pub async fn list_members(&self) -> Vec<RosterEntry> {
self.roster.read().await.list().cloned().collect()
}
pub async fn list_all_members(&self) -> Vec<RosterEntry> {
self.roster.read().await.list_all().cloned().collect()
}
pub async fn get_member(&self, meerkat_id: &MeerkatId) -> Option<RosterEntry> {
self.roster.read().await.get(meerkat_id).cloned()
}
pub fn events(&self) -> MobEventsView {
MobEventsView {
inner: self.events.clone(),
}
}
pub async fn subscribe_agent_events(
&self,
meerkat_id: &MeerkatId,
) -> Result<EventStream, MobError> {
let session_id = {
let roster = self.roster.read().await;
roster
.session_id(meerkat_id)
.cloned()
.ok_or_else(|| MobError::MeerkatNotFound(meerkat_id.clone()))?
};
SessionService::subscribe_session_events(self.session_service.as_ref(), &session_id)
.await
.map_err(|e| {
MobError::Internal(format!(
"failed to subscribe to agent events for '{meerkat_id}': {e}"
))
})
}
pub async fn subscribe_all_agent_events(&self) -> Vec<(MeerkatId, EventStream)> {
let entries: Vec<_> = {
let roster = self.roster.read().await;
roster
.list()
.filter_map(|e| {
e.member_ref
.session_id()
.map(|sid| (e.meerkat_id.clone(), sid.clone()))
})
.collect()
};
let mut streams = Vec::with_capacity(entries.len());
for (meerkat_id, session_id) in entries {
if let Ok(stream) =
SessionService::subscribe_session_events(self.session_service.as_ref(), &session_id)
.await
{
streams.push((meerkat_id, stream));
}
}
streams
}
pub fn subscribe_mob_events(&self) -> super::event_router::MobEventRouterHandle {
self.subscribe_mob_events_with_config(super::event_router::MobEventRouterConfig::default())
}
pub fn subscribe_mob_events_with_config(
&self,
config: super::event_router::MobEventRouterConfig,
) -> super::event_router::MobEventRouterHandle {
super::event_router::spawn_event_router(
self.session_service.clone(),
self.events.clone(),
self.roster.clone(),
config,
)
}
pub async fn mcp_server_states(&self) -> BTreeMap<String, bool> {
self.mcp_servers
.lock()
.await
.iter()
.map(|(name, entry)| (name.clone(), entry.running))
.collect()
}
pub async fn run_flow(
&self,
flow_id: FlowId,
params: serde_json::Value,
) -> Result<RunId, MobError> {
self.run_flow_with_stream(flow_id, params, None).await
}
pub async fn run_flow_with_stream(
&self,
flow_id: FlowId,
params: serde_json::Value,
scoped_event_tx: Option<mpsc::Sender<meerkat_core::ScopedAgentEvent>>,
) -> Result<RunId, MobError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.command_tx
.send(MobCommand::RunFlow {
flow_id,
activation_params: params,
scoped_event_tx,
reply_tx,
})
.await
.map_err(|_| MobError::Internal("actor task dropped".into()))?;
reply_rx
.await
.map_err(|_| MobError::Internal("actor reply dropped".into()))?
}
pub async fn cancel_flow(&self, run_id: RunId) -> Result<(), MobError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.command_tx
.send(MobCommand::CancelFlow { run_id, reply_tx })
.await
.map_err(|_| MobError::Internal("actor task dropped".into()))?;
reply_rx
.await
.map_err(|_| MobError::Internal("actor reply dropped".into()))?
}
pub async fn flow_status(&self, run_id: RunId) -> Result<Option<MobRun>, MobError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.command_tx
.send(MobCommand::FlowStatus { run_id, reply_tx })
.await
.map_err(|_| MobError::Internal("actor task dropped".into()))?;
reply_rx
.await
.map_err(|_| MobError::Internal("actor reply dropped".into()))?
}
pub fn list_flows(&self) -> Vec<FlowId> {
self.definition.flows.keys().cloned().collect()
}
pub async fn spawn(
&self,
profile_name: ProfileName,
meerkat_id: MeerkatId,
initial_message: Option<String>,
) -> Result<MemberRef, MobError> {
self.spawn_with_options(profile_name, meerkat_id, initial_message, None, None)
.await
}
pub async fn spawn_with_backend(
&self,
profile_name: ProfileName,
meerkat_id: MeerkatId,
initial_message: Option<String>,
backend: Option<MobBackendKind>,
) -> Result<MemberRef, MobError> {
self.spawn_with_options(profile_name, meerkat_id, initial_message, None, backend)
.await
}
pub async fn spawn_with_options(
&self,
profile_name: ProfileName,
meerkat_id: MeerkatId,
initial_message: Option<String>,
runtime_mode: Option<crate::MobRuntimeMode>,
backend: Option<MobBackendKind>,
) -> Result<MemberRef, MobError> {
let mut spec = SpawnMemberSpec::new(profile_name, meerkat_id);
spec.initial_message = initial_message;
spec.runtime_mode = runtime_mode;
spec.backend = backend;
self.spawn_spec(spec).await
}
pub async fn spawn_spec(&self, spec: SpawnMemberSpec) -> Result<MemberRef, MobError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.command_tx
.send(MobCommand::Spawn { spec, reply_tx })
.await
.map_err(|_| MobError::Internal("actor task dropped".into()))?;
reply_rx
.await
.map_err(|_| MobError::Internal("actor reply dropped".into()))?
}
pub async fn spawn_many(
&self,
specs: Vec<SpawnMemberSpec>,
) -> Vec<Result<MemberRef, MobError>> {
futures::future::join_all(specs.into_iter().map(|spec| self.spawn_spec(spec))).await
}
pub async fn retire(&self, meerkat_id: MeerkatId) -> Result<(), MobError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.command_tx
.send(MobCommand::Retire {
meerkat_id,
reply_tx,
})
.await
.map_err(|_| MobError::Internal("actor task dropped".into()))?;
reply_rx
.await
.map_err(|_| MobError::Internal("actor reply dropped".into()))?
}
pub async fn respawn(
&self,
meerkat_id: MeerkatId,
initial_message: Option<String>,
) -> Result<(), MobError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.command_tx
.send(MobCommand::Respawn {
meerkat_id,
initial_message,
reply_tx,
})
.await
.map_err(|_| MobError::Internal("actor task dropped".into()))?;
reply_rx
.await
.map_err(|_| MobError::Internal("actor reply dropped".into()))?
}
pub async fn retire_all(&self) -> Result<(), MobError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.command_tx
.send(MobCommand::RetireAll { reply_tx })
.await
.map_err(|_| MobError::Internal("actor task dropped".into()))?;
reply_rx
.await
.map_err(|_| MobError::Internal("actor reply dropped".into()))?
}
pub async fn wire(&self, a: MeerkatId, b: MeerkatId) -> Result<(), MobError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.command_tx
.send(MobCommand::Wire { a, b, reply_tx })
.await
.map_err(|_| MobError::Internal("actor task dropped".into()))?;
reply_rx
.await
.map_err(|_| MobError::Internal("actor reply dropped".into()))?
}
pub async fn unwire(&self, a: MeerkatId, b: MeerkatId) -> Result<(), MobError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.command_tx
.send(MobCommand::Unwire { a, b, reply_tx })
.await
.map_err(|_| MobError::Internal("actor task dropped".into()))?;
reply_rx
.await
.map_err(|_| MobError::Internal("actor reply dropped".into()))?
}
pub async fn send_message(
&self,
meerkat_id: MeerkatId,
message: String,
) -> Result<meerkat_core::types::SessionId, MobError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.command_tx
.send(MobCommand::ExternalTurn {
meerkat_id,
message,
reply_tx,
})
.await
.map_err(|_| MobError::Internal("actor task dropped".into()))?;
reply_rx
.await
.map_err(|_| MobError::Internal("actor reply dropped".into()))?
}
pub async fn internal_turn(
&self,
meerkat_id: MeerkatId,
message: String,
) -> Result<(), MobError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.command_tx
.send(MobCommand::InternalTurn {
meerkat_id,
message,
reply_tx,
})
.await
.map_err(|_| MobError::Internal("actor task dropped".into()))?;
reply_rx
.await
.map_err(|_| MobError::Internal("actor reply dropped".into()))?
}
pub async fn stop(&self) -> Result<(), MobError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.command_tx
.send(MobCommand::Stop { reply_tx })
.await
.map_err(|_| MobError::Internal("actor task dropped".into()))?;
reply_rx
.await
.map_err(|_| MobError::Internal("actor reply dropped".into()))?
}
pub async fn resume(&self) -> Result<(), MobError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.command_tx
.send(MobCommand::ResumeLifecycle { reply_tx })
.await
.map_err(|_| MobError::Internal("actor task dropped".into()))?;
reply_rx
.await
.map_err(|_| MobError::Internal("actor reply dropped".into()))?
}
pub async fn complete(&self) -> Result<(), MobError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.command_tx
.send(MobCommand::Complete { reply_tx })
.await
.map_err(|_| MobError::Internal("actor task dropped".into()))?;
reply_rx
.await
.map_err(|_| MobError::Internal("actor reply dropped".into()))?
}
pub async fn reset(&self) -> Result<(), MobError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.command_tx
.send(MobCommand::Reset { reply_tx })
.await
.map_err(|_| MobError::Internal("actor task dropped".into()))?;
reply_rx
.await
.map_err(|_| MobError::Internal("actor reply dropped".into()))?
}
pub async fn destroy(&self) -> Result<(), MobError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.command_tx
.send(MobCommand::Destroy { reply_tx })
.await
.map_err(|_| MobError::Internal("actor task dropped".into()))?;
reply_rx
.await
.map_err(|_| MobError::Internal("actor reply dropped".into()))?
}
pub async fn task_create(
&self,
subject: String,
description: String,
blocked_by: Vec<TaskId>,
) -> Result<TaskId, MobError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.command_tx
.send(MobCommand::TaskCreate {
subject,
description,
blocked_by,
reply_tx,
})
.await
.map_err(|_| MobError::Internal("actor task dropped".into()))?;
reply_rx
.await
.map_err(|_| MobError::Internal("actor reply dropped".into()))?
}
pub async fn task_update(
&self,
task_id: TaskId,
status: TaskStatus,
owner: Option<MeerkatId>,
) -> Result<(), MobError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.command_tx
.send(MobCommand::TaskUpdate {
task_id,
status,
owner,
reply_tx,
})
.await
.map_err(|_| MobError::Internal("actor task dropped".into()))?;
reply_rx
.await
.map_err(|_| MobError::Internal("actor reply dropped".into()))?
}
pub async fn task_list(&self) -> Result<Vec<MobTask>, MobError> {
Ok(self.task_board.read().await.list().cloned().collect())
}
pub async fn task_get(&self, task_id: &TaskId) -> Result<Option<MobTask>, MobError> {
Ok(self.task_board.read().await.get(task_id).cloned())
}
#[cfg(test)]
pub async fn debug_flow_tracker_counts(&self) -> Result<(usize, usize), MobError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.command_tx
.send(MobCommand::FlowTrackerCounts { reply_tx })
.await
.map_err(|_| MobError::Internal("actor task dropped".into()))?;
reply_rx
.await
.map_err(|_| MobError::Internal("actor reply dropped".into()))
}
pub async fn set_spawn_policy(
&self,
policy: Option<Arc<dyn super::spawn_policy::SpawnPolicy>>,
) -> Result<(), MobError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.command_tx
.send(MobCommand::SetSpawnPolicy { policy, reply_tx })
.await
.map_err(|_| MobError::Internal("actor task dropped".into()))?;
reply_rx
.await
.map_err(|_| MobError::Internal("actor reply dropped".into()))?;
Ok(())
}
pub async fn shutdown(&self) -> Result<(), MobError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.command_tx
.send(MobCommand::Shutdown { reply_tx })
.await
.map_err(|_| MobError::Internal("actor task dropped".into()))?;
reply_rx
.await
.map_err(|_| MobError::Internal("actor reply dropped".into()))??;
Ok(())
}
}