use std::time::Duration;
use serde::{Deserialize, Serialize};
use smol_str::SmolStr;
use crate::errors::FnError;
use crate::qname::QName;
pub trait BackgroundJobProvider: Send + Sync {
fn definition(&self) -> &JobDefinition;
fn execute(&self, ctx: JobContext<'_>) -> Result<JobOutcome, FnError>;
}
#[derive(Clone, Debug)]
pub struct JobDefinition {
pub id: QName,
pub schedule: Schedule,
pub concurrency: ConcurrencyLimit,
pub timeout: Duration,
pub retry: RetryPolicy,
pub docs: String,
}
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
pub enum Schedule {
Once(std::time::SystemTime),
Periodic(Duration),
Cron(SmolStr),
Manual,
}
impl Schedule {
#[must_use]
pub fn next_after(&self, from: std::time::SystemTime) -> Option<std::time::SystemTime> {
use std::str::FromStr;
match self {
Schedule::Manual => Some(from),
Schedule::Once(at) => (*at >= from).then_some(*at),
Schedule::Periodic(every) => Some(from + *every),
Schedule::Cron(expr) => {
let sched = match cron::Schedule::from_str(expr.as_str()) {
Ok(s) => s,
Err(e) => {
tracing::error!(
target: "uni_plugin::scheduler",
cron_expr = %expr,
error = %e,
"Cron schedule failed to parse; job will not fire until \
the expression is fixed or the job is re-registered."
);
return None;
}
};
let from_chrono: chrono::DateTime<chrono::Utc> = from.into();
sched
.after(&from_chrono)
.next()
.map(|t: chrono::DateTime<chrono::Utc>| t.into())
}
}
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum ConcurrencyLimit {
Exclusive,
Bounded(u32),
Unbounded,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum RetryPolicy {
Never,
FixedDelay {
max: u32,
delay: Duration,
},
}
#[derive(Debug)]
#[non_exhaustive]
pub enum JobOutcome {
Done,
DoneAndReschedule(Duration),
Failed {
reason: String,
retry: bool,
},
}
pub trait JobHost: Send + Sync + std::any::Any + std::fmt::Debug {
fn as_any(&self) -> &dyn std::any::Any;
fn compact_storage(&self) -> Result<(), FnError> {
Ok(())
}
fn execute_write_cypher(&self, _cypher: &str) -> Result<(), FnError> {
Err(FnError::new(
0xD10,
"JobHost: write-mode Cypher not supported by this host",
))
}
}
#[derive(Debug)]
#[non_exhaustive]
pub struct JobContext<'a> {
pub last_run: Option<JobRunRecord>,
pub cancel: CancellationToken,
pub host: Option<&'a dyn JobHost>,
pub _marker: std::marker::PhantomData<&'a ()>,
}
impl<'a> JobContext<'a> {
#[must_use]
pub fn new(cancel: CancellationToken, last_run: Option<JobRunRecord>) -> Self {
Self {
last_run,
cancel,
host: None,
_marker: std::marker::PhantomData,
}
}
#[must_use]
pub fn with_host(mut self, host: &'a dyn JobHost) -> Self {
self.host = Some(host);
self
}
}
#[derive(Clone, Debug)]
pub struct JobRunRecord {
pub started_at: std::time::SystemTime,
pub finished_at: std::time::SystemTime,
pub outcome: String,
}
pub use tokio_util::sync::CancellationToken;
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn cancel_token_round_trip() {
let t = CancellationToken::new();
assert!(!t.is_cancelled());
t.cancel();
assert!(t.is_cancelled());
}
#[test]
fn cancel_token_clone_shares_state() {
let t = CancellationToken::new();
let u = t.clone();
t.cancel();
assert!(u.is_cancelled());
}
}