use std::sync::Arc;
use apalis::prelude::Data;
use chrono::Utc;
use smallvec::SmallVec;
use gradatum_core::{
author::AuthorRef,
frontmatter::{ExtraFields, Frontmatter},
identity::{ContentHash, NoteId},
scope::VaultId,
section::Section,
status::NoteStatus,
tag::Tag,
CurateSpec,
DryRunAware,
EmbedSpec,
GradatumJob,
Job,
JobClass,
JobLifecycle,
JobLineage,
JobMode,
JobOutput,
JobPriority,
JobRecord,
JobRetry,
JobScheduling,
JobScope,
JobSpec,
JobStatus,
QueueStore,
TriggerSource,
VectorStore as _,
};
use gradatum_curator::{CurateOutcome, CuratorProcess};
use gradatum_embed::Embedder;
use gradatum_index::SqliteIndex;
use gradatum_vault::{Vault, WriteResult};
/// Errors returned by the job handlers defined in this module.
///
/// Each variant carries a human-readable detail that is interpolated into the
/// (French) display message produced by `thiserror`.
// NOTE(review): `dead_code` is presumably allowed because `MissingDependency` and
// `InvalidPayload` are never constructed in the visible part of this file — confirm
// before removing the attribute.
#[allow(dead_code)]
#[derive(Debug, thiserror::Error)]
pub enum HandlerError {
/// A dependency required by a handler is absent; the payload names it.
#[error("dépendance absente : {0}")]
MissingDependency(&'static str),
/// The handler received a `Job` variant it does not process; the payload is the
/// `Debug` rendering of that variant.
#[error("variant job inattendu : {0}")]
UnexpectedVariant(String),
/// The job payload is invalid; the payload describes why.
#[error("payload job invalide : {0}")]
InvalidPayload(String),
/// A vault, embedder or index operation failed; the payload is the operation label
/// followed by the underlying error text.
#[error("erreur métier : {0}")]
Business(String),
}
pub async fn handle_curate(
job: GradatumJob,
vault: Data<Arc<Vault>>,
curator: Data<Arc<dyn CuratorProcess + Send + Sync>>,
index: Data<Arc<SqliteIndex>>,
queue: Data<Arc<dyn QueueStore + Send + Sync>>,
) -> Result<JobOutput, HandlerError> {
if job.record.is_dry_run() {
return Ok(JobOutput::dry_run(0, "curate — simulation"));
}
let spec = match &job.record.spec.kind {
Job::Curate(spec) => spec.clone(),
other => {
return Err(HandlerError::UnexpectedVariant(format!("{other:?}")));
}
};
let (note_id_for_vault, curator_note) = if spec.title.is_some() && spec.body.is_some() {
let curator_note = gradatum_curator::Note {
id: spec.note_id.to_string(),
title: spec.title.clone().unwrap_or_default(),
body: spec.body.clone().unwrap_or_default(),
tags_hint: spec.tags.clone(),
section_hint: spec.section_hint.clone(),
};
(None, curator_note) } else {
let note_id = NoteId(spec.note_id);
let existing = vault
.read_note(note_id)
.await
.map_err(|e| HandlerError::Business(format!("read_note: {e}")))?;
let title_for_curator = gradatum_curator::extract_h1_title(&existing.body.markdown)
.map(|t| t.to_string())
.unwrap_or_else(|| existing.frontmatter.section.as_str().to_string());
let curator_note = gradatum_curator::Note {
id: spec.note_id.to_string(),
title: title_for_curator,
body: existing.body.markdown.clone(),
tags_hint: existing
.frontmatter
.tags
.iter()
.map(|t| t.as_str().to_string())
.collect(),
section_hint: None,
};
(Some(note_id), curator_note)
};
let tenant_id = spec.tenant_id.clone();
let body_for_write = spec
.body
.clone()
.unwrap_or_else(|| curator_note.body.clone());
let title_resolved = curator_note.title.clone();
let curate_outcome = curator.process(curator_note).await;
let written_note_id = match curate_outcome {
CurateOutcome::Admitted { ref decisions } => {
let section =
section_from_str(&decisions.canonical_section).unwrap_or(Section::Reference);
let note = if let Some(existing_note_id) = note_id_for_vault {
let existing = vault
.read_note(existing_note_id)
.await
.map_err(|e| HandlerError::Business(format!("read_note classify: {e}")))?;
let mut fm = existing.frontmatter.clone();
fm.section = section;
for tag_str in &decisions.tags {
if !fm.tags.iter().any(|t| t.as_str() == tag_str.as_str()) {
if let Ok(t) = Tag::new(tag_str) {
fm.tags.push(t);
}
}
}
vault
.write_note(fm, existing.body.markdown.clone())
.await
.map_err(|e| HandlerError::Business(format!("write_note classify: {e}")))?
} else {
let fm = build_frontmatter_from_spec(
&tenant_id,
section,
NoteStatus::Live,
&spec,
&decisions.tags,
);
let write_result = vault
.write_if_match(
fm,
body_for_write.clone(),
NoteId(spec.note_id),
spec.expected_sha256,
)
.await
.map_err(|e| HandlerError::Business(format!("write_note curate: {e}")))?;
match write_result {
WriteResult::Conflict { current_sha256 } => {
let current_hex = ContentHash(current_sha256).hex();
let attempted_hex = spec.expected_sha256.map(|h| ContentHash(h).hex());
let conflict_payload_str = serde_json::json!({
"current_sha256": current_hex,
"attempted_sha256": attempted_hex,
"timestamp_ms": Utc::now().timestamp_millis(),
})
.to_string();
let job_id = job.record.id;
let duration_ms: u32 = job
.record
.lifecycle
.started_at
.map(|s| {
(Utc::now() - s)
.num_milliseconds()
.max(0)
.min(i64::from(u32::MAX)) as u32
})
.unwrap_or(0);
if let Err(e) = queue
.mark_conflict(job_id, conflict_payload_str, duration_ms)
.await
{
tracing::error!(
job_id = %job_id,
error = %e,
"curate: mark_conflict échoué — job restera en état courant"
);
}
return Ok(JobOutput {
notes_created: vec![],
notes_modified: vec![],
files: vec![],
result_note_md: format!(
"curate: conflit optimistic-lock sur note {} — current_sha256={}",
spec.note_id, current_hex
),
});
}
WriteResult::Written { .. } => {}
}
vault.read_note(NoteId(spec.note_id)).await.map_err(|e| {
HandlerError::Business(format!("read_note post-write curate: {e}"))
})?
};
tracing::info!(
job_id = %job.record.id,
section = %decisions.canonical_section,
"curate: note admise et persistée"
);
Some(note.id)
}
CurateOutcome::Pending {
ref decisions,
ref reason,
} => {
let section =
section_from_str(&decisions.canonical_section).unwrap_or(Section::Reference);
let note = if let Some(existing_note_id) = note_id_for_vault {
let existing = vault.read_note(existing_note_id).await.map_err(|e| {
HandlerError::Business(format!("read_note classify pending: {e}"))
})?;
let mut fm = existing.frontmatter.clone();
fm.section = section;
fm.status = NoteStatus::Staging;
vault
.write_note(fm, existing.body.markdown.clone())
.await
.map_err(|e| {
HandlerError::Business(format!("write_note classify pending: {e}"))
})?
} else {
let fm = build_frontmatter_from_spec(
&tenant_id,
section,
NoteStatus::Staging,
&spec,
&decisions.tags,
);
let write_result = vault
.write_if_match(
fm,
body_for_write.clone(),
NoteId(spec.note_id),
spec.expected_sha256,
)
.await
.map_err(|e| {
HandlerError::Business(format!("write_note curate pending: {e}"))
})?;
match write_result {
WriteResult::Conflict { current_sha256 } => {
let current_hex = ContentHash(current_sha256).hex();
let attempted_hex = spec.expected_sha256.map(|h| ContentHash(h).hex());
let conflict_payload_str = serde_json::json!({
"current_sha256": current_hex,
"attempted_sha256": attempted_hex,
"timestamp_ms": Utc::now().timestamp_millis(),
})
.to_string();
let job_id = job.record.id;
let duration_ms: u32 = job
.record
.lifecycle
.started_at
.map(|s| {
(Utc::now() - s)
.num_milliseconds()
.max(0)
.min(i64::from(u32::MAX)) as u32
})
.unwrap_or(0);
if let Err(e) = queue
.mark_conflict(job_id, conflict_payload_str, duration_ms)
.await
{
tracing::error!(
job_id = %job_id,
error = %e,
"curate: mark_conflict échoué (pending) — job restera en état courant"
);
}
return Ok(JobOutput {
notes_created: vec![],
notes_modified: vec![],
files: vec![],
result_note_md: format!(
"curate: conflit optimistic-lock (pending) sur note {} — current_sha256={}",
spec.note_id, current_hex
),
});
}
WriteResult::Written { .. } => {}
}
vault.read_note(NoteId(spec.note_id)).await.map_err(|e| {
HandlerError::Business(format!("read_note post-write curate pending: {e}"))
})?
};
tracing::info!(
job_id = %job.record.id,
reason = %reason,
"curate: note mise en Staging (revue manuelle requise)"
);
Some(note.id)
}
CurateOutcome::Rejected { ref reason } => {
tracing::info!(
job_id = %job.record.id,
reason = %reason,
"curate: note rejetée — aucune écriture vault"
);
None
}
};
if let Some(note_id) = &written_note_id {
if !title_resolved.is_empty() {
if let Err(e) = index.upsert_note_title(note_id, &title_resolved).await {
tracing::warn!(
note_id = %note_id,
title = %title_resolved,
error = %e,
"curate: upsert_note_title échoué — non fatal"
);
}
}
}
if let Some(note_id) = &written_note_id {
process_wikilinks_b5(
index.as_ref(),
&tenant_id,
¬e_id.to_string(),
&body_for_write,
)
.await;
let embed_record = build_embed_job_record(*note_id, &tenant_id, job.record.id);
if let Err(e) = queue.enqueue(embed_record).await {
tracing::warn!(
note_id = %note_id,
error = %e,
"curate: enqueue Job::Embed échoué — note curée, embed non schedulé (best-effort)"
);
}
}
let result_desc = written_note_id
.map(|id| format!("note {} créée/mise à jour", id))
.unwrap_or_else(|| "rejetée".to_string());
Ok(JobOutput {
notes_created: written_note_id.map(|nid| vec![nid.0]).unwrap_or_default(),
notes_modified: vec![],
files: vec![],
result_note_md: format!("curate: {result_desc}"),
})
}
/// Handler for `Job::Embed` jobs: embeds a note body and stores the vector in the index.
///
/// Steps: read the note `spec.note_id` from the vault, skip when its markdown body is
/// empty, truncate long bodies, call the embedder, then persist the vector with
/// `SqliteIndex::insert_note_embedding` under the embedder's id and dimension.
///
/// # Errors
/// - `HandlerError::UnexpectedVariant` when the job is not a `Job::Embed`;
/// - `HandlerError::Business` when reading the note, embedding, or persisting fails.
pub async fn handle_embed(
    job: GradatumJob,
    vault: Data<Arc<Vault>>,
    embedder: Data<Arc<dyn Embedder + Send + Sync>>,
    index: Data<Arc<SqliteIndex>>,
) -> Result<JobOutput, HandlerError> {
    // Bodies longer than this many bytes are truncated before embedding.
    const MAX_BODY_BYTES: usize = 8192;
    // Number of leading characters kept when a body is truncated.
    // NOTE(review): the threshold is in bytes but the cut is in characters, so a body just
    // over the threshold shrinks to about a quarter of its size — confirm this is intended.
    const TRUNCATED_CHARS: usize = 2048;

    if job.record.is_dry_run() {
        return Ok(JobOutput::dry_run(0, "embed — simulation"));
    }
    let spec = match &job.record.spec.kind {
        Job::Embed(spec) => spec.clone(),
        other => {
            return Err(HandlerError::UnexpectedVariant(format!("{other:?}")));
        }
    };
    let note_id = NoteId(spec.note_id);
    let note = vault
        .read_note(note_id)
        .await
        .map_err(|e| HandlerError::Business(format!("embed: read_note: {e}")))?;
    let body_text = note.body.markdown.as_str();
    if body_text.is_empty() {
        tracing::info!(
            job_id = %job.record.id,
            note_id = %spec.note_id,
            "embed: skip — body vide"
        );
        return Ok(JobOutput {
            notes_created: vec![],
            notes_modified: vec![],
            files: vec![],
            result_note_md: format!("embed: skip note {} — body vide", spec.note_id),
        });
    }
    let truncated = if body_text.len() > MAX_BODY_BYTES {
        // `char_indices` yields byte offsets of character starts, so the slice below
        // always ends on a UTF-8 boundary.
        let end = body_text
            .char_indices()
            .nth(TRUNCATED_CHARS)
            .map(|(i, _)| i)
            .unwrap_or(body_text.len());
        &body_text[..end]
    } else {
        body_text
    };
    let embedding = embedder
        .embed(truncated)
        .await
        .map_err(|e| HandlerError::Business(format!("embed: embedder: {e}")))?;
    index
        .insert_note_embedding(&note_id, embedder.embedder_id(), embedder.dim(), &embedding)
        .await
        .map_err(|e| HandlerError::Business(format!("embed: insert_note_embedding: {e}")))?;
    tracing::info!(
        job_id = %job.record.id,
        note_id = %spec.note_id,
        embedder_id = embedder.embedder_id(),
        dim = embedder.dim(),
        "embed: done"
    );
    Ok(JobOutput {
        notes_created: vec![],
        notes_modified: vec![note_id.0],
        files: vec![],
        result_note_md: format!(
            "embed: note {} vecteur dim={} persisté",
            spec.note_id,
            embedder.dim()
        ),
    })
}
/// Handler for `Job::ReIndex` jobs — currently a logged no-op.
///
/// Neither the index nor the embedder is used: the handler only logs that the
/// re-index is deferred and returns an output that lists no notes or files.
///
/// # Errors
/// `HandlerError::UnexpectedVariant` when the job is not a `Job::ReIndex`.
pub async fn handle_reindex(
    job: GradatumJob,
    _index: Data<Arc<SqliteIndex>>,
    _embedder: Data<Arc<dyn Embedder + Send + Sync>>,
) -> Result<JobOutput, HandlerError> {
    let record = &job.record;
    if record.is_dry_run() {
        return Ok(JobOutput::dry_run(0, "reindex — simulation"));
    }

    let mode = if let Job::ReIndex(requested) = &record.spec.kind {
        requested.clone()
    } else {
        let other = &record.spec.kind;
        return Err(HandlerError::UnexpectedVariant(format!("{other:?}")));
    };

    tracing::info!(
        job_id = %record.id,
        mode = ?mode,
        "reindex: DEFERRED Phase 0.4.0 — SqliteIndex rebuild_fts non disponible Phase 1.2"
    );

    let result_note_md = format!("reindex {:?}: DEFERRED Phase 0.4.0", mode);
    Ok(JobOutput {
        notes_created: Vec::new(),
        notes_modified: Vec::new(),
        files: Vec::new(),
        result_note_md,
    })
}
/// Builds the pending `Job::Embed` record enqueued after a note has been curated.
///
/// The record gets a fresh ULID, targets only `note_id`, is scheduled for now with
/// `TriggerSource::Demand`, and names `parent_job_id` as its parent in the lineage.
fn build_embed_job_record(
    note_id: gradatum_core::identity::NoteId,
    tenant_id: &str,
    parent_job_id: ulid::Ulid,
) -> JobRecord {
    let created = Utc::now();
    let id = ulid::Ulid::new();
    let target = note_id.0;

    let spec = JobSpec {
        kind: Job::Embed(EmbedSpec {
            note_id: target,
            tenant_id: tenant_id.to_owned(),
            force_regenerate: false,
        }),
        class: JobClass::Agent,
        mode: JobMode::Batch,
        scope: JobScope::Notes(vec![target]),
        priority: JobPriority::Normal,
    };
    let scheduling = JobScheduling {
        trigger: TriggerSource::Demand,
        scheduled_at: created,
        await_jobs: Vec::new(),
        deadline: None,
        cron_expr: None,
    };
    let lifecycle = JobLifecycle {
        status: JobStatus::Pending,
        created_at: created,
        started_at: None,
        completed_at: None,
        lease_until: None,
        result: None,
    };
    let lineage = JobLineage {
        triggered_by: None,
        parent_job: Some(parent_job_id),
        pipeline_id: None,
        pipeline_step: None,
        children: Vec::new(),
        cost_usd: None,
    };

    JobRecord {
        id,
        spec,
        scheduling,
        lifecycle,
        retry: JobRetry::default(),
        lineage,
    }
}
/// Parses a section name into a `Section` through its serde representation.
///
/// The name is first encoded as a JSON string literal with `serde_json::to_string`,
/// so quotes, backslashes and control characters in `s` are escaped rather than
/// producing malformed JSON or being reinterpreted as escape sequences.
///
/// Returns `None` when `s` does not deserialize to a `Section`.
fn section_from_str(s: &str) -> Option<Section> {
    let quoted = serde_json::to_string(s).ok()?;
    serde_json::from_str::<Section>(&quoted).ok()
}
/// Assembles the front matter for a note created from a curate spec.
///
/// Tags are the spec's tags followed by the curator tags not already present; any
/// tag rejected by `Tag::new` is silently dropped. The author, when the spec names
/// one, is wrapped with `AuthorRef::system`, and the provenance is resolved from the
/// spec's section hint. `created` is set to the current time.
fn build_frontmatter_from_spec(
    tenant_id: &str,
    section: Section,
    status: NoteStatus,
    spec: &CurateSpec,
    curator_tags: &[String],
) -> Frontmatter {
    // Spec tags come first; curator tags are appended only when not already listed.
    let mut merged: Vec<String> = spec.tags.clone();
    for candidate in curator_tags {
        if merged.iter().any(|known| known == candidate) {
            continue;
        }
        merged.push(candidate.clone());
    }
    let tags = merged
        .into_iter()
        .filter_map(|raw| Tag::new(raw).ok())
        .collect::<SmallVec<[Tag; 4]>>();

    let author = spec.author.as_deref().map(AuthorRef::system);
    let provenance = gradatum_core::provenance::resolve_provenance(spec.section_hint.as_deref());

    Frontmatter {
        schema_version: 1,
        vault_id: VaultId::new(tenant_id),
        locus: None,
        section,
        status,
        status_reason: None,
        status_changed: None,
        tags,
        author,
        created: Utc::now(),
        updated: None,
        extra: ExtraFields::empty(),
        provenance: Some(provenance.to_string()),
    }
}
/// Resolves the wikilinks found in `body` and records them as links in the index.
///
/// Each extracted title is looked up for `tenant_id`; when a note is found, a link
/// from `src_note_id` to it is upserted. Unresolved titles are logged at debug level,
/// and lookup or upsert failures at warn level — nothing is returned or propagated.
async fn process_wikilinks_b5(index: &SqliteIndex, tenant_id: &str, src_note_id: &str, body: &str) {
    let wikilinks = gradatum_curator::wikilinks::extract_wikilinks(body);
    for target_title in &wikilinks {
        let dst_id = match index.title_lookup(tenant_id, target_title).await {
            Ok(Some(found)) => found,
            Ok(None) => {
                tracing::debug!(
                    target = %target_title,
                    "B5: note cible non trouvée — wikilink en suspens"
                );
                continue;
            }
            Err(err) => {
                tracing::warn!(
                    target = %target_title,
                    error = %err,
                    "B5: title_lookup échoué — non fatal"
                );
                continue;
            }
        };
        if let Err(err) = index.upsert_link(tenant_id, src_note_id, &dst_id).await {
            tracing::warn!(
                src = %src_note_id,
                dst = %dst_id,
                error = %err,
                "B5: upsert_link échoué — non fatal"
            );
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Utc;
    use gradatum_core::{
        CurateSpec, EmbedSpec, GradatumJob, Job, JobClass, JobLifecycle, JobLineage, JobMode,
        JobPriority, JobRecord, JobRetry, JobScheduling, JobScope, JobSpec, JobStatus, ReIndexMode,
        TriggerSource,
    };
    use ulid::Ulid;

    /// Builds a running, vault-wide, agent-class job of the given kind and mode.
    fn make_job(kind: Job, mode: JobMode) -> GradatumJob {
        let now = Utc::now();
        let class = JobClass::Agent;
        let priority = JobPriority::default_for(&class).as_u8();

        let spec = JobSpec {
            kind,
            class,
            mode,
            scope: JobScope::VaultWide,
            priority: JobPriority::High,
        };
        let scheduling = JobScheduling {
            trigger: TriggerSource::Demand,
            scheduled_at: now,
            await_jobs: Vec::new(),
            deadline: None,
            cron_expr: None,
        };
        let lifecycle = JobLifecycle {
            status: JobStatus::Running,
            created_at: now,
            started_at: Some(now),
            completed_at: None,
            lease_until: None,
            result: None,
        };
        let lineage = JobLineage {
            triggered_by: None,
            parent_job: None,
            pipeline_id: None,
            pipeline_step: None,
            children: Vec::new(),
            cost_usd: None,
        };

        GradatumJob {
            priority,
            record: JobRecord {
                id: Ulid::new(),
                spec,
                scheduling,
                lifecycle,
                retry: JobRetry::default(),
                lineage,
            },
        }
    }

    /// A curate job built in `DryRun` mode is reported as a dry run.
    #[tokio::test]
    async fn curate_dry_run_returns_output() {
        let spec = CurateSpec {
            note_id: Ulid::new(),
            tenant_id: "main".to_string(),
            title: Some("Test note".to_string()),
            body: Some("Test body".to_string()),
            ..Default::default()
        };
        let job = make_job(Job::Curate(spec), JobMode::DryRun);
        assert!(job.record.is_dry_run());
    }

    /// An embed job built in `DryRun` mode is reported as a dry run.
    #[tokio::test]
    async fn embed_dry_run_is_detected() {
        let spec = EmbedSpec {
            note_id: Ulid::new(),
            tenant_id: "main".to_string(),
            force_regenerate: false,
        };
        let job = make_job(Job::Embed(spec), JobMode::DryRun);
        assert!(job.record.is_dry_run());
    }

    /// A re-index job built in `DryRun` mode is reported as a dry run.
    #[tokio::test]
    async fn reindex_dry_run_is_detected() {
        let kind = Job::ReIndex(ReIndexMode::FtsOnly);
        let job = make_job(kind, JobMode::DryRun);
        assert!(job.record.is_dry_run());
    }

    /// A `Job::Backup` job does not match the `Job::Curate` pattern.
    #[tokio::test]
    async fn curate_unexpected_variant_check() {
        let job = make_job(Job::Backup, JobMode::Batch);
        let is_curate = matches!(&job.record.spec.kind, Job::Curate(_));
        assert!(!is_curate);
    }
}