1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
//! Runtime-owned process monitoring helpers.
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use crate::{EngineError, Pid, RuntimeHandle};
use super::outcome::{self, WorkflowProcessOutcome};
/// Handle returned after installing a workflow process monitor.
///
/// Clones share the same underlying installation flag, so every clone of a
/// handle reports the same status.
#[derive(Clone, Debug)]
pub struct ProcessMonitorHandle {
    /// Installation status shared by all clones of this handle. Within this
    /// module the flag is only ever initialized to `true`.
    installed: Arc<AtomicBool>,
}

impl ProcessMonitorHandle {
    /// Build a handle for a monitor the runtime has accepted.
    fn installed() -> Self {
        Self {
            installed: Arc::new(AtomicBool::new(true)),
        }
    }

    /// Returns whether the runtime accepted monitor installation.
    #[must_use]
    pub fn is_installed(&self) -> bool {
        self.installed.load(Ordering::Acquire)
    }
}
impl RuntimeHandle {
/// Install a runtime-owned monitor that invokes `callback` when `pid` exits.
///
/// The callback runs on a dedicated monitor thread outside workflow dirty NIF
/// execution. The runtime boundary owns the process wait and BEAM term
/// conversion so lifecycle code never imports beamr types.
///
/// # Errors
///
/// Returns [`EngineError::Runtime`] when `pid` is neither live nor a
/// process this runtime previously spawned. A monitored process that
/// already exited is accepted: its outcome is still observable through
/// the scheduler's exit tombstone, so the callback fires immediately
/// instead of the spawn path spuriously failing for fast-completing
/// workflows.
pub fn monitor_process<F>(
self: &Arc<Self>,
pid: Pid,
callback: F,
) -> Result<ProcessMonitorHandle, EngineError>
where
F: FnOnce(Result<WorkflowProcessOutcome, EngineError>) + Send + 'static,
{
self.ensure_monitorable_pid(pid)?;
let runtime = Arc::clone(self);
std::thread::Builder::new()
.name(format!("aion-workflow-monitor-{pid}"))
.spawn(move || {
let outcome =
outcome::workflow_process_outcome(&runtime.scheduler, &runtime.atom_table, pid);
runtime.release_spawn_heaps(pid);
runtime.nif_state().cleanup_process(pid);
// D5: completions delivered after the workflow stopped
// awaiting them (race losers, post-exit deliveries) are
// never taken; drop them with the process.
runtime.drain_activity_completions(pid);
callback(outcome);
})
.map_err(|error| EngineError::Runtime {
reason: format!("failed to spawn workflow monitor for process {pid}: {error}"),
})?;
Ok(ProcessMonitorHandle::installed())
}
/// Test-only monitor installation status probe.
///
/// # Errors
///
/// Returns [`EngineError`] if the runtime rejects the monitor installation.
#[cfg(test)]
pub fn monitor_process_for_test<F>(
self: &Arc<Self>,
pid: Pid,
callback: F,
) -> Result<ProcessMonitorHandle, EngineError>
where
F: FnOnce(Result<WorkflowProcessOutcome, EngineError>) + Send + 'static,
{
self.monitor_process(pid, callback)
}
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::mpsc;
    use std::time::Duration;

    use crate::runtime::{RuntimeConfig, RuntimeHandle};

    type TestResult = Result<(), Box<dyn std::error::Error>>;

    /// A monitor installed after its process already exited must be accepted
    /// and must still invoke its callback.
    #[test]
    fn monitor_installs_for_process_that_already_exited() -> TestResult {
        let runtime = Arc::new(RuntimeHandle::new(RuntimeConfig::new(Some(1)))?);
        let pid = runtime.spawn_test_process()?;
        runtime.cancel_pid(pid)?;
        assert!(
            !runtime.is_live(pid),
            "terminated test process should leave the live table"
        );

        // A workflow can finish on a scheduler thread before its completion
        // monitor installs; the monitor must still observe the exit outcome
        // through the scheduler's tombstone instead of rejecting the pid.
        let (sender, receiver) = mpsc::channel();
        let handle = runtime.monitor_process_for_test(pid, move |_outcome| {
            // Only the fact that the callback ran is pinned here; the outcome
            // conversion result is exercised elsewhere.
            let _ = sender.send(());
        })?;
        assert!(handle.is_installed());
        receiver.recv_timeout(Duration::from_secs(10))?;

        runtime.shutdown()?;
        Ok(())
    }

    /// A pid this runtime never spawned must be rejected with an error that
    /// says so.
    #[test]
    fn monitor_rejects_pid_never_spawned_by_this_runtime() -> TestResult {
        let runtime = Arc::new(RuntimeHandle::new(RuntimeConfig::new(Some(1)))?);
        let error = runtime
            .monitor_process_for_test(9_999, |_| {})
            .err()
            .ok_or("monitor accepted a pid this runtime never spawned")?;
        assert!(error.to_string().contains("never spawned"));
        runtime.shutdown()?;
        Ok(())
    }
}