use std::sync::atomic::Ordering;
use aion_core::TimerId;
use beamr::atom::Atom;
use beamr::native::stdlib_stubs::maps_bifs::ContinuationStep;
use beamr::native::{AionTimeoutContinuation, NativeContinuation, ProcessContext};
use beamr::term::Term;
use crate::durability::{Resolution, ResolveOutcome};
use crate::runtime::engine_nifs::error_result_term;
use crate::runtime::nif_context::NifContext;
use crate::runtime::nif_state::EngineNifState;
use crate::runtime::nif_timer::{
add_duration, build_context_for_pid, cancel_live_timer, decode_duration_arg, record_started,
recorded_now, schedule_sleep_timer, timer_command, timer_terminal_recorded,
};
/// Message placed in the error result term by `resume_with_timeout` when a
/// `with_timeout` scope is found to have timed out.
pub(crate) const SCOPE_EXPIRED_MESSAGE: &str = "timeout:deadline expired";
/// Bookkeeping for one armed `with_timeout` scope.
///
/// `arm_scope` inserts a value of this type into `timeout_scopes` and
/// `resume_with_timeout` removes it again once the closure has returned.
pub(super) struct TimeoutScope {
/// Pid handed to `arm_scope`; also the key of the scope stack the scope id is pushed onto.
pub(super) pid: u64,
/// Identifier of the deadline timer created for this scope.
timer_id: TimerId,
/// Outcome already known when the scope was armed: `Some(true)` when the timer
/// resolved as fired, `Some(false)` when it resolved as cancelled, and `None`
/// when the scope runs live and must be settled on resume.
replay_timed_out: Option<bool>,
}
#[cfg(test)]
impl TimeoutScope {
    /// Test fixture: a scope whose outcome is already decided as `timed_out`,
    /// backed by the placeholder timer `TimerId::anonymous(0)`.
    pub(super) fn replayed_for_test(pid: u64, timed_out: bool) -> Self {
        let timer_id = TimerId::anonymous(0);
        let replay_timed_out = Some(timed_out);
        Self {
            pid,
            timer_id,
            replay_timed_out,
        }
    }

    /// Test fixture: a scope with no stored outcome, tracking `timer_id`.
    pub(super) fn live_for_test(pid: u64, timer_id: TimerId) -> Self {
        Self {
            replay_timed_out: None,
            timer_id,
            pid,
        }
    }

    /// Test fixture: a scope stored as timed out that still carries a specific
    /// `timer_id`.
    pub(super) fn replayed_expired_with_deadline_for_test(pid: u64, timer_id: TimerId) -> Self {
        // Same as the live fixture except for the stored verdict.
        Self {
            replay_timed_out: Some(true),
            ..Self::live_for_test(pid, timer_id)
        }
    }
}
/// Result of `expired_scope_deadline` when at least one stacked scope has expired.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum ExpiredScopeDeadline {
/// Every expired scope has a matching `TimerFired` event in the supplied
/// history; the payload is the smallest `envelope.seq` among those events.
RecordedAt(u64),
/// At least one expired scope has no matching `TimerFired` event in the
/// supplied history, so no sequence number is reported.
Unordered,
}
/// Reports whether any timeout scope stacked for `pid` has expired.
///
/// Every scope id on the process's stack is looked up in `state.timeout_scopes`;
/// ids without an entry are skipped. A scope counts as expired when its stored
/// verdict is `Some(true)`, or, when no verdict is stored, when `history`
/// contains a `TimerFired` event for its timer.
///
/// Returns
/// * `None` when `pid` has no scope stack or none of its scopes has expired,
/// * `Some(ExpiredScopeDeadline::Unordered)` when an expired scope has no
///   matching `TimerFired` event in `history`,
/// * `Some(ExpiredScopeDeadline::RecordedAt(seq))` otherwise, where `seq` is the
///   smallest sequence number among the matching `TimerFired` events.
pub(crate) fn expired_scope_deadline(
    state: &EngineNifState,
    pid: u64,
    history: &[aion_core::Event],
) -> Option<ExpiredScopeDeadline> {
    let stack = state.timeout_scope_stacks.get(&pid)?;
    let mut earliest_fired: Option<u64> = None;
    for scope_id in stack.iter() {
        let Some(scope) = state.timeout_scopes.get(scope_id) else {
            continue;
        };
        let fired_at = timer_fired_seq(history, &scope.timer_id);
        // A stored verdict takes precedence; without one, a recorded firing decides.
        let has_expired = scope.replay_timed_out.unwrap_or(fired_at.is_some());
        if !has_expired {
            continue;
        }
        match fired_at {
            // One expired scope without a position makes the whole answer
            // unordered, whatever the remaining scopes look like.
            None => return Some(ExpiredScopeDeadline::Unordered),
            Some(seq) => {
                earliest_fired = Some(match earliest_fired {
                    Some(best) => best.min(seq),
                    None => seq,
                });
            }
        }
    }
    earliest_fired.map(ExpiredScopeDeadline::RecordedAt)
}
/// Returns the `envelope.seq` of the first `TimerFired` event in `history`
/// whose timer id equals `timer_id`, or `None` when there is no such event.
fn timer_fired_seq(history: &[aion_core::Event], timer_id: &TimerId) -> Option<u64> {
    for event in history {
        if let aion_core::Event::TimerFired {
            envelope,
            timer_id: fired,
            ..
        } = event
        {
            if fired == timer_id {
                return Some(envelope.seq);
            }
        }
    }
    None
}
pub(super) fn with_timeout_impl(args: &[Term], ctx: &mut ProcessContext) -> Result<Term, Term> {
if args.len() > 255 {
return Err(Term::NIL);
}
if args.len() != 2 {
return Ok(error_result_term(
ctx,
&format!("with_timeout: expected 2 arguments, got {}", args.len()),
)
.unwrap_or(Term::NIL));
}
let Some(pid) = ctx.pid() else {
return Ok(
error_result_term(ctx, "with_timeout: missing calling process pid")
.unwrap_or(Term::NIL),
);
};
let state = match super::nif_state::engine_nif_state(ctx) {
Ok(state) => state,
Err(error) => return Ok(error_result_term(ctx, &error).unwrap_or(Term::NIL)),
};
if let Err(error) =
super::nif_query_pump::ensure_not_servicing_query(&state, pid, "with_timeout")
{
return Ok(error_result_term(ctx, &error).unwrap_or(Term::NIL));
}
match arm_scope(&state, args, pid) {
Ok((fun, state_id)) => {
ctx.set_continuation_trampoline(
fun,
Vec::new(),
NativeContinuation::AionTimeout(AionTimeoutContinuation {
state_id,
resume: |state, closure_result, ctx| {
resume_with_timeout(&state, closure_result, ctx)
},
}),
);
Ok(Term::NIL)
}
Err(message) => Ok(error_result_term(ctx, &message).unwrap_or(Term::NIL)),
}
}
/// Arms the deadline timer for a new timeout scope and registers the scope.
///
/// `args[0]` is decoded as the deadline duration and `args[1]` is returned
/// unchanged as the closure term. The caller must already have checked that
/// `args` holds two elements, because the indexing here panics otherwise.
///
/// The timer command is resolved against the process context:
/// * resolved as fired: the scope is stored as timed out,
/// * resolved as cancelled: the scope is stored as not timed out,
/// * resolved as started: the verdict is whatever `timer_terminal_recorded` reports,
/// * any other recorded resolution: `Err("with_timeout history mismatch")`,
/// * `ResumeLive`: the start is recorded, the timer is scheduled, and the
///   scope is stored without a verdict.
///
/// On success the scope is inserted under a fresh id, that id is pushed onto
/// the stack for `pid`, and `(closure, scope id)` is returned. Every helper
/// failure is converted to its string form.
fn arm_scope(state: &EngineNifState, args: &[Term], pid: u64) -> Result<(Term, u64), String> {
    let duration =
        decode_duration_arg("with_timeout deadline", args[0]).map_err(|err| err.to_string())?;
    let fun = args[1];

    let mut context = build_context_for_pid(state, pid).map_err(|err| err.to_string())?;
    let now = recorded_now(&context);
    let ordinal = context.next_timer_ordinal();
    let timer_id = TimerId::anonymous(ordinal);
    let fire_at = add_duration(now, duration).map_err(|err| err.to_string())?;

    let command = timer_command(timer_id.clone(), fire_at);
    let outcome = context
        .resolve_command(command)
        .map_err(|err| err.to_string())?;
    let replay_timed_out = match outcome {
        ResolveOutcome::ResumeLive => {
            // Nothing recorded yet: record the start first, then schedule the timer.
            record_started(&context, now, timer_id.clone(), fire_at)
                .map_err(|err| err.to_string())?;
            schedule_sleep_timer(state, &context, duration, now, &timer_id, fire_at)
                .map_err(|err| err.to_string())?;
            None
        }
        ResolveOutcome::Recorded(recorded) => match recorded {
            Resolution::TimerFired => Some(true),
            Resolution::TimerCancelled => Some(false),
            Resolution::TimerStarted => timer_terminal_recorded(&context, &timer_id),
            _ => return Err("with_timeout history mismatch".to_owned()),
        },
    };

    let state_id = state.next_timeout_scope_id.fetch_add(1, Ordering::Relaxed);
    let scope = TimeoutScope {
        pid,
        timer_id,
        replay_timed_out,
    };
    state.timeout_scopes.insert(state_id, scope);
    state
        .timeout_scope_stacks
        .entry(pid)
        .or_default()
        .push(state_id);
    Ok((fun, state_id))
}
fn resume_with_timeout(
continuation: &AionTimeoutContinuation,
closure_result: Term,
ctx: &mut ProcessContext<'_>,
) -> Result<ContinuationStep, Term> {
let state_id = continuation.state_id;
let engine_state = match super::nif_state::engine_nif_state(ctx) {
Ok(state) => state,
Err(error) => {
return Ok(ContinuationStep::Done(
error_result_term(ctx, &error).unwrap_or(Term::NIL),
));
}
};
let Some((_, scope)) = engine_state.timeout_scopes.remove(&state_id) else {
return Ok(ContinuationStep::Done(
error_result_term(ctx, "with_timeout: scope state missing").unwrap_or(Term::NIL),
));
};
if let Some(mut stack) = engine_state.timeout_scope_stacks.get_mut(&scope.pid) {
stack.retain(|entry| *entry != state_id);
}
let timed_out = match scope.replay_timed_out {
Some(timed_out) => timed_out,
None => match settle_live_scope(&engine_state, &scope) {
Ok(timed_out) => timed_out,
Err(message) => {
return Ok(ContinuationStep::Done(
error_result_term(ctx, &message).unwrap_or(Term::NIL),
));
}
},
};
if timed_out {
Ok(ContinuationStep::Done(
error_result_term(ctx, SCOPE_EXPIRED_MESSAGE).unwrap_or(Term::NIL),
))
} else {
let wrapped = ctx.alloc_tuple(&[Term::atom(Atom::OK), closure_result])?;
Ok(ContinuationStep::Done(wrapped))
}
}
/// Settles a scope that has no stored verdict: cancels its live deadline timer,
/// then reads the timer's terminal outcome back through a freshly built context.
///
/// Returns the value reported by `timer_fired_recorded` for the scope's timer.
/// Any helper failure is converted to its string form.
fn settle_live_scope(state: &EngineNifState, scope: &TimeoutScope) -> Result<bool, String> {
    let before_cancel =
        build_context_for_pid(state, scope.pid).map_err(|err| err.to_string())?;
    cancel_live_timer(state, &before_cancel, scope.timer_id.clone())
        .map_err(|err| err.to_string())?;

    // A second context is built after the cancellation attempt and queried instead of the first.
    let after_cancel = build_context_for_pid(state, scope.pid).map_err(|err| err.to_string())?;
    timer_fired_recorded(&after_cancel, &scope.timer_id)
}
/// Returns the value `timer_terminal_recorded` reports for `timer_id`, turning
/// its absence into an error message.
fn timer_fired_recorded(context: &NifContext, timer_id: &TimerId) -> Result<bool, String> {
    match timer_terminal_recorded(context, timer_id) {
        Some(fired) => Ok(fired),
        None => Err("with_timeout: deadline timer has no recorded terminal event".to_owned()),
    }
}