use std::collections::{BTreeMap, BTreeSet, HashSet};
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::time::Duration;
use futures::stream::{SelectAll, StreamExt};
use meerkat_core::comms::EventStream;
use meerkat_core::event::{AgentEvent, agent_event_type};
use meerkat_mob::ids::MeerkatId;
use meerkat_mob::{
AgentIdentity, AgentRuntimeId, AttributedEvent, FenceToken, MobError, MobHandle, ProfileName,
SpawnMemberSpec,
};
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::task::JoinHandle;
pub(crate) use self::console_events::ConsoleEventStore;
use self::mob_events::MobEventsStore;
use crate::console_aggregator::{ConsoleLogStore, InMemoryConsoleLogStore};
use crate::mob_handle_runtime::{MobBootstrapSpec, MobRuntime, MobRuntimeError};
use crate::runtime::{
InMemoryMetadataStore, MetadataScope, MobkitRuntimeHandle, PersistentMetadataStore,
RuntimeMetadataTable, RuntimeOptions, start_mobkit_runtime_with_options,
};
use crate::types::{
AgentDiscoverySpec, EventEnvelope, MobKitConfig, MobStructuralEventEnvelope, UnifiedEvent,
};
pub mod builder;
pub(crate) mod console_events;
pub mod cross_mob;
pub mod edge_reconcile;
pub mod edge_types;
pub mod event_log;
pub mod http;
pub(crate) mod implicit_delegate_retirement;
pub mod lifecycle;
pub mod mob_events;
pub mod mob_ops;
pub mod module_ops;
pub mod types;
pub use builder::{IdentityBootstrapMode, UnifiedRuntimeBuilder};
pub use edge_types::{
DesiredPeerEdge, DesiredPeerEdgeError, Discovery, EdgeDiscovery, EdgeReconcileFailure,
PreSpawnContext, PreSpawnHook,
};
pub use event_log::{EventLogConfig, EventLogError, EventLogStore, EventQuery, PersistedEvent};
pub use http::DEFAULT_REFERENCE_APP_MAX_CONCURRENT_REQUESTS;
pub use types::{
ErrorEvent, RediscoverReport, ShutdownDrainReport, UnifiedRuntimeBootstrapError,
UnifiedRuntimeBuilderError, UnifiedRuntimeBuilderField, UnifiedRuntimeError,
UnifiedRuntimeReconcileEdgesReport, UnifiedRuntimeReconcileError,
UnifiedRuntimeReconcileReport, UnifiedRuntimeReconcileRoutingReport, UnifiedRuntimeRunReport,
UnifiedRuntimeShutdownReport,
};
/// Shared async callback that receives a `Vec<String>` and resolves to `()`.
///
/// Stored in `UnifiedRuntime::post_spawn_hook`.
// NOTE(review): the strings are presumably the ids of freshly spawned members — confirm at
// the call site, which is not in this part of the file.
pub type PostSpawnHook =
Arc<dyn Fn(Vec<String>) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
/// Shared async callback that receives a [`UnifiedRuntimeReconcileReport`] and resolves to `()`.
///
/// Stored in `UnifiedRuntime::post_reconcile_hook`.
pub type PostReconcileHook = Arc<
dyn Fn(UnifiedRuntimeReconcileReport) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync,
>;
/// Shared async callback that receives an [`ErrorEvent`] and resolves to `()`.
///
/// Installed with `UnifiedRuntime::set_error_hook` and invoked by `UnifiedRuntime::fire_error`.
pub type ErrorHook =
Arc<dyn Fn(ErrorEvent) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
// Roster routing identifiers. None of these four constants is referenced in this part of
// the file.
// NOTE(review): presumably consumed by the roster-route reconciliation code in a
// submodule — confirm before renaming or changing the values.
const ROSTER_ROUTE_PREFIX: &str = "mob.member.";
const ROSTER_ROUTE_CHANNEL: &str = "notification";
const ROSTER_ROUTE_SINK: &str = "mob_member";
const ROSTER_ROUTE_TARGET_MODULE: &str = "delivery";
/// Value `UnifiedRuntime::from_parts` assigns to `drain_timeout` (30 seconds).
const DEFAULT_DRAIN_TIMEOUT: Duration = Duration::from_secs(30);
pub fn discovery_spec_to_spawn_spec(spec: &AgentDiscoverySpec) -> SpawnMemberSpec {
let resume_session_id = spec
.resume_session_id
.as_deref()
.and_then(|s| meerkat_core::types::SessionId::parse(s).ok());
let additional_instructions = if spec.additional_instructions.is_empty() {
None
} else {
Some(spec.additional_instructions.clone())
};
let mut spawn = SpawnMemberSpec::new(
meerkat_mob::ProfileName::from(spec.profile.as_str()),
MeerkatId::from(spec.meerkat_id.as_str()),
);
if let Some(context) = spec.context.clone() {
spawn = spawn.with_context(context);
}
if let Some(labels) = spec.labels.clone() {
spawn = spawn.with_labels(labels);
}
if let Some(sid) = resume_session_id {
spawn = spawn.with_resume_bridge_session_id(sid);
}
if let Some(instructions) = additional_instructions {
spawn = spawn.with_additional_instructions(instructions);
}
spawn
}
/// Runtime object that pairs a [`MobRuntime`] with a module runtime
/// ([`MobkitRuntimeHandle`]) and owns the event, console, and metadata stores around them.
///
/// Instances are assembled by `from_parts`; see the field comments for the initial values
/// it assigns.
pub struct UnifiedRuntime {
// Mob side of the runtime; provides the `MobHandle`, the optional MCP state, and the
// binary blob store used by the methods below.
mob_runtime: MobRuntime,
// Optional hooks; all three start as `None` in `from_parts`.
post_spawn_hook: Option<PostSpawnHook>,
post_reconcile_hook: Option<PostReconcileHook>,
error_hook: Option<ErrorHook>,
// Initialised to `DEFAULT_DRAIN_TIMEOUT`.
drain_timeout: Duration,
// Optional discovery providers; both start as `None`.
discovery: Option<Box<dyn Discovery>>,
edge_discovery: Option<Box<dyn EdgeDiscovery>>,
// Module runtime behind an async mutex; shared out via `module_runtime_handle`.
module_runtime: Arc<tokio::sync::Mutex<MobkitRuntimeHandle>>,
// Starts empty.
// NOTE(review): the pairs are presumably (from, to) peer-edge endpoints — confirm in
// `edge_reconcile`.
managed_dynamic_edges: tokio::sync::RwLock<BTreeSet<(String, String)>>,
// Starts as `false`.
shutting_down: AtomicBool,
// Receiver and task of the agent-event forwarder created in `from_parts`.
mob_event_ingress: tokio::sync::Mutex<Option<MobEventIngress>>,
// Starts as `None`; returned (cloned) by `bootstrap_edges_report`.
bootstrap_edges_report: tokio::sync::RwLock<Option<UnifiedRuntimeReconcileEdgesReport>>,
// `None` until `start_event_log` is called.
event_log: Option<event_log::EventLogHandle>,
// Defaults to an `InMemoryConsoleLogStore`; replaceable via `set_console_log_store`.
console_log_store: Arc<dyn ConsoleLogStore>,
console_events: ConsoleEventStore,
// Structural mob event store; built with the same `metadata_table` as this struct.
mob_events: MobEventsStore,
// Handle of the `run_mob_events_subscription` task; `None` when no Tokio runtime was
// available at construction time.
mob_events_subscriber_task: tokio::sync::Mutex<Option<JoinHandle<()>>>,
// Starts as `None`.
implicit_delegate_retirement_task: tokio::sync::Mutex<Option<JoinHandle<()>>>,
// The following four start as `None` / empty in `from_parts`.
contact_directory: Option<crate::contact_directory::ContactDirectory>,
peer_mob_handles: tokio::sync::RwLock<BTreeMap<String, MobHandle>>,
gateway_peer_keys: Option<crate::auth::peer_keys::GatewayPeerKeys>,
session_bridge: Option<Arc<dyn crate::identity_first::bridge::SessionBridge>>,
// Set by `attach_identity_first_context`; `None` otherwise.
identity_first_context: Option<Arc<crate::identity_first::IdentityFirstRuntimeContext>>,
// Label storage used by the `*_mob_labels` / `*_run_labels` methods.
metadata_table: Arc<RuntimeMetadataTable>,
// Persistent store; the mob-events subscription reads and writes its cursor through it.
persistent_metadata: Arc<dyn PersistentMetadataStore>,
}
/// How agent events enter the unified runtime. The only variant is a background
/// forwarder task paired with the receiving end of its channel.
enum MobEventIngress {
Forwarder(MobEventForwarder),
}
/// Receiving half of the forwarder channel plus the handle of the task that feeds it
/// (see `UnifiedRuntime::create_event_ingress`).
struct MobEventForwarder {
// Unified agent events produced by the forwarder task.
event_rx: Receiver<EventEnvelope<UnifiedEvent>>,
// Join handle of the spawned `run_resilient_mob_agent_event_forwarder` task.
task: JoinHandle<()>,
}
impl UnifiedRuntime {
pub fn builder() -> UnifiedRuntimeBuilder {
UnifiedRuntimeBuilder::default()
}
/// Assembles a runtime from an already-bootstrapped mob runtime, a started module
/// runtime, and a persistent metadata store.
///
/// Starts the agent-event forwarder and, when a Tokio runtime is available, the
/// structural mob-events subscriber. Every optional component (hooks, discovery, event
/// log, contact directory, peer keys, session bridge, identity-first context) starts
/// unset.
pub(crate) async fn from_parts(
    mob_runtime: MobRuntime,
    module_runtime: MobkitRuntimeHandle,
    persistent_metadata: Arc<dyn PersistentMetadataStore>,
) -> Self {
    let labels_table = Arc::new(RuntimeMetadataTable::new());
    let structural_events = MobEventsStore::new().with_metadata_table(Arc::clone(&labels_table));

    // The forwarder and the subscriber both project into the same structural event store.
    let ingress = Self::create_event_ingress(
        mob_runtime.handle(),
        mob_runtime.agent_mob_mcp_state(),
        structural_events.clone(),
    );
    let subscriber_task = Self::spawn_mob_events_subscriber(
        mob_runtime.handle(),
        structural_events.clone(),
        Arc::clone(&persistent_metadata),
    );

    Self {
        mob_runtime,
        post_spawn_hook: None,
        post_reconcile_hook: None,
        error_hook: None,
        drain_timeout: DEFAULT_DRAIN_TIMEOUT,
        discovery: None,
        edge_discovery: None,
        module_runtime: Arc::new(tokio::sync::Mutex::new(module_runtime)),
        managed_dynamic_edges: tokio::sync::RwLock::new(BTreeSet::new()),
        shutting_down: AtomicBool::new(false),
        mob_event_ingress: tokio::sync::Mutex::new(Some(ingress)),
        bootstrap_edges_report: tokio::sync::RwLock::new(None),
        event_log: None,
        console_log_store: Arc::new(InMemoryConsoleLogStore::new()),
        console_events: ConsoleEventStore::new(),
        mob_events: structural_events,
        mob_events_subscriber_task: tokio::sync::Mutex::new(subscriber_task),
        implicit_delegate_retirement_task: tokio::sync::Mutex::new(None),
        contact_directory: None,
        peer_mob_handles: tokio::sync::RwLock::new(BTreeMap::new()),
        gateway_peer_keys: None,
        session_bridge: None,
        identity_first_context: None,
        metadata_table: labels_table,
        persistent_metadata,
    }
}
/// Spawns `run_mob_events_subscription` on the current Tokio runtime.
///
/// Returns `None` without spawning anything when no Tokio runtime is active on the
/// calling thread.
fn spawn_mob_events_subscriber(
    handle: MobHandle,
    store: MobEventsStore,
    persistent_metadata: Arc<dyn PersistentMetadataStore>,
) -> Option<JoinHandle<()>> {
    tokio::runtime::Handle::try_current()
        .ok()
        .map(|runtime| {
            runtime.spawn(run_mob_events_subscription(handle, store, persistent_metadata))
        })
}
pub async fn bootstrap(
mob_spec: MobBootstrapSpec,
module_config: MobKitConfig,
timeout: Duration,
) -> Result<Self, UnifiedRuntimeBootstrapError> {
Self::bootstrap_with_options(
mob_spec,
module_config,
Vec::new(),
timeout,
RuntimeOptions::default(),
Arc::new(InMemoryMetadataStore::new()),
)
.await
}
pub async fn bootstrap_with_options(
mob_spec: MobBootstrapSpec,
module_config: MobKitConfig,
module_agent_events: Vec<EventEnvelope<UnifiedEvent>>,
timeout: Duration,
options: RuntimeOptions,
persistent_metadata: Arc<dyn PersistentMetadataStore>,
) -> Result<Self, UnifiedRuntimeBootstrapError> {
let mob_runtime = MobRuntime::bootstrap(mob_spec)
.await
.map_err(UnifiedRuntimeBootstrapError::Mob)?;
let runtime_options = options.clone();
let module_start_result = std::thread::spawn(move || {
start_mobkit_runtime_with_options(module_config, module_agent_events, timeout, options)
})
.join();
match module_start_result {
Ok(Ok(module_runtime)) => {
let runtime =
Self::from_parts(mob_runtime, module_runtime, persistent_metadata).await;
runtime
.configure_implicit_delegate_retirement(&runtime_options)
.await;
Ok(runtime)
}
Ok(Err(error)) => {
let startup_error = UnifiedRuntimeBootstrapError::Module(error);
Self::rollback_mob_runtime(mob_runtime, startup_error).await
}
Err(_) => {
let startup_error = UnifiedRuntimeBootstrapError::ModuleStartupThreadPanicked;
Self::rollback_mob_runtime(mob_runtime, startup_error).await
}
}
}
/// Returns a clone of the stored bootstrap edge-reconcile report, or `None` when no
/// report has been recorded.
pub async fn bootstrap_edges_report(&self) -> Option<UnifiedRuntimeReconcileEdgesReport> {
    let guard = self.bootstrap_edges_report.read().await;
    (*guard).clone()
}
/// Installs `hook` as the error hook, replacing any previously installed one.
pub fn set_error_hook(&mut self, hook: ErrorHook) {
    drop(self.error_hook.replace(hook));
}
/// Starts the event log with `config` and stores its handle, replacing any earlier handle.
///
/// The event log receives a clone of the error hook as it is at call time; a hook
/// installed afterwards with `set_error_hook` is not passed to an already-started log.
pub fn start_event_log(&mut self, config: EventLogConfig) {
    let hook_snapshot = self.error_hook.clone();
    self.event_log = Some(event_log::start_event_log(config, hook_snapshot));
}
pub(crate) fn console_events(&self) -> ConsoleEventStore {
self.console_events.clone()
}
pub(crate) fn mob_events_store(&self) -> MobEventsStore {
self.mob_events.clone()
}
/// Returns the binary blob store exposed by the mob runtime, if it has one.
pub fn binary_blob_store(&self) -> Option<Arc<dyn crate::blob_store::BinaryBlobStore>> {
    let mob_runtime = &self.mob_runtime;
    mob_runtime.binary_blob_store()
}
/// Returns another shared reference to the mutex-guarded module runtime handle.
pub(crate) fn module_runtime_handle(&self) -> Arc<tokio::sync::Mutex<MobkitRuntimeHandle>> {
    self.module_runtime.clone()
}
/// Borrows the session bridge, if one is configured.
pub fn session_bridge(&self) -> Option<&Arc<dyn crate::identity_first::bridge::SessionBridge>> {
    Option::as_ref(&self.session_bridge)
}
/// Borrows the identity-first runtime context, if one has been attached.
pub fn identity_first_context(
    &self,
) -> Option<&Arc<crate::identity_first::IdentityFirstRuntimeContext>> {
    Option::as_ref(&self.identity_first_context)
}
/// Borrows the `runtime` field of the attached identity-first context, or returns `None`
/// when no context is attached.
pub fn identity_runtime(&self) -> Option<&Arc<crate::identity_first::IdentityRuntime>> {
    let ctx = self.identity_first_context.as_ref()?;
    Some(&ctx.runtime)
}
/// Attaches `context` as the identity-first runtime context, replacing any previous one.
pub fn attach_identity_first_context(
    &mut self,
    context: Arc<crate::identity_first::IdentityFirstRuntimeContext>,
) {
    drop(self.identity_first_context.replace(context));
}
/// Asks the attached identity-first context to refresh its desired topology.
///
/// Returns `Ok(None)` when no context is attached, otherwise the context's result
/// wrapped in `Some`.
///
/// # Errors
/// Propagates the error returned by the context's `refresh_desired_topology`.
pub async fn refresh_desired_topology(
    &self,
) -> Result<
    Option<crate::identity_first::RestoreFlowResult>,
    crate::identity_first::IdentityRuntimeError,
> {
    if let Some(ctx) = self.identity_first_context.as_ref() {
        ctx.refresh_desired_topology().await.map(Some)
    } else {
        Ok(None)
    }
}
/// Runs `materialize_all` on the identity runtime and returns its continuity records.
///
/// Returns an empty vector when no identity runtime is attached.
///
/// # Errors
/// Propagates the error returned by `materialize_all`.
pub async fn materialize_identity_first_for_flow(
    &self,
) -> Result<
    Vec<crate::identity_first::ContinuityRecord>,
    crate::identity_first::IdentityRuntimeError,
> {
    if let Some(runtime) = self.identity_runtime() {
        runtime.materialize_all().await
    } else {
        Ok(Vec::new())
    }
}
pub fn metadata_table(&self) -> &Arc<RuntimeMetadataTable> {
&self.metadata_table
}
pub fn persistent_metadata(&self) -> &Arc<dyn PersistentMetadataStore> {
&self.persistent_metadata
}
/// Stores `labels` in the metadata table under this mob's scope.
pub async fn set_mob_labels(&self, labels: BTreeMap<String, String>) {
    let scope = MetadataScope::Mob(self.mob_id());
    self.metadata_table.set_labels(scope, labels).await;
}
/// Returns the labels the metadata table holds for this mob's scope.
pub async fn get_mob_labels(&self) -> BTreeMap<String, String> {
    let scope = MetadataScope::Mob(self.mob_id());
    self.metadata_table.get_labels(&scope).await
}
/// Deletes the labels stored for this mob's scope.
///
/// Whatever `delete_labels` returns is intentionally discarded.
pub async fn delete_mob_labels(&self) {
    let scope = MetadataScope::Mob(self.mob_id());
    let _ = self.metadata_table.delete_labels(&scope).await;
}
/// Stores `labels` in the metadata table under the scope of run `run_id` of this mob.
pub async fn set_run_labels(&self, run_id: &str, labels: BTreeMap<String, String>) {
    let scope = MetadataScope::Run(self.mob_id(), run_id.to_owned());
    self.metadata_table.set_labels(scope, labels).await;
}
/// Returns the labels the metadata table holds for run `run_id` of this mob.
pub async fn get_run_labels(&self, run_id: &str) -> BTreeMap<String, String> {
    let scope = MetadataScope::Run(self.mob_id(), run_id.to_owned());
    self.metadata_table.get_labels(&scope).await
}
/// Deletes the labels stored for run `run_id` of this mob.
///
/// Whatever `delete_labels` returns is intentionally discarded.
pub async fn delete_run_labels(&self, run_id: &str) {
    let scope = MetadataScope::Run(self.mob_id(), run_id.to_owned());
    let _ = self.metadata_table.delete_labels(&scope).await;
}
/// Returns the store behind the event log, or `None` when the event log has not been
/// started.
pub fn event_log_store(&self) -> Option<std::sync::Arc<dyn event_log::EventLogStore>> {
    let handle = self.event_log.as_ref()?;
    Some(handle.store())
}
/// Returns another shared reference to the console log store.
pub fn console_log_store(&self) -> Arc<dyn ConsoleLogStore> {
    Arc::clone(&self.console_log_store)
}
/// Replaces the console log store with `store`.
pub fn set_console_log_store(&mut self, store: Arc<dyn ConsoleLogStore>) {
    drop(std::mem::replace(&mut self.console_log_store, store));
}
/// Queries the mob's event ledger with `query`, delegating to
/// `mob_events::query_ledger_with_filter` together with the structural event store.
///
/// # Errors
/// Returns the `MobEventsQueryError` reported by the query helper.
pub async fn query_mob_events(
    &self,
    query: &EventQuery,
) -> Result<Vec<MobStructuralEventEnvelope>, mob_events::MobEventsQueryError> {
    let ledger = self.mob_runtime.handle().events();
    let store = &self.mob_events;
    mob_events::query_ledger_with_filter(&ledger, store, query).await
}
/// Opens a new broadcast receiver on the structural mob events store.
pub fn subscribe_mob_events(
    &self,
) -> tokio::sync::broadcast::Receiver<MobStructuralEventEnvelope> {
    let store = &self.mob_events;
    store.subscribe()
}
pub(crate) fn ingest_event(&self, event: &EventEnvelope<UnifiedEvent>) {
if let Some(ref log) = self.event_log {
log.ingest(event.clone());
}
}
/// Records a lifecycle entry for `identity` in the console event store.
pub(crate) async fn record_console_lifecycle(
    &self,
    identity: &str,
    event_type: &str,
    data: serde_json::Value,
) {
    let store = &self.console_events;
    store.record_lifecycle(identity, event_type, data).await;
}
/// Reserves an interaction for `identity` in the console event store by forwarding all
/// arguments to `reserve_interaction_value`.
///
/// # Errors
/// Returns the static error message produced by the console event store.
pub async fn reserve_identity_interaction(
    &self,
    identity: &str,
    runtime_member_id: Option<&str>,
    interaction_id: &str,
    origin: &str,
    content: serde_json::Value,
) -> Result<(), &'static str> {
    let reservation = self.console_events.reserve_interaction_value(
        identity,
        runtime_member_id,
        interaction_id,
        origin,
        content,
    );
    reservation.await
}
/// Projects a unified event into the console event store.
pub(crate) async fn project_console_event_from_unified(
    &self,
    event: &EventEnvelope<UnifiedEvent>,
) {
    let store = &self.console_events;
    store.project_unified_event(event).await;
}
pub(crate) fn fire_error(&self, event: ErrorEvent) {
if let Some(ref hook) = self.error_hook {
let hook = hook.clone();
tokio::spawn(async move {
let () = hook(event).await;
});
}
}
/// Creates the agent-event ingress: a bounded channel (capacity 256) plus a spawned
/// `run_resilient_mob_agent_event_forwarder` task that feeds it.
///
/// Uses `tokio::spawn`, so it must be called from within a Tokio runtime.
fn create_event_ingress(
    mob_handle: MobHandle,
    agent_mob_mcp_state: Option<Arc<meerkat_mob_mcp::MobMcpState>>,
    mob_events: MobEventsStore,
) -> MobEventIngress {
    const FORWARDER_CHANNEL_CAPACITY: usize = 256;

    let (event_tx, event_rx) = tokio::sync::mpsc::channel(FORWARDER_CHANNEL_CAPACITY);
    let forwarder = run_resilient_mob_agent_event_forwarder(
        mob_handle,
        agent_mob_mcp_state,
        event_tx,
        mob_events,
    );
    let task = tokio::spawn(forwarder);
    MobEventIngress::Forwarder(MobEventForwarder { event_rx, task })
}
/// Stops the mob runtime after a failed module startup and always returns `Err`.
///
/// # Errors
/// Returns `startup_error` unchanged when the stop succeeds. When the stop fails too,
/// returns `ModuleStartupRollbackFailed` carrying both the startup error and the
/// rollback error.
async fn rollback_mob_runtime(
    mob_runtime: MobRuntime,
    startup_error: UnifiedRuntimeBootstrapError,
) -> Result<Self, UnifiedRuntimeBootstrapError> {
    let stop_outcome = mob_runtime.handle().stop().await;
    Err(match stop_outcome {
        Ok(()) => startup_error,
        Err(err) => UnifiedRuntimeBootstrapError::ModuleStartupRollbackFailed {
            startup_error: Box::new(startup_error),
            rollback_error: MobRuntimeError::from(err),
        },
    })
}
}
/// An agent event envelope paired with the runtime id, fence token, and role of the
/// member whose stream produced it. The forwarder unpacks it into an `AttributedEvent`.
type TaggedAgentEvent = (
AgentRuntimeId,
FenceToken,
ProfileName,
meerkat_core::event::EventEnvelope<AgentEvent>,
);
/// An agent [`EventStream`] mapped through a boxed closure that tags every envelope,
/// yielding [`TaggedAgentEvent`]s.
type TaggedAgentEventStream = futures::stream::Map<
EventStream,
Box<dyn FnMut(meerkat_core::event::EventEnvelope<AgentEvent>) -> TaggedAgentEvent + Send>,
>;
/// Key identifying a subscribed stream: the mob id (as a string) and the member's
/// [`AgentIdentity`].
type TrackedAgentEventStream = (String, AgentIdentity);
/// Forwards agent events from every subscribed member stream into `event_tx`.
///
/// Each event is first projected into `mob_events` (the projection result is ignored),
/// then converted with `attributed_event_to_unified` and sent. Subscriptions are
/// reconciled once up front and then every 250 ms. The loop ends only when a send fails
/// because the receiver is gone.
async fn run_resilient_mob_agent_event_forwarder(
    handle: MobHandle,
    agent_mob_mcp_state: Option<Arc<meerkat_mob_mcp::MobMcpState>>,
    event_tx: Sender<EventEnvelope<UnifiedEvent>>,
    mob_events: MobEventsStore,
) {
    let mut live_streams: SelectAll<TaggedAgentEventStream> = SelectAll::new();
    let mut subscribed = HashSet::new();
    let mut resync = tokio::time::interval(Duration::from_millis(250));
    #[cfg(not(target_arch = "wasm32"))]
    resync.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

    reconcile_agent_event_streams(
        &handle,
        &agent_mob_mcp_state,
        &mut subscribed,
        &mut live_streams,
    )
    .await;

    loop {
        tokio::select! {
            Some((source, source_fence_token, role, envelope)) = live_streams.next() => {
                let attributed = AttributedEvent {
                    source,
                    source_fence_token,
                    role,
                    envelope,
                };
                let _ = mob_events.project_attributed_event(&attributed).await;
                let unified = attributed_event_to_unified(attributed);
                let delivery = event_tx.send(unified).await;
                if delivery.is_err() {
                    // Receiver dropped: nobody is listening any more.
                    break;
                }
            }
            _ = resync.tick() => {
                reconcile_agent_event_streams(
                    &handle,
                    &agent_mob_mcp_state,
                    &mut subscribed,
                    &mut live_streams,
                )
                .await;
            }
        }
    }
}
/// Brings the set of subscribed agent event streams in line with the current rosters.
///
/// Covers the primary mob plus every other mob reported by the optional MCP state.
/// Keys in `tracked` whose member no longer appears in any roster are dropped. Every
/// roster member without a tracked key gets a new subscription pushed onto `streams`.
/// A failed subscription is logged and retried on the next call, because its key is
/// never inserted.
///
/// Each roster is snapshotted exactly once per call. Pruning and subscribing therefore
/// see the same membership, and the per-tick cost is one snapshot per mob instead of two.
// NOTE(review): a key stays in `tracked` for as long as the member is listed, so a stream
// that ends while its member is still on the roster is not re-subscribed by this function.
// Confirm whether that is intended.
async fn reconcile_agent_event_streams(
    handle: &MobHandle,
    agent_mob_mcp_state: &Option<Arc<meerkat_mob_mcp::MobMcpState>>,
    tracked: &mut HashSet<TrackedAgentEventStream>,
    streams: &mut SelectAll<TaggedAgentEventStream>,
) {
    let mut handles = vec![handle.clone()];
    if let Some(state) = agent_mob_mcp_state {
        let primary_mob_id = handle.mob_id().to_string();
        // The primary mob is already in `handles`; only add the other mobs.
        handles.extend(state.mob_handles_snapshot().await.into_iter().filter_map(
            |(mob_id, child_handle)| {
                if mob_id.as_str() == primary_mob_id {
                    None
                } else {
                    Some(child_handle)
                }
            },
        ));
    }

    // One snapshot per mob, reused by both the pruning and the subscribing pass.
    let mut rosters = Vec::with_capacity(handles.len());
    for mob_handle in &handles {
        let mob_id = mob_handle.mob_id().to_string();
        let entries: Vec<_> = mob_handle
            .list_members_observation_snapshot()
            .await
            .into_iter()
            .collect();
        rosters.push((mob_handle, mob_id, entries));
    }

    let mut current: HashSet<TrackedAgentEventStream> = HashSet::new();
    for (_, mob_id, entries) in &rosters {
        for entry in entries {
            current.insert((mob_id.clone(), entry.agent_identity.clone()));
        }
    }
    tracked.retain(|key| current.contains(key));

    for (mob_handle, mob_id, entries) in rosters {
        for entry in entries {
            let tracked_key = (mob_id.clone(), entry.agent_identity.clone());
            if tracked.contains(&tracked_key) {
                continue;
            }
            let identity = entry.agent_identity.clone();
            let (runtime_id, fence_token) = entry.binding_atoms();
            let role = entry.role.clone();
            match mob_handle.subscribe_agent_events_observation(&identity).await {
                Ok(stream) => {
                    tracked.insert(tracked_key);
                    // Tag every envelope with the binding captured at subscription time.
                    let mapped = stream.map(Box::new(move |envelope| {
                        (runtime_id.clone(), fence_token, role.clone(), envelope)
                    })
                        as Box<
                            dyn FnMut(
                                    meerkat_core::event::EventEnvelope<AgentEvent>,
                                ) -> TaggedAgentEvent
                                + Send,
                        >);
                    streams.push(mapped);
                }
                Err(error) => {
                    tracing::warn!(
                        mob_id = %mob_id,
                        identity = %identity,
                        error = %error,
                        "mobkit agent event forwarder: failed to subscribe; will retry"
                    );
                }
            }
        }
    }
}
/// Subscribes to the mob's event ledger and projects every received event into `store`,
/// persisting the projected cursor after each event.
///
/// Resumes from the persisted subscription cursor when one can be read. If that cursor
/// is reported stale, or no cursor is stored (or reading it fails), it subscribes at the
/// latest position instead. Any other subscribe failure is logged and ends the task.
/// Failures to persist a cursor are logged and do not stop the loop, which runs until
/// the subscription's receiver yields `None`.
async fn run_mob_events_subscription(
    handle: MobHandle,
    store: MobEventsStore,
    persistent_metadata: Arc<dyn PersistentMetadataStore>,
) {
    let mob_id = handle.mob_id().as_str().to_owned();

    let resume_cursor = persistent_metadata
        .get_subscription_cursor(&mob_id)
        .await
        .unwrap_or_else(|err| {
            tracing::warn!(
                mob_id = %mob_id,
                error = %err,
                "mob_events subscription: failed to read persisted cursor; resuming from latest"
            );
            None
        });

    let events = handle.events();
    let mut subscription = if let Some(cursor) = resume_cursor {
        match events.subscribe_after(cursor).await {
            Ok(resumed) => resumed,
            Err(MobError::StaleEventCursor {
                after_cursor,
                latest_cursor,
            }) => {
                tracing::warn!(
                    mob_id = %mob_id,
                    after_cursor,
                    latest_cursor,
                    "mob_events subscription: persisted cursor is past ledger frontier; resuming at latest"
                );
                match events.subscribe().await {
                    Ok(latest) => latest,
                    Err(err) => {
                        tracing::warn!(
                            mob_id = %mob_id,
                            error = %err,
                            "mob_events subscription: failed to subscribe at latest after stale-cursor recovery"
                        );
                        return;
                    }
                }
            }
            Err(err) => {
                tracing::warn!(
                    mob_id = %mob_id,
                    error = %err,
                    "mob_events subscription: failed to resume from persisted cursor"
                );
                return;
            }
        }
    } else {
        match events.subscribe().await {
            Ok(latest) => latest,
            Err(err) => {
                tracing::warn!(
                    mob_id = %mob_id,
                    error = %err,
                    "mob_events subscription: initial subscribe failed"
                );
                return;
            }
        }
    };

    while let Some(event) = subscription.event_rx.recv().await {
        let projected = store.project_mob_event(&event).await;
        let persisted = persistent_metadata
            .set_subscription_cursor(&mob_id, projected.cursor)
            .await;
        if let Err(err) = persisted {
            tracing::warn!(
                mob_id = %mob_id,
                cursor = projected.cursor,
                error = %err,
                "mob_events subscription: failed to persist cursor; continuing"
            );
        }
    }
}
/// Converts an [`AttributedEvent`] into a unified `"agent"`-sourced event envelope.
///
/// The event id is the inner id prefixed with `evt-agent-`, and the timestamp is carried
/// over. The payload is serialized to JSON; a serialization failure yields `None` for the
/// payload rather than an error.
fn attributed_event_to_unified(attributed: AttributedEvent) -> EventEnvelope<UnifiedEvent> {
    let inner = &attributed.envelope;
    let event_id = format!("evt-agent-{}", inner.event_id);
    let event_type = agent_event_type(&inner.payload).to_string();
    let payload = serde_json::to_value(&inner.payload).ok();
    let agent_id = attributed.source.to_string();

    EventEnvelope {
        event_id,
        source: String::from("agent"),
        timestamp_ms: inner.timestamp_ms,
        event: UnifiedEvent::Agent {
            agent_id,
            event_type,
            payload,
        },
    }
}