1use std::path::PathBuf;
23use std::sync::Arc;
24use std::time::SystemTime;
25
26use serde::{Deserialize, Serialize};
27use uni_plugin::qname::QName;
28use uni_plugin::scheduler::{
29 SchedulerJobRecord, SchedulerJobStatus, SchedulerPersistence, SchedulerPersistenceError,
30};
31use uni_plugin::traits::background::{CancellationToken, Schedule};
32
33use crate::persistence::LazyCypherSink;
34use uni_sidecar::VecSidecar;
35
/// One scheduler job as it is serialized into the `background_jobs.json` sidecar.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct PersistedJob {
    /// Job identifier, stored as the `to_string()` rendering of its `QName`.
    qname: String,
    /// `Debug` rendering of the job's `SchedulerJobStatus`.
    status: String,
    /// Failure counter carried through to `SchedulerJobRecord::consecutive_failures`
    /// when rows are loaded.
    consecutive_failures: u32,
    /// Schedule of the job; rows serialized without this field deserialize as
    /// `Schedule::Manual` (see `default_schedule`).
    #[serde(default = "default_schedule")]
    schedule: Schedule,
    /// Next planned fire time; rows serialized without this field deserialize as `None`.
    #[serde(default)]
    next_fire_at: Option<SystemTime>,
}
55
/// Serde default for `PersistedJob::schedule`: rows that carry no schedule
/// are treated as `Schedule::Manual`.
fn default_schedule() -> Schedule {
    Schedule::Manual
}
59
/// File-backed `SchedulerPersistence` implementation.
///
/// Job rows live in a `background_jobs.json` sidecar; status changes and
/// cancellations are additionally mirrored as Cypher statements through a
/// `LazyCypherSink`, where a failed mirror write is only logged.
#[derive(Debug)]
pub struct SystemLabelSchedulerPersistence {
    /// Sidecar that loads and stores the full list of persisted rows.
    sidecar: VecSidecar<PersistedJob>,
    /// Held for the whole load → edit → store cycle so concurrent mutations
    /// through this instance do not interleave.
    write_guard: parking_lot::Mutex<()>,
    /// Sink receiving the mirrored Cypher statements; exposed via `cypher_sink()`.
    cypher_sink: Arc<LazyCypherSink>,
}
69
70impl SystemLabelSchedulerPersistence {
71 #[must_use]
73 pub fn new(data_path: impl Into<PathBuf>) -> Self {
74 Self {
75 sidecar: VecSidecar::new(data_path.into(), "background_jobs.json"),
76 write_guard: parking_lot::Mutex::new(()),
77 cypher_sink: Arc::new(LazyCypherSink::new()),
78 }
79 }
80
81 #[must_use]
84 pub fn cypher_sink(&self) -> &Arc<LazyCypherSink> {
85 &self.cypher_sink
86 }
87
88 fn read_all(&self) -> Result<Vec<PersistedJob>, SchedulerPersistenceError> {
89 self.sidecar
90 .load()
91 .map_err(|e| SchedulerPersistenceError::Backend(e.to_string()))
92 }
93
94 fn write_all(&self, rows: &[PersistedJob]) -> Result<(), SchedulerPersistenceError> {
95 self.sidecar
96 .store(rows)
97 .map_err(|e| SchedulerPersistenceError::Backend(e.to_string()))
98 }
99
100 fn mutate_rows<F>(&self, f: F) -> Result<(), SchedulerPersistenceError>
105 where
106 F: FnOnce(&mut Vec<PersistedJob>),
107 {
108 let _guard = self.write_guard.lock();
109 let mut rows = self.read_all()?;
110 f(&mut rows);
111 self.write_all(&rows)
112 }
113
114 fn mirror_cypher(&self, qname: &str, cypher: &str, context: &str) {
118 if let Err(e) = self.cypher_sink.try_write_cypher(cypher) {
119 tracing::debug!(
120 qname = %qname,
121 error = %e,
122 "SystemLabelSchedulerPersistence: {context}",
123 );
124 }
125 }
126
127 fn upsert(
128 &self,
129 id: &QName,
130 status: SchedulerJobStatus,
131 ) -> Result<(), SchedulerPersistenceError> {
132 let qname_str = id.to_string();
133 let status_str = format!("{status:?}");
134 self.mutate_rows(
135 |rows| match rows.iter_mut().find(|r| r.qname == qname_str) {
136 Some(existing) => existing.status = status_str.clone(),
137 None => rows.push(PersistedJob {
138 qname: qname_str.clone(),
139 status: status_str.clone(),
140 consecutive_failures: 0,
141 schedule: Schedule::Manual,
142 next_fire_at: None,
143 }),
144 },
145 )?;
146 let cypher = format!(
147 "MERGE (j:_BackgroundJob {{qname: '{q}'}}) SET j.status = '{s}'",
148 q = qname_str.replace('\'', "''"),
149 s = status_str.replace('\'', "''"),
150 );
151 self.mirror_cypher(&qname_str, &cypher, "cypher mirror skipped");
152 Ok(())
153 }
154}
155
156impl SchedulerPersistence for SystemLabelSchedulerPersistence {
157 fn record_scheduled(
158 &self,
159 id: &QName,
160 schedule: &Schedule,
161 ) -> Result<(), SchedulerPersistenceError> {
162 let qname_str = id.to_string();
163 let next_fire_at = schedule.next_after(std::time::SystemTime::now());
164 self.mutate_rows(
165 |rows| match rows.iter_mut().find(|r| r.qname == qname_str) {
166 Some(existing) => {
167 existing.schedule = schedule.clone();
168 existing.next_fire_at = next_fire_at;
169 }
170 None => rows.push(PersistedJob {
171 qname: qname_str.clone(),
172 status: format!("{:?}", SchedulerJobStatus::Pending),
173 consecutive_failures: 0,
174 schedule: schedule.clone(),
175 next_fire_at,
176 }),
177 },
178 )
179 }
180
181 fn record_started(
182 &self,
183 id: &QName,
184 _started_at: std::time::SystemTime,
185 ) -> Result<(), SchedulerPersistenceError> {
186 self.upsert(id, SchedulerJobStatus::Running)
187 }
188
189 fn record_finished(
190 &self,
191 id: &QName,
192 _finished_at: std::time::SystemTime,
193 success: bool,
194 ) -> Result<(), SchedulerPersistenceError> {
195 let status = if success {
196 SchedulerJobStatus::Idle
197 } else {
198 SchedulerJobStatus::FailedRetrying
199 };
200 self.upsert(id, status)
201 }
202
203 fn cancel(&self, id: &QName) -> Result<(), SchedulerPersistenceError> {
204 let qname_str = id.to_string();
205 self.mutate_rows(|rows| rows.retain(|r| r.qname != qname_str))?;
206 let cypher = format!(
207 "MATCH (j:_BackgroundJob {{qname: '{q}'}}) DETACH DELETE j",
208 q = qname_str.replace('\'', "''"),
209 );
210 self.mirror_cypher(&qname_str, &cypher, "cypher cancel mirror skipped");
211 Ok(())
212 }
213
214 fn load_all(&self) -> Result<Vec<SchedulerJobRecord>, SchedulerPersistenceError> {
215 let rows = self.read_all()?;
216 let records: Vec<SchedulerJobRecord> = rows
217 .into_iter()
218 .filter_map(|r| {
219 let parts: Vec<&str> = r.qname.rsplitn(2, '.').collect();
228 let qname = match parts.as_slice() {
229 [local, ns] => QName::new(*ns, *local),
230 _ => return None,
231 };
232 Some(SchedulerJobRecord {
233 id: qname,
234 status: SchedulerJobStatus::Pending,
239 next_fire_at: r.next_fire_at,
240 last_started_at: None,
241 last_finished_at: None,
242 consecutive_failures: r.consecutive_failures,
243 schedule: r.schedule,
244 cancel: CancellationToken::new(),
245 })
246 })
247 .collect();
248 Ok(records)
249 }
250}
251
252#[must_use]
257pub fn scheduler_persistence_for_data_path(
258 data_path: Option<&std::path::Path>,
259) -> (Arc<dyn SchedulerPersistence>, Option<Arc<LazyCypherSink>>) {
260 match data_path {
261 Some(path) => {
262 let p = Arc::new(SystemLabelSchedulerPersistence::new(path.to_owned()));
263 let sink = Arc::clone(p.cypher_sink());
264 (p as Arc<dyn SchedulerPersistence>, Some(sink))
265 }
266 None => (Arc::new(uni_plugin::scheduler::MemoryPersistence), None),
267 }
268}
269
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;
    use tempfile::TempDir;

    /// Opens a persistence instance rooted at the temporary directory.
    fn open_in(dir: &TempDir) -> SystemLabelSchedulerPersistence {
        SystemLabelSchedulerPersistence::new(dir.path().to_path_buf())
    }

    /// Opens a fresh instance over `dir` and returns its single record,
    /// asserting that exactly one record was loaded.
    fn reload_only_record(dir: &TempDir) -> SchedulerJobRecord {
        let mut records = open_in(dir).load_all().expect("load_all");
        assert_eq!(records.len(), 1);
        records.remove(0)
    }

    /// Identifier shared by the status-oriented tests.
    fn ttl_sweep() -> QName {
        QName::new("uni", "system.ttl_sweep")
    }

    #[test]
    fn record_started_and_load_all_round_trip() {
        let dir = TempDir::new().unwrap();
        let store = open_in(&dir);
        let job = ttl_sweep();
        store
            .record_started(&job, SystemTime::now())
            .expect("record_started");

        let records = store.load_all().expect("load_all");
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].id.to_string(), "uni.system.ttl_sweep");
    }

    #[test]
    fn cancel_removes_the_record() {
        let dir = TempDir::new().unwrap();
        let store = open_in(&dir);
        let job = ttl_sweep();
        store
            .record_started(&job, SystemTime::now())
            .expect("record_started");
        store.cancel(&job).expect("cancel");

        let remaining = store.load_all().expect("load_all");
        assert!(remaining.is_empty());
    }

    #[test]
    fn close_reopen_survives() {
        let dir = TempDir::new().unwrap();
        let job = ttl_sweep();

        let first = open_in(&dir);
        first
            .record_started(&job, SystemTime::now())
            .expect("record_started");
        // Dropping the first instance stands in for closing the store.
        drop(first);

        let record = reload_only_record(&dir);
        assert_eq!(record.id.to_string(), "uni.system.ttl_sweep");
    }

    #[test]
    fn scheduler_persistence_for_in_memory_returns_no_sink() {
        let (persistence, sink) = scheduler_persistence_for_data_path(None);
        assert!(sink.is_none());
        let records = persistence.load_all().expect("load_all");
        assert!(records.is_empty());
    }

    #[test]
    fn scheduler_persistence_for_local_path_returns_sink() {
        let dir = TempDir::new().unwrap();
        let (_persistence, sink) = scheduler_persistence_for_data_path(Some(dir.path()));
        assert!(sink.is_some());
    }

    #[test]
    fn periodic_schedule_survives_restart() {
        let dir = TempDir::new().unwrap();
        let job = QName::new("myorg", "nightly");
        let every_minute = Schedule::Periodic(Duration::from_secs(60));

        let first = open_in(&dir);
        first
            .record_scheduled(&job, &every_minute)
            .expect("record_scheduled");
        drop(first);

        let record = reload_only_record(&dir);
        if let Schedule::Periodic(period) = &record.schedule {
            assert_eq!(*period, Duration::from_secs(60));
        } else {
            panic!("expected Periodic, got {:?}", record.schedule);
        }
        assert!(
            record.next_fire_at.is_some(),
            "next_fire_at should round-trip for Periodic"
        );
    }

    #[test]
    fn cron_schedule_survives_restart() {
        let dir = TempDir::new().unwrap();
        let job = QName::new("myorg", "hourly");
        let hourly = Schedule::Cron(smol_str::SmolStr::new("0 0 * * * *"));

        let first = open_in(&dir);
        first
            .record_scheduled(&job, &hourly)
            .expect("record_scheduled");
        drop(first);

        let record = reload_only_record(&dir);
        if let Schedule::Cron(expr) = &record.schedule {
            assert_eq!(expr.as_str(), "0 0 * * * *");
        } else {
            panic!("expected Cron, got {:?}", record.schedule);
        }
    }

    #[test]
    fn once_schedule_survives_restart() {
        let dir = TempDir::new().unwrap();
        let job = QName::new("myorg", "oneoff");
        let fire_at = SystemTime::now() + Duration::from_secs(3600);
        let one_shot = Schedule::Once(fire_at);

        let first = open_in(&dir);
        first
            .record_scheduled(&job, &one_shot)
            .expect("record_scheduled");
        drop(first);

        let record = reload_only_record(&dir);
        if let Schedule::Once(at) = &record.schedule {
            assert_eq!(*at, fire_at);
        } else {
            panic!("expected Once, got {:?}", record.schedule);
        }
    }

    #[test]
    fn legacy_sidecar_without_schedule_falls_back_to_manual() {
        let dir = TempDir::new().unwrap();
        // Hand-write a sidecar row that has neither `schedule` nor `next_fire_at`.
        let system_dir = dir.path().join("_system");
        std::fs::create_dir_all(&system_dir).unwrap();
        let legacy_rows =
            r#"[{"qname":"uni.system.ttl_sweep","status":"Pending","consecutive_failures":0}]"#;
        std::fs::write(system_dir.join("background_jobs.json"), legacy_rows).unwrap();

        let store = open_in(&dir);
        let records = store.load_all().expect("legacy sidecar loads");
        assert_eq!(records.len(), 1);
        assert!(matches!(records[0].schedule, Schedule::Manual));
        assert!(records[0].next_fire_at.is_none());
    }

    #[test]
    fn record_scheduled_updates_existing_row() {
        let dir = TempDir::new().unwrap();
        let store = open_in(&dir);
        let job = QName::new("myorg", "nightly");

        let initial = Schedule::Periodic(Duration::from_secs(60));
        store.record_scheduled(&job, &initial).unwrap();
        let replacement = Schedule::Periodic(Duration::from_secs(120));
        store.record_scheduled(&job, &replacement).unwrap();

        let records = store.load_all().expect("load_all");
        assert_eq!(records.len(), 1);
        if let Schedule::Periodic(period) = &records[0].schedule {
            assert_eq!(*period, Duration::from_secs(120));
        } else {
            panic!("expected Periodic(120s), got {:?}", records[0].schedule);
        }
    }
}