Skip to main content

lash_core/runtime/
process_work_driver.rs

1use std::sync::Arc;
2
3use super::DurableProcessWorker;
4use super::process::ProcessRegistry;
5use crate::PluginError;
6
7/// Registry and run handle for process work owned outside
8/// [`LashCore`](https://docs.rs/lash/latest/lash/struct.LashCore.html).
9///
10/// The registry non-terminal rows are the durable work queue. Hosts drive that
11/// queue explicitly by calling [`claim_and_run_pending`](Self::claim_and_run_pending)
12/// on each relevant event. Cross-process idempotency belongs to the registry
13/// claim; there is no core-owned polling loop.
14#[derive(Clone)]
15pub struct ProcessWorkDriver {
16    registry: Arc<dyn ProcessRegistry>,
17    run_handle: Arc<dyn ProcessRunHandle>,
18}
19
20impl ProcessWorkDriver {
21    pub fn new(registry: Arc<dyn ProcessRegistry>, run_handle: Arc<dyn ProcessRunHandle>) -> Self {
22        Self {
23            registry,
24            run_handle,
25        }
26    }
27
28    pub fn inline(registry: Arc<dyn ProcessRegistry>, worker: DurableProcessWorker) -> Self {
29        Self::new(registry, Arc::new(InlineProcessRunHandle::new(worker)))
30    }
31
32    pub fn process_registry(&self) -> Arc<dyn ProcessRegistry> {
33        Arc::clone(&self.registry)
34    }
35
36    pub async fn claim_and_run_pending(&self, reason: &str) -> Result<(), PluginError> {
37        if let Err(err) = self.run_handle.claim_and_run_pending().await {
38            tracing::warn!("process work drive ({reason}) failed: {err}");
39            return Err(err);
40        }
41        Ok(())
42    }
43}
44
45/// One lease-protected drive of the registry's pending (non-terminal) processes.
46///
47/// Implementations claim the single-owner [`ProcessLease`](crate::ProcessLease)
48/// per non-terminal row to fence execution, so a concurrent drive on another
49/// owner skips an already-leased process and a process runs exactly once.
50#[async_trait::async_trait]
51pub trait ProcessRunHandle: Send + Sync {
52    /// Claim and run every pending process this owner can claim, driving each to
53    /// a terminal state. Idempotent: leased and terminal rows are skipped.
54    async fn claim_and_run_pending(&self) -> Result<(), PluginError>;
55}
56
57/// Inline run handle: drives the worker's own lease-protected sweep in-process.
58///
59/// Delegates to [`DurableProcessWorker::drive_pending_processes`], the existing
60/// `list_non_terminal -> claim lease -> run -> complete -> release` loop, so the
61/// inline tier reuses the same coordination point as the durable tier.
62pub struct InlineProcessRunHandle {
63    worker: DurableProcessWorker,
64}
65
66impl InlineProcessRunHandle {
67    pub fn new(worker: DurableProcessWorker) -> Self {
68        Self { worker }
69    }
70}
71
72#[async_trait::async_trait]
73impl ProcessRunHandle for InlineProcessRunHandle {
74    async fn claim_and_run_pending(&self) -> Result<(), PluginError> {
75        self.worker.drive_pending_processes().await
76    }
77}