mnem-cli 0.1.3

Command-line interface for mnem - git for knowledge graphs.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
//! `mnem ingest <path>` - parse external source files into the graph.
//!
//! Wires `mnem-ingest`'s [`Ingester`] pipeline behind a CLI surface so
//! an operator can point at a file or directory and get a committed
//! Doc + Chunk + Entity subgraph.
//!
//! ## Supported sources
//!
//! - `.md` / `.markdown` - `CommonMark` + GFM.
//! - `.txt` - plain text, one section.
//! - `.pdf` - pure-Rust text-layer extraction.
//! - `.json` / `.jsonl` - chat-conversation exports (`ChatGPT` / Claude / generic).
//!
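//! When a single file is passed, unknown extensions fall back to
//! `SourceKind::Text`, so a `README` without an extension still ingests
//! cleanly. With `--recursive`, files whose extension is not in the
//! supported list above are skipped.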
//!
//! ## Examples
//!
//! ```text
//! mnem ingest notes.md
//! mnem ingest --chunker recursive --max-tokens 1024 book.pdf
//! mnem ingest --recursive docs/
//! ```

use std::path::{Path, PathBuf};
use std::time::{Duration, Instant};

use anyhow::{Context, Result, bail};
use clap::Args as ClapArgs;
use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle};
use mnem_ingest::{
    ChunkerAuto, ChunkerKind, IngestConfig, IngestResult, Ingester, Section, SourceKind,
    auto_chunker, chunk as chunk_sections,
};
use tracing::{info, info_span};

use crate::config;
use crate::repo;

/// `mnem ingest` arguments.
//
// NOTE: clap derives each flag's `--help` text from the `///` comments
// below, so editing them changes user-facing output. `//` comments like
// this one are not shown to users.
#[derive(ClapArgs, Debug)]
#[command(after_long_help = "\
Examples:
 mnem ingest notes.md
 mnem ingest --chunker recursive --max-tokens 1024 book.pdf
 mnem ingest --recursive docs/
")]
pub(crate) struct Args {
    /// Path to a file, or a directory when `--recursive` is set.
    pub path: PathBuf,

    /// Root Doc node label (e.g. `Doc`, `Note`, `Transcript`). Default `Doc`.
    #[arg(long, default_value = "Doc")]
    pub ntype: String,

    /// Chunker strategy. `auto` picks per source kind (default).
    /// Explicit choices: `session` | `paragraph` | `recursive`.
    // Kept as a free-form string; `parse_chunker` validates it
    // (ASCII case-insensitively) when the command runs.
    #[arg(long, default_value = "auto")]
    pub chunker: String,

    /// Target tokens per chunk (recursive chunker; also forwarded to
    /// `auto`). Values above 8192 are rejected.
    // Keep the 8192 above in sync with `MAX_TOKENS_CAP`; `run` rejects
    // (does not clamp) anything larger.
    #[arg(long, default_value_t = 512)]
    pub max_tokens: u32,

    /// Overlap tokens between adjacent chunks (recursive chunker; also
    /// forwarded to `auto`).
    #[arg(long, default_value_t = 32)]
    pub overlap: u32,

    /// Walk directory trees. When set, `path` must be a directory and
    /// each supported file under it is ingested as one Doc.
    #[arg(long)]
    pub recursive: bool,

    /// Commit message. Overridable per-run; the default embeds the
    /// ingested file count so the op-log stays self-describing.
    #[arg(long, short = 'm')]
    pub message: Option<String>,

    /// Entity / relation extractor. `none` (default) keeps the built-in
    /// rule-based `RuleExtractor`. `keybert` swaps in the statistical
    /// KeyBERT adapter; it needs a configured `[embed]` provider and the
    /// `keybert` feature on the mnem-cli build (an error is emitted if
    /// the binary was compiled without it).
    #[arg(long, default_value = "none")]
    pub extractor: String,

    /// Skip the auto-index phase that otherwise runs after a successful
    /// ingest whenever an `[embed]` provider is configured.
    ///
    /// Without this flag, freshly ingested chunks are embedded right
    /// away, so ingest followed by `mnem retrieve` works in one shot.
    /// Pass `--no-embed` to keep the two-step flow (ingest now,
    /// `mnem reindex` later), e.g. for bulk imports you plan to reindex
    /// in one batch.
    #[arg(long)]
    pub no_embed: bool,
}

/// Hard ceiling for `--max-tokens`. Matches the ceiling used by the MCP
/// and HTTP surfaces, so a caller moving between them hits the same
/// bound.
const MAX_TOKENS_CAP: u32 = 8_192;

/// Lower-case file extensions that a `--recursive` walk picks up. Every
/// other file in the tree is skipped without a warning.
const SUPPORTED_EXTS: &[&str] = &[
    // Markdown / plain text.
    "md",
    "markdown",
    "txt",
    // Text-layer PDFs.
    "pdf",
    // Chat-conversation exports.
    "json",
    "jsonl",
];

/// Parsed form of the `--chunker` flag.
///
/// [`ChunkerChoice::Auto`] postpones the decision until each file's
/// source kind is known. The remaining variants force one chunker for
/// every file in the run.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum ChunkerChoice {
    /// Pick a chunker per source kind via `mnem_ingest::auto_chunker`.
    Auto,
    /// Always use `ChunkerKind::Paragraph`.
    Paragraph,
    /// Always use `ChunkerKind::Recursive` with the CLI token knobs.
    Recursive,
    /// Always use `ChunkerKind::Session`.
    Session,
}

/// Run `mnem ingest`.
///
/// Phases, in order:
///
/// 1. Validate the CLI knobs (`--max-tokens`, `--chunker`,
///    `--extractor`) before touching the repository, so a typo fails
///    fast instead of after a directory walk.
/// 2. Open the repo, collect the source files and, for
///    `--extractor keybert`, open the configured embedder.
/// 3. Pre-read every file and estimate its chunk count, so the progress
///    bar can count chunks rather than files.
/// 4. Ingest every file into a single transaction and commit it.
/// 5. Unless `--no-embed` is set, run the reindex driver when an
///    embedder is configured.
///
/// # Errors
///
/// Returns an error in any of these cases:
///
/// - a flag value is invalid;
/// - the repo cannot be opened;
/// - the source path does not exist, or a file cannot be read;
/// - the keybert embedder cannot be opened;
/// - the pipeline rejects the payload (e.g. malformed UTF-8 on a
///   Markdown file).
pub(crate) fn run(override_path: Option<&Path>, a: Args) -> Result<()> {
    let _span = info_span!("mnem-ingest").entered();

    // ---- Phase 1: flag validation (no I/O) -----------------------------
    // Everything here is decidable from the flags alone. Reject bad input
    // before opening the repo or walking a possibly large directory tree.
    if a.max_tokens > MAX_TOKENS_CAP {
        bail!(
            "--max-tokens {} exceeds the {MAX_TOKENS_CAP} cap; lower it \
             or raise the ceiling in code",
            a.max_tokens
        );
    }
    let choice = parse_chunker(&a.chunker)?;
    // Extractor selector. The default (`none`) keeps the built-in
    // `RuleExtractor`. `keybert` (C3 FIX-3) wires in the statistical
    // KeyBERT adapter; its embedder is opened in phase 2, once the
    // config has been loaded.
    let want_keybert = match a.extractor.as_str() {
        "none" | "" => false,
        "keybert" => true,
        other => bail!("unknown --extractor {other}; expected one of: none, keybert"),
    };

    // ---- Phase 2: repo, source files, optional keybert embedder --------
    let data_dir = repo::locate_data_dir(override_path)?;
    let cfg = config::load(&data_dir)?;
    let r = repo::open_repo(Some(data_dir.as_path()))?;

    let files: Vec<PathBuf> = if a.recursive {
        collect_files(&a.path)?
    } else {
        if !a.path.is_file() {
            bail!(
                "{} is not a file; pass --recursive to walk a directory",
                a.path.display()
            );
        }
        vec![a.path.clone()]
    };

    if files.is_empty() {
        bail!("no ingestable files found under {}", a.path.display());
    }

    // A single embedder is shared across every file in this run.
    let keybert_embedder: Option<std::sync::Arc<dyn mnem_embed_providers::Embedder>> =
        if want_keybert {
            // Resolve via `config::resolve_embedder`, so MNEM_EMBED_* env
            // vars and the user-global `~/.mnem/config.toml` both work as
            // fallbacks. A per-repo `.mnem/config.toml` still wins when set.
            // The precedence matches `mnem retrieve`, so the two embedder
            // consumers behave symmetrically.
            let pc = config::resolve_embedder(&cfg).ok_or_else(|| {
                anyhow::anyhow!(
                    "--extractor keybert requires an `[embed]` provider; checked \
                     MNEM_EMBED_PROVIDER env, per-repo `.mnem/config.toml`, and \
                     the user-global `~/.mnem/config.toml`. Configure one with \
                     `mnem config set embed.provider ollama` (per-repo) or write \
                     a `[embed]` section to `~/.mnem/config.toml` (global)."
                )
            })?;
            let boxed = mnem_embed_providers::open(&pc)
                .map_err(|e| anyhow::anyhow!("opening embed provider for keybert: {e}"))?;
            // Box<dyn Embedder> -> Arc<dyn Embedder> via the standard
            // `From<Box<T>>` conversion for unsized pointees.
            Some(std::sync::Arc::from(boxed))
        } else {
            None
        };

    // ---- Phase 3: pre-walk --------------------------------------------
    // Read every file once, parse it to sections, and run the chunker to
    // count the chunks each file will produce. Two reasons:
    // (1) `Ingester::ingest` re-parses internally, so the extra parse
    //     is cheap next to the embedder cost we are trying to surface.
    // (2) Summing per-file chunk counts gives the progress bar a
    //     chunk-level total instead of a 3-of-3 file-level total.
    //     Otherwise a single huge file (Bible books, long PDFs) would
    //     show "0/N" for the entire keybert pass.
    //
    // Memory: every file's bytes stay in RAM during this phase, so the
    // ingest phase can reuse them without a re-read. For typical
    // agent-memory corpora this is megabytes, not gigabytes. Truly large
    // book ingests should pre-split their files.
    struct PreReadFile {
        path: PathBuf,
        bytes: Vec<u8>,
        kind: SourceKind,
        chunker: ChunkerKind,
        // `None` means the estimate failed (unknown). `Some(0)` is a
        // genuine zero, e.g. an empty file, and must not disable
        // chunk-level progress for the rest of the run.
        chunk_count: Option<u64>,
    }
    let mut pre: Vec<PreReadFile> = Vec::with_capacity(files.len());
    for file in &files {
        let kind = Ingester::source_kind_for_path(file);
        let bytes = std::fs::read(file).with_context(|| format!("reading {}", file.display()))?;
        let chunker = resolve_chunker(choice, kind, a.max_tokens, a.overlap);
        // Best-effort: a failed estimate only downgrades the progress
        // bar; the real ingest below reports the actual error.
        let chunk_count = count_chunks_for(&bytes, kind, &chunker).ok();
        pre.push(PreReadFile {
            path: file.clone(),
            bytes,
            kind,
            chunker,
            chunk_count,
        });
    }
    let total_chunks: u64 = pre
        .iter()
        .filter_map(|f| f.chunk_count)
        .fold(0, u64::saturating_add);
    // Drive the bar in chunks only when every file has a known estimate
    // and there is at least one chunk. Otherwise fall back to per-file
    // ticks, so the bar still moves.
    let use_chunk_progress = total_chunks > 0 && pre.iter().all(|f| f.chunk_count.is_some());

    // ---- Phase 4: ingest + commit --------------------------------------
    let started = Instant::now();
    let mut totals = Totals::default();
    let mut tx = r.start_transaction();

    // Single in-place progress bar. The total is chunk-level when the
    // pre-walk produced counts, so a 4000-chunk Bible run shows
    // "1234/4000" mid-Genesis instead of "0/3"; otherwise it is
    // per-file. `enable_steady_tick` redraws the bar every 120ms even
    // when the position has not moved, so long single-file embedding
    // passes keep the elapsed time ticking instead of looking frozen.
    let pb_total = if use_chunk_progress {
        total_chunks
    } else {
        files.len() as u64
    };
    let pb = ProgressBar::new(pb_total);
    pb.set_style(
        ProgressStyle::with_template(
            " [{elapsed_precise}] {bar:32.cyan/blue} {pos}/{len} ({percent}%) ETA {eta} {msg}",
        )
        .expect("progress-bar template is a valid static string")
        .progress_chars("=>-"),
    );
    // Route the bar to stdout so a stderr-side warning from the embedder
    // (e.g. `mnem-embed: input filled max_length=...`) cannot break the
    // bar's in-place ANSI redraw and leave a duplicate frozen line above
    // the live one. The terminal still receives both streams; they just
    // stop competing for the same cursor anchor.
    pb.set_draw_target(ProgressDrawTarget::stdout());
    pb.enable_steady_tick(Duration::from_millis(120));

    // Per-chunk progress callback. It fires from inside
    // `Ingester::ingest` after every chunk is written, so the bar moves
    // smoothly mid-file instead of waiting for the whole file to commit.
    let progress_cb: Option<std::sync::Arc<dyn Fn() + Send + Sync>> = if use_chunk_progress {
        let pb_cb = pb.clone();
        Some(std::sync::Arc::new(move || {
            pb_cb.inc(1);
        }))
    } else {
        None
    };

    for f in &pre {
        let config = IngestConfig {
            chunker: f.chunker.clone(),
            ntype: a.ntype.clone(),
            max_tokens: a.max_tokens,
            overlap: a.overlap,
            ner: config::resolve_ner(&cfg),
        };
        let mut ing = Ingester::new(config);
        if let Some(emb) = &keybert_embedder {
            ing = ing.with_extractor(Box::new(mnem_ingest::KeyBertAdapter::new(
                emb.clone(),
                "Keyword",
            )));
        }
        if let Some(cb) = &progress_cb {
            ing = ing.with_progress(cb.clone());
        }

        let display_name = f
            .path
            .file_name()
            .and_then(|n| n.to_str())
            .unwrap_or("<unnamed>")
            .to_string();
        pb.set_message(display_name);
        info!(path = %f.path.display(), kind = ?f.kind, "ingesting");
        let result = ing
            .ingest(&mut tx, &f.bytes, f.kind)
            .with_context(|| format!("ingest failed on {}", f.path.display()))?;
        totals.add(&result);
        // In chunk mode the callback has already ticked the bar once per
        // chunk inside the ingest loop. The per-file fallback ticks once
        // per file here instead.
        if !use_chunk_progress {
            pb.inc(1);
        }
    }
    pb.finish_and_clear();

    let file_count = files.len();
    let default_msg = format!("mnem ingest: {file_count} file(s)");
    let msg = a.message.as_deref().unwrap_or(&default_msg);
    let new_r = tx.commit(&config::author_string(&cfg), msg)?;

    let elapsed_ms = started.elapsed().as_millis();
    println!(
        "ingested {file_count} files, {} chunks, {} nodes, {} edges in {}ms",
        totals.chunk_count, totals.node_count, totals.relation_count, elapsed_ms
    );
    println!(" op_id {}", new_r.op_id());
    if let Some(head) = new_r.view().heads.first() {
        println!(" commit_cid {head}");
    }

    // Drop ALL ingest-side handles, so the reindex driver can reopen the
    // redb file under its own RW lock.
    // - `r` (the original open) and `new_r` (the post-commit handle) both
    //   own Arc<Database> clones. redb refuses a second `Database::open`
    //   while any clone lives, so both must go.
    // - The keybert embedder Arc is dropped too, so a future ORT-EP-locking
    //   provider does not race with reindex's own embedder open.
    // - `tx` was already consumed by `tx.commit(...)`.
    drop(new_r);
    drop(r);
    drop(keybert_embedder);

    // ---- Phase 5: auto-index -------------------------------------------
    // Index the newly committed nodes when an embedder is configured and
    // the operator did not opt out. This mirrors `mnem add node`'s
    // auto-embed contract from commit c68a6b2. There are two visible
    // phases (ingest, then reindex), each with its own progress bar and
    // completion message; the existing reindex driver provides both.
    // With no embedder configured the call is skipped silently, and the
    // legacy ingest-then-manual-reindex path still works unchanged.
    if !a.no_embed && config::resolve_embedder(&cfg).is_some() {
        println!();
        super::reindex::run(
            override_path,
            super::reindex::Args {
                force: false,
                label: None,
                since: None,
                dry_run: false,
                message: None,
            },
        )?;
    }
    Ok(())
}

/// Running totals for a multi-file ingest, summed from each file's
/// [`IngestResult`]. Wall-clock time (the `elapsed_ms` in the printed
/// summary) is measured separately for the whole CLI invocation. It is
/// not derived from these per-file counters.
#[derive(Default)]
struct Totals {
    node_count: u64,
    chunk_count: u64,
    entity_count: u64,
    relation_count: u64,
}

impl Totals {
    /// Fold one file's counters into the running totals. Sums saturate
    /// at `u64::MAX` instead of overflowing.
    fn add(&mut self, r: &IngestResult) {
        fn bump(total: &mut u64, delta: u64) {
            *total = total.saturating_add(delta);
        }

        bump(&mut self.node_count, r.node_count);
        bump(&mut self.chunk_count, r.chunk_count);
        bump(&mut self.entity_count, r.entity_count);
        bump(&mut self.relation_count, r.relation_count);
    }
}

fn parse_chunker(s: &str) -> Result<ChunkerChoice> {
    Ok(match s.to_ascii_lowercase().as_str() {
        "auto" => ChunkerChoice::Auto,
        "session" => ChunkerChoice::Session,
        "paragraph" => ChunkerChoice::Paragraph,
        "recursive" => ChunkerChoice::Recursive,
        other => bail!("--chunker must be one of auto|session|paragraph|recursive; got `{other}`"),
    })
}

/// Build the concrete [`ChunkerKind`] for one source file.
///
/// The fixed choices (`paragraph`, `recursive`, `session`) ignore
/// `kind`. `session` always uses a 10-message window, and `recursive`
/// takes `max_tokens` / `overlap` verbatim. `auto` hands `kind` to
/// `mnem_ingest::auto_chunker`, with the two token knobs as overrides.
fn resolve_chunker(
    choice: ChunkerChoice,
    kind: SourceKind,
    max_tokens: u32,
    overlap: u32,
) -> ChunkerKind {
    match choice {
        ChunkerChoice::Paragraph => ChunkerKind::Paragraph,
        ChunkerChoice::Recursive => ChunkerKind::Recursive {
            max_tokens,
            overlap,
        },
        ChunkerChoice::Session => ChunkerKind::Session { max_messages: 10 },
        ChunkerChoice::Auto => {
            // Only the token knobs are forwarded. `max_messages: None`
            // presumably leaves that limit to auto_chunker's own default.
            let overrides = ChunkerAuto {
                max_tokens: Some(max_tokens),
                overlap: Some(overlap),
                max_messages: None,
            };
            auto_chunker(kind, overrides)
        }
    }
}

/// Best-effort estimate of the chunk count the ingest pipeline will
/// produce for a given source file. Used by the CLI's progress bar to
/// drive a chunk-level total (so a 4000-chunk Bible-by-book run shows
/// real granularity instead of "0/3"). Errors are mapped to the
/// caller, which falls back to a per-file bar.
fn count_chunks_for(bytes: &[u8], kind: SourceKind, chunker: &ChunkerKind) -> Result<u64> {
    let sections: Vec<Section> = match kind {
        SourceKind::Markdown => {
            let s = std::str::from_utf8(bytes).with_context(|| "non-utf8 markdown source")?;
            mnem_ingest::md::parse_markdown(s).map_err(|e| anyhow::anyhow!(e.to_string()))?
        }
        SourceKind::Text => {
            let s = std::str::from_utf8(bytes).with_context(|| "non-utf8 text source")?;
            mnem_ingest::text::parse_text(s).map_err(|e| anyhow::anyhow!(e.to_string()))?
        }
        SourceKind::Pdf => {
            mnem_ingest::pdf::parse_pdf(bytes).map_err(|e| anyhow::anyhow!(e.to_string()))?
        }
        SourceKind::Conversation => mnem_ingest::conversation::parse_conversation(bytes)
            .map_err(|e| anyhow::anyhow!(e.to_string()))?,
    };
    Ok(u64::try_from(chunk_sections(&sections, chunker).len()).unwrap_or(u64::MAX))
}

/// Recursively collect every file under `root` whose extension matches
/// [`SUPPORTED_EXTS`] (ASCII case-insensitively). The result is sorted.
///
/// Symlinks are followed, and hidden files are not skipped (the same
/// policy as `mnem add` on a file path: the caller's shell decides).
/// Each directory is visited at most once, keyed by its canonical path.
/// A symlink cycle back to an ancestor, or two links to the same
/// directory, therefore cannot make the walk revisit it or collect its
/// files twice.
///
/// # Errors
///
/// Fails if `root` is not a directory, or if a directory or one of its
/// entries cannot be read.
fn collect_files(root: &Path) -> Result<Vec<PathBuf>> {
    if !root.is_dir() {
        bail!(
            "{} is not a directory; drop --recursive to ingest a single file",
            root.display()
        );
    }
    let mut out = Vec::new();
    let mut visited: std::collections::HashSet<PathBuf> = std::collections::HashSet::new();
    let mut stack = vec![root.to_path_buf()];
    while let Some(dir) = stack.pop() {
        // If canonicalisation fails, fall back to the raw path as the key.
        // `read_dir` below then reports the real error, as it did before
        // deduplication existed.
        let key = std::fs::canonicalize(&dir).unwrap_or_else(|_| dir.clone());
        if !visited.insert(key) {
            continue;
        }
        let entries =
            std::fs::read_dir(&dir).with_context(|| format!("reading dir {}", dir.display()))?;
        for entry in entries {
            let entry = entry.with_context(|| format!("reading dir {}", dir.display()))?;
            let path = entry.path();
            // `is_dir` follows symlinks, so linked directories are walked.
            if path.is_dir() {
                stack.push(path);
                continue;
            }
            if let Some(ext) = path
                .extension()
                .and_then(|e| e.to_str())
                .map(str::to_ascii_lowercase)
                && SUPPORTED_EXTS.contains(&ext.as_str())
            {
                out.push(path);
            }
        }
    }
    out.sort();
    Ok(out)
}