use anyhow::{Context, Result};
use crate::kb::{
chunker::{ChunkerInput, LocatorKind, chunk_markdown},
content_store::read::read_doc_body,
entities::extract::{canonical_id, extract_entities},
ledger::LedgerStatus,
model::{KbChunk, KbEntity, KbEntityIndex, LogicalSourceId},
store::{chunks, docs, entities, ledger},
worker::handlers::HandlerCtx,
};
pub fn run(ctx: &HandlerCtx, doc_id: &str, doc_version: u32) -> Result<()> {
let doc = {
let rtx = ctx.store.begin_read()?;
docs::get(&rtx, doc_id)?
.ok_or_else(|| anyhow::anyhow!("chunk_embed: doc {doc_id} not found"))?
};
if doc.version != doc_version {
tracing::warn!(
doc = %crate::kb::redact(doc_id),
"kb worker: doc version mismatch (job v{doc_version} vs current v{}); skipping",
doc.version
);
return Ok(());
}
let abs = ctx.paths.root.join(&doc.markdown_path);
let body = read_doc_body(&abs).with_context(|| format!("read body {}", abs.display()))?;
let lsid = LogicalSourceId(doc.logical_source_id.clone());
let chunks_vec: Vec<KbChunk> = chunk_markdown(ChunkerInput {
logical_source_id: &lsid,
doc_id: &doc.id,
doc_version: doc.version,
markdown_body: &body,
default_locator_kind: LocatorKind::MdSection,
});
let texts: Vec<String> = chunks_vec.iter().map(|c| c.indexed_text.clone()).collect();
let vectors = ctx.embedder.embed_batch(&texts)?;
if vectors.len() != chunks_vec.len() {
return Err(anyhow::anyhow!(
"embedder returned {} vectors for {} chunks",
vectors.len(),
chunks_vec.len()
));
}
let embedder_id = ctx.embedder.embedder_id().to_string();
let chunks_with_vec: Vec<KbChunk> = chunks_vec
.into_iter()
.zip(vectors)
.map(|(mut c, v)| {
c.vector = v;
c.embedder_id = embedder_id.clone();
c
})
.collect();
{
let wtx = ctx.store.begin_write()?;
let now_ms = chrono::Utc::now().timestamp_millis();
let removed =
chunks::delete_for_doc_version_below(&wtx, &doc.logical_source_id, doc.version)?;
if removed > 0 {
tracing::info!(
doc = %crate::kb::redact(doc_id),
old_chunks = removed,
"kb worker: removed stale chunks from prior version"
);
}
for c in &chunks_with_vec {
chunks::put(&wtx, c)?;
for mention in extract_entities(&c.indexed_text) {
let canonical = canonical_id(mention.kind, &mention.surface);
entities::put_entity(
&wtx,
&KbEntity {
canonical_id: canonical.clone(),
surface_forms: vec![mention.surface.clone()],
kind: mention.kind,
created_at: now_ms,
},
)?;
entities::put_index(
&wtx,
&KbEntityIndex {
entity_id: canonical,
chunk_id: c.id.clone(),
doc_id: c.doc_id.clone(),
mention_count: 1,
score: 1.0,
},
)?;
}
}
match ledger::find_pending_by_doc_in_wtx(&wtx, &doc.id)? {
Some(entry) => {
ledger::update_status(&wtx, &entry.id, LedgerStatus::IndexingComplete, now_ms)?;
}
None => {
tracing::debug!(
doc = %crate::kb::redact(&doc.id),
"kb worker: no Pending ledger row for doc — treating as idempotent rerun"
);
}
}
wtx.commit()?;
}
for c in &chunks_with_vec {
ctx.index.upsert_chunk(c)?;
}
ctx.index.commit()?;
tracing::info!(
doc = %crate::kb::redact(doc_id),
n_chunks = chunks_with_vec.len(),
"kb worker: chunk_embed + index update complete"
);
Ok(())
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use tempfile::TempDir;

    use super::*;
    use crate::kb::{
        canonicalize::{CanonicalizeInput, canonicalize_by_mime},
        embedder::{KbEmbedder, StubEmbedder},
        paths::KbPaths,
        pipeline::{IngestInput, ingest_canonicalized},
        store::KbStore,
    };

    /// Builds a `HandlerCtx` under `tmp` with:
    /// - a fresh store;
    /// - an initialised path layout;
    /// - the stub embedder;
    /// - a new index.
    ///
    /// No documents are ingested.
    fn empty_ctx(tmp: &TempDir) -> HandlerCtx {
        let store = Arc::new(KbStore::open(&tmp.path().join("kb.redb")).unwrap());
        let paths = Arc::new(KbPaths::new(tmp.path().join("kb")));
        paths.ensure_layout().unwrap();
        let embedder: Arc<dyn KbEmbedder> = Arc::new(StubEmbedder::default());
        let index = Arc::new(crate::kb::index::KbIndex::open(&paths).unwrap());
        HandlerCtx {
            store,
            paths,
            embedder,
            index,
        }
    }

    /// Canonicalizes `bytes` as markdown, ingests it into `ctx`, and returns the new doc id.
    fn ingest_md(
        ctx: &HandlerCtx,
        bytes: &[u8],
        title: &str,
        lsid_seed: Option<LogicalSourceId>,
    ) -> String {
        let canon = canonicalize_by_mime(CanonicalizeInput {
            bytes,
            mime: "text/markdown",
            hint_title: Some(title),
            logical_source_id_seed: lsid_seed,
        })
        .unwrap()
        .unwrap();
        ingest_canonicalized(
            &ctx.store,
            IngestInput {
                canon: &canon,
                raw_bytes: bytes,
                raw_ext: "md",
                visibility: None,
                owner_user_id: None,
                seen_key: None,
                source: None,
                paths: &ctx.paths,
            },
        )
        .unwrap()
        .doc_id
    }

    /// One small markdown doc ingested into a fresh context; the `TempDir` must be kept alive.
    fn fixture() -> (TempDir, HandlerCtx, String) {
        let tmp = TempDir::new().unwrap();
        let ctx = empty_ctx(&tmp);
        let doc_id = ingest_md(&ctx, b"# Hi\n\nbody one.\n\nbody two.", "t", None);
        (tmp, ctx, doc_id)
    }

    #[test]
    fn writes_chunks_with_vectors() {
        let (_tmp, ctx, doc_id) = fixture();
        let doc = {
            let rtx = ctx.store.begin_read().unwrap();
            docs::get(&rtx, &doc_id).unwrap().unwrap()
        };
        run(&ctx, &doc_id, doc.version).unwrap();
        let rtx = ctx.store.begin_read().unwrap();
        let cs = chunks::chunks_for_logical(&rtx, &doc.logical_source_id).unwrap();
        assert!(!cs.is_empty());
        for c in &cs {
            assert_eq!(c.vector.len(), 1024);
            assert_eq!(c.embedder_id, "stub-sha256-1024");
        }
    }

    #[test]
    fn idempotent_rerun_produces_same_chunks() {
        let (_tmp, ctx, doc_id) = fixture();
        let doc = {
            let rtx = ctx.store.begin_read().unwrap();
            docs::get(&rtx, &doc_id).unwrap().unwrap()
        };
        run(&ctx, &doc_id, doc.version).unwrap();
        let before = {
            let rtx = ctx.store.begin_read().unwrap();
            chunks::chunks_for_logical(&rtx, &doc.logical_source_id).unwrap()
        };
        run(&ctx, &doc_id, doc.version).unwrap();
        let after = {
            let rtx = ctx.store.begin_read().unwrap();
            chunks::chunks_for_logical(&rtx, &doc.logical_source_id).unwrap()
        };
        assert_eq!(before.len(), after.len());
        for (a, b) in before.iter().zip(after.iter()) {
            assert_eq!(a.id, b.id);
            assert_eq!(a.vector, b.vector);
        }
    }

    #[test]
    fn ledger_advances_to_indexing_complete() {
        let (_tmp, ctx, doc_id) = fixture();
        let doc = {
            let rtx = ctx.store.begin_read().unwrap();
            docs::get(&rtx, &doc_id).unwrap().unwrap()
        };
        run(&ctx, &doc_id, doc.version).unwrap();
        let rtx = ctx.store.begin_read().unwrap();
        let done = ledger::list_by_status(&rtx, LedgerStatus::IndexingComplete).unwrap();
        assert_eq!(done.len(), 1);
        assert_eq!(done[0].doc_id, doc_id);
    }

    #[test]
    fn rerun_after_ledger_advanced_does_not_error() {
        let (_tmp, ctx, doc_id) = fixture();
        let doc = {
            let rtx = ctx.store.begin_read().unwrap();
            docs::get(&rtx, &doc_id).unwrap().unwrap()
        };
        run(&ctx, &doc_id, doc.version).unwrap();
        run(&ctx, &doc_id, doc.version).unwrap();
        let rtx = ctx.store.begin_read().unwrap();
        let done = ledger::list_by_status(&rtx, LedgerStatus::IndexingComplete).unwrap();
        assert_eq!(done.len(), 1);
    }

    #[test]
    fn new_version_drops_old_chunks() {
        let tmp = TempDir::new().unwrap();
        let ctx = empty_ctx(&tmp);
        let lsid = LogicalSourceId("file:custom:rotate".into());

        let v1_id = ingest_md(
            &ctx,
            b"# v1\n\nfirst body very different.",
            "rotate",
            Some(lsid.clone()),
        );
        run(&ctx, &v1_id, 1).unwrap();
        let v1_chunks_before = {
            let rtx = ctx.store.begin_read().unwrap();
            chunks::chunks_for_logical(&rtx, &lsid.0).unwrap()
        };
        assert!(!v1_chunks_before.is_empty());

        let v2_id = ingest_md(
            &ctx,
            b"# v2\n\nentirely different body content here.",
            "rotate",
            Some(lsid.clone()),
        );
        run(&ctx, &v2_id, 2).unwrap();

        let rtx = ctx.store.begin_read().unwrap();
        let chunks_after = chunks::chunks_for_logical(&rtx, &lsid.0).unwrap();
        // Without this, the loop below would pass vacuously if the v2 run wrote nothing.
        assert!(!chunks_after.is_empty(), "v2 run should have written chunks");
        for c in &chunks_after {
            assert_eq!(c.doc_version, 2);
        }
    }
}