use std::path::{Path, PathBuf};
use std::time::{Duration, Instant};
use anyhow::{Context, Result, bail};
use clap::Args as ClapArgs;
use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle};
use mnem_ingest::{
ChunkerAuto, ChunkerKind, IngestConfig, IngestResult, Ingester, Section, SourceKind,
auto_chunker, chunk as chunk_sections,
};
use tracing::{info, info_span};
use crate::config;
use crate::repo;
// Command-line arguments for `mnem ingest`.
//
// NOTE: plain `//` comments are used deliberately in this block. With clap's
// derive API, `///` doc comments become user-visible help text, so adding them
// here would change the program's `--help` output.
#[derive(ClapArgs, Debug)]
#[command(after_long_help = "\
Examples:
mnem ingest notes.md
mnem ingest --chunker recursive --max-tokens 1024 book.pdf
mnem ingest --recursive docs/
")]
pub(crate) struct Args {
// Positional path. Must be a regular file unless `--recursive` is given, in
// which case it must be a directory (both are enforced by `run`).
pub path: PathBuf,
// Forwarded verbatim as `IngestConfig::ntype`.
// NOTE(review): presumably the node type assigned to ingested documents - confirm.
#[arg(long, default_value = "Doc")]
pub ntype: String,
// Chunker selector; parsed case-insensitively by `parse_chunker`
// (auto | session | paragraph | recursive).
#[arg(long, default_value = "auto")]
pub chunker: String,
// Token budget handed to the chunker; `run` rejects values above `MAX_TOKENS_CAP`.
#[arg(long, default_value_t = 512)]
pub max_tokens: u32,
// Overlap handed to the chunker alongside `max_tokens`.
#[arg(long, default_value_t = 32)]
pub overlap: u32,
// Treat `path` as a directory and ingest every supported file beneath it.
#[arg(long)]
pub recursive: bool,
// Commit message; when absent `run` uses "mnem ingest: N file(s)".
#[arg(long, short = 'm')]
pub message: Option<String>,
// Extractor selector; `run` accepts "none" (or the empty string) and "keybert".
#[arg(long, default_value = "none")]
pub extractor: String,
// Skip the reindex step that `run` otherwise performs after a successful
// commit when an embedder is configured.
#[arg(long)]
pub no_embed: bool,
}
/// Largest `--max-tokens` value `run` accepts; anything above it is rejected
/// before any repository work starts.
const MAX_TOKENS_CAP: u32 = 8192;
/// File extensions (lower-case, without the dot) that `collect_files` keeps
/// during a recursive walk. Matching is done on the lower-cased extension.
const SUPPORTED_EXTS: &[&str] = &["md", "markdown", "txt", "pdf", "json", "jsonl"];
/// The chunking strategy requested on the command line, as parsed by
/// `parse_chunker` and turned into a concrete `ChunkerKind` by `resolve_chunker`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ChunkerChoice {
/// Defer to `auto_chunker`, which picks a chunker from the source kind.
Auto,
/// Always use `ChunkerKind::Paragraph`.
Paragraph,
/// Always use `ChunkerKind::Recursive` with the CLI token budget and overlap.
Recursive,
/// Always use `ChunkerKind::Session` with a fixed `max_messages` of 10.
Session,
}
/// Runs the `mnem ingest` subcommand.
///
/// Validates the arguments, gathers the input files, ingests all of them
/// inside a single repository transaction, commits once, prints a summary and
/// finally triggers a reindex unless `--no-embed` was given or no embedder is
/// configured.
///
/// `override_path` is forwarded to `repo::locate_data_dir` and to the reindex
/// command; `a` holds the parsed command-line arguments.
///
/// # Errors
///
/// Fails when `--max-tokens` exceeds `MAX_TOKENS_CAP`, when the repository or
/// config cannot be opened, when the path does not match the `--recursive`
/// mode, when no ingestable files are found, when `--chunker` or `--extractor`
/// is not recognised, when a file cannot be read, when ingesting any file
/// fails, or when the commit or the follow-up reindex fails.
///
/// # Panics
///
/// Panics if the hard-coded progress-bar template is rejected by `indicatif`
/// (the `unwrap` on `ProgressStyle::with_template`).
pub(crate) fn run(override_path: Option<&Path>, a: Args) -> Result<()> {
let _span = info_span!("mnem-ingest").entered();
// Reject oversized chunk budgets before touching the repository.
if a.max_tokens > MAX_TOKENS_CAP {
bail!(
"--max-tokens {} exceeds the {MAX_TOKENS_CAP} cap; lower it \
or raise the ceiling in code",
a.max_tokens
);
}
let data_dir = repo::locate_data_dir(override_path)?;
let cfg = config::load(&data_dir)?;
let r = repo::open_repo(Some(data_dir.as_path()))?;
// Build the work list: a directory walk with --recursive, otherwise exactly
// the one file that was named.
let files: Vec<PathBuf> = if a.recursive {
collect_files(&a.path)?
} else {
if !a.path.is_file() {
bail!(
"{} is not a file; pass --recursive to walk a directory",
a.path.display()
);
}
vec![a.path.clone()]
};
if files.is_empty() {
bail!("no ingestable files found under {}", a.path.display());
}
let choice = parse_chunker(&a.chunker)?;
// The keybert extractor needs an embedding provider; it is opened once here
// and shared (via `Arc`) by every per-file ingester below.
let keybert_embedder: Option<std::sync::Arc<dyn mnem_embed_providers::Embedder>> =
match a.extractor.as_str() {
"none" | "" => None,
"keybert" => {
let pc = config::resolve_embedder(&cfg).ok_or_else(|| {
anyhow::anyhow!(
"--extractor keybert requires an `[embed]` provider; checked \
MNEM_EMBED_PROVIDER env, per-repo `.mnem/config.toml`, and \
the user-global `~/.mnem/config.toml`. Configure one with \
`mnem config set embed.provider ollama` (per-repo) or write \
a `[embed]` section to `~/.mnem/config.toml` (global)."
)
})?;
let boxed = mnem_embed_providers::open(&pc)
.map_err(|e| anyhow::anyhow!("opening embed provider for keybert: {e}"))?;
Some(std::sync::Arc::from(boxed))
}
other => bail!("unknown --extractor {other}; expected one of: none, keybert"),
};
// Everything needed to ingest one file, gathered up front so the total chunk
// count is known before the progress bar is created.
struct PreReadFile {
path: PathBuf,
bytes: Vec<u8>,
kind: SourceKind,
chunker: ChunkerKind,
chunk_count: u64,
}
// Pre-read pass: every file is loaded fully into memory and chunk-counted.
// A failed count is deliberately tolerated and recorded as 0; it only
// affects progress reporting, not the ingest itself.
let mut pre: Vec<PreReadFile> = Vec::with_capacity(files.len());
for file in &files {
let kind = Ingester::source_kind_for_path(file);
let bytes = std::fs::read(file).with_context(|| format!("reading {}", file.display()))?;
let chunker = resolve_chunker(choice, kind, a.max_tokens, a.overlap);
let chunk_count = count_chunks_for(&bytes, kind, &chunker).unwrap_or(0);
pre.push(PreReadFile {
path: file.clone(),
bytes,
kind,
chunker,
chunk_count,
});
}
let total_chunks: u64 = pre.iter().map(|f| f.chunk_count).sum();
// Chunk-granular progress is only used when every file produced a non-zero
// count; otherwise the bar falls back to one tick per file.
let use_chunk_progress = total_chunks > 0 && pre.iter().all(|f| f.chunk_count > 0);
let started = Instant::now();
let mut totals = Totals::default();
// One transaction spans all files, so the whole ingest lands as one commit.
let mut tx = r.start_transaction();
let pb_total = if use_chunk_progress {
total_chunks
} else {
files.len() as u64
};
let pb = ProgressBar::new(pb_total);
pb.set_style(
ProgressStyle::with_template(
" [{elapsed_precise}] {bar:32.cyan/blue} {pos}/{len} ({percent}%) ETA {eta} {msg}",
)
.unwrap()
.progress_chars("=>-"),
);
// The bar is drawn on stdout and redrawn every 120 ms.
pb.set_draw_target(ProgressDrawTarget::stdout());
pb.enable_steady_tick(Duration::from_millis(120));
// Callback handed to each ingester in chunk-progress mode; each call advances
// the bar by one.
// NOTE(review): presumably invoked once per chunk - confirm against
// `Ingester::with_progress`.
let progress_cb: Option<std::sync::Arc<dyn Fn() + Send + Sync>> = if use_chunk_progress {
let pb_cb = pb.clone();
Some(std::sync::Arc::new(move || {
pb_cb.inc(1);
}))
} else {
None
};
// Ingest pass: a fresh `Ingester` per file, all writing into `tx`. The first
// failure returns early via `?`, before the commit below is reached.
for f in &pre {
let config = IngestConfig {
chunker: f.chunker.clone(),
ntype: a.ntype.clone(),
max_tokens: a.max_tokens,
overlap: a.overlap,
ner: config::resolve_ner(&cfg),
};
let mut ing = Ingester::new(config);
if let Some(emb) = &keybert_embedder {
ing = ing.with_extractor(Box::new(mnem_ingest::KeyBertAdapter::new(
emb.clone(),
"Keyword",
)));
}
if let Some(cb) = &progress_cb {
ing = ing.with_progress(cb.clone());
}
// Show the bare file name next to the bar; fall back to a placeholder when
// the path has no UTF-8 file name.
let display_name = f
.path
.file_name()
.and_then(|n| n.to_str())
.unwrap_or("<unnamed>")
.to_string();
pb.set_message(display_name);
info!(path = %f.path.display(), kind = ?f.kind, "ingesting");
let result = ing
.ingest(&mut tx, &f.bytes, f.kind)
.with_context(|| format!("ingest failed on {}", f.path.display()))?;
totals.add(&result);
// In per-file mode the bar is advanced here rather than by the callback.
if !use_chunk_progress {
pb.inc(1);
}
}
pb.finish_and_clear();
let file_count = files.len();
let default_msg = format!("mnem ingest: {file_count} file(s)");
let msg = a.message.as_deref().unwrap_or(&default_msg);
let new_r = tx.commit(&config::author_string(&cfg), msg)?;
let elapsed_ms = started.elapsed().as_millis();
println!(
"ingested {file_count} files, {} chunks, {} nodes, {} edges in {}ms",
totals.chunk_count, totals.node_count, totals.relation_count, elapsed_ms
);
println!(" op_id {}", new_r.op_id());
if let Some(head) = new_r.view().heads.first() {
println!(" commit_cid {head}");
}
// Release the repo handles and the embedder before handing over to reindex.
// NOTE(review): presumably so reindex can reopen the repository and provider
// without contention - confirm; the ordering looks intentional.
drop(new_r);
drop(r);
drop(keybert_embedder);
// Follow-up reindex with default arguments, skipped when --no-embed was given
// or no embedder is configured.
if !a.no_embed && config::resolve_embedder(&cfg).is_some() {
println!();
super::reindex::run(
override_path,
super::reindex::Args {
force: false,
label: None,
since: None,
dry_run: false,
message: None,
},
)?;
}
Ok(())
}
/// Running sums of the per-file `IngestResult` counters, starting at zero.
///
/// `run` prints `chunk_count`, `node_count` and `relation_count` in its
/// summary line; `entity_count` is accumulated but not printed by the code
/// visible in this file.
#[derive(Default)]
struct Totals {
node_count: u64,
chunk_count: u64,
entity_count: u64,
relation_count: u64,
}
impl Totals {
    /// Folds one file's `IngestResult` into the running totals.
    ///
    /// Every counter saturates at `u64::MAX` instead of wrapping.
    fn add(&mut self, r: &IngestResult) {
        /// Adds `delta` to `slot`, clamping at `u64::MAX`.
        fn bump(slot: &mut u64, delta: u64) {
            *slot = slot.saturating_add(delta);
        }

        bump(&mut self.node_count, r.node_count);
        bump(&mut self.chunk_count, r.chunk_count);
        bump(&mut self.entity_count, r.entity_count);
        bump(&mut self.relation_count, r.relation_count);
    }
}
/// Maps the `--chunker` argument onto a [`ChunkerChoice`].
///
/// Matching is ASCII case-insensitive.
///
/// # Errors
///
/// Returns an error naming the accepted values when `s` is none of
/// `auto`, `session`, `paragraph` or `recursive`. The rejected value is
/// reported in its lower-cased form.
fn parse_chunker(s: &str) -> Result<ChunkerChoice> {
    let lowered = s.to_ascii_lowercase();
    let choice = match lowered.as_str() {
        "paragraph" => ChunkerChoice::Paragraph,
        "recursive" => ChunkerChoice::Recursive,
        "session" => ChunkerChoice::Session,
        "auto" => ChunkerChoice::Auto,
        other => {
            bail!("--chunker must be one of auto|session|paragraph|recursive; got `{other}`")
        }
    };
    Ok(choice)
}
/// Turns the user's [`ChunkerChoice`] into the concrete `ChunkerKind` for one file.
///
/// * `Auto` delegates to `auto_chunker`, passing the source `kind` plus the
///   token budget and overlap as hints (no message limit is supplied).
/// * `Paragraph` ignores every other argument.
/// * `Recursive` carries `max_tokens` and `overlap` through unchanged.
/// * `Session` uses a fixed window of 10 messages.
fn resolve_chunker(
    choice: ChunkerChoice,
    kind: SourceKind,
    max_tokens: u32,
    overlap: u32,
) -> ChunkerKind {
    match choice {
        ChunkerChoice::Session => ChunkerKind::Session { max_messages: 10 },
        ChunkerChoice::Recursive => ChunkerKind::Recursive {
            max_tokens,
            overlap,
        },
        ChunkerChoice::Paragraph => ChunkerKind::Paragraph,
        ChunkerChoice::Auto => {
            let hints = ChunkerAuto {
                max_tokens: Some(max_tokens),
                overlap: Some(overlap),
                max_messages: None,
            };
            auto_chunker(kind, hints)
        }
    }
}
/// Dry-run chunk count: parses `bytes` as `kind` and reports how many chunks
/// `chunker` would produce, without writing anything.
///
/// `run` uses the result only to size the progress bar.
///
/// # Errors
///
/// Fails when a Markdown or text source is not valid UTF-8, or when the
/// parser for `kind` rejects the input. Parser errors are flattened to their
/// display string.
fn count_chunks_for(bytes: &[u8], kind: SourceKind, chunker: &ChunkerKind) -> Result<u64> {
    let sections: Vec<Section> = match kind {
        SourceKind::Markdown => {
            let s = std::str::from_utf8(bytes).with_context(|| "non-utf8 markdown source")?;
            mnem_ingest::md::parse_markdown(s).map_err(|e| anyhow::anyhow!(e.to_string()))?
        }
        SourceKind::Text => {
            let s = std::str::from_utf8(bytes).with_context(|| "non-utf8 text source")?;
            mnem_ingest::text::parse_text(s).map_err(|e| anyhow::anyhow!(e.to_string()))?
        }
        SourceKind::Pdf => {
            mnem_ingest::pdf::parse_pdf(bytes).map_err(|e| anyhow::anyhow!(e.to_string()))?
        }
        SourceKind::Conversation => mnem_ingest::conversation::parse_conversation(bytes)
            .map_err(|e| anyhow::anyhow!(e.to_string()))?,
    };
    // Borrow the parsed sections for chunking. The original line had the
    // mis-encoded token `§ions` here (a garbled `&sections`), which does not
    // compile.
    let chunks = chunk_sections(&sections, chunker);
    // `usize` -> `u64` cannot realistically fail; clamp rather than panic.
    Ok(u64::try_from(chunks.len()).unwrap_or(u64::MAX))
}
fn collect_files(root: &Path) -> Result<Vec<PathBuf>> {
if !root.is_dir() {
bail!(
"{} is not a directory; drop --recursive to ingest a single file",
root.display()
);
}
let mut out = Vec::new();
let mut stack = vec![root.to_path_buf()];
while let Some(dir) = stack.pop() {
let entries =
std::fs::read_dir(&dir).with_context(|| format!("reading dir {}", dir.display()))?;
for entry in entries {
let entry = entry?;
let path = entry.path();
if path.is_dir() {
stack.push(path);
continue;
}
if let Some(ext) = path
.extension()
.and_then(|e| e.to_str())
.map(str::to_ascii_lowercase)
&& SUPPORTED_EXTS.contains(&ext.as_str())
{
out.push(path);
}
}
}
out.sort();
Ok(out)
}