Skip to main content

lash_core/runtime/
queued_work_driver.rs

1use std::sync::Arc;
2
3use crate::PluginError;
4
/// Parameters describing a single request to run queued work.
///
/// Built by [`QueuedWorkRunHandle::claim_and_run_pending`] and handed to
/// [`QueuedWorkRunHandle::run_queued_work`].
#[derive(Debug, Clone)]
pub struct QueuedWorkRunRequest {
    /// Session the run is narrowed to, if any.
    // NOTE(review): `None` presumably means "no session narrowing" — confirm
    // against the run handle implementations.
    pub session_id: Option<String>,
    /// Caller-supplied label explaining why the run was requested.
    pub reason: String,
    /// Flag forwarded verbatim to the run handle.
    // NOTE(review): presumably toggles tracing of idle passes — confirm
    // against the run handle implementations.
    pub trace_idle: bool,
}

impl QueuedWorkRunRequest {
    /// Assembles a request, converting `reason` into an owned `String`.
    fn new(session_id: Option<String>, reason: impl Into<String>, trace_idle: bool) -> Self {
        let reason: String = reason.into();
        Self {
            session_id,
            reason,
            trace_idle,
        }
    }
}
21
22#[async_trait::async_trait]
23pub trait QueuedWorkRunHandle: Send + Sync {
24    async fn run_queued_work(&self, request: QueuedWorkRunRequest) -> Result<(), PluginError>;
25
26    /// Host-driven single pass: claim and submit ready queued work, optionally
27    /// narrowed to one session. The symmetric counterpart to
28    /// [`ProcessRunHandle::claim_and_run_pending`](super::ProcessRunHandle::claim_and_run_pending).
29    ///
30    /// Idempotency is the store scheduler's job, not a same-process memory
31    /// guard. Hosts call this on an event (enqueue, process wake, turn
32    /// completion) instead of polling.
33    async fn claim_and_run_pending(
34        &self,
35        session_id: Option<&str>,
36        reason: &str,
37    ) -> Result<(), PluginError> {
38        let request =
39            QueuedWorkRunRequest::new(session_id.map(str::to_string), reason.to_string(), false);
40        self.run_queued_work(request).await
41    }
42}
43
44#[derive(Clone)]
45pub struct QueuedWorkDriver {
46    run_handle: Arc<dyn QueuedWorkRunHandle>,
47}
48
49impl QueuedWorkDriver {
50    pub fn new(run_handle: Arc<dyn QueuedWorkRunHandle>) -> Self {
51        Self { run_handle }
52    }
53
54    pub async fn claim_and_run_pending(
55        &self,
56        session_id: Option<&str>,
57        reason: &str,
58    ) -> Result<(), PluginError> {
59        if let Err(err) = self
60            .run_handle
61            .claim_and_run_pending(session_id, reason)
62            .await
63        {
64            tracing::warn!("queued work drive ({reason}) failed: {err}");
65            return Err(err);
66        }
67        Ok(())
68    }
69}