chunkshop-rs 0.9.1

Standalone ingest-to-pgvector pipeline: source -> framer -> chunker -> embedder -> extractor -> table. int8 BGE by default; bakeoff matrix evaluator built in. Cross-language wire-format compatible with the Python `chunkshop` package.
Documentation
//! Single-cell runner: wires source -> framer -> chunker -> embedder -> extractor -> sink.

use std::time::Instant;

use anyhow::{anyhow, Context, Result};
use tracing::info;

use crate::config::{CellConfig, EmbedderConfig, FramerConfig, SourceConfig};
use crate::embedder::FastembedEmbedder;
use crate::extractor::build_extractor;
use crate::framer::{
    FramerImpl, HeadingBoundaryFramer, IdentityFramer, JsonPathFramer, RegexBoundaryFramer,
    SessionEpisodeFramer,
};
use crate::sources::AnySource;

// `build_chunker` lives in `crate::chunker` so the `chunkers` feature can use
// it standalone (the if_oversize fallback chain calls back into it). Re-export
// here to preserve the previous import path for downstream consumers.
pub use crate::chunker::build_chunker;

/// Build the framer described by `cfg` and return it as a boxed trait object.
///
/// This is the framer counterpart of `build_chunker`. Supporting a new framer
/// takes one new `FramerConfig` variant plus one new arm below.
///
/// # Errors
///
/// Propagates construction errors from the framers whose constructors are
/// fallible (`HeadingBoundaryFramer`, `RegexBoundaryFramer`).
pub fn build_framer(cfg: FramerConfig) -> Result<Box<dyn FramerImpl + Send + Sync>> {
    let framer: Box<dyn FramerImpl + Send + Sync> = match cfg {
        FramerConfig::Identity(opts) => Box::new(IdentityFramer::new(opts)),
        FramerConfig::HeadingBoundary(opts) => {
            let heading = HeadingBoundaryFramer::new(opts)?;
            Box::new(heading)
        }
        FramerConfig::RegexBoundary(opts) => {
            let regex = RegexBoundaryFramer::new(opts)?;
            Box::new(regex)
        }
        FramerConfig::Jsonpath(opts) => Box::new(JsonPathFramer::new(opts)),
        FramerConfig::SessionEpisode(opts) => Box::new(SessionEpisodeFramer::new(opts)),
    };
    Ok(framer)
}

/// Summary of a single `run_cell` invocation, returned when the cell completes
/// successfully.
#[derive(Debug, Clone)]
pub struct CellResult {
    /// Name of the cell, copied from `CellConfig::cell_name`.
    pub cell_name: String,
    /// Number of *framed* documents processed. One raw source document may
    /// frame into several. Framed documents that produced no chunks are
    /// counted too.
    pub docs_processed: usize,
    /// Total number of chunks passed to the sink's `write_document` across
    /// all documents.
    pub chunks_written: usize,
    /// Wall-clock seconds from the start of `run_cell` until after the
    /// source watermark commit, just before the result is returned.
    pub wall_seconds: f64,
    /// Wall time spent inside the embedder's `embed()` calls. Subset of
    /// `wall_seconds`; the rest covers chunking, extraction, sink writes,
    /// source iteration. Mirrors Python's `CellResult.embed_seconds`.
    pub embed_seconds: f64,
}

/// Run one ingest cell end to end.
///
/// First loads the configured source, framer, chunker, extractor, embedder,
/// backend and sink, and creates the target table. Then, for every raw source
/// document, it frames it and, for each framed document, does the following:
///
/// - chunks it and embeds the chunk texts;
/// - runs the extractor per chunk;
/// - writes the chunks, embeddings and tags through the sink.
///
/// `runtime.doc_limit` caps the number of *raw* source documents consumed.
/// `runtime.heartbeat_every` (default 25) sets how often, counted in framed
/// documents, a progress line is logged. A value of `0` disables the
/// heartbeat.
///
/// The source's post-iteration watermark (`commit_processed`) is advanced
/// only after every document has been written successfully.
///
/// # Errors
///
/// Returns an error, stopping at the first failure, in these cases:
///
/// - the source is `SourceConfig::Inline`, which has no iterator to drive;
/// - any component fails to load;
/// - table creation fails;
/// - framing, embedding, extraction or a sink write fails;
/// - the final watermark commit fails.
pub async fn run_cell(cfg: CellConfig) -> Result<CellResult> {
    let start = Instant::now();
    info!(cell = %cfg.cell_name, "cell starting");

    if matches!(cfg.source, SourceConfig::Inline(_)) {
        return Err(anyhow!(
            "inline source has no auto-iterator: drive ingest from your app \
             with chunkshop::Pipeline::from_yaml(...).ingest_text(doc_id, text, metadata). \
             See docs/incremental.md (Pattern F) and docs/samples/inline-mode/."
        ));
    }
    let source: AnySource = crate::sources::load_source(&cfg.source).context("load source")?;
    let framer = build_framer(cfg.framer)?;
    let chunker = build_chunker(cfg.chunker)?;
    let extractor = build_extractor(cfg.extractor)?;
    let mut embedder = match cfg.embedder {
        EmbedderConfig::Fastembed(ec) => FastembedEmbedder::new(ec)?,
    };
    let backend = crate::backends::load_backend(&cfg.target).context("load backend")?;
    let sink =
        crate::sinks::load_sink(&cfg.target, backend, embedder.dim()).context("load sink")?;

    info!("creating target table");
    use crate::sinks::Sink;
    sink.create_table().await?;

    let docs = source.iter_documents().await?;
    let limit = cfg.runtime.doc_limit.unwrap_or(usize::MAX);
    // 0 means "heartbeat disabled". It is guarded below because
    // `docs_processed % 0` would panic (remainder by zero).
    let heartbeat = cfg.runtime.heartbeat_every.unwrap_or(25);

    let mut docs_processed = 0usize;
    let mut chunks_written = 0usize;

    // Source emits raw documents; framer expands each raw into one or more
    // framed documents; chunker chunks each framed doc independently. The
    // doc limit applies at the raw level (matches Python).
    for raw in docs.into_iter().take(limit) {
        let framed = framer.frame(&raw).context("frame document")?;
        for doc in framed {
            let chunks = chunker.chunk(&doc);
            // A framed doc that yields no chunks is still counted (and still
            // participates in the heartbeat), but nothing is embedded or written.
            if !chunks.is_empty() {
                let texts: Vec<String> =
                    chunks.iter().map(|c| c.embedded_content.clone()).collect();
                let embeddings = embedder.embed(texts).context("embed chunks")?;

                // Per-chunk extractor pass. Tags collected for the sink; metadata
                // merged into each chunk with chunker-wins semantics:
                //   {**doc.metadata, **r.metadata, **c.metadata}
                // matching python/src/chunkshop/runner.py.
                // Borrowed, not cloned: only non-object doc metadata is skipped.
                let doc_meta = doc.metadata.as_object();
                let mut tags_per_chunk: Vec<Vec<String>> = Vec::with_capacity(chunks.len());
                let mut chunks_with_meta: Vec<crate::chunker::Chunk> =
                    Vec::with_capacity(chunks.len());
                for c in &chunks {
                    let r = extractor
                        .extract(&c.original_content)
                        .context("extract chunk metadata")?;
                    // Later inserts overwrite earlier keys, giving the
                    // doc < extractor < chunk precedence described above.
                    let mut merged = serde_json::Map::new();
                    if let Some(dm) = doc_meta {
                        for (k, v) in dm {
                            merged.insert(k.clone(), v.clone());
                        }
                    }
                    for (k, v) in &r.metadata {
                        merged.insert(k.clone(), v.clone());
                    }
                    if let Some(c_meta) = c.metadata.as_object() {
                        for (k, v) in c_meta {
                            merged.insert(k.clone(), v.clone());
                        }
                    }
                    tags_per_chunk.push(r.tags);
                    chunks_with_meta.push(crate::chunker::Chunk {
                        doc_id: c.doc_id.clone(),
                        seq_num: c.seq_num,
                        original_content: c.original_content.clone(),
                        embedded_content: c.embedded_content.clone(),
                        metadata: serde_json::Value::Object(merged),
                    });
                }

                // NOTE(review): every framed doc is written under the *raw*
                // document's id. Presumably the sink keys replacement on the
                // source document. Confirm this is intended for framers that
                // emit several docs per raw.
                sink.write_document(&raw.id, &chunks_with_meta, &embeddings, &tags_per_chunk)
                    .await
                    .context("write_document")?;
                chunks_written += chunks_with_meta.len();
            }

            docs_processed += 1;
            if heartbeat > 0 && docs_processed % heartbeat == 0 {
                info!(docs = docs_processed, chunks = chunks_written, "heartbeat");
            }
        }
    }

    // RM-A O3: advance the source's post-iteration watermark ONLY after
    // the per-doc write loop above succeeds. A mid-loop error (which
    // would have `?`-bubbled out above) leaves the watermark unadvanced,
    // so the next run reselects the same sessions. No-op for non-memory
    // sources.
    source
        .commit_processed()
        .await
        .context("commit_processed")?;

    let wall = start.elapsed().as_secs_f64();
    let embed_seconds = embedder.embed_seconds();
    info!(cell = %cfg.cell_name, docs = docs_processed, chunks = chunks_written, wall = wall, embed = embed_seconds, "cell DONE");
    Ok(CellResult {
        cell_name: cfg.cell_name,
        docs_processed,
        chunks_written,
        wall_seconds: wall,
        embed_seconds,
    })
}