use std::sync::mpsc;
use std::time::{Duration, Instant};
use beamr::scheduler::Scheduler;
use crate::channel::schema::{SchemaId, SchemaValidationError};
use crate::error::LiminalError;
pub(super) const COMMAND_TIMEOUT: Duration = Duration::from_secs(5);
const LIVENESS_POLL: Duration = Duration::from_millis(10);
enum WaitFailure {
Disconnected,
Dead,
TimedOut,
}
pub(super) fn wait_live<T>(
scheduler: &Scheduler,
response: &mpsc::Receiver<Result<T, LiminalError>>,
pid: u64,
) -> Result<Result<T, LiminalError>, LiminalError> {
poll_reply(scheduler, response, pid).map_err(|failure| match failure {
WaitFailure::Dead => LiminalError::DeliveryFailed {
message: format!("channel actor pid {pid} died before replying"),
},
WaitFailure::Disconnected => LiminalError::DeliveryFailed {
message: "channel command reply channel disconnected".to_owned(),
},
WaitFailure::TimedOut => LiminalError::DeliveryFailed {
message: "channel command timed out".to_owned(),
},
})
}
pub(super) fn wait_schema_live(
scheduler: &Scheduler,
response: &mpsc::Receiver<Result<SchemaId, SchemaValidationError>>,
pid: u64,
) -> Result<Result<SchemaId, SchemaValidationError>, SchemaValidationError> {
poll_reply(scheduler, response, pid).map_err(|failure| match failure {
WaitFailure::Dead => SchemaValidationError::InvalidSchema {
message: format!("channel actor pid {pid} died before evolving the schema"),
},
WaitFailure::Disconnected | WaitFailure::TimedOut => SchemaValidationError::InvalidSchema {
message: "channel schema-evolution reply unavailable".to_owned(),
},
})
}
fn poll_reply<R>(
scheduler: &Scheduler,
response: &mpsc::Receiver<R>,
pid: u64,
) -> Result<R, WaitFailure> {
let deadline = Instant::now() + COMMAND_TIMEOUT;
loop {
match response.recv_timeout(LIVENESS_POLL) {
Ok(reply) => return Ok(reply),
Err(mpsc::RecvTimeoutError::Disconnected) => return Err(WaitFailure::Disconnected),
Err(mpsc::RecvTimeoutError::Timeout) => {
if scheduler.process_table().get(pid).is_none() {
return Err(WaitFailure::Dead);
}
if Instant::now() >= deadline {
return Err(WaitFailure::TimedOut);
}
}
}
}
}