mnem_core/repo/transaction.rs
1//! [`Transaction`] - accumulator for pending mutations + commit.
2//!
3//! A transaction captures a snapshot of the current [`ReadonlyRepo`] at
4//! `start_transaction()` time. Mutations ([`Transaction::add_node`],
5//! `add_edge`, `remove_node`, etc.) are buffered. [`Transaction::commit`]
6//! atomically:
7//!
8//! 1. Rebuilds the node / edge / schema Prolly trees from the base
9//! commit's roots + the buffered additions and removals.
10//! 2. Writes a new Commit whose `parents` is the previous head.
11//! 3. Writes a new View whose `heads = [new commit]` and whose `refs`
12//! reflect any ref-update mutations.
13//! 4. Writes a new Operation whose `parents = [old op]`.
14//! 5. Advances the op-heads store: inserts new op, removes old op.
15//! 6. Returns a fresh [`ReadonlyRepo`] pinned to the new op.
16//!
17//! Multi-writer safety: step 5 is atomic per
18//! [`OpHeadsStore::update`][crate::store::OpHeadsStore::update].
19//! If another writer has advanced the heads concurrently, both new ops
20//! remain in the heads set; the next [`ReadonlyRepo::open`] will see
21//! multiple heads and (in M8.5) trigger a 3-way merge.
22
23use std::collections::{BTreeMap, HashSet};
24
25use ipld_core::ipld::Ipld;
26
27use crate::codec::hash_to_cid;
28use crate::error::{Error, RepoError};
29use crate::id::{ChangeId, Cid, EdgeId, NodeId};
30use crate::index;
31use crate::objects::node::Embedding;
32use crate::objects::{
33 Commit, Edge, EmbeddingBucket, IndexSet, Node, Operation, RefTarget, Tombstone, View,
34};
35use crate::prolly::{self, Cursor, ProllyKey};
36use crate::store::Blockstore;
37
38use super::readonly::{ReadonlyRepo, decode_from_store, now_micros};
39
/// Options controlling the commit path.
///
/// The default (via [`Transaction::commit`]) is lock-free: concurrent
/// writers both succeed; the next reader merges. Setting
/// [`linearize`](Self::linearize) to `true` enables SPEC §6.5
/// opportunistic concurrency - if any other writer has advanced
/// op-heads since this transaction started, the commit fails with
/// [`RepoError::Stale`] instead of appending a concurrent head.
///
/// Borrows `author` / `message` for the lifetime of the options value;
/// the strings are copied into the Commit / Operation at commit time.
#[derive(Clone, Copy, Debug)]
pub struct CommitOptions<'a> {
    /// Commit author (UTF-8, stored on the new Commit + Operation).
    pub author: &'a str,
    /// Commit message.
    pub message: &'a str,
    /// Opt-in SPEC §6.5 linearize mode. Defaults to `false`.
    pub linearize: bool,
    /// Override the commit + operation timestamp. Measured in
    /// microseconds since Unix epoch. `None` (the default) calls
    /// `SystemTime::now()` at commit time, which is what a human
    /// workflow wants.
    ///
    /// Set this to `Some(...)` when byte-identical CIDs across
    /// machines matter: two processes that build the same logical
    /// commit (same author, same message, same graph mutations,
    /// same time, same `change_id`) will produce the same commit CID
    /// and the same op-id. This is the escape hatch for
    /// audit-replay, distributed-agent consensus, and regression
    /// tests that assert on commit CIDs.
    pub time_micros: Option<u64>,
    /// Override the commit's `change_id`. `None` (the default)
    /// generates a fresh `ChangeId::new_v7()`, which embeds wall-
    /// clock time and therefore varies per call. Deterministic-
    /// replay workflows MUST supply this explicitly alongside
    /// `time_micros`; otherwise the v7 randomness alone defeats the
    /// byte-identical-CID contract.
    pub change_id: Option<ChangeId>,
}
77
78impl<'a> CommitOptions<'a> {
79 /// Construct with all deterministic-override fields set to `None`
80 /// (the caller-convenient default: auto-clock + auto-change-id).
81 #[must_use]
82 pub const fn new(author: &'a str, message: &'a str) -> Self {
83 Self {
84 author,
85 message,
86 linearize: false,
87 time_micros: None,
88 change_id: None,
89 }
90 }
91
92 /// Pin the timestamp for deterministic replay. See
93 /// [`Self::time_micros`] for the wider contract.
94 #[must_use]
95 pub const fn with_time_micros(mut self, t: u64) -> Self {
96 self.time_micros = Some(t);
97 self
98 }
99
100 /// Pin the change-id for deterministic replay. See
101 /// [`Self::change_id`] for the wider contract.
102 #[must_use]
103 pub const fn with_change_id(mut self, id: ChangeId) -> Self {
104 self.change_id = Some(id);
105 self
106 }
107}
108
/// Buffered mutations against a [`ReadonlyRepo`].
///
/// Construct via [`ReadonlyRepo::start_transaction`]. All mutation
/// methods buffer in memory (plus content-addressed block writes for
/// node/edge bytes); nothing becomes visible to readers until
/// [`Transaction::commit`] / [`Transaction::commit_opts`].
pub struct Transaction {
    // Snapshot of the repo at start_transaction() time; all reads
    // during the transaction resolve against this base.
    base: ReadonlyRepo,
    // Nodes added/overwritten in this tx, keyed by NodeId.
    new_nodes: BTreeMap<NodeId, Cid>,
    // Node ids staged for removal from the node Prolly tree.
    removed_nodes: HashSet<NodeId>,
    // Edges added/overwritten in this tx, keyed by EdgeId.
    new_edges: BTreeMap<EdgeId, Cid>,
    // Edge ids staged for removal from the edge Prolly tree.
    removed_edges: HashSet<EdgeId>,
    // Ref-name -> Some(target) to set, None to delete, applied to the
    // new View at commit time.
    ref_updates: BTreeMap<String, Option<RefTarget>>,
    /// Tombstones staged for insertion into the new View at commit
    /// time. Keyed by `NodeId`; later writes to the same `NodeId` in
    /// the same transaction overwrite the earlier ones (consistent
    /// with [`Self::tombstone_node`]'s idempotent-deterministic rule).
    new_tombstones: BTreeMap<NodeId, Tombstone>,
    /// Side-table for `resolve_or_create_node`: maps
    /// `(label, prop_name, blake3(canonical(value))[..16])` to the
    /// `NodeId` of a node added in this transaction. Bounded by the
    /// number of `resolve_or_create_node` or `add_node` calls in this
    /// tx; prevents the O(pending²) decode loop the naive
    /// implementation would trigger.
    pending_by_prop: BTreeMap<(String, String, [u8; 16]), NodeId>,
    /// Lazy, one-time decode of the base commit's `IndexSet`. Populated
    /// on the first `resolve_or_create_node` call and re-used by
    /// every subsequent call in this transaction. `None` means
    /// "not yet fetched"; `Some(None)` means "no `IndexSet` on the
    /// base commit" (either uninitialised or pre-0.2 commit).
    cached_base_indexes: Option<Option<IndexSet>>,
    /// Pending embedding-sidecar writes, keyed by the content-addressed
    /// `NodeCid` they reference. Multiple `set_embedding` calls for the
    /// same node accumulate into one bucket (one entry per `model`
    /// string). Empty by default; the commit path skips the sidecar
    /// rebuild entirely when this map is empty AND the base commit
    /// carried no `embeddings` root.
    pending_embeddings: BTreeMap<Cid, EmbeddingBucket>,
}
145
146impl Transaction {
147 pub(crate) fn new(base: ReadonlyRepo) -> Self {
148 Self {
149 base,
150 new_nodes: BTreeMap::new(),
151 removed_nodes: HashSet::new(),
152 new_edges: BTreeMap::new(),
153 removed_edges: HashSet::new(),
154 ref_updates: BTreeMap::new(),
155 new_tombstones: BTreeMap::new(),
156 pending_by_prop: BTreeMap::new(),
157 cached_base_indexes: None,
158 pending_embeddings: BTreeMap::new(),
159 }
160 }
161
    /// The base repo this transaction is derived from - the snapshot
    /// captured at `start_transaction()` time, NOT the eventual
    /// post-commit state.
    #[must_use]
    pub const fn base(&self) -> &ReadonlyRepo {
        &self.base
    }
167
168 // ---------------- Mutations ----------------
169
170 /// Add (or overwrite) a node. Cancels any pending `remove_node` for
171 /// the same id. Returns the node's content-addressed CID.
172 ///
173 /// # Errors
174 ///
175 /// Codec or blockstore errors while writing the node.
176 pub fn add_node(&mut self, node: &Node) -> Result<Cid, Error> {
177 let (bytes, cid) = hash_to_cid(node)?;
178 // safety: cid computed above via hash_to_cid
179 self.base.blockstore.put_trusted(cid.clone(), bytes)?;
180 self.removed_nodes.remove(&node.id);
181 self.new_nodes.insert(node.id, cid.clone());
182 // Populate the pending-by-prop cache so future
183 // `resolve_or_create_node` calls in this tx find the node in
184 // O(1) instead of decoding every pending node.
185 for (prop_name, prop_value) in &node.props {
186 if let Ok(hash) = index::prop_value_hash(prop_value) {
187 self.pending_by_prop
188 .insert((node.ntype.clone(), prop_name.clone(), hash), node.id);
189 }
190 }
191 Ok(cid)
192 }
193
194 /// Stage an embedding for a previously-added node into the
195 /// embedding-sidecar Prolly tree referenced by
196 /// `Commit.embeddings`.
197 ///
198 /// Symmetric with [`Self::add_node`]: pass the `node_cid` returned
199 /// from `add_node` (or any pre-existing NodeCid you want to attach
200 /// a new vector to). Multiple `set_embedding` calls for the same
201 /// `node_cid` accumulate into one [`EmbeddingBucket`]; calling
202 /// twice with the same `model` upserts (the second value wins).
203 ///
204 /// The actual sidecar tree is built and committed by
205 /// [`Self::commit`] / [`Self::commit_opts`]; staging does not
206 /// touch the blockstore.
207 ///
208 /// # Why this lives outside the Node bytes
209 ///
210 /// Dense embedding vectors drift in their last bit across ORT
211 /// thread counts (`f32` reduction reordering is non-associative).
212 /// Storing them inline on `Node` would couple `NodeCid` to
213 /// thread count and break federated dedup. The sidecar separates
214 /// identity (Node) from derived bytes (Embedding); two machines
215 /// re-deriving the same source text on different cores share the
216 /// Node CID even when their vectors differ.
217 ///
218 /// # Errors
219 ///
220 /// Currently infallible (the staged map cannot fail to insert);
221 /// the `Result` shape is reserved for future validation hooks
222 /// (e.g. dim/dtype checks against a per-repo config).
223 pub fn set_embedding(
224 &mut self,
225 node_cid: Cid,
226 model: String,
227 embedding: Embedding,
228 ) -> Result<(), Error> {
229 let bucket = self.pending_embeddings.entry(node_cid).or_default();
230 bucket.upsert(model, embedding);
231 Ok(())
232 }
233
234 /// Remove a node. Cancels any pending `add_node` for the same id.
235 ///
236 /// If the node was added AND had an embedding staged via
237 /// `set_embedding` in this same transaction, the staged embedding
238 /// is dropped to prevent an orphan sidecar entry. Embeddings that
239 /// already live in the base commit's sidecar tree are NOT scrubbed
240 /// here; they remain reachable through the inherited tree (a
241 /// follow-up audit will add explicit sidecar tombstones).
242 pub fn remove_node(&mut self, id: NodeId) {
243 if let Some(cid) = self.new_nodes.remove(&id) {
244 self.pending_embeddings.remove(&cid);
245 }
246 self.removed_nodes.insert(id);
247 // Drop any pending-by-prop entries pointing at this id.
248 self.pending_by_prop.retain(|_, v| *v != id);
249 }
250
251 /// Add (or overwrite) an edge. Returns the edge's content-addressed CID.
252 ///
253 /// # Errors
254 ///
255 /// Codec or blockstore errors while writing the edge.
256 pub fn add_edge(&mut self, edge: &Edge) -> Result<Cid, Error> {
257 let (bytes, cid) = hash_to_cid(edge)?;
258 // safety: cid computed above via hash_to_cid
259 self.base.blockstore.put_trusted(cid.clone(), bytes)?;
260 self.removed_edges.remove(&edge.id);
261 self.new_edges.insert(edge.id, cid.clone());
262 Ok(cid)
263 }
264
    /// Remove an edge. Cancels any pending `add_edge` for the same id;
    /// the removal is applied to the edge Prolly tree at commit time.
    pub fn remove_edge(&mut self, id: EdgeId) {
        self.new_edges.remove(&id);
        self.removed_edges.insert(id);
    }
270
    /// Logically "forget" a node without breaking the append-only,
    /// content-addressed invariant of the graph.
    ///
    /// The node block remains in the node Prolly tree; its CID does
    /// not change. What changes is the [`View`]: at commit time, a
    /// [`Tombstone`] record keyed by `node_id` is inserted into
    /// [`View::tombstones`]. Retrieval paths filter out tombstoned
    /// nodes by default - see
    /// [`crate::retrieve::Retriever::include_tombstoned`] for the
    /// opt-out used by audit / debug callers.
    ///
    /// The tombstone's `tombstoned_at` timestamp is set at commit
    /// time (via the commit's resolved `now`), not at the call site,
    /// so two transactions built in parallel don't disagree on when a
    /// node was revoked just because of clock skew between author
    /// processes.
    ///
    /// Idempotence: calling `tombstone_node` twice for the same
    /// `node_id` in the same transaction is a no-op at the semantic
    /// level. The second call overwrites the first's reason; no
    /// additional state change is observable to a retrieve or to a
    /// subsequent `is_tombstoned` query. Across transactions, the
    /// rule is the same: each new tombstone commit fully replaces the
    /// prior record for that node.
    ///
    /// The original Node is NOT removed and edges referencing it are
    /// NOT touched. For physical removal, use
    /// [`Self::remove_node`] instead.
    ///
    /// # Errors
    ///
    /// Currently infallible; the `Result` return type reserves space
    /// for future validation (e.g. rejecting tombstones on a
    /// non-existent `node_id`).
    #[tracing::instrument(
        level = "debug",
        target = "mnem::repo::transaction",
        skip(self, reason),
        fields(node_id = %node_id)
    )]
    pub fn tombstone_node(
        &mut self,
        node_id: NodeId,
        reason: impl Into<String>,
    ) -> Result<(), Error> {
        // Stamp with a placeholder `0` timestamp; the real
        // `tombstoned_at` is filled in at commit time from the
        // commit's resolved `now`. This keeps multiple
        // `tombstone_node` calls in one transaction all sharing the
        // same timestamp, which is the semantic agents expect
        // ("everything in this commit got revoked together").
        let ts = Tombstone::new(reason, 0);
        self.new_tombstones.insert(node_id, ts);
        Ok(())
    }
326
    /// Set a named ref in the new View. `None` removes the ref.
    ///
    /// Buffered until commit; the last call for a given `name` within
    /// this transaction wins.
    pub fn update_ref(&mut self, name: impl Into<String>, target: Option<RefTarget>) {
        self.ref_updates.insert(name.into(), target);
    }
331
332 /// Ergonomic one-call node write for agent workflows.
333 ///
334 /// Generates a fresh [`NodeId::new_v7`], builds the node with the
335 /// caller's `ntype`, `summary`, and properties, auto-stamps two
336 /// reserved temporal props (`mnem:created_at`, `mnem:updated_at`)
337 /// with the current microseconds-since-Unix-epoch, and writes the
338 /// node via [`Self::add_node`]. Returns the freshly generated
339 /// `NodeId`.
340 ///
341 /// The reserved prop keys are the substrate's temporal-range filter
342 /// contract (see
343 /// [`crate::retrieve::Retriever::where_created_after`] et al.) and
344 /// avoid a breaking Node-CID change that a dedicated header field
345 /// would have triggered. Callers who need deterministic-replay CIDs
346 /// can override either key by passing it explicitly in `props`; the
347 /// auto-stamp only fills absent keys.
348 ///
349 /// # Errors
350 ///
351 /// Propagates codec/blockstore errors from [`Self::add_node`].
352 pub fn commit_memory<I>(
353 &mut self,
354 ntype: impl Into<String>,
355 summary: impl Into<String>,
356 props: I,
357 ) -> Result<NodeId, Error>
358 where
359 I: IntoIterator<Item = (String, Ipld)>,
360 {
361 let id = NodeId::new_v7();
362 let mut node = Node::new(id, ntype).with_summary(summary);
363 for (k, v) in props {
364 node.props.insert(k, v);
365 }
366 let now = now_micros();
367 node.props
368 .entry("mnem:created_at".to_string())
369 .or_insert_with(|| Ipld::Integer(i128::from(now)));
370 node.props
371 .entry("mnem:updated_at".to_string())
372 .or_insert_with(|| Ipld::Integer(i128::from(now)));
373 self.add_node(&node)?;
374 Ok(id)
375 }
376
    /// Find-or-create a node by a primary-key property.
    ///
    /// Looks for an existing node with `(ntype == label, props[prop_name] == value)`
    /// in the following order:
    ///
    /// 1. Nodes added in this transaction (O(1) via a cache that
    ///    `add_node` maintains).
    /// 2. The base commit's property index, if one exists (O(log n)).
    ///
    /// If a match is found, its `NodeId` is returned. Otherwise a new
    /// node is added (with `prop_name -> value` set) and its fresh
    /// `NodeId` is returned. This is the go-to helper for agents
    /// writing facts from LLM output where the same entity may be
    /// mentioned multiple times across tool calls.
    ///
    /// Within a single `resolve_or_create_node` call the cost is
    /// bounded by one cache lookup + one index point lookup; total
    /// cost of N calls in a transaction is O(N log n), not O(N²).
    ///
    /// # Errors
    ///
    /// Propagates codec/store errors from the property-index lookup or
    /// node write.
    pub fn resolve_or_create_node(
        &mut self,
        label: &str,
        prop_name: &str,
        value: impl Into<Ipld>,
    ) -> Result<NodeId, Error> {
        let value = value.into();
        // Same canonical hash that `add_node` uses to key the cache.
        let hash = index::prop_value_hash(&value)?;

        // 1. Pending-adds cache: O(1) BTreeMap lookup.
        if let Some(id) =
            self.pending_by_prop
                .get(&(label.to_string(), prop_name.to_string(), hash))
        {
            return Ok(*id);
        }

        // 2. Base commit's property index: O(log n) point lookup.
        //    The IndexSet is fetched once per transaction and cached;
        //    a hot resolve loop pays exactly one decode_from_store of
        //    the IndexSet, not N.
        if self.cached_base_indexes.is_none() {
            let fetched = if let Some(commit) = self.base.commit.as_deref() {
                if let Some(idx_cid) = &commit.indexes {
                    Some(decode_from_store::<IndexSet, _>(
                        &*self.base.blockstore,
                        idx_cid,
                    )?)
                } else {
                    // Base commit exists but carries no IndexSet
                    // (uninitialised or pre-0.2); remember that.
                    None
                }
            } else {
                None
            };
            self.cached_base_indexes = Some(fetched);
        }
        // A base-index hit only counts if the node was not removed in
        // this same transaction.
        if let Some(Some(indexes)) = self.cached_base_indexes.as_ref()
            && let Some((_cid, node)) =
                index::lookup_by_prop(&*self.base.blockstore, indexes, label, prop_name, &value)?
            && !self.removed_nodes.contains(&node.id)
        {
            return Ok(node.id);
        }

        // 3. Create.
        let new_node = Node::new(NodeId::new_v7(), label).with_prop(prop_name, value);
        self.add_node(&new_node)?;
        Ok(new_node.id)
    }
449
450 // ---------------- Commit ----------------
451
    /// Convenience: commit in the default lock-free mode.
    ///
    /// Delegates to [`commit_opts`](Self::commit_opts) with
    /// `linearize: false`. See there for semantics. Consumes the
    /// transaction either way.
    ///
    /// # Errors
    ///
    /// Codec, store, and tree-rebuild errors.
    pub fn commit(self, author: &str, message: &str) -> Result<ReadonlyRepo, Error> {
        self.commit_opts(CommitOptions::new(author, message))
    }
463
    /// Finalize the transaction with explicit options.
    ///
    /// Lock-free default ([`CommitOptions::linearize`] = `false`):
    /// rebuild trees, write Commit / View / Operation, advance the
    /// op-head regardless of concurrent writers, return a fresh
    /// [`ReadonlyRepo`] pinned to the new op.
    ///
    /// Linearize mode (`linearize = true`, SPEC §6.5): re-read
    /// op-heads just before advancing. If the current set is not
    /// exactly `[base.op_id]`, return [`RepoError::Stale`] without
    /// advancing. Tree / commit / view / op bytes already written to
    /// the blockstore remain (they are content-addressed and
    /// collision-free; a retry will re-reference them).
    ///
    /// # Errors
    ///
    /// - [`RepoError::Stale`] in linearize mode when op-heads drift.
    /// - Codec, store, and tree-rebuild errors.
    ///
    /// # Instrumentation
    ///
    /// Emits one `info`-level span `mnem::repo::transaction::commit` per
    /// call with bounded-cardinality fields: `added_nodes`,
    /// `removed_nodes`, `added_edges`, `removed_edges`, `tombstones`,
    /// `ref_updates`, `linearize`. No node payloads or CIDs are
    /// recorded - a commit of 100k nodes still produces one span of
    /// constant size. Agents wanting per-node detail should enable the
    /// `debug`-level `tombstone_node` span or add their own.
    #[tracing::instrument(
        name = "commit",
        level = "info",
        target = "mnem::repo::transaction",
        skip(self, opts),
        fields(
            added_nodes = self.new_nodes.len(),
            removed_nodes = self.removed_nodes.len(),
            added_edges = self.new_edges.len(),
            removed_edges = self.removed_edges.len(),
            tombstones = self.new_tombstones.len(),
            ref_updates = self.ref_updates.len(),
            linearize = opts.linearize,
        )
    )]
    pub fn commit_opts(self, opts: CommitOptions<'_>) -> Result<ReadonlyRepo, Error> {
        // Destructure up front so each buffer can be consumed
        // independently below; the resolve caches are commit-irrelevant.
        let Self {
            base,
            new_nodes,
            removed_nodes,
            new_edges,
            removed_edges,
            ref_updates,
            new_tombstones,
            pending_by_prop: _,
            cached_base_indexes: _,
            pending_embeddings,
        } = self;

        let bs = base.blockstore.clone();
        let ohs = base.op_heads.clone();

        // Base roots (trees from the previous commit, or empty trees if
        // this is the first commit on a fresh repo).
        let (base_nodes, base_edges, base_schema) = if let Some(commit) = base.commit.as_deref() {
            (
                commit.nodes.clone(),
                commit.edges.clone(),
                commit.schema.clone(),
            )
        } else {
            let empty_root = prolly::build_tree(&*bs, std::iter::empty())?;
            (empty_root.clone(), empty_root.clone(), empty_root)
        };

        // Decide gating for the incremental-index fast path BEFORE we
        // consume `new_nodes` / `removed_*` / `new_edges` into their
        // ProllyKey forms.
        let is_append_only_at_graph_level = removed_nodes.is_empty()
            && removed_edges.is_empty()
            && new_edges.is_empty()
            && !new_nodes.is_empty();
        let base_indexes_cid: Option<Cid> = base.commit.as_deref().and_then(|c| c.indexes.clone());

        // Keep a NodeId-keyed sorted copy of the added nodes so the
        // incremental index path can re-decode them. The ProllyKey
        // map used for the node-tree rebuild is derived from this
        // without consuming it.
        let new_nodes_btree: BTreeMap<NodeId, Cid> = new_nodes.into_iter().collect();
        let node_additions: BTreeMap<ProllyKey, Cid> = new_nodes_btree
            .iter()
            .map(|(id, cid)| (ProllyKey::from(*id), cid.clone()))
            .collect();
        let node_removals: HashSet<ProllyKey> =
            removed_nodes.into_iter().map(ProllyKey::from).collect();
        let new_nodes_root = rebuild_tree(&*bs, &base_nodes, &node_additions, &node_removals)?;

        // Rebuild edge tree with mutations.
        let edge_additions: BTreeMap<ProllyKey, Cid> = new_edges
            .into_iter()
            .map(|(id, cid)| (ProllyKey::from(id), cid))
            .collect();
        let edge_removals: HashSet<ProllyKey> =
            removed_edges.into_iter().map(ProllyKey::from).collect();
        let new_edges_root = rebuild_tree(&*bs, &base_edges, &edge_additions, &edge_removals)?;

        // Schema tree unchanged in M8 MVP (no schema mutations yet).
        let new_schema_root = base_schema;

        // Build secondary indexes. Fast path: incremental append when
        // the transaction is a pure node-level append AND we have a
        // previous IndexSet to extend AND no added NodeId collides with
        // an existing one in the base node tree. Slow path: full
        // rebuild (same as before; correctness baseline).
        //
        // The fast path is byte-equivalent to the slow path in the
        // conditions above; the `incremental_append_indexes` contract
        // in `mnem-core::index` pins this. Tests in this module pin
        // the equivalence on round-trip.
        let new_indexes_cid = match (is_append_only_at_graph_level, base_indexes_cid.as_ref()) {
            (true, Some(base_idx)) => {
                // O(|new_nodes| * log N) point-lookup check for collisions
                // against the base node tree. On any lookup error, fall
                // back to full rebuild (safety over speed): Err(_) is
                // deliberately treated the same as a hit.
                let has_collision = new_nodes_btree.keys().any(|node_id| {
                    let key = ProllyKey::from(*node_id);
                    matches!(
                        crate::prolly::lookup(&*bs, &base_nodes, &key),
                        Ok(Some(_)) | Err(_)
                    )
                });
                if has_collision {
                    index::build_index_set(&*bs, &new_nodes_root, &new_edges_root)?
                } else {
                    index::incremental_append_indexes(&*bs, base_idx, &new_nodes_btree)?
                }
            }
            _ => index::build_index_set(&*bs, &new_nodes_root, &new_edges_root)?,
        };

        // Embedding sidecar. Skip the rebuild entirely when no
        // pending writes AND no base sidecar - most commits in a
        // legacy repo will hit this fast path. Otherwise: encode each
        // pending bucket, stage its CID under the 16-byte truncated
        // blake3 of the NodeCid wire form (matches the lookup keying
        // in `ReadonlyRepo::embedding_for`), and feed the additions
        // through the same `rebuild_tree` helper the node + edge
        // trees use.
        let base_embeddings_cid: Option<Cid> =
            base.commit.as_deref().and_then(|c| c.embeddings.clone());
        let new_embeddings_cid: Option<Cid> =
            if pending_embeddings.is_empty() && base_embeddings_cid.is_none() {
                None
            } else {
                let base_root = match &base_embeddings_cid {
                    Some(c) => c.clone(),
                    None => prolly::build_tree(&*bs, std::iter::empty())?,
                };
                let mut additions: BTreeMap<ProllyKey, Cid> = BTreeMap::new();
                for (node_cid, bucket) in pending_embeddings {
                    let (bucket_bytes, bucket_cid) = hash_to_cid(&bucket)?;
                    bs.put_trusted(bucket_cid.clone(), bucket_bytes)?;
                    let key = embedding_key_for_node_cid(&node_cid);
                    additions.insert(key, bucket_cid);
                }
                Some(rebuild_tree(&*bs, &base_root, &additions, &HashSet::new())?)
            };

        // Build the new Commit.
        //
        // `time_micros` and `change_id` are deterministic-replay escape
        // hatches: callers who want byte-identical CIDs across
        // machines supply both. `None` falls back to wall clock +
        // fresh v7 (the current human-workflow default).
        let now = opts.time_micros.unwrap_or_else(now_micros);
        let change_id = opts.change_id.unwrap_or_else(ChangeId::new_v7);
        let mut commit = Commit::new(
            change_id,
            new_nodes_root,
            new_edges_root,
            new_schema_root,
            opts.author,
            now,
            opts.message,
        );
        commit.indexes = Some(new_indexes_cid);
        commit.embeddings = new_embeddings_cid;
        if let Some(prev_head) = base.view.heads.first() {
            commit = commit.with_parent(prev_head.clone());
        }
        let (commit_bytes, commit_cid) = hash_to_cid(&commit)?;
        // safety: commit_cid computed above via hash_to_cid
        bs.put_trusted(commit_cid.clone(), commit_bytes)?;

        // Build the new View. `is_first_commit` must be decided from
        // the BASE view, before this tx's ref updates are applied.
        let mut new_view: View = (*base.view).clone();
        let is_first_commit = base.view.heads.is_empty() && new_view.refs.is_empty();
        new_view.heads = vec![commit_cid.clone()];
        for (name, target) in ref_updates {
            match target {
                Some(t) => {
                    new_view.refs.insert(name, t);
                }
                None => {
                    new_view.refs.remove(&name);
                }
            }
        }
        // C4-1 (audit-2026-04-25): Mirror Git - on the first commit
        // of a fresh repo, auto-create `refs/heads/main` pointing at
        // the new commit unless the caller already supplied a ref
        // update (explicit beats implicit). This means `mnem init` +
        // first ingest leaves the repo with a usable default branch
        // so docs examples like `mnem branch create test main` work
        // out of the box without requiring `mnem ref set` plumbing.
        if is_first_commit && !new_view.refs.contains_key("refs/heads/main") {
            new_view
                .refs
                .insert("refs/heads/main".to_string(), RefTarget::normal(commit_cid));
        }
        // Stamp every staged tombstone with the commit's resolved `now`
        // so all tombstones in one commit share a timestamp (agents
        // expect "revoked together" to mean "same timestamp"), and
        // merge into the View. Later entries overwrite earlier ones,
        // matching the idempotent-deterministic rule documented on
        // `tombstone_node`.
        for (node_id, mut ts) in new_tombstones {
            ts.tombstoned_at = now;
            new_view.tombstones.insert(node_id, ts);
        }
        let (view_bytes, view_cid) = hash_to_cid(&new_view)?;
        // safety: view_cid computed above via hash_to_cid
        bs.put_trusted(view_cid.clone(), view_bytes)?;

        // Build the new Operation.
        let op = Operation::new(
            view_cid,
            opts.author,
            now,
            format!("commit: {}", opts.message),
        )
        .with_parent(base.op_id.clone());
        let (op_bytes, op_cid) = hash_to_cid(&op)?;
        // safety: op_cid computed above via hash_to_cid
        bs.put_trusted(op_cid.clone(), op_bytes)?;

        // Linearize check (SPEC §6.5): re-read op-heads just before the
        // CAS-like advance. If drift has occurred, fail rather than
        // append a concurrent head. Blocks written above remain in the
        // store (content-addressed; a retry re-references them).
        if opts.linearize {
            let current = ohs.current()?;
            if current.len() != 1 || current[0] != base.op_id {
                return Err(RepoError::Stale.into());
            }
        }

        // Advance op-heads atomically.
        ohs.update(op_cid.clone(), std::slice::from_ref(&base.op_id))?;

        // Return a fresh ReadonlyRepo pinned to the new op.
        ReadonlyRepo::load_at(bs, ohs, op_cid)
    }
724}
725
726// ---------------- Tree rebuild helper ----------------
727
728/// Rebuild a Prolly tree by applying additions and removals to the
729/// contents of an existing base tree.
730///
731/// Naive O(n) implementation: walks the whole base tree via [`Cursor`],
732/// filters out removals, applies additions, sorts, and re-builds. A
733/// future M5.5+ incremental mutation path will re-chunk only touched
734/// subtrees. For M8 MVP this is acceptable - typical graph commits
735/// touch a small fraction of a tree, so the rebuild is the slow path
736/// rather than the common path.
737///
738/// # Errors
739///
740/// Store and codec errors while iterating and writing.
741fn rebuild_tree<B: Blockstore + ?Sized>(
742 store: &B,
743 base_root: &Cid,
744 additions: &BTreeMap<ProllyKey, Cid>,
745 removals: &HashSet<ProllyKey>,
746) -> Result<Cid, Error> {
747 // Stream the base tree into a map (absorbs removals and prepares for
748 // addition-override). Using BTreeMap so final iteration is sorted.
749 let mut merged: BTreeMap<ProllyKey, Cid> = BTreeMap::new();
750 let cursor = Cursor::new(store, base_root)?;
751 for entry in cursor {
752 let (k, v) = entry?;
753 if removals.contains(&k) {
754 continue;
755 }
756 merged.insert(k, v);
757 }
758 for (k, v) in additions {
759 merged.insert(*k, v.clone());
760 }
761 // Feed to the Prolly builder (input is already sorted via BTreeMap).
762 prolly::build_tree(store, merged)
763}
764
765/// Derive the 16-byte Prolly key for the embedding-sidecar tree from
766/// a `NodeCid`. We blake3 the CID's wire form (codec + multihash) and
767/// take the first 16 bytes; that gives uniformly-distributed keys
768/// regardless of the codec/digest prefix structure of the CID, so the
769/// Prolly tree's leaf-split heuristic produces balanced nodes.
770///
771/// Both [`Transaction::commit_opts`] (write side) and
772/// [`crate::repo::ReadonlyRepo::embedding_for`] (read side) MUST go
773/// through this exact helper. Two callers that derive keys differently
774/// would silently miss each other's writes.
775pub(crate) fn embedding_key_for_node_cid(node_cid: &Cid) -> ProllyKey {
776 let h = blake3::hash(&node_cid.to_bytes());
777 let mut k = [0u8; 16];
778 k.copy_from_slice(&h.as_bytes()[..16]);
779 ProllyKey(k)
780}
781
782#[cfg(test)]
783mod tests {
784 use super::*;
785 use crate::id::{CODEC_RAW, Multihash};
786 use crate::store::{MemoryBlockstore, MemoryOpHeadsStore, OpHeadsStore};
787 use ipld_core::ipld::Ipld;
788 use std::sync::Arc;
789
790 fn new_repo() -> ReadonlyRepo {
791 let bs: Arc<dyn Blockstore> = Arc::new(MemoryBlockstore::new());
792 let ohs: Arc<dyn OpHeadsStore> = Arc::new(MemoryOpHeadsStore::new());
793 ReadonlyRepo::init(bs, ohs).unwrap()
794 }
795
    #[test]
    fn first_commit_advances_head_and_stores_commit() {
        // Contract: committing the first transaction on a fresh repo
        // creates a head commit carrying the message, and the added
        // node round-trips through lookup on the returned repo.
        let repo = new_repo();
        // Fresh repo has no head commit yet.
        assert!(repo.head_commit().is_none());

        let mut tx = repo.start_transaction();
        let alice =
            Node::new(NodeId::new_v7(), "Person").with_prop("name", Ipld::String("Alice".into()));
        tx.add_node(&alice).unwrap();
        let new_repo = tx.commit("alice@example.org", "add Alice").unwrap();

        assert!(new_repo.head_commit().is_some());
        let head = new_repo.head_commit().unwrap();
        assert_eq!(head.message, "add Alice");

        // The node written in the tx is readable from the new repo.
        let looked_up = new_repo.lookup_node(&alice.id).unwrap();
        assert_eq!(looked_up.as_ref(), Some(&alice));
    }
814
    #[test]
    fn second_commit_chains_parent_and_preserves_history() {
        // Contract: a second commit keeps the first commit's nodes
        // reachable and records the first commit's CID as its parent.
        let repo = new_repo();
        let alice =
            Node::new(NodeId::new_v7(), "Person").with_prop("name", Ipld::String("Alice".into()));
        let mut tx1 = repo.start_transaction();
        tx1.add_node(&alice).unwrap();
        let repo_v1 = tx1.commit("tester", "add Alice").unwrap();
        let v1_head_cid = repo_v1.view().heads[0].clone();

        let bob =
            Node::new(NodeId::new_v7(), "Person").with_prop("name", Ipld::String("Bob".into()));
        let mut tx2 = repo_v1.start_transaction();
        tx2.add_node(&bob).unwrap();
        let repo_v2 = tx2.commit("tester", "add Bob").unwrap();

        // Alice still findable after second commit.
        assert_eq!(
            repo_v2.lookup_node(&alice.id).unwrap().as_ref(),
            Some(&alice)
        );
        // Bob findable too.
        assert_eq!(repo_v2.lookup_node(&bob.id).unwrap().as_ref(), Some(&bob));
        // Commit 2's single parent is commit 1's CID.
        let head_v2 = repo_v2.head_commit().unwrap();
        assert_eq!(head_v2.parents.len(), 1);
        assert_eq!(head_v2.parents[0], v1_head_cid);
    }
843
844 // ---------- tombstone_node ----------
845
846 #[test]
847 fn tombstone_round_trip_through_view() {
848 // Contract: a tombstone written in one commit survives on the
849 // View read back from the next `ReadonlyRepo`, carrying the
850 // caller's reason + the commit's resolved timestamp.
851 let repo = new_repo();
852 let alice =
853 Node::new(NodeId::new_v7(), "Person").with_prop("name", Ipld::String("Alice".into()));
854
855 let mut tx1 = repo.start_transaction();
856 tx1.add_node(&alice).unwrap();
857 let repo_v1 = tx1.commit("t", "seed").unwrap();
858 // Pre-tombstone: no entry on the view.
859 assert!(!repo_v1.is_tombstoned(&alice.id));
860
861 // Tombstone in a second commit so the timestamp field is
862 // stamped from that commit's `now`.
863 let mut tx2 = repo_v1.start_transaction();
864 tx2.tombstone_node(alice.id, "user asked to forget")
865 .unwrap();
866 let repo_v2 = tx2.commit("t", "revoke alice").unwrap();
867
868 // The original node block is still addressable - CID unchanged,
869 // lookup still returns it.
870 assert_eq!(
871 repo_v2.lookup_node(&alice.id).unwrap().as_ref(),
872 Some(&alice)
873 );
874 // But the View now carries a tombstone for this id.
875 assert!(repo_v2.is_tombstoned(&alice.id));
876 let ts = repo_v2.tombstone_for(&alice.id).expect("tombstone present");
877 assert_eq!(ts.reason, "user asked to forget");
878 assert!(
879 ts.tombstoned_at > 0,
880 "tombstone_at must be set from commit's resolved now, got 0"
881 );
882 }
883
884 #[test]
885 fn tombstone_is_idempotent_within_a_transaction() {
886 // Calling tombstone_node twice for the same id in one
887 // transaction collapses to a single View entry; the second
888 // reason overwrites the first (deterministic rule).
889 let repo = new_repo();
890 let alice = Node::new(NodeId::new_v7(), "Person");
891
892 let mut tx1 = repo.start_transaction();
893 tx1.add_node(&alice).unwrap();
894 let repo_v1 = tx1.commit("t", "seed").unwrap();
895
896 let mut tx2 = repo_v1.start_transaction();
897 tx2.tombstone_node(alice.id, "first").unwrap();
898 tx2.tombstone_node(alice.id, "second").unwrap();
899 let repo_v2 = tx2.commit("t", "revoke").unwrap();
900
901 assert_eq!(repo_v2.view().tombstones.len(), 1);
902 let ts = repo_v2.tombstone_for(&alice.id).unwrap();
903 assert_eq!(
904 ts.reason, "second",
905 "later tombstone_node call wins within one transaction"
906 );
907 }
908
909 #[test]
910 fn tombstone_leaves_node_cid_stable() {
911 // Contract: tombstoning a node does NOT alter the CID that
912 // `lookup_node` resolves to. Agents that persisted the CID of
913 // a node outside mnem can still fetch the same bytes after a
914 // tombstone commit. This is the core reason tombstones exist
915 // as a side-channel on the View rather than as a mutation of
916 // the node block.
917 use crate::codec::hash_to_cid;
918
919 let repo = new_repo();
920 let alice =
921 Node::new(NodeId::new_v7(), "Person").with_prop("name", Ipld::String("Alice".into()));
922
923 let mut tx1 = repo.start_transaction();
924 tx1.add_node(&alice).unwrap();
925 let repo_v1 = tx1.commit("t", "seed").unwrap();
926 let alice_before = repo_v1.lookup_node(&alice.id).unwrap().unwrap();
927 let (_bytes_before, cid_before) = hash_to_cid(&alice_before).unwrap();
928
929 let mut tx2 = repo_v1.start_transaction();
930 tx2.tombstone_node(alice.id, "revoked").unwrap();
931 let repo_v2 = tx2.commit("t", "revoke").unwrap();
932
933 let alice_after = repo_v2.lookup_node(&alice.id).unwrap().unwrap();
934 let (_bytes_after, cid_after) = hash_to_cid(&alice_after).unwrap();
935 assert_eq!(
936 cid_before, cid_after,
937 "tombstone must not change the node's content-addressed CID"
938 );
939 assert_eq!(
940 alice_before, alice_after,
941 "tombstone must not mutate node content"
942 );
943 }
944
945 #[test]
946 fn remove_node_leaves_tree_without_it() {
947 let repo = new_repo();
948 let alice =
949 Node::new(NodeId::new_v7(), "Person").with_prop("name", Ipld::String("Alice".into()));
950 let mut tx1 = repo.start_transaction();
951 tx1.add_node(&alice).unwrap();
952 let v1 = tx1.commit("a", "add").unwrap();
953 assert!(v1.lookup_node(&alice.id).unwrap().is_some());
954
955 let mut tx2 = v1.start_transaction();
956 tx2.remove_node(alice.id);
957 let v2 = tx2.commit("a", "remove").unwrap();
958 assert!(v2.lookup_node(&alice.id).unwrap().is_none());
959 }
960
961 #[test]
962 fn ref_update_is_visible_on_the_new_view() {
963 let repo = new_repo();
964 let raw_cid = Cid::new(CODEC_RAW, Multihash::sha2_256(b"target"));
965
966 let mut tx = repo.start_transaction();
967 tx.update_ref("refs/heads/main", Some(RefTarget::normal(raw_cid.clone())));
968 let v1 = tx.commit("a", "set main").unwrap();
969 match v1.view().refs.get("refs/heads/main") {
970 Some(RefTarget::Normal { target }) => assert_eq!(*target, raw_cid),
971 other => panic!("expected normal ref, got {other:?}"),
972 }
973 }
974
975 #[test]
976 fn op_heads_advances_on_commit() {
977 let repo = new_repo();
978 let ohs = repo.op_heads_store().clone();
979 assert_eq!(ohs.current().unwrap().len(), 1);
980 let before_head = ohs.current().unwrap()[0].clone();
981
982 let mut tx = repo.start_transaction();
983 let alice = Node::new(NodeId::new_v7(), "Person");
984 tx.add_node(&alice).unwrap();
985 let v1 = tx.commit("a", "m").unwrap();
986
987 let after_heads = ohs.current().unwrap();
988 assert_eq!(after_heads.len(), 1);
989 assert_ne!(after_heads[0], before_head);
990 assert_eq!(after_heads[0], *v1.op_id());
991 }
992
993 #[test]
994 fn linearize_commit_succeeds_against_current_head() {
995 let repo = new_repo();
996 let mut tx = repo.start_transaction();
997 tx.add_node(&Node::new(NodeId::new_v7(), "Person")).unwrap();
998 let r = tx.commit_opts(CommitOptions {
999 author: "a",
1000 message: "m",
1001 linearize: true,
1002 time_micros: None,
1003 change_id: None,
1004 });
1005 assert!(r.is_ok());
1006 }
1007
1008 #[test]
1009 fn linearize_commit_rejects_stale_base() {
1010 let repo = new_repo();
1011
1012 // Start a transaction against the initial state.
1013 let mut stale_tx = repo.start_transaction();
1014 stale_tx
1015 .add_node(&Node::new(NodeId::new_v7(), "Ghost"))
1016 .unwrap();
1017
1018 // A concurrent writer commits, advancing op-heads.
1019 let mut other_tx = repo.start_transaction();
1020 other_tx
1021 .add_node(&Node::new(NodeId::new_v7(), "Person"))
1022 .unwrap();
1023 let _ = other_tx.commit("a", "concurrent").unwrap();
1024
1025 // The stale transaction commits in linearize mode -> Stale.
1026 let err = stale_tx
1027 .commit_opts(CommitOptions {
1028 author: "a",
1029 message: "from stale",
1030 linearize: true,
1031 time_micros: None,
1032 change_id: None,
1033 })
1034 .unwrap_err();
1035 assert!(matches!(err, Error::Repo(RepoError::Stale)));
1036 }
1037
1038 #[test]
1039 fn default_commit_against_stale_base_still_succeeds() {
1040 // The non-linearize default lets both writers append to op-heads;
1041 // the second commit simply lands as a concurrent head (to be
1042 // merged by M8.5).
1043 let repo = new_repo();
1044
1045 let mut stale_tx = repo.start_transaction();
1046 stale_tx
1047 .add_node(&Node::new(NodeId::new_v7(), "Ghost"))
1048 .unwrap();
1049
1050 let mut other_tx = repo.start_transaction();
1051 other_tx
1052 .add_node(&Node::new(NodeId::new_v7(), "Person"))
1053 .unwrap();
1054 let _ = other_tx.commit("a", "concurrent").unwrap();
1055
1056 // Default mode succeeds even with a stale base.
1057 assert!(stale_tx.commit("a", "late but not linearized").is_ok());
1058 }
1059
    #[test]
    fn deterministic_commit_opts_yield_identical_commit_cid() {
        // Contract: two processes that build the same logical commit
        // on disjoint fresh repos, with CommitOptions pinning
        // `time_micros` + `change_id`, MUST produce byte-identical
        // commit CIDs. This is the headline "deterministic across
        // machines" property extended to commits (previously the
        // guarantee applied only to node-tree + IndexSet).
        //
        // This is ALSO our Q0-migration safety net: if
        // `put_trusted` (added in the A2 -> Q0 migration) ever
        // silently corrupts a commit's serialized bytes, the head
        // CID recorded here would change and this test would break.
        // Changes to the fixed inputs below should be treated as a
        // correctness regression until explained.

        // Every non-deterministic input to a commit is pinned here:
        // node id, change id, and wall-clock time.
        let fixed_id = NodeId::from_bytes_raw([0x42; 16]);
        let fixed_change_id = ChangeId::from_bytes_raw([0x11; 16]);
        let fixed_time: u64 = 1_700_000_000_000_000;

        // Build the identical logical commit on a brand-new repo and
        // return the resulting head CID.
        let commit_once = || -> Cid {
            let repo = new_repo();
            let mut tx = repo.start_transaction();
            tx.add_node(&Node::new(fixed_id, "Person")).unwrap();
            let new_repo = tx
                .commit_opts(
                    CommitOptions::new("alice", "seed")
                        .with_time_micros(fixed_time)
                        .with_change_id(fixed_change_id),
                )
                .unwrap();
            new_repo
                .view()
                .heads
                .first()
                .expect("one head after commit")
                .clone()
        };
        // Two fully independent runs (separate blockstores) must agree.
        let a = commit_once();
        let b = commit_once();
        assert_eq!(
            a, b,
            "identical CommitOptions across fresh repos must produce identical commit CIDs"
        );
    }
1104
1105 /// Fix X1 regression guard. Build the same graph two ways:
1106 /// (a) many append-only commits (trigger the incremental index
1107 /// fast path from the second commit onward),
1108 /// (b) one big commit that holds the full graph (hits the
1109 /// first-commit full-rebuild path).
1110 /// Both must produce byte-identical `IndexSet` CIDs. If not, the
1111 /// incremental path has drifted from the slow-path output and
1112 /// queries would silently diverge.
1113 #[test]
1114 fn incremental_and_full_index_build_produce_identical_index_set() {
1115 // Helper: ingest `batches` of `per_batch` nodes each, one
1116 // commit per batch. The first commit hits the full rebuild
1117 // (no base IndexSet); every subsequent commit hits the
1118 // incremental append path because all gating conditions
1119 // (no removals, no edges, no NodeId collision) are satisfied.
1120 fn incremental(batches: usize, per_batch: usize, ids: &[NodeId]) -> Cid {
1121 let bs: Arc<dyn Blockstore> = Arc::new(MemoryBlockstore::new());
1122 let ohs: Arc<dyn OpHeadsStore> = Arc::new(MemoryOpHeadsStore::new());
1123 let mut repo = ReadonlyRepo::init(bs, ohs).unwrap();
1124 for b in 0..batches {
1125 let mut tx = repo.start_transaction();
1126 for i in 0..per_batch {
1127 let id = ids[b * per_batch + i];
1128 let node = Node::new(id, "Person")
1129 .with_prop("name", Ipld::String(format!("p{i}")))
1130 .with_prop("batch", Ipld::Integer(b as i128));
1131 tx.add_node(&node).unwrap();
1132 }
1133 repo = tx.commit("t", "batch").unwrap();
1134 }
1135 repo.head_commit().unwrap().indexes.clone().unwrap()
1136 }
1137
1138 fn full(total: usize, ids: &[NodeId]) -> Cid {
1139 let bs: Arc<dyn Blockstore> = Arc::new(MemoryBlockstore::new());
1140 let ohs: Arc<dyn OpHeadsStore> = Arc::new(MemoryOpHeadsStore::new());
1141 let repo = ReadonlyRepo::init(bs, ohs).unwrap();
1142 let mut tx = repo.start_transaction();
1143 let per_batch = 10;
1144 for i in 0..total {
1145 let batch_of = i / per_batch;
1146 let in_batch = i % per_batch;
1147 let node = Node::new(ids[i], "Person")
1148 .with_prop("name", Ipld::String(format!("p{in_batch}")))
1149 .with_prop("batch", Ipld::Integer(batch_of as i128));
1150 tx.add_node(&node).unwrap();
1151 }
1152 tx.commit("t", "one-shot")
1153 .unwrap()
1154 .head_commit()
1155 .unwrap()
1156 .indexes
1157 .clone()
1158 .unwrap()
1159 }
1160
1161 // Deterministic id set so both paths commit the same graph.
1162 // Using from_bytes_raw keeps the ids ordering predictable.
1163 let total = 30;
1164 let ids: Vec<NodeId> = (0..total)
1165 .map(|i| {
1166 let mut b = [0u8; 16];
1167 b[0] = i as u8;
1168 NodeId::from_bytes_raw(b)
1169 })
1170 .collect();
1171
1172 let inc = incremental(3, 10, &ids);
1173 let one = full(30, &ids);
1174 assert_eq!(
1175 inc, one,
1176 "incremental index build must produce the same IndexSet CID as the full rebuild"
1177 );
1178 }
1179
    /// Companion to the test above: when the graph has edges (so
    /// `outgoing` and `incoming` trees are actually populated), the
    /// incremental-append path must preserve BOTH direction CIDs
    /// byte-for-byte, not just the nodes side.
    #[test]
    fn incremental_and_full_preserve_both_direction_adjacency_cids() {
        // Ten fixed-id nodes plus four fixed-id edges; everything is
        // built from raw bytes so both repos construct identical graphs.
        let ids: Vec<NodeId> = (0u8..10u8)
            .map(|i| {
                let mut b = [0u8; 16];
                b[0] = i;
                NodeId::from_bytes_raw(b)
            })
            .collect();
        // (source index, destination index, edge-id tag byte).
        let edge_pairs: &[(usize, usize, u8)] =
            &[(0, 1, 0xA0), (1, 2, 0xA1), (2, 3, 0xA2), (0, 5, 0xA3)];

        // Incremental: first commit has nodes+edges, then pure-node
        // appends hit the fast path.
        let (bs, ohs): (Arc<dyn Blockstore>, Arc<dyn OpHeadsStore>) = (
            Arc::new(MemoryBlockstore::new()),
            Arc::new(MemoryOpHeadsStore::new()),
        );
        let repo_inc = ReadonlyRepo::init(bs, ohs).unwrap();
        let mut tx = repo_inc.start_transaction();
        for id in &ids {
            tx.add_node(&Node::new(*id, "Person")).unwrap();
        }
        for (s, d, tag) in edge_pairs {
            let mut eb = [0u8; 16];
            eb[0] = *tag;
            tx.add_edge(&crate::objects::Edge::new(
                crate::id::EdgeId::from_bytes_raw(eb),
                "knows",
                ids[*s],
                ids[*d],
            ))
            .unwrap();
        }
        let mut repo_inc = tx.commit("t", "seed").unwrap();
        // Three pure-node append commits (0xEE-prefixed ids) exercise
        // the incremental path on top of the edge-bearing base.
        for extra in 0u8..3 {
            let mut tx = repo_inc.start_transaction();
            let mut b = [0u8; 16];
            b[0] = 0xEE;
            b[1] = extra;
            tx.add_node(&Node::new(NodeId::from_bytes_raw(b), "Person"))
                .unwrap();
            repo_inc = tx.commit("t", "append").unwrap();
        }
        let idx_inc_cid = repo_inc.head_commit().unwrap().indexes.clone().unwrap();
        let idx_inc: crate::objects::IndexSet =
            crate::repo::readonly::decode_from_store(&**repo_inc.blockstore(), &idx_inc_cid)
                .unwrap();

        // Full: single commit with all nodes (core + extras) + edges.
        let (bs, ohs): (Arc<dyn Blockstore>, Arc<dyn OpHeadsStore>) = (
            Arc::new(MemoryBlockstore::new()),
            Arc::new(MemoryOpHeadsStore::new()),
        );
        let repo_full = ReadonlyRepo::init(bs, ohs).unwrap();
        let mut tx = repo_full.start_transaction();
        for id in &ids {
            tx.add_node(&Node::new(*id, "Person")).unwrap();
        }
        for extra in 0u8..3 {
            let mut b = [0u8; 16];
            b[0] = 0xEE;
            b[1] = extra;
            tx.add_node(&Node::new(NodeId::from_bytes_raw(b), "Person"))
                .unwrap();
        }
        for (s, d, tag) in edge_pairs {
            let mut eb = [0u8; 16];
            eb[0] = *tag;
            tx.add_edge(&crate::objects::Edge::new(
                crate::id::EdgeId::from_bytes_raw(eb),
                "knows",
                ids[*s],
                ids[*d],
            ))
            .unwrap();
        }
        let repo_full = tx.commit("t", "one-shot").unwrap();
        let idx_full_cid = repo_full.head_commit().unwrap().indexes.clone().unwrap();
        let idx_full: crate::objects::IndexSet =
            crate::repo::readonly::decode_from_store(&**repo_full.blockstore(), &idx_full_cid)
                .unwrap();

        // Compare per-direction roots first for sharper failure
        // messages, then the whole IndexSet CID.
        assert_eq!(
            idx_inc.outgoing, idx_full.outgoing,
            "incremental path must preserve the outgoing CID byte-for-byte"
        );
        assert_eq!(
            idx_inc.incoming, idx_full.incoming,
            "incremental path must preserve the incoming CID byte-for-byte"
        );
        assert_eq!(
            idx_inc_cid, idx_full_cid,
            "whole-IndexSet CID must also be byte-equal"
        );
    }
1280
    // -------- embedding sidecar tests --------
1282
1283 fn dummy_embedding(model: &str, dim: u32) -> Embedding {
1284 let bytes_len = (dim as usize) * crate::objects::node::Dtype::F32.byte_width();
1285 Embedding {
1286 model: model.into(),
1287 dtype: crate::objects::node::Dtype::F32,
1288 dim,
1289 vector: bytes::Bytes::from(vec![0u8; bytes_len]),
1290 }
1291 }
1292
1293 /// happy path: stage an embedding via `set_embedding`,
1294 /// commit, then read it back via `embedding_for`. End-to-end
1295 /// proof the write side and the read side agree on the Prolly
1296 /// key derivation.
1297 #[test]
1298 fn set_embedding_round_trips_through_commit() {
1299 let repo = new_repo();
1300 let mut tx = repo.start_transaction();
1301 let node = Node::new(NodeId::new_v7(), "Doc").with_summary("hello");
1302 let node_cid = tx.add_node(&node).unwrap();
1303 let emb = dummy_embedding("onnx:test", 4);
1304 tx.set_embedding(node_cid.clone(), "onnx:test".into(), emb.clone())
1305 .unwrap();
1306 let r2 = tx.commit("alice", "stage embed").unwrap();
1307
1308 // Sidecar root populated on the new commit.
1309 assert!(r2.head_commit().unwrap().embeddings.is_some());
1310
1311 // Lookup returns the staged embedding.
1312 let got = r2.embedding_for(&node_cid, "onnx:test").unwrap();
1313 assert_eq!(got, Some(emb));
1314
1315 // Wrong model returns None, not error.
1316 assert_eq!(r2.embedding_for(&node_cid, "missing-model").unwrap(), None);
1317 }
1318
1319 /// One node may carry multiple embeddings simultaneously (e.g.
1320 /// MiniLM + bge-base for the same chunk). The bucket holds both,
1321 /// keyed by `model`.
1322 #[test]
1323 fn set_embedding_multiple_models_per_node() {
1324 let repo = new_repo();
1325 let mut tx = repo.start_transaction();
1326 let node = Node::new(NodeId::new_v7(), "Doc").with_summary("two-model node");
1327 let node_cid = tx.add_node(&node).unwrap();
1328 let emb_a = dummy_embedding("model-a", 4);
1329 let emb_b = dummy_embedding("model-b", 8);
1330 tx.set_embedding(node_cid.clone(), "model-a".into(), emb_a.clone())
1331 .unwrap();
1332 tx.set_embedding(node_cid.clone(), "model-b".into(), emb_b.clone())
1333 .unwrap();
1334 let r2 = tx.commit("alice", "two embeds").unwrap();
1335
1336 assert_eq!(r2.embedding_for(&node_cid, "model-a").unwrap(), Some(emb_a));
1337 assert_eq!(r2.embedding_for(&node_cid, "model-b").unwrap(), Some(emb_b));
1338 }
1339
1340 /// A commit with zero pending embeddings AND no base sidecar
1341 /// must leave `commit.embeddings = None` so legacy commits stay
1342 /// byte-identical.
1343 #[test]
1344 fn commit_without_set_embedding_has_none_embeddings_root() {
1345 let repo = new_repo();
1346 let mut tx = repo.start_transaction();
1347 let node = Node::new(NodeId::new_v7(), "Doc").with_summary("no embed");
1348 tx.add_node(&node).unwrap();
1349 let r2 = tx.commit("alice", "no embed").unwrap();
1350
1351 assert_eq!(r2.head_commit().unwrap().embeddings, None);
1352 }
1353
1354 /// Second commit on top of a sidecar-bearing base must inherit
1355 /// the existing entries and add the new one. Lookup of either
1356 /// (old or new) NodeCid succeeds against the new repo.
1357 #[test]
1358 fn second_commit_inherits_and_extends_embedding_sidecar() {
1359 let repo = new_repo();
1360
1361 // Tx 1: add node A + its embedding, commit.
1362 let mut tx1 = repo.start_transaction();
1363 let node_a = Node::new(NodeId::new_v7(), "Doc").with_summary("a");
1364 let cid_a = tx1.add_node(&node_a).unwrap();
1365 let emb_a = dummy_embedding("onnx:a", 4);
1366 tx1.set_embedding(cid_a.clone(), "onnx:a".into(), emb_a.clone())
1367 .unwrap();
1368 let r1 = tx1.commit("alice", "first").unwrap();
1369 assert!(r1.head_commit().unwrap().embeddings.is_some());
1370
1371 // Tx 2: add node B + its embedding on top of r1.
1372 let mut tx2 = r1.start_transaction();
1373 let node_b = Node::new(NodeId::new_v7(), "Doc").with_summary("b");
1374 let cid_b = tx2.add_node(&node_b).unwrap();
1375 let emb_b = dummy_embedding("onnx:b", 4);
1376 tx2.set_embedding(cid_b.clone(), "onnx:b".into(), emb_b.clone())
1377 .unwrap();
1378 let r2 = tx2.commit("alice", "second").unwrap();
1379
1380 // Both lookups must succeed against r2.
1381 assert_eq!(r2.embedding_for(&cid_a, "onnx:a").unwrap(), Some(emb_a));
1382 assert_eq!(r2.embedding_for(&cid_b, "onnx:b").unwrap(), Some(emb_b));
1383 }
1384
    /// Determinism: staging the same set of (NodeCid, model, embedding)
    /// triples in different orders must produce byte-identical
    /// `commit.embeddings` Cids. Pins the canonical-form contract for
    /// the sidecar tree.
    #[test]
    fn embedding_sidecar_root_is_insertion_order_invariant() {
        // Two repos, same Node + Embedding writes in different order.
        let make = |order: u8| -> Cid {
            let repo = new_repo();
            let mut tx = repo.start_transaction();
            // Fixed raw node ids so every invocation stages a
            // byte-identical graph regardless of `order`.
            let n1 = Node::new(NodeId::from_bytes_raw([1u8; 16]), "Doc").with_summary("n1");
            let n2 = Node::new(NodeId::from_bytes_raw([2u8; 16]), "Doc").with_summary("n2");
            let n3 = Node::new(NodeId::from_bytes_raw([3u8; 16]), "Doc").with_summary("n3");
            let c1 = tx.add_node(&n1).unwrap();
            let c2 = tx.add_node(&n2).unwrap();
            let c3 = tx.add_node(&n3).unwrap();
            let e1 = dummy_embedding("m", 4);
            let e2 = dummy_embedding("m", 4);
            let e3 = dummy_embedding("m", 4);
            // Same logical writes, three permutations.
            match order {
                0 => {
                    tx.set_embedding(c1, "m".into(), e1).unwrap();
                    tx.set_embedding(c2, "m".into(), e2).unwrap();
                    tx.set_embedding(c3, "m".into(), e3).unwrap();
                }
                1 => {
                    tx.set_embedding(c3, "m".into(), e3).unwrap();
                    tx.set_embedding(c1, "m".into(), e1).unwrap();
                    tx.set_embedding(c2, "m".into(), e2).unwrap();
                }
                _ => {
                    tx.set_embedding(c2, "m".into(), e2).unwrap();
                    tx.set_embedding(c3, "m".into(), e3).unwrap();
                    tx.set_embedding(c1, "m".into(), e1).unwrap();
                }
            }
            let r = tx.commit("alice", "det").unwrap();
            // The sidecar root is the value that must be order-invariant.
            r.head_commit().unwrap().embeddings.clone().unwrap()
        };
        let cid_a = make(0);
        let cid_b = make(1);
        let cid_c = make(2);
        assert_eq!(
            cid_a, cid_b,
            "sidecar root must be insertion-order-invariant"
        );
        assert_eq!(cid_a, cid_c);
    }
1434}