use std::error::Error;
use std::sync::Arc;
use std::time::Duration;
use beamr::process::ExitReason;
use liminal::conversation::ParticipantBehaviour;
use liminal::envelope::Envelope;
use liminal::protocol::{CausalContext, MessageEnvelope, SchemaId};
use liminal_server::server::connection::{ConnectionServices, LiminalConnectionServices};
/// Five-second duration handed to `await_crash` by the crash-detection test.
///
/// NOTE(review): presumably the maximum time to wait for the crash to be
/// observed before giving up — confirm against `await_crash`'s contract.
const CRASH_DETECTION_GUARD: Duration = Duration::from_secs(5);
/// Builds the [`MessageEnvelope`] the tests in this file send to a conversation.
///
/// The envelope carries a schema id constructed from an array of
/// `SchemaId::WIRE_LEN` elements that are all `0xAA`, an independent causal
/// context, and an owned copy of `payload`.
fn message_envelope(payload: &[u8]) -> MessageEnvelope {
    let schema = SchemaId::new([0xAA; SchemaId::WIRE_LEN]);
    let causal_context = CausalContext::independent();
    let body = payload.to_vec();
    MessageEnvelope::new(schema, causal_context, body)
}
/// Opens a conversation and checks that it has exactly one participant pid,
/// that the pid is non-zero and present in the scheduler's process table, and
/// that no crash is reported immediately after opening.
#[test]
fn open_conversation_spawns_real_supervised_actor_with_participant() -> Result<(), Box<dyn Error>> {
    let services = LiminalConnectionServices::empty()?;
    let conversation = services.open_conversation(7, "supervised-subject")?;

    // Exactly one participant is expected for a freshly opened conversation.
    let pids = conversation.participant_pids();
    assert_eq!(
        pids.len(),
        1,
        "a real supervised conversation links exactly one participant process"
    );

    let pid = pids[0];
    let pid_is_nonzero = pid != 0;
    assert!(pid_is_nonzero, "participant must be a live beamr pid");

    // The pid must resolve to an entry in the scheduler's process table.
    let scheduler = services.conversation_supervisor().scheduler();
    let is_scheduled = scheduler.process_table().get(pid).is_some();
    assert!(
        is_scheduled,
        "the linked participant must be a live process in the scheduler table"
    );

    let crashed = conversation.has_detected_crash();
    assert!(
        !crashed,
        "a freshly opened conversation must not report a crash"
    );

    // Tear down: close the conversation, then stop the supervisor.
    services.close_conversation(conversation)?;
    services.conversation_supervisor().shutdown();
    Ok(())
}
/// Sends a single message to a freshly opened conversation and requires that
/// opening, sending, and closing all succeed.
#[test]
fn conversation_message_drives_real_actor() -> Result<(), Box<dyn Error>> {
    let services = LiminalConnectionServices::empty()?;
    let conversation = services.open_conversation(11, "msg-subject")?;

    let greeting = message_envelope(b"hello");
    services.conversation_message(&conversation, &greeting)?;

    // Tear down: close the conversation, then stop the supervisor.
    services.close_conversation(conversation)?;
    services.conversation_supervisor().shutdown();
    Ok(())
}
/// Five-second duration passed to `receive_reply` by the request/reply tests.
///
/// NOTE(review): the assertion text in those tests implies `receive_reply`
/// times out when no reply arrives, so this is presumably that timeout —
/// confirm against `receive_reply`'s contract.
const REPLY_GUARD: Duration = Duration::from_secs(5);
/// Sends a request on a subject with no registered responder and requires the
/// reply payload to equal the request bytes.
#[test]
fn request_reply_message_is_really_processed_by_the_participant() -> Result<(), Box<dyn Error>> {
    let services = LiminalConnectionServices::empty()?;
    let conversation = services.open_conversation(99, "request-reply")?;

    let request = b"request-reply-proof-payload";
    let outbound = message_envelope(request);
    services.conversation_message(&conversation, &outbound)?;

    // Wait for the reply, bounded by REPLY_GUARD.
    let reply = conversation.receive_reply(REPLY_GUARD)?;
    assert_eq!(
        reply.payload, request,
        "the participant must genuinely process the request and echo it back; \
         an inert participant produces no reply and this drain would time out"
    );

    // Tear down: close the conversation, then stop the supervisor.
    services.close_conversation(conversation)?;
    services.conversation_supervisor().shutdown();
    Ok(())
}
/// Test responder whose replies carry the request payload prefixed with
/// `responder:`.
#[derive(Debug)]
struct PrefixResponder;

impl ParticipantBehaviour for PrefixResponder {
    /// Returns a clone of `request` whose payload is `b"responder:"` followed
    /// by the original payload bytes. Always yields `Some`.
    fn process(&self, request: &Envelope) -> Option<Envelope> {
        const PREFIX: &[u8] = b"responder:";

        let mut prefixed = PREFIX.to_vec();
        prefixed.extend_from_slice(&request.payload);

        // Start from a clone of the request; only the payload is replaced.
        let mut response = request.clone();
        response.payload = prefixed;
        Some(response)
    }
}
/// Registers `PrefixResponder` for a subject, opens a conversation on that
/// subject, and requires the reply to carry the `responder:`-prefixed payload
/// rather than the raw request bytes.
#[test]
fn registered_responder_handles_its_subject_instead_of_echo() -> Result<(), Box<dyn Error>> {
    let services = LiminalConnectionServices::empty()?;

    let replaced = services.register_responder("worker-subject", Arc::new(PrefixResponder))?;
    assert!(
        replaced.is_none(),
        "first registration for a subject replaces nothing"
    );

    let conversation = services.open_conversation(1001, "worker-subject")?;
    let pids = conversation.participant_pids();
    assert_eq!(
        pids.len(),
        1,
        "a registered responder runs as one supervised linked participant process"
    );

    // The single participant pid must resolve in the scheduler's process table.
    let scheduler = services.conversation_supervisor().scheduler();
    let is_scheduled = scheduler.process_table().get(pids[0]).is_some();
    assert!(
        is_scheduled,
        "the responder participant must be a live process in the scheduler table"
    );

    let request = b"ping";
    let outbound = message_envelope(request);
    services.conversation_message(&conversation, &outbound)?;

    let reply = conversation.receive_reply(REPLY_GUARD)?;
    assert_eq!(
        reply.payload, b"responder:ping",
        "the reply must come from the registered responder (transformed payload), \
         not the echo fallback (which would return the raw request bytes)"
    );

    // Tear down: close the conversation, then stop the supervisor.
    services.close_conversation(conversation)?;
    services.conversation_supervisor().shutdown();
    Ok(())
}
/// Registers a responder for one subject, then opens a conversation on a
/// different subject and requires the reply payload to equal the request
/// bytes unchanged.
#[test]
fn unregistered_subject_still_echoes() -> Result<(), Box<dyn Error>> {
    let services = LiminalConnectionServices::empty()?;

    // The responder is registered under a subject this test never opens.
    services.register_responder("other-subject", Arc::new(PrefixResponder))?;

    let conversation = services.open_conversation(1002, "unregistered-subject")?;

    let request = b"echo-me";
    let outbound = message_envelope(request);
    services.conversation_message(&conversation, &outbound)?;

    let reply = conversation.receive_reply(REPLY_GUARD)?;
    assert_eq!(
        reply.payload, request,
        "an unregistered subject must echo the request unchanged (no seam regression)"
    );

    // Tear down: close the conversation, then stop the supervisor.
    services.close_conversation(conversation)?;
    services.conversation_supervisor().shutdown();
    Ok(())
}
/// Terminates the conversation's participant through the scheduler with
/// `ExitReason::Error`, then requires that the conversation reports the crash,
/// keeps reporting it, and rejects a message sent afterwards.
#[test]
fn participant_crash_is_detected_via_structural_linked_exit() -> Result<(), Box<dyn Error>> {
    let services = LiminalConnectionServices::empty()?;
    let conversation = services.open_conversation(42, "crash-subject")?;

    let pids = conversation.participant_pids();
    assert_eq!(pids.len(), 1, "expected one linked participant");
    let victim = pids[0];

    // Terminate the participant directly through the scheduler.
    let scheduler = services.conversation_supervisor().scheduler();
    scheduler.terminate_process(victim, ExitReason::Error);

    let detection = conversation.await_crash(CRASH_DETECTION_GUARD);
    assert!(
        detection.is_some(),
        "participant crash must be detected via the trapped linked-EXIT signal"
    );

    let still_crashed = conversation.has_detected_crash();
    assert!(
        still_crashed,
        "crash must remain observable after detection"
    );

    // A send after the crash has to come back as an error.
    let late = message_envelope(b"late");
    let late_send = services.conversation_message(&conversation, &late);
    assert!(
        late_send.is_err(),
        "messages after a participant crash must be rejected, not silently dropped"
    );

    // Tear down: close the conversation, then stop the supervisor.
    services.close_conversation(conversation)?;
    services.conversation_supervisor().shutdown();
    Ok(())
}