use std::path::{Path, PathBuf};
use std::time::{Duration, Instant};
use anyhow::{Context, Result, bail};
use clap::Args as ClapArgs;
use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle};
use mnem_ingest::{
ChunkerKind, IngestConfig, IngestResult, Ingester, Section, SourceKind,
chunk as chunk_sections, resolve_chunker,
};
use tracing::{info, info_span};
use crate::config;
use crate::repo;
// Command-line arguments for `mnem ingest`.
//
// NOTE: fields are documented with `//` comments on purpose. clap's derive
// turns `///` doc comments into `--help` text, so switching to `///` would
// change the CLI's user-visible help output.
#[derive(ClapArgs, Debug)]
#[command(after_long_help = "\
Examples:
mnem ingest notes.md
mnem ingest --text \"The quick brown fox\"
mnem ingest --chunker recursive --max-tokens 1024 book.pdf
mnem ingest --recursive docs/
")]
pub(crate) struct Args {
    // Positional input: a single file, or a directory when `--recursive` is set.
    // Mutually exclusive with `--text`.
    #[arg(conflicts_with = "text")]
    pub path: Option<PathBuf>,
    // Inline text ingested instead of a file (shown as `<inline>`).
    // Cannot be combined with `<PATH>` or `--recursive`.
    #[arg(long, conflicts_with = "path", conflicts_with = "recursive")]
    pub text: Option<String>,
    // Passed through unchanged as `IngestConfig::ntype` (presumably the node
    // type given to ingested documents — confirm in mnem_ingest).
    #[arg(long, default_value = "Doc")]
    pub ntype: String,
    // Chunker name handed to `resolve_chunker` together with each source kind.
    #[arg(long, default_value = "auto")]
    pub chunker: String,
    // Token budget per chunk; `run` rejects values above `MAX_TOKENS_CAP` and
    // warns when it is changed while using the paragraph/session chunkers.
    #[arg(long, default_value_t = 512)]
    pub max_tokens: u32,
    // Overlap handed to `resolve_chunker` and `IngestConfig` (presumably
    // measured in tokens, like `max_tokens` — confirm).
    #[arg(long, default_value_t = 32)]
    pub overlap: u32,
    // Treat `<PATH>` as a directory and walk it for supported file types.
    #[arg(long, conflicts_with = "text")]
    pub recursive: bool,
    // Commit message; `run` falls back to "mnem ingest: N file(s)".
    #[arg(long, short = 'm')]
    pub message: Option<String>,
    // Keyword extractor: "none" (or empty) disables it, "keybert" requires a
    // configured embed provider; any other value is rejected by `run`.
    #[arg(long, default_value = "none")]
    pub extractor: String,
    // Skip the `reindex` pass that `run` otherwise performs after committing
    // when an embed provider is configured.
    #[arg(long)]
    pub no_embed: bool,
}
/// Largest value accepted for `--max-tokens`; `run` bails on anything above it.
const MAX_TOKENS_CAP: u32 = 8192;

/// Extensions (lowercase, no leading dot) that a `--recursive` walk collects.
///
/// The walker ASCII-lowercases `Path::extension` before looking it up here.
/// Because `Path::extension` returns `None` for dotfiles without a further
/// dot, an entry like `"env"` matches `prod.env` but never a bare `.env`.
const SUPPORTED_EXTS: &[&str] = &[
    // documents
    "md", "markdown", "txt", "pdf",
    // structured data / markup / query
    "json", "jsonl", "yaml", "yml", "toml", "xml", "html", "htm", "csv", "sql",
    // Rust, Python, JavaScript/TypeScript, Go, Java
    "rs", "py", "js", "ts", "tsx", "mts", "cts", "go", "java",
    // C and C++ sources/headers
    "c", "cpp", "cc", "cxx", "h", "hpp", "hxx",
    // Ruby, C#
    "rb", "gemspec", "rake", "erb", "cs", "csx",
    // shell scripts
    "sh", "bash", "zsh", "fish",
    // PHP, Swift, Kotlin, Scala, Lua
    "php", "swift", "kt", "kts", "scala", "lua",
    // Elixir, Haskell, R, Zig
    "ex", "exs", "hs", "lhs", "r", "zig",
    // plain config
    "ini", "conf", "env",
];
/// Entry point for `mnem ingest`.
///
/// Steps:
/// 1. Reads the input (inline `--text`, a single file, or a directory walked with `--recursive`).
/// 2. Chunks and ingests every input into one transaction.
/// 3. Commits that transaction and prints a summary.
/// 4. Runs `reindex` unless `--no-embed` is set or no embed provider resolves from the config.
///
/// `override_path` is used to locate the data directory and is forwarded
/// unchanged to `reindex::run`.
///
/// # Errors
/// Returns an error when:
/// - `--max-tokens` exceeds [`MAX_TOKENS_CAP`];
/// - neither `<PATH>` nor `--text` is given;
/// - `<PATH>` is not a file (without `--recursive`) or contains no ingestable files (with it);
/// - `--extractor` is unknown, or is `keybert` without an embed provider;
/// - `--chunker` cannot be resolved;
/// - reading, ingesting, committing, or reindexing fails.
pub(crate) fn run(override_path: Option<&Path>, a: Args) -> Result<()> {
    let _span = info_span!("mnem-ingest").entered();
    // Validate the chunk budget before touching the repository at all.
    if a.max_tokens > MAX_TOKENS_CAP {
        bail!(
            "--max-tokens {} exceeds the {MAX_TOKENS_CAP} cap; lower it \
             or raise the ceiling in code",
            a.max_tokens
        );
    }
    let data_dir = repo::locate_data_dir(override_path)?;
    let cfg = config::load(&data_dir)?;
    let r = repo::open_repo(Some(data_dir.as_path()))?;
    // Inline `--text` is treated as raw bytes, exactly like a file's contents.
    let text_bytes: Option<Vec<u8>> = a.text.as_deref().map(|s| s.as_bytes().to_vec());
    // Files to read from disk; stays empty in `--text` mode. clap's
    // `conflicts_with` already keeps `--text` apart from `<PATH>`/`--recursive`.
    let files: Vec<PathBuf> = if text_bytes.is_some() {
        Vec::new()
    } else if let Some(ref p) = a.path {
        if a.recursive {
            collect_files(p)?
        } else {
            if !p.is_file() {
                bail!(
                    "{} is not a file; pass --recursive to walk a directory",
                    p.display()
                );
            }
            vec![p.clone()]
        }
    } else {
        bail!("either <PATH> or --text must be provided");
    };
    // Only reachable via `--recursive`: the single-file branch always yields one
    // path, and a missing `<PATH>` has already bailed above.
    if text_bytes.is_none() && files.is_empty() {
        bail!(
            "no ingestable files found under {}",
            a.path
                .as_deref()
                .unwrap_or_else(|| std::path::Path::new("<unknown>"))
                .display()
        );
    }
    // Must mirror the clap `default_value_t` of `--max-tokens`; keep them in sync.
    const DEFAULT_MAX_TOKENS: u32 = 512;
    // Warn (on stderr) when a non-default token budget will be ignored by the
    // paragraph/session chunkers.
    if a.max_tokens != DEFAULT_MAX_TOKENS
        && matches!(
            a.chunker.to_ascii_lowercase().as_str(),
            "paragraph" | "session"
        )
    {
        eprintln!(
            "warning: --max-tokens has no effect with --chunker {}; \
             use --chunker recursive to enable token-based splitting",
            a.chunker
        );
    }
    // Optional KeyBERT keyword extractor. The provider is opened once and
    // shared through an `Arc` by every per-input `Ingester` below.
    let keybert_embedder: Option<std::sync::Arc<dyn mnem_embed_providers::Embedder>> =
        match a.extractor.as_str() {
            "none" | "" => None,
            "keybert" => {
                let pc = config::resolve_embedder(&cfg).ok_or_else(|| {
                    anyhow::anyhow!(
                        "--extractor keybert requires an `[embed]` provider; checked \
                         MNEM_EMBED_PROVIDER env, per-repo `.mnem/config.toml`, and \
                         the user-global `~/.mnem/config.toml`. Configure one with \
                         `mnem config set embed.provider ollama` (per-repo) or write \
                         a `[embed]` section to `~/.mnem/config.toml` (global)."
                    )
                })?;
                let boxed = mnem_embed_providers::open(&pc)
                    .map_err(|e| anyhow::anyhow!("opening embed provider for keybert: {e}"))?;
                Some(std::sync::Arc::from(boxed))
            }
            other => bail!("unknown --extractor {other}; expected one of: none, keybert"),
        };
    // One input, read fully into memory before ingestion starts, together with
    // the chunker resolved for its kind and a best-effort chunk pre-count.
    struct PreReadFile {
        path: PathBuf,
        bytes: Vec<u8>,
        kind: SourceKind,
        chunker: ChunkerKind,
        // 0 when the pre-count failed (errors are mapped to 0 below).
        chunk_count: u64,
    }
    let mut pre: Vec<PreReadFile> = Vec::with_capacity(files.len().max(1));
    if let Some(bytes) = text_bytes {
        let kind = SourceKind::Text;
        let chunker =
            resolve_chunker(&a.chunker, kind, a.max_tokens, a.overlap).context("--chunker")?;
        // The count only feeds the progress bar, so a failed count is not fatal.
        let chunk_count = count_chunks_for(&bytes, kind, &chunker).unwrap_or(0);
        pre.push(PreReadFile {
            path: PathBuf::from("<inline>"),
            bytes,
            kind,
            chunker,
            chunk_count,
        });
    } else {
        for file in &files {
            let kind = Ingester::source_kind_for_path(file);
            let bytes =
                std::fs::read(file).with_context(|| format!("reading {}", file.display()))?;
            // `kind` is an input to `resolve_chunker`, so resolve per file.
            let chunker =
                resolve_chunker(&a.chunker, kind, a.max_tokens, a.overlap).context("--chunker")?;
            let chunk_count = count_chunks_for(&bytes, kind, &chunker).unwrap_or(0);
            pre.push(PreReadFile {
                path: file.clone(),
                bytes,
                kind,
                chunker,
                chunk_count,
            });
        }
    }
    // Report progress per chunk only if every input has a usable pre-count;
    // otherwise fall back to one tick per input.
    let total_chunks: u64 = pre.iter().map(|f| f.chunk_count).sum();
    let use_chunk_progress = total_chunks > 0 && pre.iter().all(|f| f.chunk_count > 0);
    let started = Instant::now();
    let mut totals = Totals::default();
    // All inputs are ingested into this single transaction, committed once after
    // the loop; an error in the loop returns early, before `commit` is called.
    let mut tx = r.start_transaction();
    let pb_total = if use_chunk_progress {
        total_chunks
    } else {
        pre.len() as u64
    };
    let pb = ProgressBar::new(pb_total);
    pb.set_style(
        // Fixed literal template; the `unwrap` only fires if the template is malformed.
        ProgressStyle::with_template(
            " [{elapsed_precise}] {bar:32.cyan/blue} {pos}/{len} ({percent}%) ETA {eta} {msg}",
        )
        .unwrap()
        .progress_chars("=>-"),
    );
    // Draw on stdout instead of indicatif's default (stderr).
    pb.set_draw_target(ProgressDrawTarget::stdout());
    pb.enable_steady_tick(Duration::from_millis(120));
    // Chunk mode: the ingester drives the bar through this callback (one tick
    // per call — presumably once per chunk, matching `total_chunks`).
    // File mode: no callback; the loop ticks after each input instead.
    let progress_cb: Option<std::sync::Arc<dyn Fn() + Send + Sync>> = if use_chunk_progress {
        let pb_cb = pb.clone();
        Some(std::sync::Arc::new(move || {
            pb_cb.inc(1);
        }))
    } else {
        None
    };
    for f in &pre {
        let config = IngestConfig {
            chunker: f.chunker.clone(),
            ntype: a.ntype.clone(),
            max_tokens: a.max_tokens,
            overlap: a.overlap,
            ner: config::resolve_ner(&cfg),
        };
        // A fresh `Ingester` per input, because the chunker can differ per file.
        let mut ing = Ingester::new(config);
        if let Some(emb) = &keybert_embedder {
            ing = ing.with_extractor(Box::new(mnem_ingest::KeyBertAdapter::new(
                emb.clone(),
                "Keyword",
            )));
        }
        if let Some(cb) = &progress_cb {
            ing = ing.with_progress(cb.clone());
        }
        let display_name = f
            .path
            .file_name()
            .and_then(|n| n.to_str())
            .unwrap_or("<unnamed>")
            .to_string();
        pb.set_message(display_name);
        info!(path = %f.path.display(), kind = ?f.kind, "ingesting");
        let result = ing
            .ingest(&mut tx, &f.bytes, f.kind)
            .with_context(|| format!("ingest failed on {}", f.path.display()))?;
        totals.add(&result);
        if !use_chunk_progress {
            pb.inc(1);
        }
    }
    pb.finish_and_clear();
    let file_count = pre.len();
    let default_msg = format!("mnem ingest: {file_count} file(s)");
    let msg = a.message.as_deref().unwrap_or(&default_msg);
    let new_r = tx.commit(&config::author_string(&cfg), msg)?;
    let elapsed_ms = started.elapsed().as_millis();
    // NOTE(review): `totals.entity_count` / `totals.relation_count` are summed
    // but not reported here.
    println!(
        "ingested {file_count} files, {} chunks, {} nodes, {} edges in {}ms",
        totals.chunk_count, totals.node_count, totals.edge_count, elapsed_ms
    );
    println!(" op_id {}", new_r.op_id());
    if let Some(head) = new_r.view().heads.first() {
        println!(" commit_cid {head}");
    }
    // Release the repo handles and the embedder before `reindex::run`, which is
    // handed `override_path` and presumably reopens the repository itself —
    // confirm whether it needs these handles closed (e.g. for a lock).
    drop(new_r);
    drop(r);
    drop(keybert_embedder);
    if !a.no_embed && config::resolve_embedder(&cfg).is_some() {
        println!();
        super::reindex::run(
            override_path,
            super::reindex::Args {
                force: false,
                label: None,
                since: None,
                dry_run: false,
                message: None,
                lift_legacy_extra: false,
                lift_legacy_sparse: false,
            },
        )?;
    }
    Ok(())
}
/// Running sums of the per-input [`IngestResult`] counters during one ingest run.
///
/// Every counter saturates at `u64::MAX` rather than wrapping on overflow.
#[derive(Default)]
struct Totals {
    node_count: u64,
    chunk_count: u64,
    entity_count: u64,
    relation_count: u64,
    edge_count: u64,
}

impl Totals {
    /// Folds the counters of one ingest result into the running totals.
    fn add(&mut self, r: &IngestResult) {
        let updates = [
            (&mut self.node_count, r.node_count),
            (&mut self.chunk_count, r.chunk_count),
            (&mut self.entity_count, r.entity_count),
            (&mut self.relation_count, r.relation_count),
            (&mut self.edge_count, r.edge_count),
        ];
        for (total, delta) in updates {
            *total = total.saturating_add(delta);
        }
    }
}
fn count_chunks_for(bytes: &[u8], kind: SourceKind, chunker: &ChunkerKind) -> Result<u64> {
let sections: Vec<Section> = match kind {
SourceKind::Markdown => {
let s = std::str::from_utf8(bytes).with_context(|| "non-utf8 markdown source")?;
mnem_ingest::md::parse_markdown(s).map_err(|e| anyhow::anyhow!(e.to_string()))?
}
SourceKind::Text => {
let s = std::str::from_utf8(bytes).with_context(|| "non-utf8 text source")?;
mnem_ingest::text::parse_text(s).map_err(|e| anyhow::anyhow!(e.to_string()))?
}
SourceKind::Pdf => {
mnem_ingest::pdf::parse_pdf(bytes).map_err(|e| anyhow::anyhow!(e.to_string()))?
}
SourceKind::Conversation => mnem_ingest::conversation::parse_conversation(bytes)
.map_err(|e| anyhow::anyhow!(e.to_string()))?,
SourceKind::Code(lang) => {
let s = std::str::from_utf8(bytes).with_context(|| "non-utf8 code source")?;
mnem_ingest::code::parse_code(s, lang).map_err(|e| anyhow::anyhow!(e.to_string()))?
}
};
Ok(u64::try_from(chunk_sections(§ions, chunker).len()).unwrap_or(u64::MAX))
}
/// Recursively collects the files under `root` whose ASCII-lowercased
/// extension is listed in `SUPPORTED_EXTS`, returned in sorted order.
///
/// The walk uses an explicit stack instead of recursion. Symlinked
/// directories are still followed, but each directory is entered at most once,
/// keyed by its canonical path. Before this, a symlink cycle (e.g.
/// `docs/loop -> ..`) made the walk revisit the same subtree, and re-collect
/// its files, until the OS refused the ever-longer path.
///
/// Only regular files (after following symlinks) are returned. Dangling
/// symlinks, FIFOs and sockets with a matching extension are skipped. Before
/// this, they either failed the whole ingest or blocked inside `std::fs::read`.
///
/// # Errors
/// Fails if `root` is not a directory, or if a directory or one of its
/// entries cannot be read.
fn collect_files(root: &Path) -> Result<Vec<PathBuf>> {
    if !root.is_dir() {
        bail!(
            "{} is not a directory; drop --recursive to ingest a single file",
            root.display()
        );
    }
    let mut out = Vec::new();
    // Canonical paths of directories already walked; breaks symlink cycles and
    // avoids walking the same directory twice through different aliases.
    let mut visited: std::collections::HashSet<PathBuf> = std::collections::HashSet::new();
    let mut stack = vec![root.to_path_buf()];
    while let Some(dir) = stack.pop() {
        // If canonicalisation fails, key on the literal path; `read_dir` below
        // still reports any genuine access error.
        let key = std::fs::canonicalize(&dir).unwrap_or_else(|_| dir.clone());
        if !visited.insert(key) {
            continue;
        }
        let entries =
            std::fs::read_dir(&dir).with_context(|| format!("reading dir {}", dir.display()))?;
        for entry in entries {
            let entry = entry.with_context(|| format!("reading entry in {}", dir.display()))?;
            let path = entry.path();
            // Both `is_dir` and `is_file` follow symlinks.
            if path.is_dir() {
                stack.push(path);
                continue;
            }
            if !path.is_file() {
                continue;
            }
            if let Some(ext) = path
                .extension()
                .and_then(|e| e.to_str())
                .map(str::to_ascii_lowercase)
                && SUPPORTED_EXTS.contains(&ext.as_str())
            {
                out.push(path);
            }
        }
    }
    out.sort();
    Ok(out)
}