use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;
use std::time::{Duration, Instant};
use futures::stream::{self, StreamExt};
use meerkat_core::types::{HandlingMode, SessionId};
use tokio::sync::{Mutex, RwLock, broadcast};
use super::bridge::SessionBridge;
use super::contracts::{
AgentCustomizer, ContinuityStore, LeaseProvider, RosterProvider, TopologyProvider,
};
use super::types::{
AgentAddressability, AgentBuildContext, AgentIdentity, AgentRuntimeId, AgentRuntimeServices,
CheckpointVersion, ContinuityGeneration, ContinuityHealth, ContinuityRecord,
ContinuityStoreError, DispatchInput, DurabilityPolicy, DurableAgentSpec, FencingToken,
IdentityLifecycleState, IdentityStatus, LeaseGrant, LeaseInfo, ManagedPeerEdge, NotAddressable,
RosterContext, SessionSnapshot,
};
/// Upper bound on the number of futures polled concurrently by the
/// `buffer_unordered` fan-outs in this file (peer unwiring during topology
/// reconcile and the bulk materialization helpers).
const MANAGED_PEER_RECONCILE_CONCURRENCY: usize = 64;
/// Errors surfaced by identity runtime operations.
#[derive(Debug)]
pub enum IdentityRuntimeError {
/// No entry (or event channel) is registered for the identity.
UnknownIdentity(AgentIdentity),
/// Wraps a [`NotAddressable`] error; displayed using the inner error's text.
NotAddressable(NotAddressable),
/// The identity holds no lease, or a lease could not be acquired for it.
NoActiveLease(AgentIdentity),
/// The identity has a lease entry but it has outlived its TTL.
LeaseLost(AgentIdentity),
/// `operation` was attempted while the identity was in a lifecycle `state`
/// that does not permit it.
InvalidState {
identity: AgentIdentity,
state: IdentityLifecycleState,
operation: &'static str,
},
/// Any continuity store error that is not one of the stale-token or
/// stale-version cases below.
Store(ContinuityStoreError),
/// Error reported by the lease provider.
Lease(super::types::LeaseError),
/// The same identity appeared more than once in a roster.
DuplicateIdentity(AgentIdentity),
/// The fencing token presented to the store is older than the current one.
StaleFencingToken {
identity: AgentIdentity,
presented: FencingToken,
current: FencingToken,
},
/// The checkpoint version presented to the store is older than the current one.
StaleCheckpointVersion {
identity: AgentIdentity,
presented: CheckpointVersion,
current: CheckpointVersion,
},
/// Catch-all carrying a human-readable description.
Internal(String),
}
/// Human-readable rendering of every [`IdentityRuntimeError`] variant.
impl std::fmt::Display for IdentityRuntimeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            // Struct-like variants carrying comparison context.
            Self::InvalidState {
                identity,
                state,
                operation,
            } => f.write_fmt(format_args!(
                "cannot {operation} identity {identity} in state {state:?}"
            )),
            Self::StaleFencingToken {
                identity,
                presented,
                current,
            } => f.write_fmt(format_args!(
                "stale fencing token for {identity}: presented {presented}, current {current}"
            )),
            Self::StaleCheckpointVersion {
                identity,
                presented,
                current,
            } => f.write_fmt(format_args!(
                "stale checkpoint version for {identity}: presented {presented}, current {current}"
            )),
            // Variants that only name the affected identity.
            Self::UnknownIdentity(id) => f.write_fmt(format_args!("unknown identity: {id}")),
            Self::NoActiveLease(id) => f.write_fmt(format_args!("no active lease for {id}")),
            Self::LeaseLost(id) => f.write_fmt(format_args!("lease lost for {id}")),
            Self::DuplicateIdentity(id) => {
                f.write_fmt(format_args!("duplicate identity in roster: {id}"))
            }
            // Variants wrapping another error or a free-form message.
            Self::NotAddressable(err) => f.write_fmt(format_args!("{err}")),
            Self::Store(err) => f.write_fmt(format_args!("continuity store: {err}")),
            Self::Lease(err) => f.write_fmt(format_args!("lease provider: {err}")),
            Self::Internal(msg) => f.write_fmt(format_args!("internal: {msg}")),
        }
    }
}
// Marker impl: relies on the `Debug` derive and the `Display` impl; no `source()` override.
impl std::error::Error for IdentityRuntimeError {}
/// Lifts continuity store errors into runtime errors.
///
/// The two "stale" store errors are promoted to their dedicated runtime
/// variants; everything else is wrapped in [`IdentityRuntimeError::Store`].
impl From<ContinuityStoreError> for IdentityRuntimeError {
    fn from(err: ContinuityStoreError) -> Self {
        type Source = ContinuityStoreError;
        match err {
            Source::StaleCheckpointVersion {
                identity,
                presented,
                current,
            } => Self::StaleCheckpointVersion {
                identity,
                presented,
                current,
            },
            Source::StaleFencingToken {
                identity,
                presented,
                current,
            } => Self::StaleFencingToken {
                identity,
                presented,
                current,
            },
            passthrough => Self::Store(passthrough),
        }
    }
}
/// In-memory state the runtime keeps for one registered identity.
#[derive(Debug, Clone)]
pub(crate) struct IdentityEntry {
/// Durable spec the identity was registered (or last updated) with.
pub spec: DurableAgentSpec,
/// Current lifecycle state.
pub state: IdentityLifecycleState,
/// Continuity record, if one exists for this identity.
pub continuity: Option<ContinuityRecord>,
/// Locally tracked lease, if one is held.
pub lease: Option<LeaseEntry>,
/// Checkpoint version; seeded from the continuity record (or 0) on registration.
pub checkpoint_version: CheckpointVersion,
/// Copied from the runtime's `has_runtime_store` flag at registration time.
pub has_runtime_store: bool,
}
/// Local view of a granted lease, timed with the process-local monotonic clock.
#[derive(Debug, Clone)]
pub(crate) struct LeaseEntry {
/// Fencing token carried by the lease grant.
pub fencing_token: FencingToken,
/// Lease time-to-live as reported by the grant.
pub ttl: Duration,
/// Local instant at which this entry was created from the grant.
pub acquired_at: Instant,
}
impl LeaseEntry {
    /// Returns `true` once strictly more than `ttl` has elapsed since `acquired_at`.
    pub fn is_expired(&self) -> bool {
        let age = self.acquired_at.elapsed();
        self.ttl < age
    }

    /// Time left before the lease expires, clamped at zero.
    pub fn ttl_remaining(&self) -> Duration {
        let age = self.acquired_at.elapsed();
        self.ttl.checked_sub(age).unwrap_or(Duration::ZERO)
    }

    /// Returns `true` while strictly more than one fifth of the TTL remains.
    pub fn is_healthy(&self) -> bool {
        let healthy_floor = self.ttl / 5;
        self.ttl_remaining() > healthy_floor
    }
}
/// Events broadcast on an identity's per-identity channel.
#[derive(Debug, Clone)]
pub enum IdentityEvent {
/// The identity's lifecycle state was set to `new_state`.
StateChanged {
identity: AgentIdentity,
new_state: IdentityLifecycleState,
},
/// The locally tracked lease was replaced with a grant carrying `fencing_token`.
LeaseUpdated {
identity: AgentIdentity,
fencing_token: FencingToken,
},
/// The locally tracked lease was cleared.
LeaseLost { identity: AgentIdentity },
/// A checkpoint finished at `version`.
// NOTE(review): no emitter for this variant is visible in this part of the file.
CheckpointCompleted {
identity: AgentIdentity,
version: CheckpointVersion,
},
/// Resuming an existing session reported a fallback `reason` during materialization.
ResumeFallback {
identity: AgentIdentity,
reason: super::bridge::ResumeFallbackReason,
},
}
/// Capacity passed to `broadcast::channel` for each identity's event channel.
const IDENTITY_EVENT_CHANNEL_CAPACITY: usize = 64;
/// Construction parameters for [`IdentityRuntime::new`].
pub struct IdentityRuntimeConfig {
/// Store used for continuity records and session snapshots.
pub continuity_store: Arc<dyn ContinuityStore>,
/// Provider used to acquire and release identity leases.
pub lease_provider: Arc<dyn LeaseProvider>,
/// Identifier of this runtime instance; passed to the lease provider when acquiring leases.
pub runtime_instance_id: String,
/// Flag copied into every registered identity entry.
pub has_runtime_store: bool,
/// Durability policy stored on the runtime.
pub durability_policy: DurabilityPolicy,
/// Optional session bridge; without one, bridge-side steps are skipped.
pub bridge: Option<Arc<dyn SessionBridge>>,
/// Optional default timeout; `IdentityRuntime::new` falls back to 90 seconds when `None`.
pub default_timeout: Option<Duration>,
}
/// Bundles an [`IdentityRuntime`] with the providers needed to refresh its
/// desired topology from a roster.
#[derive(Clone)]
pub struct IdentityFirstRuntimeContext {
/// Runtime the roster is applied to.
pub runtime: Arc<IdentityRuntime>,
/// Source of the roster queried on every refresh.
pub roster_provider: Arc<dyn RosterProvider>,
/// Optional topology provider forwarded to the orchestrator flows.
pub topology_provider: Option<Arc<dyn TopologyProvider>>,
/// Optional customizer; forwarded only to the eager restore flow.
pub customizer: Option<Arc<dyn AgentCustomizer>>,
// Cloned into the `RosterContext` handed to the roster provider.
mob_definition: Option<meerkat_mob::MobDefinition>,
// Selects `lazy_register_flow` (true) over `restore_flow` (false) on refresh.
lazy_materialization: bool,
}
impl IdentityFirstRuntimeContext {
    /// Builds a context that uses the eager restore flow (lazy materialization off).
    pub fn new(
        runtime: Arc<IdentityRuntime>,
        roster_provider: Arc<dyn RosterProvider>,
        topology_provider: Option<Arc<dyn TopologyProvider>>,
        customizer: Option<Arc<dyn AgentCustomizer>>,
        mob_definition: Option<meerkat_mob::MobDefinition>,
    ) -> Self {
        Self {
            runtime,
            roster_provider,
            topology_provider,
            customizer,
            mob_definition,
            lazy_materialization: false,
        }
    }

    /// Builds a context with an explicit choice between lazy registration and
    /// the eager restore flow.
    pub fn new_with_lazy_materialization(
        runtime: Arc<IdentityRuntime>,
        roster_provider: Arc<dyn RosterProvider>,
        topology_provider: Option<Arc<dyn TopologyProvider>>,
        customizer: Option<Arc<dyn AgentCustomizer>>,
        mob_definition: Option<meerkat_mob::MobDefinition>,
        lazy_materialization: bool,
    ) -> Self {
        Self {
            runtime,
            roster_provider,
            topology_provider,
            customizer,
            mob_definition,
            lazy_materialization,
        }
    }

    /// Fetches the current roster and applies it to the runtime.
    ///
    /// The roster provider is queried with an empty `previous_identities`
    /// list. Depending on `lazy_materialization`, the roster is then handed to
    /// either `lazy_register_flow` or `restore_flow`.
    ///
    /// # Errors
    ///
    /// A roster provider failure is reported as
    /// [`IdentityRuntimeError::Internal`]; errors from the chosen orchestrator
    /// flow are returned unchanged.
    pub async fn refresh_desired_topology(
        &self,
    ) -> Result<super::orchestrator::RestoreFlowResult, IdentityRuntimeError> {
        let roster_context = RosterContext {
            mob_definition: self.mob_definition.clone(),
            previous_identities: Vec::new(),
        };
        let roster = match self.roster_provider.roster(&roster_context).await {
            Ok(roster) => roster,
            Err(err) => {
                return Err(IdentityRuntimeError::Internal(format!(
                    "roster provider: {err}"
                )));
            }
        };
        let topology = self.topology_provider.as_deref();
        if !self.lazy_materialization {
            return super::orchestrator::restore_flow(
                &self.runtime,
                &roster,
                topology,
                self.customizer.as_deref(),
            )
            .await;
        }
        super::orchestrator::lazy_register_flow(&self.runtime, &roster, topology).await
    }
}
/// Tracks registered identities, their leases and continuity records, and the
/// managed peer topology between them.
pub struct IdentityRuntime {
// Per-identity state, keyed by identity.
entries: RwLock<BTreeMap<AgentIdentity, IdentityEntry>>,
// One broadcast sender per registered identity; created in `register`.
event_channels: RwLock<BTreeMap<AgentIdentity, broadcast::Sender<IdentityEvent>>>,
continuity_store: Arc<dyn ContinuityStore>,
lease_provider: Arc<dyn LeaseProvider>,
// Passed to the lease provider when acquiring leases.
runtime_instance_id: String,
// Copied into each `IdentityEntry` at registration.
has_runtime_store: bool,
durability_policy: DurabilityPolicy,
// Optional session bridge; bridge-side steps are skipped when `None`.
bridge: Option<Arc<dyn SessionBridge>>,
// Handed to the customizer through `AgentBuildContext`.
runtime_services: AgentRuntimeServices,
// Logical edges recorded as managed by `reconcile_managed_peer_edges`.
managed_peer_edges: RwLock<BTreeSet<(AgentIdentity, AgentIdentity)>>,
// Serializes topology reconciles.
managed_peer_reconcile_lock: Mutex<()>,
// Desired topology most recently set through `set_desired_peer_edges`.
desired_peer_edges: RwLock<Vec<ManagedPeerEdge>>,
// Lazily created per-identity mutexes guarding materialization.
materialization_locks: RwLock<BTreeMap<AgentIdentity, Arc<Mutex<()>>>>,
// Lazily created per-identity mutexes guarding lifecycle operations.
lifecycle_locks: RwLock<BTreeMap<AgentIdentity, Arc<Mutex<()>>>>,
// Optional build customizer consulted during materialization.
customizer: RwLock<Option<Arc<dyn AgentCustomizer>>>,
// Taken from the config, or 90 seconds when the config leaves it unset.
default_timeout: Duration,
}
impl IdentityRuntime {
pub fn new(config: IdentityRuntimeConfig) -> Self {
Self {
entries: RwLock::new(BTreeMap::new()),
event_channels: RwLock::new(BTreeMap::new()),
continuity_store: config.continuity_store,
lease_provider: config.lease_provider,
runtime_instance_id: config.runtime_instance_id,
has_runtime_store: config.has_runtime_store,
durability_policy: config.durability_policy,
bridge: config.bridge,
runtime_services: AgentRuntimeServices::empty(),
managed_peer_edges: RwLock::new(BTreeSet::new()),
managed_peer_reconcile_lock: Mutex::new(()),
desired_peer_edges: RwLock::new(Vec::new()),
materialization_locks: RwLock::new(BTreeMap::new()),
lifecycle_locks: RwLock::new(BTreeMap::new()),
customizer: RwLock::new(None),
default_timeout: config.default_timeout.unwrap_or(Duration::from_secs(90)),
}
}
/// Builder-style setter: returns the runtime with `runtime_services` installed.
pub fn with_runtime_services(self, runtime_services: AgentRuntimeServices) -> Self {
    let mut runtime = self;
    runtime.runtime_services = runtime_services;
    runtime
}
/// Returns a clone of the runtime services installed on this runtime.
pub(crate) fn runtime_services(&self) -> AgentRuntimeServices {
    Clone::clone(&self.runtime_services)
}
/// Installs (`Some`) or clears (`None`) the customizer consulted during materialization.
pub async fn set_agent_customizer(&self, customizer: Option<Arc<dyn AgentCustomizer>>) {
    let mut slot = self.customizer.write().await;
    *slot = customizer;
}
/// Releases a lease grant that was acquired but never installed on an entry.
///
/// Returns `None` on success, or the provider's error rendered as a string.
async fn release_uninstalled_materialize_lease(&self, grant: &LeaseGrant) -> Option<String> {
    let grants = std::slice::from_ref(grant);
    match self.lease_provider.release_leases(grants).await {
        Ok(_) => None,
        Err(err) => Some(err.to_string()),
    }
}
/// Replaces the stored desired peer topology with `edges`.
pub async fn set_desired_peer_edges(&self, edges: Vec<ManagedPeerEdge>) {
    let mut desired = self.desired_peer_edges.write().await;
    *desired = edges;
}
/// Returns a copy of the stored desired peer topology.
pub async fn desired_peer_edges(&self) -> Vec<ManagedPeerEdge> {
    let desired = self.desired_peer_edges.read().await;
    desired.to_vec()
}
/// Lists every registered identity, in key order.
async fn registered_identities(&self) -> Vec<AgentIdentity> {
    let entries = self.entries.read().await;
    let mut identities = Vec::with_capacity(entries.len());
    identities.extend(entries.keys().cloned());
    identities
}
/// Collects the identities that share a desired edge with `identity`.
///
/// The result is deduplicated and sorted. For an edge whose first endpoint
/// is `identity`, only the second endpoint is reported (matching the
/// first-match-wins behavior for a self-edge).
async fn reachable_peer_identities(&self, identity: &AgentIdentity) -> Vec<AgentIdentity> {
    let desired = self.desired_peer_edges.read().await;
    let mut peers = BTreeSet::new();
    for edge in desired.iter() {
        if edge.a() == identity {
            peers.insert(edge.b().clone());
        } else if edge.b() == identity {
            peers.insert(edge.a().clone());
        }
    }
    peers.into_iter().collect()
}
/// Reports whether this runtime was configured with a session bridge.
#[must_use]
pub fn has_session_bridge(&self) -> bool {
    Option::is_some(&self.bridge)
}
/// Reconciles the bridge's peer wiring with `desired_edges`.
///
/// Only identities in the `Active` state that have a continuity record take
/// part. The method:
///
/// 1. asks the bridge for its current member wires and maps them back to
///    logical (identity) edges; if that query fails, the current wiring is
///    treated as unknown;
/// 2. adopts desired edges that are already wired but not yet tracked;
/// 3. wires desired edges that are neither tracked nor known to be wired;
/// 4. unwires tracked edges that are no longer desired.
///
/// Reconciles are serialized by an internal lock. Without a session bridge
/// the call is a no-op.
///
/// # Errors
///
/// Returns [`IdentityRuntimeError::Internal`] when the batch wire call
/// fails (nothing is recorded in that case) or when at least one unwire call
/// fails. On unwire failure the first error is reported, but every unwire
/// that did succeed is still removed from the managed set.
pub async fn reconcile_managed_peer_edges(
    &self,
    desired_edges: &[ManagedPeerEdge],
) -> Result<(), IdentityRuntimeError> {
    let _guard = self.managed_peer_reconcile_lock.lock().await;
    let Some(bridge) = self.bridge.clone() else {
        return Ok(());
    };

    // Identity -> runtime id for every active identity with a continuity record.
    let active_runtimes: BTreeMap<AgentIdentity, AgentRuntimeId> = {
        let entries = self.entries.read().await;
        entries
            .iter()
            .filter_map(|(identity, entry)| {
                if entry.state != IdentityLifecycleState::Active {
                    return None;
                }
                entry
                    .continuity
                    .as_ref()
                    .map(|record| (identity.clone(), record.agent_runtime_id.clone()))
            })
            .collect()
    };
    // Reverse lookup used to translate bridge wires into logical edges.
    let runtime_identities: BTreeMap<AgentRuntimeId, AgentIdentity> = active_runtimes
        .iter()
        .map(|(identity, runtime_id)| (runtime_id.clone(), identity.clone()))
        .collect();

    // `None` means the current wiring could not be inspected.
    let current_logical_edges: Option<BTreeSet<(AgentIdentity, AgentIdentity)>> =
        match bridge.current_member_wires().await {
            Ok(current_runtime_edges) => Some(
                current_runtime_edges
                    .iter()
                    .filter_map(|(runtime_a, runtime_b)| {
                        let a = runtime_identities.get(runtime_a)?;
                        let b = runtime_identities.get(runtime_b)?;
                        // Store each edge with its endpoints in ascending order.
                        if a <= b {
                            Some((a.clone(), b.clone()))
                        } else {
                            Some((b.clone(), a.clone()))
                        }
                    })
                    .collect(),
            ),
            Err(err) => {
                tracing::debug!(
                    error = %err,
                    "identity-first topology reconcile could not inspect current member wires"
                );
                None
            }
        };

    let desired: BTreeSet<(AgentIdentity, AgentIdentity)> = desired_edges
        .iter()
        .map(|edge| (edge.a().clone(), edge.b().clone()))
        .collect();
    let managed_snapshot = self.managed_peer_edges.read().await.clone();

    // Desired, untracked, but already wired between two active identities: adopt.
    let retained_logical_edges: Vec<(AgentIdentity, AgentIdentity)> = desired
        .iter()
        .filter(|edge| !managed_snapshot.contains(*edge))
        .filter(|edge| {
            current_logical_edges
                .as_ref()
                .is_some_and(|edges| edges.contains(*edge))
        })
        .filter(|(a, b)| active_runtimes.contains_key(a) && active_runtimes.contains_key(b))
        .cloned()
        .collect();

    // Desired, untracked, and not known to be wired: wire (also when wiring is unknown).
    let to_wire: Vec<(AgentIdentity, AgentIdentity, AgentRuntimeId, AgentRuntimeId)> = desired
        .iter()
        .filter(|edge| !managed_snapshot.contains(*edge))
        .filter(|edge| {
            current_logical_edges
                .as_ref()
                .is_none_or(|edges| !edges.contains(*edge))
        })
        .filter_map(|(a, b)| {
            let runtime_a = active_runtimes.get(a)?;
            let runtime_b = active_runtimes.get(b)?;
            Some((a.clone(), b.clone(), runtime_a.clone(), runtime_b.clone()))
        })
        .collect();

    // Tracked edges that are no longer desired.
    let stale: Vec<(AgentIdentity, AgentIdentity)> = managed_snapshot
        .iter()
        .filter(|edge| !desired.contains(*edge))
        .cloned()
        .collect();
    let to_unwire: Vec<(AgentIdentity, AgentIdentity, AgentRuntimeId, AgentRuntimeId)> = stale
        .iter()
        .filter_map(|edge| {
            let (a, b) = edge;
            let runtime_a = active_runtimes.get(a)?;
            let runtime_b = active_runtimes.get(b)?;
            // Nothing to unwire when the bridge reports the wire is already gone.
            if current_logical_edges
                .as_ref()
                .is_some_and(|edges| !edges.contains(edge))
            {
                return None;
            }
            Some((a.clone(), b.clone(), runtime_a.clone(), runtime_b.clone()))
        })
        .collect();

    let wire_logical_edges = to_wire
        .iter()
        .map(|(a, b, _, _)| (a.clone(), b.clone()))
        .collect::<Vec<_>>();
    let wire_runtime_edges = to_wire
        .iter()
        .map(|(_, _, runtime_a, runtime_b)| (runtime_a.clone(), runtime_b.clone()))
        .collect::<Vec<_>>();
    if !wire_runtime_edges.is_empty() {
        bridge
            .wire_peers_batch(&wire_runtime_edges)
            .await
            .map_err(|e| {
                IdentityRuntimeError::Internal(format!("bridge wire_peers_batch: {e}"))
            })?;
    }

    let unwire_results =
        stream::iter(to_unwire.into_iter().map(|(a, b, runtime_a, runtime_b)| {
            let bridge = bridge.clone();
            async move {
                let result = bridge
                    .unwire_peer(&runtime_a, &runtime_b)
                    .await
                    .map_err(|e| format!("{e}"));
                (a, b, result)
            }
        }))
        .buffer_unordered(MANAGED_PEER_RECONCILE_CONCURRENCY)
        .collect::<Vec<_>>()
        .await;

    let mut managed = self.managed_peer_edges.write().await;
    managed.extend(retained_logical_edges);
    managed.extend(wire_logical_edges);
    for edge in stale {
        let endpoint_inactive = !active_runtimes.contains_key(&edge.0)
            || !active_runtimes.contains_key(&edge.1);
        // A stale edge the bridge reports as already unwired is skipped by
        // `to_unwire`; forget it here so it can be wired again if it becomes
        // desired later.
        let known_unwired = current_logical_edges
            .as_ref()
            .is_some_and(|edges| !edges.contains(&edge));
        if endpoint_inactive || known_unwired {
            managed.remove(&edge);
        }
    }

    // Record every successful unwire before surfacing a failure, so the
    // managed set keeps matching what was actually changed on the bridge.
    let mut first_unwire_error: Option<String> = None;
    for (a, b, result) in unwire_results {
        match result {
            Ok(_) => {
                managed.remove(&(a, b));
            }
            Err(e) => {
                first_unwire_error.get_or_insert(e);
            }
        }
    }
    match first_unwire_error {
        Some(e) => Err(IdentityRuntimeError::Internal(format!(
            "bridge unwire_peer: {e}"
        ))),
        None => Ok(()),
    }
}
/// Broadcasts `event` on `identity`'s channel, if one exists.
///
/// Send failures (for example, no live receivers) are deliberately ignored.
async fn emit_event(&self, identity: &AgentIdentity, event: IdentityEvent) {
    let channels = self.event_channels.read().await;
    let Some(sender) = channels.get(identity) else {
        return;
    };
    let _ = sender.send(event);
}
pub async fn register(
&self,
spec: DurableAgentSpec,
state: IdentityLifecycleState,
continuity: Option<ContinuityRecord>,
lease: Option<LeaseGrant>,
) {
let identity = spec.identity.clone();
let cpv = continuity
.as_ref()
.map(|r| r.checkpoint_version)
.unwrap_or(CheckpointVersion::new(0));
let lease_entry = lease.map(|g| LeaseEntry {
fencing_token: g.fencing_token,
ttl: g.ttl,
acquired_at: Instant::now(),
});
let entry = IdentityEntry {
spec,
state,
continuity,
lease: lease_entry,
checkpoint_version: cpv,
has_runtime_store: self.has_runtime_store,
};
self.entries.write().await.insert(identity.clone(), entry);
let (tx, _) = broadcast::channel(IDENTITY_EVENT_CHANNEL_CAPACITY);
self.event_channels.write().await.insert(identity, tx);
}
/// Returns the materialization mutex for `identity`, creating it on first use.
///
/// A read lock covers the common case; the write lock is taken only when
/// the mutex does not exist yet, and the entry API re-checks under it.
async fn materialization_lock_for(&self, identity: &AgentIdentity) -> Arc<Mutex<()>> {
    {
        let known = self.materialization_locks.read().await;
        if let Some(existing) = known.get(identity) {
            return Arc::clone(existing);
        }
    }
    let mut known = self.materialization_locks.write().await;
    let slot = known
        .entry(identity.clone())
        .or_insert_with(|| Arc::new(Mutex::new(())));
    Arc::clone(slot)
}
/// Returns the lifecycle mutex for `identity`, creating it on first use.
///
/// A read lock covers the common case; the write lock is taken only when
/// the mutex does not exist yet, and the entry API re-checks under it.
async fn lifecycle_lock_for(&self, identity: &AgentIdentity) -> Arc<Mutex<()>> {
    {
        let known = self.lifecycle_locks.read().await;
        if let Some(existing) = known.get(identity) {
            return Arc::clone(existing);
        }
    }
    let mut known = self.lifecycle_locks.write().await;
    let slot = known
        .entry(identity.clone())
        .or_insert_with(|| Arc::new(Mutex::new(())));
    Arc::clone(slot)
}
/// Renders an optional cleanup failure as a `"; <label>: <error>"` suffix.
///
/// Returns an empty string for `None`, so the result can be appended
/// unconditionally to a primary error message.
fn materialize_cleanup_suffix<E>(label: &str, error: Option<&E>) -> String
where
    E: std::fmt::Display + ?Sized,
{
    error
        .map(|e| format!("; {label}: {e}"))
        .unwrap_or_default()
}

/// Brings `identity` into the `Active` state and returns its continuity record.
///
/// An identity that is already `Active` returns its existing record without
/// further work. Otherwise, for `Dormant` or `Uninitialized` identities:
///
/// 1. a lease is acquired from the lease provider;
/// 2. the build draft is prepared and passed through the customizer, if any;
/// 3. with an existing continuity record, the stored session snapshot is
///    loaded and (when a bridge is configured) the session is resumed; with
///    no record, a fresh record is minted, persisted, and (when a bridge is
///    configured) a session is created;
/// 4. the final record is persisted, the effective session is registered
///    with the bridge, and provisional registrations that were superseded
///    are unregistered;
/// 5. the entry is updated, a `StateChanged` event is emitted, and the
///    desired peer topology is reconciled (reconcile failures are only
///    logged).
///
/// The whole operation runs under the identity's lifecycle and
/// materialization locks.
///
/// # Errors
///
/// * [`IdentityRuntimeError::UnknownIdentity`] if the identity is not registered.
/// * [`IdentityRuntimeError::InvalidState`] for `Broken`, `Retiring` or
///   `Suspended` identities.
/// * [`IdentityRuntimeError::Lease`] / [`IdentityRuntimeError::NoActiveLease`]
///   if the lease cannot be acquired.
/// * [`IdentityRuntimeError::Internal`] for any later failure. On those
///   paths the acquired lease is released and earlier side effects are
///   rolled back on a best-effort basis; rollback failures are appended to
///   the error message.
pub async fn materialize(
    &self,
    identity: &AgentIdentity,
) -> Result<ContinuityRecord, IdentityRuntimeError> {
    // Lock order: lifecycle lock first, then materialization lock.
    let lifecycle_lock = self.lifecycle_lock_for(identity).await;
    let _lifecycle_guard = lifecycle_lock.lock().await;
    let lock = self.materialization_lock_for(identity).await;
    let _guard = lock.lock().await;

    let (spec, continuity, state) = {
        let entries = self.entries.read().await;
        let entry = entries
            .get(identity)
            .ok_or_else(|| IdentityRuntimeError::UnknownIdentity(identity.clone()))?;
        if entry.state == IdentityLifecycleState::Active {
            return entry.continuity.clone().ok_or_else(|| {
                IdentityRuntimeError::Internal(format!(
                    "active identity {identity} has no continuity record"
                ))
            });
        }
        (entry.spec.clone(), entry.continuity.clone(), entry.state)
    };
    match state {
        IdentityLifecycleState::Dormant | IdentityLifecycleState::Uninitialized => {}
        IdentityLifecycleState::Broken
        | IdentityLifecycleState::Retiring
        | IdentityLifecycleState::Suspended => {
            return Err(IdentityRuntimeError::InvalidState {
                identity: identity.clone(),
                state,
                operation: "materialize",
            });
        }
        IdentityLifecycleState::Active => unreachable!("active handled above"),
    }

    let lease_results = self
        .lease_provider
        .acquire_leases(std::slice::from_ref(identity), &self.runtime_instance_id)
        .await
        .map_err(IdentityRuntimeError::Lease)?;
    let grant = match lease_results.get(identity) {
        Some(super::types::LeaseAcquireResult::Acquired(grant)) => grant.clone(),
        _ => return Err(IdentityRuntimeError::NoActiveLease(identity.clone())),
    };
    // From here until the grant is installed on the entry, every failure
    // path must release the grant again.

    // Re-write the existing record under the new fencing token.
    if let Some(record) = continuity.as_ref()
        && let Err(err) = self
            .continuity_store
            .upsert_continuity_record(record, grant.fencing_token)
            .await
    {
        let cleanup_error = self.release_uninstalled_materialize_lease(&grant).await;
        return Err(IdentityRuntimeError::Internal(format!(
            "continuity upsert before materialize: {err}{}",
            Self::materialize_cleanup_suffix("lease cleanup failed", cleanup_error.as_ref()),
        )));
    }

    // Note: `active_peers` is filled with every registered identity,
    // regardless of lifecycle state.
    let active_peers = self.entries.read().await.keys().cloned().collect();
    let managed_edges = self.desired_peer_edges.read().await.clone();
    let build_context = AgentBuildContext {
        identity: identity.clone(),
        active_peers,
        managed_edges,
        runtime_services: self.runtime_services(),
    };
    let mut draft = super::types::AgentBuildDraft {
        model: None,
        system_prompt: None,
        additional_instructions: spec.additional_instructions.clone(),
        labels: spec.labels.clone(),
        app_context: spec.context.clone(),
        external_tools: Vec::new(),
        local_external_tools: Default::default(),
    };
    if let Some(customizer) = self.customizer.read().await.clone()
        && let Err(err) = customizer
            .customize_build(&build_context, &spec, &mut draft)
            .await
    {
        let cleanup_error = self.release_uninstalled_materialize_lease(&grant).await;
        return Err(IdentityRuntimeError::Internal(format!(
            "customizer: {err}{}",
            Self::materialize_cleanup_suffix("lease cleanup failed", cleanup_error.as_ref()),
        )));
    }

    let original_continuity = continuity.clone();
    // Session ids registered with the bridge that were later superseded by
    // the session id the bridge actually returned.
    let mut abandoned_session_registrations: Vec<SessionId> = Vec::new();

    let mut record = if let Some(mut record) = continuity {
        // ---- Resume path: a continuity record already exists. ----
        let snapshot = match self
            .continuity_store
            .load_session_snapshot(&record.session_id)
            .await
        {
            Ok(snapshot) => snapshot,
            Err(err) => {
                let cleanup_error = self.release_uninstalled_materialize_lease(&grant).await;
                return Err(IdentityRuntimeError::Internal(format!(
                    "load session snapshot before materialize: {err}{}",
                    Self::materialize_cleanup_suffix(
                        "lease cleanup failed",
                        cleanup_error.as_ref()
                    ),
                )));
            }
        };
        if let Some(bridge) = self.bridge.as_ref() {
            if let Err(err) = bridge
                .register_session_runtime_state(
                    &record.session_id,
                    identity,
                    record.generation,
                    record.checkpoint_version,
                    grant.fencing_token,
                )
                .await
            {
                let unregister_error = Self::unregister_bridge_session_runtime_states(
                    bridge.as_ref(),
                    std::slice::from_ref(&record.session_id),
                )
                .await;
                let cleanup_error = self.release_uninstalled_materialize_lease(&grant).await;
                return Err(IdentityRuntimeError::Internal(format!(
                    "bridge register_session_runtime_state: {err}{}{}",
                    Self::materialize_cleanup_suffix(
                        "unregister session failed",
                        unregister_error.as_ref()
                    ),
                    Self::materialize_cleanup_suffix(
                        "lease cleanup failed",
                        cleanup_error.as_ref()
                    ),
                )));
            }
            let registered_session_id = record.session_id.clone();
            // A missing snapshot is resumed as an empty one.
            let snapshot = snapshot.unwrap_or(SessionSnapshot { data: Vec::new() });
            let outcome = bridge
                .resume_session(
                    identity,
                    &record.agent_runtime_id,
                    &spec,
                    &draft,
                    &record.session_id,
                    &snapshot,
                )
                .await;
            let outcome = match outcome {
                Ok(outcome) => outcome,
                Err(err) => {
                    let unregister_error = Self::unregister_bridge_session_runtime_states(
                        bridge.as_ref(),
                        std::slice::from_ref(&registered_session_id),
                    )
                    .await;
                    let cleanup_error =
                        bridge.retire_member(&record.agent_runtime_id).await.err();
                    let lease_cleanup_error =
                        self.release_uninstalled_materialize_lease(&grant).await;
                    return Err(IdentityRuntimeError::Internal(format!(
                        "bridge resume_session: {err}{}{}{}",
                        Self::materialize_cleanup_suffix(
                            "unregister session failed",
                            unregister_error.as_ref()
                        ),
                        Self::materialize_cleanup_suffix(
                            "cleanup retire failed",
                            cleanup_error.as_ref()
                        ),
                        Self::materialize_cleanup_suffix(
                            "lease cleanup failed",
                            lease_cleanup_error.as_ref()
                        ),
                    )));
                }
            };
            if let Some(reason) = outcome.fallback_reason().cloned() {
                tracing::warn!(
                    %identity,
                    reason = ?reason,
                    "lazy identity materialization fresh-spawned after typed resume fallback"
                );
                self.emit_event(
                    identity,
                    IdentityEvent::ResumeFallback {
                        identity: identity.clone(),
                        reason,
                    },
                )
                .await;
            }
            let effective_session_id = outcome.session_id().clone();
            if effective_session_id != registered_session_id {
                abandoned_session_registrations.push(registered_session_id);
            }
            record.session_id = effective_session_id;
        }
        record
    } else {
        // ---- Create path: no continuity record exists yet. ----
        let new_runtime_id = match AgentRuntimeId::parse(&format!("rt:{identity}:0")) {
            Ok(runtime_id) => runtime_id,
            Err(err) => {
                // Release the lease here as well, like every other failure
                // path after acquisition; otherwise the grant stays held.
                let cleanup_error = self.release_uninstalled_materialize_lease(&grant).await;
                return Err(IdentityRuntimeError::Internal(format!(
                    "failed to mint runtime id: {err}{}",
                    Self::materialize_cleanup_suffix(
                        "lease cleanup failed",
                        cleanup_error.as_ref()
                    ),
                )));
            }
        };
        let mut record = ContinuityRecord {
            identity: identity.clone(),
            agent_runtime_id: new_runtime_id,
            session_id: meerkat_core::types::SessionId::new(),
            generation: ContinuityGeneration::new(0),
            checkpoint_version: CheckpointVersion::new(0),
        };
        if let Err(err) = self
            .continuity_store
            .upsert_continuity_record(&record, grant.fencing_token)
            .await
        {
            let cleanup_error = self.release_uninstalled_materialize_lease(&grant).await;
            return Err(IdentityRuntimeError::Internal(format!(
                "continuity upsert before materialize create: {err}{}",
                Self::materialize_cleanup_suffix("lease cleanup failed", cleanup_error.as_ref()),
            )));
        }
        if let Some(bridge) = self.bridge.as_ref() {
            let provisional_session_id = record.session_id.clone();
            if let Err(err) = bridge
                .register_session_runtime_state(
                    &record.session_id,
                    identity,
                    record.generation,
                    record.checkpoint_version,
                    grant.fencing_token,
                )
                .await
                .map_err(|err| {
                    IdentityRuntimeError::Internal(format!(
                        "bridge register_session_runtime_state: {err}"
                    ))
                })
            {
                let unregister_error = Self::unregister_bridge_session_runtime_states(
                    bridge.as_ref(),
                    std::slice::from_ref(&provisional_session_id),
                )
                .await;
                let delete_error = self
                    .continuity_store
                    .delete_continuity_record(identity, grant.fencing_token)
                    .await
                    .err();
                let cleanup_error = self.release_uninstalled_materialize_lease(&grant).await;
                if unregister_error.is_some()
                    || delete_error.is_some()
                    || cleanup_error.is_some()
                {
                    return Err(IdentityRuntimeError::Internal(format!(
                        "{err}{}{}{}",
                        Self::materialize_cleanup_suffix(
                            "unregister session failed",
                            unregister_error.as_ref()
                        ),
                        Self::materialize_cleanup_suffix(
                            "tentative continuity cleanup failed",
                            delete_error.as_ref()
                        ),
                        Self::materialize_cleanup_suffix(
                            "lease cleanup failed",
                            cleanup_error.as_ref()
                        ),
                    )));
                }
                return Err(err);
            }
            let created_session_id = bridge
                .create_session(
                    identity,
                    &record.agent_runtime_id,
                    &spec,
                    &draft,
                    &record.session_id,
                )
                .await
                .map_err(|err| {
                    IdentityRuntimeError::Internal(format!("bridge create_session: {err}"))
                });
            match created_session_id {
                Ok(session_id) => {
                    if session_id != provisional_session_id {
                        abandoned_session_registrations.push(provisional_session_id);
                    }
                    record.session_id = session_id;
                }
                Err(err) => {
                    let unregister_error = Self::unregister_bridge_session_runtime_states(
                        bridge.as_ref(),
                        std::slice::from_ref(&provisional_session_id),
                    )
                    .await;
                    let cleanup_error =
                        bridge.retire_member(&record.agent_runtime_id).await.err();
                    let delete_error = self
                        .continuity_store
                        .delete_continuity_record(identity, grant.fencing_token)
                        .await
                        .err();
                    let lease_cleanup_error =
                        self.release_uninstalled_materialize_lease(&grant).await;
                    if unregister_error.is_some()
                        || cleanup_error.is_some()
                        || delete_error.is_some()
                        || lease_cleanup_error.is_some()
                    {
                        return Err(IdentityRuntimeError::Internal(format!(
                            "{err}{}{}{}{}",
                            Self::materialize_cleanup_suffix(
                                "unregister session failed",
                                unregister_error.as_ref()
                            ),
                            Self::materialize_cleanup_suffix(
                                "cleanup retire failed",
                                cleanup_error.as_ref()
                            ),
                            Self::materialize_cleanup_suffix(
                                "tentative continuity cleanup failed",
                                delete_error.as_ref()
                            ),
                            Self::materialize_cleanup_suffix(
                                "lease cleanup failed",
                                lease_cleanup_error.as_ref()
                            ),
                        )));
                    }
                    return Err(err);
                }
            }
        }
        record
    };

    // Persist the final record (it may carry a different session id than
    // the one written earlier).
    if let Err(err) = self
        .continuity_store
        .upsert_continuity_record(&record, grant.fencing_token)
        .await
    {
        let (unregister_error, cleanup_error) = match self.bridge.as_ref() {
            Some(bridge) => {
                let mut sessions_to_unregister = abandoned_session_registrations.clone();
                sessions_to_unregister.push(record.session_id.clone());
                let unregister_error = Self::unregister_bridge_session_runtime_states(
                    bridge.as_ref(),
                    &sessions_to_unregister,
                )
                .await;
                let cleanup_error = bridge.retire_member(&record.agent_runtime_id).await.err();
                (unregister_error, cleanup_error)
            }
            None => (None, None),
        };
        let restore_error = self
            .restore_continuity_after_materialize_failure(
                identity,
                original_continuity.as_ref(),
                &grant,
            )
            .await;
        let lease_cleanup_error = self.release_uninstalled_materialize_lease(&grant).await;
        // With no rollback failures every suffix is empty, leaving just the
        // primary message.
        return Err(IdentityRuntimeError::Internal(format!(
            "continuity upsert after materialize: {err}{}{}{}{}",
            Self::materialize_cleanup_suffix(
                "unregister session failed",
                unregister_error.as_ref()
            ),
            Self::materialize_cleanup_suffix("cleanup retire failed", cleanup_error.as_ref()),
            Self::materialize_cleanup_suffix(
                "continuity rollback failed",
                restore_error.as_ref()
            ),
            Self::materialize_cleanup_suffix(
                "lease cleanup failed",
                lease_cleanup_error.as_ref()
            ),
        )));
    }

    if let Some(bridge) = self.bridge.as_ref() {
        // Register the effective session; the bridge returns the checkpoint
        // version to use from here on.
        let register_result = bridge
            .register_session_runtime_state(
                &record.session_id,
                identity,
                record.generation,
                record.checkpoint_version,
                grant.fencing_token,
            )
            .await;
        let effective_checkpoint_version = match register_result {
            Ok(version) => version,
            Err(err) => {
                let mut sessions_to_unregister = abandoned_session_registrations.clone();
                sessions_to_unregister.push(record.session_id.clone());
                let unregister_error = Self::unregister_bridge_session_runtime_states(
                    bridge.as_ref(),
                    &sessions_to_unregister,
                )
                .await;
                let cleanup_error = bridge.retire_member(&record.agent_runtime_id).await.err();
                let restore_error = self
                    .restore_continuity_after_materialize_failure(
                        identity,
                        original_continuity.as_ref(),
                        &grant,
                    )
                    .await;
                let lease_cleanup_error =
                    self.release_uninstalled_materialize_lease(&grant).await;
                return Err(IdentityRuntimeError::Internal(format!(
                    "bridge register actual session runtime state: {err}{}{}{}{}",
                    Self::materialize_cleanup_suffix(
                        "unregister session failed",
                        unregister_error.as_ref()
                    ),
                    Self::materialize_cleanup_suffix(
                        "cleanup retire failed",
                        cleanup_error.as_ref()
                    ),
                    Self::materialize_cleanup_suffix(
                        "continuity rollback failed",
                        restore_error.as_ref()
                    ),
                    Self::materialize_cleanup_suffix(
                        "lease cleanup failed",
                        lease_cleanup_error.as_ref()
                    ),
                )));
            }
        };
        record.checkpoint_version = effective_checkpoint_version;

        // Drop the provisional registrations that were superseded.
        if let Some(err) = Self::unregister_bridge_session_runtime_states(
            bridge.as_ref(),
            &abandoned_session_registrations,
        )
        .await
        {
            let actual_unregister_error = Self::unregister_bridge_session_runtime_states(
                bridge.as_ref(),
                std::slice::from_ref(&record.session_id),
            )
            .await;
            let unregister_error = actual_unregister_error
                .map(|actual_err| format!("{err}; actual session: {actual_err}"))
                .unwrap_or(err);
            let cleanup_error = bridge.retire_member(&record.agent_runtime_id).await.err();
            let restore_error = self
                .restore_continuity_after_materialize_failure(
                    identity,
                    original_continuity.as_ref(),
                    &grant,
                )
                .await;
            let lease_cleanup_error = self.release_uninstalled_materialize_lease(&grant).await;
            return Err(IdentityRuntimeError::Internal(format!(
                "bridge unregister abandoned session runtime state: {unregister_error}{}{}{}",
                Self::materialize_cleanup_suffix(
                    "cleanup retire failed",
                    cleanup_error.as_ref()
                ),
                Self::materialize_cleanup_suffix(
                    "continuity rollback failed",
                    restore_error.as_ref()
                ),
                Self::materialize_cleanup_suffix(
                    "lease cleanup failed",
                    lease_cleanup_error.as_ref()
                ),
            )));
        }
    }

    {
        let mut entries = self.entries.write().await;
        // NOTE(review): if the entry was removed while materialization ran,
        // this returns without rolling back the session or the lease —
        // confirm whether that can happen in practice.
        let entry = entries
            .get_mut(identity)
            .ok_or_else(|| IdentityRuntimeError::UnknownIdentity(identity.clone()))?;
        entry.continuity = Some(record.clone());
        entry.lease = Some(Self::lease_entry_from_grant(&grant));
        entry.state = IdentityLifecycleState::Active;
        entry.checkpoint_version = record.checkpoint_version;
    }
    self.emit_event(
        identity,
        IdentityEvent::StateChanged {
            identity: identity.clone(),
            new_state: IdentityLifecycleState::Active,
        },
    )
    .await;

    // Topology reconcile is best-effort: a failure is logged, not returned.
    let desired_edges = self.desired_peer_edges.read().await.clone();
    if !desired_edges.is_empty()
        && let Err(err) = self.reconcile_managed_peer_edges(&desired_edges).await
    {
        tracing::warn!(
            identity = %identity,
            error = %err,
            "identity materialized with topology reconcile warning"
        );
    }
    Ok(record)
}
/// Materializes every registered identity, with bounded concurrency.
///
/// All materializations are driven to completion before results are
/// inspected; the first error in completion order is returned. On success
/// the desired topology is reconciled once more, and a reconcile failure is
/// only logged.
pub async fn materialize_all(&self) -> Result<Vec<ContinuityRecord>, IdentityRuntimeError> {
    let pending = self
        .registered_identities()
        .await
        .into_iter()
        .map(|identity| async move { self.materialize(&identity).await });
    let outcomes: Vec<_> = stream::iter(pending)
        .buffer_unordered(MANAGED_PEER_RECONCILE_CONCURRENCY)
        .collect()
        .await;
    let records = outcomes.into_iter().collect::<Result<Vec<_>, _>>()?;

    let desired_edges = self.desired_peer_edges.read().await.clone();
    if !desired_edges.is_empty() {
        if let Err(err) = self.reconcile_managed_peer_edges(&desired_edges).await {
            tracing::warn!(
                error = %err,
                "identity materialize_all completed with topology reconcile warning"
            );
        }
    }
    Ok(records)
}
/// Materializes every identity that shares a desired edge with `identity`.
///
/// All materializations are driven to completion (with bounded concurrency)
/// before results are inspected; the first error in completion order is
/// returned. On success the desired topology is reconciled once more, and a
/// reconcile failure is only logged.
pub async fn materialize_reachable_peers(
    &self,
    identity: &AgentIdentity,
) -> Result<Vec<ContinuityRecord>, IdentityRuntimeError> {
    let pending = self
        .reachable_peer_identities(identity)
        .await
        .into_iter()
        .map(|peer| async move { self.materialize(&peer).await });
    let outcomes: Vec<_> = stream::iter(pending)
        .buffer_unordered(MANAGED_PEER_RECONCILE_CONCURRENCY)
        .collect()
        .await;
    let records = outcomes.into_iter().collect::<Result<Vec<_>, _>>()?;

    let desired_edges = self.desired_peer_edges.read().await.clone();
    if !desired_edges.is_empty() {
        if let Err(err) = self.reconcile_managed_peer_edges(&desired_edges).await {
            tracing::warn!(
                identity = %identity,
                error = %err,
                "identity peer materialization completed with topology reconcile warning"
            );
        }
    }
    Ok(records)
}
/// Subscribes to the event channel of a registered identity.
///
/// # Errors
///
/// Returns [`IdentityRuntimeError::UnknownIdentity`] when no channel exists
/// for `identity`.
pub async fn subscribe(
    &self,
    identity: &AgentIdentity,
) -> Result<broadcast::Receiver<IdentityEvent>, IdentityRuntimeError> {
    let channels = self.event_channels.read().await;
    match channels.get(identity) {
        Some(sender) => Ok(sender.subscribe()),
        None => Err(IdentityRuntimeError::UnknownIdentity(identity.clone())),
    }
}
/// Replaces the stored spec of the identity named by `spec.identity`.
///
/// # Errors
///
/// Returns [`IdentityRuntimeError::UnknownIdentity`] when that identity is
/// not registered.
pub async fn update_spec(&self, spec: DurableAgentSpec) -> Result<(), IdentityRuntimeError> {
    let mut entries = self.entries.write().await;
    match entries.get_mut(&spec.identity) {
        Some(entry) => {
            entry.spec = spec;
            Ok(())
        }
        None => Err(IdentityRuntimeError::UnknownIdentity(spec.identity.clone())),
    }
}
pub async fn update_lease(
&self,
identity: &AgentIdentity,
grant: LeaseGrant,
) -> Result<(), IdentityRuntimeError> {
let fencing_token = grant.fencing_token;
let mut entries = self.entries.write().await;
let entry = entries
.get_mut(identity)
.ok_or_else(|| IdentityRuntimeError::UnknownIdentity(identity.clone()))?;
entry.lease = Some(LeaseEntry {
fencing_token,
ttl: grant.ttl,
acquired_at: Instant::now(),
});
drop(entries);
self.emit_event(
identity,
IdentityEvent::LeaseUpdated {
identity: identity.clone(),
fencing_token,
},
)
.await;
Ok(())
}
/// Clears the identity's lease and emits a `LeaseLost` event.
///
/// The event is emitted after the entries lock has been released.
///
/// # Errors
///
/// Returns [`IdentityRuntimeError::UnknownIdentity`] when `identity` is not
/// registered.
pub async fn mark_lease_lost(
    &self,
    identity: &AgentIdentity,
) -> Result<(), IdentityRuntimeError> {
    {
        let mut entries = self.entries.write().await;
        let Some(entry) = entries.get_mut(identity) else {
            return Err(IdentityRuntimeError::UnknownIdentity(identity.clone()));
        };
        entry.lease = None;
    }
    let announcement = IdentityEvent::LeaseLost {
        identity: identity.clone(),
    };
    self.emit_event(identity, announcement).await;
    Ok(())
}
/// Drops the event channel and the registry entry for `identity`, returning
/// the removed entry if one existed.
///
/// The event channel is removed first, then the entry; each write lock is
/// released before the next one is taken.
#[allow(dead_code)]
pub(crate) async fn remove(&self, identity: &AgentIdentity) -> Option<IdentityEntry> {
    {
        let mut channels = self.event_channels.write().await;
        channels.remove(identity);
    }
    let mut registry = self.entries.write().await;
    registry.remove(identity)
}
/// Overwrites the lifecycle state of `identity` and emits
/// [`IdentityEvent::StateChanged`] carrying the new state.
///
/// No transition validation is performed here; the state is stored as given.
///
/// # Errors
/// Returns [`IdentityRuntimeError::UnknownIdentity`] when no entry exists for
/// `identity`; no event is emitted in that case.
pub async fn set_state(
    &self,
    identity: &AgentIdentity,
    state: IdentityLifecycleState,
) -> Result<(), IdentityRuntimeError> {
    {
        // Scope the write guard so it is released before the event is emitted.
        let mut registry = self.entries.write().await;
        match registry.get_mut(identity) {
            Some(slot) => slot.state = state,
            None => return Err(IdentityRuntimeError::UnknownIdentity(identity.clone())),
        }
    }
    let event = IdentityEvent::StateChanged {
        identity: identity.clone(),
        new_state: state,
    };
    self.emit_event(identity, event).await;
    Ok(())
}
/// Returns the fencing token of `entry`'s lease if that lease is still valid.
///
/// # Errors
/// * [`IdentityRuntimeError::NoActiveLease`] when the entry holds no lease.
/// * [`IdentityRuntimeError::LeaseLost`] when the held lease reports itself
///   expired.
fn check_lease(entry: &IdentityEntry) -> Result<FencingToken, IdentityRuntimeError> {
    let Some(held) = entry.lease.as_ref() else {
        return Err(IdentityRuntimeError::NoActiveLease(
            entry.spec.identity.clone(),
        ));
    };
    if held.is_expired() {
        Err(IdentityRuntimeError::LeaseLost(entry.spec.identity.clone()))
    } else {
        Ok(held.fencing_token)
    }
}
/// Builds an in-memory [`LeaseEntry`] from `grant`, stamping `acquired_at`
/// with the current instant.
fn lease_entry_from_grant(grant: &LeaseGrant) -> LeaseEntry {
    let fencing_token = grant.fencing_token;
    let ttl = grant.ttl;
    let acquired_at = Instant::now();
    LeaseEntry {
        fencing_token,
        ttl,
        acquired_at,
    }
}
/// Moves `identity` into the transitional `state` and clears its lease,
/// returning a clone of the entry as it was *before* the change.
///
/// Callers use the returned snapshot to roll the entry back if the lifecycle
/// operation fails.
///
/// # Errors
/// Returns [`IdentityRuntimeError::UnknownIdentity`] when no entry exists for
/// `identity`.
async fn mark_lifecycle_in_progress(
    &self,
    identity: &AgentIdentity,
    state: IdentityLifecycleState,
) -> Result<IdentityEntry, IdentityRuntimeError> {
    let mut registry = self.entries.write().await;
    match registry.get_mut(identity) {
        None => Err(IdentityRuntimeError::UnknownIdentity(identity.clone())),
        Some(slot) => {
            let previous = slot.clone();
            slot.state = state;
            slot.lease = None;
            Ok(previous)
        }
    }
}
/// Stores `entry` under `identity`, replacing whatever entry is currently
/// registered there.
async fn restore_entry(&self, identity: &AgentIdentity, entry: IdentityEntry) {
    let mut registry = self.entries.write().await;
    let key = identity.clone();
    registry.insert(key, entry);
}
/// Rolls `identity` back to the snapshot `entry` after a failed lifecycle
/// operation, re-fencing its continuity record under the newly acquired
/// `grant`.
///
/// When the snapshot carries a continuity record:
/// 1. the record is re-upserted in the continuity store with
///    `grant.fencing_token`;
/// 2. if that succeeds and a bridge is configured, the session runtime state
///    is re-registered with the same token;
/// 3. a failure in either step is logged and downgrades the entry to
///    `Broken`;
/// 4. the lease is re-established from `grant` only when the snapshot was
///    `Active` *and* the entry was not downgraded to `Broken`; otherwise the
///    lease is cleared.
///
/// A `Broken` entry never receives a live lease here, matching the other
/// `Broken` transitions in this file, which all clear the lease.
///
/// NOTE(review): when the snapshot has no continuity record the snapshot's
/// own lease field is restored untouched — confirm that is intended.
async fn restore_entry_with_grant(
    &self,
    identity: &AgentIdentity,
    mut entry: IdentityEntry,
    grant: &LeaseGrant,
) {
    let was_active = entry.state == IdentityLifecycleState::Active;
    if let Some(record) = entry.continuity.as_ref() {
        if let Err(err) = self
            .continuity_store
            .upsert_continuity_record(record, grant.fencing_token)
            .await
        {
            tracing::warn!(
                %identity,
                error = %err,
                "failed to advance restored continuity fencing token after lifecycle failure"
            );
            entry.state = IdentityLifecycleState::Broken;
        } else if let Some(bridge) = self.bridge.as_ref()
            && let Err(err) = bridge
                .register_session_runtime_state(
                    &record.session_id,
                    identity,
                    record.generation,
                    record.checkpoint_version,
                    grant.fencing_token,
                )
                .await
        {
            tracing::warn!(
                %identity,
                error = %err,
                "failed to refresh restored session runtime state after lifecycle failure"
            );
            entry.state = IdentityLifecycleState::Broken;
        }
        // Only hand the lease back if the rollback itself did not break the entry.
        let restore_live_lease = was_active && entry.state != IdentityLifecycleState::Broken;
        entry.lease = restore_live_lease.then(|| Self::lease_entry_from_grant(grant));
    }
    self.restore_entry(identity, entry).await;
}
/// Re-fences the continuity record of an `Active` identity under `grant` and
/// installs `grant` as its lease.
///
/// Steps:
/// 1. under a read lock, verify the entry exists and is `Active`, and clone
///    its continuity record (if any);
/// 2. upsert that record with `grant.fencing_token`; on failure the entry (if
///    still present) is marked `Broken` with no lease, a `StateChanged` event
///    is emitted, and the store error is returned;
/// 3. under a write lock, re-check that the entry exists and is still
///    `Active`, install the lease, and emit `LeaseUpdated`.
///
/// # Errors
/// * [`IdentityRuntimeError::UnknownIdentity`] when the entry is missing.
/// * [`IdentityRuntimeError::InvalidState`] when the entry is not `Active`
///   at either check.
/// * [`IdentityRuntimeError::Store`] when the continuity upsert fails.
pub(crate) async fn refresh_active_restore_grant(
    &self,
    identity: &AgentIdentity,
    grant: &LeaseGrant,
) -> Result<(), IdentityRuntimeError> {
    let snapshot = {
        let registry = self.entries.read().await;
        let Some(current) = registry.get(identity) else {
            return Err(IdentityRuntimeError::UnknownIdentity(identity.clone()));
        };
        if current.state != IdentityLifecycleState::Active {
            return Err(IdentityRuntimeError::InvalidState {
                identity: identity.clone(),
                state: current.state,
                operation: "refresh_active_restore_grant",
            });
        }
        current.continuity.clone()
    };

    if let Some(existing) = snapshot.as_ref() {
        let upsert = self
            .continuity_store
            .upsert_continuity_record(existing, grant.fencing_token)
            .await;
        if let Err(err) = upsert {
            {
                let mut registry = self.entries.write().await;
                if let Some(slot) = registry.get_mut(identity) {
                    slot.state = IdentityLifecycleState::Broken;
                    slot.lease = None;
                }
            }
            let event = IdentityEvent::StateChanged {
                identity: identity.clone(),
                new_state: IdentityLifecycleState::Broken,
            };
            self.emit_event(identity, event).await;
            return Err(IdentityRuntimeError::Store(err));
        }
    }

    {
        // The read lock was released during the store call, so re-validate.
        let mut registry = self.entries.write().await;
        let Some(slot) = registry.get_mut(identity) else {
            return Err(IdentityRuntimeError::UnknownIdentity(identity.clone()));
        };
        if slot.state != IdentityLifecycleState::Active {
            return Err(IdentityRuntimeError::InvalidState {
                identity: identity.clone(),
                state: slot.state,
                operation: "refresh_active_restore_grant",
            });
        }
        slot.lease = Some(Self::lease_entry_from_grant(grant));
    }
    let event = IdentityEvent::LeaseUpdated {
        identity: identity.clone(),
        fencing_token: grant.fencing_token,
    };
    self.emit_event(identity, event).await;
    Ok(())
}
/// Stores `entry` back under `identity` as `Broken` with no lease, after
/// attempting to re-upsert its continuity record (if any) with
/// `grant.fencing_token`.
///
/// A failed upsert is only logged; the entry is restored as `Broken` either
/// way.
async fn restore_broken_entry_with_fenced_store(
    &self,
    identity: &AgentIdentity,
    mut entry: IdentityEntry,
    grant: &LeaseGrant,
) {
    entry.lease = None;
    entry.state = IdentityLifecycleState::Broken;
    if let Some(record) = entry.continuity.as_ref() {
        let outcome = self
            .continuity_store
            .upsert_continuity_record(record, grant.fencing_token)
            .await;
        if let Err(err) = outcome {
            tracing::warn!(
                %identity,
                error = %err,
                "failed to preserve fenced continuity record for broken identity"
            );
        }
    }
    self.restore_entry(identity, entry).await;
}
/// Stores `entry` back under `identity` as `Broken` with no lease, pointing
/// its continuity and checkpoint version at `rebound_record`.
///
/// The rebound record is also upserted into the continuity store with
/// `grant.fencing_token`; a failure there is only logged.
async fn mark_rebind_failure_broken(
    &self,
    identity: &AgentIdentity,
    mut entry: IdentityEntry,
    grant: &LeaseGrant,
    rebound_record: &ContinuityRecord,
) {
    entry.continuity = Some(rebound_record.clone());
    entry.checkpoint_version = rebound_record.checkpoint_version;
    entry.lease = None;
    entry.state = IdentityLifecycleState::Broken;
    let persisted = self
        .continuity_store
        .upsert_continuity_record(rebound_record, grant.fencing_token)
        .await;
    if let Err(err) = persisted {
        tracing::warn!(
            %identity,
            session_id = %rebound_record.session_id,
            error = %err,
            "failed to preserve rebound continuity after live respawn rebind failure"
        );
    }
    self.restore_entry(identity, entry).await;
}
/// Rolls `identity` back to the snapshot `entry` after a bridge step of a
/// reset failed.
///
/// If the snapshot had no continuity record, the continuity record stored for
/// `identity` is deleted with `grant.fencing_token`. The entry is restored as
/// `Broken` when `force_broken` is set or that delete failed; otherwise it is
/// restored through `restore_entry_with_grant`.
///
/// Returns the delete error, if the delete was attempted and failed.
async fn restore_entry_after_reset_bridge_failure(
    &self,
    identity: &AgentIdentity,
    entry: IdentityEntry,
    grant: &LeaseGrant,
    force_broken: bool,
) -> Option<ContinuityStoreError> {
    let mut delete_error = None;
    if entry.continuity.is_none() {
        delete_error = self
            .continuity_store
            .delete_continuity_record(identity, grant.fencing_token)
            .await
            .err();
    }
    let must_break = force_broken || delete_error.is_some();
    if must_break {
        self.restore_broken_entry_with_fenced_store(identity, entry, grant)
            .await;
    } else {
        self.restore_entry_with_grant(identity, entry, grant).await;
    }
    delete_error
}
/// Puts the continuity store back to its prior contents for `identity`.
///
/// With a `previous` record, that record is upserted under
/// `grant.fencing_token`; without one, the stored record is deleted. Returns
/// the store error if the chosen operation failed, `None` otherwise.
async fn restore_continuity_after_materialize_failure(
    &self,
    identity: &AgentIdentity,
    previous: Option<&ContinuityRecord>,
    grant: &LeaseGrant,
) -> Option<ContinuityStoreError> {
    let fence = grant.fencing_token;
    if let Some(record) = previous {
        self.continuity_store
            .upsert_continuity_record(record, fence)
            .await
            .err()
    } else {
        self.continuity_store
            .delete_continuity_record(identity, fence)
            .await
            .err()
    }
}
/// Unregisters the runtime state of every distinct session in `session_ids`
/// through `bridge`.
///
/// Duplicates (compared by their string form) are unregistered once. All
/// sessions are attempted even if some fail; the failures are returned joined
/// with `"; "`, or `None` when every call succeeded.
async fn unregister_bridge_session_runtime_states(
    bridge: &dyn SessionBridge,
    session_ids: &[SessionId],
) -> Option<String> {
    let mut visited = BTreeSet::new();
    let mut failures: Vec<String> = Vec::new();
    for session_id in session_ids {
        let first_visit = visited.insert(session_id.to_string());
        if first_visit {
            if let Err(err) = bridge.unregister_session_runtime_state(session_id).await {
                failures.push(format!("{session_id}: {err}"));
            }
        }
    }
    if failures.is_empty() {
        None
    } else {
        Some(failures.join("; "))
    }
}
/// Re-upserts `entry`'s continuity record (if it has one) with
/// `grant.fencing_token`.
///
/// `identity` is accepted for signature symmetry but not otherwise used.
///
/// # Errors
/// Returns [`IdentityRuntimeError::Store`] when the upsert fails.
async fn advance_existing_continuity_fence(
    &self,
    identity: &AgentIdentity,
    entry: &IdentityEntry,
    grant: &LeaseGrant,
) -> Result<(), IdentityRuntimeError> {
    let _ = identity;
    let Some(existing) = entry.continuity.as_ref() else {
        return Ok(());
    };
    self.continuity_store
        .upsert_continuity_record(existing, grant.fencing_token)
        .await
        .map_err(IdentityRuntimeError::Store)?;
    Ok(())
}
/// Re-registers the session runtime state described by `record` with the
/// bridge under `grant.fencing_token`, returning the checkpoint version the
/// bridge reports.
///
/// Without a configured bridge this is a no-op that returns
/// `record.checkpoint_version`.
///
/// # Errors
/// Returns [`IdentityRuntimeError::Internal`] when the bridge registration
/// fails.
async fn refresh_existing_session_runtime_state(
    &self,
    identity: &AgentIdentity,
    record: &ContinuityRecord,
    grant: &LeaseGrant,
) -> Result<CheckpointVersion, IdentityRuntimeError> {
    let registration = match self.bridge.as_ref() {
        None => return Ok(record.checkpoint_version),
        Some(bridge) => {
            bridge
                .register_session_runtime_state(
                    &record.session_id,
                    identity,
                    record.generation,
                    record.checkpoint_version,
                    grant.fencing_token,
                )
                .await
        }
    };
    registration.map_err(|err| {
        IdentityRuntimeError::Internal(format!(
            "bridge refresh session runtime state: {err}"
        ))
    })
}
/// Delivers `content` to `identity` using [`HandlingMode::Queue`].
///
/// Thin wrapper over `send_with_mode`; see that method for the error
/// conditions.
pub async fn send(
    &self,
    identity: &AgentIdentity,
    content: &meerkat_core::ContentInput,
) -> Result<FencingToken, IdentityRuntimeError> {
    let default_mode = HandlingMode::Queue;
    self.send_with_mode(identity, content, default_mode).await
}
/// Delivers `content` to `identity` with the given `handling_mode`, returning
/// the fencing token of the lease the delivery was made under.
///
/// Flow:
/// 1. reject identities whose addressability is `InternalOnly`;
/// 2. materialize the identity if it is `Dormant` or `Uninitialized`;
/// 3. unless the mode is `Steer`, materialize its reachable peers;
/// 4. under the identity's lifecycle lock, require the entry to be `Active`
///    with a valid lease;
/// 5. if a bridge is configured and the entry has a continuity record,
///    deliver through the bridge.
///
/// # Errors
/// * [`IdentityRuntimeError::UnknownIdentity`] when the entry is missing.
/// * [`IdentityRuntimeError::NotAddressable`] for `InternalOnly` identities.
/// * [`IdentityRuntimeError::InvalidState`] when the entry is not `Active`.
/// * Lease errors from `check_lease`, materialization errors, and
///   [`IdentityRuntimeError::Internal`] when the bridge delivery fails.
pub async fn send_with_mode(
    &self,
    identity: &AgentIdentity,
    content: &meerkat_core::ContentInput,
    handling_mode: HandlingMode,
) -> Result<FencingToken, IdentityRuntimeError> {
    let needs_materialize = {
        let registry = self.entries.read().await;
        let Some(current) = registry.get(identity) else {
            return Err(IdentityRuntimeError::UnknownIdentity(identity.clone()));
        };
        let addressability = current.spec.addressability;
        if addressability == AgentAddressability::InternalOnly {
            return Err(IdentityRuntimeError::NotAddressable(NotAddressable {
                identity: identity.clone(),
                addressability,
            }));
        }
        matches!(
            current.state,
            IdentityLifecycleState::Dormant | IdentityLifecycleState::Uninitialized
        )
    };
    if needs_materialize {
        self.materialize(identity).await?;
    }
    if !matches!(handling_mode, HandlingMode::Steer) {
        self.materialize_reachable_peers(identity).await?;
    }

    // Hold the lifecycle lock for the lease check and the delivery itself.
    let lifecycle_lock = self.lifecycle_lock_for(identity).await;
    let _lifecycle_guard = lifecycle_lock.lock().await;
    let (fence, target_runtime) = {
        let registry = self.entries.read().await;
        let Some(current) = registry.get(identity) else {
            return Err(IdentityRuntimeError::UnknownIdentity(identity.clone()));
        };
        if current.state != IdentityLifecycleState::Active {
            return Err(IdentityRuntimeError::InvalidState {
                identity: identity.clone(),
                state: current.state,
                operation: "send",
            });
        }
        let fence = Self::check_lease(current)?;
        let target_runtime = current
            .continuity
            .as_ref()
            .map(|record| record.agent_runtime_id.clone());
        (fence, target_runtime)
    };
    if let Some(bridge) = self.bridge.as_ref() {
        if let Some(rid) = target_runtime.as_ref() {
            bridge
                .deliver_with_mode(rid, content, handling_mode)
                .await
                .map_err(|e| IdentityRuntimeError::Internal(format!("bridge deliver: {e}")))?;
        }
    }
    Ok(fence)
}
/// Delivers `input.content` to `identity`, returning the fencing token of the
/// lease it was delivered under together with the entry's
/// `has_runtime_store` flag.
///
/// Flow:
/// 1. materialize the identity if it is `Dormant` or `Uninitialized`;
/// 2. materialize its reachable peers;
/// 3. under the identity's lifecycle lock, require the entry to be `Active`
///    with a valid lease;
/// 4. if a bridge is configured and the entry has a continuity record,
///    deliver through the bridge.
///
/// # Errors
/// * [`IdentityRuntimeError::UnknownIdentity`] when the entry is missing.
/// * [`IdentityRuntimeError::InvalidState`] when the entry is not `Active`.
/// * Lease errors from `check_lease`, materialization errors, and
///   [`IdentityRuntimeError::Internal`] when the bridge delivery fails.
pub async fn dispatch(
    &self,
    identity: &AgentIdentity,
    input: &DispatchInput,
) -> Result<(FencingToken, bool), IdentityRuntimeError> {
    let needs_materialize = {
        let registry = self.entries.read().await;
        let Some(current) = registry.get(identity) else {
            return Err(IdentityRuntimeError::UnknownIdentity(identity.clone()));
        };
        matches!(
            current.state,
            IdentityLifecycleState::Dormant | IdentityLifecycleState::Uninitialized
        )
    };
    if needs_materialize {
        self.materialize(identity).await?;
    }
    self.materialize_reachable_peers(identity).await?;

    // Hold the lifecycle lock for the lease check and the delivery itself.
    let lifecycle_lock = self.lifecycle_lock_for(identity).await;
    let _lifecycle_guard = lifecycle_lock.lock().await;
    let (fence, durable, target_runtime) = {
        let registry = self.entries.read().await;
        let Some(current) = registry.get(identity) else {
            return Err(IdentityRuntimeError::UnknownIdentity(identity.clone()));
        };
        if current.state != IdentityLifecycleState::Active {
            return Err(IdentityRuntimeError::InvalidState {
                identity: identity.clone(),
                state: current.state,
                operation: "dispatch",
            });
        }
        let fence = Self::check_lease(current)?;
        let durable = current.has_runtime_store;
        let target_runtime = current
            .continuity
            .as_ref()
            .map(|record| record.agent_runtime_id.clone());
        (fence, durable, target_runtime)
    };
    if let Some(bridge) = self.bridge.as_ref() {
        if let Some(rid) = target_runtime.as_ref() {
            bridge
                .deliver(rid, &input.content)
                .await
                .map_err(|e| IdentityRuntimeError::Internal(format!("bridge dispatch: {e}")))?;
        }
    }
    Ok((fence, durable))
}
/// Builds a point-in-time [`IdentityStatus`] for `identity` from its registry
/// entry.
///
/// A checkpoint version of `0` is reported as `None`. `continuity_health` is
/// always populated, with `store_reachable` hard-coded to `true` (no probe is
/// performed here).
///
/// # Errors
/// Returns [`IdentityRuntimeError::UnknownIdentity`] when no entry exists for
/// `identity`.
pub async fn status(
    &self,
    identity: &AgentIdentity,
) -> Result<IdentityStatus, IdentityRuntimeError> {
    let registry = self.entries.read().await;
    let Some(current) = registry.get(identity) else {
        return Err(IdentityRuntimeError::UnknownIdentity(identity.clone()));
    };
    let continuity = current.continuity.as_ref();
    // Version 0 means "no checkpoint yet" and is surfaced as `None`.
    let checkpoint_version =
        (current.checkpoint_version.get() > 0).then_some(current.checkpoint_version);
    let lease = current.lease.as_ref().map(|held| LeaseInfo {
        fencing_token: held.fencing_token,
        ttl_remaining: held.ttl_remaining(),
        healthy: held.is_healthy(),
    });
    let continuity_health = Some(ContinuityHealth {
        store_reachable: true,
        durability_policy: self.durability_policy.clone(),
        last_checkpoint_version: checkpoint_version,
    });
    Ok(IdentityStatus {
        identity: identity.clone(),
        state: current.state,
        agent_runtime_id: continuity.map(|record| record.agent_runtime_id.clone()),
        session_id: continuity.map(|record| record.session_id.clone()),
        profile: Some(current.spec.profile.clone()),
        runtime_mode: current.spec.runtime_mode_override,
        addressability: current.spec.addressability,
        display_name: current.spec.display_name.clone(),
        labels: current.spec.labels.clone(),
        generation: continuity.map(|record| record.generation),
        checkpoint_version,
        lease,
        continuity_health,
    })
}
/// Collects the status of every registered identity.
///
/// The key set is copied out first and the read lock released; each status is
/// then fetched individually, so identities whose lookup fails in between are
/// skipped rather than reported as errors.
pub async fn statuses(&self) -> Vec<IdentityStatus> {
    let identities: Vec<_> = {
        let registry = self.entries.read().await;
        registry.keys().cloned().collect()
    };
    let mut collected = Vec::with_capacity(identities.len());
    for identity in &identities {
        collected.extend(self.status(identity).await.ok());
    }
    collected
}
/// Retires the runtime member backing `identity` and returns the fencing
/// token of the lease acquired for the operation.
///
/// Flow (all under the identity's lifecycle lock):
/// 1. mark the entry `Retiring` and clear its lease, keeping a snapshot;
/// 2. require the snapshot's lease to be valid, else roll back;
/// 3. acquire a fresh lease from the lease provider, else roll back;
/// 4. re-fence the existing continuity record under the new grant; failure
///    leaves the entry `Broken`;
/// 5. ask the bridge to retire the member; failure rolls back via
///    `restore_entry_with_grant`;
/// 6. ask the bridge to unregister the session runtime state; failure leaves
///    the entry `Broken`.
///
/// NOTE(review): on success the entry is left in `Retiring` with no lease;
/// presumably the caller finalizes or removes it — confirm.
///
/// # Errors
/// * [`IdentityRuntimeError::UnknownIdentity`] when the entry is missing.
/// * [`IdentityRuntimeError::NoActiveLease`] / [`IdentityRuntimeError::LeaseLost`]
///   when the current lease is absent or expired, or the new lease was not
///   granted.
/// * [`IdentityRuntimeError::Lease`] / [`IdentityRuntimeError::Store`] for
///   provider and store failures.
/// * [`IdentityRuntimeError::Internal`] for bridge failures.
pub async fn retire(
    &self,
    identity: &AgentIdentity,
) -> Result<FencingToken, IdentityRuntimeError> {
    let lifecycle_lock = self.lifecycle_lock_for(identity).await;
    let _lifecycle_guard = lifecycle_lock.lock().await;
    let registered_entry = self
        .mark_lifecycle_in_progress(identity, IdentityLifecycleState::Retiring)
        .await?;
    // The snapshot still carries the pre-retire lease; only its validity matters.
    if let Err(err) = Self::check_lease(&registered_entry) {
        self.restore_entry(identity, registered_entry).await;
        return Err(err);
    }
    let runtime_id = registered_entry
        .continuity
        .as_ref()
        .map(|c| c.agent_runtime_id.clone());
    let session_id = registered_entry
        .continuity
        .as_ref()
        .map(|c| c.session_id.clone());
    let acquire_result = match self
        .lease_provider
        .acquire_leases(std::slice::from_ref(identity), &self.runtime_instance_id)
        .await
    {
        Ok(result) => result,
        Err(err) => {
            self.restore_entry(identity, registered_entry).await;
            return Err(IdentityRuntimeError::Lease(err));
        }
    };
    let grant = match acquire_result.get(identity) {
        Some(super::types::LeaseAcquireResult::Acquired(g)) => g.clone(),
        _ => {
            self.restore_entry(identity, registered_entry).await;
            return Err(IdentityRuntimeError::NoActiveLease(identity.clone()));
        }
    };
    if let Err(err) = self
        .advance_existing_continuity_fence(identity, &registered_entry, &grant)
        .await
    {
        let mut broken_entry = registered_entry;
        broken_entry.state = IdentityLifecycleState::Broken;
        broken_entry.lease = None;
        self.restore_entry(identity, broken_entry).await;
        return Err(err);
    }
    if let (Some(bridge), Some(rid)) = (&self.bridge, &runtime_id)
        && let Err(err) = bridge.retire_member(rid).await
    {
        self.restore_entry_with_grant(identity, registered_entry, &grant)
            .await;
        return Err(IdentityRuntimeError::Internal(format!(
            "bridge retire: {err}"
        )));
    }
    if let (Some(bridge), Some(session_id)) = (&self.bridge, &session_id)
        && let Err(err) = bridge.unregister_session_runtime_state(session_id).await
    {
        // The member is already retired, so the old state cannot be restored.
        let mut broken_entry = registered_entry;
        broken_entry.state = IdentityLifecycleState::Broken;
        broken_entry.lease = None;
        self.restore_entry(identity, broken_entry).await;
        return Err(IdentityRuntimeError::Internal(format!(
            "bridge unregister retired session: {err}"
        )));
    }
    Ok(grant.fencing_token)
}
pub async fn respawn(
&self,
identity: &AgentIdentity,
) -> Result<ContinuityRecord, IdentityRuntimeError> {
let lifecycle_lock = self.lifecycle_lock_for(identity).await;
let _lifecycle_guard = lifecycle_lock.lock().await;
let registered_entry = self
.mark_lifecycle_in_progress(identity, IdentityLifecycleState::Suspended)
.await?;
let acquire_result = match self
.lease_provider
.acquire_leases(std::slice::from_ref(identity), &self.runtime_instance_id)
.await
{
Ok(result) => result,
Err(err) => {
self.restore_entry(identity, registered_entry).await;
return Err(IdentityRuntimeError::Lease(err));
}
};
let grant = match acquire_result.get(identity) {
Some(super::types::LeaseAcquireResult::Acquired(g)) => g.clone(),
_ => {
self.restore_entry(identity, registered_entry).await;
return Err(IdentityRuntimeError::NoActiveLease(identity.clone()));
}
};
if let Err(err) = self
.advance_existing_continuity_fence(identity, ®istered_entry, &grant)
.await
{
let mut broken_entry = registered_entry;
broken_entry.state = IdentityLifecycleState::Broken;
broken_entry.lease = None;
self.restore_entry(identity, broken_entry).await;
return Err(err);
}
let resolved = match self
.continuity_store
.resolve_many(std::slice::from_ref(identity))
.await
{
Ok(resolved) => resolved,
Err(err) => {
self.restore_entry_with_grant(identity, registered_entry, &grant)
.await;
return Err(IdentityRuntimeError::Store(err));
}
};
let record = match resolved.get(identity) {
Some(super::types::ContinuityResolveState::Ready { record }) => record.clone(),
Some(super::types::ContinuityResolveState::Broken { failure }) => {
self.restore_entry_with_grant(identity, registered_entry, &grant)
.await;
return Err(IdentityRuntimeError::Internal(format!(
"broken continuity for {identity}: {}",
failure.detail
)));
}
Some(super::types::ContinuityResolveState::Uninitialized) => {
self.restore_entry_with_grant(identity, registered_entry, &grant)
.await;
return Err(IdentityRuntimeError::Internal(format!(
"cannot respawn uninitialized identity {identity}"
)));
}
None => {
self.restore_entry_with_grant(identity, registered_entry, &grant)
.await;
return Err(IdentityRuntimeError::Store(
ContinuityStoreError::NotFound {
identity: identity.clone(),
},
));
}
};
let effective_checkpoint_version = match self
.refresh_existing_session_runtime_state(identity, &record, &grant)
.await
{
Ok(version) => version,
Err(err) => {
self.restore_entry_with_grant(identity, registered_entry, &grant)
.await;
return Err(err);
}
};
let mut record = record;
record.checkpoint_version = effective_checkpoint_version;
let mut entries = self.entries.write().await;
if let Some(entry) = entries.get_mut(identity) {
entry.continuity = Some(record.clone());
entry.lease = Some(LeaseEntry {
fencing_token: grant.fencing_token,
ttl: grant.ttl,
acquired_at: Instant::now(),
});
entry.state = IdentityLifecycleState::Active;
entry.checkpoint_version = record.checkpoint_version;
}
Ok(record)
}
pub async fn rebind_session_after_live_respawn(
&self,
identity: &AgentIdentity,
session_id: SessionId,
) -> Result<ContinuityRecord, IdentityRuntimeError> {
let lifecycle_lock = self.lifecycle_lock_for(identity).await;
let _lifecycle_guard = lifecycle_lock.lock().await;
let registered_entry = self
.mark_lifecycle_in_progress(identity, IdentityLifecycleState::Suspended)
.await?;
let acquire_result = match self
.lease_provider
.acquire_leases(std::slice::from_ref(identity), &self.runtime_instance_id)
.await
{
Ok(result) => result,
Err(err) => {
self.restore_entry(identity, registered_entry).await;
return Err(IdentityRuntimeError::Lease(err));
}
};
let grant = match acquire_result.get(identity) {
Some(super::types::LeaseAcquireResult::Acquired(g)) => g.clone(),
_ => {
self.restore_entry(identity, registered_entry).await;
return Err(IdentityRuntimeError::NoActiveLease(identity.clone()));
}
};
let mut record = match registered_entry.continuity.as_ref() {
Some(record) => record.clone(),
None => {
self.restore_entry_with_grant(identity, registered_entry, &grant)
.await;
return Err(IdentityRuntimeError::UnknownIdentity(identity.clone()));
}
};
if let Err(err) = self
.advance_existing_continuity_fence(identity, ®istered_entry, &grant)
.await
{
if let Some(bridge) = self.bridge.as_ref()
&& let Err(unregister_err) =
bridge.unregister_session_runtime_state(&session_id).await
{
tracing::warn!(
%identity,
session_id = %session_id,
error = %unregister_err,
"failed to unregister rebound session after continuity fence failure"
);
}
let mut broken_entry = registered_entry;
broken_entry.state = IdentityLifecycleState::Broken;
broken_entry.lease = None;
self.restore_entry(identity, broken_entry).await;
return Err(err);
}
let previous_session_id = record.session_id.clone();
record.session_id = session_id;
record.checkpoint_version = CheckpointVersion::new(0);
if let Err(err) = self
.continuity_store
.upsert_continuity_record(&record, grant.fencing_token)
.await
{
if let Some(bridge) = self.bridge.as_ref()
&& let Err(unregister_err) = bridge
.unregister_session_runtime_state(&record.session_id)
.await
{
tracing::warn!(
%identity,
session_id = %record.session_id,
error = %unregister_err,
"failed to unregister rebound session after continuity upsert failure"
);
}
self.mark_rebind_failure_broken(identity, registered_entry, &grant, &record)
.await;
return Err(IdentityRuntimeError::Store(err));
}
if let Some(bridge) = self.bridge.as_ref() {
match bridge
.register_session_runtime_state(
&record.session_id,
identity,
record.generation,
record.checkpoint_version,
grant.fencing_token,
)
.await
{
Ok(version) => record.checkpoint_version = version,
Err(err) => {
if let Err(unregister_err) = bridge
.unregister_session_runtime_state(&record.session_id)
.await
{
tracing::warn!(
%identity,
session_id = %record.session_id,
error = %unregister_err,
"failed to unregister rebound session after bridge register failure"
);
}
self.mark_rebind_failure_broken(identity, registered_entry, &grant, &record)
.await;
return Err(IdentityRuntimeError::Internal(format!(
"bridge rebind respawned session runtime state: {err}"
)));
}
}
if previous_session_id != record.session_id
&& let Err(err) = bridge
.unregister_session_runtime_state(&previous_session_id)
.await
{
tracing::warn!(
%identity,
session_id = %previous_session_id,
error = %err,
"failed to unregister previous session after live respawn rebind"
);
}
}
if let Err(err) = self
.continuity_store
.upsert_continuity_record(&record, grant.fencing_token)
.await
{
if let Some(bridge) = self.bridge.as_ref()
&& let Err(unregister_err) = bridge
.unregister_session_runtime_state(&record.session_id)
.await
{
tracing::warn!(
%identity,
session_id = %record.session_id,
error = %unregister_err,
"failed to unregister rebound session after final continuity upsert failure"
);
}
self.mark_rebind_failure_broken(identity, registered_entry, &grant, &record)
.await;
return Err(IdentityRuntimeError::Store(err));
}
self.register(
registered_entry.spec,
IdentityLifecycleState::Active,
Some(record.clone()),
Some(grant),
)
.await;
Ok(record)
}
pub async fn reset(
&self,
identity: &AgentIdentity,
) -> Result<ContinuityRecord, IdentityRuntimeError> {
let lifecycle_lock = self.lifecycle_lock_for(identity).await;
let _lifecycle_guard = lifecycle_lock.lock().await;
let registered_entry = self
.mark_lifecycle_in_progress(identity, IdentityLifecycleState::Suspended)
.await?;
let acquire_result = match self
.lease_provider
.acquire_leases(std::slice::from_ref(identity), &self.runtime_instance_id)
.await
{
Ok(result) => result,
Err(err) => {
self.restore_entry(identity, registered_entry).await;
return Err(IdentityRuntimeError::Lease(err));
}
};
let grant = match acquire_result.get(identity) {
Some(super::types::LeaseAcquireResult::Acquired(g)) => g.clone(),
_ => {
self.restore_entry(identity, registered_entry).await;
return Err(IdentityRuntimeError::NoActiveLease(identity.clone()));
}
};
if let Err(err) = self
.advance_existing_continuity_fence(identity, ®istered_entry, &grant)
.await
{
let mut broken_entry = registered_entry;
broken_entry.state = IdentityLifecycleState::Broken;
broken_entry.lease = None;
self.restore_entry(identity, broken_entry).await;
return Err(err);
}
let resolved = match self
.continuity_store
.resolve_many(std::slice::from_ref(identity))
.await
{
Ok(resolved) => resolved,
Err(err) => {
self.restore_entry_with_grant(identity, registered_entry.clone(), &grant)
.await;
return Err(IdentityRuntimeError::Store(err));
}
};
let current_gen = match resolved.get(identity) {
Some(super::types::ContinuityResolveState::Ready { record }) => record.generation,
Some(super::types::ContinuityResolveState::Uninitialized) => {
ContinuityGeneration::new(0)
}
_ => ContinuityGeneration::new(0),
};
let new_gen = ContinuityGeneration::new(current_gen.get() + 1);
let new_session_id = meerkat_core::types::SessionId::new();
let new_runtime_id = AgentRuntimeId::parse(&format!("rt:{identity}:{}", new_gen.get()))
.map_err(|e| {
IdentityRuntimeError::Internal(format!("failed to mint runtime id: {e}"))
})?;
let new_record = ContinuityRecord {
identity: identity.clone(),
agent_runtime_id: new_runtime_id,
session_id: new_session_id,
generation: new_gen,
checkpoint_version: CheckpointVersion::new(0),
};
if let Some(bridge) = &self.bridge {
if let Err(err) = self
.continuity_store
.upsert_continuity_record(&new_record, grant.fencing_token)
.await
{
self.restore_entry_with_grant(identity, registered_entry, &grant)
.await;
return Err(IdentityRuntimeError::Store(err));
}
let old_runtime_id = registered_entry
.continuity
.as_ref()
.map(|c| c.agent_runtime_id.clone());
let old_session_id = registered_entry
.continuity
.as_ref()
.map(|c| c.session_id.clone());
let spec = registered_entry.spec.clone();
let draft = super::types::AgentBuildDraft {
model: None,
system_prompt: None,
additional_instructions: spec.additional_instructions.clone(),
labels: spec.labels.clone(),
app_context: spec.context.clone(),
external_tools: Vec::new(),
local_external_tools: Default::default(),
};
let session_id = bridge
.create_session(
identity,
&new_record.agent_runtime_id,
&spec,
&draft,
&new_record.session_id,
)
.await
.map_err(|e| {
IdentityRuntimeError::Internal(format!(
"bridge create_session after reset: {e}"
))
});
let session_id = match session_id {
Ok(session_id) => session_id,
Err(err) => {
let cleanup_error = bridge
.retire_member(&new_record.agent_runtime_id)
.await
.err();
let delete_error = self
.restore_entry_after_reset_bridge_failure(
identity,
registered_entry.clone(),
&grant,
cleanup_error.is_some(),
)
.await;
if cleanup_error.is_some() || delete_error.is_some() {
return Err(IdentityRuntimeError::Internal(format!(
"{err}{}{}",
cleanup_error
.as_ref()
.map(|e| format!("; cleanup retire failed: {e}"))
.unwrap_or_default(),
delete_error
.as_ref()
.map(|e| format!("; tentative continuity cleanup failed: {e}"))
.unwrap_or_default()
)));
}
return Err(err);
}
};
let mut new_record = new_record;
new_record.session_id = session_id;
if let Err(err) = self
.continuity_store
.upsert_continuity_record(&new_record, grant.fencing_token)
.await
{
let unregister_error = Self::unregister_bridge_session_runtime_states(
bridge.as_ref(),
std::slice::from_ref(&new_record.session_id),
)
.await;
let cleanup_error = bridge
.retire_member(&new_record.agent_runtime_id)
.await
.err();
let delete_error = self
.restore_entry_after_reset_bridge_failure(
identity,
registered_entry.clone(),
&grant,
unregister_error.is_some() || cleanup_error.is_some(),
)
.await;
if unregister_error.is_some() || cleanup_error.is_some() || delete_error.is_some() {
return Err(IdentityRuntimeError::Internal(format!(
"continuity upsert actual session after reset: {err}{}{}{}",
unregister_error
.as_ref()
.map(|e| format!("; unregister session failed: {e}"))
.unwrap_or_default(),
cleanup_error
.as_ref()
.map(|e| format!("; cleanup retire failed: {e}"))
.unwrap_or_default(),
delete_error
.as_ref()
.map(|e| format!("; tentative continuity cleanup failed: {e}"))
.unwrap_or_default(),
)));
}
return Err(IdentityRuntimeError::Store(err));
}
let register_result = bridge
.register_session_runtime_state(
&new_record.session_id,
identity,
new_record.generation,
new_record.checkpoint_version,
grant.fencing_token,
)
.await;
let effective_checkpoint_version = match register_result {
Ok(version) => version,
Err(err) => {
let unregister_error = Self::unregister_bridge_session_runtime_states(
bridge.as_ref(),
std::slice::from_ref(&new_record.session_id),
)
.await;
let cleanup_error = bridge
.retire_member(&new_record.agent_runtime_id)
.await
.err();
let mut detail =
format!("bridge register actual session runtime state after reset: {err}");
if let Some(unregister_error) = unregister_error.as_ref() {
detail
.push_str(&format!("; unregister session failed: {unregister_error}"));
}
if let Some(cleanup_error) = cleanup_error.as_ref() {
detail.push_str(&format!("; cleanup retire failed: {cleanup_error}"));
}
let delete_error = self
.restore_entry_after_reset_bridge_failure(
identity,
registered_entry.clone(),
&grant,
unregister_error.is_some() || cleanup_error.is_some(),
)
.await;
if let Some(delete_error) = delete_error {
return Err(IdentityRuntimeError::Internal(format!(
"{detail}; tentative continuity cleanup failed: {delete_error}"
)));
}
return Err(IdentityRuntimeError::Internal(detail));
}
};
new_record.checkpoint_version = effective_checkpoint_version;
if let Some(old_id) = old_runtime_id.as_ref()
&& old_id != &new_record.agent_runtime_id
{
if let Err(err) = bridge.retire_member(old_id).await {
let unregister_error = bridge
.unregister_session_runtime_state(&new_record.session_id)
.await
.err();
let cleanup_error = bridge
.retire_member(&new_record.agent_runtime_id)
.await
.err();
self.restore_broken_entry_with_fenced_store(
identity,
registered_entry.clone(),
&grant,
)
.await;
let detail = format!(
"bridge retire old member after reset: {err}{}{}",
unregister_error
.as_ref()
.map(|e| format!("; unregister session failed: {e}"))
.unwrap_or_default(),
cleanup_error
.as_ref()
.map(|e| format!("; cleanup retire failed: {e}"))
.unwrap_or_default(),
);
return Err(IdentityRuntimeError::Internal(detail));
}
if let Some(old_session_id) = old_session_id.as_ref()
&& old_session_id != &new_record.session_id
&& let Err(err) = bridge
.unregister_session_runtime_state(old_session_id)
.await
{
let unregister_error = bridge
.unregister_session_runtime_state(&new_record.session_id)
.await
.err();
let cleanup_error = bridge
.retire_member(&new_record.agent_runtime_id)
.await
.err();
self.restore_broken_entry_with_fenced_store(
identity,
registered_entry.clone(),
&grant,
)
.await;
let detail = format!(
"bridge unregister old session after reset: {err}{}{}",
unregister_error
.as_ref()
.map(|e| format!("; unregister new session failed: {e}"))
.unwrap_or_default(),
cleanup_error
.as_ref()
.map(|e| format!("; cleanup retire failed: {e}"))
.unwrap_or_default(),
);
return Err(IdentityRuntimeError::Internal(detail));
}
}
if let Err(err) = self
.continuity_store
.upsert_continuity_record(&new_record, grant.fencing_token)
.await
{
let unregister_error = bridge
.unregister_session_runtime_state(&new_record.session_id)
.await
.err();
let cleanup_error = bridge
.retire_member(&new_record.agent_runtime_id)
.await
.err();
let rollback_error = self
.restore_continuity_after_materialize_failure(
identity,
registered_entry.continuity.as_ref(),
&grant,
)
.await;
let mut entries = self.entries.write().await;
if let Some(entry) = entries.get_mut(identity) {
entry.state = IdentityLifecycleState::Broken;
}
if unregister_error.is_some() || cleanup_error.is_some() || rollback_error.is_some()
{
return Err(IdentityRuntimeError::Internal(format!(
"continuity upsert after reset: {err}{}{}{}",
unregister_error
.as_ref()
.map(|e| format!("; unregister session failed: {e}"))
.unwrap_or_default(),
cleanup_error
.as_ref()
.map(|e| format!("; cleanup retire failed: {e}"))
.unwrap_or_default(),
rollback_error
.as_ref()
.map(|e| format!("; continuity rollback failed: {e}"))
.unwrap_or_default()
)));
}
return Err(IdentityRuntimeError::Store(err));
}
let mut entries = self.entries.write().await;
let Some(entry) = entries.get_mut(identity) else {
let _ = bridge
.unregister_session_runtime_state(&new_record.session_id)
.await;
let _ = bridge.retire_member(&new_record.agent_runtime_id).await;
if registered_entry.continuity.is_none() {
let _ = self
.continuity_store
.delete_continuity_record(identity, grant.fencing_token)
.await;
}
return Err(IdentityRuntimeError::UnknownIdentity(identity.clone()));
};
entry.continuity = Some(new_record.clone());
entry.lease = Some(Self::lease_entry_from_grant(&grant));
entry.state = IdentityLifecycleState::Active;
entry.checkpoint_version = new_record.checkpoint_version;
return Ok(new_record);
}
if let Err(err) = self
.continuity_store
.upsert_continuity_record(&new_record, grant.fencing_token)
.await
{
self.restore_entry_with_grant(identity, registered_entry, &grant)
.await;
return Err(IdentityRuntimeError::Store(err));
}
let mut entries = self.entries.write().await;
let entry = entries
.get_mut(identity)
.ok_or_else(|| IdentityRuntimeError::UnknownIdentity(identity.clone()))?;
entry.continuity = Some(new_record.clone());
entry.lease = Some(Self::lease_entry_from_grant(&grant));
entry.state = IdentityLifecycleState::Active;
entry.checkpoint_version = CheckpointVersion::new(0);
Ok(new_record)
}
/// Permanently removes `identity` from this runtime.
///
/// The whole operation runs under the identity's lifecycle lock. In order, it:
/// 1. marks the entry `Retiring` and keeps the entry returned by
///    `mark_lifecycle_in_progress` for rollback,
/// 2. acquires a lease for the identity on behalf of this runtime instance,
/// 3. advances the fence on any existing continuity record,
/// 4. retires the bridge member and unregisters its session runtime state
///    (only when a bridge is configured and a continuity record exists),
/// 5. deletes the continuity record from the store,
/// 6. drops the identity's event channel and in-memory entry.
///
/// # Errors
/// * Whatever `mark_lifecycle_in_progress` reports (propagated unchanged).
/// * [`IdentityRuntimeError::Lease`] if the lease provider call fails, or
///   [`IdentityRuntimeError::NoActiveLease`] if the lease was not acquired;
///   in both cases the saved entry is handed back via `restore_entry`.
/// * The error from `advance_existing_continuity_fence`; the entry is
///   restored as `Broken` with no lease.
/// * [`IdentityRuntimeError::Internal`] if the bridge fails to retire the
///   member or to unregister the session.
/// * [`IdentityRuntimeError::Store`] if deleting the continuity record
///   fails; the entry (if still present) is marked `Broken`.
pub async fn delete_identity(
    &self,
    identity: &AgentIdentity,
) -> Result<(), IdentityRuntimeError> {
    let lifecycle_lock = self.lifecycle_lock_for(identity).await;
    let _lifecycle_guard = lifecycle_lock.lock().await;
    let registered_entry = self
        .mark_lifecycle_in_progress(identity, IdentityLifecycleState::Retiring)
        .await?;
    // Capture the bridge handles up front; `registered_entry` may be moved
    // into a restore call on any of the failure paths below.
    let runtime_id = registered_entry
        .continuity
        .as_ref()
        .map(|c| c.agent_runtime_id.clone());
    let session_id = registered_entry
        .continuity
        .as_ref()
        .map(|c| c.session_id.clone());
    let acquire_result = match self
        .lease_provider
        .acquire_leases(std::slice::from_ref(identity), &self.runtime_instance_id)
        .await
    {
        Ok(result) => result,
        Err(err) => {
            self.restore_entry(identity, registered_entry).await;
            return Err(IdentityRuntimeError::Lease(err));
        }
    };
    let grant = match acquire_result.get(identity) {
        Some(super::types::LeaseAcquireResult::Acquired(g)) => g.clone(),
        // Any other outcome (including a missing result) means we do not
        // hold the lease and must not touch the store.
        _ => {
            self.restore_entry(identity, registered_entry).await;
            return Err(IdentityRuntimeError::NoActiveLease(identity.clone()));
        }
    };
    if let Err(err) = self
        .advance_existing_continuity_fence(identity, &registered_entry, &grant)
        .await
    {
        // The fence could not be advanced: put the entry back as Broken
        // and without a lease.
        let mut broken_entry = registered_entry;
        broken_entry.state = IdentityLifecycleState::Broken;
        broken_entry.lease = None;
        self.restore_entry(identity, broken_entry).await;
        return Err(err);
    }
    if let (Some(bridge), Some(rid)) = (&self.bridge, &runtime_id)
        && let Err(err) = bridge.retire_member(rid).await
    {
        self.restore_entry_with_grant(identity, registered_entry, &grant)
            .await;
        return Err(IdentityRuntimeError::Internal(format!(
            "bridge retire before delete: {err}"
        )));
    }
    if let (Some(bridge), Some(session_id)) = (&self.bridge, &session_id)
        && let Some(err) = Self::unregister_bridge_session_runtime_states(
            bridge.as_ref(),
            std::slice::from_ref(session_id),
        )
        .await
    {
        // The member is already retired at this point, so this path uses
        // the dedicated broken-entry restore rather than a plain restore.
        self.restore_broken_entry_with_fenced_store(identity, registered_entry, &grant)
            .await;
        return Err(IdentityRuntimeError::Internal(format!(
            "bridge unregister session before delete: {err}"
        )));
    }
    if let Err(err) = self
        .continuity_store
        .delete_continuity_record(identity, grant.fencing_token)
        .await
    {
        let mut entries = self.entries.write().await;
        if let Some(entry) = entries.get_mut(identity) {
            entry.state = IdentityLifecycleState::Broken;
        }
        return Err(IdentityRuntimeError::Store(err));
    }
    self.event_channels.write().await.remove(identity);
    self.entries.write().await.remove(identity);
    Ok(())
}
/// Saves `snapshot` as the next checkpoint of `identity`'s session.
///
/// Runs under the identity's lifecycle lock. The entry must be `Active`,
/// must pass `Self::check_lease`, and must carry a continuity record. The
/// snapshot is written under version `current + 1` using the token returned
/// by the lease check. On success the in-memory checkpoint version is
/// updated and a `CheckpointCompleted` event is emitted.
///
/// # Errors
/// * [`IdentityRuntimeError::UnknownIdentity`] if no entry exists.
/// * [`IdentityRuntimeError::InvalidState`] if the entry is not `Active`.
/// * Whatever `Self::check_lease` reports.
/// * [`IdentityRuntimeError::Internal`] if the entry has no continuity record.
/// * The converted store error if saving the snapshot fails.
pub async fn checkpoint(
    &self,
    identity: &AgentIdentity,
    snapshot: &SessionSnapshot,
) -> Result<CheckpointVersion, IdentityRuntimeError> {
    let lifecycle_lock = self.lifecycle_lock_for(identity).await;
    let _lifecycle_guard = lifecycle_lock.lock().await;

    // Gather everything needed from the entry inside a short read-lock scope
    // so the lock is not held across the store call.
    let (continuity, fencing_token, next_version) = {
        let table = self.entries.read().await;
        let Some(entry) = table.get(identity) else {
            return Err(IdentityRuntimeError::UnknownIdentity(identity.clone()));
        };
        if entry.state != IdentityLifecycleState::Active {
            return Err(IdentityRuntimeError::InvalidState {
                identity: identity.clone(),
                state: entry.state,
                operation: "checkpoint",
            });
        }
        let fencing_token = Self::check_lease(entry)?;
        let Some(continuity) = entry.continuity.clone() else {
            return Err(IdentityRuntimeError::Internal(format!(
                "no continuity record for {identity}"
            )));
        };
        let next_version = CheckpointVersion::new(entry.checkpoint_version.get() + 1);
        (continuity, fencing_token, next_version)
    };

    self.continuity_store
        .save_session_snapshot(
            identity,
            &continuity.session_id,
            continuity.generation,
            next_version,
            fencing_token,
            snapshot,
        )
        .await?;

    // Record the new version only after the store accepted the snapshot.
    if let Some(entry) = self.entries.write().await.get_mut(identity) {
        entry.checkpoint_version = next_version;
    }

    let completed = IdentityEvent::CheckpointCompleted {
        identity: identity.clone(),
        version: next_version,
    };
    self.emit_event(identity, completed).await;
    Ok(next_version)
}
/// Builds a status report for every registered identity.
///
/// Each identity maps to a clone of its spec and an [`IdentityStatus`]
/// assembled from the in-memory entry. A checkpoint version of `0` is
/// reported as `None`. `store_reachable` is always reported as `true`; no
/// store probe is performed here.
pub async fn roster_inspect(
    &self,
) -> BTreeMap<AgentIdentity, (DurableAgentSpec, IdentityStatus)> {
    let table = self.entries.read().await;
    table
        .iter()
        .map(|(identity, entry)| {
            let lease = entry.lease.as_ref().map(|held| LeaseInfo {
                fencing_token: held.fencing_token,
                ttl_remaining: held.ttl_remaining(),
                healthy: held.is_healthy(),
            });
            let continuity = entry.continuity.as_ref();
            // Version 0 means "never checkpointed".
            let checkpoint_version =
                (entry.checkpoint_version.get() > 0).then_some(entry.checkpoint_version);
            let status = IdentityStatus {
                identity: identity.clone(),
                state: entry.state,
                agent_runtime_id: continuity.map(|record| record.agent_runtime_id.clone()),
                session_id: continuity.map(|record| record.session_id.clone()),
                profile: Some(entry.spec.profile.clone()),
                runtime_mode: entry.spec.runtime_mode_override,
                addressability: entry.spec.addressability,
                display_name: entry.spec.display_name.clone(),
                labels: entry.spec.labels.clone(),
                generation: continuity.map(|record| record.generation),
                checkpoint_version,
                lease,
                continuity_health: Some(ContinuityHealth {
                    store_reachable: true,
                    durability_policy: self.durability_policy.clone(),
                    last_checkpoint_version: checkpoint_version,
                }),
            };
            (identity.clone(), (entry.spec.clone(), status))
        })
        .collect()
}
pub fn validate_roster_uniqueness(
specs: &[DurableAgentSpec],
) -> Result<(), IdentityRuntimeError> {
let mut seen = std::collections::BTreeSet::new();
for spec in specs {
if !seen.insert(&spec.identity) {
return Err(IdentityRuntimeError::DuplicateIdentity(
spec.identity.clone(),
));
}
}
Ok(())
}
/// Returns a point-in-time clone of the whole identity table.
// NOTE(review): `dead_code` is allowed presumably because this is only used
// from some build configurations (e.g. tests) — confirm.
#[allow(dead_code)]
pub(crate) async fn entries(&self) -> BTreeMap<AgentIdentity, IdentityEntry> {
    let table = self.entries.read().await;
    (*table).clone()
}
/// Reports whether `identity` has an entry in this runtime, in any state.
pub async fn contains(&self, identity: &AgentIdentity) -> bool {
    let table = self.entries.read().await;
    table.contains_key(identity)
}
/// Reports whether `identity` is registered and currently in the `Active`
/// lifecycle state. Unknown identities yield `false`.
pub async fn is_active(&self, identity: &AgentIdentity) -> bool {
    let table = self.entries.read().await;
    matches!(
        table.get(identity),
        Some(entry) if entry.state == IdentityLifecycleState::Active
    )
}
/// Returns the shared handle to the continuity store this runtime uses.
pub fn continuity_store(&self) -> &Arc<dyn ContinuityStore> {
&self.continuity_store
}
/// Returns the shared handle to the lease provider this runtime uses.
pub fn lease_provider(&self) -> &Arc<dyn LeaseProvider> {
&self.lease_provider
}
/// Returns the identifier of this runtime instance, i.e. the value passed to
/// the lease provider when this runtime acquires leases.
pub fn runtime_instance_id(&self) -> &str {
&self.runtime_instance_id
}
/// Returns the durability policy configured for this runtime.
pub fn durability_policy(&self) -> &DurabilityPolicy {
&self.durability_policy
}
/// Returns the stored `has_runtime_store` flag.
// NOTE(review): presumably set at construction when a runtime store was
// supplied — confirm against the constructor.
pub fn has_runtime_store(&self) -> bool {
self.has_runtime_store
}
/// Returns the session bridge, or `None` when this runtime has no bridge
/// configured.
pub fn bridge(&self) -> Option<&Arc<dyn SessionBridge>> {
self.bridge.as_ref()
}
/// Convenience wrapper around `send` for plain text: wraps `text` in
/// `ContentInput::Text` and forwards it to `identity`.
///
/// Returns whatever `send` returns.
pub async fn send_text(
    &self,
    identity: &AgentIdentity,
    text: impl Into<String>,
) -> Result<FencingToken, IdentityRuntimeError> {
    let content = meerkat_core::ContentInput::Text(text.into());
    self.send(identity, &content).await
}
/// Convenience wrapper around `dispatch`: builds the input with
/// `DispatchInput::system(text)` and dispatches it to `identity`.
///
/// Returns whatever `dispatch` returns.
pub async fn dispatch_text(
    &self,
    identity: &AgentIdentity,
    text: impl Into<String>,
) -> Result<(FencingToken, bool), IdentityRuntimeError> {
    let input = DispatchInput::system(text);
    self.dispatch(identity, &input).await
}
/// Runs the orchestrator's restore flow for `roster` against this runtime.
///
/// This is a thin method-style entry point: it forwards `self`, `roster`,
/// and the optional `topology_provider` and `customizer` unchanged to
/// `super::orchestrator::restore_flow` and returns its result.
pub async fn restore_flow(
&self,
roster: &[DurableAgentSpec],
topology_provider: Option<&dyn super::contracts::TopologyProvider>,
customizer: Option<&dyn super::contracts::AgentCustomizer>,
) -> Result<super::orchestrator::RestoreFlowResult, IdentityRuntimeError> {
super::orchestrator::restore_flow(self, roster, topology_provider, customizer).await
}
/// Looks up the agent runtime id recorded in `identity`'s continuity record.
///
/// # Errors
/// * [`IdentityRuntimeError::UnknownIdentity`] if the identity has no entry.
/// * [`IdentityRuntimeError::Internal`] if the entry has no continuity record.
pub async fn runtime_id_for(
    &self,
    identity: &AgentIdentity,
) -> Result<AgentRuntimeId, IdentityRuntimeError> {
    let table = self.entries.read().await;
    let Some(entry) = table.get(identity) else {
        return Err(IdentityRuntimeError::UnknownIdentity(identity.clone()));
    };
    match entry.continuity.as_ref() {
        Some(record) => Ok(record.agent_runtime_id.clone()),
        None => Err(IdentityRuntimeError::Internal(format!(
            "no continuity record for {identity}"
        ))),
    }
}
/// Asks the session bridge to inspect the member backing `identity`.
///
/// The runtime id is resolved first, so lookup errors take precedence over
/// a missing bridge.
///
/// # Errors
/// * Any error from `runtime_id_for`.
/// * [`IdentityRuntimeError::Internal`] if no bridge is configured or the
///   bridge's `inspect_member` call fails.
pub async fn inspect(
    &self,
    identity: &AgentIdentity,
) -> Result<super::bridge::MemberInspection, IdentityRuntimeError> {
    let runtime_id = self.runtime_id_for(identity).await?;
    let Some(bridge) = self.bridge.as_ref() else {
        return Err(IdentityRuntimeError::Internal(
            "no bridge configured".to_string(),
        ));
    };
    match bridge.inspect_member(&runtime_id).await {
        Ok(inspection) => Ok(inspection),
        Err(e) => Err(IdentityRuntimeError::Internal(format!("inspect: {e}"))),
    }
}
/// Returns the default timeout configured on this runtime.
pub fn default_timeout(&self) -> Duration {
self.default_timeout
}
/// Polls [`Self::inspect`] until the member behind `identity` reports an
/// output preview, and returns that preview.
///
/// Inspection is attempted at least once, even with a zero `timeout`.
/// Inspection errors are treated as "no output yet" and polling continues.
/// Between attempts the task sleeps for 500 ms, or for the time remaining
/// until the deadline if that is shorter, so the call does not overrun
/// `timeout` by a full poll interval. A `timeout` too large to represent as
/// a deadline is treated as "no deadline" rather than panicking.
///
/// # Errors
/// Returns [`IdentityRuntimeError::Internal`] if no output preview appeared
/// before `timeout` elapsed.
pub async fn wait_for_output(
    &self,
    identity: &AgentIdentity,
    timeout: Duration,
) -> Result<String, IdentityRuntimeError> {
    const POLL_INTERVAL: Duration = Duration::from_millis(500);
    // `Instant + Duration` panics on overflow; `None` here means the
    // deadline is unrepresentably far away, i.e. effectively never.
    let deadline = Instant::now().checked_add(timeout);
    loop {
        if let Ok(inspection) = self.inspect(identity).await
            && let Some(preview) = inspection.output_preview
        {
            return Ok(preview);
        }
        let pause = match deadline {
            Some(deadline) => {
                let remaining = deadline.saturating_duration_since(Instant::now());
                if remaining.is_zero() {
                    return Err(IdentityRuntimeError::Internal(format!(
                        "timed out waiting for output from {identity}"
                    )));
                }
                // Never sleep past the deadline; the next iteration does one
                // final inspection before reporting the timeout.
                remaining.min(POLL_INTERVAL)
            }
            None => POLL_INTERVAL,
        };
        tokio::time::sleep(pause).await;
    }
}
/// Polls [`Self::inspect`] until the member's output preview contains
/// `needle`, and returns a copy of that preview.
///
/// Inspection is attempted at least once, even with a zero `timeout`.
/// Inspection errors and previews that do not contain `needle` are treated
/// as "not yet" and polling continues. Between attempts the task sleeps for
/// 500 ms, or for the time remaining until the deadline if that is shorter,
/// so the call does not overrun `timeout` by a full poll interval. A
/// `timeout` too large to represent as a deadline is treated as "no
/// deadline" rather than panicking.
///
/// # Errors
/// Returns [`IdentityRuntimeError::Internal`] if no matching preview
/// appeared before `timeout` elapsed.
pub async fn wait_for_output_containing(
    &self,
    identity: &AgentIdentity,
    needle: &str,
    timeout: Duration,
) -> Result<String, IdentityRuntimeError> {
    const POLL_INTERVAL: Duration = Duration::from_millis(500);
    // `Instant + Duration` panics on overflow; `None` here means the
    // deadline is unrepresentably far away, i.e. effectively never.
    let deadline = Instant::now().checked_add(timeout);
    loop {
        if let Ok(inspection) = self.inspect(identity).await
            && let Some(ref preview) = inspection.output_preview
            && preview.contains(needle)
        {
            return Ok(preview.clone());
        }
        let pause = match deadline {
            Some(deadline) => {
                let remaining = deadline.saturating_duration_since(Instant::now());
                if remaining.is_zero() {
                    return Err(IdentityRuntimeError::Internal(format!(
                        "timed out waiting for output containing '{needle}' from {identity}"
                    )));
                }
                // Never sleep past the deadline; the next iteration does one
                // final inspection before reporting the timeout.
                remaining.min(POLL_INTERVAL)
            }
            None => POLL_INTERVAL,
        };
        tokio::time::sleep(pause).await;
    }
}
}
/// Wires a local agent to an agent in another mob, addressing both by
/// identity instead of by runtime id.
///
/// Each identity is resolved to its runtime id through its own
/// `IdentityRuntime` (local first, then remote), and the pair is then passed
/// to `local_unified.wire_cross_mob` together with `remote_mob_id`.
///
/// # Errors
/// * Any error from `runtime_id_for` on either runtime.
/// * [`IdentityRuntimeError::Internal`] if `wire_cross_mob` fails.
pub async fn wire_cross_mob_by_identity(
    local_irt: &IdentityRuntime,
    local_identity: &AgentIdentity,
    remote_irt: &IdentityRuntime,
    remote_identity: &AgentIdentity,
    local_unified: &crate::UnifiedRuntime,
    remote_mob_id: &str,
) -> Result<(), IdentityRuntimeError> {
    let local_runtime_id = local_irt.runtime_id_for(local_identity).await?;
    let remote_runtime_id = remote_irt.runtime_id_for(remote_identity).await?;
    let wired = local_unified
        .wire_cross_mob(
            local_runtime_id.as_str(),
            remote_runtime_id.as_str(),
            remote_mob_id,
        )
        .await;
    match wired {
        Ok(()) => Ok(()),
        Err(e) => Err(IdentityRuntimeError::Internal(format!(
            "wire_cross_mob: {e}"
        ))),
    }
}