Skip to main content

uni_plugin/traits/
background.rs

1//! Background-job provider plugins.
2//!
3//! Scheduled / periodic / fire-and-forget execution analogous to APOC's
4//! `apoc.periodic.*` family. Jobs run on a host-owned scheduler; this
5//! trait describes the job interface, not the scheduler itself. The
6//! host-side scheduler (`uni/src/scheduler.rs`) is delivered as part of
7//! M11.
8
9// Rust guideline compliant
10
11use std::time::Duration;
12
13use serde::{Deserialize, Serialize};
14use smol_str::SmolStr;
15
16use crate::errors::FnError;
17use crate::qname::QName;
18
19/// A background-job provider.
20pub trait BackgroundJobProvider: Send + Sync {
21    /// Static definition (schedule, concurrency, timeout, docs).
22    fn definition(&self) -> &JobDefinition;
23
24    /// Execute one run of the job.
25    ///
26    /// # Threading policy
27    ///
28    /// - **Driven from Tokio via `tokio::task::spawn_blocking`.** The
29    ///   host scheduler runs this synchronous method on a blocking
30    ///   worker thread so it never stalls the async runtime.
31    /// - **Must not block the runtime directly.** If the job needs to
32    ///   perform I/O, it must do so on the current (blocking) thread —
33    ///   never call `block_on` against the host runtime from inside
34    ///   `execute`.
35    /// - **Must observe [`JobContext::cancel`] cooperatively.** Poll
36    ///   [`CancellationToken::is_cancelled`] at every safe point
37    ///   (between batches, before long compute, before issuing each
38    ///   query). The scheduler trips the token on shutdown / reload /
39    ///   explicit cancel; an unresponsive job stays alive until the
40    ///   process exits.
41    /// - **Errors propagate as [`FnError`].** Panics are caught at the
42    ///   scheduler boundary and recorded as a failed run; they do not
43    ///   crash the host.
44    ///
45    /// See `docs/PLUGIN_THREADING.md` for the long-form rationale.
46    ///
47    /// # Errors
48    ///
49    /// Returns [`FnError`] on execution failure. The host's scheduler
50    /// honors the [`JobDefinition::retry`] policy.
51    fn execute(&self, ctx: JobContext<'_>) -> Result<JobOutcome, FnError>;
52}
53
54/// Static definition for a [`BackgroundJobProvider`].
55#[derive(Clone, Debug)]
56pub struct JobDefinition {
57    /// Qualified job id.
58    pub id: QName,
59    /// When this job runs.
60    pub schedule: Schedule,
61    /// Concurrency cap for *this job* (independent of the plugin's overall
62    /// concurrency limit, which is enforced by the scheduler).
63    pub concurrency: ConcurrencyLimit,
64    /// Per-run wall-clock cap.
65    pub timeout: Duration,
66    /// Retry policy on transient failure.
67    pub retry: RetryPolicy,
68    /// Markdown docs.
69    pub docs: String,
70}
71
72/// When a background job runs.
73///
74/// Implements `Serialize`/`Deserialize` so durable persistence backends
75/// (e.g. `SystemLabelSchedulerPersistence`) can round-trip the schedule
76/// across restart. `SystemTime`, `Duration`, and `SmolStr` are all
77/// serde-compatible out of the box.
78#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
79#[non_exhaustive]
80pub enum Schedule {
81    /// Fire once at the absolute instant given.
82    Once(std::time::SystemTime),
83    /// Repeat every `period` (uniform spacing).
84    Periodic(Duration),
85    /// Cron-style schedule (`"0 */15 * * * *"`).
86    Cron(SmolStr),
87    /// Only via explicit `uni.plugin.runJob('id')`.
88    Manual,
89}
90
91impl Schedule {
92    /// Compute the next instant at or after `from` that this schedule
93    /// fires, or `None` if the schedule is exhausted (a `Once` whose
94    /// instant has already passed) or the cron expression cannot be
95    /// parsed.
96    ///
97    /// Used by the host scheduler driver
98    /// ([`crate::scheduler::Scheduler::tick_at`]) to time-gate
99    /// dispatch.
100    ///
101    /// # Examples
102    ///
103    /// ```
104    /// use std::time::{Duration, SystemTime};
105    /// use uni_plugin::traits::background::Schedule;
106    ///
107    /// let now = SystemTime::now();
108    /// let s = Schedule::Periodic(Duration::from_secs(10));
109    /// let next = s.next_after(now).unwrap();
110    /// assert!(next >= now + Duration::from_secs(10));
111    ///
112    /// // A Once whose instant has passed is exhausted.
113    /// let past = now - Duration::from_secs(60);
114    /// assert!(Schedule::Once(past).next_after(now).is_none());
115    /// ```
116    #[must_use]
117    pub fn next_after(&self, from: std::time::SystemTime) -> Option<std::time::SystemTime> {
118        use std::str::FromStr;
119        match self {
120            Schedule::Manual => Some(from),
121            Schedule::Once(at) => (*at >= from).then_some(*at),
122            Schedule::Periodic(every) => Some(from + *every),
123            Schedule::Cron(expr) => {
124                // Registration-time validation rejects malformed cron
125                // expressions, but a persisted job's expression could
126                // round-trip through storage and fail to re-parse here.
127                // Log loudly so the operator notices, then return `None`
128                // so the job is treated as "not currently due" rather
129                // than silently lost. (`next_after`'s signature is
130                // infallible because tons of call sites depend on it;
131                // changing it is a separate, larger refactor.)
132                let sched = match cron::Schedule::from_str(expr.as_str()) {
133                    Ok(s) => s,
134                    Err(e) => {
135                        tracing::error!(
136                            target: "uni_plugin::scheduler",
137                            cron_expr = %expr,
138                            error = %e,
139                            "Cron schedule failed to parse; job will not fire until \
140                             the expression is fixed or the job is re-registered."
141                        );
142                        return None;
143                    }
144                };
145                let from_chrono: chrono::DateTime<chrono::Utc> = from.into();
146                sched
147                    .after(&from_chrono)
148                    .next()
149                    .map(|t: chrono::DateTime<chrono::Utc>| t.into())
150            }
151        }
152    }
153}
154
/// How many runs of a single job may be in flight at once.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConcurrencyLimit {
    /// A run never overlaps another run of the same job.
    Exclusive,
    /// No more than `N` runs in flight at the same time.
    Bounded(u32),
    /// Any number of overlapping runs.
    Unbounded,
}
166
/// What the scheduler does when a run fails transiently.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RetryPolicy {
    /// Do not retry; the failure is surfaced right away.
    Never,
    /// Retry with a constant pause between attempts.
    FixedDelay {
        /// Upper bound on attempts, counting the initial one.
        max: u32,
        /// Pause between consecutive attempts.
        delay: Duration,
    },
}
181
/// What a single job run reports back to the scheduler on success.
#[non_exhaustive]
#[derive(Debug)]
pub enum JobOutcome {
    /// Finished; nothing more to do.
    Done,
    /// Finished; fire again once the given delay has elapsed.
    DoneAndReschedule(Duration),
    /// The run failed.
    Failed {
        /// Human-readable failure reason, recorded for telemetry.
        reason: String,
        /// `true` if the job's retry policy should be applied.
        retry: bool,
    },
}
198
199/// Marker trait for the host's background-job execution facilities.
200///
201/// Concrete hosts (e.g., `uni-db`'s `SchedulerJobHost`) implement this
202/// and expose typed accessors on the concrete type. Job providers
203/// downcast via [`JobHost::as_any`] when they need host services like
204/// the storage manager, plugin registry, or write-mode inner-query
205/// execution.
206///
207/// Mirrors [`crate::traits::procedure::ProcedureHost`] — same
208/// downcasting pattern, just per-job-context flavor.
209pub trait JobHost: Send + Sync + std::any::Any + std::fmt::Debug {
210    /// Returns the host as a downcastable `&dyn Any`.
211    fn as_any(&self) -> &dyn std::any::Any;
212
213    /// Trigger a best-effort storage compaction.
214    ///
215    /// The built-in `uni.system.compaction` job calls this from its
216    /// `execute()` body. The default impl is a no-op so test hosts
217    /// don't have to implement storage access.
218    ///
219    /// # Errors
220    ///
221    /// Returns [`FnError`] if the host's storage manager surfaces a
222    /// compaction failure.
223    fn compact_storage(&self) -> Result<(), FnError> {
224        Ok(())
225    }
226
227    /// Execute a write-mode Cypher statement against the host.
228    ///
229    /// The built-in `uni.system.ttl_sweep` job calls this with a
230    /// `MATCH (n) WHERE n.__ttl < timestamp() DETACH DELETE n` body.
231    /// The default impl returns an error so test hosts that don't
232    /// wire write-mode Cypher can still load.
233    ///
234    /// # Errors
235    ///
236    /// Returns [`FnError`] if the host has not wired write-mode
237    /// Cypher (default) or if the statement fails.
238    fn execute_write_cypher(&self, _cypher: &str) -> Result<(), FnError> {
239        Err(FnError::new(
240            0xD10,
241            "JobHost: write-mode Cypher not supported by this host",
242        ))
243    }
244}
245
246/// Per-run context.
247#[derive(Debug)]
248#[non_exhaustive]
249pub struct JobContext<'a> {
250    /// Information about the previous run, if any.
251    pub last_run: Option<JobRunRecord>,
252    /// Cooperative-cancel token — implementations check between work
253    /// units to honor reload / shutdown.
254    pub cancel: CancellationToken,
255    /// Optional host services pointer. `None` in pure unit tests; the
256    /// scheduler driver populates it with a concrete `dyn JobHost`
257    /// (typically `uni-db::scheduler::SchedulerJobHost`).
258    pub host: Option<&'a dyn JobHost>,
259    /// Lifetime marker for session / config refs added later.
260    pub _marker: std::marker::PhantomData<&'a ()>,
261}
262
263impl<'a> JobContext<'a> {
264    /// Construct a fresh per-run context from a cancellation token
265    /// and the previous run's record (if any).
266    ///
267    /// Out-of-crate callers (e.g., the host scheduler driver in
268    /// `uni-db::scheduler`) use this constructor because
269    /// [`JobContext`] is `#[non_exhaustive]` and cannot be built with
270    /// a struct literal from outside this crate. Host services
271    /// (storage, inner-query, etc.) are attached via
272    /// [`Self::with_host`].
273    #[must_use]
274    pub fn new(cancel: CancellationToken, last_run: Option<JobRunRecord>) -> Self {
275        Self {
276            last_run,
277            cancel,
278            host: None,
279            _marker: std::marker::PhantomData,
280        }
281    }
282
283    /// Attach a host pointer for the run.
284    #[must_use]
285    pub fn with_host(mut self, host: &'a dyn JobHost) -> Self {
286        self.host = Some(host);
287        self
288    }
289}
290
/// Bookkeeping for a previous run, persisted in `uni_system.background_jobs`.
#[derive(Debug, Clone)]
pub struct JobRunRecord {
    /// When the run started.
    pub started_at: std::time::SystemTime,
    /// When the run finished, or its most recent activity if it is still
    /// in progress.
    pub finished_at: std::time::SystemTime,
    /// The outcome, stored as the variant name in string form for
    /// portability.
    pub outcome: String,
}
301
/// Cooperative cancellation token.
///
/// The scheduler creates one per run and trips it on shutdown / reload /
/// explicit cancel. Job implementations are responsible for checking the
/// token at safe points. Synchronous bodies poll
/// [`CancellationToken::is_cancelled`]; async-aware bodies can await
/// [`CancellationToken::cancelled`].
///
/// (§1.2 / Phase 6 consolidation.) This is a re-export of
/// [`tokio_util::sync::CancellationToken`], which replaced an earlier
/// hand-rolled `Arc<AtomicBool>` token. The old token offered only the
/// synchronous `is_cancelled()` flag, so the scheduler driver had to poll
/// it. The upstream type also provides an async `cancelled().await` future.
/// That lets the driver race dispatch against the cancel signal in
/// `tokio::select!` and react immediately. The synchronous API (`new`,
/// `cancel`, `is_cancelled`, `Clone`, `Debug`, `Default`) is unchanged, so
/// existing call sites compile as before.
pub use tokio_util::sync::CancellationToken;
319
#[cfg(test)]
mod tests {
    use super::*;

    /// Fixed reference instant: 2024-01-01T00:00:00Z.
    fn jan_1_2024() -> std::time::SystemTime {
        std::time::SystemTime::UNIX_EPOCH + Duration::from_secs(1_704_067_200)
    }

    #[test]
    fn cancel_token_round_trip() {
        let token = CancellationToken::new();
        assert!(!token.is_cancelled(), "a fresh token starts un-cancelled");
        token.cancel();
        assert!(token.is_cancelled());
    }

    #[test]
    fn cancel_token_clone_shares_state() {
        let original = CancellationToken::new();
        let cloned = original.clone();
        original.cancel();
        assert!(cloned.is_cancelled(), "clones observe cancellation");
    }

    #[test]
    fn once_is_inclusive_then_exhausted() {
        let t = jan_1_2024();
        // Exactly at `from` is still due.
        assert_eq!(Schedule::Once(t).next_after(t), Some(t));
        // A future instant is returned unchanged.
        let later = t + Duration::from_secs(5);
        assert_eq!(Schedule::Once(later).next_after(t), Some(later));
        // A past instant is exhausted.
        assert_eq!(
            Schedule::Once(t).next_after(t + Duration::from_secs(1)),
            None
        );
    }

    #[test]
    fn periodic_adds_the_period() {
        let t = jan_1_2024();
        let s = Schedule::Periodic(Duration::from_secs(30));
        assert_eq!(s.next_after(t), Some(t + Duration::from_secs(30)));
    }

    #[test]
    fn cron_every_fifteen_minutes() {
        let t = jan_1_2024();
        let s = Schedule::Cron(SmolStr::new("0 */15 * * * *"));
        assert_eq!(s.next_after(t), Some(t + Duration::from_secs(15 * 60)));
    }

    #[test]
    fn malformed_cron_is_not_due() {
        let s = Schedule::Cron(SmolStr::new("definitely not cron"));
        assert_eq!(s.next_after(jan_1_2024()), None);
    }
}