use std::sync::Arc;
use std::time::Duration;
use anyhow::Context as _;
use async_trait::async_trait;
use chrono::Utc;
use gradatum_core::audit::http::{AuditSink, HttpAuditActor, HttpAuditEvent};
use gradatum_core::frontmatter::{ExtraFields, Frontmatter};
use gradatum_core::identity::NoteId;
use gradatum_core::scope::VaultId;
use gradatum_core::section::Section;
use gradatum_core::status::NoteStatus;
use gradatum_curator::{CurateOutcome, CuratorProcess, Note as CuratorNote};
use gradatum_embed::Embedder;
use gradatum_index::SqliteIndex;
use gradatum_core::VectorStore as _;
use gradatum_queue::{LeasedJob, NewJob, Queue, SqliteQueue};
use gradatum_vault::Vault;
use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
use tracing::instrument;
use ulid::Ulid;
/// Lease duration requested for every job pulled by `Dispatcher::run_once` (300 seconds).
const DEFAULT_LEASE_DURATION: Duration = Duration::from_secs(300);
/// Value recorded as the actor `kid` on the audit events emitted by the worker.
const WORKER_SYSTEM_KID: &str = "gradatum-worker";
/// Payload of a `curate` job, decoded from the job bytes with bincode.
///
/// NOTE(review): bincode is presumably a positional encoding, so the field order
/// and the effect of the `#[serde(default)]` attributes should be confirmed
/// against the producer before reordering or adding fields.
#[derive(Debug, Serialize, Deserialize)]
struct VaultWriteRequest {
/// Note title, forwarded to the curator.
title: String,
/// Note body; sent to the curator and written to the vault unless the note is rejected.
body: String,
/// Optional author name; `build_frontmatter` turns it into an `AuthorRef`.
#[serde(default)]
author: Option<String>,
/// Caller-supplied tags; used as curator hints and merged with the curator's tags.
#[serde(default)]
tags: Vec<String>,
/// Optional section suggestion, forwarded to the curator.
#[serde(default)]
section_hint: Option<String>,
/// Tenant identifier with `default_main` as serde default. The handlers visible
/// in this file use the job's own `tenant_id` rather than this field.
#[serde(default = "default_main")]
tenant_id: String,
}
/// Payload of a `classify` job, decoded from the job bytes with bincode.
#[derive(Debug, Serialize, Deserialize)]
struct VaultClassifyRequest {
/// String form of the note's ULID; parsed with `Ulid::from_string` by the handler.
note_id: String,
/// Tenant identifier with `default_main` as serde default. The handlers visible
/// in this file use the job's own `tenant_id` rather than this field.
#[serde(default = "default_main")]
tenant_id: String,
}
/// Payload of a `downgrade` job, decoded from the job bytes with bincode.
#[derive(Debug, Serialize, Deserialize)]
struct VaultDowngradeRequest {
/// String form of the note's ULID; parsed with `Ulid::from_string` by the handler.
note_id: String,
/// Reason for the downgrade; stored as the frontmatter `status_reason`.
reason: String,
/// Optional replacement reference. Not read by the code visible in this file.
#[serde(default)]
replaced_by: Option<String>,
/// Tenant identifier with `default_main` as serde default. The handlers visible
/// in this file use the job's own `tenant_id` rather than this field.
#[serde(default = "default_main")]
tenant_id: String,
}
/// Serde default for the `tenant_id` field of the request payloads: `"main"`.
fn default_main() -> String {
    String::from("main")
}
/// Audit sink that discards every event it is given.
pub struct NoopAuditSink;
#[async_trait]
impl AuditSink for NoopAuditSink {
/// Accepts the event, records nothing, and always returns `Ok(())`.
async fn record(&self, _event: HttpAuditEvent) -> Result<(), std::io::Error> {
Ok(())
}
}
/// Queue worker that leases jobs and dispatches them according to their kind.
///
/// Only `queue` is mandatory; the other collaborators are optional and are
/// attached with the `with_*` builder methods.
pub struct Dispatcher {
/// Queue jobs are leased from and completed/failed on; follow-up `embed_note`
/// jobs are enqueued on it as well.
queue: Arc<SqliteQueue>,
/// Note store; `process_job` requires it for every kind other than `embed_note`.
vault: Option<Arc<Vault>>,
/// Curator; `process_job` requires it for every kind other than `embed_note`.
curator: Option<Arc<dyn CuratorProcess>>,
/// Audit sink; when absent no audit event is emitted.
audit: Option<Arc<dyn AuditSink>>,
/// Index used to resolve wikilinks and to store note embeddings; both steps
/// are skipped when it is absent.
index: Option<Arc<SqliteIndex>>,
/// Embedder used by `embed_note` jobs; the job is skipped when it is absent.
embedder: Option<Arc<dyn Embedder>>,
}
impl Dispatcher {
/// Creates a dispatcher bound to `queue` with no vault, curator, audit sink,
/// index or embedder attached.
pub fn new(queue: Arc<SqliteQueue>) -> Self {
    // Every optional collaborator starts unset; the `with_*` builders attach them.
    let (vault, curator, audit, index, embedder) = (None, None, None, None, None);
    Self {
        queue,
        vault,
        curator,
        audit,
        index,
        embedder,
    }
}
#[must_use]
pub fn with_vault(mut self, vault: Arc<Vault>) -> Self {
self.vault = Some(vault);
self
}
#[must_use]
pub fn with_curator(mut self, curator: Arc<dyn CuratorProcess>) -> Self {
self.curator = Some(curator);
self
}
#[must_use]
pub fn with_audit(mut self, audit: Arc<dyn AuditSink>) -> Self {
self.audit = Some(audit);
self
}
#[must_use]
pub fn with_index(mut self, index: Arc<SqliteIndex>) -> Self {
self.index = Some(index);
self
}
#[must_use]
pub fn with_embedder(mut self, embedder: Arc<dyn Embedder>) -> Self {
self.embedder = Some(embedder);
self
}
/// Leases at most one job from the queue and processes it.
///
/// Returns `Ok(false)` when there was nothing to lease and `Ok(true)` when a job
/// was leased, whether it then succeeded (the job is completed) or failed (the
/// failure is reported to the queue with `fail`).
///
/// # Errors
///
/// Returns an error only when a queue operation itself fails (`lease`,
/// `complete` or `fail`); a failing job is not an error of this method.
pub async fn run_once(&self) -> anyhow::Result<bool> {
    let leased = self
        .queue
        .lease(
            &["curate", "classify", "downgrade", "embed_note"],
            DEFAULT_LEASE_DURATION,
        )
        .await?;
    let Some(job) = leased else {
        return Ok(false);
    };
    match self.process_job(&job).await {
        Ok(()) => {
            self.queue.complete(job.id).await?;
        }
        Err(e) => {
            // `{:#}` renders the whole anyhow context chain ("context: cause: root").
            // The plain `Display` form keeps only the outermost context, which hides
            // the actual cause in both the log and the message stored on the job.
            let detail = format!("{e:#}");
            tracing::error!(
                job_id = job.id,
                kind = %job.kind,
                error = %detail,
                "job échoué — enregistrement pour retry ou dead-letter"
            );
            self.queue.fail(job.id, &detail).await?;
        }
    }
    Ok(true)
}
#[instrument(skip(self), fields(job_id = job.id, kind = %job.kind))]
async fn process_job(&self, job: &LeasedJob) -> anyhow::Result<()> {
let start = std::time::Instant::now();
let outcome: &str;
if job.kind.as_str() == "embed_note" {
self.process_embed_note(job).await?;
let duration_ms = start.elapsed().as_millis() as i64;
self.emit_audit(job, "ok", duration_ms).await;
tracing::info!(
job_id = job.id,
kind = %job.kind,
outcome = "ok",
duration_ms = duration_ms,
"job traité"
);
return Ok(());
}
let vault = self
.vault
.as_ref()
.ok_or_else(|| anyhow::anyhow!("vault non configuré — appeler with_vault"))?;
let curator = self
.curator
.as_ref()
.ok_or_else(|| anyhow::anyhow!("curator non configuré — appeler with_curator"))?;
match job.kind.as_str() {
"curate" => {
let req: VaultWriteRequest =
bincode::serde::decode_from_slice(&job.payload, bincode::config::standard())
.context("decode VaultWriteRequest bincode")?
.0;
tracing::info!(
job_id = job.id,
title = %req.title,
"job curate — lancement cascade curator"
);
let curator_note = CuratorNote {
id: ulid::Ulid::new().to_string(),
title: req.title.clone(),
body: req.body.clone(),
tags_hint: req.tags.clone(),
section_hint: req.section_hint.clone(),
};
let curate_outcome = curator.process(curator_note).await;
match curate_outcome {
CurateOutcome::Admitted { decisions } => {
let section = section_from_str(&decisions.canonical_section)
.unwrap_or(Section::Reference);
let frontmatter = build_frontmatter(
&job.tenant_id,
section,
NoteStatus::Live,
&req,
&decisions.tags,
);
let note = vault
.write_note(frontmatter, req.body.clone())
.await
.context("vault.write_note curate")?;
outcome = "admitted";
tracing::info!(
job_id = job.id,
section = %decisions.canonical_section,
"note admise et persistée"
);
process_wikilinks_b5(self, &job.tenant_id, ¬e.id.to_string(), &req.body)
.await;
let embed_payload = serde_json::json!({
"note_id": note.id.to_string(),
"body_text": note.body.markdown,
});
let new_embed_job = NewJob {
tenant_id: job.tenant_id.clone(),
kind: "embed_note".to_string(),
payload: serde_json::to_vec(&embed_payload).unwrap_or_default(),
max_attempts: 3,
};
if let Err(e) = self.queue.enqueue(new_embed_job).await {
tracing::warn!(
note_id = %note.id,
error = %e,
"chaînage embed_note enqueue échoué — backfill pourra re-tenter"
);
}
}
CurateOutcome::Rejected { reason } => {
outcome = "rejected";
tracing::info!(
job_id = job.id,
reason = %reason,
"note rejetée par le curator — aucune écriture"
);
}
CurateOutcome::Pending { decisions, reason } => {
let section = section_from_str(&decisions.canonical_section)
.unwrap_or(Section::Reference);
let frontmatter = build_frontmatter(
&job.tenant_id,
section,
NoteStatus::Staging,
&req,
&decisions.tags,
);
let note = vault
.write_note(frontmatter, req.body.clone())
.await
.context("vault.write_note curate pending")?;
outcome = "pending";
tracing::info!(
job_id = job.id,
reason = %reason,
"note mise en Staging (revue manuelle requise)"
);
process_wikilinks_b5(self, &job.tenant_id, ¬e.id.to_string(), &req.body)
.await;
let embed_payload = serde_json::json!({
"note_id": note.id.to_string(),
"body_text": note.body.markdown,
});
let new_embed_job = NewJob {
tenant_id: job.tenant_id.clone(),
kind: "embed_note".to_string(),
payload: serde_json::to_vec(&embed_payload).unwrap_or_default(),
max_attempts: 3,
};
if let Err(e) = self.queue.enqueue(new_embed_job).await {
tracing::warn!(
note_id = %note.id,
error = %e,
"chaînage embed_note enqueue échoué — backfill pourra re-tenter"
);
}
}
}
}
"classify" => {
let req: VaultClassifyRequest =
bincode::serde::decode_from_slice(&job.payload, bincode::config::standard())
.context("decode VaultClassifyRequest bincode")?
.0;
tracing::info!(
job_id = job.id,
note_id = %req.note_id,
"job classify — cascade curator complète (B3 alpha.15)"
);
let note_ulid = Ulid::from_string(&req.note_id)
.map_err(|e| anyhow::anyhow!("ULID invalide {}: {e}", req.note_id))?;
let note_id = NoteId(note_ulid);
let existing = vault
.read_note(note_id)
.await
.context("read_note pour classify")?;
let title_for_curator = gradatum_curator::extract_h1_title(&existing.body.markdown)
.map(|t| t.to_string())
.unwrap_or_else(|| existing.frontmatter.section.as_str().to_string());
let curator_note = CuratorNote {
id: req.note_id.clone(),
title: title_for_curator,
body: existing.body.markdown.clone(),
tags_hint: existing
.frontmatter
.tags
.iter()
.map(|t| t.as_str().to_string())
.collect(),
section_hint: None,
};
tracing::debug!(
job_id = job.id,
note_id = %req.note_id,
"classify curator processing — cascade complète"
);
let curate_outcome = curator.process(curator_note).await;
match curate_outcome {
CurateOutcome::Admitted { decisions } => {
let new_section = section_from_str(&decisions.canonical_section)
.unwrap_or(Section::Reference);
let mut updated_fm = existing.frontmatter.clone();
updated_fm.section = new_section;
for tag_str in &decisions.tags {
if !updated_fm
.tags
.iter()
.any(|t| t.as_str() == tag_str.as_str())
{
if let Ok(t) = gradatum_core::tag::Tag::new(tag_str) {
updated_fm.tags.push(t);
}
}
}
vault
.write_note(updated_fm, existing.body.markdown.clone())
.await
.context("vault.write_note classify admitted")?;
outcome = "reclassified";
tracing::info!(
job_id = job.id,
section = %decisions.canonical_section,
"note reclassifiée par cascade curator (Admitted)"
);
}
CurateOutcome::Pending { decisions, reason } => {
let new_section = section_from_str(&decisions.canonical_section)
.unwrap_or(Section::Reference);
let mut updated_fm = existing.frontmatter.clone();
updated_fm.section = new_section;
updated_fm.status = NoteStatus::Staging;
vault
.write_note(updated_fm, existing.body.markdown.clone())
.await
.context("vault.write_note classify pending")?;
outcome = "classify_pending";
tracing::warn!(
job_id = job.id,
reason = %reason,
"note mise en Staging par classify (LLM incertain)"
);
}
CurateOutcome::Rejected { reason } => {
outcome = "classify_rejected";
tracing::warn!(
job_id = job.id,
reason = %reason,
"classify rejeté par le curator — note inchangée dans le vault"
);
}
}
}
"downgrade" => {
let req: VaultDowngradeRequest =
bincode::serde::decode_from_slice(&job.payload, bincode::config::standard())
.context("decode VaultDowngradeRequest bincode")?
.0;
tracing::info!(
job_id = job.id,
note_id = %req.note_id,
reason = %req.reason,
"job downgrade — rétrogradation de la note"
);
let note_ulid = Ulid::from_string(&req.note_id)
.map_err(|e| anyhow::anyhow!("ULID invalide {}: {e}", req.note_id))?;
let note_id = NoteId(note_ulid);
let existing = vault
.read_note(note_id)
.await
.context("read_note pour downgrade")?;
if !existing
.frontmatter
.status
.can_transition_to(NoteStatus::Deprecated)
{
anyhow::bail!(
"transition invalide {:?} → Deprecated pour la note {} — seul Live est autorisé",
existing.frontmatter.status,
req.note_id
);
}
let mut downgraded_fm = existing.frontmatter.clone();
downgraded_fm.status = NoteStatus::Deprecated;
downgraded_fm.status_reason = Some(req.reason.clone());
downgraded_fm.status_changed = Some(Utc::now());
vault
.write_note(downgraded_fm, existing.body.markdown.clone())
.await
.context("vault.write_note downgrade")?;
outcome = "deprecated";
tracing::info!(job_id = job.id, "note rétrogradée vers Deprecated");
}
other => {
anyhow::bail!("kind de job inconnu : {other:?}");
}
}
let duration_ms = start.elapsed().as_millis() as i64;
self.emit_audit(job, outcome, duration_ms).await;
tracing::info!(
job_id = job.id,
kind = %job.kind,
outcome = outcome,
duration_ms = duration_ms,
"job traité"
);
Ok(())
}
/// Computes and stores the embedding of the note described by an `embed_note` job.
///
/// The payload is JSON with a required `note_id` string and a `body_text` string
/// that is treated as empty when missing or not a string.
///
/// The job is skipped (returning `Ok(())`) when no embedder or no index is
/// configured, or when the body is empty.
///
/// # Errors
///
/// Returns an error when the payload is not valid JSON, when `note_id` is missing
/// or is not a valid ULID, when the embedder fails, or when the embedding cannot
/// be stored in the index.
async fn process_embed_note(&self, job: &LeasedJob) -> anyhow::Result<()> {
    // NOTE(review): the size check is in bytes while the cut keeps a number of
    // characters, so an ASCII body of 8193 bytes is cut to 2048 characters whereas
    // one of 8192 bytes is embedded whole. Confirm the intended limit; the
    // existing behaviour is preserved here.
    const TRUNCATE_THRESHOLD_BYTES: usize = 8192;
    const TRUNCATED_CHARS: usize = 2048;

    let Some(embedder) = self.embedder.as_ref() else {
        tracing::info!(job_id = job.id, "embed_note skipped — embedder absent");
        return Ok(());
    };
    let Some(index) = self.index.as_ref() else {
        tracing::info!(job_id = job.id, "embed_note skipped — index absent");
        return Ok(());
    };
    let payload: serde_json::Value =
        serde_json::from_slice(&job.payload).context("embed_note: parse payload JSON")?;
    let note_id_str = payload
        .get("note_id")
        .and_then(|v| v.as_str())
        .ok_or_else(|| anyhow::anyhow!("embed_note: payload manque 'note_id'"))?;
    let body_text = payload
        .get("body_text")
        .and_then(|v| v.as_str())
        .unwrap_or("");
    if body_text.is_empty() {
        tracing::info!(
            job_id = job.id,
            note_id = %note_id_str,
            "embed_note skipped — body vide"
        );
        return Ok(());
    }
    // Validate the id before calling the embedder so that a malformed id fails
    // the job without paying for an embedding that could never be stored.
    let note_ulid = Ulid::from_string(note_id_str)
        .map_err(|e| anyhow::anyhow!("embed_note: ULID invalide '{note_id_str}': {e}"))?;
    let note_id = NoteId(note_ulid);
    let truncated = if body_text.len() > TRUNCATE_THRESHOLD_BYTES {
        // Cut on a character boundary so the slice below cannot panic.
        let end = body_text
            .char_indices()
            .nth(TRUNCATED_CHARS)
            .map_or(body_text.len(), |(i, _)| i);
        &body_text[..end]
    } else {
        body_text
    };
    let vec = embedder
        .embed(truncated)
        .await
        .map_err(|e| anyhow::anyhow!("embed_note embed: {e}"))?;
    index
        .insert_note_embedding(&note_id, embedder.embedder_id(), embedder.dim(), &vec)
        .await
        .map_err(|e| anyhow::anyhow!("embed_note insert_note_embedding: {e}"))?;
    tracing::info!(
        job_id = job.id,
        note_id = %note_id_str,
        embedder_id = embedder.embedder_id(),
        dim = embedder.dim(),
        "embed_note done"
    );
    Ok(())
}
/// Records an audit event for a processed job; does nothing when no audit sink
/// is configured.
///
/// A failure of the sink is logged as a warning and otherwise ignored, so it
/// never turns a processed job into a failed one.
async fn emit_audit(&self, job: &LeasedJob, outcome: &str, duration_ms: i64) {
    let Some(sink) = self.audit.as_ref() else {
        return;
    };
    let ts = Utc::now();
    // The worker always acts under its own fixed system identity.
    let actor = HttpAuditActor {
        kid: WORKER_SYSTEM_KID.into(),
        sub: "gradatum-worker".into(),
        aud: "gradatum".into(),
    };
    let event = HttpAuditEvent {
        ts,
        event: format!("worker_{}", job.kind),
        actor,
        tenant_id: job.tenant_id.clone(),
        locus: format!("{}/{}", job.tenant_id, job.kind),
        note_id: None,
        content_hash: None,
        outcome: outcome.into(),
        curator: Some(serde_json::json!({ "duration_ms": duration_ms })),
        request_id: format!("job-{}", job.id),
    };
    match sink.record(event).await {
        Ok(()) => {}
        Err(e) => {
            tracing::warn!(
                job_id = job.id,
                error = %e,
                "échec écriture audit — le job est quand même marqué complet"
            );
        }
    }
}
}
/// Parses a section name into a `Section` through its serde representation.
///
/// Returns `None` when `s` is not a name `Section` deserializes from.
fn section_from_str(s: &str) -> Option<Section> {
    // Deserialize from a JSON string *value* rather than from hand-built JSON
    // text: splicing `s` between quotes would let `"` and `\` in the input be
    // interpreted as JSON syntax (for instance `\u0072eference` would be read as
    // `reference`).
    serde_json::from_value(serde_json::Value::String(s.to_owned())).ok()
}
/// Builds the frontmatter of a note created by a `curate` job.
///
/// The tag list is the request's tags, in order, followed by every curator tag
/// that is not already in the list; names rejected by `Tag::new` are dropped.
/// The author, when present in the request, is wrapped with `AuthorRef::system`.
/// `created` is set to the current time; `locus`, `status_reason`,
/// `status_changed`, `updated` and `provenance` are left unset.
fn build_frontmatter(
    tenant_id: &str,
    section: Section,
    status: NoteStatus,
    req: &VaultWriteRequest,
    curator_tags: &[String],
) -> Frontmatter {
    // Request tags are kept as submitted; only curator tags are checked against
    // what is already in the list.
    let mut names: Vec<&str> = req.tags.iter().map(String::as_str).collect();
    for extra in curator_tags {
        if !names.contains(&extra.as_str()) {
            names.push(extra.as_str());
        }
    }
    let tags: SmallVec<[gradatum_core::tag::Tag; 4]> = names
        .into_iter()
        .filter_map(|name| gradatum_core::tag::Tag::new(name.to_owned()).ok())
        .collect();
    let author = match req.author.as_deref() {
        Some(name) => Some(gradatum_core::author::AuthorRef::system(name)),
        None => None,
    };
    Frontmatter {
        schema_version: 1,
        vault_id: VaultId::new(tenant_id),
        locus: None,
        section,
        status,
        status_reason: None,
        status_changed: None,
        tags,
        author,
        created: Utc::now(),
        updated: None,
        extra: ExtraFields::empty(),
        provenance: None,
    }
}
/// Resolves the wikilinks found in `body` and stores a link from `src_note_id`
/// to every target that could be resolved.
///
/// Best effort: returns without doing anything when the dispatcher has no index
/// or the body has no wikilink, and only logs lookup or upsert failures. Title
/// lookups run as tasks spawned on a `JoinSet`; the link upserts then run one
/// after another.
async fn process_wikilinks_b5(
    dispatcher: &Dispatcher,
    tenant_id: &str,
    src_note_id: &str,
    body: &str,
) {
    let index = match dispatcher.index.as_ref() {
        Some(index) => index,
        None => {
            tracing::debug!(
                note_id = %src_note_id,
                "B5 skip: dispatcher sans index injecté (test historique ?)"
            );
            return;
        }
    };
    let targets = gradatum_curator::wikilinks::extract_wikilinks(body);
    if targets.is_empty() {
        return;
    }

    // Phase 1: one spawned lookup per wikilink; each task owns its own inputs.
    let mut lookups = tokio::task::JoinSet::new();
    for wanted in &targets {
        let task_index = Arc::clone(index);
        let task_tenant = tenant_id.to_string();
        let task_title = wanted.clone();
        lookups.spawn(async move {
            let found = task_index.title_lookup(&task_tenant, &task_title).await;
            (task_title, found)
        });
    }

    // Phase 2: gather the results in completion order; a task that failed to
    // join is logged and its wikilink is dropped.
    let mut resolved = Vec::with_capacity(targets.len());
    while let Some(joined) = lookups.join_next().await {
        match joined {
            Err(e) => {
                tracing::warn!(err = %e, "B5 title_lookup task panicked — wikilink ignoré");
            }
            Ok(pair) => resolved.push(pair),
        }
    }

    // Phase 3: persist a link for every target that resolved to a note.
    for (target_title, lookup) in resolved {
        let dst_id = match lookup {
            Ok(Some(found_id)) => found_id,
            Ok(None) => {
                tracing::debug!(
                    target = %target_title,
                    "B5 wikilink non résolu — note cible absente (caveat C3 Phase 3)"
                );
                continue;
            }
            Err(e) => {
                tracing::warn!(
                    err = %e,
                    target = %target_title,
                    "B5 title_lookup failed — wikilink ignoré (non-fatal)"
                );
                continue;
            }
        };
        match index.upsert_link(tenant_id, src_note_id, &dst_id).await {
            Err(e) => {
                tracing::warn!(
                    err = %e,
                    src = %src_note_id,
                    dst = %dst_id,
                    "B5 upsert_link failed — non-fatal"
                );
            }
            Ok(_) => {
                tracing::debug!(
                    src = %src_note_id,
                    dst = %dst_id,
                    target = %target_title,
                    "B5 wikilink persisté"
                );
            }
        }
    }
}