1use std::time::Instant;
4
5use anyhow::{anyhow, Context, Result};
6use tracing::info;
7
8use crate::config::{CellConfig, EmbedderConfig, FramerConfig, SourceConfig};
9use crate::embedder::FastembedEmbedder;
10use crate::extractor::build_extractor;
11use crate::framer::{
12 FramerImpl, HeadingBoundaryFramer, IdentityFramer, JsonPathFramer, RegexBoundaryFramer,
13 SessionEpisodeFramer,
14};
15use crate::sources::AnySource;
16
17pub use crate::chunker::build_chunker;
21
22pub fn build_framer(cfg: FramerConfig) -> Result<Box<dyn FramerImpl + Send + Sync>> {
25 Ok(match cfg {
26 FramerConfig::Identity(c) => Box::new(IdentityFramer::new(c)),
27 FramerConfig::HeadingBoundary(c) => Box::new(HeadingBoundaryFramer::new(c)?),
28 FramerConfig::RegexBoundary(c) => Box::new(RegexBoundaryFramer::new(c)?),
29 FramerConfig::Jsonpath(c) => Box::new(JsonPathFramer::new(c)),
30 FramerConfig::SessionEpisode(c) => Box::new(SessionEpisodeFramer::new(c)),
31 })
32}
33
/// Summary returned by `run_cell` once a cell has finished successfully.
#[derive(Clone, Debug)]
pub struct CellResult {
    /// Name of the cell, taken from the cell configuration.
    pub cell_name: String,
    /// Number of framed documents handled, including those that produced no chunks.
    pub docs_processed: usize,
    /// Total number of chunks handed to the sink.
    pub chunks_written: usize,
    /// Wall-clock duration of the whole run, in seconds.
    pub wall_seconds: f64,
    /// Embedding time in seconds, as reported by the embedder.
    pub embed_seconds: f64,
}
45
46pub async fn run_cell(cfg: CellConfig) -> Result<CellResult> {
47 let start = Instant::now();
48 info!(cell = %cfg.cell_name, "cell starting");
49
50 if matches!(cfg.source, SourceConfig::Inline(_)) {
51 return Err(anyhow!(
52 "inline source has no auto-iterator: drive ingest from your app \
53 with chunkshop::Pipeline::from_yaml(...).ingest_text(doc_id, text, metadata). \
54 See docs/incremental.md (Pattern F) and docs/samples/inline-mode/."
55 ));
56 }
57 let source: AnySource = crate::sources::load_source(&cfg.source).context("load source")?;
58 let framer = build_framer(cfg.framer)?;
59 let chunker = build_chunker(cfg.chunker)?;
60 let extractor = build_extractor(cfg.extractor)?;
61 let mut embedder = match cfg.embedder {
62 EmbedderConfig::Fastembed(ec) => FastembedEmbedder::new(ec)?,
63 };
64 let backend = crate::backends::load_backend(&cfg.target).context("load backend")?;
65 let sink =
66 crate::sinks::load_sink(&cfg.target, backend, embedder.dim()).context("load sink")?;
67
68 info!("creating target table");
69 use crate::sinks::Sink;
70 sink.create_table().await?;
71
72 let docs = source.iter_documents().await?;
73 let limit = cfg.runtime.doc_limit.unwrap_or(usize::MAX);
74 let heartbeat = cfg.runtime.heartbeat_every.unwrap_or(25);
75
76 let mut docs_processed = 0usize;
77 let mut chunks_written = 0usize;
78
79 for raw in docs.into_iter().take(limit) {
83 let framed = framer.frame(&raw)?;
84 for doc in framed {
85 let chunks = chunker.chunk(&doc);
86 if chunks.is_empty() {
87 docs_processed += 1;
88 continue;
89 }
90 let texts: Vec<String> = chunks.iter().map(|c| c.embedded_content.clone()).collect();
91 let embeddings = embedder.embed(texts)?;
92
93 let mut tags_per_chunk: Vec<Vec<String>> = Vec::with_capacity(chunks.len());
98 let mut chunks_with_meta: Vec<crate::chunker::Chunk> = Vec::with_capacity(chunks.len());
99 let doc_meta = doc.metadata.as_object().cloned().unwrap_or_default();
100 for c in &chunks {
101 let r = extractor.extract(&c.original_content)?;
102 let mut merged = serde_json::Map::new();
103 for (k, v) in &doc_meta {
104 merged.insert(k.clone(), v.clone());
105 }
106 for (k, v) in &r.metadata {
107 merged.insert(k.clone(), v.clone());
108 }
109 if let Some(c_meta) = c.metadata.as_object() {
110 for (k, v) in c_meta {
111 merged.insert(k.clone(), v.clone());
112 }
113 }
114 tags_per_chunk.push(r.tags);
115 chunks_with_meta.push(crate::chunker::Chunk {
116 doc_id: c.doc_id.clone(),
117 seq_num: c.seq_num,
118 original_content: c.original_content.clone(),
119 embedded_content: c.embedded_content.clone(),
120 metadata: serde_json::Value::Object(merged),
121 });
122 }
123
124 sink.write_document(&raw.id, &chunks_with_meta, &embeddings, &tags_per_chunk)
125 .await
126 .context("write_document")?;
127 chunks_written += chunks_with_meta.len();
128 docs_processed += 1;
129 if docs_processed % heartbeat == 0 {
130 info!(docs = docs_processed, chunks = chunks_written, "heartbeat");
131 }
132 }
133 }
134
135 source
141 .commit_processed()
142 .await
143 .context("commit_processed")?;
144
145 let wall = start.elapsed().as_secs_f64();
146 let embed_seconds = embedder.embed_seconds();
147 info!(cell = %cfg.cell_name, docs = docs_processed, chunks = chunks_written, wall = wall, embed = embed_seconds, "cell DONE");
148 Ok(CellResult {
149 cell_name: cfg.cell_name,
150 docs_processed,
151 chunks_written,
152 wall_seconds: wall,
153 embed_seconds,
154 })
155}