Skip to main content

mnem_core/repo/
transaction.rs

1//! [`Transaction`] - accumulator for pending mutations + commit.
2//!
3//! A transaction captures a snapshot of the current [`ReadonlyRepo`] at
4//! `start_transaction()` time. Mutations ([`Transaction::add_node`],
5//! `add_edge`, `remove_node`, etc.) are buffered. [`Transaction::commit`]
6//! atomically:
7//!
8//! 1. Rebuilds the node / edge / schema Prolly trees from the base
9//! commit's roots + the buffered additions and removals.
10//! 2. Writes a new Commit whose `parents` is the previous head.
11//! 3. Writes a new View whose `heads = [new commit]` and whose `refs`
12//! reflect any ref-update mutations.
13//! 4. Writes a new Operation whose `parents = [old op]`.
14//! 5. Advances the op-heads store: inserts new op, removes old op.
15//! 6. Returns a fresh [`ReadonlyRepo`] pinned to the new op.
16//!
17//! Multi-writer safety: step 5 is atomic per
18//! [`OpHeadsStore::update`][crate::store::OpHeadsStore::update].
19//! If another writer has advanced the heads concurrently, both new ops
20//! remain in the heads set; the next [`ReadonlyRepo::open`] will see
21//! multiple heads and (in M8.5) trigger a 3-way merge.
22
23use std::collections::{BTreeMap, HashSet};
24
25use ipld_core::ipld::Ipld;
26
27use crate::codec::hash_to_cid;
28use crate::error::{Error, RepoError};
29use crate::id::{ChangeId, Cid, EdgeId, NodeId};
30use crate::index;
31use crate::objects::node::Embedding;
32use crate::objects::{
33    Commit, Edge, EmbeddingBucket, IndexSet, Node, Operation, RefTarget, Tombstone, View,
34};
35use crate::prolly::{self, Cursor, ProllyKey};
36use crate::store::Blockstore;
37
38use super::readonly::{ReadonlyRepo, decode_from_store, now_micros};
39
/// Options controlling the commit path.
///
/// Borrows `author` / `message` for `'a`; the struct derives `Copy`,
/// so it can be passed by value freely.
///
/// The default (via [`Transaction::commit`]) is lock-free: concurrent
/// writers both succeed; the next reader merges. Setting
/// [`linearize`](Self::linearize) to `true` enables SPEC §6.5
/// opportunistic concurrency - if any other writer has advanced
/// op-heads since this transaction started, the commit fails with
/// [`RepoError::Stale`] instead of appending a concurrent head.
#[derive(Clone, Copy, Debug)]
pub struct CommitOptions<'a> {
    /// Commit author (UTF-8, stored on the new Commit + Operation).
    pub author: &'a str,
    /// Commit message.
    pub message: &'a str,
    /// Opt-in SPEC §6.5 linearize mode. Defaults to `false`.
    pub linearize: bool,
    /// Override the commit + operation timestamp. Measured in
    /// microseconds since Unix epoch. `None` (the default) calls
    /// `SystemTime::now()` at commit time, which is what a human
    /// workflow wants.
    ///
    /// Set this to `Some(...)` when byte-identical CIDs across
    /// machines matter: two processes that build the same logical
    /// commit (same author, same message, same graph mutations,
    /// same time, same `change_id`) will produce the same commit CID
    /// and the same op-id. This is the escape hatch for
    /// audit-replay, distributed-agent consensus, and regression
    /// tests that assert on commit CIDs.
    pub time_micros: Option<u64>,
    /// Override the commit's `change_id`. `None` (the default)
    /// generates a fresh `ChangeId::new_v7()`, which embeds wall-
    /// clock time and therefore varies per call. Deterministic-
    /// replay workflows MUST supply this explicitly alongside
    /// `time_micros`; otherwise the v7 randomness alone defeats the
    /// byte-identical-CID contract.
    pub change_id: Option<ChangeId>,
}
77
78impl<'a> CommitOptions<'a> {
79    /// Construct with all deterministic-override fields set to `None`
80    /// (the caller-convenient default: auto-clock + auto-change-id).
81    #[must_use]
82    pub const fn new(author: &'a str, message: &'a str) -> Self {
83        Self {
84            author,
85            message,
86            linearize: false,
87            time_micros: None,
88            change_id: None,
89        }
90    }
91
92    /// Pin the timestamp for deterministic replay. See
93    /// [`Self::time_micros`] for the wider contract.
94    #[must_use]
95    pub const fn with_time_micros(mut self, t: u64) -> Self {
96        self.time_micros = Some(t);
97        self
98    }
99
100    /// Pin the change-id for deterministic replay. See
101    /// [`Self::change_id`] for the wider contract.
102    #[must_use]
103    pub const fn with_change_id(mut self, id: ChangeId) -> Self {
104        self.change_id = Some(id);
105        self
106    }
107}
108
/// Buffered mutations against a [`ReadonlyRepo`].
///
/// Construct via [`ReadonlyRepo::start_transaction`].
pub struct Transaction {
    /// Immutable snapshot this transaction was started from; all
    /// reads during the transaction (index lookups, base-tree
    /// collision checks) go through it.
    base: ReadonlyRepo,
    /// Nodes added (or overwritten) in this transaction, keyed by
    /// `NodeId`, mapping to each node's content-addressed block CID.
    new_nodes: BTreeMap<NodeId, Cid>,
    /// Node ids staged for physical removal from the node tree.
    removed_nodes: HashSet<NodeId>,
    /// Edges added (or overwritten) in this transaction.
    new_edges: BTreeMap<EdgeId, Cid>,
    /// Edge ids staged for removal from the edge tree.
    removed_edges: HashSet<EdgeId>,
    /// Named-ref updates to apply to the new View at commit time;
    /// a `None` value deletes the ref.
    ref_updates: BTreeMap<String, Option<RefTarget>>,
    /// Tombstones staged for insertion into the new View at commit
    /// time. Keyed by `NodeId`; later writes to the same `NodeId` in
    /// the same transaction overwrite the earlier ones (consistent
    /// with [`Self::tombstone_node`]'s idempotent-deterministic rule).
    new_tombstones: BTreeMap<NodeId, Tombstone>,
    /// Side-table for `resolve_or_create_node`: maps
    /// `(label, prop_name, blake3(canonical(value))[..16])` to the
    /// `NodeId` of a node added in this transaction. Bounded by the
    /// number of `resolve_or_create_node` or `add_node` calls in this
    /// tx; prevents the O(pending²) decode loop the naive
    /// implementation would trigger.
    pending_by_prop: BTreeMap<(String, String, [u8; 16]), NodeId>,
    /// Lazy, one-time decode of the base commit's `IndexSet`. Populated
    /// on the first `resolve_or_create_node` call and re-used by
    /// every subsequent call in this transaction. `None` means
    /// "not yet fetched"; `Some(None)` means "no `IndexSet` on the
    /// base commit" (either uninitialised or pre-0.2 commit).
    cached_base_indexes: Option<Option<IndexSet>>,
    /// Pending embedding-sidecar writes, keyed by the content-addressed
    /// `NodeCid` they reference. Multiple `set_embedding` calls for the
    /// same node accumulate into one bucket (one entry per `model`
    /// string). Empty by default; the commit path skips the sidecar
    /// rebuild entirely when this map is empty AND the base commit
    /// carried no `embeddings` root.
    pending_embeddings: BTreeMap<Cid, EmbeddingBucket>,
}
145
146impl Transaction {
147    pub(crate) fn new(base: ReadonlyRepo) -> Self {
148        Self {
149            base,
150            new_nodes: BTreeMap::new(),
151            removed_nodes: HashSet::new(),
152            new_edges: BTreeMap::new(),
153            removed_edges: HashSet::new(),
154            ref_updates: BTreeMap::new(),
155            new_tombstones: BTreeMap::new(),
156            pending_by_prop: BTreeMap::new(),
157            cached_base_indexes: None,
158            pending_embeddings: BTreeMap::new(),
159        }
160    }
161
    /// The base repo this transaction is derived from.
    ///
    /// Buffered mutations are NOT visible through this snapshot; it
    /// reflects the repo exactly as it was at `start_transaction()`.
    #[must_use]
    pub const fn base(&self) -> &ReadonlyRepo {
        &self.base
    }
167
168    // ---------------- Mutations ----------------
169
170    /// Add (or overwrite) a node. Cancels any pending `remove_node` for
171    /// the same id. Returns the node's content-addressed CID.
172    ///
173    /// # Errors
174    ///
175    /// Codec or blockstore errors while writing the node.
176    pub fn add_node(&mut self, node: &Node) -> Result<Cid, Error> {
177        let (bytes, cid) = hash_to_cid(node)?;
178        // safety: cid computed above via hash_to_cid
179        self.base.blockstore.put_trusted(cid.clone(), bytes)?;
180        self.removed_nodes.remove(&node.id);
181        self.new_nodes.insert(node.id, cid.clone());
182        // Populate the pending-by-prop cache so future
183        // `resolve_or_create_node` calls in this tx find the node in
184        // O(1) instead of decoding every pending node.
185        for (prop_name, prop_value) in &node.props {
186            if let Ok(hash) = index::prop_value_hash(prop_value) {
187                self.pending_by_prop
188                    .insert((node.ntype.clone(), prop_name.clone(), hash), node.id);
189            }
190        }
191        Ok(cid)
192    }
193
194    /// Stage an embedding for a previously-added node into the
195    /// embedding-sidecar Prolly tree referenced by
196    /// `Commit.embeddings`.
197    ///
198    /// Symmetric with [`Self::add_node`]: pass the `node_cid` returned
199    /// from `add_node` (or any pre-existing NodeCid you want to attach
200    /// a new vector to). Multiple `set_embedding` calls for the same
201    /// `node_cid` accumulate into one [`EmbeddingBucket`]; calling
202    /// twice with the same `model` upserts (the second value wins).
203    ///
204    /// The actual sidecar tree is built and committed by
205    /// [`Self::commit`] / [`Self::commit_opts`]; staging does not
206    /// touch the blockstore.
207    ///
208    /// # Why this lives outside the Node bytes
209    ///
210    /// Dense embedding vectors drift in their last bit across ORT
211    /// thread counts (`f32` reduction reordering is non-associative).
212    /// Storing them inline on `Node` would couple `NodeCid` to
213    /// thread count and break federated dedup. The sidecar separates
214    /// identity (Node) from derived bytes (Embedding); two machines
215    /// re-deriving the same source text on different cores share the
216    /// Node CID even when their vectors differ.
217    ///
218    /// # Errors
219    ///
220    /// Currently infallible (the staged map cannot fail to insert);
221    /// the `Result` shape is reserved for future validation hooks
222    /// (e.g. dim/dtype checks against a per-repo config).
223    pub fn set_embedding(
224        &mut self,
225        node_cid: Cid,
226        model: String,
227        embedding: Embedding,
228    ) -> Result<(), Error> {
229        let bucket = self.pending_embeddings.entry(node_cid).or_default();
230        bucket.upsert(model, embedding);
231        Ok(())
232    }
233
234    /// Remove a node. Cancels any pending `add_node` for the same id.
235    ///
236    /// If the node was added AND had an embedding staged via
237    /// `set_embedding` in this same transaction, the staged embedding
238    /// is dropped to prevent an orphan sidecar entry. Embeddings that
239    /// already live in the base commit's sidecar tree are NOT scrubbed
240    /// here; they remain reachable through the inherited tree (a
241    /// follow-up audit will add explicit sidecar tombstones).
242    pub fn remove_node(&mut self, id: NodeId) {
243        if let Some(cid) = self.new_nodes.remove(&id) {
244            self.pending_embeddings.remove(&cid);
245        }
246        self.removed_nodes.insert(id);
247        // Drop any pending-by-prop entries pointing at this id.
248        self.pending_by_prop.retain(|_, v| *v != id);
249    }
250
251    /// Add (or overwrite) an edge. Returns the edge's content-addressed CID.
252    ///
253    /// # Errors
254    ///
255    /// Codec or blockstore errors while writing the edge.
256    pub fn add_edge(&mut self, edge: &Edge) -> Result<Cid, Error> {
257        let (bytes, cid) = hash_to_cid(edge)?;
258        // safety: cid computed above via hash_to_cid
259        self.base.blockstore.put_trusted(cid.clone(), bytes)?;
260        self.removed_edges.remove(&edge.id);
261        self.new_edges.insert(edge.id, cid.clone());
262        Ok(cid)
263    }
264
265    /// Remove an edge.
266    pub fn remove_edge(&mut self, id: EdgeId) {
267        self.new_edges.remove(&id);
268        self.removed_edges.insert(id);
269    }
270
271    /// Logically "forget" a node without breaking the append-only,
272    /// content-addressed invariant of the graph.
273    ///
274    /// The node block remains in the node Prolly tree; its CID does
275    /// not change. What changes is the [`View`]: at commit time, a
276    /// [`Tombstone`] record keyed by `node_id` is inserted into
277    /// [`View::tombstones`]. Retrieval paths filter out tombstoned
278    /// nodes by default - see
279    /// [`crate::retrieve::Retriever::include_tombstoned`] for the
280    /// opt-out used by audit / debug callers.
281    ///
282    /// The tombstone's `tombstoned_at` timestamp is set at commit
283    /// time (via the commit's resolved `now`), not at the call site,
284    /// so two transactions built in parallel don't disagree on when a
285    /// node was revoked just because of clock skew between author
286    /// processes.
287    ///
288    /// Idempotence: calling `tombstone_node` twice for the same
289    /// `node_id` in the same transaction is a no-op at the semantic
290    /// level. The second call overwrites the first's reason; no
291    /// additional state change is observable to a retrieve or to a
292    /// subsequent `is_tombstoned` query. Across transactions, the
293    /// rule is the same: each new tombstone commit fully replaces the
294    /// prior record for that node.
295    ///
296    /// The original Node is NOT removed and edges referencing it are
297    /// NOT touched. For physical removal, use
298    /// [`Self::remove_node`] instead.
299    ///
300    /// # Errors
301    ///
302    /// Currently infallible; the `Result` return type reserves space
303    /// for future validation (e.g. rejecting tombstones on a
304    /// non-existent `node_id`).
305    #[tracing::instrument(
306 level = "debug",
307 target = "mnem::repo::transaction",
308 skip(self, reason),
309 fields(node_id = %node_id)
310 )]
311    pub fn tombstone_node(
312        &mut self,
313        node_id: NodeId,
314        reason: impl Into<String>,
315    ) -> Result<(), Error> {
316        // Stamp with a placeholder `0` timestamp; the real
317        // `tombstoned_at` is filled in at commit time from the
318        // commit's resolved `now`. This keeps multiple
319        // `tombstone_node` calls in one transaction all sharing the
320        // same timestamp, which is the semantic agents expect
321        // ("everything in this commit got revoked together").
322        let ts = Tombstone::new(reason, 0);
323        self.new_tombstones.insert(node_id, ts);
324        Ok(())
325    }
326
327    /// Set a named ref in the new View. `None` removes the ref.
328    pub fn update_ref(&mut self, name: impl Into<String>, target: Option<RefTarget>) {
329        self.ref_updates.insert(name.into(), target);
330    }
331
332    /// Ergonomic one-call node write for agent workflows.
333    ///
334    /// Generates a fresh [`NodeId::new_v7`], builds the node with the
335    /// caller's `ntype`, `summary`, and properties, auto-stamps two
336    /// reserved temporal props (`mnem:created_at`, `mnem:updated_at`)
337    /// with the current microseconds-since-Unix-epoch, and writes the
338    /// node via [`Self::add_node`]. Returns the freshly generated
339    /// `NodeId`.
340    ///
341    /// The reserved prop keys are the substrate's temporal-range filter
342    /// contract (see
343    /// [`crate::retrieve::Retriever::where_created_after`] et al.) and
344    /// avoid a breaking Node-CID change that a dedicated header field
345    /// would have triggered. Callers who need deterministic-replay CIDs
346    /// can override either key by passing it explicitly in `props`; the
347    /// auto-stamp only fills absent keys.
348    ///
349    /// # Errors
350    ///
351    /// Propagates codec/blockstore errors from [`Self::add_node`].
352    pub fn commit_memory<I>(
353        &mut self,
354        ntype: impl Into<String>,
355        summary: impl Into<String>,
356        props: I,
357    ) -> Result<NodeId, Error>
358    where
359        I: IntoIterator<Item = (String, Ipld)>,
360    {
361        let id = NodeId::new_v7();
362        let mut node = Node::new(id, ntype).with_summary(summary);
363        for (k, v) in props {
364            node.props.insert(k, v);
365        }
366        let now = now_micros();
367        node.props
368            .entry("mnem:created_at".to_string())
369            .or_insert_with(|| Ipld::Integer(i128::from(now)));
370        node.props
371            .entry("mnem:updated_at".to_string())
372            .or_insert_with(|| Ipld::Integer(i128::from(now)));
373        self.add_node(&node)?;
374        Ok(id)
375    }
376
    /// Find-or-create a node by a primary-key property.
    ///
    /// Looks for an existing node with `(ntype == label, props[prop_name] == value)`
    /// in the following order:
    ///
    /// 1. Nodes added in this transaction (O(1) via a cache that
    /// `add_node` maintains).
    /// 2. The base commit's property index, if one exists (O(log n)).
    ///
    /// If a match is found, its `NodeId` is returned. Otherwise a new
    /// node is added (with `prop_name -> value` set) and its fresh
    /// `NodeId` is returned. This is the go-to helper for agents
    /// writing facts from LLM output where the same entity may be
    /// mentioned multiple times across tool calls.
    ///
    /// Within a single `resolve_or_create_node` call the cost is
    /// bounded by one cache lookup + one index point lookup; total
    /// cost of N calls in a transaction is O(N log n), not O(N²).
    ///
    /// # Errors
    ///
    /// Propagates codec/store errors from the property-index lookup or
    /// node write.
    pub fn resolve_or_create_node(
        &mut self,
        label: &str,
        prop_name: &str,
        value: impl Into<Ipld>,
    ) -> Result<NodeId, Error> {
        let value = value.into();
        let hash = index::prop_value_hash(&value)?;

        // 1. Pending-adds cache: O(1) BTreeMap lookup.
        if let Some(id) =
            self.pending_by_prop
                .get(&(label.to_string(), prop_name.to_string(), hash))
        {
            return Ok(*id);
        }

        // 2. Base commit's property index: O(log n) point lookup.
        // The IndexSet is fetched once per transaction and cached;
        // a hot resolve loop pays exactly one decode_from_store of
        // the IndexSet, not N. (`None` = not yet fetched,
        // `Some(None)` = base commit has no IndexSet.)
        if self.cached_base_indexes.is_none() {
            let fetched = if let Some(commit) = self.base.commit.as_deref() {
                if let Some(idx_cid) = &commit.indexes {
                    Some(decode_from_store::<IndexSet, _>(
                        &*self.base.blockstore,
                        idx_cid,
                    )?)
                } else {
                    None
                }
            } else {
                None
            };
            self.cached_base_indexes = Some(fetched);
        }
        // Let-chain: take the index hit only when the matched node was
        // not removed earlier in this same transaction.
        if let Some(Some(indexes)) = self.cached_base_indexes.as_ref()
            && let Some((_cid, node)) =
                index::lookup_by_prop(&*self.base.blockstore, indexes, label, prop_name, &value)?
            && !self.removed_nodes.contains(&node.id)
        {
            return Ok(node.id);
        }
        // NOTE(review): an index hit is not cross-checked against View
        // tombstones, so a tombstoned-but-present node can still be
        // resolved here - confirm that is intended.

        // 3. Create.
        let new_node = Node::new(NodeId::new_v7(), label).with_prop(prop_name, value);
        self.add_node(&new_node)?;
        Ok(new_node.id)
    }
449
450    // ---------------- Commit ----------------
451
452    /// Convenience: commit in the default lock-free mode.
453    ///
454    /// Delegates to [`commit_opts`](Self::commit_opts) with
455    /// `linearize: false`. See there for semantics.
456    ///
457    /// # Errors
458    ///
459    /// Codec, store, and tree-rebuild errors.
460    pub fn commit(self, author: &str, message: &str) -> Result<ReadonlyRepo, Error> {
461        self.commit_opts(CommitOptions::new(author, message))
462    }
463
    /// Finalize the transaction with explicit options.
    ///
    /// Lock-free default ([`CommitOptions::linearize`] = `false`):
    /// rebuild trees, write Commit / View / Operation, advance the
    /// op-head regardless of concurrent writers, return a fresh
    /// [`ReadonlyRepo`] pinned to the new op.
    ///
    /// Linearize mode (`linearize = true`, SPEC §6.5): re-read
    /// op-heads just before advancing. If the current set is not
    /// exactly `[base.op_id]`, return [`RepoError::Stale`] without
    /// advancing. Tree / commit / view / op bytes already written to
    /// the blockstore remain (they are content-addressed and
    /// collision-free; a retry will re-reference them).
    ///
    /// # Errors
    ///
    /// - [`RepoError::Stale`] in linearize mode when op-heads drift.
    /// - Codec, store, and tree-rebuild errors.
    ///
    /// # Instrumentation
    ///
    /// Emits one `info`-level span `mnem::repo::transaction::commit` per
    /// call with bounded-cardinality fields: `added_nodes`,
    /// `removed_nodes`, `added_edges`, `removed_edges`, `tombstones`,
    /// `ref_updates`, `linearize`. No node payloads or CIDs are
    /// recorded - a commit of 100k nodes still produces one span of
    /// constant size. Agents wanting per-node detail should enable the
    /// `debug`-level `tombstone_node` span or add their own.
    #[tracing::instrument(
        name = "commit",
        level = "info",
        target = "mnem::repo::transaction",
        skip(self, opts),
        fields(
            added_nodes = self.new_nodes.len(),
            removed_nodes = self.removed_nodes.len(),
            added_edges = self.new_edges.len(),
            removed_edges = self.removed_edges.len(),
            tombstones = self.new_tombstones.len(),
            ref_updates = self.ref_updates.len(),
            linearize = opts.linearize,
        )
    )]
    pub fn commit_opts(self, opts: CommitOptions<'_>) -> Result<ReadonlyRepo, Error> {
        // Destructure so each buffer is moved exactly once; the two
        // resolve-helper caches are commit-irrelevant and dropped.
        let Self {
            base,
            new_nodes,
            removed_nodes,
            new_edges,
            removed_edges,
            ref_updates,
            new_tombstones,
            pending_by_prop: _,
            cached_base_indexes: _,
            pending_embeddings,
        } = self;

        let bs = base.blockstore.clone();
        let ohs = base.op_heads.clone();

        // Base roots (trees from the previous commit, or empty trees if
        // this is the first commit on a fresh repo).
        let (base_nodes, base_edges, base_schema) = if let Some(commit) = base.commit.as_deref() {
            (
                commit.nodes.clone(),
                commit.edges.clone(),
                commit.schema.clone(),
            )
        } else {
            let empty_root = prolly::build_tree(&*bs, std::iter::empty())?;
            (empty_root.clone(), empty_root.clone(), empty_root)
        };

        // Decide gating for the incremental-index fast path BEFORE we
        // consume `new_nodes` / `removed_*` / `new_edges` into their
        // ProllyKey forms.
        let is_append_only_at_graph_level = removed_nodes.is_empty()
            && removed_edges.is_empty()
            && new_edges.is_empty()
            && !new_nodes.is_empty();
        let base_indexes_cid: Option<Cid> = base.commit.as_deref().and_then(|c| c.indexes.clone());

        // Step 1 (module docs): rebuild the node / edge / schema trees.
        //
        // Keep a NodeId-keyed sorted copy of the added nodes so the
        // incremental index path can re-decode them. The ProllyKey
        // map used for the node-tree rebuild is derived from this
        // without consuming it.
        let new_nodes_btree: BTreeMap<NodeId, Cid> = new_nodes.into_iter().collect();
        let node_additions: BTreeMap<ProllyKey, Cid> = new_nodes_btree
            .iter()
            .map(|(id, cid)| (ProllyKey::from(*id), cid.clone()))
            .collect();
        let node_removals: HashSet<ProllyKey> =
            removed_nodes.into_iter().map(ProllyKey::from).collect();
        let new_nodes_root = rebuild_tree(&*bs, &base_nodes, &node_additions, &node_removals)?;

        // Rebuild edge tree with mutations.
        let edge_additions: BTreeMap<ProllyKey, Cid> = new_edges
            .into_iter()
            .map(|(id, cid)| (ProllyKey::from(id), cid))
            .collect();
        let edge_removals: HashSet<ProllyKey> =
            removed_edges.into_iter().map(ProllyKey::from).collect();
        let new_edges_root = rebuild_tree(&*bs, &base_edges, &edge_additions, &edge_removals)?;

        // Schema tree unchanged in M8 MVP (no schema mutations yet).
        let new_schema_root = base_schema;

        // Build secondary indexes. Fast path: incremental append when
        // the transaction is a pure node-level append AND we have a
        // previous IndexSet to extend AND no added NodeId collides with
        // an existing one in the base node tree. Slow path: full
        // rebuild (same as before; correctness baseline).
        //
        // The fast path is byte-equivalent to the slow path in the
        // conditions above; the `incremental_append_indexes` contract
        // in `mnem-core::index` pins this. Tests in this module pin
        // the equivalence on round-trip.
        let new_indexes_cid = match (is_append_only_at_graph_level, base_indexes_cid.as_ref()) {
            (true, Some(base_idx)) => {
                // O(|new_nodes| * log N) point-lookup check for collisions
                // against the base node tree. On any lookup error, fall
                // back to full rebuild (safety over speed) - note that
                // `Err(_)` deliberately counts as a "collision" here.
                let has_collision = new_nodes_btree.keys().any(|node_id| {
                    let key = ProllyKey::from(*node_id);
                    matches!(
                        crate::prolly::lookup(&*bs, &base_nodes, &key),
                        Ok(Some(_)) | Err(_)
                    )
                });
                if has_collision {
                    index::build_index_set(&*bs, &new_nodes_root, &new_edges_root)?
                } else {
                    index::incremental_append_indexes(&*bs, base_idx, &new_nodes_btree)?
                }
            }
            _ => index::build_index_set(&*bs, &new_nodes_root, &new_edges_root)?,
        };

        // Embedding sidecar. Skip the rebuild entirely when no
        // pending writes AND no base sidecar - most commits in a
        // legacy repo will hit this fast path. Otherwise: encode each
        // pending bucket, stage its CID under the 16-byte truncated
        // blake3 of the NodeCid wire form (matches the lookup keying
        // in `ReadonlyRepo::embedding_for`), and feed the additions
        // through the same `rebuild_tree` helper the node + edge
        // trees use.
        let base_embeddings_cid: Option<Cid> =
            base.commit.as_deref().and_then(|c| c.embeddings.clone());
        let new_embeddings_cid: Option<Cid> =
            if pending_embeddings.is_empty() && base_embeddings_cid.is_none() {
                None
            } else {
                let base_root = match &base_embeddings_cid {
                    Some(c) => c.clone(),
                    None => prolly::build_tree(&*bs, std::iter::empty())?,
                };
                let mut additions: BTreeMap<ProllyKey, Cid> = BTreeMap::new();
                for (node_cid, bucket) in pending_embeddings {
                    let (bucket_bytes, bucket_cid) = hash_to_cid(&bucket)?;
                    bs.put_trusted(bucket_cid.clone(), bucket_bytes)?;
                    let key = embedding_key_for_node_cid(&node_cid);
                    additions.insert(key, bucket_cid);
                }
                Some(rebuild_tree(&*bs, &base_root, &additions, &HashSet::new())?)
            };

        // Step 2 (module docs): build the new Commit.
        //
        // `time_micros` and `change_id` are deterministic-replay escape
        // hatches: callers who want byte-identical CIDs across
        // machines supply both. `None` falls back to wall clock +
        // fresh v7 (the current human-workflow default).
        let now = opts.time_micros.unwrap_or_else(now_micros);
        let change_id = opts.change_id.unwrap_or_else(ChangeId::new_v7);
        let mut commit = Commit::new(
            change_id,
            new_nodes_root,
            new_edges_root,
            new_schema_root,
            opts.author,
            now,
            opts.message,
        );
        commit.indexes = Some(new_indexes_cid);
        commit.embeddings = new_embeddings_cid;
        if let Some(prev_head) = base.view.heads.first() {
            commit = commit.with_parent(prev_head.clone());
        }
        let (commit_bytes, commit_cid) = hash_to_cid(&commit)?;
        // safety: commit_cid computed above via hash_to_cid
        bs.put_trusted(commit_cid.clone(), commit_bytes)?;

        // Step 3 (module docs): build the new View. `is_first_commit`
        // is decided BEFORE ref_updates are applied (a fresh repo has
        // no heads and no refs).
        let mut new_view: View = (*base.view).clone();
        let is_first_commit = base.view.heads.is_empty() && new_view.refs.is_empty();
        new_view.heads = vec![commit_cid.clone()];
        for (name, target) in ref_updates {
            match target {
                Some(t) => {
                    new_view.refs.insert(name, t);
                }
                None => {
                    new_view.refs.remove(&name);
                }
            }
        }
        // C4-1 (audit-2026-04-25): Mirror Git - on the first commit
        // of a fresh repo, auto-create `refs/heads/main` pointing at
        // the new commit unless the caller already supplied a ref
        // update (explicit beats implicit). This means `mnem init` +
        // first ingest leaves the repo with a usable default branch
        // so docs examples like `mnem branch create test main` work
        // out of the box without requiring `mnem ref set` plumbing.
        //
        // NOTE(review): the check is only for `refs/heads/main`
        // itself - a first commit that sets some OTHER ref still
        // auto-creates `main`. Confirm that matches the "explicit
        // beats implicit" intent.
        if is_first_commit && !new_view.refs.contains_key("refs/heads/main") {
            new_view
                .refs
                .insert("refs/heads/main".to_string(), RefTarget::normal(commit_cid));
        }
        // Stamp every staged tombstone with the commit's resolved `now`
        // so all tombstones in one commit share a timestamp (agents
        // expect "revoked together" to mean "same timestamp"), and
        // merge into the View. Later entries overwrite earlier ones,
        // matching the idempotent-deterministic rule documented on
        // `tombstone_node`.
        for (node_id, mut ts) in new_tombstones {
            ts.tombstoned_at = now;
            new_view.tombstones.insert(node_id, ts);
        }
        let (view_bytes, view_cid) = hash_to_cid(&new_view)?;
        // safety: view_cid computed above via hash_to_cid
        bs.put_trusted(view_cid.clone(), view_bytes)?;

        // Step 4 (module docs): build the new Operation, parented on
        // the op this transaction started from.
        let op = Operation::new(
            view_cid,
            opts.author,
            now,
            format!("commit: {}", opts.message),
        )
        .with_parent(base.op_id.clone());
        let (op_bytes, op_cid) = hash_to_cid(&op)?;
        // safety: op_cid computed above via hash_to_cid
        bs.put_trusted(op_cid.clone(), op_bytes)?;

        // Linearize check (SPEC §6.5): re-read op-heads just before the
        // CAS-like advance. If drift has occurred, fail rather than
        // append a concurrent head. The blocks written above stay in
        // the store (content-addressed; a retry re-references them).
        if opts.linearize {
            let current = ohs.current()?;
            if current.len() != 1 || current[0] != base.op_id {
                return Err(RepoError::Stale.into());
            }
        }

        // Step 5 (module docs): advance op-heads atomically.
        ohs.update(op_cid.clone(), std::slice::from_ref(&base.op_id))?;

        // Step 6 (module docs): return a fresh ReadonlyRepo pinned to
        // the new op.
        ReadonlyRepo::load_at(bs, ohs, op_cid)
    }
724}
725
726// ---------------- Tree rebuild helper ----------------
727
728/// Rebuild a Prolly tree by applying additions and removals to the
729/// contents of an existing base tree.
730///
731/// Naive O(n) implementation: walks the whole base tree via [`Cursor`],
732/// filters out removals, applies additions, sorts, and re-builds. A
733/// future M5.5+ incremental mutation path will re-chunk only touched
734/// subtrees. For M8 MVP this is acceptable - typical graph commits
735/// touch a small fraction of a tree, so the rebuild is the slow path
736/// rather than the common path.
737///
738/// # Errors
739///
740/// Store and codec errors while iterating and writing.
741fn rebuild_tree<B: Blockstore + ?Sized>(
742    store: &B,
743    base_root: &Cid,
744    additions: &BTreeMap<ProllyKey, Cid>,
745    removals: &HashSet<ProllyKey>,
746) -> Result<Cid, Error> {
747    // Stream the base tree into a map (absorbs removals and prepares for
748    // addition-override). Using BTreeMap so final iteration is sorted.
749    let mut merged: BTreeMap<ProllyKey, Cid> = BTreeMap::new();
750    let cursor = Cursor::new(store, base_root)?;
751    for entry in cursor {
752        let (k, v) = entry?;
753        if removals.contains(&k) {
754            continue;
755        }
756        merged.insert(k, v);
757    }
758    for (k, v) in additions {
759        merged.insert(*k, v.clone());
760    }
761    // Feed to the Prolly builder (input is already sorted via BTreeMap).
762    prolly::build_tree(store, merged)
763}
764
765/// Derive the 16-byte Prolly key for the embedding-sidecar tree from
766/// a `NodeCid`. We blake3 the CID's wire form (codec + multihash) and
767/// take the first 16 bytes; that gives uniformly-distributed keys
768/// regardless of the codec/digest prefix structure of the CID, so the
769/// Prolly tree's leaf-split heuristic produces balanced nodes.
770///
771/// Both [`Transaction::commit_opts`] (write side) and
772/// [`crate::repo::ReadonlyRepo::embedding_for`] (read side) MUST go
773/// through this exact helper. Two callers that derive keys differently
774/// would silently miss each other's writes.
775pub(crate) fn embedding_key_for_node_cid(node_cid: &Cid) -> ProllyKey {
776    let h = blake3::hash(&node_cid.to_bytes());
777    let mut k = [0u8; 16];
778    k.copy_from_slice(&h.as_bytes()[..16]);
779    ProllyKey(k)
780}
781
782#[cfg(test)]
783mod tests {
784    use super::*;
785    use crate::id::{CODEC_RAW, Multihash};
786    use crate::store::{MemoryBlockstore, MemoryOpHeadsStore, OpHeadsStore};
787    use ipld_core::ipld::Ipld;
788    use std::sync::Arc;
789
790    fn new_repo() -> ReadonlyRepo {
791        let bs: Arc<dyn Blockstore> = Arc::new(MemoryBlockstore::new());
792        let ohs: Arc<dyn OpHeadsStore> = Arc::new(MemoryOpHeadsStore::new());
793        ReadonlyRepo::init(bs, ohs).unwrap()
794    }
795
796    #[test]
797    fn first_commit_advances_head_and_stores_commit() {
798        let repo = new_repo();
799        assert!(repo.head_commit().is_none());
800
801        let mut tx = repo.start_transaction();
802        let alice =
803            Node::new(NodeId::new_v7(), "Person").with_prop("name", Ipld::String("Alice".into()));
804        tx.add_node(&alice).unwrap();
805        let new_repo = tx.commit("alice@example.org", "add Alice").unwrap();
806
807        assert!(new_repo.head_commit().is_some());
808        let head = new_repo.head_commit().unwrap();
809        assert_eq!(head.message, "add Alice");
810
811        let looked_up = new_repo.lookup_node(&alice.id).unwrap();
812        assert_eq!(looked_up.as_ref(), Some(&alice));
813    }
814
815    #[test]
816    fn second_commit_chains_parent_and_preserves_history() {
817        let repo = new_repo();
818        let alice =
819            Node::new(NodeId::new_v7(), "Person").with_prop("name", Ipld::String("Alice".into()));
820        let mut tx1 = repo.start_transaction();
821        tx1.add_node(&alice).unwrap();
822        let repo_v1 = tx1.commit("tester", "add Alice").unwrap();
823        let v1_head_cid = repo_v1.view().heads[0].clone();
824
825        let bob =
826            Node::new(NodeId::new_v7(), "Person").with_prop("name", Ipld::String("Bob".into()));
827        let mut tx2 = repo_v1.start_transaction();
828        tx2.add_node(&bob).unwrap();
829        let repo_v2 = tx2.commit("tester", "add Bob").unwrap();
830
831        // Alice still findable after second commit.
832        assert_eq!(
833            repo_v2.lookup_node(&alice.id).unwrap().as_ref(),
834            Some(&alice)
835        );
836        // Bob findable too.
837        assert_eq!(repo_v2.lookup_node(&bob.id).unwrap().as_ref(), Some(&bob));
838        // Commit 2's single parent is commit 1's CID.
839        let head_v2 = repo_v2.head_commit().unwrap();
840        assert_eq!(head_v2.parents.len(), 1);
841        assert_eq!(head_v2.parents[0], v1_head_cid);
842    }
843
844    // ---------- tombstone_node ----------
845
846    #[test]
847    fn tombstone_round_trip_through_view() {
848        // Contract: a tombstone written in one commit survives on the
849        // View read back from the next `ReadonlyRepo`, carrying the
850        // caller's reason + the commit's resolved timestamp.
851        let repo = new_repo();
852        let alice =
853            Node::new(NodeId::new_v7(), "Person").with_prop("name", Ipld::String("Alice".into()));
854
855        let mut tx1 = repo.start_transaction();
856        tx1.add_node(&alice).unwrap();
857        let repo_v1 = tx1.commit("t", "seed").unwrap();
858        // Pre-tombstone: no entry on the view.
859        assert!(!repo_v1.is_tombstoned(&alice.id));
860
861        // Tombstone in a second commit so the timestamp field is
862        // stamped from that commit's `now`.
863        let mut tx2 = repo_v1.start_transaction();
864        tx2.tombstone_node(alice.id, "user asked to forget")
865            .unwrap();
866        let repo_v2 = tx2.commit("t", "revoke alice").unwrap();
867
868        // The original node block is still addressable - CID unchanged,
869        // lookup still returns it.
870        assert_eq!(
871            repo_v2.lookup_node(&alice.id).unwrap().as_ref(),
872            Some(&alice)
873        );
874        // But the View now carries a tombstone for this id.
875        assert!(repo_v2.is_tombstoned(&alice.id));
876        let ts = repo_v2.tombstone_for(&alice.id).expect("tombstone present");
877        assert_eq!(ts.reason, "user asked to forget");
878        assert!(
879            ts.tombstoned_at > 0,
880            "tombstone_at must be set from commit's resolved now, got 0"
881        );
882    }
883
884    #[test]
885    fn tombstone_is_idempotent_within_a_transaction() {
886        // Calling tombstone_node twice for the same id in one
887        // transaction collapses to a single View entry; the second
888        // reason overwrites the first (deterministic rule).
889        let repo = new_repo();
890        let alice = Node::new(NodeId::new_v7(), "Person");
891
892        let mut tx1 = repo.start_transaction();
893        tx1.add_node(&alice).unwrap();
894        let repo_v1 = tx1.commit("t", "seed").unwrap();
895
896        let mut tx2 = repo_v1.start_transaction();
897        tx2.tombstone_node(alice.id, "first").unwrap();
898        tx2.tombstone_node(alice.id, "second").unwrap();
899        let repo_v2 = tx2.commit("t", "revoke").unwrap();
900
901        assert_eq!(repo_v2.view().tombstones.len(), 1);
902        let ts = repo_v2.tombstone_for(&alice.id).unwrap();
903        assert_eq!(
904            ts.reason, "second",
905            "later tombstone_node call wins within one transaction"
906        );
907    }
908
909    #[test]
910    fn tombstone_leaves_node_cid_stable() {
911        // Contract: tombstoning a node does NOT alter the CID that
912        // `lookup_node` resolves to. Agents that persisted the CID of
913        // a node outside mnem can still fetch the same bytes after a
914        // tombstone commit. This is the core reason tombstones exist
915        // as a side-channel on the View rather than as a mutation of
916        // the node block.
917        use crate::codec::hash_to_cid;
918
919        let repo = new_repo();
920        let alice =
921            Node::new(NodeId::new_v7(), "Person").with_prop("name", Ipld::String("Alice".into()));
922
923        let mut tx1 = repo.start_transaction();
924        tx1.add_node(&alice).unwrap();
925        let repo_v1 = tx1.commit("t", "seed").unwrap();
926        let alice_before = repo_v1.lookup_node(&alice.id).unwrap().unwrap();
927        let (_bytes_before, cid_before) = hash_to_cid(&alice_before).unwrap();
928
929        let mut tx2 = repo_v1.start_transaction();
930        tx2.tombstone_node(alice.id, "revoked").unwrap();
931        let repo_v2 = tx2.commit("t", "revoke").unwrap();
932
933        let alice_after = repo_v2.lookup_node(&alice.id).unwrap().unwrap();
934        let (_bytes_after, cid_after) = hash_to_cid(&alice_after).unwrap();
935        assert_eq!(
936            cid_before, cid_after,
937            "tombstone must not change the node's content-addressed CID"
938        );
939        assert_eq!(
940            alice_before, alice_after,
941            "tombstone must not mutate node content"
942        );
943    }
944
945    #[test]
946    fn remove_node_leaves_tree_without_it() {
947        let repo = new_repo();
948        let alice =
949            Node::new(NodeId::new_v7(), "Person").with_prop("name", Ipld::String("Alice".into()));
950        let mut tx1 = repo.start_transaction();
951        tx1.add_node(&alice).unwrap();
952        let v1 = tx1.commit("a", "add").unwrap();
953        assert!(v1.lookup_node(&alice.id).unwrap().is_some());
954
955        let mut tx2 = v1.start_transaction();
956        tx2.remove_node(alice.id);
957        let v2 = tx2.commit("a", "remove").unwrap();
958        assert!(v2.lookup_node(&alice.id).unwrap().is_none());
959    }
960
961    #[test]
962    fn ref_update_is_visible_on_the_new_view() {
963        let repo = new_repo();
964        let raw_cid = Cid::new(CODEC_RAW, Multihash::sha2_256(b"target"));
965
966        let mut tx = repo.start_transaction();
967        tx.update_ref("refs/heads/main", Some(RefTarget::normal(raw_cid.clone())));
968        let v1 = tx.commit("a", "set main").unwrap();
969        match v1.view().refs.get("refs/heads/main") {
970            Some(RefTarget::Normal { target }) => assert_eq!(*target, raw_cid),
971            other => panic!("expected normal ref, got {other:?}"),
972        }
973    }
974
975    #[test]
976    fn op_heads_advances_on_commit() {
977        let repo = new_repo();
978        let ohs = repo.op_heads_store().clone();
979        assert_eq!(ohs.current().unwrap().len(), 1);
980        let before_head = ohs.current().unwrap()[0].clone();
981
982        let mut tx = repo.start_transaction();
983        let alice = Node::new(NodeId::new_v7(), "Person");
984        tx.add_node(&alice).unwrap();
985        let v1 = tx.commit("a", "m").unwrap();
986
987        let after_heads = ohs.current().unwrap();
988        assert_eq!(after_heads.len(), 1);
989        assert_ne!(after_heads[0], before_head);
990        assert_eq!(after_heads[0], *v1.op_id());
991    }
992
993    #[test]
994    fn linearize_commit_succeeds_against_current_head() {
995        let repo = new_repo();
996        let mut tx = repo.start_transaction();
997        tx.add_node(&Node::new(NodeId::new_v7(), "Person")).unwrap();
998        let r = tx.commit_opts(CommitOptions {
999            author: "a",
1000            message: "m",
1001            linearize: true,
1002            time_micros: None,
1003            change_id: None,
1004        });
1005        assert!(r.is_ok());
1006    }
1007
1008    #[test]
1009    fn linearize_commit_rejects_stale_base() {
1010        let repo = new_repo();
1011
1012        // Start a transaction against the initial state.
1013        let mut stale_tx = repo.start_transaction();
1014        stale_tx
1015            .add_node(&Node::new(NodeId::new_v7(), "Ghost"))
1016            .unwrap();
1017
1018        // A concurrent writer commits, advancing op-heads.
1019        let mut other_tx = repo.start_transaction();
1020        other_tx
1021            .add_node(&Node::new(NodeId::new_v7(), "Person"))
1022            .unwrap();
1023        let _ = other_tx.commit("a", "concurrent").unwrap();
1024
1025        // The stale transaction commits in linearize mode -> Stale.
1026        let err = stale_tx
1027            .commit_opts(CommitOptions {
1028                author: "a",
1029                message: "from stale",
1030                linearize: true,
1031                time_micros: None,
1032                change_id: None,
1033            })
1034            .unwrap_err();
1035        assert!(matches!(err, Error::Repo(RepoError::Stale)));
1036    }
1037
1038    #[test]
1039    fn default_commit_against_stale_base_still_succeeds() {
1040        // The non-linearize default lets both writers append to op-heads;
1041        // the second commit simply lands as a concurrent head (to be
1042        // merged by M8.5).
1043        let repo = new_repo();
1044
1045        let mut stale_tx = repo.start_transaction();
1046        stale_tx
1047            .add_node(&Node::new(NodeId::new_v7(), "Ghost"))
1048            .unwrap();
1049
1050        let mut other_tx = repo.start_transaction();
1051        other_tx
1052            .add_node(&Node::new(NodeId::new_v7(), "Person"))
1053            .unwrap();
1054        let _ = other_tx.commit("a", "concurrent").unwrap();
1055
1056        // Default mode succeeds even with a stale base.
1057        assert!(stale_tx.commit("a", "late but not linearized").is_ok());
1058    }
1059
1060    #[test]
1061    fn deterministic_commit_opts_yield_identical_commit_cid() {
1062        // Contract: two processes that build the same logical commit
1063        // on disjoint fresh repos, with CommitOptions pinning
1064        // `time_micros` + `change_id`, MUST produce byte-identical
1065        // commit CIDs. This is the headline "deterministic across
1066        // machines" property extended to commits (previously the
1067        // guarantee applied only to node-tree + IndexSet).
1068        //
1069        // This is ALSO our Q0-migration safety net: if
1070        // `put_trusted` (added in the A2 -> Q0 migration) ever
1071        // silently corrupts a commit's serialized bytes, the head
1072        // CID recorded here would change and this test would break.
1073        // Changes to the fixed inputs below should be treated as a
1074        // correctness regression until explained.
1075        let fixed_id = NodeId::from_bytes_raw([0x42; 16]);
1076        let fixed_change_id = ChangeId::from_bytes_raw([0x11; 16]);
1077        let fixed_time: u64 = 1_700_000_000_000_000;
1078
1079        let commit_once = || -> Cid {
1080            let repo = new_repo();
1081            let mut tx = repo.start_transaction();
1082            tx.add_node(&Node::new(fixed_id, "Person")).unwrap();
1083            let new_repo = tx
1084                .commit_opts(
1085                    CommitOptions::new("alice", "seed")
1086                        .with_time_micros(fixed_time)
1087                        .with_change_id(fixed_change_id),
1088                )
1089                .unwrap();
1090            new_repo
1091                .view()
1092                .heads
1093                .first()
1094                .expect("one head after commit")
1095                .clone()
1096        };
1097        let a = commit_once();
1098        let b = commit_once();
1099        assert_eq!(
1100            a, b,
1101            "identical CommitOptions across fresh repos must produce identical commit CIDs"
1102        );
1103    }
1104
1105    /// Fix X1 regression guard. Build the same graph two ways:
1106    /// (a) many append-only commits (trigger the incremental index
1107    /// fast path from the second commit onward),
1108    /// (b) one big commit that holds the full graph (hits the
1109    /// first-commit full-rebuild path).
1110    /// Both must produce byte-identical `IndexSet` CIDs. If not, the
1111    /// incremental path has drifted from the slow-path output and
1112    /// queries would silently diverge.
1113    #[test]
1114    fn incremental_and_full_index_build_produce_identical_index_set() {
1115        // Helper: ingest `batches` of `per_batch` nodes each, one
1116        // commit per batch. The first commit hits the full rebuild
1117        // (no base IndexSet); every subsequent commit hits the
1118        // incremental append path because all gating conditions
1119        // (no removals, no edges, no NodeId collision) are satisfied.
1120        fn incremental(batches: usize, per_batch: usize, ids: &[NodeId]) -> Cid {
1121            let bs: Arc<dyn Blockstore> = Arc::new(MemoryBlockstore::new());
1122            let ohs: Arc<dyn OpHeadsStore> = Arc::new(MemoryOpHeadsStore::new());
1123            let mut repo = ReadonlyRepo::init(bs, ohs).unwrap();
1124            for b in 0..batches {
1125                let mut tx = repo.start_transaction();
1126                for i in 0..per_batch {
1127                    let id = ids[b * per_batch + i];
1128                    let node = Node::new(id, "Person")
1129                        .with_prop("name", Ipld::String(format!("p{i}")))
1130                        .with_prop("batch", Ipld::Integer(b as i128));
1131                    tx.add_node(&node).unwrap();
1132                }
1133                repo = tx.commit("t", "batch").unwrap();
1134            }
1135            repo.head_commit().unwrap().indexes.clone().unwrap()
1136        }
1137
1138        fn full(total: usize, ids: &[NodeId]) -> Cid {
1139            let bs: Arc<dyn Blockstore> = Arc::new(MemoryBlockstore::new());
1140            let ohs: Arc<dyn OpHeadsStore> = Arc::new(MemoryOpHeadsStore::new());
1141            let repo = ReadonlyRepo::init(bs, ohs).unwrap();
1142            let mut tx = repo.start_transaction();
1143            let per_batch = 10;
1144            for i in 0..total {
1145                let batch_of = i / per_batch;
1146                let in_batch = i % per_batch;
1147                let node = Node::new(ids[i], "Person")
1148                    .with_prop("name", Ipld::String(format!("p{in_batch}")))
1149                    .with_prop("batch", Ipld::Integer(batch_of as i128));
1150                tx.add_node(&node).unwrap();
1151            }
1152            tx.commit("t", "one-shot")
1153                .unwrap()
1154                .head_commit()
1155                .unwrap()
1156                .indexes
1157                .clone()
1158                .unwrap()
1159        }
1160
1161        // Deterministic id set so both paths commit the same graph.
1162        // Using from_bytes_raw keeps the ids ordering predictable.
1163        let total = 30;
1164        let ids: Vec<NodeId> = (0..total)
1165            .map(|i| {
1166                let mut b = [0u8; 16];
1167                b[0] = i as u8;
1168                NodeId::from_bytes_raw(b)
1169            })
1170            .collect();
1171
1172        let inc = incremental(3, 10, &ids);
1173        let one = full(30, &ids);
1174        assert_eq!(
1175            inc, one,
1176            "incremental index build must produce the same IndexSet CID as the full rebuild"
1177        );
1178    }
1179
    /// Companion to the test above: when the graph has edges (so
    /// `outgoing` and `incoming` trees are actually populated), the
    /// incremental-append path must preserve BOTH direction CIDs
    /// byte-for-byte, not just the nodes side.
    #[test]
    fn incremental_and_full_preserve_both_direction_adjacency_cids() {
        // Ten deterministic node ids (first byte = index) so both
        // builds commit exactly the same graph.
        let ids: Vec<NodeId> = (0u8..10u8)
            .map(|i| {
                let mut b = [0u8; 16];
                b[0] = i;
                NodeId::from_bytes_raw(b)
            })
            .collect();
        // (source index, destination index, EdgeId tag byte) triples;
        // the tag keeps edge ids distinct and deterministic.
        let edge_pairs: &[(usize, usize, u8)] =
            &[(0, 1, 0xA0), (1, 2, 0xA1), (2, 3, 0xA2), (0, 5, 0xA3)];

        // Incremental: first commit has nodes+edges, then pure-node
        // appends hit the fast path.
        let (bs, ohs): (Arc<dyn Blockstore>, Arc<dyn OpHeadsStore>) = (
            Arc::new(MemoryBlockstore::new()),
            Arc::new(MemoryOpHeadsStore::new()),
        );
        let repo_inc = ReadonlyRepo::init(bs, ohs).unwrap();
        let mut tx = repo_inc.start_transaction();
        for id in &ids {
            tx.add_node(&Node::new(*id, "Person")).unwrap();
        }
        for (s, d, tag) in edge_pairs {
            let mut eb = [0u8; 16];
            eb[0] = *tag;
            tx.add_edge(&crate::objects::Edge::new(
                crate::id::EdgeId::from_bytes_raw(eb),
                "knows",
                ids[*s],
                ids[*d],
            ))
            .unwrap();
        }
        let mut repo_inc = tx.commit("t", "seed").unwrap();
        // Three node-only follow-up commits (ids 0xEE00.., 0xEE01..,
        // 0xEE02..) exercise the incremental append path on top of
        // the edge-bearing base.
        for extra in 0u8..3 {
            let mut tx = repo_inc.start_transaction();
            let mut b = [0u8; 16];
            b[0] = 0xEE;
            b[1] = extra;
            tx.add_node(&Node::new(NodeId::from_bytes_raw(b), "Person"))
                .unwrap();
            repo_inc = tx.commit("t", "append").unwrap();
        }
        let idx_inc_cid = repo_inc.head_commit().unwrap().indexes.clone().unwrap();
        let idx_inc: crate::objects::IndexSet =
            crate::repo::readonly::decode_from_store(&**repo_inc.blockstore(), &idx_inc_cid)
                .unwrap();

        // Full: single commit with all nodes (core + extras) + edges.
        let (bs, ohs): (Arc<dyn Blockstore>, Arc<dyn OpHeadsStore>) = (
            Arc::new(MemoryBlockstore::new()),
            Arc::new(MemoryOpHeadsStore::new()),
        );
        let repo_full = ReadonlyRepo::init(bs, ohs).unwrap();
        let mut tx = repo_full.start_transaction();
        for id in &ids {
            tx.add_node(&Node::new(*id, "Person")).unwrap();
        }
        for extra in 0u8..3 {
            let mut b = [0u8; 16];
            b[0] = 0xEE;
            b[1] = extra;
            tx.add_node(&Node::new(NodeId::from_bytes_raw(b), "Person"))
                .unwrap();
        }
        for (s, d, tag) in edge_pairs {
            let mut eb = [0u8; 16];
            eb[0] = *tag;
            tx.add_edge(&crate::objects::Edge::new(
                crate::id::EdgeId::from_bytes_raw(eb),
                "knows",
                ids[*s],
                ids[*d],
            ))
            .unwrap();
        }
        let repo_full = tx.commit("t", "one-shot").unwrap();
        let idx_full_cid = repo_full.head_commit().unwrap().indexes.clone().unwrap();
        let idx_full: crate::objects::IndexSet =
            crate::repo::readonly::decode_from_store(&**repo_full.blockstore(), &idx_full_cid)
                .unwrap();

        // Compare direction-by-direction first so a failure names the
        // side that drifted, then the whole-IndexSet CID.
        assert_eq!(
            idx_inc.outgoing, idx_full.outgoing,
            "incremental path must preserve the outgoing CID byte-for-byte"
        );
        assert_eq!(
            idx_inc.incoming, idx_full.incoming,
            "incremental path must preserve the incoming CID byte-for-byte"
        );
        assert_eq!(
            idx_inc_cid, idx_full_cid,
            "whole-IndexSet CID must also be byte-equal"
        );
    }
1280
    // -------- embedding sidecar --------
1282
1283    fn dummy_embedding(model: &str, dim: u32) -> Embedding {
1284        let bytes_len = (dim as usize) * crate::objects::node::Dtype::F32.byte_width();
1285        Embedding {
1286            model: model.into(),
1287            dtype: crate::objects::node::Dtype::F32,
1288            dim,
1289            vector: bytes::Bytes::from(vec![0u8; bytes_len]),
1290        }
1291    }
1292
1293    /// happy path: stage an embedding via `set_embedding`,
1294    /// commit, then read it back via `embedding_for`. End-to-end
1295    /// proof the write side and the read side agree on the Prolly
1296    /// key derivation.
1297    #[test]
1298    fn set_embedding_round_trips_through_commit() {
1299        let repo = new_repo();
1300        let mut tx = repo.start_transaction();
1301        let node = Node::new(NodeId::new_v7(), "Doc").with_summary("hello");
1302        let node_cid = tx.add_node(&node).unwrap();
1303        let emb = dummy_embedding("onnx:test", 4);
1304        tx.set_embedding(node_cid.clone(), "onnx:test".into(), emb.clone())
1305            .unwrap();
1306        let r2 = tx.commit("alice", "stage embed").unwrap();
1307
1308        // Sidecar root populated on the new commit.
1309        assert!(r2.head_commit().unwrap().embeddings.is_some());
1310
1311        // Lookup returns the staged embedding.
1312        let got = r2.embedding_for(&node_cid, "onnx:test").unwrap();
1313        assert_eq!(got, Some(emb));
1314
1315        // Wrong model returns None, not error.
1316        assert_eq!(r2.embedding_for(&node_cid, "missing-model").unwrap(), None);
1317    }
1318
1319    /// One node may carry multiple embeddings simultaneously (e.g.
1320    /// MiniLM + bge-base for the same chunk). The bucket holds both,
1321    /// keyed by `model`.
1322    #[test]
1323    fn set_embedding_multiple_models_per_node() {
1324        let repo = new_repo();
1325        let mut tx = repo.start_transaction();
1326        let node = Node::new(NodeId::new_v7(), "Doc").with_summary("two-model node");
1327        let node_cid = tx.add_node(&node).unwrap();
1328        let emb_a = dummy_embedding("model-a", 4);
1329        let emb_b = dummy_embedding("model-b", 8);
1330        tx.set_embedding(node_cid.clone(), "model-a".into(), emb_a.clone())
1331            .unwrap();
1332        tx.set_embedding(node_cid.clone(), "model-b".into(), emb_b.clone())
1333            .unwrap();
1334        let r2 = tx.commit("alice", "two embeds").unwrap();
1335
1336        assert_eq!(r2.embedding_for(&node_cid, "model-a").unwrap(), Some(emb_a));
1337        assert_eq!(r2.embedding_for(&node_cid, "model-b").unwrap(), Some(emb_b));
1338    }
1339
1340    /// A commit with zero pending embeddings AND no base sidecar
1341    /// must leave `commit.embeddings = None` so legacy commits stay
1342    /// byte-identical.
1343    #[test]
1344    fn commit_without_set_embedding_has_none_embeddings_root() {
1345        let repo = new_repo();
1346        let mut tx = repo.start_transaction();
1347        let node = Node::new(NodeId::new_v7(), "Doc").with_summary("no embed");
1348        tx.add_node(&node).unwrap();
1349        let r2 = tx.commit("alice", "no embed").unwrap();
1350
1351        assert_eq!(r2.head_commit().unwrap().embeddings, None);
1352    }
1353
1354    /// Second commit on top of a sidecar-bearing base must inherit
1355    /// the existing entries and add the new one. Lookup of either
1356    /// (old or new) NodeCid succeeds against the new repo.
1357    #[test]
1358    fn second_commit_inherits_and_extends_embedding_sidecar() {
1359        let repo = new_repo();
1360
1361        // Tx 1: add node A + its embedding, commit.
1362        let mut tx1 = repo.start_transaction();
1363        let node_a = Node::new(NodeId::new_v7(), "Doc").with_summary("a");
1364        let cid_a = tx1.add_node(&node_a).unwrap();
1365        let emb_a = dummy_embedding("onnx:a", 4);
1366        tx1.set_embedding(cid_a.clone(), "onnx:a".into(), emb_a.clone())
1367            .unwrap();
1368        let r1 = tx1.commit("alice", "first").unwrap();
1369        assert!(r1.head_commit().unwrap().embeddings.is_some());
1370
1371        // Tx 2: add node B + its embedding on top of r1.
1372        let mut tx2 = r1.start_transaction();
1373        let node_b = Node::new(NodeId::new_v7(), "Doc").with_summary("b");
1374        let cid_b = tx2.add_node(&node_b).unwrap();
1375        let emb_b = dummy_embedding("onnx:b", 4);
1376        tx2.set_embedding(cid_b.clone(), "onnx:b".into(), emb_b.clone())
1377            .unwrap();
1378        let r2 = tx2.commit("alice", "second").unwrap();
1379
1380        // Both lookups must succeed against r2.
1381        assert_eq!(r2.embedding_for(&cid_a, "onnx:a").unwrap(), Some(emb_a));
1382        assert_eq!(r2.embedding_for(&cid_b, "onnx:b").unwrap(), Some(emb_b));
1383    }
1384
1385    /// Determinism: staging the same set of (NodeCid, model, embedding)
1386    /// triples in different orders must produce byte-identical
1387    /// `commit.embeddings` Cids. Pins the canonical-form contract for
1388    /// the sidecar tree.
1389    #[test]
1390    fn embedding_sidecar_root_is_insertion_order_invariant() {
1391        // Two repos, same Node + Embedding writes in different order.
1392        let make = |order: u8| -> Cid {
1393            let repo = new_repo();
1394            let mut tx = repo.start_transaction();
1395            let n1 = Node::new(NodeId::from_bytes_raw([1u8; 16]), "Doc").with_summary("n1");
1396            let n2 = Node::new(NodeId::from_bytes_raw([2u8; 16]), "Doc").with_summary("n2");
1397            let n3 = Node::new(NodeId::from_bytes_raw([3u8; 16]), "Doc").with_summary("n3");
1398            let c1 = tx.add_node(&n1).unwrap();
1399            let c2 = tx.add_node(&n2).unwrap();
1400            let c3 = tx.add_node(&n3).unwrap();
1401            let e1 = dummy_embedding("m", 4);
1402            let e2 = dummy_embedding("m", 4);
1403            let e3 = dummy_embedding("m", 4);
1404            // Same logical writes, three permutations.
1405            match order {
1406                0 => {
1407                    tx.set_embedding(c1, "m".into(), e1).unwrap();
1408                    tx.set_embedding(c2, "m".into(), e2).unwrap();
1409                    tx.set_embedding(c3, "m".into(), e3).unwrap();
1410                }
1411                1 => {
1412                    tx.set_embedding(c3, "m".into(), e3).unwrap();
1413                    tx.set_embedding(c1, "m".into(), e1).unwrap();
1414                    tx.set_embedding(c2, "m".into(), e2).unwrap();
1415                }
1416                _ => {
1417                    tx.set_embedding(c2, "m".into(), e2).unwrap();
1418                    tx.set_embedding(c3, "m".into(), e3).unwrap();
1419                    tx.set_embedding(c1, "m".into(), e1).unwrap();
1420                }
1421            }
1422            let r = tx.commit("alice", "det").unwrap();
1423            r.head_commit().unwrap().embeddings.clone().unwrap()
1424        };
1425        let cid_a = make(0);
1426        let cid_b = make(1);
1427        let cid_c = make(2);
1428        assert_eq!(
1429            cid_a, cid_b,
1430            "sidecar root must be insertion-order-invariant"
1431        );
1432        assert_eq!(cid_a, cid_c);
1433    }
1434}