Skip to main content

lash_core/runtime/
queued_work_driver.rs

1use std::sync::Arc;
2
3use crate::PluginError;
4
5#[derive(Clone, Debug)]
6pub struct QueuedWorkRunRequest {
7    pub session_id: Option<String>,
8    pub reason: String,
9    pub trace_idle: bool,
10}
11
12impl QueuedWorkRunRequest {
13    fn new(session_id: Option<String>, reason: impl Into<String>, trace_idle: bool) -> Self {
14        Self {
15            session_id,
16            reason: reason.into(),
17            trace_idle,
18        }
19    }
20}
21
22#[async_trait::async_trait]
23pub trait QueuedWorkRunHandle: Send + Sync {
24    async fn run_queued_work(&self, request: QueuedWorkRunRequest) -> Result<(), PluginError>;
25
26    /// Host-driven single pass: claim and submit ready queued work, optionally
27    /// narrowed to one session. The symmetric counterpart to
28    /// [`ProcessRunHandle::claim_and_run_pending`](super::ProcessRunHandle::claim_and_run_pending).
29    ///
30    /// Idempotency is the store claim's job ([`claim_ready_queued_work`]), not a
31    /// same-process memory guard. Hosts call this on an event (enqueue, process
32    /// wake, turn completion) instead of polling.
33    ///
34    /// [`claim_ready_queued_work`]: crate::store::RuntimePersistence::claim_ready_queued_work
35    async fn claim_and_run_pending(
36        &self,
37        session_id: Option<&str>,
38        reason: &str,
39    ) -> Result<(), PluginError> {
40        let request =
41            QueuedWorkRunRequest::new(session_id.map(str::to_string), reason.to_string(), false);
42        self.run_queued_work(request).await
43    }
44}
45
46#[derive(Clone)]
47pub struct QueuedWorkDriver {
48    run_handle: Arc<dyn QueuedWorkRunHandle>,
49}
50
51impl QueuedWorkDriver {
52    pub fn new(run_handle: Arc<dyn QueuedWorkRunHandle>) -> Self {
53        Self { run_handle }
54    }
55
56    pub async fn claim_and_run_pending(
57        &self,
58        session_id: Option<&str>,
59        reason: &str,
60    ) -> Result<(), PluginError> {
61        if let Err(err) = self
62            .run_handle
63            .claim_and_run_pending(session_id, reason)
64            .await
65        {
66            tracing::warn!("queued work drive ({reason}) failed: {err}");
67            return Err(err);
68        }
69        Ok(())
70    }
71}