use std::sync::Arc;
use std::time::Duration;
use crate::channel::ChannelMode;
use crate::conversation::participant::ParticipantBehaviour;
use crate::conversation::types::CrashPolicy;
use crate::conversation::{ConversationSupervisor, ParticipantPid};
use crate::envelope::Envelope;
use crate::error::LiminalError;
pub const DEFAULT_ASK_TIMEOUT: Duration = Duration::from_secs(5);
pub fn ask(
supervisor: &ConversationSupervisor,
behaviour: Arc<dyn ParticipantBehaviour>,
request: Envelope,
timeout: Duration,
) -> Result<Envelope, LiminalError> {
let (actor, _participant): (_, ParticipantPid) = supervisor.spawn_with_participant(
behaviour,
Some(timeout),
ChannelMode::Ephemeral,
CrashPolicy::Fail,
)?;
actor.handle().send(request)?;
let reply = actor.receive_timeout(timeout);
match reply {
Ok(reply) => {
actor.handle().close()?;
Ok(reply)
}
Err(error) => {
let _ = actor.handle().close();
Err(error)
}
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::time::Duration;
use beamr::process::ExitReason;
use super::{DEFAULT_ASK_TIMEOUT, ask};
use crate::channel::{ChannelMode, SchemaId};
use crate::conversation::participant::{EchoBehaviour, ParticipantBehaviour};
use crate::conversation::types::CrashPolicy;
use crate::conversation::{ConversationSupervisor, ParticipantPid};
use crate::envelope::{Envelope, PublisherId};
use crate::error::LiminalError;
fn request(payload: &[u8]) -> Envelope {
Envelope::new(
payload.to_vec(),
None,
SchemaId::new(),
PublisherId::default(),
)
}
#[derive(Debug)]
struct SilentBehaviour;
impl ParticipantBehaviour for SilentBehaviour {
fn process(&self, _request: &Envelope) -> Option<Envelope> {
None
}
}
#[test]
fn ask_returns_real_participant_reply() -> Result<(), Box<dyn std::error::Error>> {
#[derive(Debug)]
struct UppercaseBehaviour;
impl ParticipantBehaviour for UppercaseBehaviour {
fn process(&self, request: &Envelope) -> Option<Envelope> {
let mut reply = request.clone();
reply.payload = request.payload.to_ascii_uppercase();
Some(reply)
}
}
let supervisor = ConversationSupervisor::new()?;
let reply = ask(
&supervisor,
Arc::new(UppercaseBehaviour),
request(b"ping"),
DEFAULT_ASK_TIMEOUT,
)?;
assert_eq!(
reply.payload, b"PING",
"ask must return the participant's processed reply, not a loopback"
);
supervisor.shutdown();
Ok(())
}
#[test]
fn ask_echo_round_trips_payload() -> Result<(), Box<dyn std::error::Error>> {
let supervisor = ConversationSupervisor::new()?;
let reply = ask(
&supervisor,
Arc::new(EchoBehaviour),
request(b"hello-world"),
DEFAULT_ASK_TIMEOUT,
)?;
assert_eq!(reply.payload, b"hello-world");
supervisor.shutdown();
Ok(())
}
#[test]
fn ask_times_out_when_participant_never_replies() -> Result<(), Box<dyn std::error::Error>> {
let supervisor = ConversationSupervisor::new()?;
let result = ask(
&supervisor,
Arc::new(SilentBehaviour),
request(b"unanswered"),
Duration::from_millis(150),
);
assert!(
matches!(result, Err(LiminalError::ConversationTimeout { .. })),
"a non-replying participant must yield ConversationTimeout, got {result:?}"
);
supervisor.shutdown();
Ok(())
}
#[test]
fn ask_reports_participant_crash_before_reply() -> Result<(), Box<dyn std::error::Error>> {
let supervisor = ConversationSupervisor::new()?;
let (actor, participant): (_, ParticipantPid) = supervisor.spawn_with_participant(
Arc::new(SilentBehaviour),
Some(Duration::from_secs(5)),
ChannelMode::Ephemeral,
CrashPolicy::Fail,
)?;
supervisor
.scheduler()
.terminate_process(participant.get(), ExitReason::Error);
let mut result = Err(LiminalError::ConversationTimeout {
message: "init".to_owned(),
});
for _ in 0..1_000 {
result = actor.receive_timeout(Duration::from_millis(20));
if matches!(result, Err(LiminalError::ParticipantCrashed { .. })) {
break;
}
std::thread::yield_now();
}
assert!(
matches!(result, Err(LiminalError::ParticipantCrashed { .. })),
"participant crash before reply must surface as ParticipantCrashed, got {result:?}"
);
supervisor.shutdown();
Ok(())
}
}