use std::collections::VecDeque;
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, Mutex, RwLock};
use aion_core::TimerId;
use beamr::native::ProcessContext;
use chrono::{DateTime, Utc};
use dashmap::DashMap;
use crate::activity::bridge::ActivityDispatcher;
use super::nif_activity::RuntimeContext;
use super::nif_child_engine::ChildNifBridge;
use super::nif_determinism::NifContextSource;
use super::nif_query::{QueryBridgeState, QueryHandlers};
use super::nif_signal::SignalNifBridge;
use super::nif_timeout::TimeoutScope;
use super::nif_timer_bridge::TimerNifBridge;
/// Shared state used by the engine's NIF layer.
///
/// An instance is looked up through `engine_nif_state`, which downcasts the
/// process context's NIF private data to this type. Every field uses interior
/// mutability (`RwLock`, `Mutex`, `DashMap`, or an atomic), so the methods on
/// this type only need `&self`.
#[derive(Default)]
pub(crate) struct EngineNifState {
/// Optional runtime context; `None` until something installs it.
pub(super) runtime_context: RwLock<Option<RuntimeContext>>,
/// Optional activity dispatcher, written by `set_activity_dispatcher` and
/// read by `activity_dispatcher`.
pub(super) activity_dispatcher: RwLock<Option<Arc<dyn ActivityDispatcher>>>,
/// Optional NIF context source (see `nif_determinism`).
pub(super) context_source: RwLock<Option<Arc<NifContextSource>>>,
/// Optional signal bridge.
pub(super) signal_bridge: RwLock<Option<Arc<SignalNifBridge>>>,
/// Optional child-workflow bridge; consulted by `cleanup_process` and
/// `shutdown_child_tasks`.
pub(super) child_bridge: RwLock<Option<Arc<ChildNifBridge>>>,
/// Optional query bridge state.
pub(super) query_bridge: Mutex<Option<QueryBridgeState>>,
/// Query handler registry; `cleanup_process` clears a pid's entries via
/// `cleanup_pid`.
pub(super) query_handlers: QueryHandlers,
/// Optional timer bridge.
pub(super) timer_bridge: Mutex<Option<Arc<TimerNifBridge>>>,
/// Timeout scopes. Each scope records the `pid` it belongs to, which is how
/// `cleanup_process` filters them.
// NOTE(review): keys are presumably scope ids drawn from
// `next_timeout_scope_id` — confirm against `nif_timeout`.
pub(super) timeout_scopes: DashMap<u64, TimeoutScope>,
/// Per-pid stacks of `u64` values (removed by pid in `cleanup_process`).
// NOTE(review): elements are presumably timeout scope ids — confirm.
pub(super) timeout_scope_stacks: DashMap<u64, Vec<u64>>,
/// Counter, presumably used to allocate timeout scope ids — confirm against
/// `nif_timeout`.
pub(super) next_timeout_scope_id: AtomicU64,
/// Per-pid pending await, at most one entry per pid.
pub(super) pending_awaits: DashMap<u64, PendingAwait>,
/// Per-pid queue of pending queries.
pub(super) pending_queries: DashMap<u64, VecDeque<PendingQuery>>,
/// Per-pid string for a query being serviced.
// NOTE(review): the value is presumably the query id — confirm.
pub(super) servicing_queries: DashMap<u64, String>,
/// Per-pid wake-observation epoch. Missing entries read as 0, live entries
/// start at 1 and are incremented by `observe_native_entry`, and
/// `cleanup_process` overwrites the entry with `WAKE_EPOCH_EXITED`.
wake_observation_epochs: DashMap<u64, u64>,
}
/// Sentinel epoch written by `cleanup_process` for a pid that has been cleaned up.
///
/// `observe_native_entry` never increments an entry holding this value, and
/// `wake_ladder_done` reports `true` whenever the current epoch equals it.
const WAKE_EPOCH_EXITED: u64 = u64::MAX;
/// One entry in a process's `pending_queries` queue on `EngineNifState`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct PendingQuery {
/// Identifier of this query.
// NOTE(review): presumably unique per query request — confirm with the query bridge.
pub(super) query_id: String,
/// Name associated with the query.
// NOTE(review): presumably the query/handler name used for dispatch — confirm.
pub(super) name: String,
}
/// The await recorded for a process in `EngineNifState::pending_awaits`.
///
/// Variant meanings below are inferred from names and field types only;
/// confirm against the NIFs that create and resolve these values.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) enum PendingAwait {
/// Presumably a wait on a timer.
Sleep {
/// Identifier of the timer.
timer_id: TimerId,
/// UTC instant associated with the timer (presumably when it fires).
fire_at: DateTime<Utc>,
},
/// Presumably a wait on a signal.
Signal {
/// Position value for the signal.
// NOTE(review): what this indexes into is not visible here — confirm.
index: usize,
},
/// Presumably a wait on a child workflow.
Child {
/// Identifier of the child workflow.
child_workflow_id: aion_core::WorkflowId,
},
/// Presumably a wait on a group of operations.
Collect {
/// First ordinal of the group — TODO confirm.
base_ordinal: u64,
/// Number of items in the group — TODO confirm.
count: u64,
/// How the group is collected; see `CollectKind`.
kind: CollectKind,
},
}
/// Mode selector carried by `PendingAwait::Collect`.
// NOTE(review): semantics below are inferred from the variant names only —
// confirm against the collect implementation.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum CollectKind {
/// Presumably: complete once every item has completed.
All,
/// Presumably: complete as soon as the first item completes.
Race,
}
impl EngineNifState {
    /// Stores `dispatcher` in the activity-dispatcher slot, replacing any
    /// previous value.
    ///
    /// A poisoned lock is recovered instead of propagated, so the write always
    /// takes effect.
    pub(crate) fn set_activity_dispatcher(&self, dispatcher: Arc<dyn ActivityDispatcher>) {
        let mut guard = self
            .activity_dispatcher
            .write()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        *guard = Some(dispatcher);
    }

    /// Returns a clone of the currently installed activity dispatcher, or
    /// `None` when the slot is empty. Lock poisoning is ignored.
    pub(crate) fn activity_dispatcher(&self) -> Option<Arc<dyn ActivityDispatcher>> {
        self.activity_dispatcher
            .read()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .clone()
    }

    /// Drops all per-process bookkeeping held for `pid`.
    ///
    /// The wake epoch is set to `WAKE_EPOCH_EXITED` first, so
    /// `wake_ladder_done` reports `true` for this pid from then on. The
    /// remaining per-pid entries are then removed, and any installed child
    /// bridge is asked to abort the terminal watches parented by `pid`.
    pub(crate) fn cleanup_process(&self, pid: u64) {
        // Tombstone the epoch before tearing anything else down.
        self.wake_observation_epochs.insert(pid, WAKE_EPOCH_EXITED);

        self.pending_awaits.remove(&pid);
        self.timeout_scope_stacks.remove(&pid);
        // Timeout scopes are not keyed by pid, so filter on the owning pid.
        self.timeout_scopes
            .retain(|_scope_key, scope| scope.pid != pid);
        self.pending_queries.remove(&pid);
        self.servicing_queries.remove(&pid);
        self.query_handlers.cleanup_pid(pid);

        if let Some(child_bridge) = self.installed_child_bridge() {
            child_bridge.abort_child_terminal_watches_for_parent(pid);
        }
    }

    /// Records one native entry for `pid` by advancing its wake epoch.
    ///
    /// The first observation creates the entry at 1; later ones increment it.
    /// An entry already holding `WAKE_EPOCH_EXITED` is left untouched.
    pub(crate) fn observe_native_entry(&self, pid: u64) {
        let advance = |epoch: &mut u64| {
            if *epoch == WAKE_EPOCH_EXITED {
                return;
            }
            *epoch += 1;
        };
        self.wake_observation_epochs
            .entry(pid)
            .and_modify(advance)
            .or_insert(1);
    }

    /// Returns the current wake epoch for `pid`, or 0 when no entry exists.
    pub(crate) fn wake_observation_epoch(&self, pid: u64) -> u64 {
        match self.wake_observation_epochs.get(&pid) {
            Some(epoch) => *epoch,
            None => 0,
        }
    }

    /// Returns `true` when the epoch for `pid` differs from `snapshot`, or when
    /// it equals `WAKE_EPOCH_EXITED`.
    pub(crate) fn wake_ladder_done(&self, pid: u64, snapshot: u64) -> bool {
        let current = self.wake_observation_epoch(pid);
        current == WAKE_EPOCH_EXITED || current != snapshot
    }

    /// Forwards a shutdown request to the child bridge, if one is installed.
    pub(crate) fn shutdown_child_tasks(&self) {
        if let Some(child_bridge) = self.installed_child_bridge() {
            child_bridge.shutdown_child_tasks();
        }
    }

    /// Returns a clone of the installed child bridge, ignoring lock poisoning.
    fn installed_child_bridge(&self) -> Option<Arc<ChildNifBridge>> {
        self.child_bridge
            .read()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .clone()
    }
}
pub(crate) fn engine_nif_state(ctx: &ProcessContext) -> Result<Arc<EngineNifState>, String> {
let data = ctx
.nif_private_data()
.ok_or_else(|| "engine NIF state is not installed for this runtime".to_owned())?;
Arc::clone(data)
.downcast::<EngineNifState>()
.map_err(|_| "engine NIF private data has an unexpected type".to_owned())
}