uni_plugin/traits/background.rs
1//! Background-job provider plugins.
2//!
3//! Scheduled / periodic / fire-and-forget execution analogous to APOC's
4//! `apoc.periodic.*` family. Jobs run on a host-owned scheduler; this
5//! trait describes the job interface, not the scheduler itself. The
6//! host-side scheduler (`uni/src/scheduler.rs`) is delivered as part of
7//! M11.
8
9// Rust guideline compliant
10
11use std::time::Duration;
12
13use serde::{Deserialize, Serialize};
14use smol_str::SmolStr;
15
16use crate::errors::FnError;
17use crate::qname::QName;
18
19/// A background-job provider.
20pub trait BackgroundJobProvider: Send + Sync {
21 /// Static definition (schedule, concurrency, timeout, docs).
22 fn definition(&self) -> &JobDefinition;
23
24 /// Execute one run of the job.
25 ///
26 /// # Threading policy
27 ///
28 /// - **Driven from Tokio via `tokio::task::spawn_blocking`.** The
29 /// host scheduler runs this synchronous method on a blocking
30 /// worker thread so it never stalls the async runtime.
31 /// - **Must not block the runtime directly.** If the job needs to
32 /// perform I/O, it must do so on the current (blocking) thread —
33 /// never call `block_on` against the host runtime from inside
34 /// `execute`.
35 /// - **Must observe [`JobContext::cancel`] cooperatively.** Poll
36 /// [`CancellationToken::is_cancelled`] at every safe point
37 /// (between batches, before long compute, before issuing each
38 /// query). The scheduler trips the token on shutdown / reload /
39 /// explicit cancel; an unresponsive job stays alive until the
40 /// process exits.
41 /// - **Errors propagate as [`FnError`].** Panics are caught at the
42 /// scheduler boundary and recorded as a failed run; they do not
43 /// crash the host.
44 ///
45 /// See `docs/PLUGIN_THREADING.md` for the long-form rationale.
46 ///
47 /// # Errors
48 ///
49 /// Returns [`FnError`] on execution failure. The host's scheduler
50 /// honors the [`JobDefinition::retry`] policy.
51 fn execute(&self, ctx: JobContext<'_>) -> Result<JobOutcome, FnError>;
52}
53
54/// Static definition for a [`BackgroundJobProvider`].
55#[derive(Clone, Debug)]
56pub struct JobDefinition {
57 /// Qualified job id.
58 pub id: QName,
59 /// When this job runs.
60 pub schedule: Schedule,
61 /// Concurrency cap for *this job* (independent of the plugin's overall
62 /// concurrency limit, which is enforced by the scheduler).
63 pub concurrency: ConcurrencyLimit,
64 /// Per-run wall-clock cap.
65 pub timeout: Duration,
66 /// Retry policy on transient failure.
67 pub retry: RetryPolicy,
68 /// Markdown docs.
69 pub docs: String,
70}
71
72/// When a background job runs.
73///
74/// Implements `Serialize`/`Deserialize` so durable persistence backends
75/// (e.g. `SystemLabelSchedulerPersistence`) can round-trip the schedule
76/// across restart. `SystemTime`, `Duration`, and `SmolStr` are all
77/// serde-compatible out of the box.
78#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
79#[non_exhaustive]
80pub enum Schedule {
81 /// Fire once at the absolute instant given.
82 Once(std::time::SystemTime),
83 /// Repeat every `period` (uniform spacing).
84 Periodic(Duration),
85 /// Cron-style schedule (`"0 */15 * * * *"`).
86 Cron(SmolStr),
87 /// Only via explicit `uni.plugin.runJob('id')`.
88 Manual,
89}
90
91impl Schedule {
92 /// Compute the next instant at or after `from` that this schedule
93 /// fires, or `None` if the schedule is exhausted (a `Once` whose
94 /// instant has already passed) or the cron expression cannot be
95 /// parsed.
96 ///
97 /// Used by the host scheduler driver
98 /// ([`crate::scheduler::Scheduler::tick_at`]) to time-gate
99 /// dispatch.
100 ///
101 /// # Examples
102 ///
103 /// ```
104 /// use std::time::{Duration, SystemTime};
105 /// use uni_plugin::traits::background::Schedule;
106 ///
107 /// let now = SystemTime::now();
108 /// let s = Schedule::Periodic(Duration::from_secs(10));
109 /// let next = s.next_after(now).unwrap();
110 /// assert!(next >= now + Duration::from_secs(10));
111 ///
112 /// // A Once whose instant has passed is exhausted.
113 /// let past = now - Duration::from_secs(60);
114 /// assert!(Schedule::Once(past).next_after(now).is_none());
115 /// ```
116 #[must_use]
117 pub fn next_after(&self, from: std::time::SystemTime) -> Option<std::time::SystemTime> {
118 use std::str::FromStr;
119 match self {
120 Schedule::Manual => Some(from),
121 Schedule::Once(at) => (*at >= from).then_some(*at),
122 Schedule::Periodic(every) => Some(from + *every),
123 Schedule::Cron(expr) => {
124 // Registration-time validation rejects malformed cron
125 // expressions, but a persisted job's expression could
126 // round-trip through storage and fail to re-parse here.
127 // Log loudly so the operator notices, then return `None`
128 // so the job is treated as "not currently due" rather
129 // than silently lost. (`next_after`'s signature is
130 // infallible because tons of call sites depend on it;
131 // changing it is a separate, larger refactor.)
132 let sched = match cron::Schedule::from_str(expr.as_str()) {
133 Ok(s) => s,
134 Err(e) => {
135 tracing::error!(
136 target: "uni_plugin::scheduler",
137 cron_expr = %expr,
138 error = %e,
139 "Cron schedule failed to parse; job will not fire until \
140 the expression is fixed or the job is re-registered."
141 );
142 return None;
143 }
144 };
145 let from_chrono: chrono::DateTime<chrono::Utc> = from.into();
146 sched
147 .after(&from_chrono)
148 .next()
149 .map(|t: chrono::DateTime<chrono::Utc>| t.into())
150 }
151 }
152 }
153}
154
/// How many runs of a single job may be in flight at the same time.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConcurrencyLimit {
    /// A run never overlaps with another run of the same job.
    Exclusive,
    /// No more than `N` runs execute concurrently.
    Bounded(u32),
    /// Runs may overlap without restriction.
    Unbounded,
}
166
/// What the scheduler does when a run fails transiently.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RetryPolicy {
    /// Do not retry; the failure surfaces immediately.
    Never,
    /// Retry with a constant pause: at most `max` attempts, `delay` apart.
    FixedDelay {
        /// Upper bound on attempts; the first attempt counts toward it.
        max: u32,
        /// Pause inserted between consecutive attempts.
        delay: Duration,
    },
}
181
/// Result reported by a single job run.
#[non_exhaustive]
#[derive(Debug)]
pub enum JobOutcome {
    /// The run finished and nothing further is required.
    Done,
    /// The run finished and asks to be fired again after the given delay.
    DoneAndReschedule(Duration),
    /// The run failed; `retry` tells whether the retry policy applies.
    Failed {
        /// Human-readable failure reason, kept for telemetry.
        reason: String,
        /// When `true`, the retry policy should be honored.
        retry: bool,
    },
}
198
199/// Marker trait for the host's background-job execution facilities.
200///
201/// Concrete hosts (e.g., `uni-db`'s `SchedulerJobHost`) implement this
202/// and expose typed accessors on the concrete type. Job providers
203/// downcast via [`JobHost::as_any`] when they need host services like
204/// the storage manager, plugin registry, or write-mode inner-query
205/// execution.
206///
207/// Mirrors [`crate::traits::procedure::ProcedureHost`] — same
208/// downcasting pattern, just per-job-context flavor.
209pub trait JobHost: Send + Sync + std::any::Any + std::fmt::Debug {
210 /// Returns the host as a downcastable `&dyn Any`.
211 fn as_any(&self) -> &dyn std::any::Any;
212
213 /// Trigger a best-effort storage compaction.
214 ///
215 /// The built-in `uni.system.compaction` job calls this from its
216 /// `execute()` body. The default impl is a no-op so test hosts
217 /// don't have to implement storage access.
218 ///
219 /// # Errors
220 ///
221 /// Returns [`FnError`] if the host's storage manager surfaces a
222 /// compaction failure.
223 fn compact_storage(&self) -> Result<(), FnError> {
224 Ok(())
225 }
226
227 /// Execute a write-mode Cypher statement against the host.
228 ///
229 /// The built-in `uni.system.ttl_sweep` job calls this with a
230 /// `MATCH (n) WHERE n.__ttl < timestamp() DETACH DELETE n` body.
231 /// The default impl returns an error so test hosts that don't
232 /// wire write-mode Cypher can still load.
233 ///
234 /// # Errors
235 ///
236 /// Returns [`FnError`] if the host has not wired write-mode
237 /// Cypher (default) or if the statement fails.
238 fn execute_write_cypher(&self, _cypher: &str) -> Result<(), FnError> {
239 Err(FnError::new(
240 0xD10,
241 "JobHost: write-mode Cypher not supported by this host",
242 ))
243 }
244}
245
246/// Per-run context.
247#[derive(Debug)]
248#[non_exhaustive]
249pub struct JobContext<'a> {
250 /// Information about the previous run, if any.
251 pub last_run: Option<JobRunRecord>,
252 /// Cooperative-cancel token — implementations check between work
253 /// units to honor reload / shutdown.
254 pub cancel: CancellationToken,
255 /// Optional host services pointer. `None` in pure unit tests; the
256 /// scheduler driver populates it with a concrete `dyn JobHost`
257 /// (typically `uni-db::scheduler::SchedulerJobHost`).
258 pub host: Option<&'a dyn JobHost>,
259 /// Lifetime marker for session / config refs added later.
260 pub _marker: std::marker::PhantomData<&'a ()>,
261}
262
263impl<'a> JobContext<'a> {
264 /// Construct a fresh per-run context from a cancellation token
265 /// and the previous run's record (if any).
266 ///
267 /// Out-of-crate callers (e.g., the host scheduler driver in
268 /// `uni-db::scheduler`) use this constructor because
269 /// [`JobContext`] is `#[non_exhaustive]` and cannot be built with
270 /// a struct literal from outside this crate. Host services
271 /// (storage, inner-query, etc.) are attached via
272 /// [`Self::with_host`].
273 #[must_use]
274 pub fn new(cancel: CancellationToken, last_run: Option<JobRunRecord>) -> Self {
275 Self {
276 last_run,
277 cancel,
278 host: None,
279 _marker: std::marker::PhantomData,
280 }
281 }
282
283 /// Attach a host pointer for the run.
284 #[must_use]
285 pub fn with_host(mut self, host: &'a dyn JobHost) -> Self {
286 self.host = Some(host);
287 self
288 }
289}
290
/// Summary of an earlier run, as persisted in `uni_system.background_jobs`.
#[derive(Debug, Clone)]
pub struct JobRunRecord {
    /// Instant at which the run started.
    pub started_at: std::time::SystemTime,
    /// Instant at which the run finished (or its last activity, if it
    /// is still running).
    pub finished_at: std::time::SystemTime,
    /// Outcome of the run, stored as the variant name in string form
    /// for portability.
    pub outcome: String,
}
301
302/// Cooperative cancellation token.
303///
304/// The scheduler creates one per run and trips it on shutdown / reload /
305/// explicit cancel. Job implementations are responsible for checking the
306/// token at safe points (sync polling via [`CancellationToken::is_cancelled`])
307/// or, for async-aware bodies, awaiting [`CancellationToken::cancelled`].
308///
309/// §1.2 / Phase 6 consolidation: re-exported from
310/// [`tokio_util::sync::CancellationToken`]. The previous hand-rolled
311/// `Arc<AtomicBool>` token shipped only the sync `is_cancelled()` flag,
312/// forcing the scheduler driver to poll. The upstream type adds an async
313/// `cancelled().await` future, which lets the driver wrap dispatch in a
314/// `tokio::select!` against the cancel signal and react immediately —
315/// the sync API (`new`, `cancel`, `is_cancelled`, `Clone`, `Debug`,
316/// `Default`) is preserved verbatim, so existing call sites compile
317/// unchanged.
318pub use tokio_util::sync::CancellationToken;
319
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh token is not cancelled; `cancel()` flips it.
    #[test]
    fn cancel_token_round_trip() {
        let token = CancellationToken::new();
        assert!(!token.is_cancelled());

        token.cancel();
        assert!(token.is_cancelled());
    }

    /// Cancelling one handle is observed through its clone.
    #[test]
    fn cancel_token_clone_shares_state() {
        let original = CancellationToken::new();
        let observer = original.clone();

        original.cancel();
        assert!(observer.is_cancelled());
    }
}