#![allow(dead_code)]
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::time::Duration;
use tokio::sync::broadcast::Receiver;
use ulid::Ulid;
/// Alias of [`JobScope`] for call sites that express a scope in vault terms
/// (used by `MigrateSource::target`, `ExportSource::scope` and `JobSource::scopes`).
pub type VaultScope = JobScope;
/// Kind of work a queued job performs.
///
/// Serialized with serde's adjacent tagging: the variant name is stored under
/// `"type"` and the variant payload, when there is one, under `"data"`.
/// [`job_kind_str`] maps each variant to its name as a static string.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", content = "data")]
pub enum Job {
Agent,
Pipeline,
Collect,
Distill,
Backup,
Purge,
/// Carries the [`ReIndexMode`] to apply.
ReIndex(ReIndexMode),
Summarize,
Validate,
Audit,
Consolidate,
/// Carries a [`CurateSpec`].
Curate(CurateSpec),
Forget,
Review,
Classify,
Merge,
Annotate,
/// Carries a [`MigrateSource`].
Migrate(MigrateSource),
/// Carries an [`ExportSource`].
Export(ExportSource),
/// Carries a [`NotifySource`].
Notify(NotifySource),
/// Carries an [`IngestSource`].
Ingest(IngestSource),
/// Carries an [`EmbedSpec`].
Embed(EmbedSpec),
}
/// Payload of [`Job::ReIndex`].
///
/// NOTE(review): the variant names suggest a choice between the full-text
/// index, the vector index, both, or only missing entries — confirm against
/// the code that executes the job.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ReIndexMode {
FtsOnly,
VectorsOnly,
Full,
MissingOnly,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct CurateSpec {
pub note_id: Ulid,
#[serde(default = "default_tenant_main")]
pub tenant_id: String,
#[serde(default)]
pub title: Option<String>,
#[serde(default)]
pub body: Option<String>,
#[serde(default)]
pub author: Option<String>,
#[serde(default)]
pub tags: Vec<String>,
#[serde(default)]
pub section_hint: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub expected_sha256: Option<[u8; 32]>,
}
/// Serde default for `CurateSpec::tenant_id`: the tenant named `"main"`.
fn default_tenant_main() -> String {
    String::from("main")
}
/// Payload of [`Job::Embed`].
///
/// No field carries a serde default, so all three must be present when
/// deserializing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EmbedSpec {
/// Identifier of the note the job refers to.
pub note_id: Ulid,
/// Tenant identifier.
pub tenant_id: String,
/// NOTE(review): presumably forces a new embedding even when one already
/// exists — confirm against the embed worker.
pub force_regenerate: bool,
}
/// Payload of [`Job::Migrate`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MigrateSource {
/// Path to migrate from, as a string.
pub from_path: String,
/// Format of the data being migrated (see [`MigrateMode`]).
pub mode: MigrateMode,
/// What to do on conflict (see [`ConflictStrategy`]).
pub conflict: ConflictStrategy,
/// Dry-run flag. NOTE(review): presumably means "simulate, write nothing" — confirm.
pub dry_run: bool,
/// Destination scope.
pub target: VaultScope,
}
/// Source format selector used by [`MigrateSource::mode`].
///
/// NOTE(review): the exact layout each variant denotes is not visible here —
/// confirm against the migration worker.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum MigrateMode {
PredecessorV1,
GradatumVault,
RawMarkdown,
}
/// Conflict policy used by [`MigrateSource::conflict`].
///
/// NOTE(review): what counts as a conflict, and how `Rename` picks a new
/// name, is decided by the consumer — confirm there.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ConflictStrategy {
Overwrite,
Skip,
Rename,
}
/// Payload of [`Job::Export`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExportSource {
/// Scope to export.
pub scope: VaultScope,
/// Optional filter expression. NOTE(review): syntax is not defined here — confirm.
pub filter: Option<String>,
/// Output format.
pub format: ExportFormat,
/// Export destination as a string. NOTE(review): path vs. URL is not visible here — confirm.
pub target: String,
/// Optional template identifier or content. NOTE(review): confirm which.
pub template: Option<String>,
}
/// Output format selector used by [`ExportSource::format`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ExportFormat {
Csv,
Pdf,
Json,
Markdown,
Zip,
}
/// Payload of [`Job::Notify`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NotifySource {
/// Where the notification is sent.
pub channel: NotifyChannel,
/// Template for the message. NOTE(review): name vs. inline text is not visible here — confirm.
pub template: String,
/// Optional job the notification refers to.
pub job_ref: Option<Ulid>,
}
/// Destination of a notification, with the addressing data each transport needs.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum NotifyChannel {
/// Telegram, addressed by chat id.
Telegram {
chat_id: String,
},
/// Slack, addressed by webhook URL.
Slack {
webhook_url: String,
},
/// Generic webhook: target URL plus the method given as a string.
Webhook {
url: String,
method: String,
},
/// NATS, addressed by subject.
Nats {
subject: String,
},
/// Email, addressed by recipient.
Email {
to: String,
},
}
/// Payload of [`Job::Ingest`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IngestSource {
/// Where the content to ingest comes from.
pub source: IngestInputSource,
/// Target vault, as a string. NOTE(review): name vs. path is not visible here — confirm.
pub vault: String,
/// Target locus, as a string. NOTE(review): format is not visible here — confirm.
pub locus: String,
/// Ingestion strategy.
pub strategy: IngestStrategy,
/// Dry-run flag. NOTE(review): presumably means "simulate, write nothing" — confirm.
pub dry_run: bool,
}
/// Input of an ingest job, used by [`IngestSource::source`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum IngestInputSource {
/// A single file, given by path.
File {
path: String,
},
/// A single URL.
Url {
url: String,
},
/// Several URLs.
Urls {
urls: Vec<String>,
},
/// A locus, given by path.
Locus {
path: String,
},
}
/// Strategy selector used by [`IngestSource::strategy`].
///
/// NOTE(review): the names suggest automatic selection versus forcing a
/// structured or sliding-window split — confirm against the ingest worker.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum IngestStrategy {
Auto,
ForceStructured,
ForceSlidingWindow,
}
/// Returns the variant name of `job` as a static string, ignoring any payload.
///
/// The match is deliberately exhaustive (no catch-all arm) so that adding a
/// [`Job`] variant fails to compile until it is named here.
#[must_use]
pub fn job_kind_str(job: &Job) -> &'static str {
    match *job {
        // Variants that carry a payload.
        Job::ReIndex(..) => "ReIndex",
        Job::Curate(..) => "Curate",
        Job::Migrate(..) => "Migrate",
        Job::Export(..) => "Export",
        Job::Notify(..) => "Notify",
        Job::Ingest(..) => "Ingest",
        Job::Embed(..) => "Embed",
        // Unit variants.
        Job::Agent => "Agent",
        Job::Pipeline => "Pipeline",
        Job::Collect => "Collect",
        Job::Distill => "Distill",
        Job::Backup => "Backup",
        Job::Purge => "Purge",
        Job::Summarize => "Summarize",
        Job::Validate => "Validate",
        Job::Audit => "Audit",
        Job::Consolidate => "Consolidate",
        Job::Forget => "Forget",
        Job::Review => "Review",
        Job::Classify => "Classify",
        Job::Merge => "Merge",
        Job::Annotate => "Annotate",
    }
}
/// Static description of a job: what it is and how it should be run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobSpec {
/// The work to perform, with its payload.
pub kind: Job,
/// Class of the job (see [`JobClass`]).
pub class: JobClass,
/// Execution mode; `JobMode::DryRun` is what `DryRunAware::is_dry_run` tests on a `JobRecord`.
pub mode: JobMode,
/// What the job operates on.
pub scope: JobScope,
/// Priority of the job.
pub priority: JobPriority,
}
/// Class of a job; `JobPriority::default_for` maps each class to a default priority.
///
/// NOTE(review): the variants look like the origin of the request (system,
/// agent, human, API) — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum JobClass {
System,
Agent,
Human,
Api,
}
/// Execution mode of a job. Defaults to [`JobMode::Batch`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
pub enum JobMode {
#[default]
Batch,
Streaming,
Interactive,
/// The mode `DryRunAware::is_dry_run` checks for on a `JobRecord`.
DryRun,
}
/// What a job operates on. Also exposed as the alias `VaultScope`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum JobScope {
/// The whole vault.
VaultWide,
/// A single locus, identified by a string.
Locus(String),
/// An explicit list of note identifiers.
Notes(Vec<Ulid>),
/// A single session, identified by its id.
Session(Ulid),
}
/// Priority of a job. Defaults to [`JobPriority::Normal`].
///
/// The type does not derive `Ord`; use `JobPriority::as_u8` to compare
/// priorities numerically (`High` maps to the largest value).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
pub enum JobPriority {
High,
#[default]
Normal,
Low,
Deferred,
}
impl JobPriority {
    /// Numeric rank of the priority: `Deferred` = 0, `Low` = 1, `Normal` = 2,
    /// `High` = 3. A larger value means a higher priority.
    #[must_use]
    pub fn as_u8(&self) -> u8 {
        match *self {
            Self::Deferred => 0,
            Self::Low => 1,
            Self::Normal => 2,
            Self::High => 3,
        }
    }

    /// Default priority for a job of the given class: `High` for agent and
    /// human jobs, `Normal` for API jobs, `Low` for system jobs.
    #[must_use]
    pub fn default_for(class: &JobClass) -> Self {
        match *class {
            JobClass::Agent | JobClass::Human => Self::High,
            JobClass::Api => Self::Normal,
            JobClass::System => Self::Low,
        }
    }
}
/// Scheduling information attached to a job.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobScheduling {
/// What caused the job to be scheduled.
pub trigger: TriggerSource,
/// Scheduled time (UTC).
pub scheduled_at: DateTime<Utc>,
/// Other jobs this one waits on, each with the condition that releases it.
pub await_jobs: Vec<JobTrigger>,
/// Optional deadline (UTC). NOTE(review): presumably consumed by
/// `QueueStore::cancel_expired_deadlines` — confirm.
pub deadline: Option<DateTime<Utc>>,
/// Optional cron expression. NOTE(review): dialect is not defined here — confirm.
pub cron_expr: Option<String>,
}
/// Dependency on another job: the awaited job and the condition on it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobTrigger {
/// Identifier of the awaited job.
pub job_id: Ulid,
/// Condition the awaited job must meet.
pub condition: TriggerCondition,
}
/// Condition used by [`JobTrigger::condition`].
///
/// NOTE(review): which `JobStatus` values satisfy each variant (in particular
/// what counts as "terminal") is decided by the queue implementation — confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TriggerCondition {
OnDone,
OnAnyTerminal,
OnFailed,
}
/// Origin of a scheduling decision, stored in [`JobScheduling::trigger`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TriggerSource {
Cron,
Pipeline,
Cascade,
OnEvent,
Demand,
}
/// Mutable lifecycle state of a job: status, timestamps and final result.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobLifecycle {
/// Current status.
pub status: JobStatus,
/// Creation time (UTC); `JobWorkspace::from_job` derives the workspace date from it.
pub created_at: DateTime<Utc>,
/// Start time (UTC), if any.
pub started_at: Option<DateTime<Utc>>,
/// Completion time (UTC), if any.
pub completed_at: Option<DateTime<Utc>>,
/// Lease expiry (UTC), if any. NOTE(review): presumably what
/// `QueueStore::recover_stale_leases` inspects — confirm.
pub lease_until: Option<DateTime<Utc>>,
/// Result of the job, if any.
pub result: Option<JobResult>,
}
/// Status of a job.
///
/// NOTE(review): the allowed transitions between statuses are not defined in
/// this file — see the `QueueStore` implementation.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum JobStatus {
Pending,
Running,
Waiting,
Done,
Failed,
/// NOTE(review): presumably "dead-letter queue" (cf. `QueueStore::fail_dlq`) — confirm.
DLQ,
Cancelled,
Conflict,
}
/// Outcome recorded for a job.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobResult {
/// Whether the job succeeded.
pub success: bool,
/// Duration in milliseconds.
pub duration_ms: u32,
/// Optional cost in US dollars.
pub cost_usd: Option<f32>,
/// Optional identifier of a result note.
pub result_note: Option<Ulid>,
/// Optional JSON payload describing a conflict. Defaults to `None` when
/// absent from the input and is omitted from the output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub conflict_payload: Option<serde_json::Value>,
}
/// Directory paths of a job's workspace, stored as strings.
///
/// `JobWorkspace::from_job` builds them as
/// `worker/<YYYY-MM-DD>/<job id>/{input,output,meta}/`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobWorkspace {
/// Input directory path (ends with `/input/` when built by `from_job`).
pub input: String,
/// Output directory path (ends with `/output/` when built by `from_job`).
pub output: String,
/// Metadata directory path (ends with `/meta/` when built by `from_job`).
pub meta: String,
}
impl JobWorkspace {
    /// Derives the workspace paths of `job`.
    ///
    /// All three paths share the prefix `worker/<YYYY-MM-DD>/<job id>`, where
    /// the date comes from `job.lifecycle.created_at`, and end with
    /// `/input/`, `/output/` and `/meta/` respectively.
    #[must_use]
    pub fn from_job(job: &JobRecord) -> Self {
        let day = job.lifecycle.created_at.format("%Y-%m-%d");
        let root = format!("worker/{day}/{}", job.id);
        // Every sub-directory is rendered with a trailing slash.
        let subdir = |leaf: &str| format!("{root}/{leaf}/");
        Self {
            input: subdir("input"),
            output: subdir("output"),
            meta: subdir("meta"),
        }
    }
}
/// Progress report of a running job.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobProgress {
/// Current position. NOTE(review): unit (items, steps, …) is not defined here — confirm.
pub current: u32,
/// Total, in the same unit as `current`.
pub total: u32,
/// Label of the current step.
pub step: String,
/// Optional estimated time remaining, in seconds.
pub eta_secs: Option<u32>,
}
/// Description of a file produced by a job.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobOutputFile {
/// File name.
pub name: String,
/// MIME type.
pub mime_type: String,
/// Size. NOTE(review): presumably in bytes — confirm.
pub size: u64,
/// Optional time-to-live in days.
pub ttl_days: Option<u32>,
}
/// What a job produced: affected notes, files and a Markdown result note.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobOutput {
/// Identifiers of notes created by the job.
pub notes_created: Vec<Ulid>,
/// Identifiers of notes modified by the job.
pub notes_modified: Vec<Ulid>,
/// Files produced by the job.
pub files: Vec<JobOutputFile>,
/// Result note, as Markdown text.
pub result_note_md: String,
}
impl JobOutput {
    /// Builds the output of a simulated run: no notes, no files, and a
    /// Markdown note (in French) that contains `description` and the number
    /// of notes that would have been affected.
    #[must_use]
    pub fn dry_run(would_affect: usize, description: &str) -> Self {
        let result_note_md = format!(
            "## DRY-RUN — {description}\n\n\
             **Simulation uniquement — aucune écriture effectuée.**\n\n\
             Notes qui auraient été affectées : {would_affect}\n",
        );
        Self {
            notes_created: Vec::new(),
            notes_modified: Vec::new(),
            files: Vec::new(),
            result_note_md,
        }
    }
}
/// Retry state and policy of a job.
///
/// The `Default` implementation starts at `count = 0` with `max = 3`, an
/// exponential backoff (`base: 5`, `max: 120`) and no recorded errors.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobRetry {
/// Retry counter.
pub count: u32,
/// Upper bound for `count`.
pub max: u32,
/// Delay policy between attempts.
pub backoff: RetryBackoff,
/// Most recent error message, if any.
pub last_error: Option<String>,
/// Recorded errors.
pub errors: Vec<JobError>,
}
impl Default for JobRetry {
    /// Fresh retry state: nothing attempted yet, at most 3 retries, exponential
    /// backoff with `base: 5` and `max: 120`, and no recorded errors.
    fn default() -> Self {
        let backoff = RetryBackoff::Exponential { base: 5, max: 120 };
        Self {
            count: 0,
            max: 3,
            backoff,
            last_error: None,
            errors: Vec::new(),
        }
    }
}
/// One recorded error of a job.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobError {
/// When the error was recorded (UTC).
pub at: DateTime<Utc>,
/// Error message.
pub message: String,
/// Attempt number the error belongs to.
pub attempt: u32,
}
/// Delay policy between retries. All values are in seconds
/// (`RetryBackoff::duration_for` feeds them to `Duration::from_secs`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RetryBackoff {
/// Constant delay, in seconds.
Fixed(u64),
/// Delay of `base * 2^attempt` seconds, capped at `max` seconds.
Exponential {
base: u64,
max: u64,
},
}
impl RetryBackoff {
    /// Delay to wait before the given `attempt`.
    ///
    /// * `Fixed(secs)` always yields `secs` seconds.
    /// * `Exponential { base, max }` yields `base * 2^attempt` seconds, never
    ///   more than `max`. The exponent is clamped to 62 and the multiplication
    ///   saturates, so large attempt numbers cannot overflow.
    #[must_use]
    pub fn duration_for(&self, attempt: u32) -> Duration {
        let secs = match *self {
            Self::Fixed(secs) => secs,
            Self::Exponential { base, max } => {
                // Clamp the shift so that `1 << shift` stays within `u64`.
                let shift = attempt.min(62);
                let factor = 1_u64 << shift;
                base.saturating_mul(factor).min(max)
            }
        };
        Duration::from_secs(secs)
    }
}
/// Relationships between a job and the jobs or pipeline around it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobLineage {
/// Optional free-form description of what triggered the job.
pub triggered_by: Option<String>,
/// Optional identifier of the parent job.
pub parent_job: Option<Ulid>,
/// Optional identifier of the pipeline the job belongs to.
pub pipeline_id: Option<Ulid>,
/// Optional name of the pipeline step.
pub pipeline_step: Option<String>,
/// Identifiers of child jobs.
pub children: Vec<Ulid>,
/// Optional cost in US dollars. NOTE(review): relation to
/// `JobResult::cost_usd` (own cost vs. aggregate) is not visible here — confirm.
pub cost_usd: Option<f32>,
}
/// Complete record of a job as handled by `QueueStore`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobRecord {
/// Identifier of the job.
pub id: Ulid,
/// What the job is and how it runs.
pub spec: JobSpec,
/// When and why it runs.
pub scheduling: JobScheduling,
/// Status, timestamps and result.
pub lifecycle: JobLifecycle,
/// Retry state and policy.
pub retry: JobRetry,
/// Parent, pipeline and children.
pub lineage: JobLineage,
}
/// Criteria passed to `QueueStore::list`.
///
/// The `Default` implementation sets no criteria and a `limit` of 50.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobFilter {
/// Optional class criterion.
pub class: Option<JobClass>,
/// Optional status criterion.
pub status: Option<JobStatus>,
/// Optional job kind, as a string. NOTE(review): presumably one of the
/// names returned by `job_kind_str` — confirm.
pub kind: Option<String>,
/// Optional creation-time criterion (UTC).
pub created_after: Option<DateTime<Utc>>,
/// Maximum number of records.
pub limit: usize,
/// Optional pagination cursor. NOTE(review): inclusive vs. exclusive is
/// decided by the store — confirm.
pub cursor: Option<Ulid>,
}
impl Default for JobFilter {
    /// Unrestricted filter: no criteria, no cursor, at most 50 records.
    fn default() -> Self {
        // Page size used when the caller does not ask for a specific one.
        const DEFAULT_LIMIT: usize = 50;
        Self {
            limit: DEFAULT_LIMIT,
            cursor: None,
            class: None,
            status: None,
            kind: None,
            created_after: None,
        }
    }
}
/// Event delivered to receivers obtained from `QueueStore::subscribe`.
/// Every variant starts with the identifier of the job concerned.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum QueueEvent {
JobInserted(Ulid),
/// Job id, a status and the job's result.
JobCompleted(Ulid, JobStatus, JobResult),
/// Job id and a counter. NOTE(review): presumably the attempt number
/// (cf. `QueueStore::fail`) — confirm.
JobFailed(Ulid, u32),
JobReady(Ulid),
JobCancelled(Ulid),
}
/// A job record paired with a numeric priority.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GradatumJob {
/// The job itself.
pub record: JobRecord,
/// Numeric priority. NOTE(review): presumably `record.spec.priority.as_u8()`,
/// as the unit tests build it that way — nothing here enforces it.
pub priority: u8,
}
/// Implemented by values that can tell whether they represent a dry run.
pub trait DryRunAware {
/// Returns `true` when the value represents a dry run.
fn is_dry_run(&self) -> bool;
/// Number of notes that would be affected; the default implementation
/// returns 0.
fn notes_would_affect(&self) -> usize {
0
}
}
impl DryRunAware for JobRecord {
    /// A record is a dry run exactly when its spec's mode is `JobMode::DryRun`.
    fn is_dry_run(&self) -> bool {
        matches!(self.spec.mode, JobMode::DryRun)
    }
}
/// Common read-only view over a job input description.
///
/// NOTE(review): no implementor is visible in this file — confirm which
/// payload types are expected to implement it.
pub trait JobSource {
/// Scopes the source refers to.
fn scopes(&self) -> &[VaultScope];
/// Whether the source is flagged as a dry run.
fn dry_run(&self) -> bool;
/// Optional time window; the default implementation returns `None`.
fn window(&self) -> Option<Duration> {
None
}
}
#[async_trait::async_trait]
pub trait QueueStore: Send + Sync {
async fn enqueue(&self, job: JobRecord) -> Result<Ulid, QueueError>;
async fn dequeue(&self) -> Result<Option<JobRecord>, QueueError>;
async fn dequeue_by_kind(&self, _kind: &str) -> Result<Option<JobRecord>, QueueError> {
self.dequeue().await
}
async fn get(&self, id: Ulid) -> Result<Option<JobRecord>, QueueError>;
async fn complete(&self, id: Ulid, result: JobResult) -> Result<(), QueueError>;
async fn fail(&self, id: Ulid, err: &str, attempt: u32) -> Result<(), QueueError>;
async fn cancel(&self, id: Ulid) -> Result<(), QueueError>;
async fn fail_dlq(&self, id: Ulid, err: &str) -> Result<(), QueueError>;
async fn find_awaiting(&self, job_id: Ulid) -> Result<Vec<JobRecord>, QueueError>;
async fn set_pending(&self, id: Ulid) -> Result<(), QueueError>;
async fn recover_stale_leases(&self, ttl: Duration) -> Result<Vec<Ulid>, QueueError>;
async fn cancel_expired_deadlines(&self, now: DateTime<Utc>) -> Result<Vec<Ulid>, QueueError>;
async fn promote_retries(&self, now: DateTime<Utc>) -> Result<Vec<Ulid>, QueueError>;
async fn schedule_retry(&self, id: Ulid, at: DateTime<Utc>) -> Result<(), QueueError>;
async fn list(&self, filter: JobFilter) -> Result<Vec<JobRecord>, QueueError>;
fn subscribe(&self) -> Receiver<QueueEvent>;
async fn mark_conflict(
&self,
id: Ulid,
result_note_md: String,
duration_ms: u32,
) -> Result<(), QueueError> {
let result = JobResult {
success: false,
duration_ms,
cost_usd: None,
result_note: None,
conflict_payload: serde_json::from_str(&result_note_md).ok(),
};
self.complete(id, result).await
}
}
/// Errors returned by `QueueStore` operations.
///
/// The `Display` messages are in French and are part of the runtime output.
#[derive(Debug, thiserror::Error)]
pub enum QueueError {
/// Storage-level failure, with a description.
#[error("erreur de stockage : {0}")]
Storage(String),
/// No job exists with the given id.
#[error("job introuvable : {0}")]
NotFound(Ulid),
/// Serialization or deserialization failure, with a description.
#[error("erreur de sérialisation : {0}")]
Serialization(String),
/// The requested state transition is not allowed, with a description.
#[error("transition d'état invalide : {0}")]
InvalidTransition(String),
/// The operation was cancelled, with a description.
#[error("opération annulée : {0}")]
Cancelled(String),
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Fixture: a pending, on-demand, vault-wide batch record for `job`, with
    /// the default priority of `class`, default retry state and empty lineage.
    fn make_job_record(job: Job, class: JobClass) -> JobRecord {
        let now = Utc::now();
        let spec = JobSpec {
            kind: job,
            class,
            mode: JobMode::Batch,
            scope: JobScope::VaultWide,
            priority: JobPriority::default_for(&class),
        };
        let scheduling = JobScheduling {
            trigger: TriggerSource::Demand,
            scheduled_at: now,
            await_jobs: Vec::new(),
            deadline: None,
            cron_expr: None,
        };
        let lifecycle = JobLifecycle {
            status: JobStatus::Pending,
            created_at: now,
            started_at: None,
            completed_at: None,
            lease_until: None,
            result: None,
        };
        let lineage = JobLineage {
            triggered_by: None,
            parent_job: None,
            pipeline_id: None,
            pipeline_step: None,
            children: Vec::new(),
            cost_usd: None,
        };
        JobRecord {
            id: Ulid::new(),
            spec,
            scheduling,
            lifecycle,
            retry: JobRetry::default(),
            lineage,
        }
    }

    #[test]
    fn job_priority_as_u8_ordering() {
        let high = JobPriority::High.as_u8();
        let normal = JobPriority::Normal.as_u8();
        let low = JobPriority::Low.as_u8();
        let deferred = JobPriority::Deferred.as_u8();
        assert!(high > normal);
        assert!(normal > low);
        assert!(low > deferred);
    }

    #[test]
    fn job_priority_default_for_class() {
        let expectations = [
            (JobClass::Agent, JobPriority::High),
            (JobClass::Human, JobPriority::High),
            (JobClass::Api, JobPriority::Normal),
            (JobClass::System, JobPriority::Low),
        ];
        for (class, expected) in expectations {
            assert_eq!(JobPriority::default_for(&class), expected);
        }
    }

    #[test]
    fn job_mode_default_is_batch() {
        assert_eq!(JobMode::default(), JobMode::Batch);
    }

    #[test]
    fn job_retry_default_values() {
        let retry = JobRetry::default();
        assert_eq!(retry.count, 0);
        assert_eq!(retry.max, 3);
        assert!(retry.errors.is_empty());
    }

    #[test]
    fn retry_backoff_fixed_is_constant() {
        let backoff = RetryBackoff::Fixed(10);
        for attempt in [0, 5] {
            assert_eq!(backoff.duration_for(attempt), Duration::from_secs(10));
        }
    }

    #[test]
    fn retry_backoff_exponential_caps_at_max() {
        let backoff = RetryBackoff::Exponential { base: 5, max: 120 };
        assert_eq!(backoff.duration_for(0), Duration::from_secs(5));
        assert_eq!(backoff.duration_for(1), Duration::from_secs(10));
        // 5 * 2^10 exceeds the cap, so the cap is returned.
        assert_eq!(backoff.duration_for(10), Duration::from_secs(120));
    }

    #[test]
    fn job_record_serialize_roundtrip() {
        let embed = EmbedSpec {
            note_id: Ulid::new(),
            tenant_id: "main".to_string(),
            force_regenerate: false,
        };
        let record = make_job_record(Job::Embed(embed), JobClass::Agent);

        let json =
            serde_json::to_string(&record).expect("JobRecord doit être sérialisable en JSON");
        let back: JobRecord =
            serde_json::from_str(&json).expect("JobRecord doit être désérialisable depuis JSON");

        assert_eq!(record.id, back.id);
        assert_eq!(record.spec.priority.as_u8(), back.spec.priority.as_u8());
    }

    #[test]
    fn job_workspace_paths_format() {
        let record = make_job_record(Job::Consolidate, JobClass::System);
        let workspace = JobWorkspace::from_job(&record);
        assert!(workspace.input.ends_with("/input/"));
        assert!(workspace.output.ends_with("/output/"));
        assert!(workspace.meta.ends_with("/meta/"));
    }

    #[test]
    fn dry_run_job_record() {
        let curate = CurateSpec {
            note_id: Ulid::new(),
            tenant_id: "main".to_string(),
            ..Default::default()
        };
        // Same shape as the shared fixture (an Agent job gets High priority);
        // only the mode differs.
        let mut record = make_job_record(Job::Curate(curate), JobClass::Agent);
        record.spec.mode = JobMode::DryRun;
        assert!(record.is_dry_run());
    }

    #[test]
    fn job_output_dry_run_format() {
        let output = JobOutput::dry_run(42, "test curate");
        assert!(output.notes_created.is_empty());
        assert!(output.result_note_md.contains("DRY-RUN"));
        assert!(output.result_note_md.contains("42"));
    }

    #[test]
    fn job_filter_default_limit() {
        let filter = JobFilter::default();
        assert_eq!(filter.limit, 50);
        assert!(filter.class.is_none());
        assert!(filter.status.is_none());
    }

    #[test]
    fn gradatum_job_priority_matches_spec() {
        let record = make_job_record(Job::Agent, JobClass::Human);
        let priority = record.spec.priority.as_u8();
        let job = GradatumJob { record, priority };
        // Human jobs default to High, which ranks as 3.
        assert_eq!(job.priority, 3);
    }

    #[test]
    fn vault_scope_is_alias_of_job_scope() {
        let vault_scope: VaultScope = JobScope::VaultWide;
        let job_scope: JobScope = vault_scope;
        assert!(matches!(job_scope, JobScope::VaultWide));
    }

    #[test]
    fn queue_event_variants_serialize() {
        let event = QueueEvent::JobInserted(Ulid::new());
        let json = serde_json::to_string(&event).expect("QueueEvent doit être sérialisable");
        assert!(json.contains("JobInserted"));
    }
}