use super::*;
#[derive(Clone)]
pub struct MobHandle {
pub(super) command_tx: mpsc::Sender<MobCommand>,
pub(super) roster: Arc<RwLock<Roster>>,
pub(super) task_board: Arc<RwLock<TaskBoard>>,
pub(super) definition: Arc<MobDefinition>,
pub(super) state: Arc<AtomicU8>,
pub(super) events: Arc<dyn MobEventStore>,
pub(super) mcp_servers: Arc<tokio::sync::Mutex<BTreeMap<String, actor::McpServerEntry>>>,
pub(super) flow_streams:
Arc<tokio::sync::Mutex<BTreeMap<RunId, mpsc::Sender<meerkat_core::ScopedAgentEvent>>>>,
}
#[derive(Clone)]
pub struct MobEventsView {
inner: Arc<dyn MobEventStore>,
}
#[derive(Clone, Debug)]
pub struct SpawnMemberSpec {
pub profile_name: ProfileName,
pub meerkat_id: MeerkatId,
pub initial_message: Option<String>,
pub runtime_mode: Option<crate::MobRuntimeMode>,
pub backend: Option<MobBackendKind>,
}
impl SpawnMemberSpec {
pub fn from_wire(
profile: String,
meerkat_id: String,
initial_message: Option<String>,
runtime_mode: Option<crate::MobRuntimeMode>,
backend: Option<MobBackendKind>,
) -> Self {
Self {
profile_name: ProfileName::from(profile),
meerkat_id: MeerkatId::from(meerkat_id),
initial_message,
runtime_mode,
backend,
}
}
}
impl MobEventsView {
pub async fn poll(
&self,
after_cursor: u64,
limit: usize,
) -> Result<Vec<crate::event::MobEvent>, MobError> {
self.inner.poll(after_cursor, limit).await
}
pub async fn replay_all(&self) -> Result<Vec<crate::event::MobEvent>, MobError> {
self.inner.replay_all().await
}
}
impl MobHandle {
pub async fn poll_events(
&self,
after_cursor: u64,
limit: usize,
) -> Result<Vec<crate::event::MobEvent>, MobError> {
self.events.poll(after_cursor, limit).await
}
pub fn status(&self) -> MobState {
MobState::from_u8(self.state.load(Ordering::Acquire))
}
pub fn definition(&self) -> &MobDefinition {
&self.definition
}
pub fn mob_id(&self) -> &MobId {
&self.definition.id
}
pub async fn roster(&self) -> Roster {
self.roster.read().await.clone()
}
pub async fn list_members(&self) -> Vec<RosterEntry> {
self.roster.read().await.list().cloned().collect()
}
pub async fn list_all_members(&self) -> Vec<RosterEntry> {
self.roster.read().await.list_all().cloned().collect()
}
pub async fn get_member(&self, meerkat_id: &MeerkatId) -> Option<RosterEntry> {
self.roster.read().await.get(meerkat_id).cloned()
}
pub fn events(&self) -> MobEventsView {
MobEventsView {
inner: self.events.clone(),
}
}
pub async fn mcp_server_states(&self) -> BTreeMap<String, bool> {
self.mcp_servers
.lock()
.await
.iter()
.map(|(name, entry)| (name.clone(), entry.running))
.collect()
}
pub async fn run_flow(
&self,
flow_id: FlowId,
params: serde_json::Value,
) -> Result<RunId, MobError> {
self.run_flow_with_stream(flow_id, params, None).await
}
pub async fn run_flow_with_stream(
&self,
flow_id: FlowId,
params: serde_json::Value,
scoped_event_tx: Option<mpsc::Sender<meerkat_core::ScopedAgentEvent>>,
) -> Result<RunId, MobError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.command_tx
.send(MobCommand::RunFlow {
flow_id,
activation_params: params,
scoped_event_tx,
reply_tx,
})
.await
.map_err(|_| MobError::Internal("actor task dropped".into()))?;
reply_rx
.await
.map_err(|_| MobError::Internal("actor reply dropped".into()))?
}
pub async fn cancel_flow(&self, run_id: RunId) -> Result<(), MobError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.command_tx
.send(MobCommand::CancelFlow { run_id, reply_tx })
.await
.map_err(|_| MobError::Internal("actor task dropped".into()))?;
reply_rx
.await
.map_err(|_| MobError::Internal("actor reply dropped".into()))?
}
pub async fn flow_status(&self, run_id: RunId) -> Result<Option<MobRun>, MobError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.command_tx
.send(MobCommand::FlowStatus { run_id, reply_tx })
.await
.map_err(|_| MobError::Internal("actor task dropped".into()))?;
reply_rx
.await
.map_err(|_| MobError::Internal("actor reply dropped".into()))?
}
pub fn list_flows(&self) -> Vec<FlowId> {
self.definition.flows.keys().cloned().collect()
}
pub async fn spawn(
&self,
profile_name: ProfileName,
meerkat_id: MeerkatId,
initial_message: Option<String>,
) -> Result<MemberRef, MobError> {
self.spawn_with_options(profile_name, meerkat_id, initial_message, None, None)
.await
}
pub async fn spawn_with_backend(
&self,
profile_name: ProfileName,
meerkat_id: MeerkatId,
initial_message: Option<String>,
backend: Option<MobBackendKind>,
) -> Result<MemberRef, MobError> {
self.spawn_with_options(profile_name, meerkat_id, initial_message, None, backend)
.await
}
pub async fn spawn_with_options(
&self,
profile_name: ProfileName,
meerkat_id: MeerkatId,
initial_message: Option<String>,
runtime_mode: Option<crate::MobRuntimeMode>,
backend: Option<MobBackendKind>,
) -> Result<MemberRef, MobError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.command_tx
.send(MobCommand::Spawn {
profile_name,
meerkat_id,
initial_message,
runtime_mode,
backend,
reply_tx,
})
.await
.map_err(|_| MobError::Internal("actor task dropped".into()))?;
reply_rx
.await
.map_err(|_| MobError::Internal("actor reply dropped".into()))?
}
pub async fn spawn_many(
&self,
specs: Vec<SpawnMemberSpec>,
) -> Vec<Result<MemberRef, MobError>> {
futures::future::join_all(specs.into_iter().map(|spec| {
self.spawn_with_options(
spec.profile_name,
spec.meerkat_id,
spec.initial_message,
spec.runtime_mode,
spec.backend,
)
}))
.await
}
pub async fn retire(&self, meerkat_id: MeerkatId) -> Result<(), MobError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.command_tx
.send(MobCommand::Retire {
meerkat_id,
reply_tx,
})
.await
.map_err(|_| MobError::Internal("actor task dropped".into()))?;
reply_rx
.await
.map_err(|_| MobError::Internal("actor reply dropped".into()))?
}
pub async fn retire_all(&self) -> Result<(), MobError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.command_tx
.send(MobCommand::RetireAll { reply_tx })
.await
.map_err(|_| MobError::Internal("actor task dropped".into()))?;
reply_rx
.await
.map_err(|_| MobError::Internal("actor reply dropped".into()))?
}
pub async fn wire(&self, a: MeerkatId, b: MeerkatId) -> Result<(), MobError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.command_tx
.send(MobCommand::Wire { a, b, reply_tx })
.await
.map_err(|_| MobError::Internal("actor task dropped".into()))?;
reply_rx
.await
.map_err(|_| MobError::Internal("actor reply dropped".into()))?
}
pub async fn unwire(&self, a: MeerkatId, b: MeerkatId) -> Result<(), MobError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.command_tx
.send(MobCommand::Unwire { a, b, reply_tx })
.await
.map_err(|_| MobError::Internal("actor task dropped".into()))?;
reply_rx
.await
.map_err(|_| MobError::Internal("actor reply dropped".into()))?
}
pub async fn send_message(
&self,
meerkat_id: MeerkatId,
message: String,
) -> Result<(), MobError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.command_tx
.send(MobCommand::ExternalTurn {
meerkat_id,
message,
reply_tx,
})
.await
.map_err(|_| MobError::Internal("actor task dropped".into()))?;
reply_rx
.await
.map_err(|_| MobError::Internal("actor reply dropped".into()))?
}
pub async fn internal_turn(
&self,
meerkat_id: MeerkatId,
message: String,
) -> Result<(), MobError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.command_tx
.send(MobCommand::InternalTurn {
meerkat_id,
message,
reply_tx,
})
.await
.map_err(|_| MobError::Internal("actor task dropped".into()))?;
reply_rx
.await
.map_err(|_| MobError::Internal("actor reply dropped".into()))?
}
pub async fn stop(&self) -> Result<(), MobError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.command_tx
.send(MobCommand::Stop { reply_tx })
.await
.map_err(|_| MobError::Internal("actor task dropped".into()))?;
reply_rx
.await
.map_err(|_| MobError::Internal("actor reply dropped".into()))?
}
pub async fn resume(&self) -> Result<(), MobError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.command_tx
.send(MobCommand::ResumeLifecycle { reply_tx })
.await
.map_err(|_| MobError::Internal("actor task dropped".into()))?;
reply_rx
.await
.map_err(|_| MobError::Internal("actor reply dropped".into()))?
}
pub async fn complete(&self) -> Result<(), MobError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.command_tx
.send(MobCommand::Complete { reply_tx })
.await
.map_err(|_| MobError::Internal("actor task dropped".into()))?;
reply_rx
.await
.map_err(|_| MobError::Internal("actor reply dropped".into()))?
}
pub async fn reset(&self) -> Result<(), MobError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.command_tx
.send(MobCommand::Reset { reply_tx })
.await
.map_err(|_| MobError::Internal("actor task dropped".into()))?;
reply_rx
.await
.map_err(|_| MobError::Internal("actor reply dropped".into()))?
}
pub async fn destroy(&self) -> Result<(), MobError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.command_tx
.send(MobCommand::Destroy { reply_tx })
.await
.map_err(|_| MobError::Internal("actor task dropped".into()))?;
reply_rx
.await
.map_err(|_| MobError::Internal("actor reply dropped".into()))?
}
pub async fn task_create(
&self,
subject: String,
description: String,
blocked_by: Vec<TaskId>,
) -> Result<TaskId, MobError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.command_tx
.send(MobCommand::TaskCreate {
subject,
description,
blocked_by,
reply_tx,
})
.await
.map_err(|_| MobError::Internal("actor task dropped".into()))?;
reply_rx
.await
.map_err(|_| MobError::Internal("actor reply dropped".into()))?
}
pub async fn task_update(
&self,
task_id: TaskId,
status: TaskStatus,
owner: Option<MeerkatId>,
) -> Result<(), MobError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.command_tx
.send(MobCommand::TaskUpdate {
task_id,
status,
owner,
reply_tx,
})
.await
.map_err(|_| MobError::Internal("actor task dropped".into()))?;
reply_rx
.await
.map_err(|_| MobError::Internal("actor reply dropped".into()))?
}
pub async fn task_list(&self) -> Result<Vec<MobTask>, MobError> {
Ok(self.task_board.read().await.list().cloned().collect())
}
pub async fn task_get(&self, task_id: &TaskId) -> Result<Option<MobTask>, MobError> {
Ok(self.task_board.read().await.get(task_id).cloned())
}
#[cfg(test)]
pub async fn debug_flow_tracker_counts(&self) -> Result<(usize, usize), MobError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.command_tx
.send(MobCommand::FlowTrackerCounts { reply_tx })
.await
.map_err(|_| MobError::Internal("actor task dropped".into()))?;
reply_rx
.await
.map_err(|_| MobError::Internal("actor reply dropped".into()))
}
pub async fn shutdown(&self) -> Result<(), MobError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.command_tx
.send(MobCommand::Shutdown { reply_tx })
.await
.map_err(|_| MobError::Internal("actor task dropped".into()))?;
reply_rx
.await
.map_err(|_| MobError::Internal("actor reply dropped".into()))??;
Ok(())
}
}