#![allow(clippy::expect_used, clippy::panic, clippy::unwrap_used)]
use std::sync::Arc;
use meerkat_core::completion_feed::CompletionFeed;
use meerkat_core::lifecycle::core_executor::{CoreApplyOutput, CoreExecutorError};
use meerkat_core::lifecycle::run_control::RunControlCommand;
use meerkat_core::lifecycle::run_primitive::{RunApplyBoundary, RunPrimitive};
use meerkat_core::lifecycle::{CoreExecutor, RunId};
use meerkat_core::ops_lifecycle::{
OperationKind, OperationResult, OperationSpec, OpsLifecycleRegistry,
};
use meerkat_core::runtime_epoch::{EpochCursorState, RuntimeEpochId};
use meerkat_core::types::SessionId;
use meerkat_runtime::RuntimeStore; use meerkat_runtime::{PersistedOpsSnapshot, RuntimeOpsLifecycleRegistry};
fn bg_spec(name: &str) -> OperationSpec {
OperationSpec {
id: meerkat_core::ops_lifecycle::OperationId::new(),
kind: OperationKind::BackgroundToolOp,
owner_session_id: SessionId::new(),
display_name: name.into(),
source_label: "test-persistence".into(),
child_session_id: None,
expect_peer_channel: false,
}
}
fn op_result(id: &meerkat_core::ops_lifecycle::OperationId) -> OperationResult {
OperationResult {
id: id.clone(),
content: "done".into(),
is_error: false,
duration_ms: 42,
tokens_used: 0,
}
}
#[test]
fn persisted_ops_snapshot_serde_round_trip() {
let registry = RuntimeOpsLifecycleRegistry::new();
let cursor_state = Arc::new(EpochCursorState::new());
let epoch_id = RuntimeEpochId::new();
let spec = bg_spec("serde-roundtrip");
let op_id = spec.id.clone();
registry.register_operation(spec).unwrap();
registry.provisioning_succeeded(&op_id).unwrap();
registry
.complete_operation(&op_id, op_result(&op_id))
.unwrap();
let snapshot = registry.capture_persistence_snapshot(epoch_id.clone(), &cursor_state);
let json = serde_json::to_string(&snapshot).expect("serialize");
let restored: PersistedOpsSnapshot = serde_json::from_str(&json).expect("deserialize");
assert_eq!(restored.epoch_id, epoch_id);
assert_eq!(restored.completion_entries.len(), 1);
assert_eq!(restored.completion_entries[0].operation_id, op_id);
assert_eq!(restored.operation_specs.len(), 1);
assert!(restored.operation_specs.contains_key(&op_id));
}
#[test]
fn persisted_ops_snapshot_preserves_cursor_values() {
let registry = RuntimeOpsLifecycleRegistry::new();
let cursor_state = Arc::new(EpochCursorState::from_recovered(5, 3, 2));
let snapshot = registry.capture_persistence_snapshot(RuntimeEpochId::new(), &cursor_state);
assert_eq!(snapshot.cursors.agent_applied_cursor, 5);
assert_eq!(snapshot.cursors.runtime_observed_seq, 3);
assert_eq!(snapshot.cursors.runtime_last_injected_seq, 2);
}
#[test]
fn recovered_registry_contains_terminal_completion_entries() {
let registry = RuntimeOpsLifecycleRegistry::new();
let cursor_state = Arc::new(EpochCursorState::new());
let specs: Vec<_> = (0..3).map(|i| bg_spec(&format!("op-{i}"))).collect();
for spec in &specs {
registry.register_operation(spec.clone()).unwrap();
registry.provisioning_succeeded(&spec.id).unwrap();
}
registry
.complete_operation(&specs[0].id, op_result(&specs[0].id))
.unwrap();
registry
.complete_operation(&specs[1].id, op_result(&specs[1].id))
.unwrap();
let snapshot = registry.capture_persistence_snapshot(RuntimeEpochId::new(), &cursor_state);
let recovered = RuntimeOpsLifecycleRegistry::from_recovered(snapshot);
let feed = recovered
.completion_feed()
.expect("recovered registry should have feed");
let batch = feed.list_since(0);
assert_eq!(
batch.entries.len(),
2,
"recovered feed should contain 2 terminal entries, got {}",
batch.entries.len()
);
}
#[test]
fn recovered_registry_strips_non_terminal_ops() {
let registry = RuntimeOpsLifecycleRegistry::new();
let cursor_state = Arc::new(EpochCursorState::new());
let terminal_spec = bg_spec("terminal");
let nonterminal_spec = bg_spec("in-progress");
registry.register_operation(terminal_spec.clone()).unwrap();
registry.provisioning_succeeded(&terminal_spec.id).unwrap();
registry
.complete_operation(&terminal_spec.id, op_result(&terminal_spec.id))
.unwrap();
registry
.register_operation(nonterminal_spec.clone())
.unwrap();
registry
.provisioning_succeeded(&nonterminal_spec.id)
.unwrap();
let snapshot = registry.capture_persistence_snapshot(RuntimeEpochId::new(), &cursor_state);
let recovered = RuntimeOpsLifecycleRegistry::from_recovered(snapshot);
assert!(
recovered.snapshot(&terminal_spec.id).is_some(),
"terminal op should survive recovery"
);
assert!(
recovered.snapshot(&nonterminal_spec.id).is_none(),
"non-terminal op should NOT survive recovery"
);
}
#[test]
fn recovered_feed_list_since_persisted_cursor_returns_unsurfaced() {
let registry = RuntimeOpsLifecycleRegistry::new();
let cursor_state = Arc::new(EpochCursorState::new());
let spec_a = bg_spec("unsurfaced-a");
let spec_b = bg_spec("unsurfaced-b");
registry.register_operation(spec_a.clone()).unwrap();
registry.provisioning_succeeded(&spec_a.id).unwrap();
registry
.complete_operation(&spec_a.id, op_result(&spec_a.id))
.unwrap();
registry.register_operation(spec_b.clone()).unwrap();
registry.provisioning_succeeded(&spec_b.id).unwrap();
registry
.complete_operation(&spec_b.id, op_result(&spec_b.id))
.unwrap();
let snapshot = registry.capture_persistence_snapshot(RuntimeEpochId::new(), &cursor_state);
let recovered = RuntimeOpsLifecycleRegistry::from_recovered(snapshot);
let feed = recovered
.completion_feed()
.expect("recovered registry should have feed");
let batch = feed.list_since(0);
assert_eq!(
batch.entries.len(),
2,
"list_since(0) should return 2 unsurfaced entries"
);
let batch_at_watermark = feed.list_since(batch.watermark);
assert!(
batch_at_watermark.entries.is_empty(),
"list_since(watermark) should return nothing"
);
}
#[test]
fn recovered_registry_clears_wait_state() {
let registry = RuntimeOpsLifecycleRegistry::new();
let cursor_state = Arc::new(EpochCursorState::new());
let spec = bg_spec("wait-clear");
registry.register_operation(spec.clone()).unwrap();
registry.provisioning_succeeded(&spec.id).unwrap();
registry
.complete_operation(&spec.id, op_result(&spec.id))
.unwrap();
let snapshot = registry.capture_persistence_snapshot(RuntimeEpochId::new(), &cursor_state);
let recovered = RuntimeOpsLifecycleRegistry::from_recovered(snapshot);
let new_spec = bg_spec("post-recovery-op");
let result = recovered.register_operation(new_spec);
assert!(
result.is_ok(),
"recovered registry should accept new operations"
);
}
#[test]
fn recovered_epoch_id_matches_persisted() {
let registry = RuntimeOpsLifecycleRegistry::new();
let cursor_state = Arc::new(EpochCursorState::new());
let epoch_id = RuntimeEpochId::new();
let snapshot = registry.capture_persistence_snapshot(epoch_id.clone(), &cursor_state);
assert_eq!(snapshot.epoch_id, epoch_id, "persisted epoch_id must match");
}
#[test]
fn recovered_feed_watermark_matches_persisted_sequence() {
let registry = RuntimeOpsLifecycleRegistry::new();
let cursor_state = Arc::new(EpochCursorState::new());
for i in 0..3 {
let spec = bg_spec(&format!("watermark-{i}"));
let id = spec.id.clone();
registry.register_operation(spec).unwrap();
registry.provisioning_succeeded(&id).unwrap();
registry.complete_operation(&id, op_result(&id)).unwrap();
}
let snapshot = registry.capture_persistence_snapshot(RuntimeEpochId::new(), &cursor_state);
let recovered = RuntimeOpsLifecycleRegistry::from_recovered(snapshot);
let feed = recovered.completion_feed().unwrap();
assert!(
feed.watermark() >= 3,
"recovered feed watermark ({}) should be >= 3",
feed.watermark()
);
}
#[test]
fn snapshot_captures_entries_beyond_authority_retention() {
let registry = meerkat_runtime::RuntimeOpsLifecycleRegistry::with_config(
meerkat_runtime::OpsLifecycleConfig {
max_completed: 2,
max_concurrent: None,
},
);
let cursor_state = Arc::new(EpochCursorState::new());
for i in 0..4 {
let spec = bg_spec(&format!("evict-{i}"));
let id = spec.id.clone();
registry.register_operation(spec).unwrap();
registry.provisioning_succeeded(&id).unwrap();
registry.complete_operation(&id, op_result(&id)).unwrap();
}
let snapshot = registry.capture_persistence_snapshot(RuntimeEpochId::new(), &cursor_state);
assert_eq!(
snapshot.completion_entries.len(),
4,
"snapshot should capture all feed entries ({} found), not just authority-retained",
snapshot.completion_entries.len()
);
assert_eq!(
snapshot.authority_state.operation_count(),
2,
"authority should retain max_completed=2 ops"
);
}
struct NoopExecutor;
#[async_trait::async_trait]
impl CoreExecutor for NoopExecutor {
async fn apply(
&mut self,
run_id: RunId,
primitive: RunPrimitive,
) -> Result<CoreApplyOutput, CoreExecutorError> {
Ok(CoreApplyOutput {
receipt: meerkat_core::RunBoundaryReceipt {
run_id,
boundary: RunApplyBoundary::RunStart,
contributing_input_ids: primitive.contributing_input_ids().to_vec(),
conversation_digest: None,
message_count: 0,
sequence: 0,
},
session_snapshot: None,
terminal: None,
run_result: None,
})
}
async fn control(&mut self, _cmd: RunControlCommand) -> Result<(), CoreExecutorError> {
Ok(())
}
}
#[tokio::test]
async fn cold_ensure_session_with_executor_uses_shared_recovery_path() {
let adapter = Arc::new(meerkat_runtime::RuntimeSessionAdapter::ephemeral());
let session_id = SessionId::new();
adapter
.register_session_with_executor(session_id.clone(), Box::new(NoopExecutor))
.await;
let bindings = adapter
.prepare_bindings(session_id.clone())
.await
.expect("session should be registered after cold attach");
assert_eq!(bindings.session_id, session_id);
let spec = bg_spec("cold-attach-verify");
let op_id = spec.id.clone();
bindings.ops_lifecycle.register_operation(spec).unwrap();
let direct = adapter
.ops_lifecycle_registry(&session_id)
.await
.expect("registry should exist");
assert!(
direct.snapshot(&op_id).is_some(),
"cold attach-first registry must be the same instance as prepare_bindings"
);
}
#[tokio::test]
async fn cold_persistent_adapter_recovers_persisted_epoch() {
let store = Arc::new(meerkat_runtime::InMemoryRuntimeStore::new());
let session_id = SessionId::new();
let registry = meerkat_runtime::RuntimeOpsLifecycleRegistry::new();
let cursor_state = Arc::new(EpochCursorState::new());
let epoch_id = RuntimeEpochId::new();
let spec = bg_spec("persistent-recovery");
let op_id = spec.id.clone();
registry.register_operation(spec).unwrap();
registry.provisioning_succeeded(&op_id).unwrap();
registry
.complete_operation(&op_id, op_result(&op_id))
.unwrap();
let snapshot = registry.capture_persistence_snapshot(epoch_id.clone(), &cursor_state);
let runtime_id = meerkat_runtime::identifiers::LogicalRuntimeId::new(session_id.to_string());
store
.persist_ops_lifecycle(&runtime_id, &snapshot)
.await
.unwrap();
let adapter = meerkat_runtime::RuntimeSessionAdapter::persistent_without_blobs(Arc::clone(
&store,
)
as Arc<dyn RuntimeStore>);
adapter.register_session(session_id.clone()).await;
let bindings = adapter.prepare_bindings(session_id.clone()).await.unwrap();
assert_eq!(
bindings.epoch_id, epoch_id,
"register_session must recover the persisted epoch_id"
);
let feed = bindings.ops_lifecycle.completion_feed().unwrap();
let batch = feed.list_since(0);
assert_eq!(
batch.entries.len(),
1,
"register_session must recover persisted completion entries"
);
assert_eq!(batch.entries[0].operation_id, op_id);
}
#[tokio::test]
async fn cold_ensure_session_with_executor_recovers_persisted_epoch() {
let store = Arc::new(meerkat_runtime::InMemoryRuntimeStore::new());
let session_id = SessionId::new();
let registry = meerkat_runtime::RuntimeOpsLifecycleRegistry::new();
let cursor_state = Arc::new(EpochCursorState::new());
let epoch_id = RuntimeEpochId::new();
let spec = bg_spec("executor-attach-recovery");
let op_id = spec.id.clone();
registry.register_operation(spec).unwrap();
registry.provisioning_succeeded(&op_id).unwrap();
registry
.complete_operation(&op_id, op_result(&op_id))
.unwrap();
let snapshot = registry.capture_persistence_snapshot(epoch_id.clone(), &cursor_state);
let runtime_id = meerkat_runtime::identifiers::LogicalRuntimeId::new(session_id.to_string());
store
.persist_ops_lifecycle(&runtime_id, &snapshot)
.await
.unwrap();
let adapter = Arc::new(
meerkat_runtime::RuntimeSessionAdapter::persistent_without_blobs(
Arc::clone(&store) as Arc<dyn RuntimeStore>
),
);
adapter
.register_session_with_executor(session_id.clone(), Box::new(NoopExecutor))
.await;
let bindings = adapter.prepare_bindings(session_id.clone()).await.unwrap();
assert_eq!(
bindings.epoch_id, epoch_id,
"cold ensure_session_with_executor must recover the persisted epoch_id"
);
let feed = bindings.ops_lifecycle.completion_feed().unwrap();
let batch = feed.list_since(0);
assert!(
!batch.entries.is_empty(),
"cold ensure_session_with_executor must recover persisted completion entries"
);
assert_eq!(batch.entries[0].operation_id, op_id);
}