#![allow(
clippy::all,
clippy::unwrap_used,
clippy::expect_used,
clippy::panic,
unused_imports,
redundant_semicolons,
clippy::redundant_clone
)]
use std::collections::BTreeMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::time::Duration;
use async_trait::async_trait;
use meerkat_core::types::HandlingMode;
use meerkat_mobkit::identity_first::contracts::{
AgentCustomizer, ContinuityStore, LeaseProvider, TopologyProvider,
};
use meerkat_mobkit::identity_first::orchestrator::{
ReconcileAction, RestoreOutcome, compute_reconcile_actions, lazy_register_flow, restore_flow,
};
use meerkat_mobkit::identity_first::runtime::IdentityEvent;
use meerkat_mobkit::identity_first::{
AgentAddressability, AgentBuildContext, AgentBuildDraft, AgentIdentity, AgentRuntimeId,
BridgeError, CheckpointVersion, ContinuityFailure, ContinuityFailureKind, ContinuityGeneration,
ContinuityRecord, ContinuityResolveState, ContinuityStoreError, CustomizerError,
DispatchIdempotencyKey, DispatchInput, DispatchOrigin, DurabilityPolicy, DurableAgentSpec,
FencingToken, IdentityLifecycleState, IdentityRuntime, IdentityRuntimeConfig,
IdentityRuntimeError, LeaseAcquireResult, LeaseError, LeaseGrant, LeaseRenewResult,
ManagedPeerEdge, SessionBridge, SessionSnapshot, TopologyContext, TopologyError,
};
use meerkat_mobkit::identity_first::{LocalContinuityStore, LocalLeaseProvider};
/// Parses `name` into an [`AgentIdentity`].
///
/// Panics when the name is rejected by `AgentIdentity::parse`, which is the
/// desired behavior for a test fixture.
fn make_identity(name: &str) -> AgentIdentity {
    let parsed = AgentIdentity::parse(name);
    parsed.unwrap()
}
/// Builds an addressable [`DurableAgentSpec`] for `name` using the `"default"`
/// profile and empty/absent values for every optional field.
fn make_spec(name: &str) -> DurableAgentSpec {
    let identity = make_identity(name);
    let profile = meerkat_mob::ProfileName::from("default");
    DurableAgentSpec {
        identity,
        profile,
        addressability: AgentAddressability::Addressable,
        display_name: None,
        labels: BTreeMap::new(),
        context: None,
        additional_instructions: Vec::new(),
        initial_message: None,
        runtime_mode_override: None,
    }
}
/// Same as [`make_spec`], except the resulting spec is marked
/// `AgentAddressability::InternalOnly`.
fn make_internal_spec(name: &str) -> DurableAgentSpec {
    let mut spec = make_spec(name);
    spec.addressability = AgentAddressability::InternalOnly;
    spec
}
/// Builds a [`ContinuityRecord`] for `name` with the given generation and
/// checkpoint version (`cpv`).
///
/// The runtime id is derived as `rt:{name}:{generation}` and a fresh session
/// id is allocated on every call.
fn make_record(name: &str, generation: u64, cpv: u64) -> ContinuityRecord {
    let identity = make_identity(name);
    let runtime_id_text = format!("rt:{name}:{generation}");
    let agent_runtime_id = AgentRuntimeId::parse(&runtime_id_text).unwrap();
    ContinuityRecord {
        identity,
        agent_runtime_id,
        session_id: meerkat_core::types::SessionId::new(),
        generation: ContinuityGeneration::new(generation),
        checkpoint_version: CheckpointVersion::new(cpv),
    }
}
/// Builds a [`LeaseGrant`] for `name` carrying fencing token `token` and a
/// five-minute TTL.
fn make_grant(name: &str, token: u64) -> LeaseGrant {
    let identity = make_identity(name);
    let fencing_token = FencingToken::new(token);
    let ttl = Duration::from_mins(5);
    LeaseGrant {
        identity,
        fencing_token,
        ttl,
    }
}
/// Builds an [`IdentityRuntime`] with no runtime store (`has_runtime_store:
/// false`), no bridge, no default timeout, and sync write-through durability.
fn make_runtime(store: Arc<dyn ContinuityStore>, lease: Arc<dyn LeaseProvider>) -> IdentityRuntime {
    let config = IdentityRuntimeConfig {
        continuity_store: store,
        lease_provider: lease,
        runtime_instance_id: String::from("test-runtime"),
        has_runtime_store: false,
        durability_policy: DurabilityPolicy::SyncWriteThrough,
        bridge: None,
        default_timeout: None,
    };
    IdentityRuntime::new(config)
}
/// Builds an [`IdentityRuntime`] like `make_runtime`, but with
/// `has_runtime_store: true`; there is still no bridge and no default timeout.
fn make_runtime_with_store(
    store: Arc<dyn ContinuityStore>,
    lease: Arc<dyn LeaseProvider>,
) -> IdentityRuntime {
    let config = IdentityRuntimeConfig {
        continuity_store: store,
        lease_provider: lease,
        runtime_instance_id: String::from("test-runtime"),
        has_runtime_store: true,
        durability_policy: DurabilityPolicy::SyncWriteThrough,
        bridge: None,
        default_timeout: None,
    };
    IdentityRuntime::new(config)
}
/// Builds a shared [`IdentityRuntime`] that has a runtime store flag set and
/// routes session operations through `bridge`.
fn make_runtime_with_bridge(
    store: Arc<dyn ContinuityStore>,
    lease: Arc<dyn LeaseProvider>,
    bridge: Arc<dyn SessionBridge>,
) -> Arc<IdentityRuntime> {
    let config = IdentityRuntimeConfig {
        continuity_store: store,
        lease_provider: lease,
        runtime_instance_id: String::from("test-runtime"),
        has_runtime_store: true,
        durability_policy: DurabilityPolicy::SyncWriteThrough,
        bridge: Some(bridge),
        default_timeout: None,
    };
    let runtime = IdentityRuntime::new(config);
    Arc::new(runtime)
}
/// Test double that forwards every `ContinuityStore` call to an in-memory
/// `LocalContinuityStore` while counting selected read operations.
struct CountingContinuityStore {
/// Backing store that all trait calls are delegated to.
inner: Arc<LocalContinuityStore>,
/// Number of `resolve_many` calls observed since construction or the last reset.
resolve_many_calls: AtomicUsize,
/// Number of `load_session_snapshot` calls observed since construction or the last reset.
load_snapshot_calls: AtomicUsize,
}
/// Lease provider test double whose acquire and renew calls succeed but
/// return empty result maps, so no identity ever has an entry.
#[derive(Default)]
struct MissingLeaseProvider;
#[async_trait]
impl LeaseProvider for MissingLeaseProvider {
    /// Succeeds with an empty map: no requested identity receives a result.
    async fn acquire_leases(
        &self,
        _identities: &[AgentIdentity],
        _runtime_instance: &str,
    ) -> Result<BTreeMap<AgentIdentity, LeaseAcquireResult>, LeaseError> {
        let no_results = BTreeMap::new();
        Ok(no_results)
    }

    /// Succeeds with an empty map: no grant receives a renewal result.
    async fn renew_leases(
        &self,
        _grants: &[LeaseGrant],
    ) -> Result<BTreeMap<AgentIdentity, LeaseRenewResult>, LeaseError> {
        let no_results = BTreeMap::new();
        Ok(no_results)
    }

    /// Accepts any release request without doing anything.
    async fn release_leases(&self, _grants: &[LeaseGrant]) -> Result<(), LeaseError> {
        Ok(())
    }
}
impl CountingContinuityStore {
    /// Creates a counting store over a fresh in-memory backing store with both
    /// counters at zero.
    fn new() -> Self {
        let inner = Arc::new(LocalContinuityStore::in_memory().unwrap());
        Self {
            inner,
            resolve_many_calls: AtomicUsize::new(0),
            load_snapshot_calls: AtomicUsize::new(0),
        }
    }

    /// Zeroes both call counters.
    fn reset_counts(&self) {
        for counter in [&self.resolve_many_calls, &self.load_snapshot_calls] {
            counter.store(0, Ordering::SeqCst);
        }
    }

    /// Current number of recorded `resolve_many` calls.
    fn resolve_many_calls(&self) -> usize {
        let counter = &self.resolve_many_calls;
        counter.load(Ordering::SeqCst)
    }

    /// Current number of recorded `load_session_snapshot` calls.
    fn load_snapshot_calls(&self) -> usize {
        let counter = &self.load_snapshot_calls;
        counter.load(Ordering::SeqCst)
    }
}
#[async_trait]
impl ContinuityStore for CountingContinuityStore {
    /// Counts the call, then delegates to the backing store.
    async fn resolve_many(
        &self,
        identities: &[AgentIdentity],
    ) -> Result<BTreeMap<AgentIdentity, ContinuityResolveState>, ContinuityStoreError> {
        self.resolve_many_calls.fetch_add(1, Ordering::SeqCst);
        let resolved = self.inner.resolve_many(identities).await;
        resolved
    }

    /// Counts the call, then delegates to the backing store.
    async fn load_session_snapshot(
        &self,
        session_id: &meerkat_core::types::SessionId,
    ) -> Result<Option<SessionSnapshot>, ContinuityStoreError> {
        self.load_snapshot_calls.fetch_add(1, Ordering::SeqCst);
        let loaded = self.inner.load_session_snapshot(session_id).await;
        loaded
    }

    /// Pure pass-through to the backing store (not counted).
    async fn save_session_snapshot(
        &self,
        identity: &AgentIdentity,
        session_id: &meerkat_core::types::SessionId,
        generation: ContinuityGeneration,
        version: CheckpointVersion,
        fencing_token: FencingToken,
        snapshot: &SessionSnapshot,
    ) -> Result<(), ContinuityStoreError> {
        let pending = self.inner.save_session_snapshot(
            identity,
            session_id,
            generation,
            version,
            fencing_token,
            snapshot,
        );
        pending.await
    }

    /// Pure pass-through to the backing store (not counted).
    async fn upsert_continuity_record(
        &self,
        record: &ContinuityRecord,
        fencing_token: FencingToken,
    ) -> Result<(), ContinuityStoreError> {
        let pending = self.inner.upsert_continuity_record(record, fencing_token);
        pending.await
    }

    /// Pure pass-through to the backing store (not counted).
    async fn delete_continuity_record(
        &self,
        identity: &AgentIdentity,
        fencing_token: FencingToken,
    ) -> Result<(), ContinuityStoreError> {
        let pending = self.inner.delete_continuity_record(identity, fencing_token);
        pending.await
    }
}
/// Test double over an in-memory `LocalContinuityStore` that can be told to
/// fail upsert and delete operations with an I/O error.
struct FaultyContinuityStore {
/// Backing store that non-failing calls are delegated to.
inner: Arc<LocalContinuityStore>,
/// When set, upserts fail once the `allow_upserts` budget is exhausted.
fail_upsert: AtomicBool,
/// When set, the first failing upsert also clears `fail_upsert`, so only one upsert fails.
fail_upsert_once: AtomicBool,
/// When set, every delete fails.
fail_delete: AtomicBool,
/// Number of upserts still allowed to succeed while `fail_upsert` is set.
allow_upserts: AtomicUsize,
}
impl FaultyContinuityStore {
    /// Creates a store over a fresh in-memory backing store with no failures armed.
    fn new() -> Self {
        let inner = Arc::new(LocalContinuityStore::in_memory().unwrap());
        Self {
            inner,
            fail_upsert: AtomicBool::new(false),
            fail_upsert_once: AtomicBool::new(false),
            fail_delete: AtomicBool::new(false),
            allow_upserts: AtomicUsize::new(0),
        }
    }

    /// Arms upsert failures (subject to any remaining success budget).
    fn fail_upsert(&self) {
        self.fail_upsert.store(true, Ordering::SeqCst);
    }

    /// Makes every subsequent delete fail.
    fn fail_delete(&self) {
        self.fail_delete.store(true, Ordering::SeqCst);
    }

    /// Lets `count` upserts succeed, fails exactly the next one, then recovers.
    fn fail_next_upsert_after_successes(&self, count: usize) {
        self.arm_upsert_failure(count, true);
    }

    /// Lets `count` upserts succeed, then fails every later upsert.
    fn fail_upserts_persistently_after_successes(&self, count: usize) {
        self.arm_upsert_failure(count, false);
    }

    /// Shared arming routine: sets the success budget, the one-shot flag, and
    /// finally enables upsert failures (in that order).
    fn arm_upsert_failure(&self, allowed_successes: usize, one_shot: bool) {
        self.allow_upserts.store(allowed_successes, Ordering::SeqCst);
        self.fail_upsert_once.store(one_shot, Ordering::SeqCst);
        self.fail_upsert();
    }
}
#[async_trait]
impl ContinuityStore for FaultyContinuityStore {
    /// Pass-through to the backing store; never fails artificially.
    async fn resolve_many(
        &self,
        identities: &[AgentIdentity],
    ) -> Result<BTreeMap<AgentIdentity, ContinuityResolveState>, ContinuityStoreError> {
        self.inner.resolve_many(identities).await
    }

    /// Pass-through to the backing store; never fails artificially.
    async fn load_session_snapshot(
        &self,
        session_id: &meerkat_core::types::SessionId,
    ) -> Result<Option<SessionSnapshot>, ContinuityStoreError> {
        self.inner.load_session_snapshot(session_id).await
    }

    /// Pass-through to the backing store; never fails artificially.
    async fn save_session_snapshot(
        &self,
        identity: &AgentIdentity,
        session_id: &meerkat_core::types::SessionId,
        generation: ContinuityGeneration,
        version: CheckpointVersion,
        fencing_token: FencingToken,
        snapshot: &SessionSnapshot,
    ) -> Result<(), ContinuityStoreError> {
        self.inner
            .save_session_snapshot(
                identity,
                session_id,
                generation,
                version,
                fencing_token,
                snapshot,
            )
            .await
    }

    /// Upserts through the backing store unless an injected failure is due.
    ///
    /// While `fail_upsert` is set, each call first tries to consume one unit
    /// of the `allow_upserts` budget. Once the budget is exhausted the call
    /// fails with `ContinuityStoreError::Io("upsert failed")`; in one-shot
    /// mode that first failure also disarms further failures.
    async fn upsert_continuity_record(
        &self,
        record: &ContinuityRecord,
        fencing_token: FencingToken,
    ) -> Result<(), ContinuityStoreError> {
        if self.fail_upsert.load(Ordering::SeqCst) {
            // Check-and-decrement must be a single atomic step: a separate
            // load followed by fetch_sub lets concurrent callers both pass the
            // check and wrap the counter below zero.
            let consumed_allowance = self
                .allow_upserts
                .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |remaining| {
                    remaining.checked_sub(1)
                })
                .is_ok();
            if !consumed_allowance {
                if self.fail_upsert_once.swap(false, Ordering::SeqCst) {
                    self.fail_upsert.store(false, Ordering::SeqCst);
                }
                return Err(ContinuityStoreError::Io("upsert failed".to_string()));
            }
        }
        self.inner
            .upsert_continuity_record(record, fencing_token)
            .await
    }

    /// Deletes through the backing store, or fails with
    /// `ContinuityStoreError::Io("delete failed")` when delete failures are armed.
    async fn delete_continuity_record(
        &self,
        identity: &AgentIdentity,
        fencing_token: FencingToken,
    ) -> Result<(), ContinuityStoreError> {
        if self.fail_delete.load(Ordering::SeqCst) {
            return Err(ContinuityStoreError::Io("delete failed".to_string()));
        }
        self.inner
            .delete_continuity_record(identity, fencing_token)
            .await
    }
}
/// Test double that, on the first `resolve_many` call mentioning `identity`,
/// attempts a snapshot write with `stale_token` and records whether the
/// backing store rejected it as a stale fencing token.
struct ResolveProbeStore {
/// Backing store that all calls (including the probe write) go to.
inner: Arc<LocalContinuityStore>,
/// Identity whose presence in a `resolve_many` request triggers the probe.
identity: AgentIdentity,
/// Record supplying the session id, generation and base checkpoint version for the probe write.
record: ContinuityRecord,
/// Fencing token used for the probe write.
stale_token: FencingToken,
/// Set once the probe has run, so it happens at most once.
probed: AtomicBool,
/// True when the probe write failed with `StaleFencingToken`.
stale_write_rejected: AtomicBool,
}
impl ResolveProbeStore {
    /// Creates a probe store over a fresh in-memory backing store; the probe
    /// has not run yet and no rejection has been recorded.
    fn new(identity: AgentIdentity, record: ContinuityRecord, stale_token: FencingToken) -> Self {
        let inner = Arc::new(LocalContinuityStore::in_memory().unwrap());
        let probed = AtomicBool::new(false);
        let stale_write_rejected = AtomicBool::new(false);
        Self {
            inner,
            identity,
            record,
            stale_token,
            probed,
            stale_write_rejected,
        }
    }
}
#[async_trait]
impl ContinuityStore for ResolveProbeStore {
    /// Delegates to the backing store, but the first time the watched identity
    /// is requested it first attempts a snapshot write with the stale token
    /// and records whether that write was rejected as stale.
    async fn resolve_many(
        &self,
        identities: &[AgentIdentity],
    ) -> Result<BTreeMap<AgentIdentity, ContinuityResolveState>, ContinuityStoreError> {
        let watched_requested = identities.contains(&self.identity);
        // `swap` is only reached when the watched identity is requested, so
        // unrelated resolves do not consume the one-time probe.
        if watched_requested && !self.probed.swap(true, Ordering::SeqCst) {
            let probe_version = CheckpointVersion::new(self.record.checkpoint_version.get() + 1);
            let probe_snapshot = SessionSnapshot {
                data: b"stale during resolve".to_vec(),
            };
            let probe_outcome = self
                .inner
                .save_session_snapshot(
                    &self.identity,
                    &self.record.session_id,
                    self.record.generation,
                    probe_version,
                    self.stale_token,
                    &probe_snapshot,
                )
                .await;
            let rejected_as_stale = matches!(
                probe_outcome,
                Err(ContinuityStoreError::StaleFencingToken { .. })
            );
            self.stale_write_rejected
                .store(rejected_as_stale, Ordering::SeqCst);
        }
        self.inner.resolve_many(identities).await
    }

    /// Pass-through to the backing store.
    async fn load_session_snapshot(
        &self,
        session_id: &meerkat_core::types::SessionId,
    ) -> Result<Option<SessionSnapshot>, ContinuityStoreError> {
        let pending = self.inner.load_session_snapshot(session_id);
        pending.await
    }

    /// Pass-through to the backing store.
    async fn save_session_snapshot(
        &self,
        identity: &AgentIdentity,
        session_id: &meerkat_core::types::SessionId,
        generation: ContinuityGeneration,
        version: CheckpointVersion,
        fencing_token: FencingToken,
        snapshot: &SessionSnapshot,
    ) -> Result<(), ContinuityStoreError> {
        let pending = self.inner.save_session_snapshot(
            identity,
            session_id,
            generation,
            version,
            fencing_token,
            snapshot,
        );
        pending.await
    }

    /// Pass-through to the backing store.
    async fn upsert_continuity_record(
        &self,
        record: &ContinuityRecord,
        fencing_token: FencingToken,
    ) -> Result<(), ContinuityStoreError> {
        let pending = self.inner.upsert_continuity_record(record, fencing_token);
        pending.await
    }

    /// Pass-through to the backing store.
    async fn delete_continuity_record(
        &self,
        identity: &AgentIdentity,
        fencing_token: FencingToken,
    ) -> Result<(), ContinuityStoreError> {
        let pending = self.inner.delete_continuity_record(identity, fencing_token);
        pending.await
    }
}
/// `SessionBridge` test double that counts calls, records arguments, and can
/// be configured to fail or alter individual operations.
#[derive(Default)]
struct CountingBridge {
/// Number of `create_session` calls.
create_calls: AtomicUsize,
/// Number of `resume_session` calls.
resume_calls: AtomicUsize,
/// Number of `deliver` calls.
deliver_calls: AtomicUsize,
/// Number of `retire_member` calls.
retire_calls: AtomicUsize,
/// Number of `wire_peer` calls (including those made by `wire_peers_batch`).
wire_calls: AtomicUsize,
/// Number of `register_session_runtime_state` calls.
register_calls: AtomicUsize,
/// Number of `unregister_session_runtime_state` calls.
unregister_calls: AtomicUsize,
/// When set, `create_session` fails.
fail_create: AtomicBool,
/// When set, every `register_session_runtime_state` call fails.
fail_register: AtomicBool,
/// When non-zero, register calls numbered above this value fail (zero disables the check).
fail_register_after_calls: AtomicUsize,
/// When set, `unregister_session_runtime_state` fails (after recording the call).
fail_unregister: AtomicBool,
/// When set, `current_member_wires` fails.
fail_current_wires: AtomicBool,
/// When set, `retire_member` fails.
fail_retire: AtomicBool,
/// When set, `resume_session` reports a fresh spawn instead of a resume.
force_resume_fallback: AtomicBool,
/// Optional delay applied at the start of `resume_session`.
resume_delay: tokio::sync::Mutex<Option<Duration>>,
/// Optional barrier that `resume_session` waits on after the delay.
resume_barrier: tokio::sync::Mutex<Option<Arc<tokio::sync::Barrier>>>,
/// Optional session id returned by `create_session` instead of the requested one.
create_session_id: tokio::sync::Mutex<Option<meerkat_core::types::SessionId>>,
/// Optional session id reported when the resume fallback is forced.
fallback_session_id: tokio::sync::Mutex<Option<meerkat_core::types::SessionId>>,
/// String form of every session id passed to `unregister_session_runtime_state`.
unregistered_session_ids: tokio::sync::Mutex<Vec<String>>,
/// Every `(a, b)` pair passed to `wire_peer`, as strings.
wires: tokio::sync::Mutex<Vec<(String, String)>>,
/// Pairs wired through `wire_peers_batch`; this is what `current_member_wires` reports.
current_wires: tokio::sync::Mutex<Vec<(String, String)>>,
}
impl CountingBridge {
    /// Makes `resume_session` sleep for `delay` before doing anything else.
    async fn set_resume_delay(&self, delay: Duration) {
        let mut slot = self.resume_delay.lock().await;
        *slot = Some(delay);
    }

    /// Makes `resume_session` wait on `barrier` before producing its outcome.
    async fn set_resume_barrier(&self, barrier: Arc<tokio::sync::Barrier>) {
        let mut slot = self.resume_barrier.lock().await;
        *slot = Some(barrier);
    }

    /// Forces `resume_session` to report a fresh spawn using `session_id`.
    async fn set_force_resume_fallback(&self, session_id: meerkat_core::types::SessionId) {
        self.force_resume_fallback.store(true, Ordering::SeqCst);
        let mut slot = self.fallback_session_id.lock().await;
        *slot = Some(session_id);
    }

    /// Overrides the session id returned by `create_session`.
    async fn set_create_session_id(&self, session_id: meerkat_core::types::SessionId) {
        let mut slot = self.create_session_id.lock().await;
        *slot = Some(session_id);
    }

    /// Makes `create_session` fail.
    fn fail_create(&self) {
        self.fail_create.store(true, Ordering::SeqCst);
    }

    /// Makes every register call fail.
    fn fail_register(&self) {
        self.fail_register.store(true, Ordering::SeqCst);
    }

    /// Makes register calls fail once more than `calls` of them have been made.
    fn fail_register_after_calls(&self, calls: usize) {
        let threshold = &self.fail_register_after_calls;
        threshold.store(calls, Ordering::SeqCst);
    }

    /// Makes unregister calls fail.
    fn fail_unregister(&self) {
        self.fail_unregister.store(true, Ordering::SeqCst);
    }

    /// Makes `current_member_wires` fail.
    fn fail_current_wires(&self) {
        self.fail_current_wires.store(true, Ordering::SeqCst);
    }

    /// Makes `retire_member` fail.
    fn fail_retire(&self) {
        self.fail_retire.store(true, Ordering::SeqCst);
    }
}
#[async_trait]
impl SessionBridge for CountingBridge {
async fn create_session(
&self,
_identity: &AgentIdentity,
_runtime_id: &AgentRuntimeId,
_spec: &DurableAgentSpec,
_draft: &AgentBuildDraft,
session_id: &meerkat_core::types::SessionId,
) -> Result<meerkat_core::types::SessionId, BridgeError> {
self.create_calls.fetch_add(1, Ordering::SeqCst);
if self.fail_create.load(Ordering::SeqCst) {
return Err(BridgeError::Mob("create failed".to_string()));
}
Ok(self
.create_session_id
.lock()
.await
.clone()
.unwrap_or_else(|| session_id.clone()))
}
async fn resume_session(
&self,
_identity: &AgentIdentity,
_runtime_id: &AgentRuntimeId,
_spec: &DurableAgentSpec,
_draft: &AgentBuildDraft,
session_id: &meerkat_core::types::SessionId,
_snapshot: &SessionSnapshot,
) -> Result<meerkat_mobkit::identity_first::ResumeSessionOutcome, BridgeError> {
self.resume_calls.fetch_add(1, Ordering::SeqCst);
if let Some(delay) = *self.resume_delay.lock().await {
tokio::time::sleep(delay).await;
}
let barrier = self.resume_barrier.lock().await.clone();
if let Some(barrier) = barrier {
barrier.wait().await;
}
if self.force_resume_fallback.load(Ordering::SeqCst) {
let fallback_session_id = self
.fallback_session_id
.lock()
.await
.clone()
.unwrap_or_else(meerkat_core::types::SessionId::new);
return Ok(
meerkat_mobkit::identity_first::ResumeSessionOutcome::FreshSpawned {
session_id: fallback_session_id,
reason:
meerkat_mobkit::identity_first::ResumeFallbackReason::RuntimeIdentityIncompatible {
detail: "test mismatch".to_string(),
},
},
);
}
Ok(
meerkat_mobkit::identity_first::ResumeSessionOutcome::Resumed {
session_id: session_id.clone(),
},
)
}
async fn deliver(
&self,
_runtime_id: &AgentRuntimeId,
_content: &meerkat_core::ContentInput,
) -> Result<meerkat_core::types::SessionId, BridgeError> {
self.deliver_calls.fetch_add(1, Ordering::SeqCst);
Ok(meerkat_core::types::SessionId::new())
}
async fn checkpoint_session(
&self,
_runtime_id: &AgentRuntimeId,
_session_id: &meerkat_core::types::SessionId,
) -> Result<SessionSnapshot, BridgeError> {
Ok(SessionSnapshot { data: Vec::new() })
}
async fn retire_member(&self, _runtime_id: &AgentRuntimeId) -> Result<(), BridgeError> {
self.retire_calls.fetch_add(1, Ordering::SeqCst);
if self.fail_retire.load(Ordering::SeqCst) {
return Err(BridgeError::Mob("retire failed".to_string()));
}
Ok(())
}
async fn wire_peer(&self, a: &AgentRuntimeId, b: &AgentRuntimeId) -> Result<(), BridgeError> {
self.wire_calls.fetch_add(1, Ordering::SeqCst);
self.wires
.lock()
.await
.push((a.as_str().to_string(), b.as_str().to_string()));
Ok(())
}
async fn wire_peers_batch(
&self,
edges: &[(AgentRuntimeId, AgentRuntimeId)],
) -> Result<(), BridgeError> {
for (a, b) in edges {
self.wire_peer(a, b).await?;
self.current_wires
.lock()
.await
.push((a.as_str().to_string(), b.as_str().to_string()));
}
Ok(())
}
async fn current_member_wires(
&self,
) -> Result<Vec<(AgentRuntimeId, AgentRuntimeId)>, BridgeError> {
if self.fail_current_wires.load(Ordering::SeqCst) {
return Err(BridgeError::Mob("current wires failed".to_string()));
}
Ok(self
.current_wires
.lock()
.await
.iter()
.filter_map(|(a, b)| {
Some((
AgentRuntimeId::parse(a).ok()?,
AgentRuntimeId::parse(b).ok()?,
))
})
.collect())
}
async fn register_session_runtime_state(
&self,
_session_id: &meerkat_core::types::SessionId,
_identity: &AgentIdentity,
_generation: ContinuityGeneration,
checkpoint_version: CheckpointVersion,
_fencing_token: FencingToken,
) -> Result<CheckpointVersion, BridgeError> {
let call = self.register_calls.fetch_add(1, Ordering::SeqCst) + 1;
let fail_after = self.fail_register_after_calls.load(Ordering::SeqCst);
if self.fail_register.load(Ordering::SeqCst) || (fail_after != 0 && call > fail_after) {
return Err(BridgeError::Mob("register failed".to_string()));
}
Ok(checkpoint_version)
}
async fn unregister_session_runtime_state(
&self,
session_id: &meerkat_core::types::SessionId,
) -> Result<(), BridgeError> {
self.unregister_calls.fetch_add(1, Ordering::SeqCst);
self.unregistered_session_ids
.lock()
.await
.push(session_id.to_string());
if self.fail_unregister.load(Ordering::SeqCst) {
return Err(BridgeError::Mob("unregister failed".to_string()));
}
Ok(())
}
}
/// Returns the text content `"hello"` used as the payload in these tests.
fn make_content() -> meerkat_core::ContentInput {
    let text = "hello".to_string();
    meerkat_core::ContentInput::Text(text)
}
/// Builds a connector-origin [`DispatchInput`] carrying the default test
/// content and no correlation id or idempotency key.
fn make_dispatch_input() -> DispatchInput {
    let content = make_content();
    DispatchInput {
        content,
        origin: DispatchOrigin::Connector,
        correlation_id: None,
        idempotency_key: None,
    }
}
/// Asserts that the lease reported by `runtime.status(identity)` has a fencing
/// token greater than `old_token` and that `lease_provider` renews a grant
/// built from it.
async fn assert_status_lease_is_renewable(
    lease_provider: &LocalLeaseProvider,
    runtime: &IdentityRuntime,
    identity: &AgentIdentity,
    old_token: FencingToken,
) {
    let restored_lease = runtime
        .status(identity)
        .await
        .unwrap()
        .lease
        .expect("status should expose restored lease");
    assert!(
        restored_lease.fencing_token > old_token,
        "restored lease token must advance past old token"
    );

    // Rebuild a grant from what status exposes and try to renew it.
    let renewal_request = [LeaseGrant {
        identity: identity.clone(),
        fencing_token: restored_lease.fencing_token,
        ttl: restored_lease.ttl_remaining,
    }];
    let renewal_results = lease_provider
        .renew_leases(&renewal_request)
        .await
        .expect("renewing restored lease should succeed");
    let outcome = renewal_results.get(identity);
    assert!(matches!(outcome, Some(LeaseRenewResult::Renewed(_))));
}
/// Asserts that saving a snapshot for `record`'s session at the next
/// checkpoint version with `old_token` fails with
/// `ContinuityStoreError::StaleFencingToken`.
async fn assert_old_token_snapshot_write_rejected(
    store: &dyn ContinuityStore,
    identity: &AgentIdentity,
    record: &ContinuityRecord,
    old_token: FencingToken,
) {
    let next_version = CheckpointVersion::new(record.checkpoint_version.get() + 1);
    let payload = SessionSnapshot {
        data: b"stale write".to_vec(),
    };
    let write_outcome = store
        .save_session_snapshot(
            identity,
            &record.session_id,
            record.generation,
            next_version,
            old_token,
            &payload,
        )
        .await;
    let err = write_outcome.expect_err("old fencing token snapshot write should be rejected");
    let is_stale = matches!(err, ContinuityStoreError::StaleFencingToken { .. });
    assert!(is_stale, "expected stale fencing token, got {err}");
}
/// `send` to an active, addressable identity succeeds and returns the
/// fencing token of the registered grant.
#[tokio::test]
async fn identity_first_runtime_send_to_addressable_delivers() {
    let continuity = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let leases = Arc::new(LocalLeaseProvider::new());
    let runtime = make_runtime(continuity, leases);

    runtime
        .register(
            make_spec("triage:main"),
            IdentityLifecycleState::Active,
            Some(make_record("triage:main", 0, 0)),
            Some(make_grant("triage:main", 1)),
        )
        .await;

    let target = make_identity("triage:main");
    let outcome = runtime.send(&target, &make_content()).await;
    assert!(outcome.is_ok());
    assert_eq!(outcome.unwrap(), FencingToken::new(1));
}
/// `send` to an internal-only identity is rejected with `NotAddressable`
/// naming the identity and its addressability.
#[tokio::test]
async fn identity_first_runtime_send_to_internal_only_rejected() {
    let continuity = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let leases = Arc::new(LocalLeaseProvider::new());
    let runtime = make_runtime(continuity, leases);

    runtime
        .register(
            make_internal_spec("gate:main"),
            IdentityLifecycleState::Active,
            Some(make_record("gate:main", 0, 0)),
            Some(make_grant("gate:main", 1)),
        )
        .await;

    let target = make_identity("gate:main");
    let outcome = runtime.send(&target, &make_content()).await;
    assert!(outcome.is_err());

    let rejection = match outcome.unwrap_err() {
        IdentityRuntimeError::NotAddressable(err) => err,
        other => panic!("expected NotAddressable, got: {other:?}"),
    };
    assert_eq!(rejection.identity, make_identity("gate:main"));
    assert_eq!(rejection.addressability, AgentAddressability::InternalOnly);
}
/// `send` to an identity that was never registered fails with `UnknownIdentity`.
#[tokio::test]
async fn identity_first_runtime_send_to_unknown_identity_rejected() {
    let continuity = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let leases = Arc::new(LocalLeaseProvider::new());
    let runtime = make_runtime(continuity, leases);

    let stranger = make_identity("nonexistent:main");
    let outcome = runtime.send(&stranger, &make_content()).await;

    let err = outcome.unwrap_err();
    assert!(matches!(err, IdentityRuntimeError::UnknownIdentity(_)));
}
/// `dispatch` to an addressable identity returns the grant's fencing token
/// and reports a non-durable dispatch when the runtime has no runtime store.
#[tokio::test]
async fn identity_first_runtime_dispatch_to_addressable_succeeds() {
    let continuity = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let leases = Arc::new(LocalLeaseProvider::new());
    let runtime = make_runtime(continuity, leases);

    runtime
        .register(
            make_spec("triage:main"),
            IdentityLifecycleState::Active,
            Some(make_record("triage:main", 0, 0)),
            Some(make_grant("triage:main", 1)),
        )
        .await;

    let target = make_identity("triage:main");
    let outcome = runtime.dispatch(&target, &make_dispatch_input()).await;
    assert!(outcome.is_ok());

    let (token, is_durable) = outcome.unwrap();
    assert_eq!(token, FencingToken::new(1));
    assert!(!is_durable);
}
/// Unlike `send`, `dispatch` is accepted for an internal-only identity.
#[tokio::test]
async fn identity_first_runtime_dispatch_to_internal_only_succeeds() {
    let continuity = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let leases = Arc::new(LocalLeaseProvider::new());
    let runtime = make_runtime(continuity, leases);

    runtime
        .register(
            make_internal_spec("gate:main"),
            IdentityLifecycleState::Active,
            Some(make_record("gate:main", 0, 0)),
            Some(make_grant("gate:main", 1)),
        )
        .await;

    let target = make_identity("gate:main");
    let outcome = runtime.dispatch(&target, &make_dispatch_input()).await;
    assert!(outcome.is_ok());
}
/// `dispatch` accepts an input with a policy origin, correlation id and
/// idempotency key populated.
#[tokio::test]
async fn identity_first_runtime_dispatch_with_fields_flows_through() {
    let continuity = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let leases = Arc::new(LocalLeaseProvider::new());
    let runtime = make_runtime(continuity, leases);

    runtime
        .register(
            make_spec("triage:main"),
            IdentityLifecycleState::Active,
            Some(make_record("triage:main", 0, 0)),
            Some(make_grant("triage:main", 1)),
        )
        .await;

    let correlation = meerkat_mobkit::identity_first::CorrelationId::new("corr-1");
    let idempotency = DispatchIdempotencyKey::new("idem-1");
    let populated_input = DispatchInput {
        content: make_content(),
        origin: DispatchOrigin::Policy,
        correlation_id: Some(correlation),
        idempotency_key: Some(idempotency),
    };

    let target = make_identity("triage:main");
    let outcome = runtime.dispatch(&target, &populated_input).await;
    assert!(outcome.is_ok());
}
/// A runtime built without a runtime store reports dispatches as non-durable.
#[tokio::test]
async fn identity_first_runtime_dispatch_without_runtime_store_is_in_memory() {
    let continuity = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let leases = Arc::new(LocalLeaseProvider::new());
    let runtime = make_runtime(continuity, leases);

    runtime
        .register(
            make_spec("triage:main"),
            IdentityLifecycleState::Active,
            Some(make_record("triage:main", 0, 0)),
            Some(make_grant("triage:main", 1)),
        )
        .await;

    let target = make_identity("triage:main");
    let dispatched = runtime
        .dispatch(&target, &make_dispatch_input())
        .await
        .unwrap();
    let is_durable = dispatched.1;
    assert!(!is_durable);
}
/// A runtime built with a runtime store reports dispatches as durable.
#[tokio::test]
async fn identity_first_runtime_dispatch_with_runtime_store_is_durable() {
    let continuity = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let leases = Arc::new(LocalLeaseProvider::new());
    let runtime = make_runtime_with_store(continuity, leases);

    runtime
        .register(
            make_spec("triage:main"),
            IdentityLifecycleState::Active,
            Some(make_record("triage:main", 0, 0)),
            Some(make_grant("triage:main", 1)),
        )
        .await;

    let target = make_identity("triage:main");
    let dispatched = runtime
        .dispatch(&target, &make_dispatch_input())
        .await
        .unwrap();
    let is_durable = dispatched.1;
    assert!(is_durable);
}
/// After registering an active identity, `contains` reports it as present.
#[tokio::test]
async fn identity_first_runtime_agent_returns_for_active() {
    let continuity = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let leases = Arc::new(LocalLeaseProvider::new());
    let runtime = make_runtime(continuity, leases);

    runtime
        .register(
            make_spec("triage:main"),
            IdentityLifecycleState::Active,
            Some(make_record("triage:main", 0, 0)),
            Some(make_grant("triage:main", 1)),
        )
        .await;

    let target = make_identity("triage:main");
    let is_known = runtime.contains(&target).await;
    assert!(is_known);
}
/// `status` for an identity that was never registered fails with `UnknownIdentity`.
#[tokio::test]
async fn identity_first_runtime_agent_errors_for_unknown() {
    let continuity = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let leases = Arc::new(LocalLeaseProvider::new());
    let runtime = make_runtime(continuity, leases);

    let stranger = make_identity("nonexistent:main");
    let outcome = runtime.status(&stranger).await;

    let err = outcome.unwrap_err();
    assert!(matches!(err, IdentityRuntimeError::UnknownIdentity(_)));
}
/// `status` reflects everything supplied at registration: identity, state,
/// runtime/session ids, profile, addressability, labels, generation,
/// checkpoint version, lease details and continuity health.
#[tokio::test]
async fn identity_first_runtime_status_returns_full_identity_status() {
    let continuity = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let leases = Arc::new(LocalLeaseProvider::new());
    let runtime = make_runtime(continuity, leases);

    let mut labelled_spec = make_spec("triage:main");
    labelled_spec
        .labels
        .insert("env".to_string(), "staging".to_string());
    let record = make_record("triage:main", 1, 5);
    let grant = make_grant("triage:main", 3);
    runtime
        .register(
            labelled_spec,
            IdentityLifecycleState::Active,
            Some(record.clone()),
            Some(grant),
        )
        .await;

    let target = make_identity("triage:main");
    let status = runtime.status(&target).await.unwrap();

    // Identity and registration data.
    assert_eq!(status.identity, make_identity("triage:main"));
    assert_eq!(status.state, IdentityLifecycleState::Active);
    assert_eq!(status.agent_runtime_id, Some(record.agent_runtime_id));
    assert_eq!(status.session_id, Some(record.session_id));
    let expected_profile = meerkat_mob::ProfileName::from("default");
    assert_eq!(status.profile, Some(expected_profile));
    assert_eq!(status.addressability, AgentAddressability::Addressable);
    assert_eq!(status.labels.get("env"), Some(&"staging".to_string()));
    assert_eq!(status.generation, Some(ContinuityGeneration::new(1)));
    assert_eq!(status.checkpoint_version, Some(CheckpointVersion::new(5)));

    // Lease details.
    let lease_info = status.lease.unwrap();
    assert_eq!(lease_info.fencing_token, FencingToken::new(3));
    assert!(lease_info.healthy);

    // Continuity health.
    let health = status.continuity_health.unwrap();
    assert!(health.store_reachable);
    assert_eq!(health.durability_policy, DurabilityPolicy::SyncWriteThrough);
    let expected_checkpoint = Some(CheckpointVersion::new(5));
    assert_eq!(health.last_checkpoint_version, expected_checkpoint);
}
/// Without a bridge, `retire` returns the registered fencing token and moves
/// the identity to `Retiring`.
#[tokio::test]
async fn identity_first_runtime_retire_validates_lease_and_sets_retiring() {
    let continuity = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let leases = Arc::new(LocalLeaseProvider::new());
    let runtime = make_runtime(continuity, leases);

    runtime
        .register(
            make_spec("triage:main"),
            IdentityLifecycleState::Active,
            Some(make_record("triage:main", 0, 0)),
            Some(make_grant("triage:main", 1)),
        )
        .await;

    let target = make_identity("triage:main");
    let retired_token = runtime.retire(&target).await.unwrap();
    assert_eq!(retired_token, FencingToken::new(1));

    let after = runtime.status(&target).await.unwrap();
    assert_eq!(after.state, IdentityLifecycleState::Retiring);
}
/// When the bridge fails to retire the member, `retire` errors with a
/// "bridge retire" message and the identity stays `Active`.
#[tokio::test]
async fn identity_first_runtime_retire_bridge_failure_preserves_active_state() {
    let continuity = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let leases = Arc::new(LocalLeaseProvider::new());
    let failing_bridge = Arc::new(CountingBridge::default());
    failing_bridge.fail_retire();
    let runtime = make_runtime_with_bridge(continuity, leases, failing_bridge);

    let id = make_identity("triage:main");
    let spec = make_spec("triage:main");
    let record = make_record("triage:main", 0, 0);
    let grant = make_grant("triage:main", 1);
    runtime
        .register(
            spec,
            IdentityLifecycleState::Active,
            Some(record),
            Some(grant),
        )
        .await;

    let err = runtime.retire(&id).await.unwrap_err();
    let message = err.to_string();
    assert!(message.contains("bridge retire"));

    let after = runtime.status(&id).await.unwrap();
    assert_eq!(after.state, IdentityLifecycleState::Active);
}
/// A successful bridged `retire` retires the member once, unregisters exactly
/// the record's session id once, and leaves the identity `Retiring`.
#[tokio::test]
async fn identity_first_runtime_retire_unregisters_session_runtime_state() {
    let continuity = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let leases = Arc::new(LocalLeaseProvider::new());
    let bridge = Arc::new(CountingBridge::default());
    let runtime = make_runtime_with_bridge(continuity, leases, bridge.clone());

    let id = make_identity("triage:main");
    let record = make_record("triage:main", 0, 0);
    let expected_session = record.session_id.clone();
    let spec = make_spec("triage:main");
    let grant = make_grant("triage:main", 1);
    runtime
        .register(
            spec,
            IdentityLifecycleState::Active,
            Some(record),
            Some(grant),
        )
        .await;

    let retired_token = runtime.retire(&id).await.unwrap();
    assert_eq!(retired_token, FencingToken::new(1));

    let retire_count = bridge.retire_calls.load(Ordering::SeqCst);
    assert_eq!(retire_count, 1);
    let unregister_count = bridge.unregister_calls.load(Ordering::SeqCst);
    assert_eq!(unregister_count, 1);

    let unregistered = bridge.unregistered_session_ids.lock().await.clone();
    assert_eq!(unregistered.as_slice(), &[expected_session.to_string()]);

    let after = runtime.status(&id).await.unwrap();
    assert_eq!(after.state, IdentityLifecycleState::Retiring);
}
/// When the bridge fails to unregister the retired session, `retire` must
/// fail, writes with the pre-retire token must be rejected, the identity must
/// be `Broken`, and status must not expose the pre-retire lease token.
#[tokio::test]
async fn identity_first_runtime_retire_unregister_failure_fences_stale_session_state() {
    let store = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let lease = Arc::new(LocalLeaseProvider::new());
    let bridge = Arc::new(CountingBridge::default());
    bridge.fail_unregister();
    let runtime = make_runtime_with_bridge(store.clone(), lease.clone(), bridge.clone());
    let id = make_identity("triage:main");

    // Acquire a real lease so the store knows the pre-retire fencing token.
    let mut acquired = lease
        .acquire_leases(std::slice::from_ref(&id), "test-runtime")
        .await
        .unwrap();
    let initial_result = acquired.remove(&id).unwrap();
    let old_grant = match initial_result {
        LeaseAcquireResult::Acquired(grant) => grant,
        other => panic!("expected initial lease grant, got {other:?}"),
    };
    let pre_retire_token = old_grant.fencing_token;

    let record = make_record("triage:main", 0, 0);
    store
        .upsert_continuity_record(&record, pre_retire_token)
        .await
        .unwrap();
    runtime
        .register(
            make_spec("triage:main"),
            IdentityLifecycleState::Active,
            Some(record.clone()),
            Some(old_grant.clone()),
        )
        .await;

    let err = runtime
        .retire(&id)
        .await
        .expect_err("unregister failure should make retire fail loudly");
    let mentions_unregister = err
        .to_string()
        .contains("bridge unregister retired session");
    assert!(mentions_unregister, "unexpected error: {err}");

    assert_eq!(bridge.retire_calls.load(Ordering::SeqCst), 1);
    assert_eq!(bridge.unregister_calls.load(Ordering::SeqCst), 1);
    assert_old_token_snapshot_write_rejected(store.as_ref(), &id, &record, pre_retire_token).await;

    let status = runtime.status(&id).await.unwrap();
    assert_eq!(
        status.state,
        IdentityLifecycleState::Broken,
        "the stale bridge registration failure is explicit and non-active after the fence advances"
    );
    // Either no lease is reported, or the reported one is newer than the old grant.
    let stale_token_hidden = match status.lease.as_ref() {
        None => true,
        Some(current) => current.fencing_token > old_grant.fencing_token,
    };
    assert!(
        stale_token_hidden,
        "runtime status must not expose the stale pre-retire lease token"
    );
}
/// A successful bridged `retire` over a real lease returns a token newer than
/// the pre-retire grant, and the store rejects writes with the old token.
#[tokio::test]
async fn identity_first_runtime_retire_returns_advanced_fencing_token() {
    let store = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let lease = Arc::new(LocalLeaseProvider::new());
    let bridge = Arc::new(CountingBridge::default());
    let runtime = make_runtime_with_bridge(store.clone(), lease.clone(), bridge);
    let id = make_identity("triage:main");

    // Acquire a real lease so the store knows the pre-retire fencing token.
    let mut acquired = lease
        .acquire_leases(std::slice::from_ref(&id), "test-runtime")
        .await
        .unwrap();
    let initial_result = acquired.remove(&id).unwrap();
    let old_grant = match initial_result {
        LeaseAcquireResult::Acquired(grant) => grant,
        other => panic!("expected initial lease grant, got {other:?}"),
    };
    let pre_retire_token = old_grant.fencing_token;

    let record = make_record("triage:main", 0, 0);
    store
        .upsert_continuity_record(&record, pre_retire_token)
        .await
        .unwrap();
    runtime
        .register(
            make_spec("triage:main"),
            IdentityLifecycleState::Active,
            Some(record.clone()),
            Some(old_grant.clone()),
        )
        .await;

    let retired_token = runtime.retire(&id).await.unwrap();
    assert!(
        retired_token > old_grant.fencing_token,
        "retire should return the current advanced fence, not the stale pre-retire token"
    );
    assert_old_token_snapshot_write_rejected(store.as_ref(), &id, &record, pre_retire_token).await;
}
/// `respawn` returns a record with the same identity, generation and runtime
/// id as the stored one and leaves the identity `Active`.
#[tokio::test]
async fn identity_first_runtime_respawn_preserves_record_and_generation() {
    let store = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let lease_prov = Arc::new(LocalLeaseProvider::new());
    let runtime = make_runtime(store.clone(), lease_prov.clone());
    let id = make_identity("triage:main");

    let record = make_record("triage:main", 0, 2);
    store
        .upsert_continuity_record(&record, FencingToken::new(1))
        .await
        .unwrap();
    let spec = make_spec("triage:main");
    let grant = make_grant("triage:main", 1);
    runtime
        .register(
            spec,
            IdentityLifecycleState::Active,
            Some(record.clone()),
            Some(grant),
        )
        .await;

    let restored = runtime.respawn(&id).await.unwrap();
    assert_eq!(restored.identity, id);
    assert_eq!(restored.generation, ContinuityGeneration::new(0));
    assert_eq!(restored.agent_runtime_id, record.agent_runtime_id);

    let after = runtime.status(&id).await.unwrap();
    assert_eq!(after.state, IdentityLifecycleState::Active);
}
/// Rebinding after a live respawn must report the new live session id while
/// keeping the identity, agent runtime id and generation of the seeded record.
#[tokio::test]
async fn identity_first_runtime_rebind_after_live_respawn_updates_session() {
    let name = "triage:main";
    let store = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let lease_prov = Arc::new(LocalLeaseProvider::new());
    let runtime = make_runtime(store.clone(), lease_prov.clone());
    let identity = make_identity(name);

    let seeded = make_record(name, 0, 2);
    store
        .upsert_continuity_record(&seeded, FencingToken::new(1))
        .await
        .unwrap();
    runtime
        .register(
            make_spec(name),
            IdentityLifecycleState::Active,
            Some(seeded.clone()),
            Some(make_grant(name, 1)),
        )
        .await;

    let live_session_id = meerkat_core::types::SessionId::new();
    let rebound = runtime
        .rebind_session_after_live_respawn(&identity, live_session_id.clone())
        .await
        .unwrap();

    // Only the session id is expected to change.
    assert_eq!(rebound.identity, identity);
    assert_eq!(rebound.agent_runtime_id, seeded.agent_runtime_id);
    assert_eq!(rebound.generation, seeded.generation);
    assert_eq!(rebound.session_id, live_session_id);

    let status = runtime.status(&identity).await.unwrap();
    assert_eq!(status.session_id, Some(live_session_id));
}
/// When an upsert fails during rebind (after one upsert has been allowed to
/// succeed), the rebind must error, the new live session must be unregistered
/// from the bridge, and the identity must be Broken while still pointing at
/// the new live session id.
#[tokio::test]
async fn identity_first_runtime_rebind_final_upsert_failure_unregisters_new_session() {
    let name = "triage:main";
    let store = Arc::new(FaultyContinuityStore::new());
    let lease_prov = Arc::new(LocalLeaseProvider::new());
    let bridge = Arc::new(CountingBridge::default());
    let runtime = make_runtime_with_bridge(store.clone(), lease_prov.clone(), bridge.clone());
    let identity = make_identity(name);

    let seeded = make_record(name, 0, 2);
    store
        .upsert_continuity_record(&seeded, FencingToken::new(1))
        .await
        .unwrap();
    runtime
        .register(
            make_spec(name),
            IdentityLifecycleState::Active,
            Some(seeded.clone()),
            Some(make_grant(name, 1)),
        )
        .await;

    // Presumably lets one more upsert through before failing the next one;
    // confirm against FaultyContinuityStore.
    store.fail_next_upsert_after_successes(1);
    let live_session_id = meerkat_core::types::SessionId::new();
    let err = runtime
        .rebind_session_after_live_respawn(&identity, live_session_id.clone())
        .await
        .unwrap_err();
    assert!(err.to_string().contains("upsert failed"));

    let unregistered = bridge.unregistered_session_ids.lock().await.clone();
    let live_session_text = live_session_id.to_string();
    assert!(
        unregistered.contains(&live_session_text),
        "failed final rebind upsert must unregister new live session; got {unregistered:?}"
    );

    let status = runtime.status(&identity).await.unwrap();
    assert_eq!(status.state, IdentityLifecycleState::Broken);
    assert_eq!(
        status.session_id,
        Some(live_session_id),
        "failed rebind must not resurrect stale pre-respawn session"
    );
}
/// When the bridge is told to fail registration, rebind must error, the new
/// live session must be unregistered from the bridge, and the identity must be
/// Broken while still pointing at the new live session id.
#[tokio::test]
async fn identity_first_runtime_rebind_bridge_register_failure_unregisters_new_session() {
    let name = "triage:main";
    let store = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let lease_prov = Arc::new(LocalLeaseProvider::new());
    let bridge = Arc::new(CountingBridge::default());
    let runtime = make_runtime_with_bridge(store.clone(), lease_prov.clone(), bridge.clone());
    let identity = make_identity(name);

    let seeded = make_record(name, 0, 2);
    store
        .upsert_continuity_record(&seeded, FencingToken::new(1))
        .await
        .unwrap();
    runtime
        .register(
            make_spec(name),
            IdentityLifecycleState::Active,
            Some(seeded),
            Some(make_grant(name, 1)),
        )
        .await;

    // Arm the failure only after the initial registration above.
    bridge.fail_register();
    let live_session_id = meerkat_core::types::SessionId::new();
    let err = runtime
        .rebind_session_after_live_respawn(&identity, live_session_id.clone())
        .await
        .unwrap_err();
    assert!(err.to_string().contains("bridge rebind respawned session"));

    let unregistered = bridge.unregistered_session_ids.lock().await.clone();
    let live_session_text = live_session_id.to_string();
    assert!(
        unregistered.contains(&live_session_text),
        "failed rebind bridge registration must unregister new live session; got {unregistered:?}"
    );

    let status = runtime.status(&identity).await.unwrap();
    assert_eq!(status.state, IdentityLifecycleState::Broken);
    assert_eq!(
        status.session_id,
        Some(live_session_id),
        "failed rebind must not restore stale pre-respawn session as active"
    );
}
/// When the very next upsert fails during rebind, the rebind must error and
/// the identity must be Broken while still reporting the original (pre-rebind)
/// session id.
#[tokio::test]
async fn identity_first_runtime_rebind_initial_upsert_failure_marks_rebound_session_broken() {
    let name = "triage:main";
    let store = Arc::new(FaultyContinuityStore::new());
    let lease_prov = Arc::new(LocalLeaseProvider::new());
    let bridge = Arc::new(CountingBridge::default());
    let runtime = make_runtime_with_bridge(store.clone(), lease_prov.clone(), bridge.clone());
    let identity = make_identity(name);

    let seeded = make_record(name, 0, 2);
    store
        .upsert_continuity_record(&seeded, FencingToken::new(1))
        .await
        .unwrap();
    runtime
        .register(
            make_spec(name),
            IdentityLifecycleState::Active,
            Some(seeded.clone()),
            Some(make_grant(name, 1)),
        )
        .await;

    // Zero allowed successes: presumably the next upsert fails immediately;
    // confirm against FaultyContinuityStore.
    store.fail_next_upsert_after_successes(0);
    let live_session_id = meerkat_core::types::SessionId::new();
    let err = runtime
        .rebind_session_after_live_respawn(&identity, live_session_id.clone())
        .await
        .unwrap_err();
    assert!(err.to_string().contains("upsert failed"));

    let status = runtime.status(&identity).await.unwrap();
    assert_eq!(status.state, IdentityLifecycleState::Broken);
    assert_eq!(
        status.session_id,
        Some(seeded.session_id),
        "failed pre-rebind fence must leave the old session marked Broken instead of claiming the rebound session"
    );
}
/// With upserts failing persistently after one success, rebind must error,
/// snapshot writes with the original lease token must be rejected, the new
/// live session must be unregistered, and the identity must be Broken while
/// reporting the new live session id.
#[tokio::test]
async fn identity_first_runtime_rebind_persistent_failure_fences_old_session_first() {
    let name = "triage:main";
    let store = Arc::new(FaultyContinuityStore::new());
    let lease_prov = Arc::new(LocalLeaseProvider::new());
    let bridge = Arc::new(CountingBridge::default());
    let runtime = make_runtime_with_bridge(store.clone(), lease_prov.clone(), bridge.clone());
    let identity = make_identity(name);
    let seeded = make_record(name, 0, 2);

    // Acquire a real lease so the original token comes from the provider.
    let initial_grant = match lease_prov
        .acquire_leases(std::slice::from_ref(&identity), "test-runtime")
        .await
        .unwrap()
        .remove(&identity)
        .unwrap()
    {
        LeaseAcquireResult::Acquired(grant) => grant,
        _ => panic!("initial lease should be acquired"),
    };
    let original_token = initial_grant.fencing_token;

    store
        .upsert_continuity_record(&seeded, original_token)
        .await
        .unwrap();
    runtime
        .register(
            make_spec(name),
            IdentityLifecycleState::Active,
            Some(seeded.clone()),
            Some(initial_grant),
        )
        .await;

    store.fail_upserts_persistently_after_successes(1);
    let live_session_id = meerkat_core::types::SessionId::new();
    let err = runtime
        .rebind_session_after_live_respawn(&identity, live_session_id.clone())
        .await
        .unwrap_err();
    assert!(err.to_string().contains("upsert failed"));

    assert_old_token_snapshot_write_rejected(store.as_ref(), &identity, &seeded, original_token)
        .await;

    let unregistered = bridge.unregistered_session_ids.lock().await.clone();
    let live_session_text = live_session_id.to_string();
    assert!(
        unregistered.contains(&live_session_text),
        "failed rebind must unregister the new live session after the old session is fenced; got {unregistered:?}"
    );

    let status = runtime.status(&identity).await.unwrap();
    assert_eq!(status.state, IdentityLifecycleState::Broken);
    assert_eq!(
        status.session_id,
        Some(live_session_id),
        "persistent rebind failure may remember the rebound session locally, but only as Broken after fencing the old session"
    );
}
/// Respawn, run against a `ResolveProbeStore`, must both resolve continuity
/// (`probed` flag set) and have stale old-token writes rejected
/// (`stale_write_rejected` flag set).
#[tokio::test]
async fn identity_first_runtime_respawn_fences_old_owner_before_resolve() {
    let name = "triage:main";
    let lease_prov = Arc::new(LocalLeaseProvider::new());
    let identity = make_identity(name);
    let seeded = make_record(name, 0, 2);

    let grants = lease_prov
        .acquire_leases(std::slice::from_ref(&identity), "test-runtime")
        .await
        .unwrap();
    let LeaseAcquireResult::Acquired(grant) = grants.get(&identity).unwrap() else {
        panic!("expected Acquired");
    };
    let initial_grant = grant.clone();
    let old_token = initial_grant.fencing_token;

    // The probe store is constructed with the identity, record and old token.
    let store = Arc::new(ResolveProbeStore::new(
        identity.clone(),
        seeded.clone(),
        old_token,
    ));
    store
        .upsert_continuity_record(&seeded, old_token)
        .await
        .unwrap();

    let runtime = make_runtime(store.clone(), lease_prov);
    runtime
        .register(
            make_spec(name),
            IdentityLifecycleState::Active,
            Some(seeded),
            Some(initial_grant),
        )
        .await;

    runtime.respawn(&identity).await.unwrap();

    let probed = store.probed.load(Ordering::SeqCst);
    assert!(probed, "respawn should resolve continuity");
    let stale_write_rejected = store.stale_write_rejected.load(Ordering::SeqCst);
    assert!(
        stale_write_rejected,
        "old-owner writes must be fenced before respawn awaits resolve_many"
    );
}
/// A successful reset must produce a fresh record for the same identity:
/// generation advanced to 1, checkpoint version back to 0, runtime id
/// `rt:triage:main:1`, and a new session id. A snapshot write for the old
/// session using the original lease token must then fail.
#[tokio::test]
async fn identity_first_runtime_reset_advances_generation_creates_fresh() {
    let store = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let lease_prov = Arc::new(LocalLeaseProvider::new());
    let runtime = make_runtime(store.clone(), lease_prov.clone());
    let id = make_identity("triage:main");

    // Borrow the identity as a one-element slice instead of cloning it into a
    // temporary array, matching the other tests in this file.
    let initial_grants = lease_prov
        .acquire_leases(std::slice::from_ref(&id), "test-runtime")
        .await
        .unwrap();
    let initial_grant = match initial_grants.get(&id).unwrap() {
        LeaseAcquireResult::Acquired(g) => g.clone(),
        _ => panic!("expected Acquired"),
    };

    let record = make_record("triage:main", 0, 5);
    store
        .upsert_continuity_record(&record, initial_grant.fencing_token)
        .await
        .unwrap();
    runtime
        .register(
            make_spec("triage:main"),
            IdentityLifecycleState::Active,
            Some(record.clone()),
            Some(initial_grant.clone()),
        )
        .await;

    let new_record = runtime.reset(&id).await.unwrap();
    assert_eq!(new_record.identity, id);
    assert_eq!(new_record.generation, ContinuityGeneration::new(1));
    assert_eq!(new_record.checkpoint_version, CheckpointVersion::new(0));
    assert_eq!(
        new_record.agent_runtime_id,
        AgentRuntimeId::parse("rt:triage:main:1").unwrap()
    );
    assert_ne!(new_record.session_id, record.session_id);

    // A write from the pre-reset owner (old session, old token) must fail.
    let old_write = store
        .save_session_snapshot(
            &id,
            &record.session_id,
            record.generation,
            CheckpointVersion::new(6),
            initial_grant.fencing_token,
            &SessionSnapshot {
                data: b"stale".to_vec(),
            },
        )
        .await;
    assert!(old_write.is_err());
}
/// A successful reset must yield a new session id and must unregister the
/// previous session id from the bridge.
#[tokio::test]
async fn identity_first_runtime_reset_unregisters_superseded_bridge_session() {
    let name = "triage:main";
    let store = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let lease_prov = Arc::new(LocalLeaseProvider::new());
    let bridge = Arc::new(CountingBridge::default());
    let runtime = make_runtime_with_bridge(store.clone(), lease_prov.clone(), bridge.clone());
    let identity = make_identity(name);

    let grants = lease_prov
        .acquire_leases(std::slice::from_ref(&identity), "test-runtime")
        .await
        .unwrap();
    let LeaseAcquireResult::Acquired(grant) = grants.get(&identity).unwrap() else {
        panic!("expected Acquired");
    };
    let initial_grant = grant.clone();

    let seeded = make_record(name, 0, 5);
    store
        .upsert_continuity_record(&seeded, initial_grant.fencing_token)
        .await
        .unwrap();
    runtime
        .register(
            make_spec(name),
            IdentityLifecycleState::Active,
            Some(seeded.clone()),
            Some(initial_grant),
        )
        .await;

    let fresh = runtime.reset(&identity).await.unwrap();
    assert_ne!(fresh.session_id, seeded.session_id);

    let unregistered = bridge.unregistered_session_ids.lock().await.clone();
    let superseded_session = seeded.session_id.to_string();
    assert!(
        unregistered.contains(&superseded_session),
        "successful reset must unregister the superseded bridge session; unregistered={unregistered:?}"
    );
}
/// With bridge session creation failing, reset must error, the store must
/// still resolve the original record as Ready, the runtime must still report
/// the original runtime id in the Active state, and the status lease must
/// pass the renewability helper check.
#[tokio::test]
async fn identity_first_runtime_reset_create_failure_preserves_old_continuity() {
    let name = "triage:main";
    let store = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let lease_prov = Arc::new(LocalLeaseProvider::new());
    let bridge = Arc::new(CountingBridge::default());
    bridge.fail_create();
    let runtime = make_runtime_with_bridge(store.clone(), lease_prov.clone(), bridge);
    let identity = make_identity(name);

    let grants = lease_prov
        .acquire_leases(std::slice::from_ref(&identity), "test-runtime")
        .await
        .unwrap();
    let LeaseAcquireResult::Acquired(grant) = grants.get(&identity).unwrap() else {
        panic!("expected Acquired");
    };
    let initial_grant = grant.clone();
    let old_token = initial_grant.fencing_token;

    let seeded = make_record(name, 0, 5);
    store
        .upsert_continuity_record(&seeded, old_token)
        .await
        .unwrap();
    runtime
        .register(
            make_spec(name),
            IdentityLifecycleState::Active,
            Some(seeded.clone()),
            Some(initial_grant),
        )
        .await;

    let err = runtime.reset(&identity).await.unwrap_err();
    assert!(
        err.to_string()
            .contains("bridge create_session after reset")
    );

    // Durable state is unchanged.
    let resolved = store
        .resolve_many(std::slice::from_ref(&identity))
        .await
        .unwrap();
    let expected = ContinuityResolveState::Ready {
        record: seeded.clone(),
    };
    assert_eq!(resolved.get(&identity), Some(&expected));

    // In-memory state is unchanged as well.
    let status = runtime.status(&identity).await.unwrap();
    assert_eq!(
        status.agent_runtime_id.as_ref(),
        Some(&seeded.agent_runtime_id)
    );
    assert_eq!(status.state, IdentityLifecycleState::Active);
    assert_status_lease_is_renewable(&lease_prov, &runtime, &identity, old_token).await;
}
/// For an identity registered as Dormant with no lease, a reset that fails on
/// bridge session creation must leave it Dormant with the original runtime id
/// and no lease, keep the original record resolvable as Ready, and reject
/// snapshot writes that use the originally acquired token.
#[tokio::test]
async fn identity_first_runtime_reset_failure_preserves_original_dormant_state() {
    let name = "triage:main";
    let store = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let lease_prov = Arc::new(LocalLeaseProvider::new());
    let bridge = Arc::new(CountingBridge::default());
    bridge.fail_create();
    let runtime = make_runtime_with_bridge(store.clone(), lease_prov.clone(), bridge);
    let identity = make_identity(name);

    let grants = lease_prov
        .acquire_leases(std::slice::from_ref(&identity), "test-runtime")
        .await
        .unwrap();
    let LeaseAcquireResult::Acquired(grant) = grants.get(&identity).unwrap() else {
        panic!("expected Acquired");
    };
    let original_token = grant.clone().fencing_token;

    let seeded = make_record(name, 0, 5);
    store
        .upsert_continuity_record(&seeded, original_token)
        .await
        .unwrap();
    // Registered Dormant and without a lease, unlike the Active variants.
    runtime
        .register(
            make_spec(name),
            IdentityLifecycleState::Dormant,
            Some(seeded.clone()),
            None,
        )
        .await;

    let err = runtime.reset(&identity).await.unwrap_err();
    assert!(
        err.to_string()
            .contains("bridge create_session after reset")
    );

    let status = runtime.status(&identity).await.unwrap();
    assert_eq!(status.state, IdentityLifecycleState::Dormant);
    assert_eq!(
        status.agent_runtime_id.as_ref(),
        Some(&seeded.agent_runtime_id)
    );
    assert!(
        status.lease.is_none(),
        "dormant reset rollback must not expose a newly acquired live lease"
    );

    let resolved = store
        .resolve_many(std::slice::from_ref(&identity))
        .await
        .unwrap();
    let expected = ContinuityResolveState::Ready {
        record: seeded.clone(),
    };
    assert_eq!(
        resolved.get(&identity),
        Some(&expected),
        "failed dormant reset must restore the old durable continuity record"
    );

    assert_old_token_snapshot_write_rejected(store.as_ref(), &identity, &seeded, original_token)
        .await;
}
/// For an identity registered without any continuity record, a reset that
/// fails on bridge session creation must leave the store resolving the
/// identity as Uninitialized and the runtime entry Active with no runtime id.
#[tokio::test]
async fn identity_first_runtime_reset_create_failure_removes_tentative_uninitialized_record() {
    let name = "triage:uninitialized";
    let store = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let lease_prov = Arc::new(LocalLeaseProvider::new());
    let bridge = Arc::new(CountingBridge::default());
    bridge.fail_create();
    let runtime = make_runtime_with_bridge(store.clone(), lease_prov.clone(), bridge);
    let identity = make_identity(name);

    let grants = lease_prov
        .acquire_leases(std::slice::from_ref(&identity), "test-runtime")
        .await
        .unwrap();
    let LeaseAcquireResult::Acquired(grant) = grants.get(&identity).unwrap() else {
        panic!("expected Acquired");
    };
    let initial_grant = grant.clone();

    // No record is stored or registered: the identity starts without continuity.
    runtime
        .register(
            make_spec(name),
            IdentityLifecycleState::Active,
            None,
            Some(initial_grant),
        )
        .await;

    let err = runtime.reset(&identity).await.unwrap_err();
    assert!(
        err.to_string()
            .contains("bridge create_session after reset")
    );

    let resolved = store
        .resolve_many(std::slice::from_ref(&identity))
        .await
        .unwrap();
    assert_eq!(
        resolved.get(&identity),
        Some(&ContinuityResolveState::Uninitialized),
        "failed reset must not leave a fresh Ready continuity record when no old continuity existed"
    );

    let status = runtime.status(&identity).await.unwrap();
    assert_eq!(status.state, IdentityLifecycleState::Active);
    assert!(
        status.agent_runtime_id.is_none(),
        "rollback should restore the no-continuity entry"
    );
}
/// With bridge registration failing, reset must error after exactly one
/// create, one unregister and one retire call on the bridge. The original
/// record must stay resolvable as Ready, the runtime must report the original
/// runtime id in the Broken state, and the status lease must pass the
/// renewability helper check.
#[tokio::test]
async fn identity_first_runtime_reset_register_failure_cleans_new_member_and_preserves_old_continuity()
{
    let name = "triage:main";
    let store = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let lease_prov = Arc::new(LocalLeaseProvider::new());
    let bridge = Arc::new(CountingBridge::default());
    bridge.fail_register();
    let runtime = make_runtime_with_bridge(store.clone(), lease_prov.clone(), bridge.clone());
    let identity = make_identity(name);

    let grants = lease_prov
        .acquire_leases(std::slice::from_ref(&identity), "test-runtime")
        .await
        .unwrap();
    let LeaseAcquireResult::Acquired(grant) = grants.get(&identity).unwrap() else {
        panic!("expected Acquired");
    };
    let initial_grant = grant.clone();
    let old_token = initial_grant.fencing_token;

    let seeded = make_record(name, 0, 5);
    store
        .upsert_continuity_record(&seeded, old_token)
        .await
        .unwrap();
    runtime
        .register(
            make_spec(name),
            IdentityLifecycleState::Active,
            Some(seeded.clone()),
            Some(initial_grant),
        )
        .await;

    let err = runtime.reset(&identity).await.unwrap_err();
    assert!(
        err.to_string()
            .contains("bridge register actual session runtime state after reset")
    );

    // Bridge call counts observed after the failed reset.
    assert_eq!(bridge.create_calls.load(Ordering::SeqCst), 1);
    assert_eq!(
        bridge.unregister_calls.load(Ordering::SeqCst),
        1,
        "failed reset registration must unregister the tentative bridge session"
    );
    assert_eq!(bridge.retire_calls.load(Ordering::SeqCst), 1);

    let resolved = store
        .resolve_many(std::slice::from_ref(&identity))
        .await
        .unwrap();
    let expected = ContinuityResolveState::Ready {
        record: seeded.clone(),
    };
    assert_eq!(resolved.get(&identity), Some(&expected));

    let status = runtime.status(&identity).await.unwrap();
    assert_eq!(
        status.agent_runtime_id.as_ref(),
        Some(&seeded.agent_runtime_id)
    );
    assert_eq!(status.state, IdentityLifecycleState::Broken);
    assert_status_lease_is_renewable(&lease_prov, &runtime, &identity, old_token).await;
}
/// With both bridge registration and bridge retire failing, the reset error
/// must mention the registration failure and the cleanup retire failure. The
/// bridge must have seen one create and one retire call, the original record
/// must stay Ready, the identity must be Broken with no lease, and snapshot
/// writes using the old token must be rejected.
#[tokio::test]
async fn identity_first_runtime_reset_register_failure_reports_cleanup_failure_and_preserves_old_continuity()
{
    let name = "triage:main";
    let store = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let lease_prov = Arc::new(LocalLeaseProvider::new());
    let bridge = Arc::new(CountingBridge::default());
    bridge.fail_register();
    bridge.fail_retire();
    let runtime = make_runtime_with_bridge(store.clone(), lease_prov.clone(), bridge.clone());
    let identity = make_identity(name);

    let grants = lease_prov
        .acquire_leases(std::slice::from_ref(&identity), "test-runtime")
        .await
        .unwrap();
    let LeaseAcquireResult::Acquired(grant) = grants.get(&identity).unwrap() else {
        panic!("expected Acquired");
    };
    let initial_grant = grant.clone();
    let old_token = initial_grant.fencing_token;

    let seeded = make_record(name, 0, 5);
    store
        .upsert_continuity_record(&seeded, old_token)
        .await
        .unwrap();
    runtime
        .register(
            make_spec(name),
            IdentityLifecycleState::Active,
            Some(seeded.clone()),
            Some(initial_grant),
        )
        .await;

    // Both the primary failure and the cleanup failure must appear in the
    // single error message.
    let message = runtime.reset(&identity).await.unwrap_err().to_string();
    assert!(
        message.contains("bridge register actual session runtime state after reset"),
        "unexpected error: {message}"
    );
    assert!(
        message.contains("cleanup retire failed"),
        "cleanup failure must be observable: {message}"
    );

    assert_eq!(bridge.create_calls.load(Ordering::SeqCst), 1);
    assert_eq!(bridge.retire_calls.load(Ordering::SeqCst), 1);

    let resolved = store
        .resolve_many(std::slice::from_ref(&identity))
        .await
        .unwrap();
    let expected = ContinuityResolveState::Ready {
        record: seeded.clone(),
    };
    assert_eq!(resolved.get(&identity), Some(&expected));

    let status = runtime.status(&identity).await.unwrap();
    assert_eq!(
        status.agent_runtime_id.as_ref(),
        Some(&seeded.agent_runtime_id)
    );
    assert_eq!(status.state, IdentityLifecycleState::Broken);
    assert!(
        status.lease.is_none(),
        "cleanup ambiguity must not refresh a bridge-visible lease"
    );
    assert_old_token_snapshot_write_rejected(store.as_ref(), &identity, &seeded, old_token).await;
}
/// With bridge retire failing, reset must error. The original record must
/// stay Ready, the identity must be Broken with the original runtime id and no
/// lease, and snapshot writes using the old token must be rejected.
#[tokio::test]
async fn identity_first_runtime_reset_retire_failure_marks_identity_broken() {
    let name = "triage:main";
    let store = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let lease_prov = Arc::new(LocalLeaseProvider::new());
    let bridge = Arc::new(CountingBridge::default());
    bridge.fail_retire();
    let runtime = make_runtime_with_bridge(store.clone(), lease_prov.clone(), bridge);
    let identity = make_identity(name);

    let grants = lease_prov
        .acquire_leases(std::slice::from_ref(&identity), "test-runtime")
        .await
        .unwrap();
    let LeaseAcquireResult::Acquired(grant) = grants.get(&identity).unwrap() else {
        panic!("expected Acquired");
    };
    let initial_grant = grant.clone();
    let old_token = initial_grant.fencing_token;

    let seeded = make_record(name, 0, 5);
    store
        .upsert_continuity_record(&seeded, old_token)
        .await
        .unwrap();
    runtime
        .register(
            make_spec(name),
            IdentityLifecycleState::Active,
            Some(seeded.clone()),
            Some(initial_grant),
        )
        .await;

    let err = runtime.reset(&identity).await.unwrap_err();
    assert!(
        err.to_string()
            .contains("bridge retire old member after reset")
    );

    let resolved = store
        .resolve_many(std::slice::from_ref(&identity))
        .await
        .unwrap();
    let expected = ContinuityResolveState::Ready {
        record: seeded.clone(),
    };
    assert_eq!(resolved.get(&identity), Some(&expected));

    let status = runtime.status(&identity).await.unwrap();
    assert_eq!(
        status.agent_runtime_id.as_ref(),
        Some(&seeded.agent_runtime_id)
    );
    assert_eq!(status.state, IdentityLifecycleState::Broken);
    assert!(
        status.lease.is_none(),
        "failed old-member retirement must leave the identity broken without a refreshed lease"
    );
    assert_old_token_snapshot_write_rejected(store.as_ref(), &identity, &seeded, old_token).await;
}
/// With store upserts failing, reset must error before any bridge create or
/// retire call is made, the identity must become Broken, and a subsequent
/// send must be rejected with an error mentioning the Broken state.
#[tokio::test]
async fn identity_first_runtime_reset_store_failure_marks_identity_broken() {
    let name = "triage:main";
    let store = Arc::new(FaultyContinuityStore::new());
    let lease_prov = Arc::new(LocalLeaseProvider::new());
    let bridge = Arc::new(CountingBridge::default());
    let runtime = make_runtime_with_bridge(store.clone(), lease_prov.clone(), bridge.clone());
    let identity = make_identity(name);

    let grants = lease_prov
        .acquire_leases(std::slice::from_ref(&identity), "test-runtime")
        .await
        .unwrap();
    let LeaseAcquireResult::Acquired(grant) = grants.get(&identity).unwrap() else {
        panic!("expected Acquired");
    };
    let initial_grant = grant.clone();

    let seeded = make_record(name, 0, 5);
    store
        .upsert_continuity_record(&seeded, initial_grant.fencing_token)
        .await
        .unwrap();
    runtime
        .register(
            make_spec(name),
            IdentityLifecycleState::Active,
            Some(seeded.clone()),
            Some(initial_grant),
        )
        .await;

    // Arm the store failure only after the seed upsert above has succeeded.
    store.fail_upsert();
    let err = runtime.reset(&identity).await.unwrap_err();
    assert!(err.to_string().contains("upsert failed"));

    // The bridge must not have been touched.
    assert_eq!(bridge.create_calls.load(Ordering::SeqCst), 0);
    assert_eq!(bridge.retire_calls.load(Ordering::SeqCst), 0);

    let status = runtime.status(&identity).await.unwrap();
    assert_eq!(status.state, IdentityLifecycleState::Broken);

    let send = runtime
        .send(&identity, &make_content())
        .await
        .unwrap_err();
    assert!(
        send.to_string().contains("state Broken"),
        "broken identity must reject delivery: {send}"
    );
}
/// With an upsert failure armed after three successes, reset must error after
/// one bridge create, two retires and two unregisters (including the original
/// session). The original record must be resolvable as Ready again and the
/// identity must be Broken with the original runtime id.
#[tokio::test]
async fn identity_first_runtime_reset_final_upsert_failure_restores_old_continuity_record() {
    let name = "triage:main";
    let store = Arc::new(FaultyContinuityStore::new());
    let lease_prov = Arc::new(LocalLeaseProvider::new());
    let bridge = Arc::new(CountingBridge::default());
    let runtime = make_runtime_with_bridge(store.clone(), lease_prov.clone(), bridge.clone());
    let identity = make_identity(name);

    let grants = lease_prov
        .acquire_leases(std::slice::from_ref(&identity), "test-runtime")
        .await
        .unwrap();
    let LeaseAcquireResult::Acquired(grant) = grants.get(&identity).unwrap() else {
        panic!("expected Acquired");
    };
    let initial_grant = grant.clone();

    let seeded = make_record(name, 0, 5);
    store
        .upsert_continuity_record(&seeded, initial_grant.fencing_token)
        .await
        .unwrap();
    runtime
        .register(
            make_spec(name),
            IdentityLifecycleState::Active,
            Some(seeded.clone()),
            Some(initial_grant),
        )
        .await;

    // Presumably the fourth upsert from here on is the one that fails;
    // confirm against FaultyContinuityStore and the reset implementation.
    store.fail_next_upsert_after_successes(3);
    let err = runtime.reset(&identity).await.unwrap_err();
    assert!(err.to_string().contains("upsert failed"));

    assert_eq!(bridge.create_calls.load(Ordering::SeqCst), 1);
    assert_eq!(
        bridge.retire_calls.load(Ordering::SeqCst),
        2,
        "failed reset should retire the old member and then clean up the tentative new member"
    );
    assert_eq!(
        bridge.unregister_calls.load(Ordering::SeqCst),
        2,
        "failed final reset upsert must unregister the old and tentative bridge sessions"
    );

    let unregistered = bridge.unregistered_session_ids.lock().await.clone();
    let old_session = seeded.session_id.to_string();
    assert!(
        unregistered.contains(&old_session),
        "failed final reset upsert must unregister the old bridge session; unregistered={unregistered:?}"
    );

    let resolved = store
        .resolve_many(std::slice::from_ref(&identity))
        .await
        .unwrap();
    let expected = ContinuityResolveState::Ready {
        record: seeded.clone(),
    };
    assert_eq!(
        resolved.get(&identity),
        Some(&expected),
        "failed final reset upsert must restore the previous durable record"
    );

    let status = runtime.status(&identity).await.unwrap();
    assert_eq!(status.state, IdentityLifecycleState::Broken);
    assert_eq!(
        status.agent_runtime_id.as_ref(),
        Some(&seeded.agent_runtime_id)
    );
}
/// Resetting an identity that was never registered must fail with
/// `UnknownIdentity` and leave the store resolving it as Uninitialized.
#[tokio::test]
async fn identity_first_runtime_reset_rejects_unregistered_identity_without_store_mutation() {
    let store = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let lease_prov = Arc::new(LocalLeaseProvider::new());
    let runtime = make_runtime(store.clone(), lease_prov);
    let identity = make_identity("triage:main");

    let err = runtime.reset(&identity).await.unwrap_err();
    assert!(matches!(err, IdentityRuntimeError::UnknownIdentity(_)));

    let resolved = store
        .resolve_many(std::slice::from_ref(&identity))
        .await
        .unwrap();
    let expected = ContinuityResolveState::Uninitialized;
    assert_eq!(resolved.get(&identity), Some(&expected));
}
/// Respawning an identity that exists in the store but was never registered
/// with the runtime must fail with `UnknownIdentity`, and the runtime must
/// still not know the identity afterwards.
#[tokio::test]
async fn identity_first_runtime_respawn_rejects_unregistered_identity_without_runtime_update() {
    let name = "triage:main";
    let store = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let lease_prov = Arc::new(LocalLeaseProvider::new());
    let runtime = make_runtime(store.clone(), lease_prov);
    let identity = make_identity(name);

    // The record is written to the store only; the runtime is not told.
    let stored_only = make_record(name, 0, 0);
    store
        .upsert_continuity_record(&stored_only, FencingToken::new(1))
        .await
        .unwrap();

    let err = runtime.respawn(&identity).await.unwrap_err();
    assert!(matches!(err, IdentityRuntimeError::UnknownIdentity(_)));

    let status_after = runtime.status(&identity).await;
    assert!(matches!(
        status_after,
        Err(IdentityRuntimeError::UnknownIdentity(_))
    ));
}
/// Deleting a registered identity must remove it from the runtime and leave
/// the store resolving it as Uninitialized.
#[tokio::test]
async fn identity_first_runtime_delete_identity_removes_from_runtime() {
    let store = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let lease_prov = Arc::new(LocalLeaseProvider::new());
    let runtime = make_runtime(store.clone(), lease_prov.clone());
    let id = make_identity("triage:main");
    let record = make_record("triage:main", 0, 0);
    store
        .upsert_continuity_record(&record, FencingToken::new(1))
        .await
        .unwrap();
    runtime
        .register(
            make_spec("triage:main"),
            IdentityLifecycleState::Active,
            Some(record),
            Some(make_grant("triage:main", 1)),
        )
        .await;

    runtime.delete_identity(&id).await.unwrap();
    assert!(!runtime.contains(&id).await);

    // Borrow the identity as a one-element slice instead of cloning it, and
    // compare the lookup as an Option so a missing entry fails with the
    // assertion message rather than a bare unwrap panic.
    let resolved = store.resolve_many(std::slice::from_ref(&id)).await.unwrap();
    assert_eq!(
        resolved.get(&id),
        Some(&ContinuityResolveState::Uninitialized),
        "deleted identity must resolve as Uninitialized"
    );
}
/// Deleting a registered identity must call bridge retire once and bridge
/// unregister once, record the deleted session id among the unregistered
/// sessions, and remove the identity from the runtime.
#[tokio::test]
async fn identity_first_runtime_delete_identity_unregisters_bridge_session() {
    let name = "triage:main";
    let store = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let lease_prov = Arc::new(LocalLeaseProvider::new());
    let bridge = Arc::new(CountingBridge::default());
    let runtime = make_runtime_with_bridge(store.clone(), lease_prov.clone(), bridge.clone());
    let identity = make_identity(name);

    let seeded = make_record(name, 0, 0);
    store
        .upsert_continuity_record(&seeded, FencingToken::new(1))
        .await
        .unwrap();
    runtime
        .register(
            make_spec(name),
            IdentityLifecycleState::Active,
            Some(seeded.clone()),
            Some(make_grant(name, 1)),
        )
        .await;

    runtime.delete_identity(&identity).await.unwrap();

    assert_eq!(bridge.retire_calls.load(Ordering::SeqCst), 1);
    assert_eq!(
        bridge.unregister_calls.load(Ordering::SeqCst),
        1,
        "deleted identity must unregister the session bridge state"
    );

    // Snapshot the list so the lock is released before further awaits.
    let unregistered = bridge.unregistered_session_ids.lock().await.clone();
    let deleted_session = seeded.session_id.to_string();
    assert!(unregistered.contains(&deleted_session));

    assert!(!runtime.contains(&identity).await);
}
/// With bridge unregister failing, delete must error with the unregister
/// message and the store must still resolve the original record as Ready.
#[tokio::test]
async fn identity_first_runtime_delete_unregister_failure_preserves_continuity() {
    let name = "triage:main";
    let store = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let lease_prov = Arc::new(LocalLeaseProvider::new());
    let bridge = Arc::new(CountingBridge::default());
    bridge.fail_unregister();
    let runtime = make_runtime_with_bridge(store.clone(), lease_prov, bridge);
    let identity = make_identity(name);

    let seeded = make_record(name, 0, 0);
    store
        .upsert_continuity_record(&seeded, FencingToken::new(1))
        .await
        .unwrap();
    runtime
        .register(
            make_spec(name),
            IdentityLifecycleState::Active,
            Some(seeded.clone()),
            Some(make_grant(name, 1)),
        )
        .await;

    let err = runtime.delete_identity(&identity).await.unwrap_err();
    assert!(
        err.to_string()
            .contains("bridge unregister session before delete"),
        "unexpected error: {err}"
    );

    let resolved = store
        .resolve_many(std::slice::from_ref(&identity))
        .await
        .unwrap();
    let expected = ContinuityResolveState::Ready { record: seeded };
    assert_eq!(
        resolved.get(&identity),
        Some(&expected),
        "failed unregister must not delete durable continuity first"
    );
}
/// With bridge retire failing, delete must error with the retire message, the
/// runtime must still contain the identity, and the store must still resolve
/// the original record as Ready.
#[tokio::test]
async fn identity_first_runtime_delete_bridge_failure_preserves_identity() {
    let name = "triage:main";
    let store = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let lease_prov = Arc::new(LocalLeaseProvider::new());
    let bridge = Arc::new(CountingBridge::default());
    bridge.fail_retire();
    let runtime = make_runtime_with_bridge(store.clone(), lease_prov, bridge);
    let identity = make_identity(name);

    let seeded = make_record(name, 0, 0);
    store
        .upsert_continuity_record(&seeded, FencingToken::new(1))
        .await
        .unwrap();
    runtime
        .register(
            make_spec(name),
            IdentityLifecycleState::Active,
            Some(seeded.clone()),
            Some(make_grant(name, 1)),
        )
        .await;

    let err = runtime.delete_identity(&identity).await.unwrap_err();
    assert!(err.to_string().contains("bridge retire before delete"));
    assert!(runtime.contains(&identity).await);

    let resolved = store
        .resolve_many(std::slice::from_ref(&identity))
        .await
        .unwrap();
    let expected = ContinuityResolveState::Ready { record: seeded };
    assert_eq!(resolved.get(&identity), Some(&expected));
}
/// With store deletes failing, delete must error and the identity must stay
/// known to the runtime in the Broken state with its original runtime id.
#[tokio::test]
async fn identity_first_runtime_delete_store_failure_marks_identity_broken() {
    let name = "triage:main";
    let store = Arc::new(FaultyContinuityStore::new());
    let lease = Arc::new(LocalLeaseProvider::new());
    let bridge = Arc::new(CountingBridge::default());
    let runtime = make_runtime_with_bridge(store.clone(), lease, bridge);
    let identity = make_identity(name);

    let seeded = make_record(name, 0, 0);
    store
        .upsert_continuity_record(&seeded, FencingToken::new(1))
        .await
        .unwrap();
    runtime
        .register(
            make_spec(name),
            IdentityLifecycleState::Active,
            Some(seeded.clone()),
            Some(make_grant(name, 1)),
        )
        .await;

    // Arm the delete failure after the setup writes have gone through.
    store.fail_delete();
    let err = runtime.delete_identity(&identity).await.unwrap_err();
    assert!(err.to_string().contains("delete failed"));

    let status = runtime.status(&identity).await.unwrap();
    assert_eq!(status.state, IdentityLifecycleState::Broken);
    assert_eq!(
        status.agent_runtime_id.as_ref(),
        Some(&seeded.agent_runtime_id)
    );
}
/// Deleting an identity that exists in the store but was never registered
/// with the runtime must fail with `UnknownIdentity` and leave the stored
/// record resolvable as Ready.
#[tokio::test]
async fn identity_first_runtime_delete_rejects_unregistered_identity_without_store_delete() {
    let name = "triage:main";
    let store = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let lease_prov = Arc::new(LocalLeaseProvider::new());
    let runtime = make_runtime(store.clone(), lease_prov);
    let identity = make_identity(name);

    // The record is written to the store only; the runtime is not told.
    let stored_only = make_record(name, 0, 0);
    store
        .upsert_continuity_record(&stored_only, FencingToken::new(1))
        .await
        .unwrap();

    let err = runtime.delete_identity(&identity).await.unwrap_err();
    assert!(matches!(err, IdentityRuntimeError::UnknownIdentity(_)));

    let resolved = store
        .resolve_many(std::slice::from_ref(&identity))
        .await
        .unwrap();
    let expected = ContinuityResolveState::Ready {
        record: stored_only,
    };
    assert_eq!(resolved.get(&identity), Some(&expected));
}
/// Lazily registering a 1,000-identity roster must resolve continuity with a
/// single `resolve_many` call, load no session snapshot, and neither create
/// nor resume any bridge session; registered identities end up `Dormant`.
#[tokio::test]
async fn identity_first_runtime_lazy_register_does_not_load_snapshots_or_spawn_members() {
    const ROSTER_SIZE: usize = 1_000;
    let store = Arc::new(CountingContinuityStore::new());
    let lease = Arc::new(LocalLeaseProvider::new());
    let bridge = Arc::new(CountingBridge::default());
    let runtime = make_runtime_with_bridge(store.clone(), lease, bridge.clone());

    // One seeding future per agent: persist its record, then yield its spec.
    let mut seeding = Vec::with_capacity(ROSTER_SIZE);
    for index in 0..ROSTER_SIZE {
        let seed_store = store.clone();
        seeding.push(async move {
            let agent_name = format!("agent:{index}");
            let seeded = make_record(&agent_name, 0, 1);
            seed_store
                .upsert_continuity_record(&seeded, FencingToken::new(1))
                .await
                .unwrap();
            make_spec(&agent_name)
        });
    }
    let roster = futures::future::join_all(seeding).await;

    // Reset after seeding so the counters checked below start from zero
    // (presumably what `reset_counts` does; verify against the helper).
    store.reset_counts();
    let registration = lazy_register_flow(&runtime, &roster, None).await.unwrap();
    assert_eq!(registration.outcomes.len(), ROSTER_SIZE);
    assert_eq!(store.resolve_many_calls(), 1);
    assert_eq!(store.load_snapshot_calls(), 0);
    let created = bridge.create_calls.load(Ordering::SeqCst);
    let resumed = bridge.resume_calls.load(Ordering::SeqCst);
    assert_eq!(created, 0);
    assert_eq!(resumed, 0);

    // Spot-check one identity from the middle of the roster.
    let sample = make_identity("agent:42");
    let sample_status = runtime.status(&sample).await.unwrap();
    assert_eq!(sample_status.state, IdentityLifecycleState::Dormant);
    let expected_runtime_id = AgentRuntimeId::parse("rt:agent:42:0").unwrap();
    assert_eq!(sample_status.agent_runtime_id, Some(expected_runtime_id));
}
/// A `SnapshotMissing` resolve failure that still carries its continuity
/// record must register as `Dormant` and stay materializable: lazy
/// registration loads no snapshot, and a later `materialize` loads it once,
/// resumes through the bridge once, and leaves the identity `Active`.
#[tokio::test]
async fn identity_first_runtime_lazy_snapshot_missing_record_stays_materializable() {
    /// Store stub: `resolve_many` always reports `SnapshotMissing` with the
    /// record attached, while `load_session_snapshot` succeeds and counts calls.
    struct SnapshotMissingStore {
        record: ContinuityRecord,
        load_snapshot_calls: AtomicUsize,
    }
    #[async_trait]
    impl ContinuityStore for SnapshotMissingStore {
        async fn resolve_many(
            &self,
            identities: &[AgentIdentity],
        ) -> Result<BTreeMap<AgentIdentity, ContinuityResolveState>, ContinuityStoreError> {
            let mut resolved = BTreeMap::new();
            for identity in identities {
                let failure = ContinuityFailure {
                    identity: identity.clone(),
                    kind: ContinuityFailureKind::SnapshotMissing,
                    record: Some(self.record.clone()),
                    detail: "snapshot presence query missed the row".to_string(),
                };
                resolved.insert(
                    identity.clone(),
                    ContinuityResolveState::Broken { failure },
                );
            }
            Ok(resolved)
        }
        async fn load_session_snapshot(
            &self,
            _session: &meerkat_core::types::SessionId,
        ) -> Result<Option<SessionSnapshot>, ContinuityStoreError> {
            self.load_snapshot_calls.fetch_add(1, Ordering::SeqCst);
            let snapshot = SessionSnapshot {
                data: b"recovered snapshot".to_vec(),
            };
            Ok(Some(snapshot))
        }
        async fn save_session_snapshot(
            &self,
            _identity: &AgentIdentity,
            _session: &meerkat_core::types::SessionId,
            _generation: ContinuityGeneration,
            _version: CheckpointVersion,
            _token: FencingToken,
            _snapshot: &SessionSnapshot,
        ) -> Result<(), ContinuityStoreError> {
            Ok(())
        }
        async fn upsert_continuity_record(
            &self,
            _record: &ContinuityRecord,
            _token: FencingToken,
        ) -> Result<(), ContinuityStoreError> {
            Ok(())
        }
        async fn delete_continuity_record(
            &self,
            _identity: &AgentIdentity,
            _token: FencingToken,
        ) -> Result<(), ContinuityStoreError> {
            Ok(())
        }
    }

    let identity = make_identity("agent:recoverable");
    let record = make_record("agent:recoverable", 0, 7);
    let store = Arc::new(SnapshotMissingStore {
        record: record.clone(),
        load_snapshot_calls: AtomicUsize::new(0),
    });
    let bridge = Arc::new(CountingBridge::default());
    let runtime = make_runtime_with_bridge(
        store.clone(),
        Arc::new(LocalLeaseProvider::new()),
        bridge.clone(),
    );

    // Registration alone must classify the identity as dormant-with-record.
    let roster = [make_spec("agent:recoverable")];
    let result = lazy_register_flow(&runtime, &roster, None).await.unwrap();
    let outcome = result.outcomes.get(&identity);
    assert!(matches!(
        outcome,
        Some(RestoreOutcome::Dormant {
            record: Some(_),
            ..
        })
    ));
    let registered = runtime.status(&identity).await.unwrap();
    assert_eq!(registered.state, IdentityLifecycleState::Dormant);
    assert_eq!(store.load_snapshot_calls.load(Ordering::SeqCst), 0);

    // Explicit materialization is the first point where the snapshot is read.
    let materialized = runtime.materialize(&identity).await.unwrap();
    assert_eq!(materialized.session_id, record.session_id);
    assert_eq!(bridge.resume_calls.load(Ordering::SeqCst), 1);
    assert_eq!(store.load_snapshot_calls.load(Ordering::SeqCst), 1);
    let activated = runtime.status(&identity).await.unwrap();
    assert_eq!(activated.state, IdentityLifecycleState::Active);
}
/// The first send to a lazily registered identity must materialize only that
/// identity (one snapshot load, one resume, no create, one delivery) and
/// leave the other registered identity `Dormant`.
#[tokio::test]
async fn identity_first_runtime_lazy_first_send_materializes_only_target_once() {
    let store = Arc::new(CountingContinuityStore::new());
    let lease = Arc::new(LocalLeaseProvider::new());
    let bridge = Arc::new(CountingBridge::default());
    let runtime = make_runtime_with_bridge(store.clone(), lease, bridge.clone());

    // Seed a continuity record plus a session snapshot for both agents.
    for agent in ["agent:a", "agent:b"] {
        let identity = make_identity(agent);
        let seeded = make_record(agent, 0, 1);
        store
            .upsert_continuity_record(&seeded, FencingToken::new(1))
            .await
            .unwrap();
        let snapshot = SessionSnapshot {
            data: format!("snapshot-{agent}").into_bytes(),
        };
        store
            .save_session_snapshot(
                &identity,
                &seeded.session_id,
                seeded.generation,
                CheckpointVersion::new(2),
                FencingToken::new(1),
                &snapshot,
            )
            .await
            .unwrap();
    }

    let roster = [make_spec("agent:a"), make_spec("agent:b")];
    lazy_register_flow(&runtime, &roster, None).await.unwrap();
    store.reset_counts();

    let target = make_identity("agent:a");
    let bystander = make_identity("agent:b");
    let content = make_content();
    runtime.send(&target, &content).await.unwrap();

    assert_eq!(store.load_snapshot_calls(), 1);
    assert_eq!(bridge.resume_calls.load(Ordering::SeqCst), 1);
    assert_eq!(bridge.create_calls.load(Ordering::SeqCst), 0);
    assert_eq!(bridge.deliver_calls.load(Ordering::SeqCst), 1);

    let target_status = runtime.status(&target).await.unwrap();
    assert_eq!(target_status.state, IdentityLifecycleState::Active);
    let bystander_status = runtime.status(&bystander).await.unwrap();
    assert_eq!(bystander_status.state, IdentityLifecycleState::Dormant);
}
/// Ten concurrent sends to the same dormant identity must share a single
/// materialization (one snapshot load, one bridge resume) while all ten
/// messages are still delivered.
#[tokio::test]
async fn identity_first_runtime_parallel_sends_to_dormant_identity_coalesce_materialization() {
    let store = Arc::new(CountingContinuityStore::new());
    let lease = Arc::new(LocalLeaseProvider::new());
    let bridge = Arc::new(CountingBridge::default());
    // A resume delay is configured on the bridge before the sends start,
    // presumably so the concurrent sends overlap the in-flight resume.
    bridge.set_resume_delay(Duration::from_millis(50)).await;
    let runtime = make_runtime_with_bridge(store.clone(), lease, bridge.clone());

    let seeded = make_record("agent:coalesce", 0, 1);
    store
        .upsert_continuity_record(&seeded, FencingToken::new(1))
        .await
        .unwrap();
    let roster = [make_spec("agent:coalesce")];
    lazy_register_flow(&runtime, &roster, None).await.unwrap();
    store.reset_counts();

    // Spawn every sender before awaiting any of them.
    let senders: Vec<_> = (0..10)
        .map(|_| {
            let runtime = runtime.clone();
            tokio::spawn(async move {
                runtime
                    .send(&make_identity("agent:coalesce"), &make_content())
                    .await
                    .unwrap();
            })
        })
        .collect();
    for sender in senders {
        sender.await.unwrap();
    }

    assert_eq!(store.load_snapshot_calls(), 1);
    assert_eq!(bridge.resume_calls.load(Ordering::SeqCst), 1);
    assert_eq!(bridge.deliver_calls.load(Ordering::SeqCst), 10);
    let identity = make_identity("agent:coalesce");
    let status = runtime.status(&identity).await.unwrap();
    assert_eq!(status.state, IdentityLifecycleState::Active);
}
/// Dispatching to a lazily registered `InternalOnly` identity with no prior
/// continuity must create its session once, deliver once, and activate it.
#[tokio::test]
async fn identity_first_runtime_lazy_dispatch_materializes_internal_only_identity() {
    let store = Arc::new(CountingContinuityStore::new());
    let lease = Arc::new(LocalLeaseProvider::new());
    let bridge = Arc::new(CountingBridge::default());
    let runtime = make_runtime_with_bridge(store.clone(), lease, bridge.clone());

    let roster = [make_internal_spec("agent:internal")];
    lazy_register_flow(&runtime, &roster, None).await.unwrap();

    let identity = make_identity("agent:internal");
    let input = make_dispatch_input();
    runtime.dispatch(&identity, &input).await.unwrap();

    let created = bridge.create_calls.load(Ordering::SeqCst);
    let delivered = bridge.deliver_calls.load(Ordering::SeqCst);
    assert_eq!(created, 1);
    assert_eq!(delivered, 1);
    let status = runtime.status(&identity).await.unwrap();
    assert_eq!(status.state, IdentityLifecycleState::Active);
}
/// The first send to a lazily registered identity with managed peer edges
/// must hydrate the target plus its peers and wire each edge on the bridge;
/// registration alone must create and wire nothing.
#[tokio::test]
async fn identity_first_runtime_lazy_first_send_materializes_reachable_peers_and_wires_topology() {
    /// Topology provider that always returns the fixed list of name pairs.
    struct StaticTopology(Vec<(&'static str, &'static str)>);
    #[async_trait]
    impl TopologyProvider for StaticTopology {
        async fn compute_edges(
            &self,
            _target_identities: &[AgentIdentity],
            _context: &TopologyContext,
        ) -> Result<Vec<ManagedPeerEdge>, TopologyError> {
            let mut edges = Vec::with_capacity(self.0.len());
            for (left, right) in &self.0 {
                let edge = ManagedPeerEdge::new(make_identity(left), make_identity(right))
                    .map_err(|err| TopologyError::InvalidEdge(err.to_string()))?;
                edges.push(edge);
            }
            Ok(edges)
        }
    }

    let store = Arc::new(CountingContinuityStore::new());
    let lease = Arc::new(LocalLeaseProvider::new());
    let bridge = Arc::new(CountingBridge::default());
    let runtime = make_runtime_with_bridge(store.clone(), lease, bridge.clone());
    let roster = [
        make_spec("review:singleton"),
        make_spec("initiative:alpha"),
        make_spec("initiative:beta"),
    ];
    let topology = StaticTopology(vec![
        ("review:singleton", "initiative:alpha"),
        ("review:singleton", "initiative:beta"),
    ]);
    lazy_register_flow(&runtime, &roster, Some(&topology))
        .await
        .unwrap();

    // Registration is lazy: nothing is created or wired yet.
    assert_eq!(bridge.create_calls.load(Ordering::SeqCst), 0);
    assert_eq!(bridge.wire_calls.load(Ordering::SeqCst), 0);

    let review = make_identity("review:singleton");
    let content = make_content();
    runtime.send(&review, &content).await.unwrap();

    assert_eq!(
        bridge.create_calls.load(Ordering::SeqCst),
        3,
        "first review send must hydrate review plus its initiative peers"
    );
    assert_eq!(bridge.deliver_calls.load(Ordering::SeqCst), 1);
    assert_eq!(bridge.wire_calls.load(Ordering::SeqCst), 2);

    for name in ["review:singleton", "initiative:alpha", "initiative:beta"] {
        let member = make_identity(name);
        let status = runtime.status(&member).await.unwrap();
        assert_eq!(status.state, IdentityLifecycleState::Active);
    }

    // Each recorded wire is an (initiative runtime id, review runtime id) pair.
    let wires = bridge.wires.lock().await.clone();
    let alpha_wire = (
        "rt:initiative:alpha:0".to_string(),
        "rt:review:singleton:0".to_string(),
    );
    assert!(
        wires.contains(&alpha_wire),
        "review must be concretely wired to initiative:alpha, got {wires:?}"
    );
    let beta_wire = (
        "rt:initiative:beta:0".to_string(),
        "rt:review:singleton:0".to_string(),
    );
    assert!(
        wires.contains(&beta_wire),
        "review must be concretely wired to initiative:beta, got {wires:?}"
    );
}
/// With the bridge set to fail wiring, lazy registration must still succeed,
/// report the managed edge in its result, and leave the identity `Dormant`.
#[tokio::test]
async fn identity_first_runtime_lazy_register_warns_on_topology_reconcile_failure() {
    /// Topology provider that always returns the fixed list of name pairs.
    struct StaticTopology(Vec<(&'static str, &'static str)>);
    #[async_trait]
    impl TopologyProvider for StaticTopology {
        async fn compute_edges(
            &self,
            _target_identities: &[AgentIdentity],
            _context: &TopologyContext,
        ) -> Result<Vec<ManagedPeerEdge>, TopologyError> {
            let mut edges = Vec::with_capacity(self.0.len());
            for (left, right) in &self.0 {
                let edge = ManagedPeerEdge::new(make_identity(left), make_identity(right))
                    .map_err(|err| TopologyError::InvalidEdge(err.to_string()))?;
                edges.push(edge);
            }
            Ok(edges)
        }
    }

    let store = Arc::new(CountingContinuityStore::new());
    let lease = Arc::new(LocalLeaseProvider::new());
    let bridge = Arc::new(CountingBridge::default());
    // Presumably makes bridge wiring fail from here on; verify against the helper.
    bridge.fail_current_wires();
    let runtime = make_runtime_with_bridge(store.clone(), lease, bridge);

    let roster = [make_spec("review:singleton"), make_spec("initiative:alpha")];
    let topology = StaticTopology(vec![("review:singleton", "initiative:alpha")]);
    let result = lazy_register_flow(&runtime, &roster, Some(&topology))
        .await
        .expect("lazy registration should not fail just because topology reconcile failed");

    assert_eq!(result.managed_edges.len(), 1);
    let review = make_identity("review:singleton");
    let status = runtime.status(&review).await.unwrap();
    assert_eq!(status.state, IdentityLifecycleState::Dormant);
}
/// A `Steer` send to an already materialized identity must complete within
/// 250ms even though bridge resumes are delayed by 5s, and must leave the
/// dormant peer un-resumed.
#[tokio::test]
async fn identity_first_runtime_steer_send_does_not_wait_for_reachable_peer_materialization() {
    /// Topology provider that always returns the fixed list of name pairs.
    struct StaticTopology(Vec<(&'static str, &'static str)>);
    #[async_trait]
    impl TopologyProvider for StaticTopology {
        async fn compute_edges(
            &self,
            _target_identities: &[AgentIdentity],
            _context: &TopologyContext,
        ) -> Result<Vec<ManagedPeerEdge>, TopologyError> {
            let mut edges = Vec::with_capacity(self.0.len());
            for (left, right) in &self.0 {
                let edge = ManagedPeerEdge::new(make_identity(left), make_identity(right))
                    .map_err(|err| TopologyError::InvalidEdge(err.to_string()))?;
                edges.push(edge);
            }
            Ok(edges)
        }
    }

    let store = Arc::new(CountingContinuityStore::new());
    let lease = Arc::new(LocalLeaseProvider::new());
    let bridge = Arc::new(CountingBridge::default());
    let runtime = make_runtime_with_bridge(store.clone(), lease, bridge.clone());

    // Only the peer gets prior continuity and a snapshot, so it would go
    // through the (slow) resume path if it were hydrated.
    let slow_peer = make_identity("initiative:slow-peer");
    let peer_record = make_record("initiative:slow-peer", 0, 1);
    store
        .upsert_continuity_record(&peer_record, FencingToken::new(1))
        .await
        .unwrap();
    let peer_snapshot = SessionSnapshot {
        data: b"slow peer snapshot".to_vec(),
    };
    store
        .save_session_snapshot(
            &slow_peer,
            &peer_record.session_id,
            peer_record.generation,
            CheckpointVersion::new(2),
            FencingToken::new(1),
            &peer_snapshot,
        )
        .await
        .unwrap();

    let roster = [
        make_spec("deep-investigator:singleton"),
        make_spec("initiative:slow-peer"),
    ];
    let topology = StaticTopology(vec![(
        "deep-investigator:singleton",
        "initiative:slow-peer",
    )]);
    lazy_register_flow(&runtime, &roster, Some(&topology))
        .await
        .unwrap();

    let investigator = make_identity("deep-investigator:singleton");
    runtime.materialize(&investigator).await.unwrap();

    // The delay is set only after the investigator itself is materialized.
    bridge.set_resume_delay(Duration::from_secs(5)).await;
    let content = make_content();
    let steer_send = runtime.send_with_mode(&investigator, &content, HandlingMode::Steer);
    tokio::time::timeout(Duration::from_millis(250), steer_send)
        .await
        .expect("steer send must not wait for reachable peer materialization")
        .unwrap();

    assert_eq!(bridge.deliver_calls.load(Ordering::SeqCst), 1);
    assert_eq!(
        bridge.resume_calls.load(Ordering::SeqCst),
        0,
        "steer delivery should not synchronously hydrate the slow peer"
    );
    let peer_status = runtime.status(&slow_peer).await.unwrap();
    assert_eq!(peer_status.state, IdentityLifecycleState::Dormant);
}
/// `materialize_all` must resume every registered identity concurrently.
/// The bridge is given a resume barrier sized to the whole roster, so the
/// call can only finish inside the timeout if all resumes are in flight
/// together (assuming the bridge waits on that barrier during resume).
#[tokio::test]
async fn identity_first_runtime_materialize_all_hydrates_registered_identities_in_parallel() {
    let store = Arc::new(CountingContinuityStore::new());
    let lease = Arc::new(LocalLeaseProvider::new());
    let bridge = Arc::new(CountingBridge::default());
    let runtime = make_runtime_with_bridge(store.clone(), lease, bridge.clone());

    let agent_count = 8;
    let mut roster = Vec::with_capacity(agent_count);
    for index in 0..agent_count {
        roster.push(make_spec(&format!("agent:{index}")));
    }

    // Seed a record and a snapshot per agent so each one takes the resume path.
    for spec in roster.iter() {
        let seeded = make_record(spec.identity.as_str(), 0, 1);
        store
            .upsert_continuity_record(&seeded, FencingToken::new(1))
            .await
            .unwrap();
        let snapshot = SessionSnapshot {
            data: format!("snapshot-{}", spec.identity).into_bytes(),
        };
        store
            .save_session_snapshot(
                &spec.identity,
                &seeded.session_id,
                seeded.generation,
                CheckpointVersion::new(2),
                FencingToken::new(1),
                &snapshot,
            )
            .await
            .unwrap();
    }

    lazy_register_flow(&runtime, &roster, None).await.unwrap();
    store.reset_counts();

    let resume_barrier = Arc::new(tokio::sync::Barrier::new(agent_count));
    bridge.set_resume_barrier(resume_barrier).await;
    let hydrate_all = runtime.materialize_all();
    let records = tokio::time::timeout(Duration::from_secs(2), hydrate_all)
        .await
        .expect("parallel materialize_all should not block behind one pending resume")
        .unwrap();

    assert_eq!(records.len(), agent_count);
    assert_eq!(store.load_snapshot_calls(), agent_count);
    assert_eq!(bridge.resume_calls.load(Ordering::SeqCst), agent_count);
    for spec in roster.iter() {
        let status = runtime.status(&spec.identity).await.unwrap();
        assert_eq!(status.state, IdentityLifecycleState::Active);
    }
}
#[tokio::test]
async fn identity_first_runtime_materialize_fences_old_owner_before_resume() {
let store = Arc::new(LocalContinuityStore::in_memory().unwrap());
let lease = Arc::new(LocalLeaseProvider::new());
let bridge = Arc::new(CountingBridge::default());
let runtime = make_runtime_with_bridge(store.clone(), lease.clone(), bridge.clone());
let id = make_identity("triage:main");
let initial_grants = lease
.acquire_leases(std::slice::from_ref(&id), "old-runtime")
.await
.unwrap();
let initial_grant = match initial_grants.get(&id).unwrap() {
LeaseAcquireResult::Acquired(g) => g.clone(),
_ => panic!("expected Acquired"),
};
let record = make_record("triage:main", 0, 5);
store
.upsert_continuity_record(&record, initial_grant.fencing_token)
.await
.unwrap();
lease
.release_leases(std::slice::from_ref(&initial_grant))
.await
.unwrap();
runtime
.register(
make_spec("triage:main"),
IdentityLifecycleState::Dormant,
Some(record.clone()),
None,
)
.await;
let barrier = Arc::new(tokio::sync::Barrier::new(2));
bridge.set_resume_barrier(barrier.clone()).await;
let runtime_for_task = runtime.clone();
let id_for_task = id.clone();
let materialize_task =
tokio::spawn(async move { runtime_for_task.materialize(&id_for_task).await });
while bridge.resume_calls.load(Ordering::SeqCst) == 0 {
tokio::task::yield_now().await;
}
let stale_write = store
.save_session_snapshot(
&id,
&record.session_id,
record.generation,
CheckpointVersion::new(record.checkpoint_version.get() + 1),
initial_grant.fencing_token,
&SessionSnapshot {
data: b"stale write".to_vec(),
},
)
.await;
barrier.wait().await;
materialize_task
.await
.expect("materialize task")
.expect("materialize");
let err = stale_write.expect_err("old fencing token snapshot write should be rejected");
assert!(
matches!(err, ContinuityStoreError::StaleFencingToken { .. }),
"expected stale fencing token, got {err}"
);
}
/// If the bridge fails to create the session during a first materialize, no
/// continuity record may remain in the store and the identity must stay
/// `Uninitialized` with no runtime id.
#[tokio::test]
async fn identity_first_runtime_materialize_create_failure_removes_tentative_record() {
    let continuity = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let leases = Arc::new(LocalLeaseProvider::new());
    let failing_bridge = Arc::new(CountingBridge::default());
    failing_bridge.fail_create();
    let runtime = make_runtime_with_bridge(continuity.clone(), leases, failing_bridge);
    let identity = make_identity("triage:main");

    let spec = make_spec("triage:main");
    runtime
        .register(spec, IdentityLifecycleState::Uninitialized, None, None)
        .await;

    let failure = runtime.materialize(&identity).await.unwrap_err();
    let message = failure.to_string();
    assert!(message.contains("bridge create_session"));

    let lookup = continuity
        .resolve_many(std::slice::from_ref(&identity))
        .await
        .unwrap();
    assert_eq!(
        lookup.get(&identity),
        Some(&ContinuityResolveState::Uninitialized),
        "failed first materialize must not leave a phantom continuity record"
    );

    let after = runtime.status(&identity).await.unwrap();
    assert_eq!(after.state, IdentityLifecycleState::Uninitialized);
    assert!(after.agent_runtime_id.is_none());
}
/// If registering the actual session with the bridge fails at the end of a
/// first materialize, the tentative member must be retired, no continuity
/// record may remain, and the identity must stay `Uninitialized`.
#[tokio::test]
async fn identity_first_runtime_materialize_final_register_failure_removes_tentative_record() {
    let continuity = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let leases = Arc::new(LocalLeaseProvider::new());
    let bridge = Arc::new(CountingBridge::default());
    // Presumably lets the first register call succeed and fails later ones.
    bridge.fail_register_after_calls(1);
    let runtime = make_runtime_with_bridge(continuity.clone(), leases, bridge.clone());
    let identity = make_identity("triage:main");

    let spec = make_spec("triage:main");
    runtime
        .register(spec, IdentityLifecycleState::Uninitialized, None, None)
        .await;

    let err = runtime.materialize(&identity).await.unwrap_err();
    let mentions_actual_register = err
        .to_string()
        .contains("bridge register actual session runtime state");
    assert!(mentions_actual_register, "unexpected error: {err}");

    let retired = bridge.retire_calls.load(Ordering::SeqCst);
    assert_eq!(
        retired, 1,
        "failed final registration should retire the tentative member"
    );

    let lookup = continuity
        .resolve_many(std::slice::from_ref(&identity))
        .await
        .unwrap();
    assert_eq!(
        lookup.get(&identity),
        Some(&ContinuityResolveState::Uninitialized),
        "failed final materialize registration must not leave a phantom continuity record"
    );

    let after = runtime.status(&identity).await.unwrap();
    assert_eq!(after.state, IdentityLifecycleState::Uninitialized);
    assert!(after.agent_runtime_id.is_none());
}
/// If the continuity upsert fails late in a first materialize, the created
/// session must be unregistered and retired, and the store must resolve the
/// identity as `Uninitialized`.
#[tokio::test]
async fn identity_first_runtime_materialize_final_upsert_failure_unregisters_actual_session() {
    let faulty_store = Arc::new(FaultyContinuityStore::new());
    let leases = Arc::new(LocalLeaseProvider::new());
    let bridge = Arc::new(CountingBridge::default());
    let runtime = make_runtime_with_bridge(faulty_store.clone(), leases, bridge.clone());
    let identity = make_identity("triage:main");

    let spec = make_spec("triage:main");
    runtime
        .register(spec, IdentityLifecycleState::Uninitialized, None, None)
        .await;

    // Presumably allows one successful upsert and fails the one after it.
    faulty_store.fail_next_upsert_after_successes(1);
    let failure = runtime.materialize(&identity).await.unwrap_err();
    let message = failure.to_string();
    assert!(message.contains("upsert failed"));

    assert_eq!(bridge.create_calls.load(Ordering::SeqCst), 1);
    let unregistered = bridge.unregister_calls.load(Ordering::SeqCst);
    assert_eq!(
        unregistered, 1,
        "failed final materialize upsert must unregister the created session"
    );
    assert_eq!(bridge.retire_calls.load(Ordering::SeqCst), 1);

    let lookup = faulty_store
        .resolve_many(std::slice::from_ref(&identity))
        .await
        .unwrap();
    assert_eq!(
        lookup.get(&identity),
        Some(&ContinuityResolveState::Uninitialized)
    );
}
/// When the bridge is forced into a resume fallback, subscribers must see a
/// typed `ResumeFallback` event with a `RuntimeIdentityIncompatible` reason,
/// the identity's status must report the fallback session id, and the
/// abandoned pre-registered session must be unregistered.
#[tokio::test]
async fn identity_first_runtime_lazy_resume_incompatible_fallback_is_typed_and_visible() {
    let store = Arc::new(CountingContinuityStore::new());
    let lease = Arc::new(LocalLeaseProvider::new());
    let bridge = Arc::new(CountingBridge::default());
    let fallback_session_id = meerkat_core::types::SessionId::new();
    bridge
        .set_force_resume_fallback(fallback_session_id.clone())
        .await;
    let runtime = make_runtime_with_bridge(store.clone(), lease, bridge.clone());

    let legacy = make_identity("agent:legacy");
    let seeded = make_record("agent:legacy", 0, 1);
    store
        .upsert_continuity_record(&seeded, FencingToken::new(1))
        .await
        .unwrap();
    let roster = [make_spec("agent:legacy")];
    lazy_register_flow(&runtime, &roster, None).await.unwrap();

    // Subscribe before the send so the fallback event is not missed.
    let mut events = runtime.subscribe(&legacy).await.unwrap();
    let content = make_content();
    runtime.send(&legacy, &content).await.unwrap();

    let (event_identity, event_reason) = match events.recv().await.unwrap() {
        IdentityEvent::ResumeFallback { identity, reason } => (identity, reason),
        other => panic!("expected ResumeFallback, got {other:?}"),
    };
    assert_eq!(event_identity, legacy);
    assert!(matches!(
        event_reason,
        meerkat_mobkit::identity_first::ResumeFallbackReason::RuntimeIdentityIncompatible { .. }
    ));

    let status = runtime.status(&legacy).await.unwrap();
    assert_eq!(status.session_id, Some(fallback_session_id));
    let unregistered = bridge.unregister_calls.load(Ordering::SeqCst);
    assert_eq!(
        unregistered, 1,
        "resume fallback must unregister the abandoned pre-registered session"
    );
}
/// When the bridge reports its own session id on create, a fresh materialize
/// must return that id, register twice (provisional, then actual), and
/// unregister the abandoned provisional session once.
#[tokio::test]
async fn identity_first_runtime_fresh_materialize_unregisters_provisional_session_id() {
    let store = Arc::new(CountingContinuityStore::new());
    let lease = Arc::new(LocalLeaseProvider::new());
    let bridge = Arc::new(CountingBridge::default());
    let actual_session_id = meerkat_core::types::SessionId::new();
    bridge
        .set_create_session_id(actual_session_id.clone())
        .await;
    let runtime = make_runtime_with_bridge(store, lease, bridge.clone());

    let roster = [make_spec("agent:fresh")];
    lazy_register_flow(&runtime, &roster, None).await.unwrap();

    let fresh = make_identity("agent:fresh");
    let materialized = runtime.materialize(&fresh).await.unwrap();
    assert_eq!(materialized.session_id, actual_session_id);

    let registered = bridge.register_calls.load(Ordering::SeqCst);
    assert_eq!(
        registered, 2,
        "fresh materialize registers the provisional state and then the actual session"
    );
    let unregistered = bridge.unregister_calls.load(Ordering::SeqCst);
    assert_eq!(
        unregistered, 1,
        "fresh materialize must unregister the abandoned provisional session"
    );
}
/// If unregistering the abandoned provisional session fails during a fresh
/// materialize, the rollback must also attempt to unregister the actual
/// session id and retire the materialized member.
#[tokio::test]
async fn identity_first_runtime_fresh_materialize_unregister_failure_cleans_actual_session_id() {
    let store = Arc::new(CountingContinuityStore::new());
    let lease = Arc::new(LocalLeaseProvider::new());
    let bridge = Arc::new(CountingBridge::default());
    let actual_session_id = meerkat_core::types::SessionId::new();
    bridge
        .set_create_session_id(actual_session_id.clone())
        .await;
    bridge.fail_unregister();
    let runtime = make_runtime_with_bridge(store, lease, bridge.clone());

    let roster = [make_spec("agent:fresh")];
    lazy_register_flow(&runtime, &roster, None).await.unwrap();

    let fresh = make_identity("agent:fresh");
    let err = runtime.materialize(&fresh).await.unwrap_err();
    let mentions_abandoned_unregister = err
        .to_string()
        .contains("bridge unregister abandoned session runtime state");
    assert!(mentions_abandoned_unregister, "unexpected error: {err}");

    let unregister_attempts = bridge.unregister_calls.load(Ordering::SeqCst);
    assert_eq!(
        unregister_attempts, 2,
        "failed abandoned-session cleanup must also unregister the actual bridge session before rollback"
    );

    // Snapshot the recorded ids so the lock is not held across the assertion.
    let unregistered_ids = bridge.unregistered_session_ids.lock().await.clone();
    let actual_id_text = actual_session_id.to_string();
    assert!(
        unregistered_ids.contains(&actual_id_text),
        "rollback must attempt to unregister the actual session id"
    );

    let retired = bridge.retire_calls.load(Ordering::SeqCst);
    assert_eq!(
        retired, 1,
        "failed abandoned-session cleanup should retire the materialized member"
    );
}
/// On a fresh boot (empty store), `restore_flow` must report `Created` for
/// every roster member at generation 0 and register each with the runtime.
#[tokio::test]
async fn identity_first_runtime_restore_flow_fresh_boot() {
    let store = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let lease_prov = Arc::new(LocalLeaseProvider::new());
    let runtime = make_runtime(store.clone(), lease_prov.clone());

    let roster = [make_spec("triage:main"), make_spec("worker:main")];
    let result = restore_flow(&runtime, &roster, None, None).await.unwrap();
    assert_eq!(result.outcomes.len(), 2);

    for (identity, outcome) in result.outcomes.iter() {
        let created = match outcome {
            RestoreOutcome::Created { record, .. } => record,
            other => panic!("expected Created, got: {other:?}"),
        };
        assert_eq!(&created.identity, identity);
        assert_eq!(created.generation, ContinuityGeneration::new(0));
    }

    for name in ["triage:main", "worker:main"] {
        let member = make_identity(name);
        assert!(runtime.contains(&member).await);
    }
}
/// If another runtime already holds the identity's lease, `restore_flow`
/// must fail with a "no active lease" error, create no bridge session, and
/// leave the identity unregistered.
#[tokio::test]
async fn identity_first_runtime_restore_flow_rejects_lease_conflicts_before_bridge_work() {
    let store = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let lease_prov = Arc::new(LocalLeaseProvider::new());
    let bridge = Arc::new(CountingBridge::default());
    let runtime = make_runtime_with_bridge(store, lease_prov.clone(), bridge.clone());
    let identity = make_identity("triage:main");

    // A competing owner takes the lease first.
    lease_prov
        .acquire_leases(std::slice::from_ref(&identity), "other-runtime")
        .await
        .unwrap();

    let roster = [make_spec("triage:main")];
    let err = restore_flow(&runtime, &roster, None, None)
        .await
        .expect_err("restore flow must reject lease conflict");
    let mentions_missing_lease = err.to_string().contains("no active lease");
    assert!(mentions_missing_lease, "unexpected error: {err}");

    let created = bridge.create_calls.load(Ordering::SeqCst);
    assert_eq!(
        created, 0,
        "restore flow must not create live members without the durable lease"
    );
    let still_registered = runtime.contains(&identity).await;
    assert!(!still_registered);
}
/// When only part of the roster's leases can be acquired, `restore_flow`
/// must fail, create no bridge session, and release the lease it did obtain
/// so that another runtime can acquire it afterwards.
#[tokio::test]
async fn identity_first_runtime_restore_flow_releases_partial_leases_on_conflict() {
    let store = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let lease_prov = Arc::new(LocalLeaseProvider::new());
    let bridge = Arc::new(CountingBridge::default());
    let runtime = make_runtime_with_bridge(store, lease_prov.clone(), bridge.clone());
    let free_id = make_identity("triage:free");
    let held_id = make_identity("triage:held");

    // Only one of the two identities is pre-leased by a competing owner.
    lease_prov
        .acquire_leases(std::slice::from_ref(&held_id), "other-runtime")
        .await
        .unwrap();

    let roster = [make_spec("triage:free"), make_spec("triage:held")];
    let err = restore_flow(&runtime, &roster, None, None)
        .await
        .expect_err("restore flow must reject mixed lease acquisition");
    let mentions_missing_lease = err.to_string().contains("no active lease");
    assert!(mentions_missing_lease, "unexpected error: {err}");

    let created = bridge.create_calls.load(Ordering::SeqCst);
    assert_eq!(
        created, 0,
        "restore flow must not create live members after a mixed lease failure"
    );

    // The competing owner must now be able to take the previously free lease.
    let retry = lease_prov
        .acquire_leases(std::slice::from_ref(&free_id), "other-runtime")
        .await
        .unwrap();
    let reacquired = matches!(retry.get(&free_id), Some(LeaseAcquireResult::Acquired(_)));
    assert!(
        reacquired,
        "partial free identity lease must be released after restore-flow failure: {retry:#?}"
    );
}
/// With a `MissingLeaseProvider` as the lease provider, `restore_flow` must
/// fail with "no active lease", create no bridge session, and leave the
/// identity unregistered.
#[tokio::test]
async fn identity_first_runtime_restore_flow_rejects_missing_lease_results_before_bridge_work() {
    let store = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let lease_prov = Arc::new(MissingLeaseProvider);
    let bridge = Arc::new(CountingBridge::default());
    let runtime = make_runtime_with_bridge(store, lease_prov, bridge.clone());
    let identity = make_identity("triage:main");

    let roster = [make_spec("triage:main")];
    let err = restore_flow(&runtime, &roster, None, None)
        .await
        .expect_err("restore flow must reject missing lease results");
    let mentions_missing_lease = err.to_string().contains("no active lease");
    assert!(mentions_missing_lease, "unexpected error: {err}");

    let created = bridge.create_calls.load(Ordering::SeqCst);
    assert_eq!(
        created, 0,
        "restore flow must not create live members without an explicit durable lease grant"
    );
    let still_registered = runtime.contains(&identity).await;
    assert!(!still_registered);
}
/// On a fresh boot where the bridge reports its own session id, the created
/// record must carry that id and exactly one other (provisional) session id
/// must have been unregistered.
#[tokio::test]
async fn identity_first_runtime_restore_flow_fresh_boot_unregisters_abandoned_session() {
    let store = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let lease_prov = Arc::new(LocalLeaseProvider::new());
    let bridge = Arc::new(CountingBridge::default());
    let actual_session_id = meerkat_core::types::SessionId::new();
    bridge
        .set_create_session_id(actual_session_id.clone())
        .await;
    let runtime = make_runtime_with_bridge(store.clone(), lease_prov, bridge.clone());
    let identity = make_identity("triage:main");

    let roster = [make_spec("triage:main")];
    let result = restore_flow(&runtime, &roster, None, None).await.unwrap();
    let created = match result.outcomes.get(&identity).unwrap() {
        RestoreOutcome::Created { record, .. } => record,
        other => panic!("expected Created, got: {other:?}"),
    };
    assert_eq!(created.session_id, actual_session_id);

    let unregistered = bridge.unregistered_session_ids.lock().await.clone();
    assert_eq!(
        unregistered.len(),
        1,
        "fresh restore should unregister the abandoned provisional session"
    );
    // The single unregistered id must not be the session that was kept.
    let kept_id_text = actual_session_id.to_string();
    assert_ne!(unregistered[0], kept_id_text);
}
/// If the bridge's register call fails during a fresh-boot restore, the flow
/// must return that error and the store must still resolve the identity as
/// `Uninitialized`.
#[tokio::test]
async fn identity_first_runtime_restore_flow_fresh_boot_register_failure_removes_tentative_record()
{
    let continuity = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let leases = Arc::new(LocalLeaseProvider::new());
    let failing_bridge = Arc::new(CountingBridge::default());
    failing_bridge.fail_register();
    let runtime = make_runtime_with_bridge(continuity.clone(), leases, failing_bridge);
    let identity = make_identity("triage:main");

    let roster = [make_spec("triage:main")];
    let err = restore_flow(&runtime, &roster, None, None)
        .await
        .expect_err("provisional bridge register should fail");
    let mentions_register = err
        .to_string()
        .contains("bridge register_session_runtime_state");
    assert!(mentions_register, "unexpected error: {err}");

    let lookup = continuity
        .resolve_many(std::slice::from_ref(&identity))
        .await
        .unwrap();
    assert_eq!(
        lookup.get(&identity),
        Some(&ContinuityResolveState::Uninitialized)
    );
}
/// If the bridge fails to create the session during a fresh-boot restore,
/// the flow must unregister and retire once each and the store must still
/// resolve the identity as `Uninitialized`.
#[tokio::test]
async fn identity_first_runtime_restore_flow_fresh_boot_create_failure_removes_tentative_record() {
    let continuity = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let leases = Arc::new(LocalLeaseProvider::new());
    let bridge = Arc::new(CountingBridge::default());
    bridge.fail_create();
    let runtime = make_runtime_with_bridge(continuity.clone(), leases, bridge.clone());
    let identity = make_identity("triage:main");

    let roster = [make_spec("triage:main")];
    let err = restore_flow(&runtime, &roster, None, None)
        .await
        .expect_err("bridge create should fail");
    let mentions_create = err.to_string().contains("bridge create_session");
    assert!(mentions_create, "unexpected error: {err}");

    let unregistered = bridge.unregister_calls.load(Ordering::SeqCst);
    let retired = bridge.retire_calls.load(Ordering::SeqCst);
    assert_eq!(unregistered, 1);
    assert_eq!(retired, 1);

    let lookup = continuity
        .resolve_many(std::slice::from_ref(&identity))
        .await
        .unwrap();
    assert_eq!(
        lookup.get(&identity),
        Some(&ContinuityResolveState::Uninitialized)
    );
}
/// Fresh-boot restore where the bridge reports a distinct session id and the
/// register of that actual session fails: both provisional and actual state
/// must be unregistered, the member retired, and no record left behind.
#[tokio::test]
async fn identity_first_runtime_restore_flow_fresh_boot_actual_register_failure_removes_tentative_record()
{
    let continuity = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let leases = Arc::new(LocalLeaseProvider::new());
    let bridge = Arc::new(CountingBridge::default());
    let actual_session_id = meerkat_core::types::SessionId::new();
    bridge
        .set_create_session_id(actual_session_id.clone())
        .await;
    // Presumably lets the first register call succeed and fails later ones.
    bridge.fail_register_after_calls(1);
    let runtime = make_runtime_with_bridge(continuity.clone(), leases, bridge.clone());
    let identity = make_identity("triage:main");

    let roster = [make_spec("triage:main")];
    let err = restore_flow(&runtime, &roster, None, None)
        .await
        .expect_err("actual bridge register should fail");
    let mentions_actual_register = err
        .to_string()
        .contains("bridge register actual session runtime state");
    assert!(mentions_actual_register, "unexpected error: {err}");

    let unregistered = bridge.unregister_calls.load(Ordering::SeqCst);
    assert_eq!(
        unregistered, 2,
        "failed actual register must unregister both provisional and actual session state"
    );
    assert_eq!(bridge.retire_calls.load(Ordering::SeqCst), 1);

    let lookup = continuity
        .resolve_many(std::slice::from_ref(&identity))
        .await
        .unwrap();
    assert_eq!(
        lookup.get(&identity),
        Some(&ContinuityResolveState::Uninitialized)
    );
}
/// Fresh-boot restore with no overridden create session id, where the
/// register of the actual session fails: the pre-registered session must be
/// unregistered once, the member retired, and no record left behind.
#[tokio::test]
async fn identity_first_runtime_restore_flow_fresh_boot_same_session_register_failure_unregisters_tentative_state()
{
    let continuity = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let leases = Arc::new(LocalLeaseProvider::new());
    let bridge = Arc::new(CountingBridge::default());
    // Presumably lets the first register call succeed and fails later ones.
    bridge.fail_register_after_calls(1);
    let runtime = make_runtime_with_bridge(continuity.clone(), leases, bridge.clone());
    let identity = make_identity("triage:main");

    let roster = [make_spec("triage:main")];
    let err = restore_flow(&runtime, &roster, None, None)
        .await
        .expect_err("same-session actual bridge register should fail");
    let mentions_actual_register = err
        .to_string()
        .contains("bridge register actual session runtime state");
    assert!(mentions_actual_register, "unexpected error: {err}");

    let unregistered = bridge.unregister_calls.load(Ordering::SeqCst);
    assert_eq!(
        unregistered, 1,
        "failed same-session restore create must unregister the pre-registered session"
    );
    assert_eq!(bridge.retire_calls.load(Ordering::SeqCst), 1);

    let lookup = continuity
        .resolve_many(std::slice::from_ref(&identity))
        .await
        .unwrap();
    assert_eq!(
        lookup.get(&identity),
        Some(&ContinuityResolveState::Uninitialized)
    );
}
/// Fresh-boot restore where unregistering the abandoned provisional session
/// fails: the flow must return that error, make two unregister attempts,
/// retire the member once, and leave no continuity record behind.
#[tokio::test]
async fn identity_first_runtime_restore_flow_fresh_boot_unregister_failure_removes_tentative_record()
{
    let continuity = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let leases = Arc::new(LocalLeaseProvider::new());
    let bridge = Arc::new(CountingBridge::default());
    let actual_session_id = meerkat_core::types::SessionId::new();
    bridge
        .set_create_session_id(actual_session_id.clone())
        .await;
    bridge.fail_unregister();
    let runtime = make_runtime_with_bridge(continuity.clone(), leases, bridge.clone());
    let identity = make_identity("triage:main");

    let roster = [make_spec("triage:main")];
    let err = restore_flow(&runtime, &roster, None, None)
        .await
        .expect_err("abandoned provisional unregister should fail");
    let mentions_abandoned_unregister = err
        .to_string()
        .contains("bridge unregister abandoned session runtime state");
    assert!(mentions_abandoned_unregister, "unexpected error: {err}");

    let unregister_attempts = bridge.unregister_calls.load(Ordering::SeqCst);
    let retired = bridge.retire_calls.load(Ordering::SeqCst);
    assert_eq!(unregister_attempts, 2);
    assert_eq!(retired, 1);

    let lookup = continuity
        .resolve_many(std::slice::from_ref(&identity))
        .await
        .unwrap();
    assert_eq!(
        lookup.get(&identity),
        Some(&ContinuityResolveState::Uninitialized)
    );
}
/// Fresh boot against a `FaultyContinuityStore` armed with
/// `fail_next_upsert_after_successes(1)`.
///
/// Expects the "continuity upsert after restore create" error, two bridge unregisters, one
/// retire, and an `Uninitialized` resolve state afterwards.
#[tokio::test]
async fn identity_first_runtime_restore_flow_fresh_boot_upsert_failure_cleans_bridge_state() {
    let faulty_store = Arc::new(FaultyContinuityStore::new());
    let leases = Arc::new(LocalLeaseProvider::new());
    let bridge = Arc::new(CountingBridge::default());
    bridge
        .set_create_session_id(meerkat_core::types::SessionId::new())
        .await;
    let runtime = make_runtime_with_bridge(faulty_store.clone(), leases, bridge.clone());
    // Arm the fault only after the runtime exists, matching the original setup order.
    faulty_store.fail_next_upsert_after_successes(1);
    let roster = [make_spec("triage:main")];

    let err = restore_flow(&runtime, &roster, None, None)
        .await
        .expect_err("final restore create upsert should fail");
    let message = err.to_string();
    assert!(
        message.contains("continuity upsert after restore create"),
        "unexpected error: {err}"
    );

    let unregisters = bridge.unregister_calls.load(Ordering::SeqCst);
    assert_eq!(
        unregisters, 2,
        "failed restore create upsert must unregister provisional and actual sessions"
    );
    let retires = bridge.retire_calls.load(Ordering::SeqCst);
    assert_eq!(retires, 1);

    let id = make_identity("triage:main");
    let resolved = faulty_store
        .resolve_many(std::slice::from_ref(&id))
        .await
        .unwrap();
    let state = resolved.get(&id);
    assert_eq!(state, Some(&ContinuityResolveState::Uninitialized));
}
/// Seeds the store with a continuity record and a version-1 snapshot, then checks that
/// `restore_flow` reports `RestoreOutcome::Resumed` carrying that identity and snapshot bytes.
#[tokio::test]
async fn identity_first_runtime_restore_flow_resumes_ready() {
    let continuity = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let leases = Arc::new(LocalLeaseProvider::new());
    let runtime = make_runtime(continuity.clone(), leases);

    let id = make_identity("triage:main");
    let seeded = make_record("triage:main", 0, 0);
    continuity
        .upsert_continuity_record(&seeded, FencingToken::new(0))
        .await
        .unwrap();
    let seeded_snapshot = SessionSnapshot {
        data: b"snapshot data".to_vec(),
    };
    continuity
        .save_session_snapshot(
            &id,
            &seeded.session_id,
            seeded.generation,
            CheckpointVersion::new(1),
            FencingToken::new(0),
            &seeded_snapshot,
        )
        .await
        .unwrap();

    let roster = vec![make_spec("triage:main")];
    let restored = restore_flow(&runtime, &roster, None, None).await.unwrap();
    match restored.outcomes.get(&id).unwrap() {
        RestoreOutcome::Resumed {
            record: resumed_record,
            snapshot,
            ..
        } => {
            assert_eq!(resumed_record.identity, id);
            assert_eq!(snapshot.data, b"snapshot data");
        }
        other => panic!("expected Resumed, got: {other:?}"),
    }
}
/// Verifies that when the bridge's `resume_session` reports a session id different from the one
/// stored in the continuity record, `restore_flow` adopts the returned id: the restored record,
/// the runtime status, and the next checkpoint all use it, and the original session id is the
/// only one passed to `unregister_session_runtime_state`.
#[tokio::test]
async fn identity_first_runtime_restore_flow_reconciles_resume_returned_session_id() {
// Test-local bridge whose resume always answers with `actual_session_id`, regardless of the
// session id it was asked to resume, and which records every unregistered session id.
struct ResumeFallbackBridge {
actual_session_id: meerkat_core::types::SessionId,
unregistered_session_ids: Arc<tokio::sync::Mutex<Vec<String>>>,
}
#[async_trait]
impl SessionBridge for ResumeFallbackBridge {
// Echoes back the session id it was given.
async fn create_session(
&self,
_identity: &AgentIdentity,
_runtime_id: &AgentRuntimeId,
_spec: &DurableAgentSpec,
_draft: &AgentBuildDraft,
session_id: &meerkat_core::types::SessionId,
) -> Result<meerkat_core::types::SessionId, BridgeError> {
Ok(session_id.clone())
}
// Ignores the requested session id and reports `actual_session_id` as resumed.
async fn resume_session(
&self,
_identity: &AgentIdentity,
_runtime_id: &AgentRuntimeId,
_spec: &DurableAgentSpec,
_draft: &AgentBuildDraft,
_session_id: &meerkat_core::types::SessionId,
_snapshot: &SessionSnapshot,
) -> Result<meerkat_mobkit::identity_first::ResumeSessionOutcome, BridgeError> {
Ok(
meerkat_mobkit::identity_first::ResumeSessionOutcome::Resumed {
session_id: self.actual_session_id.clone(),
},
)
}
async fn deliver(
&self,
_runtime_id: &AgentRuntimeId,
_content: &meerkat_core::ContentInput,
) -> Result<meerkat_core::types::SessionId, BridgeError> {
Ok(self.actual_session_id.clone())
}
// Returns a fixed snapshot payload.
async fn checkpoint_session(
&self,
_runtime_id: &AgentRuntimeId,
_session_id: &meerkat_core::types::SessionId,
) -> Result<SessionSnapshot, BridgeError> {
Ok(SessionSnapshot {
data: b"bridge checkpoint".to_vec(),
})
}
async fn retire_member(&self, _runtime_id: &AgentRuntimeId) -> Result<(), BridgeError> {
Ok(())
}
// Records the stringified session id so the test can assert which sessions were dropped.
async fn unregister_session_runtime_state(
&self,
session_id: &meerkat_core::types::SessionId,
) -> Result<(), BridgeError> {
self.unregistered_session_ids
.lock()
.await
.push(session_id.to_string());
Ok(())
}
}
let store = Arc::new(LocalContinuityStore::in_memory().unwrap());
let lease_prov = Arc::new(LocalLeaseProvider::new());
let actual_session_id = meerkat_core::types::SessionId::new();
let unregistered_session_ids = Arc::new(tokio::sync::Mutex::new(Vec::new()));
let runtime = IdentityRuntime::new(IdentityRuntimeConfig {
continuity_store: store.clone(),
lease_provider: lease_prov,
runtime_instance_id: "test-runtime".to_string(),
has_runtime_store: true,
durability_policy: DurabilityPolicy::SyncWriteThrough,
bridge: Some(Arc::new(ResumeFallbackBridge {
actual_session_id: actual_session_id.clone(),
unregistered_session_ids: unregistered_session_ids.clone(),
})),
default_timeout: None,
});
let id = make_identity("triage:main");
let record = make_record("triage:main", 0, 0);
let original_session_id = record.session_id.clone();
// Precondition: the seeded record must start on a different session than the bridge returns.
assert_ne!(record.session_id, actual_session_id);
// Seed the store with the original record and a version-1 snapshot.
store
.upsert_continuity_record(&record, FencingToken::new(0))
.await
.unwrap();
store
.save_session_snapshot(
&id,
&record.session_id,
record.generation,
CheckpointVersion::new(1),
FencingToken::new(0),
&SessionSnapshot {
data: b"old snapshot".to_vec(),
},
)
.await
.unwrap();
let roster = vec![make_spec("triage:main")];
let result = restore_flow(&runtime, &roster, None, None).await.unwrap();
// The resumed record carries the bridge-returned session id and keeps checkpoint version 1.
match result.outcomes.get(&id).unwrap() {
RestoreOutcome::Resumed { record, .. } => {
assert_eq!(record.session_id, actual_session_id);
assert_eq!(record.checkpoint_version, CheckpointVersion::new(1));
}
other => panic!("expected Resumed, got: {other:?}"),
}
let status = runtime.status(&id).await.unwrap();
assert_eq!(status.session_id, Some(actual_session_id.clone()));
assert_eq!(status.checkpoint_version, Some(CheckpointVersion::new(1)));
// A follow-up checkpoint must advance from version 1 to 2 under the new session id.
let next_version = runtime
.checkpoint(
&id,
&SessionSnapshot {
data: b"new snapshot".to_vec(),
},
)
.await
.expect("checkpoint should use returned resume session id and current version");
assert_eq!(next_version, CheckpointVersion::new(2));
let resolved = store.resolve_many(std::slice::from_ref(&id)).await.unwrap();
let ContinuityResolveState::Ready { record } = resolved.get(&id).unwrap() else {
panic!("expected ready record");
};
assert_eq!(record.session_id, actual_session_id);
assert_eq!(record.checkpoint_version, CheckpointVersion::new(2));
// Only the original (abandoned) session id should have been unregistered.
assert_eq!(
unregistered_session_ids.lock().await.as_slice(),
&[original_session_id.to_string()]
);
}
/// Resume path where the bridge is set to fall back to a different session id and
/// `fail_unregister()` is armed.
///
/// Expects the "abandoned session" unregister error, two unregister attempts, one retire, the
/// identity absent from the runtime, and the stored record still `Ready` on the original
/// session id.
#[tokio::test]
async fn identity_first_runtime_restore_flow_resume_abandoned_unregister_failure_rolls_back_continuity()
{
    let continuity = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let leases = Arc::new(LocalLeaseProvider::new());
    let bridge = Arc::new(CountingBridge::default());
    let actual_session_id = meerkat_core::types::SessionId::new();
    bridge
        .set_force_resume_fallback(actual_session_id.clone())
        .await;
    bridge.fail_unregister();
    let runtime = make_runtime_with_bridge(continuity.clone(), leases, bridge.clone());

    // Seed a resumable record plus its version-1 snapshot.
    let id = make_identity("triage:main");
    let seeded = make_record("triage:main", 0, 0);
    let original_session_id = seeded.session_id.clone();
    continuity
        .upsert_continuity_record(&seeded, FencingToken::new(0))
        .await
        .unwrap();
    let old_snapshot = SessionSnapshot {
        data: b"old snapshot".to_vec(),
    };
    continuity
        .save_session_snapshot(
            &id,
            &seeded.session_id,
            seeded.generation,
            CheckpointVersion::new(1),
            FencingToken::new(0),
            &old_snapshot,
        )
        .await
        .unwrap();

    let roster = [make_spec("triage:main")];
    let err = restore_flow(&runtime, &roster, None, None)
        .await
        .expect_err("abandoned resume-session unregister failure must fail");
    let message = err.to_string();
    assert!(
        message.contains("bridge unregister abandoned session runtime state"),
        "unexpected error: {err}"
    );

    let unregisters = bridge.unregister_calls.load(Ordering::SeqCst);
    assert_eq!(
        unregisters, 2,
        "failed abandoned-session cleanup must unregister both old and actual sessions"
    );
    let retires = bridge.retire_calls.load(Ordering::SeqCst);
    assert_eq!(retires, 1);
    assert!(!runtime.contains(&id).await);

    let resolved = continuity
        .resolve_many(std::slice::from_ref(&id))
        .await
        .unwrap();
    match resolved.get(&id).unwrap() {
        ContinuityResolveState::Ready { record: restored } => {
            assert_eq!(restored.session_id, original_session_id);
            assert_ne!(restored.session_id, actual_session_id);
        }
        _ => panic!("expected previous ready record after rollback"),
    }
}
/// Resume path against a `FaultyContinuityStore` with `fail_upsert()` armed after seeding.
///
/// Expects a "continuity store" error, zero register/resume/unregister/retire calls on the
/// bridge, and the original record still `Ready` in the store.
#[tokio::test]
async fn identity_first_runtime_restore_flow_resume_fence_upsert_failure_avoids_bridge_work() {
    let faulty_store = Arc::new(FaultyContinuityStore::new());
    let leases = Arc::new(LocalLeaseProvider::new());
    let bridge = Arc::new(CountingBridge::default());
    bridge
        .set_force_resume_fallback(meerkat_core::types::SessionId::new())
        .await;
    let runtime = make_runtime_with_bridge(faulty_store.clone(), leases, bridge.clone());

    let id = make_identity("triage:main");
    let seeded = make_record("triage:main", 0, 0);
    let original_session_id = seeded.session_id.clone();
    faulty_store
        .upsert_continuity_record(&seeded, FencingToken::new(0))
        .await
        .unwrap();
    let old_snapshot = SessionSnapshot {
        data: b"old snapshot".to_vec(),
    };
    faulty_store
        .save_session_snapshot(
            &id,
            &seeded.session_id,
            seeded.generation,
            CheckpointVersion::new(1),
            FencingToken::new(0),
            &old_snapshot,
        )
        .await
        .unwrap();
    // Seeding is done; upserts are set to fail from here on.
    faulty_store.fail_upsert();

    let roster = [make_spec("triage:main")];
    let err = restore_flow(&runtime, &roster, None, None)
        .await
        .expect_err("restore resume fence upsert should fail");
    let message = err.to_string();
    assert!(
        message.contains("continuity store"),
        "unexpected error: {err}"
    );

    let registers = bridge.register_calls.load(Ordering::SeqCst);
    assert_eq!(
        registers, 0,
        "failed pre-resume fence upsert must not register bridge session state"
    );
    let resumes = bridge.resume_calls.load(Ordering::SeqCst);
    assert_eq!(resumes, 0);
    let unregisters = bridge.unregister_calls.load(Ordering::SeqCst);
    assert_eq!(unregisters, 0);
    let retires = bridge.retire_calls.load(Ordering::SeqCst);
    assert_eq!(retires, 0);

    let resolved = faulty_store
        .resolve_many(std::slice::from_ref(&id))
        .await
        .unwrap();
    match resolved.get(&id) {
        Some(ContinuityResolveState::Ready { record }) => {
            assert_eq!(record.session_id, original_session_id);
        }
        _ => panic!("expected original ready record after failed resume upsert: {resolved:#?}"),
    }
}
/// Resume path where the bridge falls back to a different session id and is armed with
/// `fail_register_after_calls(1)`.
///
/// Expects the `register_session_runtime_state` error, the original and then the actual session
/// unregistered (in that order), one retire, and the stored record still `Ready` on the
/// original session id.
#[tokio::test]
async fn identity_first_runtime_restore_flow_resume_register_failure_rolls_back_continuity_record()
{
    let continuity = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let leases = Arc::new(LocalLeaseProvider::new());
    let bridge = Arc::new(CountingBridge::default());
    let actual_session_id = meerkat_core::types::SessionId::new();
    bridge
        .set_force_resume_fallback(actual_session_id.clone())
        .await;
    bridge.fail_register_after_calls(1);
    let runtime = make_runtime_with_bridge(continuity.clone(), leases, bridge.clone());

    let id = make_identity("triage:main");
    let seeded = make_record("triage:main", 0, 0);
    let original_session_id = seeded.session_id.clone();
    continuity
        .upsert_continuity_record(&seeded, FencingToken::new(0))
        .await
        .unwrap();
    let old_snapshot = SessionSnapshot {
        data: b"old snapshot".to_vec(),
    };
    continuity
        .save_session_snapshot(
            &id,
            &seeded.session_id,
            seeded.generation,
            CheckpointVersion::new(1),
            FencingToken::new(0),
            &old_snapshot,
        )
        .await
        .unwrap();

    let roster = [make_spec("triage:main")];
    let err = restore_flow(&runtime, &roster, None, None)
        .await
        .expect_err("restore resume final bridge register should fail");
    let message = err.to_string();
    assert!(
        message.contains("bridge register_session_runtime_state"),
        "unexpected error: {err}"
    );

    let unregisters = bridge.unregister_calls.load(Ordering::SeqCst);
    assert_eq!(
        unregisters, 2,
        "failed resume final register must unregister abandoned and actual session state"
    );
    let retires = bridge.retire_calls.load(Ordering::SeqCst);
    assert_eq!(retires, 1);

    // Order matters: the abandoned (original) session first, then the actual one.
    let expected_unregistered = vec![
        original_session_id.to_string(),
        actual_session_id.to_string(),
    ];
    let unregistered = bridge.unregistered_session_ids.lock().await.clone();
    assert_eq!(unregistered, expected_unregistered);

    let resolved = continuity
        .resolve_many(std::slice::from_ref(&id))
        .await
        .unwrap();
    match resolved.get(&id) {
        Some(ContinuityResolveState::Ready { record }) => {
            assert_eq!(record.session_id, original_session_id);
            assert_ne!(record.session_id, actual_session_id);
        }
        _ => panic!(
            "expected rollback to original ready record after failed final register: {resolved:#?}"
        ),
    }
}
/// Resume path with no resume fallback configured and `fail_register_after_calls(1)` armed.
///
/// Expects the `register_session_runtime_state` error, exactly one unregister (of the original
/// session id), one retire, and the stored record still `Ready` on the original session id.
#[tokio::test]
async fn identity_first_runtime_restore_flow_resume_same_session_register_failure_unregisters_actual_session()
{
    let continuity = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let leases = Arc::new(LocalLeaseProvider::new());
    let bridge = Arc::new(CountingBridge::default());
    bridge.fail_register_after_calls(1);
    let runtime = make_runtime_with_bridge(continuity.clone(), leases, bridge.clone());

    let id = make_identity("triage:main");
    let seeded = make_record("triage:main", 0, 0);
    let original_session_id = seeded.session_id.clone();
    continuity
        .upsert_continuity_record(&seeded, FencingToken::new(0))
        .await
        .unwrap();
    let old_snapshot = SessionSnapshot {
        data: b"old snapshot".to_vec(),
    };
    continuity
        .save_session_snapshot(
            &id,
            &seeded.session_id,
            seeded.generation,
            CheckpointVersion::new(1),
            FencingToken::new(0),
            &old_snapshot,
        )
        .await
        .unwrap();

    let roster = [make_spec("triage:main")];
    let err = restore_flow(&runtime, &roster, None, None)
        .await
        .expect_err("restore resume final bridge register should fail");
    let message = err.to_string();
    assert!(
        message.contains("bridge register_session_runtime_state"),
        "unexpected error: {err}"
    );

    let unregisters = bridge.unregister_calls.load(Ordering::SeqCst);
    assert_eq!(
        unregisters, 1,
        "failed same-session final register must unregister the actual session state"
    );
    let retires = bridge.retire_calls.load(Ordering::SeqCst);
    assert_eq!(retires, 1);

    let unregistered = bridge.unregistered_session_ids.lock().await.clone();
    assert_eq!(unregistered, vec![original_session_id.to_string()]);

    let resolved = continuity
        .resolve_many(std::slice::from_ref(&id))
        .await
        .unwrap();
    match resolved.get(&id) {
        Some(ContinuityResolveState::Ready { record }) => {
            assert_eq!(record.session_id, original_session_id);
        }
        _ => panic!(
            "expected rollback to original ready record after failed final register: {resolved:#?}"
        ),
    }
}
/// Runs `restore_flow` over a two-identity roster with a recording customizer and checks that
/// it is invoked twice, each time with both roster identities listed in `active_peers`.
#[tokio::test]
async fn identity_first_runtime_restore_flow_customizer_receives_peers() {
    /// Customizer that stores a clone of every build context it is handed.
    struct TrackingCustomizer {
        seen: Arc<tokio::sync::Mutex<Vec<AgentBuildContext>>>,
    }
    #[async_trait]
    impl AgentCustomizer for TrackingCustomizer {
        async fn customize_build(
            &self,
            context: &AgentBuildContext,
            _spec: &DurableAgentSpec,
            _draft: &mut AgentBuildDraft,
        ) -> Result<(), CustomizerError> {
            let mut recorded = self.seen.lock().await;
            recorded.push(context.clone());
            Ok(())
        }
    }

    let continuity = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let leases = Arc::new(LocalLeaseProvider::new());
    let runtime = make_runtime(continuity, leases);
    let seen_contexts = Arc::new(tokio::sync::Mutex::new(Vec::new()));
    let customizer = TrackingCustomizer {
        seen: Arc::clone(&seen_contexts),
    };

    let roster = vec![make_spec("triage:main"), make_spec("worker:main")];
    restore_flow(&runtime, &roster, None, Some(&customizer))
        .await
        .unwrap();

    let triage = make_identity("triage:main");
    let worker = make_identity("worker:main");
    let contexts = seen_contexts.lock().await;
    assert_eq!(contexts.len(), 2);
    for ctx in contexts.iter() {
        let peers = &ctx.active_peers;
        assert_eq!(peers.len(), 2);
        assert!(peers.contains(&triage));
        assert!(peers.contains(&worker));
    }
}
/// Runs `restore_flow` with a customizer that always fails and checks that a different runtime
/// id can then acquire the identity's lease, i.e. the failed restore released it.
#[tokio::test]
async fn identity_first_runtime_restore_flow_releases_leases_on_customizer_failure() {
    /// Customizer that rejects every build.
    struct FailingCustomizer;
    #[async_trait]
    impl AgentCustomizer for FailingCustomizer {
        async fn customize_build(
            &self,
            _context: &AgentBuildContext,
            _spec: &DurableAgentSpec,
            _draft: &mut AgentBuildDraft,
        ) -> Result<(), CustomizerError> {
            Err(CustomizerError::BuildFailed("boom".to_string()))
        }
    }

    let continuity = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let lease = Arc::new(LocalLeaseProvider::new());
    let config = IdentityRuntimeConfig {
        continuity_store: continuity,
        lease_provider: lease.clone(),
        runtime_instance_id: "restore-customizer-failure-runtime".to_string(),
        has_runtime_store: true,
        durability_policy: DurabilityPolicy::SyncWriteThrough,
        bridge: None,
        default_timeout: None,
    };
    let runtime = IdentityRuntime::new(config);

    let roster = [make_spec("review:singleton")];
    let result = restore_flow(&runtime, &roster, None, Some(&FailingCustomizer)).await;
    assert!(result.is_err());

    // A second runtime id must now be able to take the lease.
    let identity = make_identity("review:singleton");
    let acquired = lease
        .acquire_leases(
            std::slice::from_ref(&identity),
            "restore-customizer-failure-other-runtime",
        )
        .await
        .unwrap();
    let lease_is_free = matches!(
        acquired.get(&identity),
        Some(LeaseAcquireResult::Acquired(_))
    );
    assert!(
        lease_is_free,
        "customizer failure must release the acquired lease: {acquired:?}"
    );
}
/// Verifies `restore_flow` behaviour for an identity that is already registered as `Active`
/// when the customizer fails: the call errors, the identity stays `Active` with a lease whose
/// fencing token is newer than the initial grant, sends use that newer token, writes with the
/// old token are rejected, and another runtime id sees the lease as `AlreadyHeld`.
#[tokio::test]
async fn identity_first_runtime_restore_flow_releases_active_lease_on_customizer_failure() {
// Customizer that rejects every build.
struct FailingCustomizer;
#[async_trait]
impl AgentCustomizer for FailingCustomizer {
async fn customize_build(
&self,
_context: &AgentBuildContext,
_spec: &DurableAgentSpec,
_draft: &mut AgentBuildDraft,
) -> Result<(), CustomizerError> {
Err(CustomizerError::BuildFailed("boom".to_string()))
}
}
let store = Arc::new(LocalContinuityStore::in_memory().unwrap());
let lease = Arc::new(LocalLeaseProvider::new());
let runtime = make_runtime_with_store(store.clone(), lease.clone());
let identity = make_identity("review:singleton");
let record = make_record("review:singleton", 0, 0);
// Acquire a real lease under "test-runtime" so the stored record is fenced by a provider-issued
// token. NOTE(review): presumably "test-runtime" matches the runtime instance id used by
// `make_runtime_with_store` — confirm against that helper.
let initial_grant = lease
.acquire_leases(std::slice::from_ref(&identity), "test-runtime")
.await
.unwrap()
.remove(&identity)
.unwrap();
let LeaseAcquireResult::Acquired(initial_grant) = initial_grant else {
panic!("initial lease should be acquired");
};
store
.upsert_continuity_record(&record, initial_grant.fencing_token)
.await
.unwrap();
// Register the identity as already active, holding the initial grant.
runtime
.register(
make_spec("review:singleton"),
IdentityLifecycleState::Active,
Some(record.clone()),
Some(initial_grant.clone()),
)
.await;
let result = restore_flow(
&runtime,
&[make_spec("review:singleton")],
None,
Some(&FailingCustomizer),
)
.await;
assert!(result.is_err());
// Despite the failure the identity must remain active with a strictly newer fencing token.
let status = runtime.status(&identity).await.unwrap();
assert_eq!(status.state, IdentityLifecycleState::Active);
let refreshed_lease = status
.lease
.expect("already-active identity keeps a current runtime lease");
assert!(
refreshed_lease.fencing_token > initial_grant.fencing_token,
"restore_flow must refresh the active runtime lease before customizer work"
);
// Sends must be stamped with the refreshed token, not the pre-restore one.
let send_token = runtime.send(&identity, &make_content()).await.unwrap();
assert_eq!(
send_token, refreshed_lease.fencing_token,
"active runtime must not continue sending with the stale pre-restore token"
);
assert_old_token_snapshot_write_rejected(
store.as_ref(),
&identity,
&record,
initial_grant.fencing_token,
)
.await;
// Another runtime id must not be able to take the lease away.
let acquired = lease
.acquire_leases(
std::slice::from_ref(&identity),
"restore-active-customizer-failure-other-runtime",
)
.await
.unwrap();
assert!(
matches!(
acquired.get(&identity),
Some(LeaseAcquireResult::AlreadyHeld { .. })
),
"customizer failure must not release the refreshed lease for an already-active identity: {acquired:?}"
);
}
/// Lazily registers an identity, installs a customizer that always fails, and checks that a
/// failed `materialize` leaves the lease acquirable by a different runtime id.
#[tokio::test]
async fn identity_first_runtime_materialize_releases_lease_on_customizer_failure() {
    /// Customizer that rejects every build.
    struct FailingCustomizer;
    #[async_trait]
    impl AgentCustomizer for FailingCustomizer {
        async fn customize_build(
            &self,
            _context: &AgentBuildContext,
            _spec: &DurableAgentSpec,
            _draft: &mut AgentBuildDraft,
        ) -> Result<(), CustomizerError> {
            Err(CustomizerError::BuildFailed("boom".to_string()))
        }
    }

    let continuity = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let lease = Arc::new(LocalLeaseProvider::new());
    let config = IdentityRuntimeConfig {
        continuity_store: continuity,
        lease_provider: lease.clone(),
        runtime_instance_id: "materialize-customizer-failure-runtime".to_string(),
        has_runtime_store: true,
        durability_policy: DurabilityPolicy::SyncWriteThrough,
        bridge: None,
        default_timeout: None,
    };
    let runtime = IdentityRuntime::new(config);

    // Register first without a customizer, then install the failing one.
    let roster = [make_spec("review:singleton")];
    lazy_register_flow(&runtime, &roster, None).await.unwrap();
    runtime
        .set_agent_customizer(Some(Arc::new(FailingCustomizer)))
        .await;

    let identity = make_identity("review:singleton");
    let result = runtime.materialize(&identity).await;
    assert!(result.is_err());

    let acquired = lease
        .acquire_leases(
            std::slice::from_ref(&identity),
            "materialize-customizer-failure-other-runtime",
        )
        .await
        .unwrap();
    let lease_is_free = matches!(
        acquired.get(&identity),
        Some(LeaseAcquireResult::Acquired(_))
    );
    assert!(
        lease_is_free,
        "materialize customizer failure must release the acquired lease: {acquired:?}"
    );
}
/// Verifies that a `Broken` resolve state from the continuity store surfaces as
/// `RestoreOutcome::Broken` with the failure kind and detail preserved, and that the lease for
/// that identity can afterwards be acquired by another runtime id.
#[tokio::test]
async fn identity_first_runtime_restore_flow_broken_fails_loudly() {
// Test-local store: resolves "broken:main" as Broken (SnapshotCorrupted) and everything else as
// Uninitialized; all other operations succeed without doing anything.
struct BrokenStore;
#[async_trait]
impl ContinuityStore for BrokenStore {
async fn resolve_many(
&self,
identities: &[AgentIdentity],
) -> Result<BTreeMap<AgentIdentity, ContinuityResolveState>, ContinuityStoreError> {
let mut map = BTreeMap::new();
for id in identities {
if id.as_str() == "broken:main" {
map.insert(
id.clone(),
ContinuityResolveState::Broken {
failure: ContinuityFailure {
identity: id.clone(),
kind: ContinuityFailureKind::SnapshotCorrupted,
record: None,
detail: "corrupted data".to_string(),
},
},
);
} else {
map.insert(id.clone(), ContinuityResolveState::Uninitialized);
}
}
Ok(map)
}
// No snapshot is ever available from this store.
async fn load_session_snapshot(
&self,
_session_id: &meerkat_core::types::SessionId,
) -> Result<Option<SessionSnapshot>, ContinuityStoreError> {
Ok(None)
}
async fn save_session_snapshot(
&self,
_identity: &AgentIdentity,
_session_id: &meerkat_core::types::SessionId,
_generation: ContinuityGeneration,
_version: CheckpointVersion,
_fencing_token: FencingToken,
_snapshot: &SessionSnapshot,
) -> Result<(), ContinuityStoreError> {
Ok(())
}
async fn upsert_continuity_record(
&self,
_record: &ContinuityRecord,
_fencing_token: FencingToken,
) -> Result<(), ContinuityStoreError> {
Ok(())
}
async fn delete_continuity_record(
&self,
_identity: &AgentIdentity,
_fencing_token: FencingToken,
) -> Result<(), ContinuityStoreError> {
Ok(())
}
}
let store = Arc::new(BrokenStore);
let lease_prov = Arc::new(LocalLeaseProvider::new());
let runtime = make_runtime(store, lease_prov.clone());
let roster = vec![make_spec("broken:main")];
// The flow itself succeeds; the breakage is reported per identity in `outcomes`.
let result = restore_flow(&runtime, &roster, None, None).await.unwrap();
let id = make_identity("broken:main");
match result.outcomes.get(&id).unwrap() {
RestoreOutcome::Broken(failure) => {
assert_eq!(failure.kind, ContinuityFailureKind::SnapshotCorrupted);
assert_eq!(failure.detail, "corrupted data");
}
other => panic!("expected Broken, got: {other:?}"),
}
// A different runtime id must be able to acquire the lease after the broken outcome.
let mut reacquired = lease_prov
.acquire_leases(std::slice::from_ref(&id), "after-broken-restore")
.await
.unwrap();
assert!(
matches!(
reacquired.remove(&id),
Some(LeaseAcquireResult::Acquired(_))
),
"broken restore outcome must release its unactivated lease"
);
}
/// Registers an active identity with a lease, checkpoints once, and checks that the returned
/// version is 1 and that the store hands back the same snapshot bytes for the record's session.
#[tokio::test]
async fn identity_first_runtime_checkpoint_saves_snapshot() {
    let continuity = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let leases = Arc::new(LocalLeaseProvider::new());
    let runtime = make_runtime(continuity.clone(), leases);

    let id = make_identity("triage:main");
    let seeded = make_record("triage:main", 0, 0);
    continuity
        .upsert_continuity_record(&seeded, FencingToken::new(1))
        .await
        .unwrap();
    let grant = make_grant("triage:main", 1);
    runtime
        .register(
            make_spec("triage:main"),
            IdentityLifecycleState::Active,
            Some(seeded.clone()),
            Some(grant),
        )
        .await;

    let first_turn = SessionSnapshot {
        data: b"turn-1-data".to_vec(),
    };
    let version = runtime.checkpoint(&id, &first_turn).await.unwrap();
    assert_eq!(version, CheckpointVersion::new(1));

    let stored = continuity
        .load_session_snapshot(&seeded.session_id)
        .await
        .unwrap();
    let loaded = stored.unwrap();
    assert_eq!(loaded.data, b"turn-1-data");
}
/// Checkpoints twice through the runtime (versions 1 then 2), then checks that direct store
/// writes at version 2 and version 1 are both rejected with `StaleCheckpointVersion`.
#[tokio::test]
async fn identity_first_runtime_checkpoint_version_ordering() {
    let continuity = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let leases = Arc::new(LocalLeaseProvider::new());
    let runtime = make_runtime(continuity.clone(), leases);

    let id = make_identity("triage:main");
    let seeded = make_record("triage:main", 0, 0);
    continuity
        .upsert_continuity_record(&seeded, FencingToken::new(1))
        .await
        .unwrap();
    let grant = make_grant("triage:main", 1);
    runtime
        .register(
            make_spec("triage:main"),
            IdentityLifecycleState::Active,
            Some(seeded.clone()),
            Some(grant),
        )
        .await;

    let first = SessionSnapshot {
        data: b"v1".to_vec(),
    };
    let first_version = runtime.checkpoint(&id, &first).await.unwrap();
    assert_eq!(first_version, CheckpointVersion::new(1));

    let second = SessionSnapshot {
        data: b"v2".to_vec(),
    };
    let second_version = runtime.checkpoint(&id, &second).await.unwrap();
    assert_eq!(second_version, CheckpointVersion::new(2));

    // Re-writing the current version (2) and an older one (1) must both be refused.
    for (stale_version, payload) in [(2, &b"stale"[..]), (1, &b"older"[..])] {
        let attempt = continuity
            .save_session_snapshot(
                &id,
                &seeded.session_id,
                seeded.generation,
                CheckpointVersion::new(stale_version),
                FencingToken::new(1),
                &SessionSnapshot {
                    data: payload.to_vec(),
                },
            )
            .await;
        assert!(matches!(
            attempt.unwrap_err(),
            ContinuityStoreError::StaleCheckpointVersion { .. }
        ));
    }
}
/// With a record upserted under fencing token 5, a snapshot write using token 3 must fail with
/// `StaleFencingToken`, while the same write using token 5 must succeed.
#[tokio::test]
async fn identity_first_runtime_stale_fencing_token_rejected() {
    let continuity = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let id = make_identity("triage:main");
    let seeded = make_record("triage:main", 0, 0);
    continuity
        .upsert_continuity_record(&seeded, FencingToken::new(5))
        .await
        .unwrap();

    // Older token than the one the record was written with.
    let payload = SessionSnapshot {
        data: b"data".to_vec(),
    };
    let stale_attempt = continuity
        .save_session_snapshot(
            &id,
            &seeded.session_id,
            seeded.generation,
            CheckpointVersion::new(1),
            FencingToken::new(3),
            &payload,
        )
        .await;
    assert!(matches!(
        stale_attempt.unwrap_err(),
        ContinuityStoreError::StaleFencingToken { .. }
    ));

    // Matching token is accepted.
    let payload = SessionSnapshot {
        data: b"data".to_vec(),
    };
    let current_attempt = continuity
        .save_session_snapshot(
            &id,
            &seeded.session_id,
            seeded.generation,
            CheckpointVersion::new(1),
            FencingToken::new(5),
            &payload,
        )
        .await;
    assert!(current_attempt.is_ok());
}
/// Checkpoints once on a runtime built by `make_runtime`, confirms the snapshot is readable
/// from the store right after the call, and checks that status reports the
/// `SyncWriteThrough` durability policy.
#[tokio::test]
async fn identity_first_runtime_checkpoint_policy_sync_write_through() {
    let continuity = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let leases = Arc::new(LocalLeaseProvider::new());
    let runtime = make_runtime(continuity.clone(), leases);

    let id = make_identity("triage:main");
    let seeded = make_record("triage:main", 0, 0);
    continuity
        .upsert_continuity_record(&seeded, FencingToken::new(1))
        .await
        .unwrap();
    let grant = make_grant("triage:main", 1);
    runtime
        .register(
            make_spec("triage:main"),
            IdentityLifecycleState::Active,
            Some(seeded.clone()),
            Some(grant),
        )
        .await;

    let payload = SessionSnapshot {
        data: b"sync-data".to_vec(),
    };
    let version = runtime.checkpoint(&id, &payload).await.unwrap();
    assert_eq!(version, CheckpointVersion::new(1));

    let stored = continuity
        .load_session_snapshot(&seeded.session_id)
        .await
        .unwrap();
    let loaded = stored.unwrap();
    assert_eq!(loaded.data, b"sync-data");

    let status = runtime.status(&id).await.unwrap();
    let health = status.continuity_health.unwrap();
    assert_eq!(health.durability_policy, DurabilityPolicy::SyncWriteThrough);
}
/// Builds a runtime configured with `DurabilityPolicy::AsyncReplicated` and checks that the
/// policy is echoed back in the identity's continuity health status.
#[tokio::test]
async fn identity_first_runtime_checkpoint_policy_async_reported_in_status() {
    let continuity = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let leases = Arc::new(LocalLeaseProvider::new());
    let config = IdentityRuntimeConfig {
        continuity_store: continuity.clone(),
        lease_provider: leases,
        runtime_instance_id: "test".to_string(),
        has_runtime_store: false,
        durability_policy: DurabilityPolicy::AsyncReplicated,
        bridge: None,
        default_timeout: None,
    };
    let runtime = IdentityRuntime::new(config);

    let id = make_identity("triage:main");
    let seeded = make_record("triage:main", 0, 0);
    continuity
        .upsert_continuity_record(&seeded, FencingToken::new(1))
        .await
        .unwrap();
    let grant = make_grant("triage:main", 1);
    runtime
        .register(
            make_spec("triage:main"),
            IdentityLifecycleState::Active,
            Some(seeded),
            Some(grant),
        )
        .await;

    let status = runtime.status(&id).await.unwrap();
    let health = status.continuity_health.unwrap();
    assert_eq!(health.durability_policy, DurabilityPolicy::AsyncReplicated);
}
/// After a snapshot is stored at checkpoint version 3, a later write at version 2 must be
/// rejected with `StaleCheckpointVersion`.
#[tokio::test]
async fn identity_first_runtime_local_cache_promotion_stale_fails() {
    let continuity = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let id = make_identity("triage:main");
    let seeded = make_record("triage:main", 0, 0);
    continuity
        .upsert_continuity_record(&seeded, FencingToken::new(1))
        .await
        .unwrap();

    let newest = SessionSnapshot {
        data: b"v3".to_vec(),
    };
    continuity
        .save_session_snapshot(
            &id,
            &seeded.session_id,
            seeded.generation,
            CheckpointVersion::new(3),
            FencingToken::new(1),
            &newest,
        )
        .await
        .unwrap();

    // An older local copy (version 2) must not overwrite version 3.
    let older_local = SessionSnapshot {
        data: b"local-v2".to_vec(),
    };
    let promotion = continuity
        .save_session_snapshot(
            &id,
            &seeded.session_id,
            seeded.generation,
            CheckpointVersion::new(2),
            FencingToken::new(1),
            &older_local,
        )
        .await;
    assert!(matches!(
        promotion.unwrap_err(),
        ContinuityStoreError::StaleCheckpointVersion { .. }
    ));
}
/// An identity registered as `Active` but without a lease grant must have `send` rejected with
/// `NoActiveLease`.
#[tokio::test]
async fn identity_first_runtime_inv01_send_without_lease_rejected() {
    let continuity = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let leases = Arc::new(LocalLeaseProvider::new());
    let runtime = make_runtime(continuity, leases);

    let seeded = make_record("triage:main", 0, 0);
    runtime
        .register(
            make_spec("triage:main"),
            IdentityLifecycleState::Active,
            Some(seeded),
            // Deliberately no lease grant.
            None,
        )
        .await;

    let id = make_identity("triage:main");
    let content = make_content();
    let outcome = runtime.send(&id, &content).await;
    assert!(matches!(
        outcome.unwrap_err(),
        IdentityRuntimeError::NoActiveLease(_)
    ));
}
/// An identity registered as `Active` but without a lease grant must have `dispatch` rejected
/// with `NoActiveLease`.
#[tokio::test]
async fn identity_first_runtime_inv01_dispatch_without_lease_rejected() {
    let continuity = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let leases = Arc::new(LocalLeaseProvider::new());
    let runtime = make_runtime(continuity, leases);

    let seeded = make_record("triage:main", 0, 0);
    runtime
        .register(
            make_spec("triage:main"),
            IdentityLifecycleState::Active,
            Some(seeded),
            // Deliberately no lease grant.
            None,
        )
        .await;

    let id = make_identity("triage:main");
    let input = make_dispatch_input();
    let outcome = runtime.dispatch(&id, &input).await;
    assert!(matches!(
        outcome.unwrap_err(),
        IdentityRuntimeError::NoActiveLease(_)
    ));
}
/// An identity registered as `Active` but without a lease grant must have `checkpoint` rejected
/// with `NoActiveLease`.
#[tokio::test]
async fn identity_first_runtime_inv01_checkpoint_without_lease_rejected() {
    let continuity = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let leases = Arc::new(LocalLeaseProvider::new());
    let runtime = make_runtime(continuity, leases);

    let seeded = make_record("triage:main", 0, 0);
    runtime
        .register(
            make_spec("triage:main"),
            IdentityLifecycleState::Active,
            Some(seeded),
            // Deliberately no lease grant.
            None,
        )
        .await;

    let id = make_identity("triage:main");
    let payload = SessionSnapshot {
        data: b"data".to_vec(),
    };
    let outcome = runtime.checkpoint(&id, &payload).await;
    assert!(matches!(
        outcome.unwrap_err(),
        IdentityRuntimeError::NoActiveLease(_)
    ));
}
/// After `mark_lease_lost` on an active, leased identity, `send` must be rejected with
/// `NoActiveLease`.
#[tokio::test]
async fn identity_first_runtime_inv02_lease_loss_blocks_send() {
    let continuity = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let leases = Arc::new(LocalLeaseProvider::new());
    let runtime = make_runtime(continuity, leases);

    let seeded = make_record("triage:main", 0, 0);
    let grant = make_grant("triage:main", 1);
    runtime
        .register(
            make_spec("triage:main"),
            IdentityLifecycleState::Active,
            Some(seeded),
            Some(grant),
        )
        .await;

    let id = make_identity("triage:main");
    runtime.mark_lease_lost(&id).await.unwrap();

    let content = make_content();
    let outcome = runtime.send(&id, &content).await;
    assert!(matches!(
        outcome.unwrap_err(),
        IdentityRuntimeError::NoActiveLease(_)
    ));
}
/// After `mark_lease_lost` on an active, leased identity, `dispatch` must be rejected with
/// `NoActiveLease`.
#[tokio::test]
async fn identity_first_runtime_inv02_lease_loss_blocks_dispatch() {
    let continuity = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let leases = Arc::new(LocalLeaseProvider::new());
    let runtime = make_runtime(continuity, leases);

    let seeded = make_record("triage:main", 0, 0);
    let grant = make_grant("triage:main", 1);
    runtime
        .register(
            make_spec("triage:main"),
            IdentityLifecycleState::Active,
            Some(seeded),
            Some(grant),
        )
        .await;

    let id = make_identity("triage:main");
    runtime.mark_lease_lost(&id).await.unwrap();

    let input = make_dispatch_input();
    let outcome = runtime.dispatch(&id, &input).await;
    assert!(matches!(
        outcome.unwrap_err(),
        IdentityRuntimeError::NoActiveLease(_)
    ));
}
/// Resets an active, leased identity whose record starts at generation 0 and checks that the
/// record returned by `reset` is at generation 1.
#[tokio::test]
async fn identity_first_runtime_inv05_reset_fences_old_owner() {
    let continuity = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let leases = Arc::new(LocalLeaseProvider::new());
    let runtime = make_runtime(continuity.clone(), leases);

    let id = make_identity("triage:main");
    let seeded = make_record("triage:main", 0, 0);
    continuity
        .upsert_continuity_record(&seeded, FencingToken::new(1))
        .await
        .unwrap();
    let grant = make_grant("triage:main", 1);
    runtime
        .register(
            make_spec("triage:main"),
            IdentityLifecycleState::Active,
            Some(seeded.clone()),
            Some(grant),
        )
        .await;

    let after_reset = runtime.reset(&id).await.unwrap();
    assert_eq!(after_reset.generation, ContinuityGeneration::new(1));
}
/// Deleting a registered identity must succeed and leave the runtime no longer
/// reporting that identity via `contains`.
#[tokio::test]
async fn identity_first_runtime_inv05_delete_fences_old_owner() {
    let store = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let lease_prov = Arc::new(LocalLeaseProvider::new());
    // The store handle is still needed below, so only the store is cloned.
    let runtime = make_runtime(store.clone(), lease_prov);

    let id = make_identity("triage:main");
    let seeded = make_record("triage:main", 0, 0);
    store
        .upsert_continuity_record(&seeded, FencingToken::new(1))
        .await
        .unwrap();

    runtime
        .register(
            make_spec("triage:main"),
            IdentityLifecycleState::Active,
            Some(seeded),
            Some(make_grant("triage:main", 1)),
        )
        .await;

    runtime.delete_identity(&id).await.unwrap();

    let still_present = runtime.contains(&id).await;
    assert!(!still_present);
}
/// A roster that lists the same identity twice must be rejected by
/// `validate_roster_uniqueness` with `DuplicateIdentity`.
#[tokio::test]
async fn identity_first_runtime_inv06_duplicate_identities_rejected() {
    let duplicated = vec![make_spec("triage:main"), make_spec("triage:main")];
    let err = IdentityRuntime::validate_roster_uniqueness(&duplicated).unwrap_err();
    assert!(matches!(err, IdentityRuntimeError::DuplicateIdentity(_)));
}
/// A roster made of two distinct identities passes
/// `validate_roster_uniqueness`.
#[tokio::test]
async fn identity_first_runtime_inv06_unique_identities_accepted() {
    let distinct = vec![make_spec("triage:main"), make_spec("worker:main")];
    let verdict = IdentityRuntime::validate_roster_uniqueness(&distinct);
    assert!(verdict.is_ok());
}
/// `restore_flow` must fail with `DuplicateIdentity` when the roster lists
/// the same identity twice.
#[tokio::test]
async fn identity_first_runtime_inv06_restore_flow_rejects_duplicates() {
    let store = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let lease_prov = Arc::new(LocalLeaseProvider::new());
    let runtime = make_runtime(store, lease_prov);

    let duplicated = vec![make_spec("triage:main"), make_spec("triage:main")];
    let outcome = restore_flow(&runtime, &duplicated, None, None).await;

    let err = outcome.unwrap_err();
    assert!(matches!(err, IdentityRuntimeError::DuplicateIdentity(_)));
}
/// `restore_flow` must surface the edges produced by the supplied topology
/// provider: for a two-identity roster and an all-pairs provider, exactly one
/// managed edge (`a:main` to `b:main`) is expected.
#[tokio::test]
async fn identity_first_runtime_topology_compute_edges() {
    /// Topology provider that pairs every identity with every later identity
    /// in the slice; pairs rejected by `ManagedPeerEdge::new` are skipped.
    struct TestTopology;

    #[async_trait]
    impl TopologyProvider for TestTopology {
        async fn compute_edges(
            &self,
            target_identities: &[AgentIdentity],
            _context: &TopologyContext,
        ) -> Result<Vec<ManagedPeerEdge>, TopologyError> {
            let mut edges = Vec::new();
            for (idx, left) in target_identities.iter().enumerate() {
                // Only look at identities after `left` so each pair appears once.
                for right in &target_identities[idx + 1..] {
                    if let Ok(edge) = ManagedPeerEdge::new(left.clone(), right.clone()) {
                        edges.push(edge);
                    }
                }
            }
            Ok(edges)
        }
    }

    let store = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let lease_prov = Arc::new(LocalLeaseProvider::new());
    let runtime = make_runtime(store, lease_prov);
    let roster = vec![make_spec("a:main"), make_spec("b:main")];
    let topology = TestTopology;

    let restored = restore_flow(&runtime, &roster, Some(&topology), None)
        .await
        .unwrap();

    assert_eq!(restored.managed_edges.len(), 1);
    let only_edge = &restored.managed_edges[0];
    assert_eq!(only_edge.a(), &make_identity("a:main"));
    assert_eq!(only_edge.b(), &make_identity("b:main"));
}
/// `restore_flow` must translate topology edges into wire/unwire calls on the
/// session bridge.
///
/// First pass (topology `a-b`): one wire `rt:a:main:0 -> rt:b:main:0` is
/// recorded. Second pass (topology `b-c`): the `b-c` wire is added and the
/// earlier `a-b` wire is recorded as unwired.
#[tokio::test]
async fn identity_first_runtime_topology_materializes_runtime_peer_wires() {
    /// A recorded wire, stored as the string form of both runtime ids.
    type WirePair = (String, String);

    /// Renders two runtime ids as an owned string pair for recording.
    fn as_pair(a: &AgentRuntimeId, b: &AgentRuntimeId) -> WirePair {
        (a.as_str().to_string(), b.as_str().to_string())
    }

    /// Builds an expected wire entry from two string literals.
    fn wire(a: &str, b: &str) -> WirePair {
        (a.to_string(), b.to_string())
    }

    /// Session bridge double that logs every wire and unwire request.
    #[derive(Default)]
    struct RecordingBridge {
        // Every `wire_peer` call, in call order.
        wires: tokio::sync::Mutex<Vec<WirePair>>,
        // Every `unwire_peer` call, in call order.
        unwires: tokio::sync::Mutex<Vec<WirePair>>,
        // Wires reported back through `current_member_wires`.
        current_wires: tokio::sync::Mutex<Vec<WirePair>>,
    }

    #[async_trait]
    impl SessionBridge for RecordingBridge {
        // Echoes the requested session id back.
        async fn create_session(
            &self,
            _identity: &AgentIdentity,
            _runtime_id: &AgentRuntimeId,
            _spec: &DurableAgentSpec,
            _draft: &AgentBuildDraft,
            session_id: &meerkat_core::types::SessionId,
        ) -> Result<meerkat_core::types::SessionId, BridgeError> {
            Ok(session_id.clone())
        }

        // Always reports a successful resume of the requested session id.
        async fn resume_session(
            &self,
            _identity: &AgentIdentity,
            _runtime_id: &AgentRuntimeId,
            _spec: &DurableAgentSpec,
            _draft: &AgentBuildDraft,
            session_id: &meerkat_core::types::SessionId,
            _snapshot: &SessionSnapshot,
        ) -> Result<meerkat_mobkit::identity_first::ResumeSessionOutcome, BridgeError> {
            let resumed = meerkat_mobkit::identity_first::ResumeSessionOutcome::Resumed {
                session_id: session_id.clone(),
            };
            Ok(resumed)
        }

        async fn deliver(
            &self,
            _runtime_id: &AgentRuntimeId,
            _content: &meerkat_core::ContentInput,
        ) -> Result<meerkat_core::types::SessionId, BridgeError> {
            Ok(meerkat_core::types::SessionId::new())
        }

        // Returns an empty snapshot.
        async fn checkpoint_session(
            &self,
            _runtime_id: &AgentRuntimeId,
            _session_id: &meerkat_core::types::SessionId,
        ) -> Result<SessionSnapshot, BridgeError> {
            Ok(SessionSnapshot { data: Vec::new() })
        }

        async fn retire_member(&self, _runtime_id: &AgentRuntimeId) -> Result<(), BridgeError> {
            Ok(())
        }

        async fn wire_peer(
            &self,
            a: &AgentRuntimeId,
            b: &AgentRuntimeId,
        ) -> Result<(), BridgeError> {
            let mut log = self.wires.lock().await;
            log.push(as_pair(a, b));
            Ok(())
        }

        // Wires each edge individually, then marks it as currently wired.
        async fn wire_peers_batch(
            &self,
            edges: &[(AgentRuntimeId, AgentRuntimeId)],
        ) -> Result<(), BridgeError> {
            for (a, b) in edges {
                self.wire_peer(a, b).await?;
                let mut live = self.current_wires.lock().await;
                live.push(as_pair(a, b));
            }
            Ok(())
        }

        async fn unwire_peer(
            &self,
            a: &AgentRuntimeId,
            b: &AgentRuntimeId,
        ) -> Result<(), BridgeError> {
            let mut log = self.unwires.lock().await;
            log.push(as_pair(a, b));
            Ok(())
        }

        // Re-parses the recorded pairs; entries that fail to parse are dropped.
        async fn current_member_wires(
            &self,
        ) -> Result<Vec<(AgentRuntimeId, AgentRuntimeId)>, BridgeError> {
            let live = self.current_wires.lock().await;
            let mut parsed = Vec::new();
            for (a, b) in live.iter() {
                if let Ok(first) = AgentRuntimeId::parse(a) {
                    if let Ok(second) = AgentRuntimeId::parse(b) {
                        parsed.push((first, second));
                    }
                }
            }
            Ok(parsed)
        }
    }

    /// Topology provider that always returns a fixed list of identity pairs.
    struct StaticTopology(Vec<(&'static str, &'static str)>);

    #[async_trait]
    impl TopologyProvider for StaticTopology {
        async fn compute_edges(
            &self,
            _target_identities: &[AgentIdentity],
            _context: &TopologyContext,
        ) -> Result<Vec<ManagedPeerEdge>, TopologyError> {
            let mut edges = Vec::with_capacity(self.0.len());
            for (a, b) in &self.0 {
                let edge = ManagedPeerEdge::new(make_identity(a), make_identity(b))
                    .map_err(|e| TopologyError::InvalidEdge(format!("{e}")))?;
                edges.push(edge);
            }
            Ok(edges)
        }
    }

    let store = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let lease_provider = Arc::new(LocalLeaseProvider::new());
    let bridge = Arc::new(RecordingBridge::default());
    let runtime = IdentityRuntime::new(IdentityRuntimeConfig {
        continuity_store: store,
        lease_provider,
        runtime_instance_id: "test-runtime".to_string(),
        has_runtime_store: false,
        durability_policy: DurabilityPolicy::SyncWriteThrough,
        bridge: Some(bridge.clone()),
        default_timeout: None,
    });
    let roster = vec![
        make_spec("a:main"),
        make_spec("b:main"),
        make_spec("c:main"),
    ];

    // First pass: topology asks for a single a-b edge.
    let first_topology = StaticTopology(vec![("a:main", "b:main")]);
    restore_flow(&runtime, &roster, Some(&first_topology), None)
        .await
        .unwrap();
    assert_eq!(
        bridge.wires.lock().await.as_slice(),
        &[wire("rt:a:main:0", "rt:b:main:0")]
    );

    // Second pass: topology now asks for b-c instead of a-b.
    let second_topology = StaticTopology(vec![("b:main", "c:main")]);
    restore_flow(&runtime, &roster, Some(&second_topology), None)
        .await
        .unwrap();
    assert_eq!(
        bridge.wires.lock().await.as_slice(),
        &[
            wire("rt:a:main:0", "rt:b:main:0"),
            wire("rt:b:main:0", "rt:c:main:0"),
        ]
    );
    assert_eq!(
        bridge.unwires.lock().await.as_slice(),
        &[wire("rt:a:main:0", "rt:b:main:0")]
    );
}
/// When the bridge already reports an `a-b` wire, `restore_flow` with an
/// `a-b` topology must not record any new `wire_peer` call; a later pass with
/// an empty topology must record the `a-b` wire as unwired.
#[tokio::test]
async fn identity_first_runtime_topology_claims_persisted_wires_without_rebatching() {
    /// A recorded wire, stored as the string form of both runtime ids.
    type WirePair = (String, String);

    /// Renders two runtime ids as an owned string pair for recording.
    fn as_pair(a: &AgentRuntimeId, b: &AgentRuntimeId) -> WirePair {
        (a.as_str().to_string(), b.as_str().to_string())
    }

    /// Builds a wire entry from two string literals.
    fn wire(a: &str, b: &str) -> WirePair {
        (a.to_string(), b.to_string())
    }

    /// Session bridge double that logs every wire and unwire request.
    /// NOTE(review): `wire_peers_batch` is not overridden here, so the trait's
    /// own implementation is what runs — confirm how it relates to `wire_peer`.
    #[derive(Default)]
    struct RecordingBridge {
        // Every `wire_peer` call, in call order.
        wires: tokio::sync::Mutex<Vec<WirePair>>,
        // Every `unwire_peer` call, in call order.
        unwires: tokio::sync::Mutex<Vec<WirePair>>,
        // Wires reported back through `current_member_wires`.
        current_wires: tokio::sync::Mutex<Vec<WirePair>>,
    }

    #[async_trait]
    impl SessionBridge for RecordingBridge {
        // Echoes the requested session id back.
        async fn create_session(
            &self,
            _identity: &AgentIdentity,
            _runtime_id: &AgentRuntimeId,
            _spec: &DurableAgentSpec,
            _draft: &AgentBuildDraft,
            session_id: &meerkat_core::types::SessionId,
        ) -> Result<meerkat_core::types::SessionId, BridgeError> {
            Ok(session_id.clone())
        }

        // Always reports a successful resume of the requested session id.
        async fn resume_session(
            &self,
            _identity: &AgentIdentity,
            _runtime_id: &AgentRuntimeId,
            _spec: &DurableAgentSpec,
            _draft: &AgentBuildDraft,
            session_id: &meerkat_core::types::SessionId,
            _snapshot: &SessionSnapshot,
        ) -> Result<meerkat_mobkit::identity_first::ResumeSessionOutcome, BridgeError> {
            let resumed = meerkat_mobkit::identity_first::ResumeSessionOutcome::Resumed {
                session_id: session_id.clone(),
            };
            Ok(resumed)
        }

        async fn deliver(
            &self,
            _runtime_id: &AgentRuntimeId,
            _content: &meerkat_core::ContentInput,
        ) -> Result<meerkat_core::types::SessionId, BridgeError> {
            Ok(meerkat_core::types::SessionId::new())
        }

        // Returns an empty snapshot.
        async fn checkpoint_session(
            &self,
            _runtime_id: &AgentRuntimeId,
            _session_id: &meerkat_core::types::SessionId,
        ) -> Result<SessionSnapshot, BridgeError> {
            Ok(SessionSnapshot { data: Vec::new() })
        }

        async fn retire_member(&self, _runtime_id: &AgentRuntimeId) -> Result<(), BridgeError> {
            Ok(())
        }

        async fn wire_peer(
            &self,
            a: &AgentRuntimeId,
            b: &AgentRuntimeId,
        ) -> Result<(), BridgeError> {
            let mut log = self.wires.lock().await;
            log.push(as_pair(a, b));
            Ok(())
        }

        async fn unwire_peer(
            &self,
            a: &AgentRuntimeId,
            b: &AgentRuntimeId,
        ) -> Result<(), BridgeError> {
            let mut log = self.unwires.lock().await;
            log.push(as_pair(a, b));
            Ok(())
        }

        // Re-parses the recorded pairs; entries that fail to parse are dropped.
        async fn current_member_wires(
            &self,
        ) -> Result<Vec<(AgentRuntimeId, AgentRuntimeId)>, BridgeError> {
            let live = self.current_wires.lock().await;
            let mut parsed = Vec::new();
            for (a, b) in live.iter() {
                if let Ok(first) = AgentRuntimeId::parse(a) {
                    if let Ok(second) = AgentRuntimeId::parse(b) {
                        parsed.push((first, second));
                    }
                }
            }
            Ok(parsed)
        }
    }

    /// Topology provider that always returns a fixed list of identity pairs.
    struct StaticTopology(Vec<(&'static str, &'static str)>);

    #[async_trait]
    impl TopologyProvider for StaticTopology {
        async fn compute_edges(
            &self,
            _target_identities: &[AgentIdentity],
            _context: &TopologyContext,
        ) -> Result<Vec<ManagedPeerEdge>, TopologyError> {
            let mut edges = Vec::with_capacity(self.0.len());
            for (a, b) in &self.0 {
                let edge = ManagedPeerEdge::new(make_identity(a), make_identity(b))
                    .map_err(|e| TopologyError::InvalidEdge(format!("{e}")))?;
                edges.push(edge);
            }
            Ok(edges)
        }
    }

    let store = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let lease_provider = Arc::new(LocalLeaseProvider::new());
    let bridge = Arc::new(RecordingBridge::default());

    // Pretend the a-b wire already exists on the bridge before the runtime starts.
    bridge
        .current_wires
        .lock()
        .await
        .push(wire("rt:a:main:0", "rt:b:main:0"));

    let runtime = IdentityRuntime::new(IdentityRuntimeConfig {
        continuity_store: store,
        lease_provider,
        runtime_instance_id: "test-runtime".to_string(),
        has_runtime_store: false,
        durability_policy: DurabilityPolicy::SyncWriteThrough,
        bridge: Some(bridge.clone()),
        default_timeout: None,
    });
    let roster = vec![make_spec("a:main"), make_spec("b:main")];

    // First pass: the desired edge matches the wire that is already present.
    let matching_topology = StaticTopology(vec![("a:main", "b:main")]);
    restore_flow(&runtime, &roster, Some(&matching_topology), None)
        .await
        .unwrap();
    assert!(
        bridge.wires.lock().await.is_empty(),
        "persisted wires should be claimed as managed without re-sending a batch"
    );

    // Second pass: no edges desired, so the claimed wire must be removed.
    let empty_topology = StaticTopology(Vec::new());
    restore_flow(&runtime, &roster, Some(&empty_topology), None)
        .await
        .unwrap();
    assert_eq!(
        bridge.unwires.lock().await.as_slice(),
        &[wire("rt:a:main:0", "rt:b:main:0")]
    );
}
/// With a topology provider that returns no edges, `restore_flow` must report
/// an empty `managed_edges` list.
#[tokio::test]
async fn identity_first_runtime_topology_static_wiring_not_modified() {
    /// Topology provider that never proposes any edge.
    struct EmptyTopology;

    #[async_trait]
    impl TopologyProvider for EmptyTopology {
        async fn compute_edges(
            &self,
            _target_identities: &[AgentIdentity],
            _context: &TopologyContext,
        ) -> Result<Vec<ManagedPeerEdge>, TopologyError> {
            Ok(vec![])
        }
    }

    let store = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let lease_prov = Arc::new(LocalLeaseProvider::new());
    let runtime = make_runtime(store, lease_prov);
    let roster = vec![make_spec("a:main"), make_spec("b:main")];
    let topology = EmptyTopology;

    let restored = restore_flow(&runtime, &roster, Some(&topology), None)
        .await
        .unwrap();

    assert!(restored.managed_edges.is_empty());
}
/// Each `restore_flow` call must invoke the topology provider again: two
/// calls with the same provider yield a call count of 2.
#[tokio::test]
async fn identity_first_runtime_topology_recomputed_from_current_state() {
    use std::sync::atomic::{AtomicU32, Ordering};

    /// Topology provider that counts `compute_edges` invocations and returns
    /// no edges.
    struct CountingTopology {
        calls: AtomicU32,
    }

    #[async_trait]
    impl TopologyProvider for CountingTopology {
        async fn compute_edges(
            &self,
            _target: &[AgentIdentity],
            _ctx: &TopologyContext,
        ) -> Result<Vec<ManagedPeerEdge>, TopologyError> {
            self.calls.fetch_add(1, Ordering::Relaxed);
            Ok(Vec::new())
        }
    }

    let store = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let lease_prov = Arc::new(LocalLeaseProvider::new());
    let runtime = make_runtime(store, lease_prov);
    let topology = CountingTopology {
        calls: AtomicU32::new(0),
    };
    let roster = vec![make_spec("a:main")];

    // Run the same restore twice against the same provider instance.
    for _ in 0..2 {
        restore_flow(&runtime, &roster, Some(&topology), None)
            .await
            .unwrap();
    }

    assert_eq!(topology.calls.load(Ordering::Relaxed), 2);
}
/// `roster_inspect` must return one entry per registered identity, exposing
/// each identity's spec labels together with its status (lifecycle state,
/// addressability, lease).
#[tokio::test]
async fn identity_first_runtime_roster_inspect_returns_all_identities() {
    let store = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let lease = Arc::new(LocalLeaseProvider::new());
    let runtime = make_runtime(store, lease);

    // One addressable and one internal-only identity, each tagged with a role.
    let mut triage_spec = make_spec("triage:main");
    triage_spec
        .labels
        .insert("role".to_string(), "triage".to_string());
    let mut gate_spec = make_internal_spec("gate:main");
    gate_spec
        .labels
        .insert("role".to_string(), "gate".to_string());

    runtime
        .register(
            triage_spec,
            IdentityLifecycleState::Active,
            Some(make_record("triage:main", 0, 2)),
            Some(make_grant("triage:main", 1)),
        )
        .await;
    runtime
        .register(
            gate_spec,
            IdentityLifecycleState::Active,
            Some(make_record("gate:main", 0, 0)),
            Some(make_grant("gate:main", 2)),
        )
        .await;

    let roster = runtime.roster_inspect().await;
    assert_eq!(roster.len(), 2);

    let (triage, triage_status) = roster.get(&make_identity("triage:main")).unwrap();
    assert_eq!(triage.labels.get("role").map(String::as_str), Some("triage"));
    assert_eq!(triage_status.state, IdentityLifecycleState::Active);
    assert_eq!(
        triage_status.addressability,
        AgentAddressability::Addressable
    );
    assert!(triage_status.lease.is_some());

    let (gate, gate_status) = roster.get(&make_identity("gate:main")).unwrap();
    assert_eq!(gate.labels.get("role").map(String::as_str), Some("gate"));
    assert_eq!(
        gate_status.addressability,
        AgentAddressability::InternalOnly
    );
}
/// A desired identity that is absent from the current set must produce a
/// single `Activate` action carrying that identity's spec.
#[tokio::test]
async fn identity_first_runtime_reconcile_new_identity_activates() {
    // Nothing is running yet.
    let current = BTreeMap::new();
    let desired = vec![make_spec("triage:main")];

    let actions = compute_reconcile_actions(&desired, &current);

    assert_eq!(actions.len(), 1);
    assert!(matches!(
        &actions[0],
        ReconcileAction::Activate(s) if s.identity == make_identity("triage:main")
    ));
}
#[tokio::test]
async fn identity_first_runtime_reconcile_removed_identity_retires() {
let mut current = BTreeMap::new();
current.insert(make_identity("old:main"), make_spec("old:main"));
let desired = vec![]; let actions = compute_reconcile_actions(&desired, ¤t);
assert_eq!(actions.len(), 1);
assert!(matches!(&actions[0], ReconcileAction::Retire(id) if id == &make_identity("old:main")));
}
/// Changing only a label value on an existing identity must produce a single
/// `HotReload` action for that identity.
#[tokio::test]
async fn identity_first_runtime_reconcile_labels_change_hot_reloads() {
    let mut old_spec = make_spec("triage:main");
    old_spec
        .labels
        .insert("env".to_string(), "staging".to_string());
    let mut current = BTreeMap::new();
    current.insert(make_identity("triage:main"), old_spec);

    // Same identity, same label key, different value.
    let mut new_spec = make_spec("triage:main");
    new_spec
        .labels
        .insert("env".to_string(), "production".to_string());

    let actions = compute_reconcile_actions(&[new_spec], &current);

    assert_eq!(actions.len(), 1);
    assert!(matches!(
        &actions[0],
        ReconcileAction::HotReload { identity, .. }
            if identity == &make_identity("triage:main")
    ));
}
/// Switching an existing identity from `Addressable` to `InternalOnly` must
/// produce a single `HotReload` action.
#[tokio::test]
async fn identity_first_runtime_reconcile_addressability_change_hot_reloads() {
    let old_spec = make_spec("triage:main");
    let mut current = BTreeMap::new();
    current.insert(make_identity("triage:main"), old_spec);

    // `make_internal_spec` differs from `make_spec` only in addressability.
    let new_spec = make_internal_spec("triage:main");

    let actions = compute_reconcile_actions(&[new_spec], &current);

    assert_eq!(actions.len(), 1);
    assert!(matches!(&actions[0], ReconcileAction::HotReload { .. }));
}
/// Changing the profile of an existing identity must produce a single
/// `Respawn` action for that identity.
#[tokio::test]
async fn identity_first_runtime_reconcile_profile_change_respawns() {
    let old_spec = make_spec("triage:main");
    let mut current = BTreeMap::new();
    current.insert(make_identity("triage:main"), old_spec);

    // Same identity, but on the "expert" profile instead of "default".
    let mut new_spec = make_spec("triage:main");
    new_spec.profile = meerkat_mob::ProfileName::from("expert");

    let actions = compute_reconcile_actions(&[new_spec], &current);

    assert_eq!(actions.len(), 1);
    assert!(matches!(
        &actions[0],
        ReconcileAction::Respawn { identity, .. }
            if identity == &make_identity("triage:main")
    ));
}
/// Setting a `context` value on an existing identity that previously had none
/// must produce a single `HotReload` action.
#[tokio::test]
async fn identity_first_runtime_reconcile_context_change_hot_reloads() {
    let old_spec = make_spec("triage:main");
    let mut current = BTreeMap::new();
    current.insert(make_identity("triage:main"), old_spec);

    // `make_spec` leaves `context` as `None`; give the new spec a JSON context.
    let mut new_spec = make_spec("triage:main");
    new_spec.context = Some(serde_json::json!({"key": "value"}));

    let actions = compute_reconcile_actions(&[new_spec], &current);

    assert_eq!(actions.len(), 1);
    assert!(matches!(&actions[0], ReconcileAction::HotReload { .. }));
}
/// When the desired spec equals the current spec for an identity, no
/// reconcile action is produced.
#[tokio::test]
async fn identity_first_runtime_reconcile_unchanged_no_action() {
    let spec = make_spec("triage:main");
    let mut current = BTreeMap::new();
    // The clone is required: the same spec is also passed as the desired roster.
    current.insert(make_identity("triage:main"), spec.clone());

    let actions = compute_reconcile_actions(&[spec], &current);

    assert!(actions.is_empty());
}
/// A subscriber must receive, in order, one event for each runtime operation
/// performed after subscribing: state change to `Retiring`, lease lost, lease
/// updated, state change to `Active`, and a completed checkpoint at version 1.
#[tokio::test]
async fn identity_first_runtime_subscribe_receives_identity_events() {
    let store = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let lease = Arc::new(LocalLeaseProvider::new());
    // The lease provider is used directly below, so the runtime gets a clone.
    let runtime = make_runtime(store, lease.clone());

    let spec = make_spec("triage:main");
    let identity = make_identity("triage:main");
    let record = make_record("triage:main", 0, 0);

    // Obtain a real grant from the provider to register with.
    let initial_grants = lease
        .acquire_leases(std::slice::from_ref(&identity), "test-runtime")
        .await
        .unwrap();
    let initial_grant = if let LeaseAcquireResult::Acquired(g) = &initial_grants[&identity] {
        g.clone()
    } else {
        panic!("expected acquired lease")
    };

    runtime
        .register(
            spec,
            IdentityLifecycleState::Active,
            Some(record),
            Some(initial_grant),
        )
        .await;

    let mut events = runtime.subscribe(&identity).await.unwrap();

    // 1. State change to Retiring.
    runtime
        .set_state(&identity, IdentityLifecycleState::Retiring)
        .await
        .unwrap();
    assert!(matches!(
        events.recv().await.unwrap(),
        IdentityEvent::StateChanged {
            new_state: IdentityLifecycleState::Retiring,
            ..
        }
    ));

    // 2. Lease loss.
    runtime.mark_lease_lost(&identity).await.unwrap();
    assert!(matches!(
        events.recv().await.unwrap(),
        IdentityEvent::LeaseLost { .. }
    ));

    // 3. Lease update with a freshly acquired grant.
    let renewed_grants = lease
        .acquire_leases(std::slice::from_ref(&identity), "test-runtime")
        .await
        .unwrap();
    let renewed_grant = if let LeaseAcquireResult::Acquired(g) = &renewed_grants[&identity] {
        g.clone()
    } else {
        panic!("expected acquired lease")
    };
    runtime.update_lease(&identity, renewed_grant).await.unwrap();
    assert!(matches!(
        events.recv().await.unwrap(),
        IdentityEvent::LeaseUpdated { .. }
    ));

    // 4. State change back to Active.
    runtime
        .set_state(&identity, IdentityLifecycleState::Active)
        .await
        .unwrap();
    assert!(matches!(
        events.recv().await.unwrap(),
        IdentityEvent::StateChanged {
            new_state: IdentityLifecycleState::Active,
            ..
        }
    ));

    // 5. Checkpoint; the reported version is expected to be 1.
    let snapshot = SessionSnapshot {
        data: b"test-data".to_vec(),
    };
    runtime.checkpoint(&identity, &snapshot).await.unwrap();
    assert!(matches!(
        events.recv().await.unwrap(),
        IdentityEvent::CheckpointCompleted { version, .. }
            if version == CheckpointVersion::new(1)
    ));
}
/// Subscribing to an identity that was never registered must fail with
/// `IdentityRuntimeError::UnknownIdentity`.
#[tokio::test]
async fn identity_first_runtime_subscribe_unknown_identity_fails() {
    let store = Arc::new(LocalContinuityStore::in_memory().unwrap());
    let lease = Arc::new(LocalLeaseProvider::new());
    let runtime = make_runtime(store, lease);

    let missing = make_identity("nonexistent");
    let outcome = runtime.subscribe(&missing).await;

    assert!(matches!(
        outcome,
        Err(IdentityRuntimeError::UnknownIdentity(_))
    ));
}