use std::time::Duration;
use crate::atom::Atom;
use crate::hook::{HookDecision, HookEvent};
use crate::process::{ExitReason, Process, ProcessStatus};
use crate::term::Term;
use super::{ProcessSlot, ScheduledProcess, SharedState, lock_or_recover};
/// Reports `process` to the scheduler hook and returns the hook's decision.
///
/// When no hook is registered this returns [`HookDecision::Continue`] without
/// building an event. `reductions` is forwarded unchanged as the event's
/// `reductions_consumed` field.
pub(super) fn invoke_hook(
    shared: &SharedState,
    process: &Process,
    reductions: u32,
) -> HookDecision {
    let hook = &shared.hook;
    if hook.is_registered() {
        // A process without a current MFA is reported as NIL/NIL/0.
        let (module, function, arity) =
            process.current_mfa().unwrap_or((Atom::NIL, Atom::NIL, 0));
        let event = HookEvent {
            pid: process.pid(),
            module,
            function,
            arity,
            reductions_consumed: reductions,
        };
        hook.invoke(event)
    } else {
        HookDecision::Continue
    }
}
/// Arms a timer for the process's receive timeout.
///
/// Does nothing when the process has no receive timeout, or when it already
/// records a receive timer reference. Otherwise a timer targeting the
/// process's pid is scheduled on `shared.timers` after `timeout.milliseconds`
/// milliseconds, and the id of the resulting timer reference is stored on the
/// process.
pub(super) fn register_receive_timer(shared: &SharedState, process: &mut Process) {
    let Some(timeout) = process.receive_timeout() else {
        return;
    };
    // An existing reference means a timer is already recorded; do not schedule a second one.
    if process.receive_timer_ref().is_some() {
        return;
    }

    let target = process.pid();
    let delay = Duration::from_millis(timeout.milliseconds);
    // The value returned by `lock_or_recover` is a temporary of this statement,
    // so it is dropped before the process is updated below.
    let scheduled = lock_or_recover(&shared.timers).schedule(delay, target, Term::NIL);
    process.set_receive_timer_ref(Some(scheduled.id()));
}
/// Drives timers from the replay driver instead of from live time.
///
/// Does nothing when `shared.replay_driver` is unset. Otherwise the next
/// recorded timer expiry is pulled from the driver (a poisoned driver lock is
/// recovered rather than propagated). On success the shared timers are ticked
/// at the recorded `now`, the result of that tick is discarded, and the
/// *recorded* expired timers are the ones delivered. On a replay mismatch the
/// failure is handed to `fail_replay_timer`.
pub(super) fn tick_replay_timers(shared: &SharedState) {
    let Some(driver) = &shared.replay_driver else {
        return;
    };

    // The driver guard is a temporary of this statement and is released before
    // any other lock is taken below.
    let outcome = driver
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner)
        .next_timer_expiry();

    let recorded = match outcome {
        Ok(recorded) => recorded,
        Err(mismatch) => {
            fail_replay_timer(shared, mismatch);
            return;
        }
    };

    // Only the recorded expiry list is acted on; the tick's own result is
    // bound and dropped unused.
    let _discarded = lock_or_recover(&shared.timers).tick_at(recorded.now);
    expire_timers(shared, recorded.expired);
}
fn fail_replay_timer(shared: &SharedState, error: crate::replay::ReplayMismatch) {
let exec_error = crate::error::ExecError::from(error);
for entry in &shared.process_bodies {
let pid = *entry.key();
shared.exit_errors.insert(pid, exec_error.clone());
shared.exit_tombstones.insert(pid, ExitReason::Error);
let _removed = shared.process_table.remove(pid);
}
shared
.shutdown
.store(true, std::sync::atomic::Ordering::Release);
shared.wake_condvar.notify_all();
}
/// Ticks the shared timers and delivers every timer the tick reports as
/// expired via `expire_timers`.
pub(super) fn tick_timers(shared: &SharedState) {
// The value returned by `lock_or_recover` is a temporary of this `let`
// statement, so it is dropped before `expire_timers` runs. Keep these as two
// statements: inlining the call would hold it across `expire_timers`.
let expired = lock_or_recover(&shared.timers).tick();
expire_timers(shared, expired);
}
/// Marks each expired timer as fired for its target process.
///
/// Timers whose target pid is no longer in the process table are skipped.
/// Timers are handled one at a time, in the order given.
fn expire_timers(shared: &SharedState, expired: Vec<crate::timer::ExpiredTimer>) {
    expired
        .into_iter()
        // The lookup result is only tested for presence and is dropped before
        // the timer is marked.
        .filter(|timer| shared.process_table.get(timer.target_pid).is_some())
        .for_each(|timer| {
            mark_fired_receive_timer(shared, timer.target_pid, timer.reference.id());
        });
}
/// Records that timer `timer_id` fired for `pid` and wakes the process if it
/// is currently in the wait set.
///
/// The id is appended to the pid's list in `shared.expired_receive_timers`.
/// The process table is then checked *after* the mark is written: if the pid
/// is absent, every mark for it is removed and nothing is woken.
/// NOTE(review): the write-then-check order presumably closes a race with
/// process exit — confirm against the exit path before reordering.
///
/// If the pid is in `wait_set.waiting`, it is moved to `wait_set.woken` and
/// `wake_condvar` is notified while the wait-set lock is still held.
pub(super) fn mark_fired_receive_timer(shared: &SharedState, pid: u64, timer_id: u64) {
    {
        // Scope the map entry so it is released before any other lookup.
        let mut fired = shared.expired_receive_timers.entry(pid).or_default();
        fired.push(timer_id);
    }

    let target_alive = shared.process_table.get(pid).is_some();
    if !target_alive {
        let _orphaned_mark = shared.expired_receive_timers.remove(&pid);
        return;
    }

    let mut wait_set = lock_or_recover(&shared.wait_set);
    let Some(index) = wait_set.waiting.remove(&pid) else {
        // Not waiting: the mark stays in the map and nothing is woken here.
        return;
    };
    wait_set.woken.push((pid, index));
    shared.wake_condvar.notify_all();
}
/// Returns `true` when `shared.expired_receive_timers` holds an entry for
/// `pid`.
///
/// This only tests for the presence of an entry; it does not check which
/// timer ids are recorded or whether any of them is the one the process
/// currently has armed.
pub(super) fn has_pending_expired_timer(shared: &SharedState, pid: u64) -> bool {
shared.expired_receive_timers.contains_key(&pid)
}
/// Consumes the fired-timer marks for `process` and applies the timeout if one
/// of them is the receive timer the process currently records.
///
/// The marks for the process's pid are always removed from
/// `shared.expired_receive_timers`, whether or not they match. Returns `true`
/// only when the process has both a receive timeout and a recorded timer
/// reference, and that reference is among the fired ids. In that case the
/// reference is cleared and the code position is set to the timeout's
/// `timeout_position`. In every other case the process is left untouched and
/// `false` is returned.
pub(super) fn apply_expired_receive_timer(shared: &SharedState, process: &mut Process) -> bool {
    let pid = process.pid();
    let Some((_, fired_ids)) = shared.expired_receive_timers.remove(&pid) else {
        return false;
    };

    let (timeout, armed_id) = match (process.receive_timeout(), process.receive_timer_ref()) {
        (Some(timeout), Some(armed_id)) => (timeout, armed_id),
        _ => return false,
    };

    // Marks that do not match the recorded reference are ignored.
    if !fired_ids.contains(&armed_id) {
        return false;
    }

    process.set_receive_timer_ref(None);
    process.set_code_position(Some(timeout.timeout_position));
    true
}
/// Attempts to move a suspended process back onto the woken list.
///
/// Returns `false` without changing anything when `pid` has an entry in
/// `shared.suspensions` but no consumable suspension event. Otherwise the
/// process must be reachable through `mutate_process_result`, be in
/// `ProcessStatus::Suspended`, and transition to `ProcessStatus::Yielded`
/// successfully; if any of that fails, `false` is returned.
///
/// After a successful transition the pid is moved from `wait_set.waiting` to
/// `wait_set.woken` and `wake_condvar` is notified. The result is `true` only
/// if the pid was actually in `waiting`. The status transition is not rolled
/// back when it was not.
pub(super) fn resume_suspended(shared: &SharedState, pid: u64) -> bool {
    let still_blocked =
        shared.suspensions.contains_key(&pid) && !shared.has_consumable_suspension_event(pid);
    if still_blocked {
        return false;
    }

    // `None` (no mutable access to the process) counts as "did not transition".
    let became_yielded = mutate_process_result(shared, pid, |process| {
        process.status() == ProcessStatus::Suspended
            && process.transition_to(ProcessStatus::Yielded).is_ok()
    })
    .unwrap_or(false);
    if !became_yielded {
        return false;
    }

    let mut wait_set = lock_or_recover(&shared.wait_set);
    match wait_set.waiting.remove(&pid) {
        Some(index) => {
            wait_set.woken.push((pid, index));
            shared.wake_condvar.notify_all();
            true
        }
        None => false,
    }
}
/// Runs `f` against the process stored for `pid` and returns its result.
///
/// Returns `None` without calling `f` when `shared.process_bodies` has no
/// entry for `pid`, or when the entry's slot is `ProcessSlot::Executing` or
/// `ProcessSlot::Absent`. `f` is called at most once, while the slot is locked.
fn mutate_process_result<T>(
    shared: &SharedState,
    pid: u64,
    f: impl FnOnce(&mut Process) -> T,
) -> Option<T> {
    shared.process_bodies.get(&pid).and_then(|entry| {
        let mut slot = lock_or_recover(&entry);
        match &mut *slot {
            // Only a process that is present in its slot can be mutated here.
            ProcessSlot::Absent | ProcessSlot::Executing(_) => None,
            ProcessSlot::Present(ScheduledProcess(process)) => Some(f(process)),
        }
    })
}