#![allow(clippy::unwrap_used, clippy::expect_used)]
use std::sync::Arc;
use meerkat_core::handles::{PeerInteractionHandle, PeerTerminalDisposition};
use meerkat_core::{InboundPeerRequestState, OutboundPeerRequestState, PeerCorrelationId};
use meerkat_runtime::handles::{HandleDslAuthority, RuntimePeerInteractionHandle};
use meerkat_runtime::meerkat_machine::dsl as mm_dsl;
/// Builds a [`RuntimePeerInteractionHandle`] for a test.
///
/// The handle is backed by an ephemeral [`HandleDslAuthority`] that has
/// already received the `Initialize` signal and a `RegisterSession` input
/// for the fixed session id `peer-interaction-test`.
///
/// # Panics
///
/// Panics if the authority rejects either the signal or the input.
fn new_handle() -> RuntimePeerInteractionHandle {
    let authority = Arc::new(HandleDslAuthority::ephemeral());

    authority
        .apply_signal(mm_dsl::MeerkatMachineSignal::Initialize, "test::initialize")
        .expect("Initialize signal");

    let session_id = mm_dsl::SessionId::from(String::from("peer-interaction-test"));
    let register = mm_dsl::MeerkatMachineInput::RegisterSession { session_id };
    authority
        .apply_input(register, "test::register_session")
        .expect("RegisterSession input");

    RuntimePeerInteractionHandle::new(authority)
}
/// A correlation id has no outbound state until `request_sent` is called,
/// after which its outbound state reads `Sent`.
#[test]
fn request_sent_advances_pending_map_to_sent() {
    let handle = new_handle();
    let id = PeerCorrelationId::new();

    // Nothing is tracked before the send.
    assert_eq!(handle.outbound_state(id), None);

    handle.request_sent(id, "peer-a".into()).unwrap();

    let state = handle.outbound_state(id);
    assert_eq!(state, Some(OutboundPeerRequestState::Sent));
}
/// A second `request_sent` for the same correlation id is rejected, and the
/// error's context names `request_sent`.
#[test]
fn request_sent_rejects_duplicate() {
    let handle = new_handle();
    let id = PeerCorrelationId::new();
    handle.request_sent(id, "peer-a".into()).unwrap();

    let second_attempt = handle.request_sent(id, "peer-a".into());

    let err = second_attempt.expect_err("duplicate send must reject");
    assert_eq!(err.context, "PeerInteractionHandle::request_sent");
}
/// `response_progress` on a sent request moves its outbound state to
/// `AcceptedProgress`.
#[test]
fn progress_advances_state_to_accepted() {
    let handle = new_handle();
    let id = PeerCorrelationId::new();

    handle.request_sent(id, "peer-a".into()).unwrap();
    handle.response_progress(id).unwrap();

    let state = handle.outbound_state(id);
    assert_eq!(state, Some(OutboundPeerRequestState::AcceptedProgress));
}
/// `response_progress` for a correlation id that was never sent is rejected,
/// and the error's context names `response_progress`.
#[test]
fn progress_rejects_unknown_corr_id() {
    let handle = new_handle();
    let never_sent = PeerCorrelationId::new();

    let outcome = handle.response_progress(never_sent);

    let err = outcome.expect_err("progress on unknown corr_id must reject");
    assert_eq!(err.context, "PeerInteractionHandle::response_progress");
}
/// A `Completed` terminal response removes the outbound entry, and a second
/// terminal response for the same id is rejected.
///
/// Only removal and the repeat rejection are asserted here; observer-visible
/// cleanup is checked in `cleanup_observer_fires_on_terminal_transitions`.
#[test]
fn terminal_completed_removes_entry_and_emits_cleanup() {
    let handle = new_handle();
    let id = PeerCorrelationId::new();
    handle.request_sent(id, "peer-a".into()).unwrap();

    let first = handle.response_terminal(id, PeerTerminalDisposition::Completed);
    first.unwrap();
    assert_eq!(handle.outbound_state(id), None);

    // The entry is gone, so a repeated terminal has nothing to act on.
    let repeat = handle.response_terminal(id, PeerTerminalDisposition::Completed);
    let err = repeat.expect_err("second terminal must reject");
    assert_eq!(err.context, "PeerInteractionHandle::response_terminal");
}
/// A `Failed` terminal response removes the outbound entry.
#[test]
fn terminal_failed_removes_entry() {
    let handle = new_handle();
    let id = PeerCorrelationId::new();
    handle.request_sent(id, "peer-a".into()).unwrap();

    let terminal = handle.response_terminal(id, PeerTerminalDisposition::Failed);
    terminal.unwrap();

    assert_eq!(handle.outbound_state(id), None);
}
/// Timing out a sent request removes the outbound entry, and a second
/// timeout for the same id is rejected.
///
/// Only removal and the repeat rejection are asserted here; observer-visible
/// cleanup is checked in `cleanup_observer_fires_on_terminal_transitions`.
#[test]
fn timeout_removes_entry_and_emits_cleanup() {
    let handle = new_handle();
    let id = PeerCorrelationId::new();
    handle.request_sent(id, "peer-a".into()).unwrap();

    handle.request_timed_out(id).unwrap();
    assert_eq!(handle.outbound_state(id), None);

    // The entry is gone, so a repeated timeout has nothing to act on.
    let repeat = handle.request_timed_out(id);
    let err = repeat.expect_err("second timeout must reject");
    assert_eq!(err.context, "PeerInteractionHandle::request_timed_out");
}
/// Inbound lifecycle: untracked, then `Received` after `request_received`,
/// then untracked again after `response_replied`.
#[test]
fn inbound_received_then_replied_advances_and_removes() {
    let handle = new_handle();
    let id = PeerCorrelationId::new();

    assert_eq!(handle.inbound_state(id), None);

    handle.request_received(id).unwrap();
    let after_receive = handle.inbound_state(id);
    assert_eq!(after_receive, Some(InboundPeerRequestState::Received));

    handle.response_replied(id).unwrap();
    assert_eq!(handle.inbound_state(id), None);
}
/// A second `request_received` for the same correlation id is rejected, and
/// the error's context names `request_received`.
#[test]
fn inbound_received_rejects_duplicate() {
    let handle = new_handle();
    let id = PeerCorrelationId::new();
    handle.request_received(id).unwrap();

    let second_receipt = handle.request_received(id);

    let err = second_receipt.expect_err("duplicate inbound receipt must reject");
    assert_eq!(err.context, "PeerInteractionHandle::request_received");
}
/// `response_replied` for a correlation id that was never received is
/// rejected, and the error's context names `response_replied`.
#[test]
fn inbound_replied_rejects_unknown_corr_id() {
    let handle = new_handle();
    let never_received = PeerCorrelationId::new();

    let outcome = handle.response_replied(never_received);

    let err = outcome.expect_err("reply on unknown corr_id must reject");
    assert_eq!(err.context, "PeerInteractionHandle::response_replied");
}
/// The installed cleanup observer is notified, in order, for outbound
/// requests that end via a `Completed` terminal, a `Failed` terminal, or a
/// timeout. It is not notified by `response_progress`, nor by the inbound
/// receive/reply lifecycle.
#[test]
fn cleanup_observer_fires_on_terminal_transitions() {
    use meerkat_core::handles::PeerInteractionCleanupObserver;
    use std::sync::Mutex;

    /// Observer that appends every correlation id it is notified about.
    struct Recorder(Mutex<Vec<PeerCorrelationId>>);

    impl PeerInteractionCleanupObserver for Recorder {
        fn on_peer_interaction_cleanup(&self, id: PeerCorrelationId) {
            self.0.lock().unwrap().push(id);
        }
    }

    let handle = new_handle();
    let recorder = Arc::new(Recorder(Mutex::new(Vec::new())));
    let observer = Arc::clone(&recorder) as Arc<dyn PeerInteractionCleanupObserver>;
    handle.install_cleanup_observer(observer);

    // Copy of everything recorded so far; the lock is released on return.
    let recorded = || recorder.0.lock().unwrap().clone();

    // Progress is non-terminal and must not notify.
    let a = PeerCorrelationId::new();
    handle.request_sent(a, "peer-a".into()).unwrap();
    handle.response_progress(a).unwrap();
    assert!(
        recorded().is_empty(),
        "non-terminal transitions must not fire cleanup"
    );

    // Completed terminal.
    let completed = handle.response_terminal(a, PeerTerminalDisposition::Completed);
    completed.unwrap();
    assert_eq!(recorded(), vec![a]);

    // Failed terminal.
    let b = PeerCorrelationId::new();
    handle.request_sent(b, "peer-b".into()).unwrap();
    let failed = handle.response_terminal(b, PeerTerminalDisposition::Failed);
    failed.unwrap();
    assert_eq!(recorded(), vec![a, b]);

    // Timeout.
    let c = PeerCorrelationId::new();
    handle.request_sent(c, "peer-c".into()).unwrap();
    handle.request_timed_out(c).unwrap();
    assert_eq!(recorded(), vec![a, b, c]);

    // Inbound receive/reply leaves the recording unchanged.
    let d = PeerCorrelationId::new();
    handle.request_received(d).unwrap();
    handle.response_replied(d).unwrap();
    assert_eq!(
        recorded(),
        vec![a, b, c],
        "inbound lifecycle does not emit PeerInteractionCleanup"
    );
}
/// Installing a cleanup observer does not leave the handle holding a strong
/// reference to it: the caller's `Arc` stays the only strong owner, dropping
/// it frees the observer, and a later terminal transition still succeeds.
#[test]
fn cleanup_observer_is_weakly_held_no_arc_cycle() {
    use meerkat_core::handles::PeerInteractionCleanupObserver;
    use std::sync::Mutex;

    /// Observer that appends every correlation id it is notified about.
    struct Recorder(Mutex<Vec<PeerCorrelationId>>);

    impl PeerInteractionCleanupObserver for Recorder {
        fn on_peer_interaction_cleanup(&self, id: PeerCorrelationId) {
            self.0.lock().unwrap().push(id);
        }
    }

    let handle = new_handle();
    let recorder: Arc<Recorder> = Arc::new(Recorder(Mutex::new(Vec::new())));
    let probe = Arc::downgrade(&recorder);

    // The strong clone is moved into the call, so any strong reference that
    // remains afterwards besides `recorder` would belong to the handle.
    let observer = Arc::clone(&recorder) as Arc<dyn PeerInteractionCleanupObserver>;
    handle.install_cleanup_observer(observer);
    assert_eq!(
        Arc::strong_count(&recorder),
        1,
        "handle must not hold a strong ref"
    );

    drop(recorder);
    assert!(
        probe.upgrade().is_none(),
        "observer must be fully dropped once the caller releases its Arc — no cycle"
    );

    // With the observer gone, a terminal transition must still go through.
    let id = PeerCorrelationId::new();
    handle.request_sent(id, "peer-a".into()).unwrap();
    let terminal = handle.response_terminal(id, PeerTerminalDisposition::Completed);
    terminal.expect("terminal transition must succeed with dropped observer");
    assert_eq!(handle.outbound_state(id), None);
}
/// The same correlation id can be tracked as both an outbound and an inbound
/// request, and finishing the outbound side leaves the inbound entry intact.
#[test]
fn outbound_inbound_are_independent_namespaces() {
    let handle = new_handle();
    let shared_id = PeerCorrelationId::new();

    handle.request_sent(shared_id, "peer-a".into()).unwrap();
    handle.request_received(shared_id).unwrap();

    let outbound = handle.outbound_state(shared_id);
    assert_eq!(outbound, Some(OutboundPeerRequestState::Sent));
    let inbound = handle.inbound_state(shared_id);
    assert_eq!(inbound, Some(InboundPeerRequestState::Received));

    // Terminate only the outbound side.
    let terminal = handle.response_terminal(shared_id, PeerTerminalDisposition::Completed);
    terminal.unwrap();

    assert_eq!(handle.outbound_state(shared_id), None);
    let inbound_after = handle.inbound_state(shared_id);
    assert_eq!(inbound_after, Some(InboundPeerRequestState::Received));
}