use std::sync::Arc;
use bincode::config::standard as bincode_std;
use gradatum_core::scope::VaultId;
use gradatum_core::section::Section;
use gradatum_core::status::NoteStatus;
use gradatum_queue::{NewJob, Queue, SqliteQueue};
use gradatum_vault::Vault;
use gradatum_worker::dispatch::{Dispatcher, NoopAuditSink};
use tempfile::TempDir;
/// Serializes a write request into the bincode bytes used as a job payload.
///
/// `title`, `body` and `section_hint` are copied into the request; `author`
/// is left unset, `tags` is empty and `tenant_id` is always `"main"`.
///
/// # Panics
///
/// Panics if bincode fails to encode the request.
fn encode_write_payload(title: &str, body: &str, section_hint: Option<&str>) -> Vec<u8> {
    // Tenant fallback referenced by the serde `default` attribute below.
    fn main_tenant() -> String {
        String::from("main")
    }

    // Local mirror of the request shape; keep the field order untouched since
    // the encoded bytes follow declaration order.
    // NOTE(review): presumably matches the worker-side request type — confirm.
    #[derive(serde::Serialize, serde::Deserialize, Debug)]
    struct WriteReq {
        title: String,
        body: String,
        #[serde(default)]
        author: Option<String>,
        #[serde(default)]
        tags: Vec<String>,
        #[serde(default)]
        section_hint: Option<String>,
        #[serde(default = "main_tenant")]
        tenant_id: String,
    }

    let request = WriteReq {
        title: title.to_owned(),
        body: body.to_owned(),
        author: None,
        tags: Vec::new(),
        section_hint: section_hint.map(str::to_owned),
        tenant_id: main_tenant(),
    };
    bincode::serde::encode_to_vec(&request, bincode_std()).unwrap()
}
/// Serializes a classify request for `note_id` into bincode bytes.
///
/// The tenant is always `"main"`.
///
/// # Panics
///
/// Panics if bincode fails to encode the request.
fn encode_classify_payload(note_id: &str) -> Vec<u8> {
    // Tenant fallback referenced by the serde `default` attribute below.
    fn main_tenant() -> String {
        String::from("main")
    }

    // Field order is kept as-is: the encoded bytes follow declaration order.
    #[derive(serde::Serialize, serde::Deserialize, Debug)]
    struct ClassifyReq {
        note_id: String,
        #[serde(default = "main_tenant")]
        tenant_id: String,
    }

    let request = ClassifyReq {
        note_id: note_id.to_owned(),
        tenant_id: main_tenant(),
    };
    bincode::serde::encode_to_vec(&request, bincode_std()).unwrap()
}
/// Serializes a downgrade request for `note_id` with the given `reason`.
///
/// `replaced_by` is left unset and the tenant is always `"main"`.
///
/// # Panics
///
/// Panics if bincode fails to encode the request.
fn encode_downgrade_payload(note_id: &str, reason: &str) -> Vec<u8> {
    // Tenant fallback referenced by the serde `default` attribute below.
    fn main_tenant() -> String {
        String::from("main")
    }

    // Field order is kept as-is: the encoded bytes follow declaration order.
    #[derive(serde::Serialize, serde::Deserialize, Debug)]
    struct DowngradeReq {
        note_id: String,
        reason: String,
        #[serde(default)]
        replaced_by: Option<String>,
        #[serde(default = "main_tenant")]
        tenant_id: String,
    }

    let request = DowngradeReq {
        note_id: note_id.to_owned(),
        reason: reason.to_owned(),
        replaced_by: None,
        tenant_id: main_tenant(),
    };
    bincode::serde::encode_to_vec(&request, bincode_std()).unwrap()
}
/// A `curate` job carrying a write payload must be handled by one `run_once`
/// call and leave exactly one entry in the vault index.
#[tokio::test]
async fn dispatch_curate_writes_note_with_assigned_section() {
    // Queue database and vault both live inside a throwaway directory.
    let tmp = TempDir::new().unwrap();
    let queue_path = tmp.path().join("queue.db");
    let vault_path = tmp.path().join("vault");

    let queue = Arc::new(SqliteQueue::new(&queue_path).await.unwrap());
    let vault = Arc::new(
        Vault::create(vault_path.as_path(), VaultId::new("main"))
            .await
            .unwrap(),
    );

    // No section hint is supplied; the title carries a `[DECISIONS]` prefix.
    let job = NewJob {
        tenant_id: "main".into(),
        kind: "curate".into(),
        payload: encode_write_payload(
            "[DECISIONS] Test note dispatch curate",
            "Corps de la note.",
            None,
        ),
        max_attempts: 5,
    };
    queue.enqueue(job).await.unwrap();

    let curator = Arc::new(gradatum_curator::CuratorPipeline::new());
    let dispatcher = Dispatcher::new(queue.clone())
        .with_vault(vault.clone())
        .with_curator(curator)
        .with_audit(Arc::new(NoopAuditSink));

    let handled = dispatcher.run_once().await.unwrap();
    assert!(
        handled,
        "le dispatcher doit signaler qu'un job a été traité"
    );

    let indexed = vault.index().locus_count().await.unwrap();
    assert_eq!(indexed, 1, "une note doit être indexée après curate admis");
}
/// A `classify` job targeting an existing live note must be handled by one
/// `run_once` call and leave the vault index with exactly one entry.
#[tokio::test]
async fn dispatch_classify_reclassifies_note() {
    let dir = TempDir::new().unwrap();
    let queue = Arc::new(
        SqliteQueue::new(&dir.path().join("queue.db"))
            .await
            .unwrap(),
    );
    let vault = Arc::new(
        Vault::create(dir.path().join("vault").as_path(), VaultId::new("main"))
            .await
            .unwrap(),
    );

    // Seed the vault with a live note filed under `Reference`.
    let frontmatter = build_minimal_frontmatter(Section::Reference, NoteStatus::Live);
    let note = vault
        .write_note(frontmatter, "debug content OOM crash fix".into())
        .await
        .unwrap();
    let note_id = note.id.to_string();

    // The encoder takes `&str`, so the owned id is passed by reference.
    let payload = encode_classify_payload(&note_id);
    queue
        .enqueue(NewJob {
            tenant_id: "main".into(),
            kind: "classify".into(),
            payload,
            max_attempts: 5,
        })
        .await
        .unwrap();

    let dispatcher = Dispatcher::new(queue.clone())
        .with_vault(vault.clone())
        .with_curator(Arc::new(gradatum_curator::CuratorPipeline::new()))
        .with_audit(Arc::new(NoopAuditSink));
    let processed = dispatcher.run_once().await.unwrap();
    assert!(
        processed,
        "le dispatcher doit signaler qu'un job a été traité"
    );

    // NOTE(review): only the index size is checked; the section the note ends
    // up in is not asserted here.
    let count = vault.index().locus_count().await.unwrap();
    assert_eq!(count, 1, "toujours 1 note après reclassification");
}
/// A `downgrade` job targeting an existing live note must be handled by one
/// `run_once` call and leave at least one entry in the vault index.
#[tokio::test]
async fn dispatch_downgrade_deprecates_live_note() {
    let dir = TempDir::new().unwrap();
    let queue = Arc::new(
        SqliteQueue::new(&dir.path().join("queue.db"))
            .await
            .unwrap(),
    );
    let vault = Arc::new(
        Vault::create(dir.path().join("vault").as_path(), VaultId::new("main"))
            .await
            .unwrap(),
    );

    // Seed the vault with a live note filed under `Decisions`.
    let frontmatter = build_minimal_frontmatter(Section::Decisions, NoteStatus::Live);
    let note = vault
        .write_note(frontmatter, "décision archivée".into())
        .await
        .unwrap();
    let note_id = note.id.to_string();

    // The encoder takes `&str`, so the owned id is passed by reference.
    let payload = encode_downgrade_payload(&note_id, "remplacée par une version révisée");
    queue
        .enqueue(NewJob {
            tenant_id: "main".into(),
            kind: "downgrade".into(),
            payload,
            max_attempts: 5,
        })
        .await
        .unwrap();

    let dispatcher = Dispatcher::new(queue.clone())
        .with_vault(vault.clone())
        .with_curator(Arc::new(gradatum_curator::CuratorPipeline::new()))
        .with_audit(Arc::new(NoopAuditSink));
    let processed = dispatcher.run_once().await.unwrap();
    assert!(
        processed,
        "le dispatcher doit signaler qu'un job a été traité"
    );

    // NOTE(review): the note's status after the downgrade is not asserted;
    // only the presence of an index entry is checked.
    let count = vault.index().locus_count().await.unwrap();
    assert!(count >= 1, "au moins 1 note dans l'index après downgrade");
}
/// With nothing enqueued, `run_once` must succeed and report `false`.
#[tokio::test]
async fn dispatch_empty_queue_returns_false() {
    // Fresh queue and vault in a throwaway directory; no job is enqueued.
    let tmp = TempDir::new().unwrap();
    let queue_path = tmp.path().join("queue.db");
    let vault_path = tmp.path().join("vault");

    let queue = Arc::new(SqliteQueue::new(&queue_path).await.unwrap());
    let vault = Arc::new(
        Vault::create(vault_path.as_path(), VaultId::new("main"))
            .await
            .unwrap(),
    );

    let curator = Arc::new(gradatum_curator::CuratorPipeline::new());
    let dispatcher = Dispatcher::new(queue.clone())
        .with_vault(vault.clone())
        .with_curator(curator)
        .with_audit(Arc::new(NoopAuditSink));

    // Check the `Result` first so an error is reported with its own message.
    let outcome = dispatcher.run_once().await;
    assert!(
        outcome.is_ok(),
        "run_once ne doit pas retourner Err sur queue vide"
    );
    let handled = outcome.unwrap();
    assert!(!handled, "run_once retourne false si la queue est vide");
}
struct MockCuratorProcess {
call_count: std::sync::Arc<std::sync::atomic::AtomicUsize>,
outcome: gradatum_curator::CurateOutcome,
}
// Curator implementation for the test double: count the call, replay the outcome.
#[async_trait::async_trait]
impl gradatum_curator::CuratorProcess for MockCuratorProcess {
    /// Ignores the note, increments `call_count`, and returns a clone of the
    /// configured `outcome`.
    async fn process(&self, _note: gradatum_curator::Note) -> gradatum_curator::CurateOutcome {
        use std::sync::atomic::Ordering;

        // Record the invocation before producing the canned answer.
        self.call_count.fetch_add(1, Ordering::Relaxed);
        self.outcome.clone()
    }
}
/// Processing a `classify` job must invoke the injected curator at least once.
///
/// A `MockCuratorProcess` returning `Admitted` is injected and its call
/// counter is checked after `run_once`.
#[tokio::test]
async fn classify_job_calls_curator_cascade_not_heuristic_only() {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    let dir = TempDir::new().unwrap();
    let queue = Arc::new(
        SqliteQueue::new(&dir.path().join("queue.db"))
            .await
            .unwrap(),
    );
    let vault = Arc::new(
        Vault::create(dir.path().join("vault").as_path(), VaultId::new("main"))
            .await
            .unwrap(),
    );

    // Seed the vault with a live note filed under `Reference`.
    let frontmatter = build_minimal_frontmatter(Section::Reference, NoteStatus::Live);
    let note = vault
        .write_note(
            frontmatter,
            "# Note de raisonnement\n\nContenu sémantique pour classify cascade.".into(),
        )
        .await
        .unwrap();
    let note_id = note.id.to_string();

    // Keep one handle on the counter; the mock owns the other.
    let call_count = Arc::new(AtomicUsize::new(0));
    let mock = Arc::new(MockCuratorProcess {
        call_count: Arc::clone(&call_count),
        outcome: gradatum_curator::CurateOutcome::Admitted {
            decisions: gradatum_curator::CuratorDecisions {
                canonical_section: "reasoning".to_string(),
                tags: vec![],
                novelty: gradatum_curator::novelty::NoveltyVerdict::Admitted,
                wikilinks: vec![],
                dedup: gradatum_curator::dedup::DedupVerdict::Unique,
            },
        },
    });

    // The encoder takes `&str`, so the owned id is passed by reference.
    let payload = encode_classify_payload(&note_id);
    queue
        .enqueue(NewJob {
            tenant_id: "main".into(),
            kind: "classify".into(),
            payload,
            max_attempts: 5,
        })
        .await
        .unwrap();

    let dispatcher = gradatum_worker::dispatch::Dispatcher::new(queue.clone())
        .with_vault(vault.clone())
        .with_curator(mock as Arc<dyn gradatum_curator::CuratorProcess>)
        .with_audit(Arc::new(NoopAuditSink));
    let processed = dispatcher.run_once().await.unwrap();
    assert!(processed, "run_once doit signaler qu'un job a été traité");
    assert!(
        call_count.load(Ordering::Relaxed) > 0,
        "le mock curator doit avoir été appelé par process_job(classify) — B3 cascade"
    );
}
/// When the curator answers `Rejected` for a `classify` job, the job is still
/// reported as processed and the index keeps exactly one entry.
#[tokio::test]
async fn classify_job_rejected_does_not_modify_note() {
    use std::sync::atomic::AtomicUsize;
    use std::sync::Arc;

    let dir = TempDir::new().unwrap();
    let queue = Arc::new(
        SqliteQueue::new(&dir.path().join("queue.db"))
            .await
            .unwrap(),
    );
    let vault = Arc::new(
        Vault::create(dir.path().join("vault").as_path(), VaultId::new("main"))
            .await
            .unwrap(),
    );

    // Seed the vault with a live note filed under `Reference`.
    let frontmatter = build_minimal_frontmatter(Section::Reference, NoteStatus::Live);
    let note = vault
        .write_note(frontmatter, "# Note testée\n\nCorps de note.".into())
        .await
        .unwrap();
    let note_id = note.id.to_string();

    // The call counter is not inspected in this test, so no handle is kept.
    let mock = Arc::new(MockCuratorProcess {
        call_count: Arc::new(AtomicUsize::new(0)),
        outcome: gradatum_curator::CurateOutcome::Rejected {
            reason: "note non pertinente pour classify".to_string(),
        },
    });

    // The encoder takes `&str`, so the owned id is passed by reference.
    let payload = encode_classify_payload(&note_id);
    queue
        .enqueue(NewJob {
            tenant_id: "main".into(),
            kind: "classify".into(),
            payload,
            max_attempts: 5,
        })
        .await
        .unwrap();

    let dispatcher = gradatum_worker::dispatch::Dispatcher::new(queue.clone())
        .with_vault(vault.clone())
        .with_curator(mock as Arc<dyn gradatum_curator::CuratorProcess>)
        .with_audit(Arc::new(NoopAuditSink));
    let processed = dispatcher.run_once().await.unwrap();
    assert!(
        processed,
        "run_once doit traiter le job même en cas de Rejected"
    );

    // NOTE(review): the note's content and frontmatter are not compared; only
    // the index size is checked.
    let count = vault.index().locus_count().await.unwrap();
    assert_eq!(
        count, 1,
        "note rejetée par classify ne doit pas changer l'index (toujours 1 note)"
    );
}
/// When the curator answers `Pending` for a `classify` job, the job is still
/// reported as processed and the index keeps at least one entry.
#[tokio::test]
async fn classify_job_pending_sets_staging_status() {
    use std::sync::atomic::AtomicUsize;
    use std::sync::Arc;

    let dir = TempDir::new().unwrap();
    let queue = Arc::new(
        SqliteQueue::new(&dir.path().join("queue.db"))
            .await
            .unwrap(),
    );
    let vault = Arc::new(
        Vault::create(dir.path().join("vault").as_path(), VaultId::new("main"))
            .await
            .unwrap(),
    );

    // Seed the vault with a live note filed under `Reference`.
    let frontmatter = build_minimal_frontmatter(Section::Reference, NoteStatus::Live);
    let note = vault
        .write_note(
            frontmatter,
            "# Note pour classify pending\n\nContenu.".into(),
        )
        .await
        .unwrap();
    let note_id = note.id.to_string();

    // The call counter is not inspected in this test, so no handle is kept.
    let mock = Arc::new(MockCuratorProcess {
        call_count: Arc::new(AtomicUsize::new(0)),
        outcome: gradatum_curator::CurateOutcome::Pending {
            decisions: gradatum_curator::CuratorDecisions {
                canonical_section: "architecture".to_string(),
                tags: vec![],
                novelty: gradatum_curator::novelty::NoveltyVerdict::Admitted,
                wikilinks: vec![],
                dedup: gradatum_curator::dedup::DedupVerdict::Unique,
            },
            reason: "confiance LLM insuffisante".to_string(),
        },
    });

    // The encoder takes `&str`, so the owned id is passed by reference.
    let payload = encode_classify_payload(&note_id);
    queue
        .enqueue(NewJob {
            tenant_id: "main".into(),
            kind: "classify".into(),
            payload,
            max_attempts: 5,
        })
        .await
        .unwrap();

    let dispatcher = gradatum_worker::dispatch::Dispatcher::new(queue.clone())
        .with_vault(vault.clone())
        .with_curator(mock as Arc<dyn gradatum_curator::CuratorProcess>)
        .with_audit(Arc::new(NoopAuditSink));
    let processed = dispatcher.run_once().await.unwrap();
    assert!(processed, "run_once doit traiter le job Pending");

    // NOTE(review): the test name mentions a staging status, but the note's
    // status is never read back; only the index size is checked.
    let count = vault.index().locus_count().await.unwrap();
    assert!(
        count >= 1,
        "au moins 1 note dans l'index après classify pending"
    );
}
/// Two `classify` jobs for the same note must both be processed by successive
/// `run_once` calls, leaving at least one entry in the index.
#[tokio::test]
async fn classify_job_twice_on_same_note_does_not_corrupt() {
    use std::sync::Arc;

    let dir = TempDir::new().unwrap();
    let queue = Arc::new(
        SqliteQueue::new(&dir.path().join("queue.db"))
            .await
            .unwrap(),
    );
    let vault = Arc::new(
        Vault::create(dir.path().join("vault").as_path(), VaultId::new("main"))
            .await
            .unwrap(),
    );

    // Seed the vault with a live note filed under `Reference`.
    let frontmatter = build_minimal_frontmatter(Section::Reference, NoteStatus::Live);
    let note = vault
        .write_note(frontmatter, "# Note double classify\n\nContenu.".into())
        .await
        .unwrap();
    let note_id = note.id.to_string();

    // Enqueue the same classify request twice before running the dispatcher.
    for _ in 0..2 {
        // The encoder takes `&str`, so the owned id is passed by reference.
        let payload = encode_classify_payload(&note_id);
        queue
            .enqueue(NewJob {
                tenant_id: "main".into(),
                kind: "classify".into(),
                payload,
                max_attempts: 5,
            })
            .await
            .unwrap();
    }

    let curator = Arc::new(gradatum_curator::CuratorPipeline::new());
    let dispatcher = gradatum_worker::dispatch::Dispatcher::new(queue.clone())
        .with_vault(vault.clone())
        .with_curator(curator as Arc<dyn gradatum_curator::CuratorProcess>)
        .with_audit(Arc::new(NoopAuditSink));

    let r1 = dispatcher.run_once().await.unwrap();
    assert!(r1, "premier classify doit traiter un job");
    let r2 = dispatcher.run_once().await.unwrap();
    assert!(r2, "second classify doit traiter un job sans panique");

    let count = vault.index().locus_count().await.unwrap();
    assert!(count >= 1, "vault cohérent après double classify");
}
fn build_minimal_frontmatter(
section: Section,
status: NoteStatus,
) -> gradatum_core::frontmatter::Frontmatter {
use chrono::Utc;
use gradatum_core::frontmatter::{ExtraFields, Frontmatter};
use smallvec::SmallVec;
Frontmatter {
schema_version: 1,
vault_id: VaultId::new("main"),
locus: None,
section,
status,
status_reason: None,
status_changed: None,
tags: SmallVec::new(),
author: None,
created: Utc::now(),
updated: None,
extra: ExtraFields::empty(),
}
}