lash_core/runtime/process_work_runner.rs
1use std::sync::Arc;
2use std::time::Duration;
3
4use tokio::sync::Notify;
5
6use super::DurableProcessWorker;
7use super::process::ProcessRegistry;
8use crate::PluginError;
9
10/// How often the runner re-drives pending processes absent a poke.
11///
12/// Pokes make consumption prompt; the poll is a safety net that picks up work
13/// no poke reached (a crash-orphaned non-terminal row, or a poke dropped while
14/// the runner was mid-drive).
15const PROCESS_WORK_POLL_INTERVAL: Duration = Duration::from_millis(400);
16
17/// Drives the registry's non-terminal rows (the durable work queue) to terminal
18/// on poke, on a poll tick, and once at startup.
19///
20/// The registry non-terminal rows *are* the queue; a poke makes consumption
21/// prompt. The single coordination point is the `ProcessLease` claimed inside
22/// the [`ProcessRunHandle`], so a poke is idempotent (a leased or terminal row
23/// is skipped) and the same control seam can poke after any process start.
24///
25/// The loop is a [`tokio::select`] over a [`Notify`] (poke) and an interval
26/// (poll), plus one startup drive that folds in the former startup-only
27/// recovery sweep.
28pub struct ProcessWorkRunner {
29 run_handle: Arc<dyn ProcessRunHandle>,
30 notify: Arc<Notify>,
31}
32
33impl ProcessWorkRunner {
34 /// Build a runner over the given [`ProcessRunHandle`].
35 pub fn new(run_handle: Arc<dyn ProcessRunHandle>) -> Self {
36 Self {
37 run_handle,
38 notify: Arc::new(Notify::new()),
39 }
40 }
41
42 /// Build a runner that drives an inline [`DurableProcessWorker`] directly.
43 pub fn inline(worker: DurableProcessWorker) -> Self {
44 Self::new(Arc::new(InlineProcessRunHandle::new(worker)))
45 }
46
47 /// A cloneable poke handle that wakes the loop. Hand a clone to the control
48 /// seam so a successful process start can make consumption prompt.
49 pub fn poke_handle(&self) -> ProcessWorkPoke {
50 ProcessWorkPoke {
51 notify: Arc::clone(&self.notify),
52 }
53 }
54
55 /// Spawn the loop on the current tokio runtime, returning the poke handle.
56 ///
57 /// The loop drives once at startup (folding in the former startup-only
58 /// recovery sweep), then on every poke and every poll tick until the
59 /// process exits. Each drive is idempotent, so a poke racing a poll never
60 /// double-runs a process.
61 pub fn spawn(self) -> ProcessWorkPoke {
62 let poke = self.poke_handle();
63 tokio::spawn(async move {
64 self.run().await;
65 });
66 poke
67 }
68
69 async fn run(self) {
70 // Startup drive: the runner first tick replaces the startup-only
71 // recovery sweep, so crash-orphaned non-terminal rows are picked up
72 // without a separate boot-time sweep.
73 self.drive("startup").await;
74 let mut poll = tokio::time::interval(PROCESS_WORK_POLL_INTERVAL);
75 poll.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
76 loop {
77 tokio::select! {
78 _ = self.notify.notified() => {
79 self.drive("poke").await;
80 }
81 _ = poll.tick() => {
82 self.drive("poll").await;
83 }
84 }
85 }
86 }
87
88 async fn drive(&self, reason: &str) {
89 if let Err(err) = self.run_handle.claim_and_run_pending().await {
90 tracing::warn!("process work runner drive ({reason}) failed: {err}");
91 }
92 }
93}
94
95/// Registry and wake handle for a process work runner owned outside
96/// [`LashCore`](https://docs.rs/lash/latest/lash/struct.LashCore.html).
97///
98/// Durable deployments use this to bind one registry to the external runner
99/// that consumes that registry's non-terminal process rows. The facade can then
100/// configure process lifecycle support from the driver without accepting a
101/// second, potentially divergent registry argument.
102#[derive(Clone)]
103pub struct ProcessWorkDriver {
104 registry: Arc<dyn ProcessRegistry>,
105 poke: ProcessWorkPoke,
106}
107
108impl ProcessWorkDriver {
109 pub fn new(registry: Arc<dyn ProcessRegistry>, poke: ProcessWorkPoke) -> Self {
110 Self { registry, poke }
111 }
112
113 pub fn process_registry(&self) -> Arc<dyn ProcessRegistry> {
114 Arc::clone(&self.registry)
115 }
116
117 pub fn poke_handle(&self) -> ProcessWorkPoke {
118 self.poke.clone()
119 }
120}
121
122/// Cloneable handle that wakes a [`ProcessWorkRunner`] loop.
123///
124/// Poking is idempotent — the runner skips leased and terminal rows — so the
125/// control seam can poke after any successful process start (in-turn-inline,
126/// trigger or host event) without coordinating with the runner.
127#[derive(Clone)]
128pub struct ProcessWorkPoke {
129 notify: Arc<Notify>,
130}
131
132impl ProcessWorkPoke {
133 /// Wake the runner to drive pending processes promptly.
134 pub fn poke(&self) {
135 self.notify.notify_one();
136 }
137}
138
139/// One lease-protected drive of the registry's pending (non-terminal) processes.
140///
141/// Implementations claim the single-owner [`ProcessLease`](crate::ProcessLease)
142/// per non-terminal row to fence execution, so a concurrent drive on another
143/// owner skips an already-leased process and a process runs exactly once.
144#[async_trait::async_trait]
145pub trait ProcessRunHandle: Send + Sync {
146 /// Claim and run every pending process this owner can claim, driving each to
147 /// a terminal state. Idempotent: leased and terminal rows are skipped.
148 async fn claim_and_run_pending(&self) -> Result<(), PluginError>;
149}
150
151/// Inline run handle: drives the worker's own lease-protected sweep in-process.
152///
153/// Delegates to [`DurableProcessWorker::drive_pending_processes`], the existing
154/// `list_non_terminal -> claim lease -> run -> complete -> release` loop, so the
155/// inline tier reuses the same coordination point as the durable tier.
156pub struct InlineProcessRunHandle {
157 worker: DurableProcessWorker,
158}
159
160impl InlineProcessRunHandle {
161 pub fn new(worker: DurableProcessWorker) -> Self {
162 Self { worker }
163 }
164}
165
166#[async_trait::async_trait]
167impl ProcessRunHandle for InlineProcessRunHandle {
168 async fn claim_and_run_pending(&self) -> Result<(), PluginError> {
169 self.worker.drive_pending_processes().await
170 }
171}