#![allow(
clippy::unwrap_used,
clippy::expect_used,
clippy::panic,
clippy::redundant_clone
)]
use async_trait::async_trait;
use meerkat_comms::{CommsRuntime, PeerRequestResponseAuthority};
use meerkat_core::PeerCorrelationId;
use meerkat_core::Provider;
use meerkat_core::agent::CommsRuntime as CoreCommsRuntime;
use meerkat_core::comms::{
CommsCommand, PeerDirectorySource, PeerName, PeerRoute, SendReceipt, TrustedPeerDescriptor,
};
use meerkat_core::service::{
CreateSessionRequest, SessionBuildOptions, SessionError, SessionInfo, SessionQuery,
SessionService, SessionSummary, SessionUsage, SessionView, StartTurnRequest,
};
use meerkat_core::types::{RunResult, SessionId, Usage};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::SystemTime;
use tokio::sync::RwLock;
use uuid::Uuid;
fn inproc_peer_descriptor(
name: &str,
runtime: &CommsRuntime,
) -> Result<TrustedPeerDescriptor, String> {
TrustedPeerDescriptor::unsigned_with_pubkey(
name,
runtime.public_key().to_peer_id().to_string(),
*runtime.public_key().as_bytes(),
format!("inproc://{name}"),
)
}
fn inproc_peer_route(name: &str, runtime: &CommsRuntime) -> Result<PeerRoute, String> {
Ok(PeerRoute::with_display_name(
runtime.public_key().to_peer_id(),
PeerName::new(name.to_string())?,
))
}
fn install_ephemeral_peer_request_response_authority(runtime: &Arc<CommsRuntime>, session: &str) {
let dsl = Arc::new(meerkat_runtime::HandleDslAuthority::ephemeral());
dsl.apply_signal(
meerkat_runtime::meerkat_machine::dsl::MeerkatMachineSignal::Initialize,
"test::initialize",
)
.expect("Initialize");
dsl.apply_input(
meerkat_runtime::meerkat_machine::dsl::MeerkatMachineInput::RegisterSession {
session_id: meerkat_runtime::meerkat_machine::dsl::SessionId::from(session.to_string()),
},
"test::register_session",
)
.expect("RegisterSession");
runtime.install_peer_request_response_authority(PeerRequestResponseAuthority::new(
Arc::new(meerkat_runtime::RuntimePeerInteractionHandle::new(
Arc::clone(&dsl),
)),
Arc::new(meerkat_runtime::RuntimeInteractionStreamHandle::new(dsl)),
));
}
#[tokio::test]
async fn contract_mob_002_peer_request_response_round_trip() {
let suffix = Uuid::new_v4().simple().to_string();
let sender_name = format!("c002-sender-{suffix}");
let receiver_name = format!("c002-receiver-{suffix}");
let sender = Arc::new(CommsRuntime::inproc_only(&sender_name).unwrap());
let receiver = Arc::new(CommsRuntime::inproc_only(&receiver_name).unwrap());
install_ephemeral_peer_request_response_authority(&sender, &format!("c002-sender-{suffix}"));
install_ephemeral_peer_request_response_authority(
&receiver,
&format!("c002-receiver-{suffix}"),
);
let peer_spec =
inproc_peer_descriptor(&receiver_name, receiver.as_ref()).expect("valid peer spec");
CoreCommsRuntime::add_trusted_peer(sender.as_ref(), peer_spec)
.await
.expect("add sender->receiver trust");
let reverse_spec =
inproc_peer_descriptor(&sender_name, sender.as_ref()).expect("valid reverse spec");
CoreCommsRuntime::add_trusted_peer(receiver.as_ref(), reverse_spec)
.await
.expect("add receiver->sender trust");
let request_cmd = CommsCommand::PeerRequest {
to: inproc_peer_route(&receiver_name, receiver.as_ref()).expect("valid peer route"),
intent: "mob.ping".to_string(),
params: serde_json::json!({"seq": 1}),
blocks: None,
handling_mode: meerkat_core::types::HandlingMode::Queue,
stream: meerkat_core::comms::InputStreamMode::None,
};
let receipt = CoreCommsRuntime::send(sender.as_ref(), request_cmd)
.await
.expect("PeerRequest send should succeed");
let (request_envelope_id, request_interaction_id) = match receipt {
SendReceipt::PeerRequestSent {
envelope_id,
interaction_id,
..
} => (envelope_id, interaction_id),
other => panic!("expected PeerRequestSent, got: {other:?}"),
};
assert_eq!(
request_envelope_id, request_interaction_id.0,
"peer request receipt should expose the canonical request envelope id as its raw interaction id"
);
assert_ne!(
request_envelope_id,
Uuid::nil(),
"peer request envelope id should be populated"
);
let interactions = CoreCommsRuntime::drain_inbox_interactions(receiver.as_ref()).await;
assert_eq!(
interactions.len(),
1,
"receiver should see exactly one interaction"
);
let request_interaction = &interactions[0];
assert_eq!(request_interaction.from, sender_name);
assert!(request_interaction.rendered_text.contains(&format!(
"\"peer_id\":\"{}\"",
sender.peer_id().expect("sender peer id")
)));
assert!(
request_interaction
.rendered_text
.contains(&format!("display_name: {sender_name}"))
);
let request_id = match &request_interaction.content {
meerkat_core::InteractionContent::Request { intent, params, .. } => {
assert_eq!(intent, "mob.ping");
assert_eq!(params["seq"], 1);
assert_eq!(
request_interaction.id.0, request_envelope_id,
"receiver-visible request interaction id should equal the sender envelope id for response correlation"
);
request_interaction.id
}
other => panic!("expected Request interaction, got: {other:?}"),
};
receiver
.peer_interaction_handle()
.expect("receiver should have peer interaction authority")
.request_received(PeerCorrelationId::from_uuid(request_id.0))
.expect("direct comms-drain bypass must seed inbound request state");
let response_cmd = CommsCommand::PeerResponse {
to: inproc_peer_route(&sender_name, sender.as_ref()).expect("valid peer route"),
in_reply_to: request_id,
status: meerkat_core::ResponseStatus::Completed,
result: serde_json::json!({"pong": true}),
blocks: None,
handling_mode: None,
};
let resp_receipt = CoreCommsRuntime::send(receiver.as_ref(), response_cmd)
.await
.expect("PeerResponse send should succeed");
assert!(
matches!(resp_receipt, SendReceipt::PeerResponseSent { .. }),
"expected PeerResponseSent, got: {resp_receipt:?}"
);
let sender_interactions = CoreCommsRuntime::drain_inbox_interactions(sender.as_ref()).await;
assert_eq!(
sender_interactions.len(),
1,
"sender should see exactly one response interaction"
);
match &sender_interactions[0].content {
meerkat_core::InteractionContent::Response {
in_reply_to,
status,
result,
blocks: _,
} => {
assert_eq!(*in_reply_to, request_id);
assert_eq!(*status, meerkat_core::ResponseStatus::Completed);
assert_eq!(result["pong"], true);
}
other => panic!("expected Response interaction, got: {other:?}"),
}
}
#[tokio::test]
async fn contract_mob_002b_terminal_transition_drives_registry_cleanup_via_effect() {
use meerkat_core::comms::InputStreamMode;
use meerkat_core::handles::{PeerInteractionHandle, PeerTerminalDisposition};
use meerkat_runtime::{RuntimeInteractionStreamHandle, RuntimePeerInteractionHandle};
let suffix = Uuid::new_v4().simple().to_string();
let sender_name = format!("c002b-sender-{suffix}");
let receiver_name = format!("c002b-receiver-{suffix}");
let sender = Arc::new(CommsRuntime::inproc_only(&sender_name).unwrap());
let receiver = CommsRuntime::inproc_only(&receiver_name).unwrap();
let dsl = Arc::new(meerkat_runtime::HandleDslAuthority::ephemeral());
dsl.apply_signal(
meerkat_runtime::meerkat_machine::dsl::MeerkatMachineSignal::Initialize,
"test::initialize",
)
.expect("Initialize");
dsl.apply_input(
meerkat_runtime::meerkat_machine::dsl::MeerkatMachineInput::RegisterSession {
session_id: meerkat_runtime::meerkat_machine::dsl::SessionId::from(format!(
"c002b-{suffix}"
)),
},
"test::register_session",
)
.expect("RegisterSession");
let handle: Arc<dyn PeerInteractionHandle> =
Arc::new(RuntimePeerInteractionHandle::new(Arc::clone(&dsl)));
sender.install_peer_request_response_authority(
meerkat_comms::PeerRequestResponseAuthority::new(
Arc::clone(&handle),
Arc::new(RuntimeInteractionStreamHandle::new(Arc::clone(&dsl))),
),
);
CoreCommsRuntime::add_trusted_peer(
sender.as_ref(),
inproc_peer_descriptor(&receiver_name, &receiver).unwrap(),
)
.await
.unwrap();
CoreCommsRuntime::add_trusted_peer(
&receiver,
inproc_peer_descriptor(&sender_name, sender.as_ref()).unwrap(),
)
.await
.unwrap();
let request_cmd = CommsCommand::PeerRequest {
to: inproc_peer_route(&receiver_name, &receiver).unwrap(),
intent: "mob.ping".into(),
params: serde_json::json!({"seq": 1}),
blocks: None,
handling_mode: meerkat_core::types::HandlingMode::Queue,
stream: InputStreamMode::ReserveInteraction,
};
let receipt = CoreCommsRuntime::send(sender.as_ref(), request_cmd)
.await
.unwrap();
let request_interaction_id = match receipt {
SendReceipt::PeerRequestSent {
interaction_id,
stream_reserved,
..
} => {
assert!(stream_reserved, "stream should be reserved");
interaction_id
}
other => panic!("expected PeerRequestSent, got {other:?}"),
};
let corr_id = meerkat_core::PeerCorrelationId::from_uuid(request_interaction_id.0);
assert_eq!(
handle.outbound_state(corr_id),
Some(meerkat_core::OutboundPeerRequestState::Sent),
"outbound state must be Sent after request_sent transition"
);
assert!(
CoreCommsRuntime::interaction_subscriber(sender.as_ref(), &request_interaction_id)
.is_some(),
"subscriber should be live after reserve"
);
handle
.response_terminal(corr_id, PeerTerminalDisposition::Completed)
.expect("terminal transition must succeed");
assert!(
handle.outbound_state(corr_id).is_none(),
"terminal transition removes the outbound entry"
);
assert!(
CoreCommsRuntime::interaction_subscriber(sender.as_ref(), &request_interaction_id)
.is_none(),
"subscriber registry entry must be dropped by the DSL cleanup effect"
);
let _ = CoreCommsRuntime::drain_inbox_interactions(&receiver).await;
}
#[tokio::test]
async fn contract_mob_002c_dsl_reject_refuses_shell_commit() {
use meerkat_core::comms::InputStreamMode;
use meerkat_core::handles::PeerInteractionHandle;
let suffix = Uuid::new_v4().simple().to_string();
let sender_name = format!("c002c-sender-{suffix}");
let receiver_name = format!("c002c-receiver-{suffix}");
let sender = Arc::new(CommsRuntime::inproc_only(&sender_name).unwrap());
let receiver = CommsRuntime::inproc_only(&receiver_name).unwrap();
let dsl = Arc::new(meerkat_runtime::HandleDslAuthority::ephemeral());
dsl.apply_signal(
meerkat_runtime::meerkat_machine::dsl::MeerkatMachineSignal::Initialize,
"test::initialize",
)
.expect("Initialize");
dsl.apply_input(
meerkat_runtime::meerkat_machine::dsl::MeerkatMachineInput::RegisterSession {
session_id: meerkat_runtime::meerkat_machine::dsl::SessionId::from(format!(
"c002c-{suffix}"
)),
},
"test::register_session",
)
.expect("RegisterSession");
let handle: Arc<dyn PeerInteractionHandle> = Arc::new(
meerkat_runtime::RuntimePeerInteractionHandle::new(Arc::clone(&dsl)),
);
sender.install_peer_request_response_authority(PeerRequestResponseAuthority::new(
Arc::clone(&handle),
Arc::new(meerkat_runtime::RuntimeInteractionStreamHandle::new(
Arc::clone(&dsl),
)),
));
CoreCommsRuntime::add_trusted_peer(
sender.as_ref(),
inproc_peer_descriptor(&receiver_name, &receiver).unwrap(),
)
.await
.unwrap();
CoreCommsRuntime::add_trusted_peer(
&receiver,
inproc_peer_descriptor(&sender_name, sender.as_ref()).unwrap(),
)
.await
.unwrap();
let corr_id = meerkat_core::PeerCorrelationId::new();
handle
.request_sent(corr_id, "peer-a".into())
.expect("initial request_sent");
let err = handle
.request_sent(corr_id, "peer-a".into())
.expect_err("duplicate request_sent must reject");
assert_eq!(err.context, "PeerInteractionHandle::request_sent");
assert_eq!(
handle.outbound_state(corr_id),
Some(meerkat_core::OutboundPeerRequestState::Sent),
"DSL reject must leave state unchanged"
);
let ok_receipt = CoreCommsRuntime::send(
sender.as_ref(),
CommsCommand::PeerRequest {
to: inproc_peer_route(&receiver_name, &receiver).unwrap(),
intent: "mob.ping".into(),
params: serde_json::json!({"seq": 2}),
blocks: None,
handling_mode: meerkat_core::types::HandlingMode::Queue,
stream: InputStreamMode::None,
},
)
.await
.expect("fresh send with unique corr_id should succeed");
assert!(matches!(ok_receipt, SendReceipt::PeerRequestSent { .. }));
let _ = CoreCommsRuntime::drain_inbox_interactions(&receiver).await;
}
#[tokio::test]
async fn contract_mob_002d_inbound_terminal_reply_closes_lifecycle_via_send() {
use meerkat_core::handles::PeerInteractionHandle;
let suffix = Uuid::new_v4().simple().to_string();
let responder_name = format!("c002d-responder-{suffix}");
let originator_name = format!("c002d-originator-{suffix}");
let responder = Arc::new(CommsRuntime::inproc_only(&responder_name).unwrap());
let originator = CommsRuntime::inproc_only(&originator_name).unwrap();
let dsl = Arc::new(meerkat_runtime::HandleDslAuthority::ephemeral());
dsl.apply_signal(
meerkat_runtime::meerkat_machine::dsl::MeerkatMachineSignal::Initialize,
"test::initialize",
)
.expect("Initialize");
dsl.apply_input(
meerkat_runtime::meerkat_machine::dsl::MeerkatMachineInput::RegisterSession {
session_id: meerkat_runtime::meerkat_machine::dsl::SessionId::from(format!(
"c002d-{suffix}"
)),
},
"test::register_session",
)
.expect("RegisterSession");
let handle: Arc<dyn PeerInteractionHandle> = Arc::new(
meerkat_runtime::RuntimePeerInteractionHandle::new(Arc::clone(&dsl)),
);
responder.install_peer_request_response_authority(PeerRequestResponseAuthority::new(
Arc::clone(&handle),
Arc::new(meerkat_runtime::RuntimeInteractionStreamHandle::new(
Arc::clone(&dsl),
)),
));
CoreCommsRuntime::add_trusted_peer(
responder.as_ref(),
inproc_peer_descriptor(&originator_name, &originator).unwrap(),
)
.await
.unwrap();
CoreCommsRuntime::add_trusted_peer(
&originator,
inproc_peer_descriptor(&responder_name, responder.as_ref()).unwrap(),
)
.await
.unwrap();
let request_corr_id = meerkat_core::PeerCorrelationId::new();
handle
.request_received(request_corr_id)
.expect("inbound request_received must advance DSL");
assert_eq!(
handle.inbound_state(request_corr_id),
Some(meerkat_core::InboundPeerRequestState::Received)
);
let in_reply_to = meerkat_core::InteractionId(request_corr_id.as_uuid());
CoreCommsRuntime::send(
responder.as_ref(),
CommsCommand::PeerResponse {
to: inproc_peer_route(&originator_name, &originator).unwrap(),
in_reply_to,
status: meerkat_core::ResponseStatus::Accepted,
result: serde_json::json!({"progress": true}),
blocks: None,
handling_mode: None,
},
)
.await
.expect("Accepted response must send");
assert_eq!(
handle.inbound_state(request_corr_id),
Some(meerkat_core::InboundPeerRequestState::Received),
"Accepted (progress) reply must not close the inbound entry"
);
CoreCommsRuntime::send(
responder.as_ref(),
CommsCommand::PeerResponse {
to: inproc_peer_route(&originator_name, &originator).unwrap(),
in_reply_to,
status: meerkat_core::ResponseStatus::Completed,
result: serde_json::json!({"done": true}),
blocks: None,
handling_mode: None,
},
)
.await
.expect("Completed response must send");
assert!(
handle.inbound_state(request_corr_id).is_none(),
"terminal reply must close the inbound entry via response_replied"
);
}
#[tokio::test]
async fn contract_mob_003_inproc_namespace_isolation() {
let suffix = Uuid::new_v4().simple().to_string();
let alpha_a_name = format!("c003-alpha-a-{suffix}");
let alpha_b_name = format!("c003-alpha-b-{suffix}");
let alpha_a =
CommsRuntime::inproc_only_scoped(&alpha_a_name, Some("mob.alpha".to_string())).unwrap();
let alpha_b =
CommsRuntime::inproc_only_scoped(&alpha_b_name, Some("mob.alpha".to_string())).unwrap();
let beta_name = format!("c003-beta-{suffix}");
let _beta = CommsRuntime::inproc_only_scoped(&beta_name, Some("mob.beta".to_string())).unwrap();
let alpha_a_peers = CoreCommsRuntime::peers(&alpha_a).await;
let alpha_a_peer_names: Vec<String> = alpha_a_peers
.iter()
.map(|e| e.name.as_string().clone())
.collect();
assert!(
!alpha_a_peer_names.contains(&beta_name),
"beta namespace agent must not appear in alpha namespace peers"
);
let spec = inproc_peer_descriptor(&alpha_b_name, &alpha_b).expect("valid spec");
CoreCommsRuntime::add_trusted_peer(&alpha_a, spec)
.await
.expect("add trusted peer within namespace");
let alpha_a_peers_after = CoreCommsRuntime::peers(&alpha_a).await;
let peer_names_after: Vec<String> = alpha_a_peers_after
.iter()
.map(|e| e.name.as_string().clone())
.collect();
assert!(
peer_names_after.contains(&alpha_b_name),
"alpha_b should be visible after trusting within same namespace"
);
assert!(
!peer_names_after.contains(&beta_name),
"beta should still not be visible after trusting alpha_b"
);
}
#[tokio::test]
async fn contract_mob_004_add_trusted_peer_is_idempotent() {
let suffix = Uuid::new_v4().simple().to_string();
let runtime_name = format!("c004-runtime-{suffix}");
let peer_name = format!("c004-peer-{suffix}");
let runtime = CommsRuntime::inproc_only(&runtime_name).unwrap();
let peer = CommsRuntime::inproc_only(&peer_name).unwrap();
let make_spec = || inproc_peer_descriptor(&peer_name, &peer).expect("valid spec");
CoreCommsRuntime::add_trusted_peer(&runtime, make_spec())
.await
.expect("first add should succeed");
CoreCommsRuntime::add_trusted_peer(&runtime, make_spec())
.await
.expect("second (idempotent) add should succeed");
let peers = CoreCommsRuntime::peers(&runtime).await;
let matching: Vec<_> = peers
.iter()
.filter(|e| e.name.as_str() == peer_name)
.collect();
assert_eq!(
matching.len(),
1,
"idempotent add should not create duplicates; found {} entries",
matching.len()
);
}
#[tokio::test]
async fn contract_mob_005_remove_trusted_peer_revokes_send() {
let suffix = Uuid::new_v4().simple().to_string();
let sender_name = format!("c005-sender-{suffix}");
let receiver_name = format!("c005-receiver-{suffix}");
let sender = CommsRuntime::inproc_only(&sender_name).unwrap();
let receiver = CommsRuntime::inproc_only(&receiver_name).unwrap();
let spec = inproc_peer_descriptor(&receiver_name, &receiver).expect("valid spec");
CoreCommsRuntime::add_trusted_peer(&sender, spec)
.await
.expect("add trusted peer");
let reverse_spec = inproc_peer_descriptor(&sender_name, &sender).expect("valid spec");
CoreCommsRuntime::add_trusted_peer(&receiver, reverse_spec)
.await
.expect("add reverse trusted peer");
let cmd = CommsCommand::PeerMessage {
to: inproc_peer_route(&receiver_name, &receiver).expect("valid peer route"),
body: "before removal".to_string(),
blocks: None,
handling_mode: meerkat_core::types::HandlingMode::Queue,
};
let receipt = CoreCommsRuntime::send(&sender, cmd).await;
assert!(
matches!(receipt, Ok(SendReceipt::PeerMessageSent { .. })),
"send should succeed before removal"
);
let _ = CoreCommsRuntime::drain_inbox_interactions(&receiver).await;
let peer_id = receiver.public_key().to_peer_id().to_string();
let removed = CoreCommsRuntime::remove_trusted_peer(&sender, &peer_id)
.await
.expect("remove should succeed");
assert!(removed, "should return true for existing peer");
let peers_after = CoreCommsRuntime::peers(&sender).await;
assert!(
!peers_after.iter().any(|e| e.name.as_str() == receiver_name),
"removed peer should not appear in peers()"
);
let cmd_after = CommsCommand::PeerMessage {
to: inproc_peer_route(&receiver_name, &receiver).expect("valid peer route"),
body: "after removal".to_string(),
blocks: None,
handling_mode: meerkat_core::types::HandlingMode::Queue,
};
let result = CoreCommsRuntime::send(&sender, cmd_after).await;
assert!(
matches!(result, Err(meerkat_core::SendError::PeerNotFound(_))),
"send should fail with PeerNotFound after removal, got: {result:?}"
);
}
#[tokio::test]
async fn contract_mobx_001_trust_accepts_non_inproc_addresses_and_preserves_peer_id() {
let suffix = Uuid::new_v4().simple().to_string();
let runtime_name = format!("c0x1-runtime-{suffix}");
let peer_name = format!("c0x1-peer-{suffix}");
let runtime = CommsRuntime::inproc_only(&runtime_name).unwrap();
let peer = CommsRuntime::inproc_only(&peer_name).unwrap();
let peer_id = peer.public_key().to_peer_id().to_string();
let backend_address = format!("tcp://backend.example.invalid:{}", 10_000 + suffix.len());
let spec = TrustedPeerDescriptor::unsigned_with_pubkey(
&peer_name,
peer_id.clone(),
*peer.public_key().as_bytes(),
backend_address.clone(),
)
.expect("valid trusted peer spec");
CoreCommsRuntime::add_trusted_peer(&runtime, spec)
.await
.expect("add trusted peer should accept backend-provided address");
let peers_after_add = CoreCommsRuntime::peers(&runtime).await;
let entry = peers_after_add
.iter()
.find(|entry| entry.name.as_str() == peer_name)
.expect("trusted peer should be listed");
assert_eq!(
entry.peer_id.to_string(),
peer_id,
"peer_id semantics must remain stable for remove operations"
);
assert_eq!(
entry.address.to_string(),
backend_address,
"runtime should preserve backend-provided address string"
);
let removed = CoreCommsRuntime::remove_trusted_peer(&runtime, &peer_id)
.await
.expect("remove_trusted_peer should succeed by peer_id");
assert!(
removed,
"remove_trusted_peer should return true for existing peer"
);
let peers_after_remove = CoreCommsRuntime::peers(&runtime).await;
assert!(
peers_after_remove
.iter()
.all(|entry| entry.name.as_str() != peer_name),
"peer should no longer appear after remove_trusted_peer"
);
}
fn test_config(
name: &str,
tmp: &tempfile::TempDir,
namespace: Option<String>,
) -> meerkat_comms::ResolvedCommsConfig {
meerkat_comms::ResolvedCommsConfig {
enabled: true,
name: name.to_string(),
inproc_namespace: namespace,
listen_uds: None,
listen_tcp: None,
event_listen_tcp: None,
#[cfg(unix)]
event_listen_uds: None,
identity_dir: tmp.path().join("identity"),
trusted_peers_path: tmp.path().join("trusted_peers.json"),
comms_config: meerkat_comms::CommsConfig::default(),
auth: meerkat_core::CommsAuthMode::Open,
require_peer_auth: false,
allow_external_unauthenticated: false,
}
}
#[tokio::test]
async fn contract_mob_006_peer_meta_labels_discoverable_via_peers() {
let suffix = Uuid::new_v4().simple().to_string();
let ns = format!("c006-{suffix}");
let runtime_name = format!("c006-runtime-{suffix}");
let peer_name = format!("c006-peer-{suffix}");
let peer_tmp = tempfile::tempdir().unwrap();
let peer_config = test_config(&peer_name, &peer_tmp, Some(ns.clone()));
let _peer = CommsRuntime::new(peer_config).await.unwrap();
let meta = meerkat_core::PeerMeta::default()
.with_description("test worker")
.with_label("mob_id", "test-mob")
.with_label("role", "coder");
let peer_pubkey = _peer.public_key();
meerkat_comms::InprocRegistry::global().unregister(&peer_pubkey);
let (_, inbox_sender) = meerkat_comms::Inbox::new();
meerkat_comms::InprocRegistry::global().register_with_meta_in_namespace(
&ns,
&peer_name,
peer_pubkey,
inbox_sender,
meta.clone(),
);
let runtime_tmp = tempfile::tempdir().unwrap();
let runtime_config = test_config(&runtime_name, &runtime_tmp, Some(ns));
let runtime = CommsRuntime::new(runtime_config).await.unwrap();
let peers = CoreCommsRuntime::peers(&runtime).await;
let matching: Vec<_> = peers
.iter()
.filter(|e| e.name.as_str() == peer_name)
.collect();
assert_eq!(matching.len(), 1, "peer should appear exactly once");
let entry = matching[0];
assert_eq!(entry.meta, meta);
assert_eq!(
entry.meta.description.as_deref(),
Some("test worker"),
"description should be preserved"
);
assert_eq!(
entry.meta.labels.get("mob_id").map(String::as_str),
Some("test-mob"),
"mob_id label should be preserved"
);
assert_eq!(
entry.meta.labels.get("role").map(String::as_str),
Some("coder"),
"role label should be preserved"
);
assert_eq!(
entry.source,
PeerDirectorySource::Inproc,
"source should be Inproc since peer is only visible via registry"
);
meerkat_comms::InprocRegistry::global().unregister(&peer_pubkey);
meerkat_comms::InprocRegistry::global().unregister(&runtime.public_key());
}
struct ContractSessionService {
sessions: RwLock<HashMap<SessionId, Arc<CommsRuntime>>>,
}
impl ContractSessionService {
fn new() -> Self {
Self {
sessions: RwLock::new(HashMap::new()),
}
}
async fn comms(&self, id: &SessionId) -> Option<Arc<CommsRuntime>> {
self.sessions.read().await.get(id).cloned()
}
}
fn run_result(session_id: SessionId, text: &str) -> RunResult {
RunResult {
text: text.to_string(),
session_id,
usage: Usage::default(),
turns: 1,
tool_calls: 0,
terminal_cause_kind: None,
structured_output: None,
extraction_error: None,
schema_warnings: None,
skill_diagnostics: None,
}
}
#[async_trait]
impl SessionService for ContractSessionService {
async fn create_session(&self, req: CreateSessionRequest) -> Result<RunResult, SessionError> {
let session_id = req
.build
.as_ref()
.and_then(|build| build.resume_session.as_ref())
.map(|session| session.id().clone())
.unwrap_or_default();
let comms_name = req
.build
.as_ref()
.and_then(|b| b.comms_name.clone())
.unwrap_or_else(|| format!("contract-session-{}", Uuid::new_v4().simple()));
let comms = Arc::new(CommsRuntime::inproc_only(&comms_name).map_err(|e| {
SessionError::Agent(meerkat_core::error::AgentError::InternalError(format!(
"failed to create comms runtime: {e}"
)))
})?);
install_ephemeral_peer_request_response_authority(
&comms,
&format!("contract-{session_id}"),
);
self.sessions
.write()
.await
.insert(session_id.clone(), comms);
Ok(run_result(session_id, "session created"))
}
async fn start_turn(
&self,
id: &SessionId,
_req: StartTurnRequest,
) -> Result<RunResult, SessionError> {
let sessions = self.sessions.read().await;
if !sessions.contains_key(id) {
return Err(SessionError::NotFound { id: id.clone() });
}
Ok(run_result(id.clone(), "turn completed"))
}
async fn interrupt(&self, id: &SessionId) -> Result<(), SessionError> {
let sessions = self.sessions.read().await;
if !sessions.contains_key(id) {
return Err(SessionError::NotFound { id: id.clone() });
}
Ok(())
}
async fn read(&self, id: &SessionId) -> Result<SessionView, SessionError> {
let sessions = self.sessions.read().await;
if !sessions.contains_key(id) {
return Err(SessionError::NotFound { id: id.clone() });
}
Ok(SessionView {
state: SessionInfo {
session_id: id.clone(),
created_at: SystemTime::now(),
updated_at: SystemTime::now(),
message_count: 0,
is_active: false,
model: "claude-sonnet-4-5".to_string(),
provider: Provider::Anthropic,
last_assistant_text: None,
labels: Default::default(),
},
billing: SessionUsage {
total_tokens: 0,
usage: Usage::default(),
},
})
}
async fn list(&self, _query: SessionQuery) -> Result<Vec<SessionSummary>, SessionError> {
let sessions = self.sessions.read().await;
Ok(sessions
.keys()
.map(|id| SessionSummary {
session_id: id.clone(),
created_at: SystemTime::now(),
updated_at: SystemTime::now(),
message_count: 0,
total_tokens: 0,
is_active: false,
labels: Default::default(),
})
.collect())
}
async fn has_live_session(&self, id: &SessionId) -> Result<bool, SessionError> {
Ok(self.sessions.read().await.contains_key(id))
}
async fn archive(&self, id: &SessionId) -> Result<(), SessionError> {
let removed = self.sessions.write().await.remove(id).is_some();
if removed {
Ok(())
} else {
Err(SessionError::NotFound { id: id.clone() })
}
}
}
fn keep_alive_req(comms_name: &str) -> CreateSessionRequest {
CreateSessionRequest {
model: "contract-mock".to_string(),
prompt: "hello".to_string().into(),
render_metadata: None,
system_prompt: None,
max_tokens: None,
event_tx: None,
skill_references: None,
initial_turn: meerkat_core::service::InitialTurnPolicy::RunImmediately,
deferred_prompt_policy: meerkat_core::service::DeferredPromptPolicy::Discard,
build: Some(SessionBuildOptions {
comms_name: Some(comms_name.to_string()),
..Default::default()
}),
labels: None,
}
}
#[tokio::test]
async fn contract_mob_001_keep_alive_session_stays_alive() {
let suffix = Uuid::new_v4().simple().to_string();
let service = Arc::new(ContractSessionService::new());
let a_name = format!("c001-a-{suffix}");
let b_name = format!("c001-b-{suffix}");
let sid_a = service
.create_session(keep_alive_req(&a_name))
.await
.expect("create host-mode session A")
.session_id;
let sid_b = service
.create_session(keep_alive_req(&b_name))
.await
.expect("create host-mode session B")
.session_id;
let comms_a = service.comms(&sid_a).await.expect("comms for A");
let comms_b = service.comms(&sid_b).await.expect("comms for B");
let a_to_b = inproc_peer_descriptor(&b_name, &comms_b).expect("valid trusted peer spec a->b");
CoreCommsRuntime::add_trusted_peer(&*comms_a, a_to_b)
.await
.expect("trust a->b");
let b_to_a = inproc_peer_descriptor(&a_name, &comms_a).expect("valid trusted peer spec b->a");
CoreCommsRuntime::add_trusted_peer(&*comms_b, b_to_a)
.await
.expect("trust b->a");
let before_cmd = CommsCommand::PeerRequest {
to: inproc_peer_route(&b_name, &comms_b).expect("valid peer route"),
intent: "mob.contract.before".to_string(),
params: serde_json::json!({"step": "before_turn"}),
blocks: None,
handling_mode: meerkat_core::types::HandlingMode::Queue,
stream: meerkat_core::comms::InputStreamMode::None,
};
let before_receipt = CoreCommsRuntime::send(&*comms_a, before_cmd)
.await
.expect("send before turn");
assert!(
matches!(before_receipt, SendReceipt::PeerRequestSent { .. }),
"expected peer request send before turn, got: {before_receipt:?}"
);
let before_interactions = CoreCommsRuntime::drain_inbox_interactions(&*comms_b).await;
assert_eq!(
before_interactions.len(),
1,
"receiver should get request before additional turn"
);
service
.start_turn(
&sid_a,
StartTurnRequest {
prompt: "follow up".to_string().into(),
system_prompt: None,
event_tx: None,
runtime: meerkat_core::service::StartTurnRuntimeSemantics::default(),
},
)
.await
.expect("start second turn on keep-alive session");
let after_cmd = CommsCommand::PeerRequest {
to: inproc_peer_route(&a_name, &comms_a).expect("valid peer route"),
intent: "mob.contract.after".to_string(),
params: serde_json::json!({"step": "after_turn"}),
blocks: None,
handling_mode: meerkat_core::types::HandlingMode::Queue,
stream: meerkat_core::comms::InputStreamMode::None,
};
let after_receipt = CoreCommsRuntime::send(&*comms_b, after_cmd)
.await
.expect("send after turn");
assert!(
matches!(after_receipt, SendReceipt::PeerRequestSent { .. }),
"expected peer request send after turn, got: {after_receipt:?}"
);
let after_interactions = CoreCommsRuntime::drain_inbox_interactions(&*comms_a).await;
assert_eq!(
after_interactions.len(),
1,
"sender should still receive peer request after additional turn"
);
}
#[tokio::test]
async fn contract_mob_007_session_archive_removes_from_active_list() {
let suffix = Uuid::new_v4().simple().to_string();
let service = Arc::new(ContractSessionService::new());
let sid = service
.create_session(keep_alive_req(&format!("c007-{suffix}")))
.await
.expect("create session")
.session_id;
let before_list = service
.list(SessionQuery::default())
.await
.expect("list before archive");
assert!(
before_list.iter().any(|s| s.session_id == sid),
"session should be listed before archive"
);
service.archive(&sid).await.expect("archive session");
let after_list = service
.list(SessionQuery::default())
.await
.expect("list after archive");
assert!(
!after_list.iter().any(|s| s.session_id == sid),
"archived session must not appear in list()"
);
let start_result = service
.start_turn(
&sid,
StartTurnRequest {
prompt: "should fail".to_string().into(),
system_prompt: None,
event_tx: None,
runtime: meerkat_core::service::StartTurnRuntimeSemantics::default(),
},
)
.await;
assert!(
matches!(start_result, Err(SessionError::NotFound { .. })),
"start_turn after archive should return NotFound, got: {start_result:?}"
);
}