Skip to main content

uni_plugin_host/
scheduler_persistence.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Durable [`SchedulerPersistence`] backend for `uni-db`.
5//!
6//! Mirrors the
7//! [`crate::persistence::SystemLabelPersistence`] pattern but scoped
8//! to scheduler job state. Writes are dual-routed:
9//!
10//! 1. **JSON sidecar** at `<data_path>/_system/background_jobs.json`
11//!    — atomic write-then-rename, source-of-truth at startup
12//!    (`load_all`). The sidecar lives next to
13//!    `declared_plugins.json` under the same `_system/` reservation.
14//! 2. **`_BackgroundJob` graph label** (best-effort) — issued via the
15//!    shared [`crate::persistence::LazyCypherSink`] once
16//!    `Uni::build` finishes wiring it. Gives operators
17//!    `MATCH (j:_BackgroundJob) RETURN j` visibility without needing
18//!    a separate introspection procedure.
19
20// Rust guideline compliant
21
22use std::path::PathBuf;
23use std::sync::Arc;
24use std::time::SystemTime;
25
26use serde::{Deserialize, Serialize};
27use uni_plugin::qname::QName;
28use uni_plugin::scheduler::{
29    SchedulerJobRecord, SchedulerJobStatus, SchedulerPersistence, SchedulerPersistenceError,
30};
31use uni_plugin::traits::background::{CancellationToken, Schedule};
32
33use crate::persistence::LazyCypherSink;
34use uni_sidecar::VecSidecar;
35
/// JSON-encoded shape of one scheduler job row in the
/// `background_jobs.json` sidecar.
///
/// The `qname` / `status` pair is also what the best-effort
/// `_BackgroundJob` Cypher mirror writes; the remaining fields live only
/// in the sidecar.
///
/// `schedule` and `next_fire_at` carry serde defaults (`schedule` via
/// [`default_schedule`], i.e. [`Schedule::Manual`]; `next_fire_at` via
/// `Option::default`, i.e. `None`) so sidecars written by pre-M11-closure
/// builds — which only carried `qname` / `status` /
/// `consecutive_failures` — keep deserializing with the prior behavior.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct PersistedJob {
    // Dotted `namespace.local` form produced by `QName::to_string`.
    qname: String,
    // `Debug` rendering of a `SchedulerJobStatus` (e.g. `"Pending"`).
    status: String,
    // Failure-streak counter; `load_all` restores it verbatim into
    // `SchedulerJobRecord::consecutive_failures`.
    consecutive_failures: u32,
    // Legacy rows without this key fall back to `Schedule::Manual`.
    #[serde(default = "default_schedule")]
    schedule: Schedule,
    // Result of `Schedule::next_after` at scheduling time; `None` when it
    // was never computed (rows created outside `record_scheduled`, legacy
    // rows) or when the schedule yields no next fire time.
    #[serde(default)]
    next_fire_at: Option<SystemTime>,
}
55
56fn default_schedule() -> Schedule {
57    Schedule::Manual
58}
59
/// Durable [`SchedulerPersistence`] backed by a JSON sidecar.
///
/// The sidecar is the source of truth; every mutation is a serialized
/// read-modify-write of the whole row set. The `_BackgroundJob` Cypher
/// mirror is best-effort: failures are logged at debug and never
/// propagated to the caller.
#[derive(Debug)]
pub struct SystemLabelSchedulerPersistence {
    // JSON array of `PersistedJob` rows (`_system/background_jobs.json`).
    sidecar: VecSidecar<PersistedJob>,
    // Serializes read-modify-write cycles on `sidecar` within this process.
    // NOTE(review): an in-process mutex does not guard against a second
    // process opening the same data path — confirm that cannot happen.
    write_guard: parking_lot::Mutex<()>,
    // Lazily wired sink used for the best-effort `_BackgroundJob` mirror.
    cypher_sink: Arc<LazyCypherSink>,
}
69
70impl SystemLabelSchedulerPersistence {
71    /// Construct rooted at `data_path/_system/background_jobs.json`.
72    #[must_use]
73    pub fn new(data_path: impl Into<PathBuf>) -> Self {
74        Self {
75            sidecar: VecSidecar::new(data_path.into(), "background_jobs.json"),
76            write_guard: parking_lot::Mutex::new(()),
77            cypher_sink: Arc::new(LazyCypherSink::new()),
78        }
79    }
80
81    /// Borrow the lazy Cypher sink so the host can wire it after
82    /// `Uni::build` completes.
83    #[must_use]
84    pub fn cypher_sink(&self) -> &Arc<LazyCypherSink> {
85        &self.cypher_sink
86    }
87
88    fn read_all(&self) -> Result<Vec<PersistedJob>, SchedulerPersistenceError> {
89        self.sidecar
90            .load()
91            .map_err(|e| SchedulerPersistenceError::Backend(e.to_string()))
92    }
93
94    fn write_all(&self, rows: &[PersistedJob]) -> Result<(), SchedulerPersistenceError> {
95        self.sidecar
96            .store(rows)
97            .map_err(|e| SchedulerPersistenceError::Backend(e.to_string()))
98    }
99
100    /// Serialize a read-modify-write against the sidecar: take the write
101    /// guard, load the full row set, apply `f`, and persist the result
102    /// atomically. Every mutating entry point routes through this so the
103    /// lock → read → mutate → write skeleton lives in one place.
104    fn mutate_rows<F>(&self, f: F) -> Result<(), SchedulerPersistenceError>
105    where
106        F: FnOnce(&mut Vec<PersistedJob>),
107    {
108        let _guard = self.write_guard.lock();
109        let mut rows = self.read_all()?;
110        f(&mut rows);
111        self.write_all(&rows)
112    }
113
114    /// Best-effort Cypher mirror: issue `cypher` against the lazy sink,
115    /// logging (but not propagating) a skip at debug. `context` labels the
116    /// log line (e.g. `"cypher mirror skipped"`).
117    fn mirror_cypher(&self, qname: &str, cypher: &str, context: &str) {
118        if let Err(e) = self.cypher_sink.try_write_cypher(cypher) {
119            tracing::debug!(
120                qname = %qname,
121                error = %e,
122                "SystemLabelSchedulerPersistence: {context}",
123            );
124        }
125    }
126
127    fn upsert(
128        &self,
129        id: &QName,
130        status: SchedulerJobStatus,
131    ) -> Result<(), SchedulerPersistenceError> {
132        let qname_str = id.to_string();
133        let status_str = format!("{status:?}");
134        self.mutate_rows(
135            |rows| match rows.iter_mut().find(|r| r.qname == qname_str) {
136                Some(existing) => existing.status = status_str.clone(),
137                None => rows.push(PersistedJob {
138                    qname: qname_str.clone(),
139                    status: status_str.clone(),
140                    consecutive_failures: 0,
141                    schedule: Schedule::Manual,
142                    next_fire_at: None,
143                }),
144            },
145        )?;
146        let cypher = format!(
147            "MERGE (j:_BackgroundJob {{qname: '{q}'}}) SET j.status = '{s}'",
148            q = qname_str.replace('\'', "''"),
149            s = status_str.replace('\'', "''"),
150        );
151        self.mirror_cypher(&qname_str, &cypher, "cypher mirror skipped");
152        Ok(())
153    }
154}
155
156impl SchedulerPersistence for SystemLabelSchedulerPersistence {
157    fn record_scheduled(
158        &self,
159        id: &QName,
160        schedule: &Schedule,
161    ) -> Result<(), SchedulerPersistenceError> {
162        let qname_str = id.to_string();
163        let next_fire_at = schedule.next_after(std::time::SystemTime::now());
164        self.mutate_rows(
165            |rows| match rows.iter_mut().find(|r| r.qname == qname_str) {
166                Some(existing) => {
167                    existing.schedule = schedule.clone();
168                    existing.next_fire_at = next_fire_at;
169                }
170                None => rows.push(PersistedJob {
171                    qname: qname_str.clone(),
172                    status: format!("{:?}", SchedulerJobStatus::Pending),
173                    consecutive_failures: 0,
174                    schedule: schedule.clone(),
175                    next_fire_at,
176                }),
177            },
178        )
179    }
180
181    fn record_started(
182        &self,
183        id: &QName,
184        _started_at: std::time::SystemTime,
185    ) -> Result<(), SchedulerPersistenceError> {
186        self.upsert(id, SchedulerJobStatus::Running)
187    }
188
189    fn record_finished(
190        &self,
191        id: &QName,
192        _finished_at: std::time::SystemTime,
193        success: bool,
194    ) -> Result<(), SchedulerPersistenceError> {
195        let status = if success {
196            SchedulerJobStatus::Idle
197        } else {
198            SchedulerJobStatus::FailedRetrying
199        };
200        self.upsert(id, status)
201    }
202
203    fn cancel(&self, id: &QName) -> Result<(), SchedulerPersistenceError> {
204        let qname_str = id.to_string();
205        self.mutate_rows(|rows| rows.retain(|r| r.qname != qname_str))?;
206        let cypher = format!(
207            "MATCH (j:_BackgroundJob {{qname: '{q}'}}) DETACH DELETE j",
208            q = qname_str.replace('\'', "''"),
209        );
210        self.mirror_cypher(&qname_str, &cypher, "cypher cancel mirror skipped");
211        Ok(())
212    }
213
214    fn load_all(&self) -> Result<Vec<SchedulerJobRecord>, SchedulerPersistenceError> {
215        let rows = self.read_all()?;
216        let records: Vec<SchedulerJobRecord> = rows
217            .into_iter()
218            .filter_map(|r| {
219                // The persisted `qname` is the dotted `namespace.local` form
220                // (`QName::to_string`). Split on the *last* dot so a multi-dot
221                // local part (e.g. `system.ttl_sweep`) stays intact as the
222                // local segment. A qname with no dot can't be reconstructed into
223                // a `(namespace, local)` pair, so the row is dropped — such rows
224                // are never written by this backend (every `QName` it persists
225                // is namespaced) and would only appear from hand-edited or
226                // foreign-written sidecars.
227                let parts: Vec<&str> = r.qname.rsplitn(2, '.').collect();
228                let qname = match parts.as_slice() {
229                    [local, ns] => QName::new(*ns, *local),
230                    _ => return None,
231                };
232                Some(SchedulerJobRecord {
233                    id: qname,
234                    // Restored jobs always re-enter as `Pending`: on restart the
235                    // scheduler re-evaluates each schedule from scratch, so the
236                    // persisted run-state (`Running`/`Idle`/...) is intentionally
237                    // not resurrected — it would be stale across the restart.
238                    status: SchedulerJobStatus::Pending,
239                    next_fire_at: r.next_fire_at,
240                    last_started_at: None,
241                    last_finished_at: None,
242                    consecutive_failures: r.consecutive_failures,
243                    schedule: r.schedule,
244                    cancel: CancellationToken::new(),
245                })
246            })
247            .collect();
248        Ok(records)
249    }
250}
251
252/// Choose the appropriate [`SchedulerPersistence`] for a `Uni`
253/// instance. Returns [`SystemLabelSchedulerPersistence`] for local-disk
254/// paths and `None` (caller falls back to `MemoryPersistence`) for
255/// remote / in-memory URIs.
256#[must_use]
257pub fn scheduler_persistence_for_data_path(
258    data_path: Option<&std::path::Path>,
259) -> (Arc<dyn SchedulerPersistence>, Option<Arc<LazyCypherSink>>) {
260    match data_path {
261        Some(path) => {
262            let p = Arc::new(SystemLabelSchedulerPersistence::new(path.to_owned()));
263            let sink = Arc::clone(p.cypher_sink());
264            (p as Arc<dyn SchedulerPersistence>, Some(sink))
265        }
266        None => (Arc::new(uni_plugin::scheduler::MemoryPersistence), None),
267    }
268}
269
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::{Duration, SystemTime};
    use tempfile::TempDir;

    /// Open a backend rooted at `dir` (each call is an independent instance,
    /// so calling it twice simulates a close + reopen).
    fn open(dir: &TempDir) -> SystemLabelSchedulerPersistence {
        SystemLabelSchedulerPersistence::new(dir.path().to_path_buf())
    }

    /// The built-in TTL sweep job id: namespace `uni`, multi-dot local part.
    fn ttl_sweep() -> QName {
        QName::new("uni", "system.ttl_sweep")
    }

    #[test]
    fn record_started_and_load_all_round_trip() {
        let dir = TempDir::new().unwrap();
        let backend = open(&dir);
        backend
            .record_started(&ttl_sweep(), SystemTime::now())
            .expect("record_started");
        let jobs = backend.load_all().expect("load_all");
        assert_eq!(jobs.len(), 1);
        assert_eq!(jobs[0].id.to_string(), "uni.system.ttl_sweep");
    }

    #[test]
    fn cancel_removes_the_record() {
        let dir = TempDir::new().unwrap();
        let backend = open(&dir);
        let job = ttl_sweep();
        backend
            .record_started(&job, SystemTime::now())
            .expect("record_started");
        backend.cancel(&job).expect("cancel");
        assert!(backend.load_all().expect("load_all").is_empty());
    }

    #[test]
    fn close_reopen_survives() {
        let dir = TempDir::new().unwrap();
        let job = ttl_sweep();
        // The first instance is a temporary, dropped at the end of this
        // statement — the reopen below starts from the sidecar alone.
        open(&dir)
            .record_started(&job, SystemTime::now())
            .expect("record_started");
        let jobs = open(&dir).load_all().expect("load_all");
        assert_eq!(jobs.len(), 1);
        assert_eq!(jobs[0].id.to_string(), "uni.system.ttl_sweep");
    }

    #[test]
    fn scheduler_persistence_for_in_memory_returns_no_sink() {
        let (backend, sink) = scheduler_persistence_for_data_path(None);
        assert!(sink.is_none());
        assert!(backend.load_all().expect("load_all").is_empty());
    }

    #[test]
    fn scheduler_persistence_for_local_path_returns_sink() {
        let dir = TempDir::new().unwrap();
        let (_backend, sink) = scheduler_persistence_for_data_path(Some(dir.path()));
        assert!(sink.is_some());
    }

    #[test]
    fn periodic_schedule_survives_restart() {
        let dir = TempDir::new().unwrap();
        let job = QName::new("myorg", "nightly");
        let every_minute = Schedule::Periodic(Duration::from_secs(60));
        open(&dir)
            .record_scheduled(&job, &every_minute)
            .expect("record_scheduled");
        let jobs = open(&dir).load_all().expect("load_all");
        assert_eq!(jobs.len(), 1);
        match &jobs[0].schedule {
            Schedule::Periodic(period) => assert_eq!(*period, Duration::from_secs(60)),
            other => panic!("expected Periodic, got {other:?}"),
        }
        assert!(
            jobs[0].next_fire_at.is_some(),
            "next_fire_at should round-trip for Periodic"
        );
    }

    #[test]
    fn cron_schedule_survives_restart() {
        let dir = TempDir::new().unwrap();
        let job = QName::new("myorg", "hourly");
        let hourly = Schedule::Cron(smol_str::SmolStr::new("0 0 * * * *"));
        open(&dir)
            .record_scheduled(&job, &hourly)
            .expect("record_scheduled");
        let jobs = open(&dir).load_all().expect("load_all");
        assert_eq!(jobs.len(), 1);
        match &jobs[0].schedule {
            Schedule::Cron(expr) => assert_eq!(expr.as_str(), "0 0 * * * *"),
            other => panic!("expected Cron, got {other:?}"),
        }
    }

    #[test]
    fn once_schedule_survives_restart() {
        let dir = TempDir::new().unwrap();
        let job = QName::new("myorg", "oneoff");
        let in_an_hour = SystemTime::now() + Duration::from_secs(3600);
        open(&dir)
            .record_scheduled(&job, &Schedule::Once(in_an_hour))
            .expect("record_scheduled");
        let jobs = open(&dir).load_all().expect("load_all");
        assert_eq!(jobs.len(), 1);
        match &jobs[0].schedule {
            Schedule::Once(at) => assert_eq!(*at, in_an_hour),
            other => panic!("expected Once, got {other:?}"),
        }
    }

    #[test]
    fn legacy_sidecar_without_schedule_falls_back_to_manual() {
        // A sidecar from a pre-closure build carries only `qname` / `status`
        // / `consecutive_failures`; the serde defaults on the newer fields
        // must let it load as `Schedule::Manual` with no next fire time.
        let dir = TempDir::new().unwrap();
        let system_dir = dir.path().join("_system");
        std::fs::create_dir_all(&system_dir).unwrap();
        std::fs::write(
            system_dir.join("background_jobs.json"),
            r#"[{"qname":"uni.system.ttl_sweep","status":"Pending","consecutive_failures":0}]"#,
        )
        .unwrap();
        let jobs = open(&dir).load_all().expect("legacy sidecar loads");
        assert_eq!(jobs.len(), 1);
        assert!(matches!(jobs[0].schedule, Schedule::Manual));
        assert!(jobs[0].next_fire_at.is_none());
    }

    #[test]
    fn record_scheduled_updates_existing_row() {
        let dir = TempDir::new().unwrap();
        let backend = open(&dir);
        let job = QName::new("myorg", "nightly");
        // Registering the same id twice must overwrite the row in place,
        // not append a duplicate; the later schedule wins.
        for secs in [60, 120] {
            backend
                .record_scheduled(&job, &Schedule::Periodic(Duration::from_secs(secs)))
                .unwrap();
        }
        let jobs = backend.load_all().expect("load_all");
        assert_eq!(jobs.len(), 1);
        match &jobs[0].schedule {
            Schedule::Periodic(period) => assert_eq!(*period, Duration::from_secs(120)),
            other => panic!("expected Periodic(120s), got {other:?}"),
        }
    }
}