use super::*;
use indicatif::{ProgressBar, ProgressStyle};
use mnem_core::id::Cid;
use mnem_core::prolly::Cursor;
use std::collections::HashSet;
use std::time::Instant;
// Arguments for `mnem reindex`. The field-level `///` comments below are the
// per-flag help text clap prints for `-h` / `--help`; the extended examples
// live in `after_long_help`.
#[derive(clap::Args, Debug)]
#[command(after_long_help = "\
Retro-embed nodes that don't have a vector yet. One commit per run.
Examples:
mnem reindex # embed every node missing a vector
mnem reindex --label Person # only nodes of this label
mnem reindex --since <commit> # only nodes added/changed since <commit>
mnem reindex --force # re-embed even already-embedded nodes
mnem reindex --dry-run # report count without changing anything
mnem reindex --lift-legacy-extra # promote v0.3 inline embed bytes to sidecar
mnem reindex --lift-legacy-sparse # promote pre-G17 inline sparse_embed to sidecar
Source text per node:
- `summary` (the `-s` argument to `mnem add node`) when set
- else `label + sorted props` rendered as text (so unsummarised
nodes still receive a vector instead of being silently skipped)
")]
pub(crate) struct Args {
    /// Re-embed nodes even if they already have a vector for the configured model
    #[arg(long)]
    pub force: bool,
    /// Only consider nodes whose label equals LABEL
    #[arg(long)]
    pub label: Option<String>,
    /// Only consider nodes added or changed since COMMIT
    #[arg(long, value_name = "COMMIT")]
    pub since: Option<String>,
    /// Report what would be done without writing a commit
    #[arg(long)]
    pub dry_run: bool,
    /// Commit message (defaults to an auto-generated summary)
    #[arg(long, short = 'm')]
    pub message: Option<String>,
    /// Promote legacy v0.3 inline `extra["embed"]` vectors to the embedding
    /// sidecar (no embedder needed)
    #[arg(long)]
    pub lift_legacy_extra: bool,
    /// Promote pre-G17 inline `extra["sparse_embed"]` vectors to the sparse
    /// sidecar (no embedder needed)
    #[arg(long)]
    pub lift_legacy_sparse: bool,
}
/// Build embedding text for a node that has no usable summary or content.
///
/// The result is the node's label followed by one `key=value` token per
/// property, keys in ascending order, all joined by single spaces. Values are
/// rendered with `ipld_to_text`.
fn fallback_text_of(node: &Node) -> String {
    // Sort the entries themselves so each value is read once, instead of
    // collecting keys and looking every key up a second time.
    let mut entries: Vec<_> = node.props.iter().collect();
    entries.sort_by(|(left, _), (right, _)| left.cmp(right));

    let prop_tokens = entries
        .into_iter()
        .map(|(key, value)| format!("{key}={}", ipld_to_text(value)));

    std::iter::once(node.ntype.clone())
        .chain(prop_tokens)
        .collect::<Vec<String>>()
        .join(" ")
}
/// Render an IPLD value as plain text suitable for feeding to an embedder.
///
/// - `Null` renders as the empty string.
/// - Booleans and numbers use their `Display` form; strings are copied verbatim.
/// - Byte strings are summarised by length (`[Nb]`), since raw bytes carry no
///   meaning for a text embedder.
/// - Links render as their CID string.
/// - Lists and maps are rendered recursively as `[a, b]` and `{k: v, ...}`.
///   Previously they used `{:?}`, which leaked Rust debug syntax (variant
///   names such as `String("...")`) into the embedded text.
///
/// Map entries appear in the map's own iteration order (presumably key-sorted
/// if it is a `BTreeMap`, as in the usual IPLD crates — TODO confirm).
fn ipld_to_text(v: &Ipld) -> String {
    match v {
        Ipld::Null => String::new(),
        Ipld::Bool(b) => b.to_string(),
        Ipld::Integer(i) => i.to_string(),
        Ipld::Float(f) => f.to_string(),
        Ipld::String(s) => s.clone(),
        Ipld::Bytes(b) => format!("[{}b]", b.len()),
        Ipld::List(items) => {
            let rendered: Vec<String> = items.iter().map(ipld_to_text).collect();
            format!("[{}]", rendered.join(", "))
        }
        Ipld::Map(entries) => {
            let rendered: Vec<String> = entries
                .iter()
                .map(|(key, val)| format!("{key}: {}", ipld_to_text(val)))
                .collect();
            format!("{{{}}}", rendered.join(", "))
        }
        Ipld::Link(c) => c.to_string(),
    }
}
/// Choose the text that `mnem reindex` embeds for `node`.
///
/// Preference order:
/// 1. the `summary`, returned verbatim, when it contains non-whitespace;
/// 2. whatever `embed_text_of` yields;
/// 3. the label-plus-props rendering from `fallback_text_of`, so no node is
///    left without text.
fn reindex_text_of(node: &Node) -> String {
    match node.summary.as_deref() {
        Some(summary) if !summary.trim().is_empty() => summary.to_owned(),
        _ => embed_text_of(node).unwrap_or_else(|| fallback_text_of(node)),
    }
}
fn nodes_at(
bs: &std::sync::Arc<dyn mnem_core::store::Blockstore>,
commit_cid: &Cid,
) -> Result<HashSet<Cid>> {
let bytes = bs
.get(commit_cid)?
.ok_or_else(|| anyhow!("commit CID {commit_cid} missing from store"))?;
let commit: Commit = from_canonical_bytes(&bytes)?;
let mut out: HashSet<Cid> = HashSet::new();
let cursor = Cursor::new(&**bs, &commit.nodes)?;
for entry in cursor {
let (_k, node_cid) = entry?;
out.insert(node_cid);
}
Ok(out)
}
pub(crate) fn run(override_path: Option<&Path>, args: Args) -> Result<()> {
if args.lift_legacy_extra && args.force {
anyhow::bail!(
"--lift-legacy-extra and --force are mutually exclusive: \
--lift-legacy-extra promotes existing inline bytes without re-deriving; \
--force re-derives from text via the embedder. Pick one."
);
}
if args.lift_legacy_sparse && args.force {
anyhow::bail!(
"--lift-legacy-sparse and --force are mutually exclusive: \
--lift-legacy-sparse promotes existing inline bytes without re-encoding; \
--force re-derives from text via the embedder. Pick one."
);
}
let is_lift_only = args.lift_legacy_extra || args.lift_legacy_sparse;
let data_dir = repo::locate_data_dir(override_path)?;
let cfg = config::load(&data_dir)?;
if !is_lift_only {
let Some(_pc) = config::resolve_embedder(&cfg) else {
anyhow::bail!(
"no embedder configured; run `mnem config set embed.provider <openai|ollama>` \
and `mnem config set embed.model <name>` first"
);
};
}
let pc_opt = config::resolve_embedder(&cfg);
let (_dir, r, bs, _ohs) = repo::open_all(Some(data_dir.as_path()))?;
let Some(head) = r.head_commit() else {
println!("no nodes in this repo yet (run `mnem add node --summary ...` first)");
return Ok(());
};
let since_set: Option<HashSet<Cid>> = match &args.since {
None => None,
Some(s) => {
let cid = resolve_commitish(&r, s)?;
Some(nodes_at(&bs, &cid)?)
}
};
if args.lift_legacy_extra {
return run_lift_legacy_extra(&r, &bs, head, since_set, &args, &cfg);
}
if args.lift_legacy_sparse {
return run_lift_legacy_sparse(&r, &bs, head, since_set, &args, &cfg);
}
let pc = pc_opt.expect("embedder config present (checked above)");
let embedder_result = mnem_embed_providers::open(&pc);
let (embedder, model_fq) = match (&embedder_result, args.dry_run) {
(Ok(e), _) => {
let m = e.model().to_string();
(Some(e), m)
}
(Err(_), true) => (None, String::from("<configured-embedder>")),
(Err(e), false) => {
eprintln!("{}", format_embed_failure(e, &pc, "embedding"));
anyhow::bail!("cannot reindex: embedder open failed (see above)");
}
};
let mut candidates: Vec<(Cid, Node)> = Vec::new();
let mut total_nodes: usize = 0;
let mut matched_label: usize = 0;
let mut skipped_already_embedded: usize = 0;
let mut skipped_outside_since: usize = 0;
let cursor = Cursor::new(&*bs, &head.nodes)?;
for entry in cursor {
let (_k, node_cid) = entry?;
let bytes = bs
.get(&node_cid)?
.ok_or_else(|| anyhow!("node CID {node_cid} missing from store"))?;
let node: Node = from_canonical_bytes(&bytes)?;
total_nodes += 1;
if let Some(set) = &since_set
&& set.contains(&node_cid)
{
skipped_outside_since += 1;
continue;
}
if let Some(lbl) = &args.label
&& &node.ntype != lbl
{
continue;
}
matched_label += 1;
let already = if args.force {
false
} else {
r.embedding_for(&node_cid, &model_fq)?.is_some()
};
if already {
skipped_already_embedded += 1;
continue;
}
if mnem_core::anchor::is_system_node(&node) {
continue;
}
candidates.push((node_cid, node));
}
if candidates.is_empty() {
if matched_label == 0 {
if let Some(lbl) = &args.label {
println!(
"no nodes match --label {lbl} ({total_nodes} node(s) scanned; \
drop --label to reindex across all labels)"
);
} else if since_set.is_some() && skipped_outside_since == total_nodes {
println!(
"no nodes added since the supplied commit \
({total_nodes} node(s) scanned)"
);
} else {
println!("repo has no nodes to reindex");
}
} else if skipped_already_embedded == matched_label {
println!(
"every matched node already has a {model_fq} vector \
({skipped_already_embedded} node(s)); use --force to re-embed"
);
} else {
println!(
"nothing to reindex: {matched_label} matched, \
{skipped_already_embedded} already embedded"
);
}
return Ok(());
}
if args.dry_run {
println!("would reindex {} node(s) via {model_fq}", candidates.len());
return Ok(());
}
let embedder = embedder.expect("embedder live for non-dry-run path");
let total = candidates.len();
let started = Instant::now();
eprintln!("reindexing {total} node(s) via {model_fq}");
let pb = ProgressBar::new(total as u64);
pb.set_style(
ProgressStyle::with_template(
" [{elapsed_precise}] {bar:32.cyan/blue} {pos}/{len} ({percent}%) ETA {eta}",
)
.unwrap()
.progress_chars("=>-"),
);
let mut tx = r.start_transaction();
for (node_cid, node) in candidates {
let text = reindex_text_of(&node);
let v = embedder.embed(&text)?;
let emb = mnem_embed_providers::to_embedding(&model_fq, &v);
tx.set_embedding(node_cid, model_fq.clone(), emb)?;
pb.inc(1);
}
pb.finish_and_clear();
let msg = args
.message
.unwrap_or_else(|| format!("mnem reindex: {total} nodes embedded with {model_fq}"));
let new_r = tx.commit(&config::author_string(&cfg), &msg)?;
println!(
"reindexed {total} node(s) in {:.1}s; committed as op {}",
started.elapsed().as_secs_f32(),
new_r.op_id()
);
Ok(())
}
/// Promote legacy (v0.3) inline embeddings stored under `extra["embed"]` into
/// the embedding sidecar, as a single commit.
///
/// Honours `--label`, `--since`, `--dry-run` and `--message` from `args`, and
/// skips system nodes. Nodes whose `extra["embed"]` fails to decode are
/// reported on stderr and skipped.
///
/// Nodes that already have a sidecar embedding for the same model are left
/// untouched, mirroring `run_lift_legacy_sparse`. This makes a re-run a no-op
/// rather than a redundant commit, and ensures a newer sidecar vector (e.g.
/// from `--force`) is never overwritten with stale inline bytes.
fn run_lift_legacy_extra(
    r: &mnem_core::repo::ReadonlyRepo,
    bs: &std::sync::Arc<dyn mnem_core::store::Blockstore>,
    head: &mnem_core::objects::Commit,
    since_set: Option<HashSet<Cid>>,
    args: &Args,
    cfg: &crate::config::Config,
) -> Result<()> {
    use mnem_core::objects::node::Embedding;
    let mut total_nodes: usize = 0;
    let mut already_lifted: usize = 0;
    let mut decode_errors: usize = 0;
    let mut to_lift: Vec<(Cid, Embedding)> = Vec::new();
    let cursor = Cursor::new(&**bs, &head.nodes)?;
    for entry in cursor {
        let (_k, node_cid) = entry?;
        let bytes = bs
            .get(&node_cid)?
            .ok_or_else(|| anyhow!("node CID {node_cid} missing from store"))?;
        let node: Node = from_canonical_bytes(&bytes)?;
        total_nodes += 1;
        if let Some(set) = &since_set
            && set.contains(&node_cid)
        {
            continue;
        }
        if let Some(lbl) = &args.label
            && &node.ntype != lbl
        {
            continue;
        }
        if mnem_core::anchor::is_system_node(&node) {
            continue;
        }
        let Some(ipld_val) = node.extra.get("embed") else {
            continue;
        };
        let emb: Embedding = match decode_embedding_from_ipld(ipld_val) {
            Ok(e) => e,
            Err(err) => {
                eprintln!(
                    "warning: node {node_cid} has extra[\"embed\"] but decoding failed: \
                     {err}; skipping"
                );
                decode_errors += 1;
                continue;
            }
        };
        // The sidecar already holds a vector for this model: keep it.
        if r.embedding_for(&node_cid, &emb.model)?.is_some() {
            already_lifted += 1;
            continue;
        }
        to_lift.push((node_cid, emb));
    }
    let legacy_count = to_lift.len();
    if args.dry_run {
        println!(
            "would lift {legacy_count} legacy inline embedding(s) to sidecar \
             ({total_nodes} node(s) scanned, {already_lifted} already in sidecar, \
             {decode_errors} decode error(s))"
        );
        return Ok(());
    }
    if to_lift.is_empty() {
        if already_lifted > 0 {
            println!(
                "{already_lifted} node(s) with extra[\"embed\"] already have a sidecar \
                 embedding ({total_nodes} scanned); nothing to lift"
            );
        } else {
            println!(
                "no nodes with extra[\"embed\"] found ({total_nodes} scanned); \
                 nothing to lift"
            );
        }
        return Ok(());
    }
    let total = legacy_count;
    let started = Instant::now();
    let mut tx = r.start_transaction();
    for (node_cid, emb) in to_lift {
        let model = emb.model.clone();
        tx.set_embedding(node_cid, model, emb)?;
    }
    let msg = args.message.clone().unwrap_or_else(|| {
        format!("mnem reindex --lift-legacy-extra: {total} embedding(s) promoted to sidecar")
    });
    let new_r = tx.commit(&crate::config::author_string(cfg), &msg)?;
    println!(
        "lifted {total} embedding(s) to sidecar in {:.1}s; committed as op {}",
        started.elapsed().as_secs_f32(),
        new_r.op_id()
    );
    if decode_errors > 0 {
        eprintln!(
            "warning: {decode_errors} node(s) had undecodable extra[\"embed\"] and were skipped"
        );
    }
    Ok(())
}
/// Decode a legacy inline `extra["embed"]` IPLD value into a validated
/// `Embedding`.
///
/// The value is re-encoded to canonical CBOR and decoded back as an
/// `Embedding`, after which the embedding's own invariants are checked. Each
/// stage reports a distinct error message so callers can tell them apart.
fn decode_embedding_from_ipld(val: &Ipld) -> Result<mnem_core::objects::node::Embedding> {
    use mnem_core::codec::{from_canonical_bytes, to_canonical_bytes};
    use mnem_core::objects::node::Embedding;

    let embedding: Embedding = to_canonical_bytes(val)
        .map_err(|e| anyhow!("CBOR re-encode of extra[\"embed\"] failed: {e}"))
        .and_then(|cbor| {
            from_canonical_bytes(&cbor)
                .map_err(|e| anyhow!("decode of extra[\"embed\"] as Embedding failed: {e}"))
        })?;
    embedding
        .validate()
        .map_err(|e| anyhow!("extra[\"embed\"] Embedding invariant violated: {e:?}"))?;
    Ok(embedding)
}
/// Promote pre-G17 inline sparse embeddings stored under
/// `extra["sparse_embed"]` into the sparse sidecar, as a single commit.
///
/// Honours `--label`, `--since`, `--dry-run` and `--message` from `args`, and
/// skips system nodes. Nodes whose payload fails to decode are reported on
/// stderr and skipped. Nodes that already have a sidecar entry for the same
/// `vocab_id` are counted separately, so the "nothing to lift" message no
/// longer claims that no inline sparse embeddings exist when they were simply
/// lifted already.
fn run_lift_legacy_sparse(
    r: &mnem_core::repo::ReadonlyRepo,
    bs: &std::sync::Arc<dyn mnem_core::store::Blockstore>,
    head: &mnem_core::objects::Commit,
    since_set: Option<HashSet<Cid>>,
    args: &Args,
    cfg: &crate::config::Config,
) -> Result<()> {
    use mnem_core::sparse::SparseEmbed;
    let mut total_nodes: usize = 0;
    let mut already_lifted: usize = 0;
    let mut decode_errors: usize = 0;
    let mut to_lift: Vec<(Cid, SparseEmbed)> = Vec::new();
    let cursor = Cursor::new(&**bs, &head.nodes)?;
    for entry in cursor {
        let (_k, node_cid) = entry?;
        let bytes = bs
            .get(&node_cid)?
            .ok_or_else(|| anyhow!("node CID {node_cid} missing from store"))?;
        let node: Node = from_canonical_bytes(&bytes)?;
        total_nodes += 1;
        if let Some(set) = &since_set
            && set.contains(&node_cid)
        {
            continue;
        }
        if let Some(lbl) = &args.label
            && &node.ntype != lbl
        {
            continue;
        }
        if mnem_core::anchor::is_system_node(&node) {
            continue;
        }
        let Some(ipld_val) = node.extra.get("sparse_embed") else {
            continue;
        };
        let se: SparseEmbed = match decode_sparse_embed_from_ipld(ipld_val) {
            Ok(s) => s,
            Err(err) => {
                eprintln!(
                    "warning: node {node_cid} has extra[\"sparse_embed\"] but decoding \
                     failed: {err}; skipping"
                );
                decode_errors += 1;
                continue;
            }
        };
        // Already promoted for this vocabulary: re-lifting would only rewrite it.
        if r.sparse_for(&node_cid, &se.vocab_id)?.is_some() {
            already_lifted += 1;
            continue;
        }
        to_lift.push((node_cid, se));
    }
    let legacy_count = to_lift.len();
    if args.dry_run {
        println!(
            "would lift {legacy_count} legacy inline sparse embedding(s) to sidecar \
             ({total_nodes} node(s) scanned, {already_lifted} already in sidecar, \
             {decode_errors} decode error(s))"
        );
        return Ok(());
    }
    if to_lift.is_empty() {
        if already_lifted > 0 {
            println!(
                "{already_lifted} node(s) with extra[\"sparse_embed\"] already have a \
                 sidecar entry ({total_nodes} scanned); nothing to lift"
            );
        } else {
            println!(
                "no nodes with extra[\"sparse_embed\"] found ({total_nodes} scanned); \
                 nothing to lift"
            );
        }
        return Ok(());
    }
    let total = legacy_count;
    let started = Instant::now();
    let mut tx = r.start_transaction();
    for (node_cid, se) in to_lift {
        let vocab_id = se.vocab_id.clone();
        tx.set_sparse_embedding(node_cid, vocab_id, se)?;
    }
    let msg = args.message.clone().unwrap_or_else(|| {
        format!(
            "mnem reindex --lift-legacy-sparse: {total} sparse embedding(s) promoted to sidecar"
        )
    });
    let new_r = tx.commit(&crate::config::author_string(cfg), &msg)?;
    println!(
        "lifted {total} sparse embedding(s) to sidecar in {:.1}s; committed as op {}",
        started.elapsed().as_secs_f32(),
        new_r.op_id()
    );
    if decode_errors > 0 {
        eprintln!(
            "warning: {decode_errors} node(s) had undecodable extra[\"sparse_embed\"] and \
             were skipped"
        );
    }
    Ok(())
}
/// Decode a legacy inline `extra["sparse_embed"]` IPLD value into a
/// validated `SparseEmbed`.
///
/// The value is re-encoded to canonical CBOR, decoded back as a
/// `SparseEmbed`, and then checked with `SparseEmbed::validate`. Each stage
/// produces its own error message.
fn decode_sparse_embed_from_ipld(val: &Ipld) -> Result<mnem_core::sparse::SparseEmbed> {
    use mnem_core::codec::{from_canonical_bytes, to_canonical_bytes};
    use mnem_core::sparse::SparseEmbed;

    let sparse: SparseEmbed = to_canonical_bytes(val)
        .map_err(|e| anyhow!("CBOR re-encode of extra[\"sparse_embed\"] failed: {e}"))
        .and_then(|cbor| {
            from_canonical_bytes(&cbor).map_err(|e| {
                anyhow!("decode of extra[\"sparse_embed\"] as SparseEmbed failed: {e}")
            })
        })?;
    sparse
        .validate()
        .map_err(|e| anyhow!("extra[\"sparse_embed\"] SparseEmbed invariant violated: {e}"))?;
    Ok(sparse)
}
#[cfg(test)]
mod tests {
    use super::*;
    use mnem_core::id::NodeId;

    /// Without a summary, the text starts with the label and lists props by key order.
    #[test]
    fn fallback_text_uses_label_and_sorted_props() {
        let person = Node::new(NodeId::from_bytes_raw([1u8; 16]), "Person")
            .with_prop("name", Ipld::String("Alice".into()))
            .with_prop("city", Ipld::String("Berlin".into()));
        let rendered = fallback_text_of(&person);
        assert!(rendered.starts_with("Person "), "got: {rendered}");
        let city_at = rendered.find("city=").expect("city present");
        let name_at = rendered.find("name=").expect("name present");
        assert!(city_at < name_at, "props must be sorted: {rendered}");
    }

    /// A non-blank summary wins over any props.
    #[test]
    fn reindex_text_prefers_summary() {
        let doc = Node::new(NodeId::from_bytes_raw([2u8; 16]), "Doc")
            .with_summary("Important brief")
            .with_prop("title", Ipld::String("X".into()));
        assert_eq!(reindex_text_of(&doc), "Important brief");
    }

    /// With neither summary nor content, the label-plus-props fallback is used.
    #[test]
    fn reindex_text_falls_back_when_no_summary_or_content() {
        let person = Node::new(NodeId::from_bytes_raw([3u8; 16]), "Person")
            .with_prop("name", Ipld::String("Bob".into()));
        let rendered = reindex_text_of(&person);
        assert!(rendered.contains("Person"));
        assert!(rendered.contains("name=Bob"));
    }
}