Skip to main content

chunkshop/
runner.rs

//! Single-cell runner: wires source -> framer -> chunker -> embedder + extractor -> sink.
2
3use std::time::Instant;
4
5use anyhow::{anyhow, Context, Result};
6use tracing::info;
7
8use crate::config::{CellConfig, EmbedderConfig, FramerConfig, SourceConfig};
9use crate::embedder::FastembedEmbedder;
10use crate::extractor::build_extractor;
11use crate::framer::{
12    FramerImpl, HeadingBoundaryFramer, IdentityFramer, JsonPathFramer, RegexBoundaryFramer,
13    SessionEpisodeFramer,
14};
15use crate::sources::AnySource;
16
17// `build_chunker` lives in `crate::chunker` so the `chunkers` feature can use
18// it standalone (the if_oversize fallback chain calls back into it). Re-export
19// here to preserve the previous import path for downstream consumers.
20pub use crate::chunker::build_chunker;
21
22/// Materialize a `FramerConfig` into a boxed trait object. Mirrors
23/// `build_chunker`. New framer = one new arm here + one new variant.
24pub fn build_framer(cfg: FramerConfig) -> Result<Box<dyn FramerImpl + Send + Sync>> {
25    Ok(match cfg {
26        FramerConfig::Identity(c) => Box::new(IdentityFramer::new(c)),
27        FramerConfig::HeadingBoundary(c) => Box::new(HeadingBoundaryFramer::new(c)?),
28        FramerConfig::RegexBoundary(c) => Box::new(RegexBoundaryFramer::new(c)?),
29        FramerConfig::Jsonpath(c) => Box::new(JsonPathFramer::new(c)),
30        FramerConfig::SessionEpisode(c) => Box::new(SessionEpisodeFramer::new(c)),
31    })
32}
33
/// Summary statistics returned by a single `run_cell` invocation.
#[derive(Clone, Debug)]
pub struct CellResult {
    /// Name of the cell that was run (copied from its config).
    pub cell_name: String,
    /// Number of framed documents handled, including ones that yielded no chunks.
    pub docs_processed: usize,
    /// Total number of chunks handed to the sink.
    pub chunks_written: usize,
    /// End-to-end wall-clock duration of the run, in seconds.
    pub wall_seconds: f64,
    /// Wall time spent inside the embedder's `embed()` calls, in seconds.
    /// This is a subset of `wall_seconds`; the remainder covers chunking,
    /// extraction, sink writes and source iteration. Mirrors Python's
    /// `CellResult.embed_seconds`.
    pub embed_seconds: f64,
}
45
46pub async fn run_cell(cfg: CellConfig) -> Result<CellResult> {
47    let start = Instant::now();
48    info!(cell = %cfg.cell_name, "cell starting");
49
50    if matches!(cfg.source, SourceConfig::Inline(_)) {
51        return Err(anyhow!(
52            "inline source has no auto-iterator: drive ingest from your app \
53             with chunkshop::Pipeline::from_yaml(...).ingest_text(doc_id, text, metadata). \
54             See docs/incremental.md (Pattern F) and docs/samples/inline-mode/."
55        ));
56    }
57    let source: AnySource = crate::sources::load_source(&cfg.source).context("load source")?;
58    let framer = build_framer(cfg.framer)?;
59    let chunker = build_chunker(cfg.chunker)?;
60    let extractor = build_extractor(cfg.extractor)?;
61    let mut embedder = match cfg.embedder {
62        EmbedderConfig::Fastembed(ec) => FastembedEmbedder::new(ec)?,
63    };
64    let backend = crate::backends::load_backend(&cfg.target).context("load backend")?;
65    let sink =
66        crate::sinks::load_sink(&cfg.target, backend, embedder.dim()).context("load sink")?;
67
68    info!("creating target table");
69    use crate::sinks::Sink;
70    sink.create_table().await?;
71
72    let docs = source.iter_documents().await?;
73    let limit = cfg.runtime.doc_limit.unwrap_or(usize::MAX);
74    let heartbeat = cfg.runtime.heartbeat_every.unwrap_or(25);
75
76    let mut docs_processed = 0usize;
77    let mut chunks_written = 0usize;
78
79    // Source emits raw documents; framer expands each raw into one or more
80    // framed documents; chunker chunks each framed doc independently. The
81    // doc limit applies at the raw level (matches Python).
82    for raw in docs.into_iter().take(limit) {
83        let framed = framer.frame(&raw)?;
84        for doc in framed {
85            let chunks = chunker.chunk(&doc);
86            if chunks.is_empty() {
87                docs_processed += 1;
88                continue;
89            }
90            let texts: Vec<String> = chunks.iter().map(|c| c.embedded_content.clone()).collect();
91            let embeddings = embedder.embed(texts)?;
92
93            // Per-chunk extractor pass. Tags collected for the sink; metadata
94            // merged into each chunk with chunker-wins semantics:
95            //   {**doc.metadata, **r.metadata, **c.metadata}
96            // matching python/src/chunkshop/runner.py.
97            let mut tags_per_chunk: Vec<Vec<String>> = Vec::with_capacity(chunks.len());
98            let mut chunks_with_meta: Vec<crate::chunker::Chunk> = Vec::with_capacity(chunks.len());
99            let doc_meta = doc.metadata.as_object().cloned().unwrap_or_default();
100            for c in &chunks {
101                let r = extractor.extract(&c.original_content)?;
102                let mut merged = serde_json::Map::new();
103                for (k, v) in &doc_meta {
104                    merged.insert(k.clone(), v.clone());
105                }
106                for (k, v) in &r.metadata {
107                    merged.insert(k.clone(), v.clone());
108                }
109                if let Some(c_meta) = c.metadata.as_object() {
110                    for (k, v) in c_meta {
111                        merged.insert(k.clone(), v.clone());
112                    }
113                }
114                tags_per_chunk.push(r.tags);
115                chunks_with_meta.push(crate::chunker::Chunk {
116                    doc_id: c.doc_id.clone(),
117                    seq_num: c.seq_num,
118                    original_content: c.original_content.clone(),
119                    embedded_content: c.embedded_content.clone(),
120                    metadata: serde_json::Value::Object(merged),
121                });
122            }
123
124            sink.write_document(&raw.id, &chunks_with_meta, &embeddings, &tags_per_chunk)
125                .await
126                .context("write_document")?;
127            chunks_written += chunks_with_meta.len();
128            docs_processed += 1;
129            if docs_processed % heartbeat == 0 {
130                info!(docs = docs_processed, chunks = chunks_written, "heartbeat");
131            }
132        }
133    }
134
135    // RM-A O3: advance the source's post-iteration watermark ONLY after
136    // the per-doc write loop above succeeds. A mid-loop error (which
137    // would have `?`-bubbled out above) leaves the watermark unadvanced,
138    // so the next run reselects the same sessions. No-op for non-memory
139    // sources.
140    source
141        .commit_processed()
142        .await
143        .context("commit_processed")?;
144
145    let wall = start.elapsed().as_secs_f64();
146    let embed_seconds = embedder.embed_seconds();
147    info!(cell = %cfg.cell_name, docs = docs_processed, chunks = chunks_written, wall = wall, embed = embed_seconds, "cell DONE");
148    Ok(CellResult {
149        cell_name: cfg.cell_name,
150        docs_processed,
151        chunks_written,
152        wall_seconds: wall,
153        embed_seconds,
154    })
155}