use std::error::Error;
use beamr::process::ExitReason;
use super::{ConversationActor, ConversationSupervisor};
use crate::channel::ChannelMode;
use crate::conversation::types::{
ConversationConfig, ConversationPhase, CrashPolicy, ParticipantHealth, ParticipantPid,
};
use crate::envelope::Envelope;
fn test_envelope(payload: &[u8]) -> Envelope {
Envelope::new(
payload.to_vec(),
None,
crate::channel::SchemaId::new(),
crate::envelope::PublisherId::default(),
)
}
fn state_after_trapped_participant_death(
actor: &ConversationActor,
) -> Result<crate::conversation::types::ConversationState, Box<dyn Error>> {
for _ in 0..1_000 {
let state = actor.state()?;
if state.current_phase == ConversationPhase::Failed {
return Ok(state);
}
std::thread::yield_now();
}
Err("actor did not record trapped participant death".into())
}
#[test]
fn actor_is_spawned_linked_and_queryable() -> Result<(), Box<dyn Error>> {
let supervisor = ConversationSupervisor::new()?;
let scheduler = supervisor.scheduler();
let participant = ParticipantPid::new(scheduler.spawn_test_process(false));
let actor = supervisor.spawn(ConversationConfig::new(
vec![participant],
None,
ChannelMode::Ephemeral,
CrashPolicy::Fail,
))?;
let actor_pid = actor.pid()?;
let state = actor.state()?;
assert!(scheduler.is_linked(actor_pid.get(), participant.get()));
assert_eq!(state.current_phase, ConversationPhase::Created);
assert_eq!(state.context.len(), 0);
supervisor.shutdown();
Ok(())
}
#[test]
fn handle_progresses_created_active_closed_lifecycle() -> Result<(), Box<dyn Error>> {
let supervisor = ConversationSupervisor::new()?;
let actor = supervisor.spawn(ConversationConfig::new(
Vec::new(),
None,
ChannelMode::Ephemeral,
CrashPolicy::Fail,
))?;
let handle = actor.handle();
assert_eq!(
handle.query_state()?.current_phase,
ConversationPhase::Created
);
handle.send(test_envelope(b"hello"))?;
assert_eq!(
handle.query_state()?.current_phase,
ConversationPhase::Active
);
assert_eq!(handle.receive()?.payload, b"hello");
handle.close()?;
assert_eq!(
handle.query_state()?.current_phase,
ConversationPhase::Closed
);
supervisor.shutdown();
Ok(())
}
#[test]
fn participant_death_arrives_as_trapped_exit_without_timeout() -> Result<(), Box<dyn Error>> {
let supervisor = ConversationSupervisor::new()?;
let scheduler = supervisor.scheduler();
let participant = ParticipantPid::new(scheduler.spawn_test_process(false));
let actor = supervisor.spawn(ConversationConfig::new(
vec![participant],
None,
ChannelMode::Ephemeral,
CrashPolicy::Fail,
))?;
let actor_pid = actor.pid()?;
assert!(scheduler.is_linked(actor_pid.get(), participant.get()));
scheduler.terminate_process(participant.get(), ExitReason::Error);
let state = state_after_trapped_participant_death(&actor)?;
assert_eq!(state.current_phase, ConversationPhase::Failed);
assert_eq!(state.participants[0].health, ParticipantHealth::Dead);
assert_eq!(actor.pid()?, actor_pid);
assert!(scheduler.process_table().get(actor_pid.get()).is_some());
supervisor.shutdown();
Ok(())
}
#[test]
fn notify_on_already_dead_participant_fires_immediately() -> Result<(), Box<dyn Error>> {
use std::sync::mpsc;
use std::time::{Duration, Instant};
let supervisor = ConversationSupervisor::new()?;
let scheduler = supervisor.scheduler();
let participant = ParticipantPid::new(scheduler.spawn_test_process(false));
let actor = supervisor.spawn(ConversationConfig::new(
vec![participant],
None,
ChannelMode::Ephemeral,
CrashPolicy::Fail,
))?;
let actor_pid = actor.pid()?;
assert!(scheduler.is_linked(actor_pid.get(), participant.get()));
let before_crash = Instant::now();
scheduler.terminate_process(participant.get(), ExitReason::Error);
let state = state_after_trapped_participant_death(&actor)?;
assert_eq!(state.participants[0].health, ParticipantHealth::Dead);
let recorded_exit = state.participants[0]
.exited_at
.ok_or("EXIT instant must be recorded when a participant is marked dead")?;
assert!(recorded_exit >= before_crash, "recorded EXIT must be real");
let (tx, rx) = mpsc::sync_channel::<Instant>(1);
actor.notify_on_participant_exit(participant, tx)?;
let replayed = rx
.recv_timeout(Duration::from_millis(50))
.map_err(|_| "already-dead registration must replay the EXIT immediately")?;
assert_eq!(
replayed, recorded_exit,
"replayed instant must equal the recorded EXIT instant"
);
supervisor.shutdown();
Ok(())
}
#[test]
fn supervisor_restarts_only_crashed_actor() -> Result<(), Box<dyn Error>> {
let supervisor = ConversationSupervisor::new()?;
let first = supervisor.spawn(ConversationConfig::new(
Vec::new(),
None,
ChannelMode::Ephemeral,
CrashPolicy::Fail,
))?;
let second = supervisor.spawn(ConversationConfig::new(
Vec::new(),
None,
ChannelMode::Ephemeral,
CrashPolicy::Fail,
))?;
let scheduler = supervisor.scheduler();
let first_pid = first.pid()?;
let second_pid = second.pid()?;
scheduler.terminate_process(first_pid.get(), ExitReason::Error);
let restarted_pid = first.pid()?;
assert_ne!(first_pid, restarted_pid);
assert_eq!(second.pid()?, second_pid);
assert!(scheduler.process_table().get(second_pid.get()).is_some());
supervisor.shutdown();
Ok(())
}