use std::path::{Path, PathBuf};
use std::time::Instant;
use anyhow::{Context, Result};
use globset::{Glob, GlobSet, GlobSetBuilder};
use heliosdb_nano::code_graph::{CodeIndexOptions, CodeIndexStats};
use heliosdb_nano::config::{Config as EngineConfig, WalSyncModeConfig};
use heliosdb_nano::graph_rag::{
ChunkStrategy, IngestDocsOptions, IngestStats as DocStats, LinkerStats,
};
use heliosdb_nano::{EmbeddedDatabase, Value};
use ignore::WalkBuilder;
use heliosdb_nano::code_graph::embed::FastEmbedder;
/// Run the code-graph indexer with an in-process `FastEmbedder` for body embeddings.
///
/// The embedder is constructed fresh on every call via `FastEmbedder::try_default`;
/// any construction error is returned before indexing starts.
fn run_code_index_with_inproc_embedder(
    db: &EmbeddedDatabase,
    opts: CodeIndexOptions,
) -> heliosdb_nano::Result<CodeIndexStats> {
    let inproc = FastEmbedder::try_default()?;
    let boxed = Box::new(inproc);
    heliosdb_nano::code_graph::storage::code_index_with_embedder(db, opts, boxed)
}
/// Open (or create) the on-disk knowledge base at `kb_dir` for ingestion.
///
/// `durable` selects `WalSyncModeConfig::Sync` (true) or `WalSyncModeConfig::Async`
/// (false). The database is never memory-only.
///
/// # Errors
///
/// Fails if `EmbeddedDatabase::with_config` fails; the error carries the path.
pub fn open_kb_for_ingest(kb_dir: &Path, durable: bool) -> Result<EmbeddedDatabase> {
    let sync_mode = match durable {
        true => WalSyncModeConfig::Sync,
        false => WalSyncModeConfig::Async,
    };
    let mut cfg = EngineConfig::default();
    cfg.storage.path = Some(kb_dir.to_path_buf());
    cfg.storage.memory_only = false;
    cfg.storage.wal_sync_mode = sync_mode;
    EmbeddedDatabase::with_config(cfg)
        .with_context(|| format!("failed to open EmbeddedDatabase at {}", kb_dir.display()))
}
/// Files larger than this (5 MiB) are counted as skipped instead of being read.
const MAX_FILE_BYTES: u64 = 5 << 20;
/// Table the walk fills with source files (and markdown); input to the code-graph indexer.
const SOURCE_TABLE: &str = "src";
/// Table of text / binary-extracted documents, projected row-by-row into graph-rag.
const DOCS_TABLE: &str = "docs";
/// Table of markdown documents, projected into graph-rag chunked by headings.
const DOCS_MD_TABLE: &str = "docs_md";
/// Upper bound on the number of error messages kept in `IngestSummary::read_error_samples`.
const MAX_ERROR_SAMPLES: usize = 10;
/// Emit a walk progress line at least this often...
const PROGRESS_INTERVAL: std::time::Duration = std::time::Duration::from_millis(2_000);
/// ...or after this many additional files, whichever comes first.
const PROGRESS_EVERY_FILES: u64 = 250;
/// Directory names pruned from the walk (build output, dependency caches, IDE state).
const SKIP_DIRS: &[&str] = &[
    // Build output.
    "target",
    "node_modules",
    "dist",
    "build",
    "out",
    // Python virtualenvs / bytecode.
    ".venv",
    "venv",
    "__pycache__",
    // JS framework caches.
    ".next",
    ".nuxt",
    ".cache",
    // Vendored / package-manager directories.
    "vendor",
    "Pods",
    ".gradle",
    ".mvn",
    // Editor state.
    ".idea",
    ".vscode",
    // Tool caches.
    ".pytest_cache",
    ".mypy_cache",
    ".ruff_cache",
    ".tox",
];
/// Options controlling a single [`ingest`] run.
#[derive(Debug, Clone)]
pub struct IngestOptions {
    /// Root directory that is walked for source files and documents.
    pub source_root: PathBuf,
    /// Knowledge-base directory; the resume checkpoint is read, advanced and
    /// cleared relative to it.
    pub kb_dir: PathBuf,
    /// Extract and ingest PDF / DOCX / XLSX files. When false those files are
    /// counted as skipped.
    pub include_binary_docs: bool,
    /// Passed through to the code-graph indexer (`CodeIndexOptions::force_reparse`).
    pub force_reparse: bool,
    /// Not read inside `ingest`. NOTE(review): presumably forwarded to
    /// `open_kb_for_ingest(.., durable)` by the caller — confirm.
    pub durable_writes: bool,
    /// Embed code bodies during indexing using the in-process `FastEmbedder`.
    pub with_embeddings: bool,
    /// Not read inside `ingest`. NOTE(review): presumably consumed by the caller
    /// that schedules the background quality pass — confirm.
    pub background_quality: bool,
    /// When set, run the LLM summarisation pass after the deterministic distill
    /// cards (parent process only).
    pub llm_distill: Option<crate::distill::LlmDistillOptions>,
}
/// Counters and per-phase statistics produced by [`ingest`].
#[derive(Debug, Default)]
pub struct IngestSummary {
    /// Regular files visited by the walk, excluding anything under
    /// `.helios-kb` / `.helios-index`.
    pub files_seen: u64,
    /// Rows upserted into `src` (code, markdown, notebooks). When the walk is
    /// skipped this is the existing row count of `src` instead.
    pub code_upserts: u64,
    /// Plain-text documents upserted into `docs` (or the existing `docs` row
    /// count when the walk is skipped).
    pub doc_upserts: u64,
    /// Markdown documents upserted into `docs_md` (or its existing row count
    /// when the walk is skipped on resume).
    pub md_doc_upserts: u64,
    /// PDF / DOCX / XLSX documents whose extracted text was upserted into `docs`.
    pub binary_upserts: u64,
    /// Files not ingested: metadata errors, files over `MAX_FILE_BYTES`,
    /// generated files, unsupported extensions, or binary docs while disabled.
    pub skipped: u64,
    /// Walker errors plus files that could not be read, decoded or extracted.
    pub read_errors: u64,
    /// Up to `MAX_ERROR_SAMPLES` human-readable `"<path>: <reason>"` messages.
    pub read_error_samples: Vec<String>,
    /// Wall-clock duration of the whole `ingest` call, in milliseconds.
    pub elapsed_ms: u128,
    /// Code-graph indexer stats; `None` if the phase did not run or failed.
    pub code: Option<CodeIndexStats>,
    /// Graph-rag row-mode projection stats for `docs`; `None` if not run or failed.
    pub docs: Option<DocStats>,
    /// Graph-rag heading-mode projection stats for `docs_md`; `None` if not run or failed.
    pub docs_md: Option<DocStats>,
    /// MENTIONS linker stats; `None` if not run or failed.
    pub links: Option<LinkerStats>,
    /// Symbol / repo-map card stats; `None` if the distill phase did not run.
    pub distill: Option<DistillSummary>,
    /// LLM summarisation stats; `None` if not configured, not run, or failed.
    pub llm_distill: Option<LlmDistillSummary>,
}
/// Condensed results of the deterministic distill phase (symbol and file cards).
#[derive(Debug, Default, Clone)]
// Within this file the fields are only written, never read. NOTE(review):
// presumably they are read by callers or only surfaced through `Debug` — confirm
// before removing this allow.
#[allow(dead_code)]
pub struct DistillSummary {
    /// Symbol cards written in this run.
    pub symbols_written: usize,
    /// Symbol cards left untouched because they were already current.
    pub symbols_unchanged: usize,
    /// File (repo-map) cards written.
    pub files_written: usize,
    /// PageRank iterations performed while ranking files.
    pub pagerank_iters: u32,
    /// Whether PageRank converged within those iterations.
    pub pagerank_converged: bool,
}
/// Condensed results of the optional LLM summarisation pass.
#[derive(Debug, Default, Clone)]
// Within this file the fields are only written, never read. NOTE(review):
// presumably they are read by callers or only surfaced through `Debug` — confirm
// before removing this allow.
#[allow(dead_code)]
pub struct LlmDistillSummary {
    /// Summaries written.
    pub written: usize,
    /// Candidates skipped because their summary was already current.
    pub unchanged: usize,
    /// Candidates whose summarisation failed.
    pub failed: usize,
    /// Total prompt tokens reported by the endpoint.
    pub total_prompt_tokens: u64,
    /// Total completion tokens reported by the endpoint.
    pub total_completion_tokens: u64,
}
/// How a file found during the walk is handled, decided from its extension.
#[derive(Debug)]
enum Class<'a> {
    /// Source code; the payload is the language tag stored in `src.lang`.
    Code(&'a str),
    /// Stored both as code in `src` and as a heading-chunked doc in `docs_md`.
    CodeAndDoc(&'a str),
    /// Plain text / config / schema, stored in `docs`.
    Text,
    /// Jupyter notebook; its code cells are concatenated into `src`.
    Notebook,
    /// PDF, extracted only when binary docs are enabled.
    Pdf,
    /// Word document, extracted only when binary docs are enabled.
    Docx,
    /// Excel workbook, extracted only when binary docs are enabled.
    Xlsx,
    /// Not ingested at all.
    Skip,
}
/// Classify `path` by its (ASCII case-insensitive) file extension.
///
/// Files without an extension, or with a non-UTF-8 one, are [`Class::Skip`].
fn classify(path: &Path) -> Class<'static> {
    let lowered = match path.extension().and_then(|raw| raw.to_str()) {
        Some(ext) => ext.to_ascii_lowercase(),
        None => return Class::Skip,
    };
    match lowered.as_str() {
        // Languages understood by the code-graph indexer.
        "rs" => Class::Code("rust"),
        "py" => Class::Code("python"),
        "ts" => Class::Code("typescript"),
        "tsx" => Class::Code("tsx"),
        "js" | "mjs" | "cjs" => Class::Code("javascript"),
        "go" => Class::Code("go"),
        "sql" => Class::Code("sql"),
        // Markdown is indexed as code *and* as a heading-chunked document.
        "md" | "markdown" => Class::CodeAndDoc("markdown"),
        "ipynb" => Class::Notebook,
        // Schemas, prose and configuration are stored as plain-text docs.
        "graphql" | "gql" | "proto" | "thrift" | "txt" | "rst" | "tex" | "org" | "log"
        | "toml" | "yaml" | "yml" | "json" | "ini" | "cfg" => Class::Text,
        // Binary document formats (extracted only on request).
        "pdf" => Class::Pdf,
        "docx" => Class::Docx,
        "xlsx" | "xlsm" => Class::Xlsx,
        _ => Class::Skip,
    }
}
/// Ingest everything under `opts.source_root` into the knowledge base `db`.
///
/// Phases, each recorded through `crate::checkpoint` so an interrupted run can
/// resume:
///
/// 1. **Walk**: classify files and upsert them into `src`, `docs` and `docs_md`
///    inside one transaction.
///    - The walk is skipped when resuming from a later phase, and always in the
///      background quality child. In both cases the existing row counts are
///      loaded instead.
/// 2. **Code index**: build the code graph from `src`, optionally with body
///    embeddings.
/// 3. **Graph-RAG**: project `docs` (row mode) and `docs_md` (heading mode) into
///    the document graph, then run the bulk MENTIONS linker.
/// 4. **Distill**: build symbol and file cards, plus LLM summaries if
///    configured. This phase runs in the parent process only, whenever
///    anything was ingested.
///
/// Failures inside phases 2–4 are logged with `tracing::warn!` and leave the
/// matching summary field as `None`; they do not abort the ingest.
///
/// # Errors
///
/// Returns an error if any of the following fail:
/// - creating the staging tables;
/// - reading or updating the checkpoint;
/// - beginning or committing the walk transaction;
/// - any upsert during the walk (the walk is then rolled back).
pub fn ingest(db: &EmbeddedDatabase, opts: IngestOptions) -> Result<IngestSummary> {
    let started = Instant::now();
    let mut summary = IngestSummary::default();
    ensure_tables(db)?;
    let prior = crate::checkpoint::read(&opts.kb_dir)?;
    let resume_from = prior.as_ref().map(|cp| cp.phase);
    let source_root_str = opts.source_root.to_string_lossy().into_owned();
    if let Some(ref cp) = prior {
        eprintln!(
            "ingest: resuming from interrupted run (left at phase = {:?}, started {} s ago)",
            cp.phase,
            crate::quality::now_secs().saturating_sub(cp.started_at_secs),
        );
    }

    // The quality child re-uses rows its parent already wrote and never touches
    // the checkpoint.
    let is_quality_child = std::env::var(crate::quality::PROGRESS_ENV).is_ok();

    if is_quality_child {
        // Count docs_md too, so the heading-mode projection is not silently
        // skipped in the child.
        load_existing_counts(db, &mut summary);
        eprintln!(
            "ingest phase (quality-child): skipping walk; trusting existing src/docs rows ({} src, {} docs, {} docs_md)",
            summary.code_upserts, summary.doc_upserts, summary.md_doc_upserts,
        );
    } else {
        // The checkpoint only moves past `Walk` after the walk transaction has
        // committed, so any later phase (including Distill) means the rows exist.
        let skip_walk = matches!(
            resume_from,
            Some(crate::checkpoint::Phase::CodeIndex)
                | Some(crate::checkpoint::Phase::GraphRag)
                | Some(crate::checkpoint::Phase::Distill)
        );
        if skip_walk {
            load_existing_counts(db, &mut summary);
            eprintln!(
                "ingest phase: walk skipped (resume) — trusting existing src/docs rows ({} src, {} docs, {} docs_md)",
                summary.code_upserts, summary.doc_upserts, summary.md_doc_upserts,
            );
        } else {
            crate::checkpoint::begin(
                &opts.kb_dir,
                &source_root_str,
                crate::checkpoint::Phase::Walk,
            )?;
            let txn = TxnGuard::begin(db)?;
            match walk_and_upsert(db, &opts, &mut summary) {
                Ok(()) => txn.commit()?,
                Err(e) => {
                    let _ = txn.rollback();
                    return Err(e);
                }
            }
        }
    }

    if summary.code_upserts > 0 {
        if !is_quality_child {
            crate::checkpoint::advance(
                &opts.kb_dir,
                &source_root_str,
                crate::checkpoint::Phase::CodeIndex,
            )?;
        }
        run_code_index_phase(db, &opts, &mut summary, started);
    }

    let has_row_docs = summary.doc_upserts + summary.binary_upserts > 0;
    let has_md_docs = summary.md_doc_upserts > 0;
    if has_row_docs || has_md_docs {
        if !is_quality_child {
            crate::checkpoint::advance(
                &opts.kb_dir,
                &source_root_str,
                crate::checkpoint::Phase::GraphRag,
            )?;
        }
        run_graph_rag_phase(db, &mut summary, has_row_docs, has_md_docs);
        run_linker_phase(db, &mut summary);
    }

    // Distill produces symbol and file cards. Gate it on anything having been
    // ingested, not only on docs, so code-only trees get cards as well.
    let ingested_anything = summary.code_upserts > 0 || has_row_docs || has_md_docs;
    if ingested_anything && !is_quality_child {
        crate::checkpoint::advance(
            &opts.kb_dir,
            &source_root_str,
            crate::checkpoint::Phase::Distill,
        )?;
        run_distill_phase(db, &opts, &mut summary);
    }

    if !is_quality_child {
        // Best effort: a stale checkpoint only causes an unnecessary resume
        // next time, but it should be visible in the logs.
        if let Err(e) = crate::checkpoint::clear(&opts.kb_dir) {
            tracing::warn!("failed to clear ingest checkpoint: {e}");
        }
    }
    summary.elapsed_ms = started.elapsed().as_millis();
    Ok(summary)
}

/// `SELECT count(*)` on `table`, returning 0 if the query fails or yields no
/// integer (or a negative one).
fn count_rows(db: &EmbeddedDatabase, table: &str) -> u64 {
    let sql = format!("SELECT count(*) FROM {table}");
    let rows = match db.query(&sql, &[]) {
        Ok(rows) => rows,
        Err(_) => return 0,
    };
    match rows.first().and_then(|r| r.values.first()) {
        Some(Value::Int4(v)) => u64::try_from(*v).unwrap_or(0),
        Some(Value::Int8(v)) => u64::try_from(*v).unwrap_or(0),
        _ => 0,
    }
}

/// Fill the upsert counters from the rows already present when the walk is skipped.
fn load_existing_counts(db: &EmbeddedDatabase, summary: &mut IngestSummary) {
    summary.code_upserts = count_rows(db, SOURCE_TABLE);
    summary.doc_upserts = count_rows(db, DOCS_TABLE);
    summary.md_doc_upserts = count_rows(db, DOCS_MD_TABLE);
}

/// Run the code-graph indexer over `src`, storing its stats in `summary.code`.
///
/// Indexer failures are logged and leave `summary.code` as `None`.
fn run_code_index_phase(
    db: &EmbeddedDatabase,
    opts: &IngestOptions,
    summary: &mut IngestSummary,
    started: Instant,
) {
    eprintln!(
        "ingest phase: walk done in {:.1} s ({} files upserted) — \
         starting code-graph indexer (parse + symbol extract + write to _hdb_code_*){}",
        started.elapsed().as_secs_f64(),
        summary.code_upserts,
        if opts.with_embeddings {
            " + body embeddings"
        } else {
            ""
        }
    );
    let code_started = Instant::now();
    let cio = CodeIndexOptions {
        source_table: SOURCE_TABLE.to_string(),
        embed_bodies: opts.with_embeddings,
        embed_endpoint: None,
        embed_bearer: None,
        force_reparse: opts.force_reparse,
        parallelism: None,
        chunk_size: None,
    };
    let result = if opts.with_embeddings {
        run_code_index_with_inproc_embedder(db, cio)
    } else {
        db.code_index(cio)
    };
    match result.map_err(anyhow::Error::from) {
        Ok(s) => {
            summary.code = Some(s);
            eprintln!(
                "ingest phase: code-graph done in {:.1} s",
                code_started.elapsed().as_secs_f64()
            );
        }
        Err(e) => tracing::warn!("code_index failed: {e}"),
    }
}

/// Graph-rag ingest options for `table`, keyed by `path` with text in `content`.
fn docs_ingest_options(table: &str, chunk_by: ChunkStrategy) -> IngestDocsOptions {
    IngestDocsOptions {
        source_table: table.to_string(),
        id_col: "path".to_string(),
        text_col: "content".to_string(),
        title_col: None,
        chunk_by,
    }
}

/// Project `docs` (row mode) and/or `docs_md` (heading mode) into graph-rag.
fn run_graph_rag_phase(
    db: &EmbeddedDatabase,
    summary: &mut IngestSummary,
    has_row_docs: bool,
    has_md_docs: bool,
) {
    eprintln!(
        "ingest phase: starting graph-rag doc projection ({} row-mode + {} heading-mode docs)",
        summary.doc_upserts + summary.binary_upserts,
        summary.md_doc_upserts,
    );
    let docs_started = Instant::now();
    if has_row_docs {
        match db.graph_rag_ingest_docs(&docs_ingest_options(DOCS_TABLE, ChunkStrategy::Row)) {
            Ok(s) => summary.docs = Some(s),
            Err(e) => tracing::warn!("graph_rag_ingest_docs(row) failed: {e}"),
        }
    }
    if has_md_docs {
        match db.graph_rag_ingest_docs(&docs_ingest_options(
            DOCS_MD_TABLE,
            ChunkStrategy::Headings,
        )) {
            Ok(s) => summary.docs_md = Some(s),
            Err(e) => tracing::warn!("graph_rag_ingest_docs(headings) failed: {e}"),
        }
    }
    eprintln!(
        "ingest phase: graph-rag done in {:.1} s",
        docs_started.elapsed().as_secs_f64()
    );
}

/// Add MENTIONS edges between docs and code in bulk, storing stats in `summary.links`.
fn run_linker_phase(db: &EmbeddedDatabase, summary: &mut IngestSummary) {
    let link_started = Instant::now();
    match crate::linker::link_mentions_bulk(db) {
        Ok(stats) => {
            eprintln!(
                "ingest phase: linker done in {:.1} s — {} MENTIONS edges added (bulk)",
                link_started.elapsed().as_secs_f64(),
                stats.mentions_added
            );
            summary.links = Some(stats);
        }
        Err(e) => tracing::warn!("link_mentions_bulk failed: {e}"),
    }
}

/// Build symbol and repo-map cards, then the optional LLM summaries.
///
/// Card-builder failures are logged and reported as zeroed stats;
/// LLM failures are logged and leave `summary.llm_distill` as `None`.
fn run_distill_phase(db: &EmbeddedDatabase, opts: &IngestOptions, summary: &mut IngestSummary) {
    let distill_started = Instant::now();
    let sym_stats = match crate::distill::build_symbol_cards(db) {
        Ok(s) => s,
        Err(e) => {
            tracing::warn!("distill::build_symbol_cards failed: {e}");
            crate::distill::DistillStats::default()
        }
    };
    let repo_stats = match crate::distill::build_repomap_cards(db) {
        Ok(s) => s,
        Err(e) => {
            tracing::warn!("distill::build_repomap_cards failed: {e}");
            crate::distill::DistillStats::default()
        }
    };
    eprintln!(
        "ingest phase: distill done in {:.1} s — {} symbol cards ({} unchanged, scanned {}), \
         {} file cards (pagerank {} iters, converged={})",
        distill_started.elapsed().as_secs_f64(),
        sym_stats.symbols_written,
        sym_stats.symbols_unchanged,
        sym_stats.symbols_scanned,
        repo_stats.files_written,
        repo_stats.pagerank_iters,
        repo_stats.pagerank_converged,
    );
    summary.distill = Some(DistillSummary {
        symbols_written: sym_stats.symbols_written,
        symbols_unchanged: sym_stats.symbols_unchanged,
        files_written: repo_stats.files_written,
        pagerank_iters: repo_stats.pagerank_iters,
        pagerank_converged: repo_stats.pagerank_converged,
    });

    let llm_opts = match opts.llm_distill.as_ref() {
        Some(o) => o,
        None => return,
    };
    let llm_started = Instant::now();
    eprintln!(
        "ingest phase: LLM distill starting — endpoint={} model={} concurrency={}",
        llm_opts.endpoint, llm_opts.model, llm_opts.concurrency
    );
    match crate::distill::build_llm_summaries(db, llm_opts) {
        Ok(stats) => {
            eprintln!(
                "ingest phase: LLM distill done in {:.1} s — {}/{} written ({} unchanged, {} failed), \
                 tokens prompt={} completion={}",
                llm_started.elapsed().as_secs_f64(),
                stats.written,
                stats.candidates,
                stats.unchanged,
                stats.failed,
                stats.total_prompt_tokens,
                stats.total_completion_tokens,
            );
            summary.llm_distill = Some(LlmDistillSummary {
                written: stats.written,
                unchanged: stats.unchanged,
                failed: stats.failed,
                total_prompt_tokens: stats.total_prompt_tokens,
                total_completion_tokens: stats.total_completion_tokens,
            });
        }
        Err(e) => tracing::warn!("build_llm_summaries failed: {e}"),
    }
}
fn walk_and_upsert(
db: &EmbeddedDatabase,
opts: &IngestOptions,
summary: &mut IngestSummary,
) -> Result<()> {
let walker = WalkBuilder::new(&opts.source_root)
.hidden(true) .git_ignore(true) .git_global(true) .git_exclude(true)
.filter_entry(|entry| {
if entry.file_type().map(|t| t.is_dir()).unwrap_or(false) {
if let Some(name) = entry.file_name().to_str() {
if SKIP_DIRS.contains(&name) {
return false;
}
}
}
true
})
.build();
let linguist_skip = load_linguist_generated_globset(&opts.source_root);
let mut last_progress_at = Instant::now();
let mut last_progress_files: u64 = 0;
let walk_started = Instant::now();
for entry in walker {
let entry = match entry {
Ok(e) => e,
Err(_) => {
summary.read_errors += 1;
continue;
}
};
if last_progress_at.elapsed() >= PROGRESS_INTERVAL
|| summary.files_seen.saturating_sub(last_progress_files) >= PROGRESS_EVERY_FILES
{
eprintln!(
"ingest progress: walked {} files ({} code, {} text, {} doc upserted) — {:.1} s",
summary.files_seen,
summary.code_upserts,
summary.doc_upserts,
summary.binary_upserts,
walk_started.elapsed().as_secs_f64()
);
last_progress_at = Instant::now();
last_progress_files = summary.files_seen;
}
let path = entry.path();
if !entry.file_type().map(|t| t.is_file()).unwrap_or(false) {
continue;
}
if path
.components()
.any(|c| c.as_os_str() == ".helios-kb" || c.as_os_str() == ".helios-index")
{
continue;
}
summary.files_seen += 1;
let meta = match entry.metadata() {
Ok(m) => m,
Err(_) => {
summary.skipped += 1;
continue;
}
};
if meta.len() > MAX_FILE_BYTES {
summary.skipped += 1;
continue;
}
let class = classify(path);
let rel = relative_path(path, &opts.source_root);
if matches!(class, Class::Code(_) | Class::Notebook) {
if let Some(set) = linguist_skip.as_ref() {
if set.is_match(&rel) {
summary.skipped += 1;
continue;
}
}
}
if matches!(class, Class::Code(_) | Class::Notebook) && is_generated_file(path) {
summary.skipped += 1;
continue;
}
match class {
Class::Code(lang) => match read_utf8(path) {
Ok(content) => {
upsert_src(db, &rel, &content, lang)?;
summary.code_upserts += 1;
}
Err(e) => record_read_error(summary, path, &e.to_string()),
},
Class::CodeAndDoc(lang) => match read_utf8(path) {
Ok(content) => {
upsert_src(db, &rel, &content, lang)?;
summary.code_upserts += 1;
upsert_doc_md(db, &rel, &content)?;
summary.md_doc_upserts += 1;
}
Err(e) => record_read_error(summary, path, &e.to_string()),
},
Class::Text => match read_utf8(path) {
Ok(content) => {
upsert_doc(db, &rel, &content, "text")?;
summary.doc_upserts += 1;
}
Err(e) => record_read_error(summary, path, &e.to_string()),
},
Class::Notebook => match extract_ipynb(path) {
Ok((src_text, lang)) if !src_text.trim().is_empty() => {
upsert_src(db, &rel, &src_text, lang)?;
summary.code_upserts += 1;
}
Ok(_) => record_read_error(summary, path, "notebook had no code cells"),
Err(e) => record_read_error(summary, path, &e.to_string()),
},
Class::Pdf if opts.include_binary_docs => match extract_pdf(path) {
Ok(text) if !text.trim().is_empty() => {
upsert_doc(db, &rel, &text, "pdf")?;
summary.binary_upserts += 1;
}
Ok(_) => record_read_error(summary, path, "PDF produced empty text"),
Err(e) => record_read_error(summary, path, &e.to_string()),
},
Class::Docx if opts.include_binary_docs => match extract_docx(path) {
Ok(text) if !text.trim().is_empty() => {
upsert_doc(db, &rel, &text, "docx")?;
summary.binary_upserts += 1;
}
Ok(_) => record_read_error(summary, path, "DOCX produced empty text"),
Err(e) => record_read_error(summary, path, &e.to_string()),
},
Class::Xlsx if opts.include_binary_docs => match extract_xlsx(path) {
Ok(text) if !text.trim().is_empty() => {
upsert_doc(db, &rel, &text, "xlsx")?;
summary.binary_upserts += 1;
}
Ok(_) => record_read_error(summary, path, "XLSX produced empty text"),
Err(e) => record_read_error(summary, path, &e.to_string()),
},
Class::Pdf | Class::Docx | Class::Xlsx => {
summary.skipped += 1;
}
Class::Skip => {
summary.skipped += 1;
}
}
}
Ok(())
}
/// Guard for an explicit `BEGIN` … `COMMIT`/`ROLLBACK` transaction on `db`.
///
/// If the guard is dropped without a successful `commit` or `rollback`, for
/// example because an early `?` return or a failed `COMMIT`, it issues
/// `ROLLBACK` and ignores the result.
struct TxnGuard<'a> {
    db: &'a EmbeddedDatabase,
    finished: bool,
}

impl<'a> TxnGuard<'a> {
    /// Execute `BEGIN` and return a guard for the open transaction.
    fn begin(db: &'a EmbeddedDatabase) -> Result<Self> {
        db.execute("BEGIN").context("BEGIN transaction")?;
        Ok(TxnGuard {
            db,
            finished: false,
        })
    }

    /// Execute `COMMIT`. On failure the drop handler still attempts `ROLLBACK`.
    fn commit(self) -> Result<()> {
        self.finish("COMMIT", "COMMIT transaction")
    }

    /// Execute `ROLLBACK` explicitly, surfacing its error.
    fn rollback(self) -> Result<()> {
        self.finish("ROLLBACK", "ROLLBACK transaction")
    }

    /// Run the terminating statement; only mark the guard finished if it succeeded.
    fn finish(mut self, sql: &str, what: &'static str) -> Result<()> {
        self.db.execute(sql).context(what)?;
        self.finished = true;
        Ok(())
    }
}

impl<'a> Drop for TxnGuard<'a> {
    fn drop(&mut self) {
        if self.finished {
            return;
        }
        let _ = self.db.execute("ROLLBACK");
    }
}
/// Create the `src`, `docs` and `docs_md` staging tables if they do not exist.
///
/// Every table is keyed by `path TEXT PRIMARY KEY` so the walk can upsert by
/// path. The third column is `lang` for source and `kind` for documents.
///
/// # Errors
///
/// Returns the first failing `CREATE TABLE`, labelled with the table name.
fn ensure_tables(db: &EmbeddedDatabase) -> Result<()> {
    const TABLES: [(&str, &str); 3] = [
        (
            "CREATE TABLE IF NOT EXISTS src (\npath TEXT PRIMARY KEY,\ncontent TEXT,\nlang TEXT\n)",
            "create src table",
        ),
        (
            "CREATE TABLE IF NOT EXISTS docs (\npath TEXT PRIMARY KEY,\ncontent TEXT,\nkind TEXT\n)",
            "create docs table",
        ),
        (
            "CREATE TABLE IF NOT EXISTS docs_md (\npath TEXT PRIMARY KEY,\ncontent TEXT,\nkind TEXT\n)",
            "create docs_md table",
        ),
    ];
    for &(ddl, label) in TABLES.iter() {
        db.execute(ddl).context(label)?;
    }
    Ok(())
}
/// Build a glob set of paths marked `linguist-generated` in the repository's
/// attribute files.
///
/// The attribute files read are `<root>/.gitattributes` and
/// `<root>/.git/info/attributes`. Missing or unreadable files are ignored.
///
/// Patterns are converted with [`gitattributes_pattern_to_glob`] so they match
/// root-relative paths the way git does. Unparsable patterns are dropped.
///
/// Returns `None` when no pattern is marked generated, or when the set fails
/// to build.
fn load_linguist_generated_globset(root: &Path) -> Option<GlobSet> {
    let candidates = [
        root.join(".gitattributes"),
        root.join(".git").join("info").join("attributes"),
    ];
    let mut builder = GlobSetBuilder::new();
    let mut count = 0usize;
    for p in &candidates {
        let body = match std::fs::read_to_string(p) {
            Ok(s) => s,
            Err(_) => continue,
        };
        for raw in body.lines() {
            let line = raw.trim();
            if line.is_empty() || line.starts_with('#') {
                continue;
            }
            let mut parts = line.split_whitespace();
            let pattern = match parts.next() {
                Some(p) => p,
                None => continue,
            };
            if !parts.any(is_linguist_generated_attr) {
                continue;
            }
            if let Ok(glob) = Glob::new(&gitattributes_pattern_to_glob(pattern)) {
                builder.add(glob);
                count += 1;
            }
        }
    }
    if count == 0 {
        return None;
    }
    builder.build().ok()
}

/// True for the attribute spellings that mark a path as generated.
fn is_linguist_generated_attr(attr: &str) -> bool {
    matches!(
        attr,
        "linguist-generated" | "linguist-generated=true" | "linguist-generated=set"
    )
}

/// Translate a gitattributes path pattern into a glob over root-relative paths.
///
/// These rules follow gitignore pattern semantics, which gitattributes uses:
/// - A leading `/` anchors the pattern at the root. It is stripped because
///   relative paths have no leading slash.
/// - A pattern with a slash elsewhere is already root-relative.
/// - A slash-less pattern (e.g. `schema.rs`, `*.pb.go`) matches at any depth,
///   so it is prefixed with `**/`.
fn gitattributes_pattern_to_glob(pattern: &str) -> String {
    if let Some(anchored) = pattern.strip_prefix('/') {
        anchored.to_string()
    } else if pattern.contains('/') {
        pattern.to_string()
    } else {
        format!("**/{pattern}")
    }
}
/// Heuristically detect machine-generated source by scanning the first 4 KiB.
///
/// The scan looks for common generator markers such as `@generated`,
/// `DO NOT EDIT`, `AUTO-GENERATED` and Go's `Code generated by`. Files that
/// cannot be opened or read are treated as not generated.
fn is_generated_file(path: &Path) -> bool {
    use std::io::Read;
    const HEADER_BYTES: u64 = 4096;
    const MARKERS: [&str; 4] = ["@generated", "DO NOT EDIT", "AUTO-GENERATED", "Code generated by"];
    let file = match std::fs::File::open(path) {
        Ok(f) => f,
        Err(_) => return false,
    };
    let mut head = Vec::with_capacity(HEADER_BYTES as usize);
    // A single `read` may legally return fewer bytes than are available.
    // `take` + `read_to_end` keeps reading until the cap or EOF.
    if file.take(HEADER_BYTES).read_to_end(&mut head).is_err() {
        return false;
    }
    let text = String::from_utf8_lossy(&head);
    MARKERS.iter().any(|marker| text.contains(*marker))
}
/// Count a per-file read/extract failure, keeping a sample message for the first
/// `MAX_ERROR_SAMPLES` failures only.
fn record_read_error(summary: &mut IngestSummary, path: &Path, reason: &str) {
    summary.read_errors += 1;
    let samples = &mut summary.read_error_samples;
    if samples.len() >= MAX_ERROR_SAMPLES {
        return;
    }
    samples.push(format!("{}: {}", path.display(), reason));
}
/// Insert or replace the `src` row for `path` with `content` and language tag `lang`.
fn upsert_src(db: &EmbeddedDatabase, path: &str, content: &str, lang: &str) -> Result<()> {
    const SQL: &str = "INSERT INTO src (path, content, lang) VALUES ($1, $2, $3) \
                       ON CONFLICT(path) DO UPDATE SET content = excluded.content, lang = excluded.lang";
    let params = [
        Value::String(path.to_owned()),
        Value::String(content.to_owned()),
        Value::String(lang.to_owned()),
    ];
    db.execute_params(SQL, &params)
        .map(|_| ())
        .with_context(|| format!("upsert_src {path}"))
}
/// Insert or replace the `docs_md` row for `path`. `kind` is always `'markdown'`.
fn upsert_doc_md(db: &EmbeddedDatabase, path: &str, content: &str) -> Result<()> {
    const SQL: &str = "INSERT INTO docs_md (path, content, kind) VALUES ($1, $2, 'markdown') \
                       ON CONFLICT(path) DO UPDATE SET content = excluded.content, kind = excluded.kind";
    let params = [Value::String(path.to_owned()), Value::String(content.to_owned())];
    db.execute_params(SQL, &params)
        .map(|_| ())
        .with_context(|| format!("upsert_doc_md {path}"))
}
/// Insert or replace the `docs` row for `path`. `kind` records the origin
/// (`"text"`, `"pdf"`, `"docx"`, `"xlsx"` at the call sites in this file).
fn upsert_doc(db: &EmbeddedDatabase, path: &str, content: &str, kind: &str) -> Result<()> {
    const SQL: &str = "INSERT INTO docs (path, content, kind) VALUES ($1, $2, $3) \
                       ON CONFLICT(path) DO UPDATE SET content = excluded.content, kind = excluded.kind";
    let params = [
        Value::String(path.to_owned()),
        Value::String(content.to_owned()),
        Value::String(kind.to_owned()),
    ];
    db.execute_params(SQL, &params)
        .map(|_| ())
        .with_context(|| format!("upsert_doc {path}"))
}
/// Read the whole file at `path` as UTF-8.
///
/// # Errors
///
/// I/O errors are propagated as-is. Invalid UTF-8 becomes a `"not utf-8: …"` error.
fn read_utf8(path: &Path) -> Result<String> {
    let raw = std::fs::read(path)?;
    match String::from_utf8(raw) {
        Ok(text) => Ok(text),
        Err(err) => Err(anyhow::anyhow!("not utf-8: {err}")),
    }
}
/// Render `path` relative to `root` (lossily converted to UTF-8).
///
/// Falls back to the full `path` when it is not under `root`.
fn relative_path(path: &Path, root: &Path) -> String {
    let shown = match path.strip_prefix(root) {
        Ok(rest) => rest,
        Err(_) => path,
    };
    String::from(shown.to_string_lossy())
}
fn extract_ipynb(path: &Path) -> Result<(String, &'static str)> {
let body = std::fs::read_to_string(path)
.with_context(|| format!("read notebook {}", path.display()))?;
let v: serde_json::Value = serde_json::from_str(&body)
.with_context(|| format!("parse notebook {} as JSON", path.display()))?;
let lang_tag = v
.pointer("/metadata/kernelspec/language")
.and_then(|x| x.as_str())
.and_then(|s| match s.to_lowercase().as_str() {
"python" | "python3" => Some("python"),
"typescript" => Some("typescript"),
"javascript" => Some("javascript"),
"rust" => Some("rust"),
"go" => Some("go"),
"sql" => Some("sql"),
_ => None,
})
.unwrap_or("python");
let mut out = String::new();
if let Some(cells) = v.get("cells").and_then(|c| c.as_array()) {
for cell in cells {
let kind = cell.get("cell_type").and_then(|c| c.as_str()).unwrap_or("");
if kind != "code" {
continue;
}
if let Some(src) = cell.get("source") {
if let Some(s) = src.as_str() {
out.push_str(s);
out.push('\n');
} else if let Some(arr) = src.as_array() {
for line in arr {
if let Some(s) = line.as_str() {
out.push_str(s);
}
}
out.push('\n');
}
}
}
}
Ok((out, lang_tag))
}
/// Extract plain text from a PDF using `pdf_extract`.
#[cfg(feature = "native-binary-docs")]
fn extract_pdf(path: &Path) -> Result<String> {
    match pdf_extract::extract_text(path) {
        Ok(text) => Ok(text),
        Err(err) => Err(anyhow::anyhow!("pdf-extract: {err}")),
    }
}
/// Fallback when PDF support is compiled out: always an error.
#[cfg(not(feature = "native-binary-docs"))]
fn extract_pdf(_path: &Path) -> Result<String> {
    Err(anyhow::anyhow!(
        "PDF ingestion not enabled — rebuild with `--features native-binary-docs` (or use Docling)"
    ))
}
/// Extract the text of a DOCX file, one output line per top-level paragraph.
///
/// Only text runs directly inside top-level paragraphs are collected. Other
/// document children (e.g. tables) and other paragraph children are skipped.
#[cfg(feature = "native-binary-docs")]
fn extract_docx(path: &Path) -> Result<String> {
    use docx_rs::*;
    let bytes = std::fs::read(path)?;
    let docx = read_docx(&bytes).map_err(|e| anyhow::anyhow!("docx-rs read: {e}"))?;
    let mut out = String::new();
    for child in &docx.document.children {
        let p = match child {
            DocumentChild::Paragraph(p) => p,
            _ => continue,
        };
        for run in &p.children {
            let r = match run {
                ParagraphChild::Run(r) => r,
                _ => continue,
            };
            for c in &r.children {
                if let RunChild::Text(t) = c {
                    out.push_str(&t.text);
                }
            }
        }
        out.push('\n');
    }
    Ok(out)
}
/// Fallback when DOCX support is compiled out: always an error.
#[cfg(not(feature = "native-binary-docs"))]
fn extract_docx(_path: &Path) -> Result<String> {
    Err(anyhow::anyhow!(
        "DOCX ingestion not enabled — rebuild with `--features native-binary-docs` (or use Docling)"
    ))
}
/// Render every readable worksheet of a spreadsheet as tab-separated text.
///
/// Each sheet starts with a `# Sheet: <name>` header line and ends with a
/// blank line. Sheets whose range cannot be read are skipped.
#[cfg(feature = "native-binary-docs")]
fn extract_xlsx(path: &Path) -> Result<String> {
    use calamine::{open_workbook_auto, Reader};
    let mut wb = open_workbook_auto(path).map_err(|e| anyhow::anyhow!("calamine open: {e}"))?;
    let names: Vec<String> = wb.sheet_names().to_owned();
    let mut out = String::new();
    for name in &names {
        let range = match wb.worksheet_range(name) {
            Ok(range) => range,
            Err(_) => continue,
        };
        out.push_str(&format!("# Sheet: {name}\n"));
        for row in range.rows() {
            let line = row
                .iter()
                .map(|cell| cell.to_string())
                .collect::<Vec<String>>()
                .join("\t");
            out.push_str(&line);
            out.push('\n');
        }
        out.push('\n');
    }
    Ok(out)
}
/// Fallback when XLSX support is compiled out: always an error.
#[cfg(not(feature = "native-binary-docs"))]
fn extract_xlsx(_path: &Path) -> Result<String> {
    Err(anyhow::anyhow!(
        "XLSX ingestion not enabled — rebuild with `--features native-binary-docs` (or use Docling)"
    ))
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::Path;

    /// One representative extension per class.
    #[test]
    fn classify_extensions() {
        assert!(matches!(classify(Path::new("a.rs")), Class::Code("rust")));
        assert!(matches!(classify(Path::new("a.py")), Class::Code("python")));
        assert!(matches!(classify(Path::new("a.tsx")), Class::Code("tsx")));
        assert!(matches!(
            classify(Path::new("a.md")),
            Class::CodeAndDoc("markdown")
        ));
        assert!(matches!(classify(Path::new("a.txt")), Class::Text));
        assert!(matches!(classify(Path::new("a.pdf")), Class::Pdf));
        assert!(matches!(classify(Path::new("a.docx")), Class::Docx));
        assert!(matches!(classify(Path::new("a.xlsx")), Class::Xlsx));
        assert!(matches!(classify(Path::new("a.png")), Class::Skip));
        assert!(matches!(classify(Path::new("a")), Class::Skip));
    }

    /// Extensions are matched case-insensitively and aliases map to the same class.
    #[test]
    fn classify_is_case_insensitive_and_covers_aliases() {
        assert!(matches!(classify(Path::new("MAIN.RS")), Class::Code("rust")));
        assert!(matches!(
            classify(Path::new("README.Markdown")),
            Class::CodeAndDoc("markdown")
        ));
        assert!(matches!(classify(Path::new("a.mjs")), Class::Code("javascript")));
        assert!(matches!(classify(Path::new("a.cjs")), Class::Code("javascript")));
        assert!(matches!(classify(Path::new("nb.ipynb")), Class::Notebook));
        assert!(matches!(classify(Path::new("ci.yml")), Class::Text));
        assert!(matches!(classify(Path::new("api.proto")), Class::Text));
        assert!(matches!(classify(Path::new("book.xlsm")), Class::Xlsx));
        // A dotfile has no extension in `Path` terms.
        assert!(matches!(classify(Path::new(".gitignore")), Class::Skip));
    }

    /// Paths under the root are made relative; others are returned unchanged.
    #[test]
    fn relative_path_strips_root_or_falls_back() {
        assert_eq!(
            relative_path(Path::new("root/src/lib.rs"), Path::new("root")),
            "src/lib.rs"
        );
        assert_eq!(
            relative_path(Path::new("other/x.rs"), Path::new("root")),
            "other/x.rs"
        );
    }
}