use std::sync::Arc;
use crate::process::SuspensionKind;
use crate::scheduler::dirty::DirtyResult;
use crate::term::Term;
use super::SharedState;
/// Sentinel value for `pending_resumes` entries: a pending resume equal to this
/// constant is treated by `has_consumable_suspension_event` as matching a `Hook`
/// suspension regardless of the mirror's own `call_id`.
pub(super) const RESUME_ANY_HOOK: u64 = 0;
/// Per-process record of an outstanding suspension, stored in
/// `SharedState::suspensions` keyed by pid.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(super) struct SuspensionMirror {
/// Identifier of the suspended call; a published result is only accepted when
/// it carries the same id.
pub(super) call_id: u64,
/// Which kind of suspension this mirror describes.
pub(super) kind: SuspensionKind,
/// When `false`, `suspension_blocks_wake` reports the suspension as blocking
/// until a consumable suspension event exists for the process.
pub(super) wake_on_message: bool,
}
/// A result published for a suspended call, stored in
/// `SharedState::suspension_results` keyed by pid.
#[derive(Debug)]
pub(super) struct PendingSuspensionResult {
/// Call id the result was published for.
pub(super) call_id: u64,
/// The published value.
pub(super) payload: SuspensionResultPayload,
}
/// Value carried by a published suspension result.
#[derive(Debug)]
pub(super) enum SuspensionResultPayload {
/// A term held as a `crate::ets::OwnedTerm`; see `SuspensionResultPayload::host`.
Host(crate::ets::OwnedTerm),
/// A boxed `DirtyResult`.
/// NOTE(review): presumably the outcome of a dirty-scheduler call — confirm.
Dirty(Box<DirtyResult>),
}
impl SuspensionResultPayload {
    /// Wraps `term` in a [`SuspensionResultPayload::Host`] payload.
    ///
    /// List and boxed terms are copied with `crate::ets::copy_term_to_ets`;
    /// every other term is wrapped directly via `OwnedTerm::immediate`.
    ///
    /// Returns `None` only when the copy of a list/boxed term fails.
    pub(super) fn host(term: Term) -> Option<Self> {
        let needs_copy = term.is_list() || term.is_boxed();
        if !needs_copy {
            return Some(Self::Host(crate::ets::OwnedTerm::immediate(term)));
        }
        // The copy error carries no information the caller uses; collapse it to `None`.
        let owned = crate::ets::copy_term_to_ets(term).ok()?;
        Some(Self::Host(owned))
    }
}
impl SharedState {
/// Records (or replaces) the suspension mirror for `pid`.
///
/// The mirror stores the `call_id`, the suspension `kind` and the
/// `wake_on_message` flag exactly as given; any mirror previously stored
/// for `pid` is overwritten.
pub(super) fn register_suspension_mirror(
    &self,
    pid: u64,
    call_id: u64,
    kind: SuspensionKind,
    wake_on_message: bool,
) {
    let mirror = SuspensionMirror {
        call_id,
        kind,
        wake_on_message,
    };
    self.suspensions.insert(pid, mirror);
}
/// Stores `payload` as the pending suspension result for `pid`.
///
/// Returns `true` only when the result was stored and `pid` is still present
/// in the process table. Returns `false` (and leaves no result of ours behind)
/// when:
/// - there is no mirror for `pid`, or its `call_id` differs from `call_id`;
/// - a result with a larger `call_id` is already pending for `pid`;
/// - `pid` is missing from the process table after the store, in which case
///   the pending result for `pid` is removed again.
///
/// An already-pending result with an equal or smaller `call_id` is overwritten.
pub(super) fn publish_suspension_result(
    &self,
    pid: u64,
    call_id: u64,
    payload: SuspensionResultPayload,
) -> bool {
    use dashmap::mapref::entry::Entry;

    // Only accept results for the call the process is currently mirrored on.
    let mirrored_call = self.suspensions.get(&pid).map(|mirror| mirror.call_id);
    if mirrored_call != Some(call_id) {
        return false;
    }

    let accepted = match self.suspension_results.entry(pid) {
        // A result for a later call is already waiting; keep it.
        Entry::Occupied(slot) if slot.get().call_id > call_id => false,
        Entry::Occupied(mut slot) => {
            slot.insert(PendingSuspensionResult { call_id, payload });
            true
        }
        Entry::Vacant(slot) => {
            slot.insert(PendingSuspensionResult { call_id, payload });
            true
        }
    };
    if !accepted {
        return false;
    }

    // Checked after the store: if the pid is no longer in the process table,
    // take the result back out so it is not left orphaned.
    let gone = self.process_table.get(pid).is_none();
    if gone {
        let _orphan = self.suspension_results.remove(&pid);
    }
    !gone
}
/// Publishes `payload` for whatever call `pid` is currently mirrored on,
/// provided that mirror is of the given `kind`.
///
/// Returns `false` without publishing when `pid` has no mirror or the mirror's
/// kind differs; otherwise returns the outcome of `publish_suspension_result`
/// for the mirror's `call_id`.
pub(super) fn publish_suspension_result_current(
    &self,
    pid: u64,
    kind: SuspensionKind,
    payload: SuspensionResultPayload,
) -> bool {
    // The lookup guard is released at the end of this statement, before the
    // publish call looks the mirror up again.
    let current_call = match self.suspensions.get(&pid) {
        Some(mirror) if mirror.kind == kind => mirror.call_id,
        _ => return false,
    };
    self.publish_suspension_result(pid, current_call, payload)
}
pub(super) fn has_consumable_suspension_event(&self, pid: u64) -> bool {
let Some(mirror) = self.suspensions.get(&pid).map(|mirror| *mirror) else {
return false;
};
if self
.suspension_results
.get(&pid)
.is_some_and(|result| result.call_id == mirror.call_id)
{
return true;
}
match mirror.kind {
SuspensionKind::HostAwait => {
self.file_io_results.contains_key(&pid)
|| self.expired_receive_timers.contains_key(&pid)
}
SuspensionKind::DirtyCall => false,
SuspensionKind::Hook => self
.pending_resumes
.get(&pid)
.is_some_and(|resume| *resume == RESUME_ANY_HOOK || *resume == mirror.call_id),
}
}
/// Reports whether the suspension of `pid` should block a wake-up.
///
/// This is `true` only when `pid` has a mirror with `wake_on_message == false`
/// and `has_consumable_suspension_event` finds nothing to consume. A process
/// with no mirror, or with a mirror that wakes on message, is never blocked.
pub(super) fn suspension_blocks_wake(&self, pid: u64) -> bool {
    let wake_on_message = self
        .suspensions
        .get(&pid)
        .map(|mirror| mirror.wake_on_message);
    if wake_on_message != Some(false) {
        return false;
    }
    !self.has_consumable_suspension_event(pid)
}
/// Removes every piece of suspension bookkeeping this function knows about
/// for `pid`: the mirror, any pending result, any pending resume and any
/// file I/O result. Missing entries are ignored.
///
/// NOTE(review): `expired_receive_timers` is consulted by
/// `has_consumable_suspension_event` but is not cleared here — confirm it is
/// purged elsewhere.
pub(super) fn purge_suspension_state(&self, pid: u64) {
    // The removed values are kept in named bindings so they are dropped
    // together when the function returns.
    let _old_mirror = self.suspensions.remove(&pid);
    let _old_result = self.suspension_results.remove(&pid);
    let _old_resume = self.pending_resumes.remove(&pid);
    let _old_file_io = self.file_io_results.remove(&pid);
}
}
/// Adapter that implements `crate::native::SuspensionRegistrar` on top of the
/// scheduler's `SharedState`.
pub(super) struct SchedulerSuspensionRegistrar {
/// Shared scheduler state whose `suspensions` map the registrar updates.
pub(super) shared: Arc<SharedState>,
}
impl crate::native::SuspensionRegistrar for SchedulerSuspensionRegistrar {
    /// Registers a `HostAwait` suspension mirror for `pid` with the given
    /// `call_id` and `wake_on_message` flag.
    fn register_host_await(&self, pid: u64, call_id: u64, wake_on_message: bool) {
        let kind = SuspensionKind::HostAwait;
        self.shared
            .register_suspension_mirror(pid, call_id, kind, wake_on_message);
    }

    /// Removes the suspension mirror for `pid`, but only if it still belongs
    /// to `call_id`; a mirror for a different call is left in place.
    ///
    /// NOTE(review): the mirror's `kind` is not checked here — confirm call ids
    /// cannot collide across suspension kinds.
    fn cancel_host_await(&self, pid: u64, call_id: u64) {
        let suspensions = &self.shared.suspensions;
        suspensions.remove_if(&pid, |_pid, mirror| mirror.call_id == call_id);
    }
}