use anyhow::{Context, Result};
use gradatum_queue::Queue;
use std::path::PathBuf;
/// Options controlling a [`backfill`] run.
#[derive(Clone, Debug)]
pub struct BackfillArgs {
    /// Project root; `backfill` expects `db/queue.sqlite` and
    /// `vault/.gradatum/index.db` beneath it.
    pub root: PathBuf,
    /// Tenant whose notes are scanned and tagged on each job; `backfill`
    /// falls back to `"main"` when this is `None`.
    pub tenant: Option<String>,
    /// Upper bound on the number of notes scanned; `None` means no cap.
    pub limit: Option<usize>,
}
/// Enqueues one `embed_note` job for every note of the tenant that has no
/// row in `note_embeddings` yet.
///
/// Candidates are read from `<root>/vault/.gradatum/index.db`. Jobs are pushed
/// into the SQLite queue at `<root>/db/queue.sqlite` with `max_attempts = 3`
/// and a JSON payload `{ "note_id", "body_text" }`. `args.tenant` defaults to
/// `"main"`. `args.limit` caps how many notes are scanned.
///
/// Returns the number of jobs enqueued.
///
/// # Errors
///
/// Fails if either database file is missing, or if the index scan fails. It
/// also fails if the queue cannot be opened, a payload cannot be serialized,
/// or any enqueue fails. Jobs enqueued before a failing enqueue remain in the
/// queue; the error context reports how many there were.
pub async fn backfill(args: BackfillArgs) -> Result<usize> {
    let queue_path = args.root.join("db/queue.sqlite");
    let index_path = args.root.join("vault/.gradatum/index.db");
    if !queue_path.exists() {
        anyhow::bail!(
            "queue.sqlite introuvable : {} — exécuter `gradatum-admin init` d'abord",
            queue_path.display()
        );
    }
    if !index_path.exists() {
        anyhow::bail!(
            "index.db introuvable : {} — le worker doit avoir démarré au moins une fois",
            index_path.display()
        );
    }
    // `args` is owned, so move the tenant out instead of borrowing and copying it.
    let tenant = args.tenant.unwrap_or_else(|| "main".to_string());

    if args.limit == Some(0) {
        // Without this guard the scan returns nothing, and the "empty" message
        // below would wrongly claim that every note is already embedded.
        eprintln!("backfill: 0 jobs enqueued — limit=0 (tenant='{tenant}')");
        return Ok(0);
    }

    // NOTE(review): this is a synchronous rusqlite scan executed directly on the
    // async task; consider moving it off the executor if the index grows large.
    let candidates = collect_unembedded_notes(&index_path, &tenant, args.limit)
        .context("scan notes sans embedding")?;
    if candidates.is_empty() {
        eprintln!(
            "backfill: 0 jobs enqueued — toutes les notes sont déjà embedded (tenant='{tenant}')"
        );
        return Ok(0);
    }

    let total = candidates.len();
    eprintln!("backfill: {total} notes sans embedding trouvées (tenant='{tenant}') — enqueue...");
    let queue = gradatum_queue::SqliteQueue::new(&queue_path)
        .await
        .context("ouverture SqliteQueue")?;

    let mut enqueued = 0usize;
    for (note_id, body_text) in candidates {
        let payload = serde_json::json!({
            "note_id": &note_id,
            "body_text": body_text,
        });
        let job = gradatum_queue::NewJob {
            tenant_id: tenant.clone(),
            kind: "embed_note".to_string(),
            payload: serde_json::to_vec(&payload).context("sérialisation payload embed_note")?,
            max_attempts: 3,
        };
        // Jobs pushed before a failure are not rolled back, so report how far we
        // got; a naive re-run would enqueue those still-pending notes again.
        queue.enqueue(job).await.with_context(|| {
            format!(
                "enqueue embed_note (note_id={note_id}, {enqueued}/{total} jobs déjà enqueued)"
            )
        })?;
        enqueued += 1;
        if enqueued % 100 == 0 {
            eprintln!("backfill: {enqueued}/{total} jobs enqueued...");
        }
    }
    eprintln!("backfill: {enqueued} jobs enqueued (tenant='{tenant}')");
    Ok(enqueued)
}
/// Lists the notes of `tenant` that have no matching row in `note_embeddings`.
///
/// Runs a `LEFT JOIN notes → note_embeddings` on `note_id`, keeps rows with no
/// embedding whose `notes.vault_id` equals `tenant`, orders them by `id`, and
/// returns at most `limit` `(id, body_text)` pairs (`None` = no cap).
///
/// NOTE(review): this matches `vault_id` against the tenant id, which assumes
/// the two share a namespace — confirm against the index schema.
///
/// # Errors
///
/// Fails if the index cannot be opened read-only, or if the query cannot be
/// prepared or executed. It also fails if a row cannot be decoded.
/// NOTE(review): `body_text` is read as a non-NULL `String`, so a single NULL
/// body would abort the whole scan.
fn collect_unembedded_notes(
    index_path: &std::path::Path,
    tenant: &str,
    limit: Option<usize>,
) -> Result<Vec<(String, String)>> {
    // Read-only: this scan never writes. A read-only open also never creates an
    // empty database if the file disappeared after the caller's existence check.
    // URI and NO_MUTEX mirror rusqlite's default flags minus READ_WRITE | CREATE.
    let conn = rusqlite::Connection::open_with_flags(
        index_path,
        rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY
            | rusqlite::OpenFlags::SQLITE_OPEN_URI
            | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX,
    )
    .context("ouverture index.db en lecture")?;

    // SQLite treats a negative LIMIT as "no upper bound", so `None` maps to -1.
    // Binding it keeps the SQL text constant instead of splicing it with format!.
    let sql_limit: i64 = limit.map_or(-1, |n| i64::try_from(n).unwrap_or(i64::MAX));

    let mut stmt = conn
        .prepare(
            "SELECT n.id, n.body_text
             FROM notes n
             LEFT JOIN note_embeddings e ON n.id = e.note_id
             WHERE e.note_id IS NULL
               AND n.vault_id = ?1
             ORDER BY n.id
             LIMIT ?2",
        )
        .context("préparation requête backfill")?;
    let candidates = stmt
        .query_map(rusqlite::params![tenant, sql_limit], |row| {
            Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
        })
        .context("exécution requête backfill")?
        .collect::<std::result::Result<Vec<(String, String)>, _>>()
        .context("collecte résultats backfill")?;
    Ok(candidates)
}