use std::time::Instant;
use anyhow::{anyhow, Context, Result};
use tracing::info;
use crate::config::{CellConfig, EmbedderConfig, FramerConfig, SourceConfig};
use crate::embedder::FastembedEmbedder;
use crate::extractor::build_extractor;
use crate::framer::{
FramerImpl, HeadingBoundaryFramer, IdentityFramer, JsonPathFramer, RegexBoundaryFramer,
SessionEpisodeFramer,
};
use crate::sources::AnySource;
pub use crate::chunker::build_chunker;
/// Instantiates the framer selected by `cfg` as a boxed, thread-safe trait object.
///
/// # Errors
///
/// Propagates construction errors from the framers whose constructors are
/// fallible (`HeadingBoundary` and `RegexBoundary`); the remaining variants
/// are built infallibly.
pub fn build_framer(cfg: FramerConfig) -> Result<Box<dyn FramerImpl + Send + Sync>> {
    let framer: Box<dyn FramerImpl + Send + Sync> = match cfg {
        FramerConfig::Identity(settings) => Box::new(IdentityFramer::new(settings)),
        FramerConfig::HeadingBoundary(settings) => {
            Box::new(HeadingBoundaryFramer::new(settings)?)
        }
        FramerConfig::RegexBoundary(settings) => Box::new(RegexBoundaryFramer::new(settings)?),
        FramerConfig::Jsonpath(settings) => Box::new(JsonPathFramer::new(settings)),
        FramerConfig::SessionEpisode(settings) => Box::new(SessionEpisodeFramer::new(settings)),
    };
    Ok(framer)
}
/// Summary returned by a single `run_cell` invocation.
#[derive(Clone, Debug)]
pub struct CellResult {
    /// Name of the cell, taken from the cell configuration.
    pub cell_name: String,
    /// Number of framed documents handled, including ones that yielded no chunks.
    pub docs_processed: usize,
    /// Total number of chunks handed to the sink.
    pub chunks_written: usize,
    /// Wall-clock duration of the whole run, in seconds.
    pub wall_seconds: f64,
    /// Embedding time as reported by the embedder, in seconds.
    pub embed_seconds: f64,
}
/// Runs one ingest cell end to end and reports what it did.
///
/// The pipeline is assembled from `cfg` (source, framer, chunker, extractor,
/// embedder, backend, sink), and the sink's target table is created. Every raw
/// document from the source is then processed, up to `runtime.doc_limit` raw
/// documents: it is framed, chunked, embedded, passed through the extractor,
/// and written to the sink. `commit_processed` is called on the source only
/// after every document has been written.
///
/// Each chunk's stored metadata is the union of three maps: the framed
/// document's metadata, the extractor's metadata, and the chunk's own
/// metadata. On key collisions the later map wins (document < extractor < chunk).
///
/// `runtime.heartbeat_every` (default 25) sets how often a progress line is
/// logged, counted in *framed* documents. A value of `0` disables the heartbeat.
/// Note that `docs_processed` also counts framed documents, so it can exceed
/// `doc_limit` when a framer splits raw documents.
///
/// # Errors
///
/// Fails immediately for [`SourceConfig::Inline`], which has no iterator and
/// must be driven from the host application. Otherwise returns the first error
/// from any of these steps: building a pipeline component, creating the table,
/// iterating the source, framing, embedding, extracting, writing, or committing.
pub async fn run_cell(cfg: CellConfig) -> Result<CellResult> {
    use crate::sinks::Sink;

    let start = Instant::now();
    info!(cell = %cfg.cell_name, "cell starting");
    if matches!(cfg.source, SourceConfig::Inline(_)) {
        return Err(anyhow!(
            "inline source has no auto-iterator: drive ingest from your app \
             with chunkshop::Pipeline::from_yaml(...).ingest_text(doc_id, text, metadata). \
             See docs/incremental.md (Pattern F) and docs/samples/inline-mode/."
        ));
    }

    let source: AnySource = crate::sources::load_source(&cfg.source).context("load source")?;
    let framer = build_framer(cfg.framer)?;
    let chunker = build_chunker(cfg.chunker)?;
    let extractor = build_extractor(cfg.extractor)?;
    let mut embedder = match cfg.embedder {
        EmbedderConfig::Fastembed(ec) => FastembedEmbedder::new(ec)?,
    };
    let backend = crate::backends::load_backend(&cfg.target).context("load backend")?;
    let sink =
        crate::sinks::load_sink(&cfg.target, backend, embedder.dim()).context("load sink")?;

    info!("creating target table");
    sink.create_table().await?;

    let docs = source.iter_documents().await?;
    let limit = cfg.runtime.doc_limit.unwrap_or(usize::MAX);
    // 0 means "no heartbeat"; it must never reach the `%` below, which would
    // panic on a zero divisor.
    let heartbeat = cfg.runtime.heartbeat_every.unwrap_or(25);

    let mut docs_processed = 0usize;
    let mut chunks_written = 0usize;
    for raw in docs.into_iter().take(limit) {
        for doc in framer.frame(&raw)? {
            let chunks = chunker.chunk(&doc);
            // Documents that produce no chunks skip embedding and writing, but
            // still count as processed and still take part in the heartbeat.
            if !chunks.is_empty() {
                let texts: Vec<String> =
                    chunks.iter().map(|c| c.embedded_content.clone()).collect();
                let embeddings = embedder.embed(texts)?;

                let doc_meta = doc.metadata.as_object().cloned().unwrap_or_default();
                let mut tags_per_chunk: Vec<Vec<String>> = Vec::with_capacity(chunks.len());
                let mut chunks_with_meta: Vec<crate::chunker::Chunk> =
                    Vec::with_capacity(chunks.len());
                // Consume the chunks so their content fields move into the
                // enriched chunk instead of being cloned.
                for c in chunks {
                    let r = extractor.extract(&c.original_content)?;
                    // Precedence on key collisions (later wins): document < extractor < chunk.
                    let mut merged = doc_meta.clone();
                    for (k, v) in &r.metadata {
                        merged.insert(k.clone(), v.clone());
                    }
                    if let Some(c_meta) = c.metadata.as_object() {
                        for (k, v) in c_meta {
                            merged.insert(k.clone(), v.clone());
                        }
                    }
                    tags_per_chunk.push(r.tags);
                    chunks_with_meta.push(crate::chunker::Chunk {
                        metadata: serde_json::Value::Object(merged),
                        ..c
                    });
                }

                // NOTE(review): every framed document is written under the *raw*
                // document's id. Confirm the sink does not replace chunks written
                // earlier for the same id when a framer emits several documents.
                sink.write_document(&raw.id, &chunks_with_meta, &embeddings, &tags_per_chunk)
                    .await
                    .context("write_document")?;
                chunks_written += chunks_with_meta.len();
            }

            docs_processed += 1;
            if heartbeat > 0 && docs_processed % heartbeat == 0 {
                info!(docs = docs_processed, chunks = chunks_written, "heartbeat");
            }
        }
    }

    // Only reached once every document has been written successfully.
    source
        .commit_processed()
        .await
        .context("commit_processed")?;

    let wall = start.elapsed().as_secs_f64();
    let embed_seconds = embedder.embed_seconds();
    info!(cell = %cfg.cell_name, docs = docs_processed, chunks = chunks_written, wall = wall, embed = embed_seconds, "cell DONE");
    Ok(CellResult {
        cell_name: cfg.cell_name,
        docs_processed,
        chunks_written,
        wall_seconds: wall,
        embed_seconds,
    })
}