1use std::collections::{BTreeMap, BTreeSet};
10use std::sync::Arc;
11use std::time::{Duration, Instant};
12
13use futures::stream::{self, StreamExt};
14use tokio::sync::{RwLock, broadcast};
15
16use super::bridge::SessionBridge;
17use super::contracts::{
18 AgentCustomizer, ContinuityStore, LeaseProvider, RosterProvider, TopologyProvider,
19};
20use super::types::{
21 AgentAddressability, AgentIdentity, AgentRuntimeId, AgentRuntimeServices, CheckpointVersion,
22 ContinuityGeneration, ContinuityHealth, ContinuityRecord, ContinuityStoreError, DispatchInput,
23 DurabilityPolicy, DurableAgentSpec, FencingToken, IdentityLifecycleState, IdentityStatus,
24 LeaseGrant, LeaseInfo, ManagedPeerEdge, NotAddressable, RosterContext, SessionSnapshot,
25};
26
/// Maximum number of `unwire_peer` bridge calls kept in flight at once while
/// reconciling managed peer edges.
const MANAGED_PEER_RECONCILE_CONCURRENCY: usize = 64;
28
29#[derive(Debug)]
35pub enum IdentityRuntimeError {
36 UnknownIdentity(AgentIdentity),
38 NotAddressable(NotAddressable),
40 NoActiveLease(AgentIdentity),
42 LeaseLost(AgentIdentity),
44 InvalidState {
46 identity: AgentIdentity,
47 state: IdentityLifecycleState,
48 operation: &'static str,
49 },
50 Store(ContinuityStoreError),
52 Lease(super::types::LeaseError),
54 DuplicateIdentity(AgentIdentity),
56 StaleFencingToken {
58 identity: AgentIdentity,
59 presented: FencingToken,
60 current: FencingToken,
61 },
62 StaleCheckpointVersion {
64 identity: AgentIdentity,
65 presented: CheckpointVersion,
66 current: CheckpointVersion,
67 },
68 Internal(String),
70}
71
72impl std::fmt::Display for IdentityRuntimeError {
73 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
74 match self {
75 Self::UnknownIdentity(id) => write!(f, "unknown identity: {id}"),
76 Self::NotAddressable(err) => write!(f, "{err}"),
77 Self::NoActiveLease(id) => write!(f, "no active lease for {id}"),
78 Self::LeaseLost(id) => write!(f, "lease lost for {id}"),
79 Self::InvalidState {
80 identity,
81 state,
82 operation,
83 } => write!(
84 f,
85 "cannot {operation} identity {identity} in state {state:?}"
86 ),
87 Self::Store(err) => write!(f, "continuity store: {err}"),
88 Self::Lease(err) => write!(f, "lease provider: {err}"),
89 Self::DuplicateIdentity(id) => write!(f, "duplicate identity in roster: {id}"),
90 Self::StaleFencingToken {
91 identity,
92 presented,
93 current,
94 } => write!(
95 f,
96 "stale fencing token for {identity}: presented {presented}, current {current}"
97 ),
98 Self::StaleCheckpointVersion {
99 identity,
100 presented,
101 current,
102 } => write!(
103 f,
104 "stale checkpoint version for {identity}: presented {presented}, current {current}"
105 ),
106 Self::Internal(msg) => write!(f, "internal: {msg}"),
107 }
108 }
109}
110
111impl std::error::Error for IdentityRuntimeError {}
112
113impl From<ContinuityStoreError> for IdentityRuntimeError {
114 fn from(err: ContinuityStoreError) -> Self {
115 match err {
116 ContinuityStoreError::StaleFencingToken {
117 identity,
118 presented,
119 current,
120 } => Self::StaleFencingToken {
121 identity,
122 presented,
123 current,
124 },
125 ContinuityStoreError::StaleCheckpointVersion {
126 identity,
127 presented,
128 current,
129 } => Self::StaleCheckpointVersion {
130 identity,
131 presented,
132 current,
133 },
134 other => Self::Store(other),
135 }
136 }
137}
138
139#[derive(Debug, Clone)]
145pub(crate) struct IdentityEntry {
146 pub spec: DurableAgentSpec,
147 pub state: IdentityLifecycleState,
148 pub continuity: Option<ContinuityRecord>,
149 pub lease: Option<LeaseEntry>,
150 pub checkpoint_version: CheckpointVersion,
151 pub has_runtime_store: bool,
153}
154
155#[derive(Debug, Clone)]
157pub(crate) struct LeaseEntry {
158 pub fencing_token: FencingToken,
159 pub ttl: Duration,
160 pub acquired_at: Instant,
161}
162
163impl LeaseEntry {
164 pub fn is_expired(&self) -> bool {
165 self.acquired_at.elapsed() > self.ttl
166 }
167
168 pub fn ttl_remaining(&self) -> Duration {
169 self.ttl.saturating_sub(self.acquired_at.elapsed())
170 }
171
172 pub fn is_healthy(&self) -> bool {
173 let remaining = self.ttl_remaining();
175 remaining > self.ttl / 5
176 }
177}
178
179#[derive(Debug, Clone)]
185pub enum IdentityEvent {
186 StateChanged {
188 identity: AgentIdentity,
189 new_state: IdentityLifecycleState,
190 },
191 LeaseUpdated {
193 identity: AgentIdentity,
194 fencing_token: FencingToken,
195 },
196 LeaseLost { identity: AgentIdentity },
198 CheckpointCompleted {
200 identity: AgentIdentity,
201 version: CheckpointVersion,
202 },
203}
204
/// Buffer size of each identity's broadcast event channel.
const IDENTITY_EVENT_CHANNEL_CAPACITY: usize = 64;
207
208pub struct IdentityRuntimeConfig {
214 pub continuity_store: Arc<dyn ContinuityStore>,
215 pub lease_provider: Arc<dyn LeaseProvider>,
216 pub runtime_instance_id: String,
217 pub has_runtime_store: bool,
218 pub durability_policy: DurabilityPolicy,
219 pub bridge: Option<Arc<dyn SessionBridge>>,
223 pub default_timeout: Option<Duration>,
226}
227
228#[derive(Clone)]
229pub struct IdentityFirstRuntimeContext {
230 pub runtime: Arc<IdentityRuntime>,
231 pub roster_provider: Arc<dyn RosterProvider>,
232 pub topology_provider: Option<Arc<dyn TopologyProvider>>,
233 pub customizer: Option<Arc<dyn AgentCustomizer>>,
234 mob_definition: Option<meerkat_mob::MobDefinition>,
235}
236
237impl IdentityFirstRuntimeContext {
238 pub fn new(
239 runtime: Arc<IdentityRuntime>,
240 roster_provider: Arc<dyn RosterProvider>,
241 topology_provider: Option<Arc<dyn TopologyProvider>>,
242 customizer: Option<Arc<dyn AgentCustomizer>>,
243 mob_definition: Option<meerkat_mob::MobDefinition>,
244 ) -> Self {
245 Self {
246 runtime,
247 roster_provider,
248 topology_provider,
249 customizer,
250 mob_definition,
251 }
252 }
253
254 pub async fn refresh_desired_topology(
255 &self,
256 ) -> Result<super::orchestrator::RestoreFlowResult, IdentityRuntimeError> {
257 let roster = self
258 .roster_provider
259 .roster(&RosterContext {
260 mob_definition: self.mob_definition.clone(),
261 previous_identities: Vec::new(),
262 })
263 .await
264 .map_err(|err| IdentityRuntimeError::Internal(format!("roster provider: {err}")))?;
265
266 super::orchestrator::restore_flow(
267 &self.runtime,
268 &roster,
269 self.topology_provider.as_deref(),
270 self.customizer.as_deref(),
271 )
272 .await
273 }
274}
275
276pub struct IdentityRuntime {
279 entries: RwLock<BTreeMap<AgentIdentity, IdentityEntry>>,
280 event_channels: RwLock<BTreeMap<AgentIdentity, broadcast::Sender<IdentityEvent>>>,
281 continuity_store: Arc<dyn ContinuityStore>,
282 lease_provider: Arc<dyn LeaseProvider>,
283 runtime_instance_id: String,
284 has_runtime_store: bool,
285 durability_policy: DurabilityPolicy,
286 bridge: Option<Arc<dyn SessionBridge>>,
287 runtime_services: AgentRuntimeServices,
288 managed_peer_edges: RwLock<BTreeSet<(AgentIdentity, AgentIdentity)>>,
289 default_timeout: Duration,
290}
291
292impl IdentityRuntime {
293 pub fn new(config: IdentityRuntimeConfig) -> Self {
295 Self {
296 entries: RwLock::new(BTreeMap::new()),
297 event_channels: RwLock::new(BTreeMap::new()),
298 continuity_store: config.continuity_store,
299 lease_provider: config.lease_provider,
300 runtime_instance_id: config.runtime_instance_id,
301 has_runtime_store: config.has_runtime_store,
302 durability_policy: config.durability_policy,
303 bridge: config.bridge,
304 runtime_services: AgentRuntimeServices::empty(),
305 managed_peer_edges: RwLock::new(BTreeSet::new()),
306 default_timeout: config.default_timeout.unwrap_or(Duration::from_secs(90)),
307 }
308 }
309
310 pub fn with_runtime_services(mut self, runtime_services: AgentRuntimeServices) -> Self {
311 self.runtime_services = runtime_services;
312 self
313 }
314
315 pub(crate) fn runtime_services(&self) -> AgentRuntimeServices {
316 self.runtime_services.clone()
317 }
318
319 #[must_use]
320 pub fn has_session_bridge(&self) -> bool {
321 self.bridge.is_some()
322 }
323
324 pub async fn reconcile_managed_peer_edges(
330 &self,
331 desired_edges: &[ManagedPeerEdge],
332 ) -> Result<(), IdentityRuntimeError> {
333 let Some(bridge) = self.bridge.clone() else {
334 return Ok(());
335 };
336
337 let active_runtimes: BTreeMap<AgentIdentity, AgentRuntimeId> = {
338 let entries = self.entries.read().await;
339 entries
340 .iter()
341 .filter_map(|(identity, entry)| {
342 if entry.state != IdentityLifecycleState::Active {
343 return None;
344 }
345 entry
346 .continuity
347 .as_ref()
348 .map(|record| (identity.clone(), record.agent_runtime_id.clone()))
349 })
350 .collect()
351 };
352 let runtime_identities: BTreeMap<AgentRuntimeId, AgentIdentity> = active_runtimes
353 .iter()
354 .map(|(identity, runtime_id)| (runtime_id.clone(), identity.clone()))
355 .collect();
356 let current_logical_edges: Option<BTreeSet<(AgentIdentity, AgentIdentity)>> =
357 match bridge.current_member_wires().await {
358 Ok(current_runtime_edges) => Some(
359 current_runtime_edges
360 .iter()
361 .filter_map(|(runtime_a, runtime_b)| {
362 let a = runtime_identities.get(runtime_a)?;
363 let b = runtime_identities.get(runtime_b)?;
364 if a <= b {
365 Some((a.clone(), b.clone()))
366 } else {
367 Some((b.clone(), a.clone()))
368 }
369 })
370 .collect(),
371 ),
372 Err(err) => {
373 tracing::debug!(
374 error = %err,
375 "identity-first topology reconcile could not inspect current member wires"
376 );
377 None
378 }
379 };
380
381 let desired: BTreeSet<(AgentIdentity, AgentIdentity)> = desired_edges
382 .iter()
383 .map(|edge| (edge.a().clone(), edge.b().clone()))
384 .collect();
385
386 let managed_snapshot = self.managed_peer_edges.read().await.clone();
387 let retained_logical_edges: Vec<(AgentIdentity, AgentIdentity)> = desired
388 .iter()
389 .filter(|edge| !managed_snapshot.contains(*edge))
390 .filter(|edge| {
391 current_logical_edges
392 .as_ref()
393 .is_some_and(|edges| edges.contains(*edge))
394 })
395 .filter(|(a, b)| active_runtimes.contains_key(a) && active_runtimes.contains_key(b))
396 .cloned()
397 .collect();
398 let to_wire: Vec<(AgentIdentity, AgentIdentity, AgentRuntimeId, AgentRuntimeId)> = desired
399 .iter()
400 .filter(|edge| !managed_snapshot.contains(*edge))
401 .filter(|edge| {
402 current_logical_edges
403 .as_ref()
404 .is_none_or(|edges| !edges.contains(*edge))
405 })
406 .filter_map(|(a, b)| {
407 let runtime_a = active_runtimes.get(a)?;
408 let runtime_b = active_runtimes.get(b)?;
409 Some((a.clone(), b.clone(), runtime_a.clone(), runtime_b.clone()))
410 })
411 .collect();
412
413 let stale: Vec<(AgentIdentity, AgentIdentity)> = managed_snapshot
414 .iter()
415 .filter(|edge| !desired.contains(*edge))
416 .cloned()
417 .collect();
418 let to_unwire: Vec<(AgentIdentity, AgentIdentity, AgentRuntimeId, AgentRuntimeId)> = stale
419 .iter()
420 .filter_map(|(a, b)| {
421 let runtime_a = active_runtimes.get(a)?;
422 let runtime_b = active_runtimes.get(b)?;
423 if current_logical_edges
424 .as_ref()
425 .is_some_and(|edges| !edges.contains(&(a.clone(), b.clone())))
426 {
427 return None;
428 }
429 Some((a.clone(), b.clone(), runtime_a.clone(), runtime_b.clone()))
430 })
431 .collect();
432
433 let wire_logical_edges = to_wire
434 .iter()
435 .map(|(a, b, _, _)| (a.clone(), b.clone()))
436 .collect::<Vec<_>>();
437 let wire_runtime_edges = to_wire
438 .iter()
439 .map(|(_, _, runtime_a, runtime_b)| (runtime_a.clone(), runtime_b.clone()))
440 .collect::<Vec<_>>();
441 if !wire_runtime_edges.is_empty() {
442 bridge
443 .wire_peers_batch(&wire_runtime_edges)
444 .await
445 .map_err(|e| {
446 IdentityRuntimeError::Internal(format!("bridge wire_peers_batch: {e}"))
447 })?;
448 }
449
450 let unwire_results =
451 stream::iter(to_unwire.into_iter().map(|(a, b, runtime_a, runtime_b)| {
452 let bridge = bridge.clone();
453 async move {
454 let result = bridge
455 .unwire_peer(&runtime_a, &runtime_b)
456 .await
457 .map_err(|e| format!("{e}"));
458 (a, b, result)
459 }
460 }))
461 .buffer_unordered(MANAGED_PEER_RECONCILE_CONCURRENCY)
462 .collect::<Vec<_>>()
463 .await;
464
465 let mut managed = self.managed_peer_edges.write().await;
466 for (a, b) in retained_logical_edges {
467 managed.insert((a, b));
468 }
469 for (a, b) in wire_logical_edges {
470 managed.insert((a, b));
471 }
472
473 for (a, b) in stale {
474 let key = (a.clone(), b.clone());
475 if !active_runtimes.contains_key(&a) || !active_runtimes.contains_key(&b) {
476 managed.remove(&key);
477 }
478 }
479 for (a, b, result) in unwire_results {
480 result
481 .map_err(|e| IdentityRuntimeError::Internal(format!("bridge unwire_peer: {e}")))?;
482 managed.remove(&(a, b));
483 }
484
485 Ok(())
486 }
487
488 async fn emit_event(&self, identity: &AgentIdentity, event: IdentityEvent) {
490 let channels = self.event_channels.read().await;
491 if let Some(tx) = channels.get(identity) {
492 let _ = tx.send(event);
493 }
494 }
495
496 pub async fn register(
502 &self,
503 spec: DurableAgentSpec,
504 state: IdentityLifecycleState,
505 continuity: Option<ContinuityRecord>,
506 lease: Option<LeaseGrant>,
507 ) {
508 let identity = spec.identity.clone();
509 let cpv = continuity
510 .as_ref()
511 .map(|r| r.checkpoint_version)
512 .unwrap_or(CheckpointVersion::new(0));
513 let lease_entry = lease.map(|g| LeaseEntry {
514 fencing_token: g.fencing_token,
515 ttl: g.ttl,
516 acquired_at: Instant::now(),
517 });
518 let entry = IdentityEntry {
519 spec,
520 state,
521 continuity,
522 lease: lease_entry,
523 checkpoint_version: cpv,
524 has_runtime_store: self.has_runtime_store,
525 };
526 self.entries.write().await.insert(identity.clone(), entry);
527
528 let (tx, _) = broadcast::channel(IDENTITY_EVENT_CHANNEL_CAPACITY);
530 self.event_channels.write().await.insert(identity, tx);
531 }
532
533 pub async fn subscribe(
542 &self,
543 identity: &AgentIdentity,
544 ) -> Result<broadcast::Receiver<IdentityEvent>, IdentityRuntimeError> {
545 let channels = self.event_channels.read().await;
546 let tx = channels
547 .get(identity)
548 .ok_or_else(|| IdentityRuntimeError::UnknownIdentity(identity.clone()))?;
549 Ok(tx.subscribe())
550 }
551
552 pub async fn update_spec(&self, spec: DurableAgentSpec) -> Result<(), IdentityRuntimeError> {
554 let mut entries = self.entries.write().await;
555 let entry = entries
556 .get_mut(&spec.identity)
557 .ok_or_else(|| IdentityRuntimeError::UnknownIdentity(spec.identity.clone()))?;
558 entry.spec = spec;
559 Ok(())
560 }
561
562 pub async fn update_lease(
564 &self,
565 identity: &AgentIdentity,
566 grant: LeaseGrant,
567 ) -> Result<(), IdentityRuntimeError> {
568 let fencing_token = grant.fencing_token;
569 let mut entries = self.entries.write().await;
570 let entry = entries
571 .get_mut(identity)
572 .ok_or_else(|| IdentityRuntimeError::UnknownIdentity(identity.clone()))?;
573 entry.lease = Some(LeaseEntry {
574 fencing_token,
575 ttl: grant.ttl,
576 acquired_at: Instant::now(),
577 });
578 drop(entries);
579 self.emit_event(
580 identity,
581 IdentityEvent::LeaseUpdated {
582 identity: identity.clone(),
583 fencing_token,
584 },
585 )
586 .await;
587 Ok(())
588 }
589
590 pub async fn mark_lease_lost(
592 &self,
593 identity: &AgentIdentity,
594 ) -> Result<(), IdentityRuntimeError> {
595 let mut entries = self.entries.write().await;
596 let entry = entries
597 .get_mut(identity)
598 .ok_or_else(|| IdentityRuntimeError::UnknownIdentity(identity.clone()))?;
599 entry.lease = None;
600 drop(entries);
601 self.emit_event(
602 identity,
603 IdentityEvent::LeaseLost {
604 identity: identity.clone(),
605 },
606 )
607 .await;
608 Ok(())
609 }
610
611 #[allow(dead_code)]
613 pub(crate) async fn remove(&self, identity: &AgentIdentity) -> Option<IdentityEntry> {
614 self.event_channels.write().await.remove(identity);
615 self.entries.write().await.remove(identity)
616 }
617
618 pub async fn set_state(
620 &self,
621 identity: &AgentIdentity,
622 state: IdentityLifecycleState,
623 ) -> Result<(), IdentityRuntimeError> {
624 let mut entries = self.entries.write().await;
625 let entry = entries
626 .get_mut(identity)
627 .ok_or_else(|| IdentityRuntimeError::UnknownIdentity(identity.clone()))?;
628 entry.state = state;
629 drop(entries);
630 self.emit_event(
631 identity,
632 IdentityEvent::StateChanged {
633 identity: identity.clone(),
634 new_state: state,
635 },
636 )
637 .await;
638 Ok(())
639 }
640
641 fn check_lease(entry: &IdentityEntry) -> Result<FencingToken, IdentityRuntimeError> {
648 match &entry.lease {
649 Some(lease) if !lease.is_expired() => Ok(lease.fencing_token),
650 Some(_) => Err(IdentityRuntimeError::LeaseLost(entry.spec.identity.clone())),
651 None => Err(IdentityRuntimeError::NoActiveLease(
652 entry.spec.identity.clone(),
653 )),
654 }
655 }
656
657 pub async fn send(
671 &self,
672 identity: &AgentIdentity,
673 content: &meerkat_core::ContentInput,
674 ) -> Result<FencingToken, IdentityRuntimeError> {
675 let (token, runtime_id) = {
676 let entries = self.entries.read().await;
677 let entry = entries
678 .get(identity)
679 .ok_or_else(|| IdentityRuntimeError::UnknownIdentity(identity.clone()))?;
680
681 if entry.spec.addressability == AgentAddressability::InternalOnly {
683 return Err(IdentityRuntimeError::NotAddressable(NotAddressable {
684 identity: identity.clone(),
685 addressability: entry.spec.addressability,
686 }));
687 }
688
689 let token = Self::check_lease(entry)?;
691
692 let runtime_id = entry
693 .continuity
694 .as_ref()
695 .map(|c| c.agent_runtime_id.clone());
696
697 (token, runtime_id)
698 };
699
700 if let (Some(bridge), Some(rid)) = (&self.bridge, &runtime_id) {
702 bridge
703 .deliver(rid, content)
704 .await
705 .map_err(|e| IdentityRuntimeError::Internal(format!("bridge deliver: {e}")))?;
706 }
707
708 Ok(token)
709 }
710
711 pub async fn dispatch(
725 &self,
726 identity: &AgentIdentity,
727 input: &DispatchInput,
728 ) -> Result<(FencingToken, bool), IdentityRuntimeError> {
729 let (token, is_durable, runtime_id) = {
730 let entries = self.entries.read().await;
731 let entry = entries
732 .get(identity)
733 .ok_or_else(|| IdentityRuntimeError::UnknownIdentity(identity.clone()))?;
734
735 let token = Self::check_lease(entry)?;
737
738 let is_durable = entry.has_runtime_store;
740
741 let runtime_id = entry
742 .continuity
743 .as_ref()
744 .map(|c| c.agent_runtime_id.clone());
745
746 (token, is_durable, runtime_id)
747 };
748
749 if let (Some(bridge), Some(rid)) = (&self.bridge, &runtime_id) {
751 bridge
752 .deliver(rid, &input.content)
753 .await
754 .map_err(|e| IdentityRuntimeError::Internal(format!("bridge dispatch: {e}")))?;
755 }
756
757 Ok((token, is_durable))
758 }
759
760 pub async fn status(
766 &self,
767 identity: &AgentIdentity,
768 ) -> Result<IdentityStatus, IdentityRuntimeError> {
769 let entries = self.entries.read().await;
770 let entry = entries
771 .get(identity)
772 .ok_or_else(|| IdentityRuntimeError::UnknownIdentity(identity.clone()))?;
773
774 let lease_info = entry.lease.as_ref().map(|l| LeaseInfo {
775 fencing_token: l.fencing_token,
776 ttl_remaining: l.ttl_remaining(),
777 healthy: l.is_healthy(),
778 });
779
780 let continuity_health = Some(ContinuityHealth {
781 store_reachable: true, durability_policy: self.durability_policy.clone(),
783 last_checkpoint_version: if entry.checkpoint_version.get() > 0 {
784 Some(entry.checkpoint_version)
785 } else {
786 None
787 },
788 });
789
790 Ok(IdentityStatus {
791 identity: identity.clone(),
792 state: entry.state,
793 agent_runtime_id: entry
794 .continuity
795 .as_ref()
796 .map(|c| c.agent_runtime_id.clone()),
797 session_id: entry.continuity.as_ref().map(|c| c.session_id.clone()),
798 profile: Some(entry.spec.profile.clone()),
799 runtime_mode: entry.spec.runtime_mode_override,
800 addressability: entry.spec.addressability,
801 display_name: entry.spec.display_name.clone(),
802 labels: entry.spec.labels.clone(),
803 generation: entry.continuity.as_ref().map(|c| c.generation),
804 checkpoint_version: if entry.checkpoint_version.get() > 0 {
805 Some(entry.checkpoint_version)
806 } else {
807 None
808 },
809 lease: lease_info,
810 continuity_health,
811 })
812 }
813
814 pub async fn retire(
820 &self,
821 identity: &AgentIdentity,
822 ) -> Result<FencingToken, IdentityRuntimeError> {
823 let (token, runtime_id) = {
824 let mut entries = self.entries.write().await;
825 let entry = entries
826 .get_mut(identity)
827 .ok_or_else(|| IdentityRuntimeError::UnknownIdentity(identity.clone()))?;
828
829 let token = Self::check_lease(entry)?;
830 entry.state = IdentityLifecycleState::Retiring;
831
832 let runtime_id = entry
833 .continuity
834 .as_ref()
835 .map(|c| c.agent_runtime_id.clone());
836
837 (token, runtime_id)
838 };
839
840 if let (Some(bridge), Some(rid)) = (&self.bridge, &runtime_id) {
842 bridge
843 .retire_member(rid)
844 .await
845 .map_err(|e| IdentityRuntimeError::Internal(format!("bridge retire: {e}")))?;
846 }
847
848 Ok(token)
849 }
850
851 pub async fn respawn(
862 &self,
863 identity: &AgentIdentity,
864 ) -> Result<ContinuityRecord, IdentityRuntimeError> {
865 let acquire_result = self
867 .lease_provider
868 .acquire_leases(std::slice::from_ref(identity), &self.runtime_instance_id)
869 .await
870 .map_err(IdentityRuntimeError::Lease)?;
871
872 let grant = match acquire_result.get(identity) {
873 Some(super::types::LeaseAcquireResult::Acquired(g)) => g.clone(),
874 _ => {
875 return Err(IdentityRuntimeError::NoActiveLease(identity.clone()));
876 }
877 };
878
879 let resolved = self
881 .continuity_store
882 .resolve_many(std::slice::from_ref(identity))
883 .await
884 .map_err(IdentityRuntimeError::Store)?;
885
886 let record = match resolved.get(identity) {
887 Some(super::types::ContinuityResolveState::Ready { record }) => record.clone(),
888 Some(super::types::ContinuityResolveState::Broken { failure }) => {
889 return Err(IdentityRuntimeError::Internal(format!(
890 "broken continuity for {identity}: {}",
891 failure.detail
892 )));
893 }
894 Some(super::types::ContinuityResolveState::Uninitialized) => {
895 return Err(IdentityRuntimeError::Internal(format!(
896 "cannot respawn uninitialized identity {identity}"
897 )));
898 }
899 None => {
900 return Err(IdentityRuntimeError::Store(
901 ContinuityStoreError::NotFound {
902 identity: identity.clone(),
903 },
904 ));
905 }
906 };
907
908 let mut entries = self.entries.write().await;
910 if let Some(entry) = entries.get_mut(identity) {
911 entry.continuity = Some(record.clone());
912 entry.lease = Some(LeaseEntry {
913 fencing_token: grant.fencing_token,
914 ttl: grant.ttl,
915 acquired_at: Instant::now(),
916 });
917 entry.state = IdentityLifecycleState::Active;
918 entry.checkpoint_version = record.checkpoint_version;
919 }
920
921 Ok(record)
922 }
923
924 pub async fn reset(
935 &self,
936 identity: &AgentIdentity,
937 ) -> Result<ContinuityRecord, IdentityRuntimeError> {
938 let acquire_result = self
940 .lease_provider
941 .acquire_leases(std::slice::from_ref(identity), &self.runtime_instance_id)
942 .await
943 .map_err(IdentityRuntimeError::Lease)?;
944
945 let grant = match acquire_result.get(identity) {
946 Some(super::types::LeaseAcquireResult::Acquired(g)) => g.clone(),
947 _ => {
948 return Err(IdentityRuntimeError::NoActiveLease(identity.clone()));
949 }
950 };
951
952 let resolved = self
954 .continuity_store
955 .resolve_many(std::slice::from_ref(identity))
956 .await
957 .map_err(IdentityRuntimeError::Store)?;
958
959 let current_gen = match resolved.get(identity) {
960 Some(super::types::ContinuityResolveState::Ready { record }) => record.generation,
961 Some(super::types::ContinuityResolveState::Uninitialized) => {
962 ContinuityGeneration::new(0)
963 }
964 _ => ContinuityGeneration::new(0),
965 };
966
967 let new_gen = ContinuityGeneration::new(current_gen.get() + 1);
969 let new_session_id = meerkat_core::types::SessionId::new();
970 let new_runtime_id = AgentRuntimeId::parse(&format!("{}:gen{}", identity, new_gen.get()))
971 .map_err(|e| {
972 IdentityRuntimeError::Internal(format!("failed to mint runtime id: {e}"))
973 })?;
974
975 let new_record = ContinuityRecord {
976 identity: identity.clone(),
977 agent_runtime_id: new_runtime_id,
978 session_id: new_session_id,
979 generation: new_gen,
980 checkpoint_version: CheckpointVersion::new(0),
981 };
982
983 self.continuity_store
985 .upsert_continuity_record(&new_record, grant.fencing_token)
986 .await?;
987
988 if let Some(bridge) = &self.bridge {
990 let old_runtime_id = {
992 let entries = self.entries.read().await;
993 entries
994 .get(identity)
995 .and_then(|e| e.continuity.as_ref().map(|c| c.agent_runtime_id.clone()))
996 };
997 if let Some(old_id) = old_runtime_id {
998 let _ = bridge.retire_member(&old_id).await;
999 }
1000
1001 let spec = {
1003 let entries = self.entries.read().await;
1004 entries.get(identity).map(|e| e.spec.clone())
1005 };
1006 if let Some(spec) = spec {
1007 let draft = super::types::AgentBuildDraft {
1008 model: None,
1009 system_prompt: None,
1010 additional_instructions: spec.additional_instructions.clone(),
1011 labels: spec.labels.clone(),
1012 app_context: spec.context.clone(),
1013 external_tools: Vec::new(),
1014 local_external_tools: Default::default(),
1015 };
1016 bridge
1017 .register_session_runtime_state(
1018 &new_record.session_id,
1019 identity,
1020 new_record.generation,
1021 new_record.checkpoint_version,
1022 grant.fencing_token,
1023 )
1024 .await
1025 .map_err(|e| {
1026 IdentityRuntimeError::Internal(format!(
1027 "bridge register_session_runtime_state after reset: {e}"
1028 ))
1029 })?;
1030 let session_id = bridge
1031 .create_session(
1032 identity,
1033 &new_record.agent_runtime_id,
1034 &spec,
1035 &draft,
1036 &new_record.session_id,
1037 )
1038 .await
1039 .map_err(|e| {
1040 IdentityRuntimeError::Internal(format!(
1041 "bridge create_session after reset: {e}"
1042 ))
1043 })?;
1044 let initial_session_id = new_record.session_id.clone();
1046 let mut new_record = new_record;
1047 new_record.session_id = session_id;
1048
1049 if new_record.session_id != initial_session_id {
1051 self.continuity_store
1052 .upsert_continuity_record(&new_record, grant.fencing_token)
1053 .await?;
1054 }
1055 let effective_checkpoint_version = bridge
1056 .register_session_runtime_state(
1057 &new_record.session_id,
1058 identity,
1059 new_record.generation,
1060 new_record.checkpoint_version,
1061 grant.fencing_token,
1062 )
1063 .await
1064 .map_err(|e| {
1065 IdentityRuntimeError::Internal(format!(
1066 "bridge register actual session runtime state after reset: {e}"
1067 ))
1068 })?;
1069 new_record.checkpoint_version = effective_checkpoint_version;
1070
1071 let mut entries = self.entries.write().await;
1073 if let Some(entry) = entries.get_mut(identity) {
1074 entry.continuity = Some(new_record.clone());
1075 entry.lease = Some(LeaseEntry {
1076 fencing_token: grant.fencing_token,
1077 ttl: grant.ttl,
1078 acquired_at: Instant::now(),
1079 });
1080 entry.state = IdentityLifecycleState::Active;
1081 entry.checkpoint_version = new_record.checkpoint_version;
1082 }
1083 return Ok(new_record);
1084 }
1085 }
1086
1087 let mut entries = self.entries.write().await;
1089 if let Some(entry) = entries.get_mut(identity) {
1090 entry.continuity = Some(new_record.clone());
1091 entry.lease = Some(LeaseEntry {
1092 fencing_token: grant.fencing_token,
1093 ttl: grant.ttl,
1094 acquired_at: Instant::now(),
1095 });
1096 entry.state = IdentityLifecycleState::Active;
1097 entry.checkpoint_version = CheckpointVersion::new(0);
1098 }
1099
1100 Ok(new_record)
1101 }
1102
1103 pub async fn delete_identity(
1113 &self,
1114 identity: &AgentIdentity,
1115 ) -> Result<(), IdentityRuntimeError> {
1116 let acquire_result = self
1118 .lease_provider
1119 .acquire_leases(std::slice::from_ref(identity), &self.runtime_instance_id)
1120 .await
1121 .map_err(IdentityRuntimeError::Lease)?;
1122
1123 let grant = match acquire_result.get(identity) {
1124 Some(super::types::LeaseAcquireResult::Acquired(g)) => g.clone(),
1125 _ => {
1126 return Err(IdentityRuntimeError::NoActiveLease(identity.clone()));
1127 }
1128 };
1129
1130 let runtime_id = {
1133 let entries = self.entries.read().await;
1134 entries
1135 .get(identity)
1136 .and_then(|e| e.continuity.as_ref().map(|c| c.agent_runtime_id.clone()))
1137 };
1138 if let (Some(bridge), Some(rid)) = (&self.bridge, &runtime_id) {
1139 let _ = bridge.retire_member(rid).await;
1141 }
1142
1143 self.continuity_store
1145 .delete_continuity_record(identity, grant.fencing_token)
1146 .await?;
1147
1148 self.event_channels.write().await.remove(identity);
1150 self.entries.write().await.remove(identity);
1151
1152 Ok(())
1153 }
1154
1155 pub async fn checkpoint(
1161 &self,
1162 identity: &AgentIdentity,
1163 snapshot: &SessionSnapshot,
1164 ) -> Result<CheckpointVersion, IdentityRuntimeError> {
1165 let (record, token, new_version) = {
1166 let entries = self.entries.read().await;
1167 let entry = entries
1168 .get(identity)
1169 .ok_or_else(|| IdentityRuntimeError::UnknownIdentity(identity.clone()))?;
1170
1171 let token = Self::check_lease(entry)?;
1173
1174 let record = entry
1175 .continuity
1176 .as_ref()
1177 .ok_or_else(|| {
1178 IdentityRuntimeError::Internal(format!("no continuity record for {identity}"))
1179 })?
1180 .clone();
1181
1182 let new_version = CheckpointVersion::new(entry.checkpoint_version.get() + 1);
1183 (record, token, new_version)
1184 };
1185
1186 self.continuity_store
1188 .save_session_snapshot(
1189 identity,
1190 &record.session_id,
1191 record.generation,
1192 new_version,
1193 token,
1194 snapshot,
1195 )
1196 .await?;
1197
1198 {
1200 let mut entries = self.entries.write().await;
1201 if let Some(entry) = entries.get_mut(identity) {
1202 entry.checkpoint_version = new_version;
1203 }
1204 }
1205
1206 self.emit_event(
1207 identity,
1208 IdentityEvent::CheckpointCompleted {
1209 identity: identity.clone(),
1210 version: new_version,
1211 },
1212 )
1213 .await;
1214
1215 Ok(new_version)
1216 }
1217
1218 pub async fn roster_inspect(
1224 &self,
1225 ) -> BTreeMap<AgentIdentity, (DurableAgentSpec, IdentityStatus)> {
1226 let entries = self.entries.read().await;
1227 let mut result = BTreeMap::new();
1228 for (identity, entry) in entries.iter() {
1229 let lease_info = entry.lease.as_ref().map(|l| LeaseInfo {
1230 fencing_token: l.fencing_token,
1231 ttl_remaining: l.ttl_remaining(),
1232 healthy: l.is_healthy(),
1233 });
1234 let continuity_health = Some(ContinuityHealth {
1235 store_reachable: true,
1236 durability_policy: self.durability_policy.clone(),
1237 last_checkpoint_version: if entry.checkpoint_version.get() > 0 {
1238 Some(entry.checkpoint_version)
1239 } else {
1240 None
1241 },
1242 });
1243 let status = IdentityStatus {
1244 identity: identity.clone(),
1245 state: entry.state,
1246 agent_runtime_id: entry
1247 .continuity
1248 .as_ref()
1249 .map(|c| c.agent_runtime_id.clone()),
1250 session_id: entry.continuity.as_ref().map(|c| c.session_id.clone()),
1251 profile: Some(entry.spec.profile.clone()),
1252 runtime_mode: entry.spec.runtime_mode_override,
1253 addressability: entry.spec.addressability,
1254 display_name: entry.spec.display_name.clone(),
1255 labels: entry.spec.labels.clone(),
1256 generation: entry.continuity.as_ref().map(|c| c.generation),
1257 checkpoint_version: if entry.checkpoint_version.get() > 0 {
1258 Some(entry.checkpoint_version)
1259 } else {
1260 None
1261 },
1262 lease: lease_info,
1263 continuity_health,
1264 };
1265 result.insert(identity.clone(), (entry.spec.clone(), status));
1266 }
1267 result
1268 }
1269
1270 pub fn validate_roster_uniqueness(
1276 specs: &[DurableAgentSpec],
1277 ) -> Result<(), IdentityRuntimeError> {
1278 let mut seen = std::collections::BTreeSet::new();
1279 for spec in specs {
1280 if !seen.insert(&spec.identity) {
1281 return Err(IdentityRuntimeError::DuplicateIdentity(
1282 spec.identity.clone(),
1283 ));
1284 }
1285 }
1286 Ok(())
1287 }
1288
1289 #[allow(dead_code)]
1295 pub(crate) async fn entries(&self) -> BTreeMap<AgentIdentity, IdentityEntry> {
1296 self.entries.read().await.clone()
1297 }
1298
1299 pub async fn contains(&self, identity: &AgentIdentity) -> bool {
1301 self.entries.read().await.contains_key(identity)
1302 }
1303
1304 pub async fn is_active(&self, identity: &AgentIdentity) -> bool {
1306 self.entries
1307 .read()
1308 .await
1309 .get(identity)
1310 .is_some_and(|e| e.state == IdentityLifecycleState::Active)
1311 }
1312
1313 pub fn continuity_store(&self) -> &Arc<dyn ContinuityStore> {
1315 &self.continuity_store
1316 }
1317
1318 pub fn lease_provider(&self) -> &Arc<dyn LeaseProvider> {
1320 &self.lease_provider
1321 }
1322
1323 pub fn runtime_instance_id(&self) -> &str {
1325 &self.runtime_instance_id
1326 }
1327
1328 pub fn durability_policy(&self) -> &DurabilityPolicy {
1330 &self.durability_policy
1331 }
1332
1333 pub fn has_runtime_store(&self) -> bool {
1335 self.has_runtime_store
1336 }
1337
1338 pub fn bridge(&self) -> Option<&Arc<dyn SessionBridge>> {
1340 self.bridge.as_ref()
1341 }
1342
1343 pub async fn send_text(
1349 &self,
1350 identity: &AgentIdentity,
1351 text: impl Into<String>,
1352 ) -> Result<FencingToken, IdentityRuntimeError> {
1353 self.send(identity, &meerkat_core::ContentInput::Text(text.into()))
1354 .await
1355 }
1356
1357 pub async fn dispatch_text(
1359 &self,
1360 identity: &AgentIdentity,
1361 text: impl Into<String>,
1362 ) -> Result<(FencingToken, bool), IdentityRuntimeError> {
1363 self.dispatch(identity, &DispatchInput::system(text)).await
1364 }
1365
1366 pub async fn restore_flow(
1368 &self,
1369 roster: &[DurableAgentSpec],
1370 topology_provider: Option<&dyn super::contracts::TopologyProvider>,
1371 customizer: Option<&dyn super::contracts::AgentCustomizer>,
1372 ) -> Result<super::orchestrator::RestoreFlowResult, IdentityRuntimeError> {
1373 super::orchestrator::restore_flow(self, roster, topology_provider, customizer).await
1374 }
1375
1376 pub async fn runtime_id_for(
1378 &self,
1379 identity: &AgentIdentity,
1380 ) -> Result<AgentRuntimeId, IdentityRuntimeError> {
1381 let entries = self.entries.read().await;
1382 let entry = entries
1383 .get(identity)
1384 .ok_or_else(|| IdentityRuntimeError::UnknownIdentity(identity.clone()))?;
1385 entry
1386 .continuity
1387 .as_ref()
1388 .map(|c| c.agent_runtime_id.clone())
1389 .ok_or_else(|| {
1390 IdentityRuntimeError::Internal(format!("no continuity record for {identity}"))
1391 })
1392 }
1393
1394 pub async fn inspect(
1396 &self,
1397 identity: &AgentIdentity,
1398 ) -> Result<super::bridge::MemberInspection, IdentityRuntimeError> {
1399 let runtime_id = self.runtime_id_for(identity).await?;
1400 let bridge = self
1401 .bridge
1402 .as_ref()
1403 .ok_or_else(|| IdentityRuntimeError::Internal("no bridge configured".to_string()))?;
1404 bridge
1405 .inspect_member(&runtime_id)
1406 .await
1407 .map_err(|e| IdentityRuntimeError::Internal(format!("inspect: {e}")))
1408 }
1409
1410 pub fn default_timeout(&self) -> Duration {
1412 self.default_timeout
1413 }
1414
1415 pub async fn wait_for_output(
1417 &self,
1418 identity: &AgentIdentity,
1419 timeout: Duration,
1420 ) -> Result<String, IdentityRuntimeError> {
1421 let deadline = Instant::now() + timeout;
1422 loop {
1423 if let Ok(inspection) = self.inspect(identity).await
1424 && let Some(preview) = inspection.output_preview
1425 {
1426 return Ok(preview);
1427 }
1428 if Instant::now() >= deadline {
1429 return Err(IdentityRuntimeError::Internal(format!(
1430 "timed out waiting for output from {identity}"
1431 )));
1432 }
1433 tokio::time::sleep(Duration::from_millis(500)).await;
1434 }
1435 }
1436
1437 pub async fn wait_for_output_containing(
1439 &self,
1440 identity: &AgentIdentity,
1441 needle: &str,
1442 timeout: Duration,
1443 ) -> Result<String, IdentityRuntimeError> {
1444 let deadline = Instant::now() + timeout;
1445 loop {
1446 if let Ok(inspection) = self.inspect(identity).await
1447 && let Some(ref preview) = inspection.output_preview
1448 && preview.contains(needle)
1449 {
1450 return Ok(preview.clone());
1451 }
1452 if Instant::now() >= deadline {
1453 return Err(IdentityRuntimeError::Internal(format!(
1454 "timed out waiting for output containing '{needle}' from {identity}"
1455 )));
1456 }
1457 tokio::time::sleep(Duration::from_millis(500)).await;
1458 }
1459 }
1460}
1461
1462pub async fn wire_cross_mob_by_identity(
1468 local_irt: &IdentityRuntime,
1469 local_identity: &AgentIdentity,
1470 remote_irt: &IdentityRuntime,
1471 remote_identity: &AgentIdentity,
1472 local_unified: &crate::UnifiedRuntime,
1473 remote_mob_id: &str,
1474) -> Result<(), IdentityRuntimeError> {
1475 let local_rt = local_irt.runtime_id_for(local_identity).await?;
1476 let remote_rt = remote_irt.runtime_id_for(remote_identity).await?;
1477 local_unified
1478 .wire_cross_mob(local_rt.as_str(), remote_rt.as_str(), remote_mob_id)
1479 .await
1480 .map_err(|e| IdentityRuntimeError::Internal(format!("wire_cross_mob: {e}")))
1481}