Skip to main content

meerkat_mobkit/identity_first/
runtime.rs

1//! Identity-first runtime: delivery, status, lifecycle, and ownership enforcement.
2//!
3//! This module implements the behavioral core of identity-first continuity:
4//! - Delivery: `send()` and `dispatch()` with addressability and lease enforcement
5//! - Status: `status()` returning `IdentityStatus`
6//! - Lifecycle: `retire()`, `respawn()`, `reset()`, `delete_identity()`
7//! - Ownership: lease tracking, fencing, and invariant enforcement
8
9use std::collections::{BTreeMap, BTreeSet};
10use std::sync::Arc;
11use std::time::{Duration, Instant};
12
13use futures::stream::{self, StreamExt};
14use tokio::sync::{RwLock, broadcast};
15
16use super::bridge::SessionBridge;
17use super::contracts::{
18    AgentCustomizer, ContinuityStore, LeaseProvider, RosterProvider, TopologyProvider,
19};
20use super::types::{
21    AgentAddressability, AgentIdentity, AgentRuntimeId, AgentRuntimeServices, CheckpointVersion,
22    ContinuityGeneration, ContinuityHealth, ContinuityRecord, ContinuityStoreError, DispatchInput,
23    DurabilityPolicy, DurableAgentSpec, FencingToken, IdentityLifecycleState, IdentityStatus,
24    LeaseGrant, LeaseInfo, ManagedPeerEdge, NotAddressable, RosterContext, SessionSnapshot,
25};
26
/// Maximum number of `unwire_peer` bridge calls that
/// `IdentityRuntime::reconcile_managed_peer_edges` keeps in flight at once.
/// (Wiring is issued as a single `wire_peers_batch` call and is not bounded here.)
const MANAGED_PEER_RECONCILE_CONCURRENCY: usize = 64;
28
29// ---------------------------------------------------------------------------
30// Error types
31// ---------------------------------------------------------------------------
32
/// Errors from identity-first runtime operations.
#[derive(Debug)]
pub enum IdentityRuntimeError {
    /// Target identity is not registered with this runtime.
    UnknownIdentity(AgentIdentity),
    /// send() rejected: target is InternalOnly.
    NotAddressable(NotAddressable),
    /// Operation rejected: no lease is recorded for this identity
    /// (never granted, or cleared by `mark_lease_lost`).
    NoActiveLease(AgentIdentity),
    /// Operation rejected: a lease is recorded but has outlived its TTL.
    LeaseLost(AgentIdentity),
    /// Operation rejected: identity is not in a state that permits this operation.
    InvalidState {
        /// Identity the operation targeted.
        identity: AgentIdentity,
        /// Lifecycle state the identity was in when the operation was rejected.
        state: IdentityLifecycleState,
        /// Short operation name, interpolated into the `Display` message.
        operation: &'static str,
    },
    /// Continuity store error. Stale fencing-token and stale checkpoint-version
    /// store errors are instead mapped to the dedicated variants below by the
    /// `From<ContinuityStoreError>` impl.
    Store(ContinuityStoreError),
    /// Lease provider error.
    Lease(super::types::LeaseError),
    /// Duplicate identities in roster.
    DuplicateIdentity(AgentIdentity),
    /// Stale fencing token on checkpoint.
    StaleFencingToken {
        identity: AgentIdentity,
        /// Token supplied by the caller.
        presented: FencingToken,
        /// Token the store currently considers authoritative.
        current: FencingToken,
    },
    /// Stale checkpoint version.
    StaleCheckpointVersion {
        identity: AgentIdentity,
        /// Version supplied by the caller.
        presented: CheckpointVersion,
        /// Version the store currently holds.
        current: CheckpointVersion,
    },
    /// Generic I/O or internal error. This runtime also uses it to wrap
    /// session-bridge and roster-provider failures as formatted messages.
    Internal(String),
}
71
72impl std::fmt::Display for IdentityRuntimeError {
73    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
74        match self {
75            Self::UnknownIdentity(id) => write!(f, "unknown identity: {id}"),
76            Self::NotAddressable(err) => write!(f, "{err}"),
77            Self::NoActiveLease(id) => write!(f, "no active lease for {id}"),
78            Self::LeaseLost(id) => write!(f, "lease lost for {id}"),
79            Self::InvalidState {
80                identity,
81                state,
82                operation,
83            } => write!(
84                f,
85                "cannot {operation} identity {identity} in state {state:?}"
86            ),
87            Self::Store(err) => write!(f, "continuity store: {err}"),
88            Self::Lease(err) => write!(f, "lease provider: {err}"),
89            Self::DuplicateIdentity(id) => write!(f, "duplicate identity in roster: {id}"),
90            Self::StaleFencingToken {
91                identity,
92                presented,
93                current,
94            } => write!(
95                f,
96                "stale fencing token for {identity}: presented {presented}, current {current}"
97            ),
98            Self::StaleCheckpointVersion {
99                identity,
100                presented,
101                current,
102            } => write!(
103                f,
104                "stale checkpoint version for {identity}: presented {presented}, current {current}"
105            ),
106            Self::Internal(msg) => write!(f, "internal: {msg}"),
107        }
108    }
109}
110
111impl std::error::Error for IdentityRuntimeError {}
112
113impl From<ContinuityStoreError> for IdentityRuntimeError {
114    fn from(err: ContinuityStoreError) -> Self {
115        match err {
116            ContinuityStoreError::StaleFencingToken {
117                identity,
118                presented,
119                current,
120            } => Self::StaleFencingToken {
121                identity,
122                presented,
123                current,
124            },
125            ContinuityStoreError::StaleCheckpointVersion {
126                identity,
127                presented,
128                current,
129            } => Self::StaleCheckpointVersion {
130                identity,
131                presented,
132                current,
133            },
134            other => Self::Store(other),
135        }
136    }
137}
138
139// ---------------------------------------------------------------------------
140// Per-identity runtime state
141// ---------------------------------------------------------------------------
142
/// Tracks the live state for a single identity within the runtime.
#[derive(Debug, Clone)]
pub(crate) struct IdentityEntry {
    /// Durable spec the identity was registered (or last updated) with.
    pub spec: DurableAgentSpec,
    /// Current lifecycle state.
    pub state: IdentityLifecycleState,
    /// Continuity record, if any. Supplies the runtime member ID used for
    /// bridge delivery and the session/generation reported by `status()`.
    pub continuity: Option<ContinuityRecord>,
    /// Lease currently recorded for this identity; `None` when no grant was
    /// supplied or the lease was marked lost.
    pub lease: Option<LeaseEntry>,
    /// Latest known checkpoint version. Seeded from the continuity record at
    /// registration (0 when there is none); 0 is reported as `None` by `status()`.
    pub checkpoint_version: CheckpointVersion,
    /// Whether a durable runtime_store is available (affects dispatch ack semantics).
    pub has_runtime_store: bool,
}
154
/// Tracks a held lease for an identity.
///
/// Expiry is evaluated locally from `acquired_at` and `ttl` on the monotonic
/// clock; the lease provider is not consulted by the checks on this type.
#[derive(Debug, Clone)]
pub(crate) struct LeaseEntry {
    /// Fencing token from the grant; handed back to callers of send/dispatch/retire.
    pub fencing_token: FencingToken,
    /// Lease duration from the grant.
    pub ttl: Duration,
    /// When this runtime recorded the grant (not when the provider issued it).
    pub acquired_at: Instant,
}
162
163impl LeaseEntry {
164    pub fn is_expired(&self) -> bool {
165        self.acquired_at.elapsed() > self.ttl
166    }
167
168    pub fn ttl_remaining(&self) -> Duration {
169        self.ttl.saturating_sub(self.acquired_at.elapsed())
170    }
171
172    pub fn is_healthy(&self) -> bool {
173        // Healthy if more than 20% TTL remains
174        let remaining = self.ttl_remaining();
175        remaining > self.ttl / 5
176    }
177}
178
179// ---------------------------------------------------------------------------
180// Identity-scoped events
181// ---------------------------------------------------------------------------
182
/// Events emitted for a specific identity, used by `subscribe()`.
///
/// Delivered over a per-identity `tokio::sync::broadcast` channel, which
/// clones each event for every receiver (hence `Clone`).
#[derive(Debug, Clone)]
pub enum IdentityEvent {
    /// Lifecycle state changed to `new_state`.
    StateChanged {
        identity: AgentIdentity,
        new_state: IdentityLifecycleState,
    },
    /// Lease acquired or renewed; carries the fencing token of the new grant.
    LeaseUpdated {
        identity: AgentIdentity,
        fencing_token: FencingToken,
    },
    /// Lease lost.
    LeaseLost { identity: AgentIdentity },
    /// Checkpoint completed at `version`.
    CheckpointCompleted {
        identity: AgentIdentity,
        version: CheckpointVersion,
    },
}

/// Per-identity event channel capacity.
///
/// A receiver that falls more than this many events behind loses the oldest
/// ones and observes `RecvError::Lagged` on its next `recv()`.
const IDENTITY_EVENT_CHANNEL_CAPACITY: usize = 64;
207
208// ---------------------------------------------------------------------------
209// IdentityRuntime
210// ---------------------------------------------------------------------------
211
/// Configuration for the identity-first runtime, consumed by `IdentityRuntime::new`.
pub struct IdentityRuntimeConfig {
    /// Continuity store backing this runtime.
    pub continuity_store: Arc<dyn ContinuityStore>,
    /// Lease provider backing identity ownership.
    pub lease_provider: Arc<dyn LeaseProvider>,
    /// Identifier of this runtime instance.
    /// NOTE(review): presumably used as the lease-holder identity — confirm.
    pub runtime_instance_id: String,
    /// Whether a durable runtime_store is available. Copied into every
    /// registered identity and reported as the `is_durable` flag of `dispatch()`.
    pub has_runtime_store: bool,
    /// Durability policy reported in `status()` continuity health.
    pub durability_policy: DurabilityPolicy,
    /// Optional session bridge for real session delivery. When `None`,
    /// delivery operations validate invariants but do not forward to
    /// the Meerkat session pipeline (useful for tests).
    pub bridge: Option<Arc<dyn SessionBridge>>,
    /// Default timeout for wait_for_output / wait_for_output_containing.
    /// Defaults to 90 seconds if not set.
    pub default_timeout: Option<Duration>,
}
227
/// Bundles an [`IdentityRuntime`] with the providers needed to re-run the
/// restore flow against a freshly fetched roster.
#[derive(Clone)]
pub struct IdentityFirstRuntimeContext {
    /// Runtime the restore flow is applied to.
    pub runtime: Arc<IdentityRuntime>,
    /// Source of the desired roster.
    pub roster_provider: Arc<dyn RosterProvider>,
    /// Optional topology provider, forwarded to the restore flow.
    pub topology_provider: Option<Arc<dyn TopologyProvider>>,
    /// Optional agent customizer, forwarded to the restore flow.
    pub customizer: Option<Arc<dyn AgentCustomizer>>,
    /// Mob definition passed to the roster provider, if any.
    mob_definition: Option<meerkat_mob::MobDefinition>,
}
236
237impl IdentityFirstRuntimeContext {
238    pub fn new(
239        runtime: Arc<IdentityRuntime>,
240        roster_provider: Arc<dyn RosterProvider>,
241        topology_provider: Option<Arc<dyn TopologyProvider>>,
242        customizer: Option<Arc<dyn AgentCustomizer>>,
243        mob_definition: Option<meerkat_mob::MobDefinition>,
244    ) -> Self {
245        Self {
246            runtime,
247            roster_provider,
248            topology_provider,
249            customizer,
250            mob_definition,
251        }
252    }
253
254    pub async fn refresh_desired_topology(
255        &self,
256    ) -> Result<super::orchestrator::RestoreFlowResult, IdentityRuntimeError> {
257        let roster = self
258            .roster_provider
259            .roster(&RosterContext {
260                mob_definition: self.mob_definition.clone(),
261                previous_identities: Vec::new(),
262            })
263            .await
264            .map_err(|err| IdentityRuntimeError::Internal(format!("roster provider: {err}")))?;
265
266        super::orchestrator::restore_flow(
267            &self.runtime,
268            &roster,
269            self.topology_provider.as_deref(),
270            self.customizer.as_deref(),
271        )
272        .await
273    }
274}
275
/// The identity-first runtime tracks active identities and enforces delivery,
/// ownership, and lifecycle invariants.
pub struct IdentityRuntime {
    /// Per-identity live state, keyed by logical identity.
    entries: RwLock<BTreeMap<AgentIdentity, IdentityEntry>>,
    /// One broadcast sender per registered identity, backing `subscribe()`.
    event_channels: RwLock<BTreeMap<AgentIdentity, broadcast::Sender<IdentityEvent>>>,
    continuity_store: Arc<dyn ContinuityStore>,
    lease_provider: Arc<dyn LeaseProvider>,
    runtime_instance_id: String,
    /// Copied into each registered entry; surfaces as `dispatch()`'s durability flag.
    has_runtime_store: bool,
    /// Reported in `status()` continuity health.
    durability_policy: DurabilityPolicy,
    /// Session bridge for real delivery; `None` means invariant checks only.
    bridge: Option<Arc<dyn SessionBridge>>,
    /// Returned by `runtime_services()`; empty unless set via `with_runtime_services`.
    runtime_services: AgentRuntimeServices,
    /// Logical `(identity, identity)` peer edges currently owned by
    /// `reconcile_managed_peer_edges`.
    managed_peer_edges: RwLock<BTreeSet<(AgentIdentity, AgentIdentity)>>,
    /// Default timeout for output waits (90 seconds unless configured).
    default_timeout: Duration,
}
291
292impl IdentityRuntime {
293    /// Create a new identity runtime with the given configuration.
294    pub fn new(config: IdentityRuntimeConfig) -> Self {
295        Self {
296            entries: RwLock::new(BTreeMap::new()),
297            event_channels: RwLock::new(BTreeMap::new()),
298            continuity_store: config.continuity_store,
299            lease_provider: config.lease_provider,
300            runtime_instance_id: config.runtime_instance_id,
301            has_runtime_store: config.has_runtime_store,
302            durability_policy: config.durability_policy,
303            bridge: config.bridge,
304            runtime_services: AgentRuntimeServices::empty(),
305            managed_peer_edges: RwLock::new(BTreeSet::new()),
306            default_timeout: config.default_timeout.unwrap_or(Duration::from_secs(90)),
307        }
308    }
309
310    pub fn with_runtime_services(mut self, runtime_services: AgentRuntimeServices) -> Self {
311        self.runtime_services = runtime_services;
312        self
313    }
314
315    pub(crate) fn runtime_services(&self) -> AgentRuntimeServices {
316        self.runtime_services.clone()
317    }
318
319    #[must_use]
320    pub fn has_session_bridge(&self) -> bool {
321        self.bridge.is_some()
322    }
323
324    /// Apply identity-first managed topology to the concrete mob graph.
325    ///
326    /// Topology providers return stable logical identities. The mob comms graph
327    /// is keyed by active runtime member IDs, so this resolves each endpoint
328    /// through continuity records before calling the same-mob bridge wire APIs.
329    pub async fn reconcile_managed_peer_edges(
330        &self,
331        desired_edges: &[ManagedPeerEdge],
332    ) -> Result<(), IdentityRuntimeError> {
333        let Some(bridge) = self.bridge.clone() else {
334            return Ok(());
335        };
336
337        let active_runtimes: BTreeMap<AgentIdentity, AgentRuntimeId> = {
338            let entries = self.entries.read().await;
339            entries
340                .iter()
341                .filter_map(|(identity, entry)| {
342                    if entry.state != IdentityLifecycleState::Active {
343                        return None;
344                    }
345                    entry
346                        .continuity
347                        .as_ref()
348                        .map(|record| (identity.clone(), record.agent_runtime_id.clone()))
349                })
350                .collect()
351        };
352        let runtime_identities: BTreeMap<AgentRuntimeId, AgentIdentity> = active_runtimes
353            .iter()
354            .map(|(identity, runtime_id)| (runtime_id.clone(), identity.clone()))
355            .collect();
356        let current_logical_edges: Option<BTreeSet<(AgentIdentity, AgentIdentity)>> =
357            match bridge.current_member_wires().await {
358                Ok(current_runtime_edges) => Some(
359                    current_runtime_edges
360                        .iter()
361                        .filter_map(|(runtime_a, runtime_b)| {
362                            let a = runtime_identities.get(runtime_a)?;
363                            let b = runtime_identities.get(runtime_b)?;
364                            if a <= b {
365                                Some((a.clone(), b.clone()))
366                            } else {
367                                Some((b.clone(), a.clone()))
368                            }
369                        })
370                        .collect(),
371                ),
372                Err(err) => {
373                    tracing::debug!(
374                        error = %err,
375                        "identity-first topology reconcile could not inspect current member wires"
376                    );
377                    None
378                }
379            };
380
381        let desired: BTreeSet<(AgentIdentity, AgentIdentity)> = desired_edges
382            .iter()
383            .map(|edge| (edge.a().clone(), edge.b().clone()))
384            .collect();
385
386        let managed_snapshot = self.managed_peer_edges.read().await.clone();
387        let retained_logical_edges: Vec<(AgentIdentity, AgentIdentity)> = desired
388            .iter()
389            .filter(|edge| !managed_snapshot.contains(*edge))
390            .filter(|edge| {
391                current_logical_edges
392                    .as_ref()
393                    .is_some_and(|edges| edges.contains(*edge))
394            })
395            .filter(|(a, b)| active_runtimes.contains_key(a) && active_runtimes.contains_key(b))
396            .cloned()
397            .collect();
398        let to_wire: Vec<(AgentIdentity, AgentIdentity, AgentRuntimeId, AgentRuntimeId)> = desired
399            .iter()
400            .filter(|edge| !managed_snapshot.contains(*edge))
401            .filter(|edge| {
402                current_logical_edges
403                    .as_ref()
404                    .is_none_or(|edges| !edges.contains(*edge))
405            })
406            .filter_map(|(a, b)| {
407                let runtime_a = active_runtimes.get(a)?;
408                let runtime_b = active_runtimes.get(b)?;
409                Some((a.clone(), b.clone(), runtime_a.clone(), runtime_b.clone()))
410            })
411            .collect();
412
413        let stale: Vec<(AgentIdentity, AgentIdentity)> = managed_snapshot
414            .iter()
415            .filter(|edge| !desired.contains(*edge))
416            .cloned()
417            .collect();
418        let to_unwire: Vec<(AgentIdentity, AgentIdentity, AgentRuntimeId, AgentRuntimeId)> = stale
419            .iter()
420            .filter_map(|(a, b)| {
421                let runtime_a = active_runtimes.get(a)?;
422                let runtime_b = active_runtimes.get(b)?;
423                if current_logical_edges
424                    .as_ref()
425                    .is_some_and(|edges| !edges.contains(&(a.clone(), b.clone())))
426                {
427                    return None;
428                }
429                Some((a.clone(), b.clone(), runtime_a.clone(), runtime_b.clone()))
430            })
431            .collect();
432
433        let wire_logical_edges = to_wire
434            .iter()
435            .map(|(a, b, _, _)| (a.clone(), b.clone()))
436            .collect::<Vec<_>>();
437        let wire_runtime_edges = to_wire
438            .iter()
439            .map(|(_, _, runtime_a, runtime_b)| (runtime_a.clone(), runtime_b.clone()))
440            .collect::<Vec<_>>();
441        if !wire_runtime_edges.is_empty() {
442            bridge
443                .wire_peers_batch(&wire_runtime_edges)
444                .await
445                .map_err(|e| {
446                    IdentityRuntimeError::Internal(format!("bridge wire_peers_batch: {e}"))
447                })?;
448        }
449
450        let unwire_results =
451            stream::iter(to_unwire.into_iter().map(|(a, b, runtime_a, runtime_b)| {
452                let bridge = bridge.clone();
453                async move {
454                    let result = bridge
455                        .unwire_peer(&runtime_a, &runtime_b)
456                        .await
457                        .map_err(|e| format!("{e}"));
458                    (a, b, result)
459                }
460            }))
461            .buffer_unordered(MANAGED_PEER_RECONCILE_CONCURRENCY)
462            .collect::<Vec<_>>()
463            .await;
464
465        let mut managed = self.managed_peer_edges.write().await;
466        for (a, b) in retained_logical_edges {
467            managed.insert((a, b));
468        }
469        for (a, b) in wire_logical_edges {
470            managed.insert((a, b));
471        }
472
473        for (a, b) in stale {
474            let key = (a.clone(), b.clone());
475            if !active_runtimes.contains_key(&a) || !active_runtimes.contains_key(&b) {
476                managed.remove(&key);
477            }
478        }
479        for (a, b, result) in unwire_results {
480            result
481                .map_err(|e| IdentityRuntimeError::Internal(format!("bridge unwire_peer: {e}")))?;
482            managed.remove(&(a, b));
483        }
484
485        Ok(())
486    }
487
488    /// Emit an event for the given identity. Best-effort — no error if no subscribers.
489    async fn emit_event(&self, identity: &AgentIdentity, event: IdentityEvent) {
490        let channels = self.event_channels.read().await;
491        if let Some(tx) = channels.get(identity) {
492            let _ = tx.send(event);
493        }
494    }
495
496    // -----------------------------------------------------------------------
497    // Registration / activation
498    // -----------------------------------------------------------------------
499
500    /// Register an identity entry in the runtime (called during restore flow).
501    pub async fn register(
502        &self,
503        spec: DurableAgentSpec,
504        state: IdentityLifecycleState,
505        continuity: Option<ContinuityRecord>,
506        lease: Option<LeaseGrant>,
507    ) {
508        let identity = spec.identity.clone();
509        let cpv = continuity
510            .as_ref()
511            .map(|r| r.checkpoint_version)
512            .unwrap_or(CheckpointVersion::new(0));
513        let lease_entry = lease.map(|g| LeaseEntry {
514            fencing_token: g.fencing_token,
515            ttl: g.ttl,
516            acquired_at: Instant::now(),
517        });
518        let entry = IdentityEntry {
519            spec,
520            state,
521            continuity,
522            lease: lease_entry,
523            checkpoint_version: cpv,
524            has_runtime_store: self.has_runtime_store,
525        };
526        self.entries.write().await.insert(identity.clone(), entry);
527
528        // Create event channel for this identity
529        let (tx, _) = broadcast::channel(IDENTITY_EVENT_CHANNEL_CAPACITY);
530        self.event_channels.write().await.insert(identity, tx);
531    }
532
533    // -----------------------------------------------------------------------
534    // Subscribe — REQ-06
535    // -----------------------------------------------------------------------
536
537    /// Subscribe to identity-scoped events.
538    ///
539    /// Returns a broadcast receiver that yields `IdentityEvent` items for
540    /// state changes, lease updates, lease loss, and checkpoint completions.
541    pub async fn subscribe(
542        &self,
543        identity: &AgentIdentity,
544    ) -> Result<broadcast::Receiver<IdentityEvent>, IdentityRuntimeError> {
545        let channels = self.event_channels.read().await;
546        let tx = channels
547            .get(identity)
548            .ok_or_else(|| IdentityRuntimeError::UnknownIdentity(identity.clone()))?;
549        Ok(tx.subscribe())
550    }
551
552    /// Update the spec for an existing identity (used during reconciliation).
553    pub async fn update_spec(&self, spec: DurableAgentSpec) -> Result<(), IdentityRuntimeError> {
554        let mut entries = self.entries.write().await;
555        let entry = entries
556            .get_mut(&spec.identity)
557            .ok_or_else(|| IdentityRuntimeError::UnknownIdentity(spec.identity.clone()))?;
558        entry.spec = spec;
559        Ok(())
560    }
561
562    /// Update the lease for an identity.
563    pub async fn update_lease(
564        &self,
565        identity: &AgentIdentity,
566        grant: LeaseGrant,
567    ) -> Result<(), IdentityRuntimeError> {
568        let fencing_token = grant.fencing_token;
569        let mut entries = self.entries.write().await;
570        let entry = entries
571            .get_mut(identity)
572            .ok_or_else(|| IdentityRuntimeError::UnknownIdentity(identity.clone()))?;
573        entry.lease = Some(LeaseEntry {
574            fencing_token,
575            ttl: grant.ttl,
576            acquired_at: Instant::now(),
577        });
578        drop(entries);
579        self.emit_event(
580            identity,
581            IdentityEvent::LeaseUpdated {
582                identity: identity.clone(),
583                fencing_token,
584            },
585        )
586        .await;
587        Ok(())
588    }
589
590    /// Mark a lease as lost for an identity (INV-02).
591    pub async fn mark_lease_lost(
592        &self,
593        identity: &AgentIdentity,
594    ) -> Result<(), IdentityRuntimeError> {
595        let mut entries = self.entries.write().await;
596        let entry = entries
597            .get_mut(identity)
598            .ok_or_else(|| IdentityRuntimeError::UnknownIdentity(identity.clone()))?;
599        entry.lease = None;
600        drop(entries);
601        self.emit_event(
602            identity,
603            IdentityEvent::LeaseLost {
604                identity: identity.clone(),
605            },
606        )
607        .await;
608        Ok(())
609    }
610
611    /// Remove an identity from the runtime.
612    #[allow(dead_code)]
613    pub(crate) async fn remove(&self, identity: &AgentIdentity) -> Option<IdentityEntry> {
614        self.event_channels.write().await.remove(identity);
615        self.entries.write().await.remove(identity)
616    }
617
618    /// Set the lifecycle state for an identity.
619    pub async fn set_state(
620        &self,
621        identity: &AgentIdentity,
622        state: IdentityLifecycleState,
623    ) -> Result<(), IdentityRuntimeError> {
624        let mut entries = self.entries.write().await;
625        let entry = entries
626            .get_mut(identity)
627            .ok_or_else(|| IdentityRuntimeError::UnknownIdentity(identity.clone()))?;
628        entry.state = state;
629        drop(entries);
630        self.emit_event(
631            identity,
632            IdentityEvent::StateChanged {
633                identity: identity.clone(),
634                new_state: state,
635            },
636        )
637        .await;
638        Ok(())
639    }
640
641    // -----------------------------------------------------------------------
642    // Lease checking (INV-01, INV-02)
643    // -----------------------------------------------------------------------
644
645    /// Check that the identity has an active, non-expired lease.
646    /// Returns the fencing token if valid.
647    fn check_lease(entry: &IdentityEntry) -> Result<FencingToken, IdentityRuntimeError> {
648        match &entry.lease {
649            Some(lease) if !lease.is_expired() => Ok(lease.fencing_token),
650            Some(_) => Err(IdentityRuntimeError::LeaseLost(entry.spec.identity.clone())),
651            None => Err(IdentityRuntimeError::NoActiveLease(
652                entry.spec.identity.clone(),
653            )),
654        }
655    }
656
657    // -----------------------------------------------------------------------
658    // Delivery: send() — REQ-01, REQ-03
659    // -----------------------------------------------------------------------
660
661    /// Send conversational content to an addressable identity.
662    ///
663    /// Enforces:
664    /// - Identity must be registered and active
665    /// - Identity must be Addressable (REQ-03)
666    /// - Lease must be held (INV-01)
667    /// - Lease must not be lost (INV-02)
668    ///
669    /// Returns the fencing token for the delivery (caller uses it for checkpoint).
670    pub async fn send(
671        &self,
672        identity: &AgentIdentity,
673        content: &meerkat_core::ContentInput,
674    ) -> Result<FencingToken, IdentityRuntimeError> {
675        let (token, runtime_id) = {
676            let entries = self.entries.read().await;
677            let entry = entries
678                .get(identity)
679                .ok_or_else(|| IdentityRuntimeError::UnknownIdentity(identity.clone()))?;
680
681            // REQ-03: reject send to InternalOnly
682            if entry.spec.addressability == AgentAddressability::InternalOnly {
683                return Err(IdentityRuntimeError::NotAddressable(NotAddressable {
684                    identity: identity.clone(),
685                    addressability: entry.spec.addressability,
686                }));
687            }
688
689            // INV-01 / INV-02: require active lease
690            let token = Self::check_lease(entry)?;
691
692            let runtime_id = entry
693                .continuity
694                .as_ref()
695                .map(|c| c.agent_runtime_id.clone());
696
697            (token, runtime_id)
698        };
699
700        // Deliver through the session bridge when available.
701        if let (Some(bridge), Some(rid)) = (&self.bridge, &runtime_id) {
702            bridge
703                .deliver(rid, content)
704                .await
705                .map_err(|e| IdentityRuntimeError::Internal(format!("bridge deliver: {e}")))?;
706        }
707
708        Ok(token)
709    }
710
711    // -----------------------------------------------------------------------
712    // Delivery: dispatch() — REQ-02
713    // -----------------------------------------------------------------------
714
715    /// Dispatch internal content to any identity (Addressable or InternalOnly).
716    ///
717    /// Enforces:
718    /// - Identity must be registered and active
719    /// - Lease must be held (INV-01)
720    /// - Lease must not be lost (INV-02)
721    ///
722    /// Returns (fencing_token, is_durable) where is_durable indicates whether
723    /// the dispatch is backed by a runtime_store (REQ-04).
724    pub async fn dispatch(
725        &self,
726        identity: &AgentIdentity,
727        input: &DispatchInput,
728    ) -> Result<(FencingToken, bool), IdentityRuntimeError> {
729        let (token, is_durable, runtime_id) = {
730            let entries = self.entries.read().await;
731            let entry = entries
732                .get(identity)
733                .ok_or_else(|| IdentityRuntimeError::UnknownIdentity(identity.clone()))?;
734
735            // INV-01 / INV-02: require active lease
736            let token = Self::check_lease(entry)?;
737
738            // REQ-04: durability depends on runtime_store
739            let is_durable = entry.has_runtime_store;
740
741            let runtime_id = entry
742                .continuity
743                .as_ref()
744                .map(|c| c.agent_runtime_id.clone());
745
746            (token, is_durable, runtime_id)
747        };
748
749        // Deliver through the session bridge when available.
750        if let (Some(bridge), Some(rid)) = (&self.bridge, &runtime_id) {
751            bridge
752                .deliver(rid, &input.content)
753                .await
754                .map_err(|e| IdentityRuntimeError::Internal(format!("bridge dispatch: {e}")))?;
755        }
756
757        Ok((token, is_durable))
758    }
759
760    // -----------------------------------------------------------------------
761    // Status: status() — REQ-07
762    // -----------------------------------------------------------------------
763
764    /// Return the full identity status for the given identity.
765    pub async fn status(
766        &self,
767        identity: &AgentIdentity,
768    ) -> Result<IdentityStatus, IdentityRuntimeError> {
769        let entries = self.entries.read().await;
770        let entry = entries
771            .get(identity)
772            .ok_or_else(|| IdentityRuntimeError::UnknownIdentity(identity.clone()))?;
773
774        let lease_info = entry.lease.as_ref().map(|l| LeaseInfo {
775            fencing_token: l.fencing_token,
776            ttl_remaining: l.ttl_remaining(),
777            healthy: l.is_healthy(),
778        });
779
780        let continuity_health = Some(ContinuityHealth {
781            store_reachable: true, // tracked per-store in production
782            durability_policy: self.durability_policy.clone(),
783            last_checkpoint_version: if entry.checkpoint_version.get() > 0 {
784                Some(entry.checkpoint_version)
785            } else {
786                None
787            },
788        });
789
790        Ok(IdentityStatus {
791            identity: identity.clone(),
792            state: entry.state,
793            agent_runtime_id: entry
794                .continuity
795                .as_ref()
796                .map(|c| c.agent_runtime_id.clone()),
797            session_id: entry.continuity.as_ref().map(|c| c.session_id.clone()),
798            profile: Some(entry.spec.profile.clone()),
799            runtime_mode: entry.spec.runtime_mode_override,
800            addressability: entry.spec.addressability,
801            display_name: entry.spec.display_name.clone(),
802            labels: entry.spec.labels.clone(),
803            generation: entry.continuity.as_ref().map(|c| c.generation),
804            checkpoint_version: if entry.checkpoint_version.get() > 0 {
805                Some(entry.checkpoint_version)
806            } else {
807                None
808            },
809            lease: lease_info,
810            continuity_health,
811        })
812    }
813
814    // -----------------------------------------------------------------------
815    // Lifecycle: retire() — REQ-08
816    // -----------------------------------------------------------------------
817
818    /// Retire an identity. Validates lease ownership and retires the mob member.
819    pub async fn retire(
820        &self,
821        identity: &AgentIdentity,
822    ) -> Result<FencingToken, IdentityRuntimeError> {
823        let (token, runtime_id) = {
824            let mut entries = self.entries.write().await;
825            let entry = entries
826                .get_mut(identity)
827                .ok_or_else(|| IdentityRuntimeError::UnknownIdentity(identity.clone()))?;
828
829            let token = Self::check_lease(entry)?;
830            entry.state = IdentityLifecycleState::Retiring;
831
832            let runtime_id = entry
833                .continuity
834                .as_ref()
835                .map(|c| c.agent_runtime_id.clone());
836
837            (token, runtime_id)
838        };
839
840        // Retire the mob member through the session bridge when available.
841        if let (Some(bridge), Some(rid)) = (&self.bridge, &runtime_id) {
842            bridge
843                .retire_member(rid)
844                .await
845                .map_err(|e| IdentityRuntimeError::Internal(format!("bridge retire: {e}")))?;
846        }
847
848        Ok(token)
849    }
850
851    // -----------------------------------------------------------------------
852    // Lifecycle: respawn() — REQ-09
853    // -----------------------------------------------------------------------
854
855    /// Respawn: non-destructive recovery.
856    ///
857    /// 1. Fence the current owner
858    /// 2. Attempt final checkpoint
859    /// 3. Reactivate from authoritative continuity with same record + runtime ID
860    /// 4. ContinuityGeneration does NOT advance
861    pub async fn respawn(
862        &self,
863        identity: &AgentIdentity,
864    ) -> Result<ContinuityRecord, IdentityRuntimeError> {
865        // Fence the old owner by re-acquiring the lease
866        let acquire_result = self
867            .lease_provider
868            .acquire_leases(std::slice::from_ref(identity), &self.runtime_instance_id)
869            .await
870            .map_err(IdentityRuntimeError::Lease)?;
871
872        let grant = match acquire_result.get(identity) {
873            Some(super::types::LeaseAcquireResult::Acquired(g)) => g.clone(),
874            _ => {
875                return Err(IdentityRuntimeError::NoActiveLease(identity.clone()));
876            }
877        };
878
879        // Resolve current continuity state
880        let resolved = self
881            .continuity_store
882            .resolve_many(std::slice::from_ref(identity))
883            .await
884            .map_err(IdentityRuntimeError::Store)?;
885
886        let record = match resolved.get(identity) {
887            Some(super::types::ContinuityResolveState::Ready { record }) => record.clone(),
888            Some(super::types::ContinuityResolveState::Broken { failure }) => {
889                return Err(IdentityRuntimeError::Internal(format!(
890                    "broken continuity for {identity}: {}",
891                    failure.detail
892                )));
893            }
894            Some(super::types::ContinuityResolveState::Uninitialized) => {
895                return Err(IdentityRuntimeError::Internal(format!(
896                    "cannot respawn uninitialized identity {identity}"
897                )));
898            }
899            None => {
900                return Err(IdentityRuntimeError::Store(
901                    ContinuityStoreError::NotFound {
902                        identity: identity.clone(),
903                    },
904                ));
905            }
906        };
907
908        // Update runtime state: same record, new lease, back to Active
909        let mut entries = self.entries.write().await;
910        if let Some(entry) = entries.get_mut(identity) {
911            entry.continuity = Some(record.clone());
912            entry.lease = Some(LeaseEntry {
913                fencing_token: grant.fencing_token,
914                ttl: grant.ttl,
915                acquired_at: Instant::now(),
916            });
917            entry.state = IdentityLifecycleState::Active;
918            entry.checkpoint_version = record.checkpoint_version;
919        }
920
921        Ok(record)
922    }
923
924    // -----------------------------------------------------------------------
925    // Lifecycle: reset() — REQ-10
926    // -----------------------------------------------------------------------
927
928    /// Reset: destructive continuity reset.
929    ///
930    /// 1. Fence old owner
931    /// 2. Advance ContinuityGeneration
932    /// 3. Create fresh continuity under the same AgentIdentity
933    /// 4. Old-owner late writes rejected by stale fencing token
934    pub async fn reset(
935        &self,
936        identity: &AgentIdentity,
937    ) -> Result<ContinuityRecord, IdentityRuntimeError> {
938        // INV-05: fence the old owner first
939        let acquire_result = self
940            .lease_provider
941            .acquire_leases(std::slice::from_ref(identity), &self.runtime_instance_id)
942            .await
943            .map_err(IdentityRuntimeError::Lease)?;
944
945        let grant = match acquire_result.get(identity) {
946            Some(super::types::LeaseAcquireResult::Acquired(g)) => g.clone(),
947            _ => {
948                return Err(IdentityRuntimeError::NoActiveLease(identity.clone()));
949            }
950        };
951
952        // Resolve to get current generation
953        let resolved = self
954            .continuity_store
955            .resolve_many(std::slice::from_ref(identity))
956            .await
957            .map_err(IdentityRuntimeError::Store)?;
958
959        let current_gen = match resolved.get(identity) {
960            Some(super::types::ContinuityResolveState::Ready { record }) => record.generation,
961            Some(super::types::ContinuityResolveState::Uninitialized) => {
962                ContinuityGeneration::new(0)
963            }
964            _ => ContinuityGeneration::new(0),
965        };
966
967        // Advance generation
968        let new_gen = ContinuityGeneration::new(current_gen.get() + 1);
969        let new_session_id = meerkat_core::types::SessionId::new();
970        let new_runtime_id = AgentRuntimeId::parse(&format!("{}:gen{}", identity, new_gen.get()))
971            .map_err(|e| {
972            IdentityRuntimeError::Internal(format!("failed to mint runtime id: {e}"))
973        })?;
974
975        let new_record = ContinuityRecord {
976            identity: identity.clone(),
977            agent_runtime_id: new_runtime_id,
978            session_id: new_session_id,
979            generation: new_gen,
980            checkpoint_version: CheckpointVersion::new(0),
981        };
982
983        // Persist the new record (fencing token from new lease protects against old writes)
984        self.continuity_store
985            .upsert_continuity_record(&new_record, grant.fencing_token)
986            .await?;
987
988        // Bridge: retire old mob member and create fresh session for the new identity.
989        if let Some(bridge) = &self.bridge {
990            // Retire old member (best-effort — may already be gone)
991            let old_runtime_id = {
992                let entries = self.entries.read().await;
993                entries
994                    .get(identity)
995                    .and_then(|e| e.continuity.as_ref().map(|c| c.agent_runtime_id.clone()))
996            };
997            if let Some(old_id) = old_runtime_id {
998                let _ = bridge.retire_member(&old_id).await;
999            }
1000
1001            // Get the spec from the runtime entry
1002            let spec = {
1003                let entries = self.entries.read().await;
1004                entries.get(identity).map(|e| e.spec.clone())
1005            };
1006            if let Some(spec) = spec {
1007                let draft = super::types::AgentBuildDraft {
1008                    model: None,
1009                    system_prompt: None,
1010                    additional_instructions: spec.additional_instructions.clone(),
1011                    labels: spec.labels.clone(),
1012                    app_context: spec.context.clone(),
1013                    external_tools: Vec::new(),
1014                    local_external_tools: Default::default(),
1015                };
1016                bridge
1017                    .register_session_runtime_state(
1018                        &new_record.session_id,
1019                        identity,
1020                        new_record.generation,
1021                        new_record.checkpoint_version,
1022                        grant.fencing_token,
1023                    )
1024                    .await
1025                    .map_err(|e| {
1026                        IdentityRuntimeError::Internal(format!(
1027                            "bridge register_session_runtime_state after reset: {e}"
1028                        ))
1029                    })?;
1030                let session_id = bridge
1031                    .create_session(
1032                        identity,
1033                        &new_record.agent_runtime_id,
1034                        &spec,
1035                        &draft,
1036                        &new_record.session_id,
1037                    )
1038                    .await
1039                    .map_err(|e| {
1040                        IdentityRuntimeError::Internal(format!(
1041                            "bridge create_session after reset: {e}"
1042                        ))
1043                    })?;
1044                // Update the record with the actual session ID
1045                let initial_session_id = new_record.session_id.clone();
1046                let mut new_record = new_record;
1047                new_record.session_id = session_id;
1048
1049                // Persist the updated record
1050                if new_record.session_id != initial_session_id {
1051                    self.continuity_store
1052                        .upsert_continuity_record(&new_record, grant.fencing_token)
1053                        .await?;
1054                }
1055                let effective_checkpoint_version = bridge
1056                    .register_session_runtime_state(
1057                        &new_record.session_id,
1058                        identity,
1059                        new_record.generation,
1060                        new_record.checkpoint_version,
1061                        grant.fencing_token,
1062                    )
1063                    .await
1064                    .map_err(|e| {
1065                        IdentityRuntimeError::Internal(format!(
1066                            "bridge register actual session runtime state after reset: {e}"
1067                        ))
1068                    })?;
1069                new_record.checkpoint_version = effective_checkpoint_version;
1070
1071                // Update runtime state
1072                let mut entries = self.entries.write().await;
1073                if let Some(entry) = entries.get_mut(identity) {
1074                    entry.continuity = Some(new_record.clone());
1075                    entry.lease = Some(LeaseEntry {
1076                        fencing_token: grant.fencing_token,
1077                        ttl: grant.ttl,
1078                        acquired_at: Instant::now(),
1079                    });
1080                    entry.state = IdentityLifecycleState::Active;
1081                    entry.checkpoint_version = new_record.checkpoint_version;
1082                }
1083                return Ok(new_record);
1084            }
1085        }
1086
1087        // No bridge — update runtime state only (validation mode)
1088        let mut entries = self.entries.write().await;
1089        if let Some(entry) = entries.get_mut(identity) {
1090            entry.continuity = Some(new_record.clone());
1091            entry.lease = Some(LeaseEntry {
1092                fencing_token: grant.fencing_token,
1093                ttl: grant.ttl,
1094                acquired_at: Instant::now(),
1095            });
1096            entry.state = IdentityLifecycleState::Active;
1097            entry.checkpoint_version = CheckpointVersion::new(0);
1098        }
1099
1100        Ok(new_record)
1101    }
1102
1103    // -----------------------------------------------------------------------
1104    // Lifecycle: delete_identity() — REQ-11
1105    // -----------------------------------------------------------------------
1106
1107    /// Delete an identity: removes continuity record.
1108    ///
1109    /// 1. Fence old owner
1110    /// 2. Remove ContinuityRecord
1111    /// 3. Future bootstrap treats identity as Uninitialized
1112    pub async fn delete_identity(
1113        &self,
1114        identity: &AgentIdentity,
1115    ) -> Result<(), IdentityRuntimeError> {
1116        // INV-05: fence the old owner first
1117        let acquire_result = self
1118            .lease_provider
1119            .acquire_leases(std::slice::from_ref(identity), &self.runtime_instance_id)
1120            .await
1121            .map_err(IdentityRuntimeError::Lease)?;
1122
1123        let grant = match acquire_result.get(identity) {
1124            Some(super::types::LeaseAcquireResult::Acquired(g)) => g.clone(),
1125            _ => {
1126                return Err(IdentityRuntimeError::NoActiveLease(identity.clone()));
1127            }
1128        };
1129
1130        // Retire the mob member through the session bridge before removing
1131        // the continuity record. This ensures the mob actor is cleaned up.
1132        let runtime_id = {
1133            let entries = self.entries.read().await;
1134            entries
1135                .get(identity)
1136                .and_then(|e| e.continuity.as_ref().map(|c| c.agent_runtime_id.clone()))
1137        };
1138        if let (Some(bridge), Some(rid)) = (&self.bridge, &runtime_id) {
1139            // Best-effort: if the member is already gone, ignore the error.
1140            let _ = bridge.retire_member(rid).await;
1141        }
1142
1143        // Remove authoritative continuity record from the store
1144        self.continuity_store
1145            .delete_continuity_record(identity, grant.fencing_token)
1146            .await?;
1147
1148        // Remove from runtime tracking
1149        self.event_channels.write().await.remove(identity);
1150        self.entries.write().await.remove(identity);
1151
1152        Ok(())
1153    }
1154
1155    // -----------------------------------------------------------------------
1156    // Checkpoint — REQ-14, REQ-15, REQ-16, REQ-17
1157    // -----------------------------------------------------------------------
1158
1159    /// Save a checkpoint snapshot. Enforces version ordering and fencing.
1160    pub async fn checkpoint(
1161        &self,
1162        identity: &AgentIdentity,
1163        snapshot: &SessionSnapshot,
1164    ) -> Result<CheckpointVersion, IdentityRuntimeError> {
1165        let (record, token, new_version) = {
1166            let entries = self.entries.read().await;
1167            let entry = entries
1168                .get(identity)
1169                .ok_or_else(|| IdentityRuntimeError::UnknownIdentity(identity.clone()))?;
1170
1171            // INV-01: require active lease
1172            let token = Self::check_lease(entry)?;
1173
1174            let record = entry
1175                .continuity
1176                .as_ref()
1177                .ok_or_else(|| {
1178                    IdentityRuntimeError::Internal(format!("no continuity record for {identity}"))
1179                })?
1180                .clone();
1181
1182            let new_version = CheckpointVersion::new(entry.checkpoint_version.get() + 1);
1183            (record, token, new_version)
1184        };
1185
1186        // REQ-15 + REQ-16: store enforces version ordering and fencing
1187        self.continuity_store
1188            .save_session_snapshot(
1189                identity,
1190                &record.session_id,
1191                record.generation,
1192                new_version,
1193                token,
1194                snapshot,
1195            )
1196            .await?;
1197
1198        // Update local checkpoint version
1199        {
1200            let mut entries = self.entries.write().await;
1201            if let Some(entry) = entries.get_mut(identity) {
1202                entry.checkpoint_version = new_version;
1203            }
1204        }
1205
1206        self.emit_event(
1207            identity,
1208            IdentityEvent::CheckpointCompleted {
1209                identity: identity.clone(),
1210                version: new_version,
1211            },
1212        )
1213        .await;
1214
1215        Ok(new_version)
1216    }
1217
1218    // -----------------------------------------------------------------------
1219    // Roster inspection — REQ-32
1220    // -----------------------------------------------------------------------
1221
1222    /// Return all active identities with their specs and status.
1223    pub async fn roster_inspect(
1224        &self,
1225    ) -> BTreeMap<AgentIdentity, (DurableAgentSpec, IdentityStatus)> {
1226        let entries = self.entries.read().await;
1227        let mut result = BTreeMap::new();
1228        for (identity, entry) in entries.iter() {
1229            let lease_info = entry.lease.as_ref().map(|l| LeaseInfo {
1230                fencing_token: l.fencing_token,
1231                ttl_remaining: l.ttl_remaining(),
1232                healthy: l.is_healthy(),
1233            });
1234            let continuity_health = Some(ContinuityHealth {
1235                store_reachable: true,
1236                durability_policy: self.durability_policy.clone(),
1237                last_checkpoint_version: if entry.checkpoint_version.get() > 0 {
1238                    Some(entry.checkpoint_version)
1239                } else {
1240                    None
1241                },
1242            });
1243            let status = IdentityStatus {
1244                identity: identity.clone(),
1245                state: entry.state,
1246                agent_runtime_id: entry
1247                    .continuity
1248                    .as_ref()
1249                    .map(|c| c.agent_runtime_id.clone()),
1250                session_id: entry.continuity.as_ref().map(|c| c.session_id.clone()),
1251                profile: Some(entry.spec.profile.clone()),
1252                runtime_mode: entry.spec.runtime_mode_override,
1253                addressability: entry.spec.addressability,
1254                display_name: entry.spec.display_name.clone(),
1255                labels: entry.spec.labels.clone(),
1256                generation: entry.continuity.as_ref().map(|c| c.generation),
1257                checkpoint_version: if entry.checkpoint_version.get() > 0 {
1258                    Some(entry.checkpoint_version)
1259                } else {
1260                    None
1261                },
1262                lease: lease_info,
1263                continuity_health,
1264            };
1265            result.insert(identity.clone(), (entry.spec.clone(), status));
1266        }
1267        result
1268    }
1269
1270    // -----------------------------------------------------------------------
1271    // Roster uniqueness validation — INV-06
1272    // -----------------------------------------------------------------------
1273
1274    /// Validate that a roster contains no duplicate identities.
1275    pub fn validate_roster_uniqueness(
1276        specs: &[DurableAgentSpec],
1277    ) -> Result<(), IdentityRuntimeError> {
1278        let mut seen = std::collections::BTreeSet::new();
1279        for spec in specs {
1280            if !seen.insert(&spec.identity) {
1281                return Err(IdentityRuntimeError::DuplicateIdentity(
1282                    spec.identity.clone(),
1283                ));
1284            }
1285        }
1286        Ok(())
1287    }
1288
1289    // -----------------------------------------------------------------------
1290    // Accessors for internal state
1291    // -----------------------------------------------------------------------
1292
1293    /// Get the current entries (read-only snapshot).
1294    #[allow(dead_code)]
1295    pub(crate) async fn entries(&self) -> BTreeMap<AgentIdentity, IdentityEntry> {
1296        self.entries.read().await.clone()
1297    }
1298
1299    /// Check if an identity is registered.
1300    pub async fn contains(&self, identity: &AgentIdentity) -> bool {
1301        self.entries.read().await.contains_key(identity)
1302    }
1303
1304    /// Check if an identity is registered AND in Active state.
1305    pub async fn is_active(&self, identity: &AgentIdentity) -> bool {
1306        self.entries
1307            .read()
1308            .await
1309            .get(identity)
1310            .is_some_and(|e| e.state == IdentityLifecycleState::Active)
1311    }
1312
1313    /// Get the continuity store reference.
1314    pub fn continuity_store(&self) -> &Arc<dyn ContinuityStore> {
1315        &self.continuity_store
1316    }
1317
1318    /// Get the lease provider reference.
1319    pub fn lease_provider(&self) -> &Arc<dyn LeaseProvider> {
1320        &self.lease_provider
1321    }
1322
1323    /// Get the runtime instance ID.
1324    pub fn runtime_instance_id(&self) -> &str {
1325        &self.runtime_instance_id
1326    }
1327
1328    /// Get the durability policy.
1329    pub fn durability_policy(&self) -> &DurabilityPolicy {
1330        &self.durability_policy
1331    }
1332
1333    /// Get whether a runtime store is configured.
1334    pub fn has_runtime_store(&self) -> bool {
1335        self.has_runtime_store
1336    }
1337
1338    /// Get the session bridge reference, if configured.
1339    pub fn bridge(&self) -> Option<&Arc<dyn SessionBridge>> {
1340        self.bridge.as_ref()
1341    }
1342
1343    // -----------------------------------------------------------------------
1344    // Convenience methods
1345    // -----------------------------------------------------------------------
1346
1347    /// Send plain text to an addressable identity.
1348    pub async fn send_text(
1349        &self,
1350        identity: &AgentIdentity,
1351        text: impl Into<String>,
1352    ) -> Result<FencingToken, IdentityRuntimeError> {
1353        self.send(identity, &meerkat_core::ContentInput::Text(text.into()))
1354            .await
1355    }
1356
1357    /// Dispatch plain text with system origin.
1358    pub async fn dispatch_text(
1359        &self,
1360        identity: &AgentIdentity,
1361        text: impl Into<String>,
1362    ) -> Result<(FencingToken, bool), IdentityRuntimeError> {
1363        self.dispatch(identity, &DispatchInput::system(text)).await
1364    }
1365
1366    /// Execute the restore flow for the given roster.
1367    pub async fn restore_flow(
1368        &self,
1369        roster: &[DurableAgentSpec],
1370        topology_provider: Option<&dyn super::contracts::TopologyProvider>,
1371        customizer: Option<&dyn super::contracts::AgentCustomizer>,
1372    ) -> Result<super::orchestrator::RestoreFlowResult, IdentityRuntimeError> {
1373        super::orchestrator::restore_flow(self, roster, topology_provider, customizer).await
1374    }
1375
1376    /// Resolve the AgentRuntimeId for a registered identity.
1377    pub async fn runtime_id_for(
1378        &self,
1379        identity: &AgentIdentity,
1380    ) -> Result<AgentRuntimeId, IdentityRuntimeError> {
1381        let entries = self.entries.read().await;
1382        let entry = entries
1383            .get(identity)
1384            .ok_or_else(|| IdentityRuntimeError::UnknownIdentity(identity.clone()))?;
1385        entry
1386            .continuity
1387            .as_ref()
1388            .map(|c| c.agent_runtime_id.clone())
1389            .ok_or_else(|| {
1390                IdentityRuntimeError::Internal(format!("no continuity record for {identity}"))
1391            })
1392    }
1393
1394    /// Inspect the current execution state of an identity via the bridge.
1395    pub async fn inspect(
1396        &self,
1397        identity: &AgentIdentity,
1398    ) -> Result<super::bridge::MemberInspection, IdentityRuntimeError> {
1399        let runtime_id = self.runtime_id_for(identity).await?;
1400        let bridge = self
1401            .bridge
1402            .as_ref()
1403            .ok_or_else(|| IdentityRuntimeError::Internal("no bridge configured".to_string()))?;
1404        bridge
1405            .inspect_member(&runtime_id)
1406            .await
1407            .map_err(|e| IdentityRuntimeError::Internal(format!("inspect: {e}")))
1408    }
1409
1410    /// The configured default timeout for wait operations.
1411    pub fn default_timeout(&self) -> Duration {
1412        self.default_timeout
1413    }
1414
1415    /// Poll until the identity produces an output_preview, or timeout.
1416    pub async fn wait_for_output(
1417        &self,
1418        identity: &AgentIdentity,
1419        timeout: Duration,
1420    ) -> Result<String, IdentityRuntimeError> {
1421        let deadline = Instant::now() + timeout;
1422        loop {
1423            if let Ok(inspection) = self.inspect(identity).await
1424                && let Some(preview) = inspection.output_preview
1425            {
1426                return Ok(preview);
1427            }
1428            if Instant::now() >= deadline {
1429                return Err(IdentityRuntimeError::Internal(format!(
1430                    "timed out waiting for output from {identity}"
1431                )));
1432            }
1433            tokio::time::sleep(Duration::from_millis(500)).await;
1434        }
1435    }
1436
1437    /// Poll until output_preview contains the given substring, or timeout.
1438    pub async fn wait_for_output_containing(
1439        &self,
1440        identity: &AgentIdentity,
1441        needle: &str,
1442        timeout: Duration,
1443    ) -> Result<String, IdentityRuntimeError> {
1444        let deadline = Instant::now() + timeout;
1445        loop {
1446            if let Ok(inspection) = self.inspect(identity).await
1447                && let Some(ref preview) = inspection.output_preview
1448                && preview.contains(needle)
1449            {
1450                return Ok(preview.clone());
1451            }
1452            if Instant::now() >= deadline {
1453                return Err(IdentityRuntimeError::Internal(format!(
1454                    "timed out waiting for output containing '{needle}' from {identity}"
1455                )));
1456            }
1457            tokio::time::sleep(Duration::from_millis(500)).await;
1458        }
1459    }
1460}
1461
1462/// Wire two identities across mobs, resolving runtime IDs from both IdentityRuntimes.
1463///
1464/// This is a convenience function for same-process cross-mob scenarios where both
1465/// IdentityRuntimes are available. It resolves the AgentRuntimeId for each identity
1466/// and delegates to `UnifiedRuntime::wire_cross_mob()`.
1467pub async fn wire_cross_mob_by_identity(
1468    local_irt: &IdentityRuntime,
1469    local_identity: &AgentIdentity,
1470    remote_irt: &IdentityRuntime,
1471    remote_identity: &AgentIdentity,
1472    local_unified: &crate::UnifiedRuntime,
1473    remote_mob_id: &str,
1474) -> Result<(), IdentityRuntimeError> {
1475    let local_rt = local_irt.runtime_id_for(local_identity).await?;
1476    let remote_rt = remote_irt.runtime_id_for(remote_identity).await?;
1477    local_unified
1478        .wire_cross_mob(local_rt.as_str(), remote_rt.as_str(), remote_mob_id)
1479        .await
1480        .map_err(|e| IdentityRuntimeError::Internal(format!("wire_cross_mob: {e}")))
1481}