Skip to main content

lora_store/memory/
snapshot.rs

1//! Snapshot payload helpers for the in-memory graph.
2//!
3//! `lora-store` no longer ships its own on-disk codec. The byte-level
4//! columnar format lives in `lora-snapshot`; this module just bridges
5//! between [`InMemoryGraph`] and the portable [`SnapshotPayload`]
6//! vocabulary.
7
8use std::collections::BTreeSet;
9
10use crate::{SnapshotError, SnapshotMeta, SnapshotPayload};
11
12use super::index_catalog::StoredIndexEntity;
13use super::InMemoryGraph;
14
/// Format-version stamp surfaced through [`SnapshotMeta::format_version`]
/// for payloads handled by the inherent helpers in this module;
/// `load_snapshot_payload` writes it into the metadata it returns.
///
/// Kept stable across `lora-snapshot` codec versions because the payload
/// shape itself has not changed; only the on-disk encoding has.
pub(super) const PAYLOAD_FORMAT_VERSION: u32 = 1;
20
21impl InMemoryGraph {
22    /// Return the portable graph-state payload. Callers downstream of
23    /// `lora-store` (typically `lora-database`) feed this into
24    /// `lora-snapshot` for byte-level encoding.
25    pub fn snapshot_payload(&self) -> SnapshotPayload {
26        let mut vector_indexes = self
27            .vector_indexes_read(StoredIndexEntity::Node)
28            .to_snapshots(StoredIndexEntity::Node);
29        vector_indexes.extend(
30            self.vector_indexes_read(StoredIndexEntity::Relationship)
31                .to_snapshots(StoredIndexEntity::Relationship),
32        );
33        SnapshotPayload {
34            next_node_id: self.next_node_id,
35            next_rel_id: self.next_rel_id,
36            nodes: self.iter_node_records().cloned().collect(),
37            relationships: self.iter_rel_records().cloned().collect(),
38            indexes: self.index_catalog_read().list(),
39            constraints: self.constraint_catalog_read().list(),
40            vector_indexes,
41        }
42    }
43
44    /// Replace the graph from a portable graph-state payload, preserving the
45    /// currently installed mutation recorder across the swap.
46    pub fn load_snapshot_payload(
47        &mut self,
48        payload: SnapshotPayload,
49    ) -> Result<SnapshotMeta, SnapshotError> {
50        let meta = SnapshotMeta {
51            format_version: PAYLOAD_FORMAT_VERSION,
52            node_count: payload.nodes.len(),
53            relationship_count: payload.relationships.len(),
54            wal_lsn: None,
55        };
56
57        validate_payload_ids(&payload)?;
58
59        // Build the restored graph in a fresh local instance and only
60        // commit it into `self` at the very end. Capacity is based on live
61        // entity count, not `next_*_id`: snapshots may contain tombstone gaps,
62        // and hostile next-id values must not force huge allocations before
63        // the checked slab-growth path validates each concrete record id.
64        let node_capacity = payload.nodes.len();
65        let relationship_capacity = payload.relationships.len();
66        let mut rebuilt = Self::with_capacity_hint(node_capacity, relationship_capacity);
67        rebuilt.next_node_id = payload.next_node_id;
68        rebuilt.next_rel_id = payload.next_rel_id;
69
70        for node in payload.nodes {
71            let id = node.id;
72            let labels = node.labels.clone();
73            if rebuilt.node_at(id).is_some() {
74                return Err(SnapshotError::Decode(format!(
75                    "duplicate node id {id} in snapshot payload"
76                )));
77            }
78            rebuilt
79                .put_node_checked(id, node)
80                .map_err(SnapshotError::Decode)?;
81            for label in &labels {
82                rebuilt.insert_node_label_index(id, label);
83            }
84        }
85
86        for rel in payload.relationships {
87            if rebuilt.rel_at(rel.id).is_some() {
88                return Err(SnapshotError::Decode(format!(
89                    "duplicate relationship id {} in snapshot payload",
90                    rel.id
91                )));
92            }
93            if rebuilt.node_at(rel.src).is_none() {
94                return Err(SnapshotError::Decode(format!(
95                    "relationship {} references missing source node {}",
96                    rel.id, rel.src
97                )));
98            }
99            if rebuilt.node_at(rel.dst).is_none() {
100                return Err(SnapshotError::Decode(format!(
101                    "relationship {} references missing target node {}",
102                    rel.id, rel.dst
103                )));
104            }
105            let id = rel.id;
106            rebuilt
107                .put_rel_checked(id, rel.clone())
108                .map_err(SnapshotError::Decode)?;
109            rebuilt.attach_relationship(&rel);
110        }
111        rebuilt.rebuild_property_indexes();
112
113        let constraint_owned_indexes: BTreeSet<String> = payload
114            .constraints
115            .iter()
116            .filter_map(|def| {
117                def.owned_index
118                    .clone()
119                    .or_else(|| def.kind.requires_backing_index().then(|| def.name.clone()))
120            })
121            .collect();
122
123        // Re-register every user-visible index in the catalog. Going through
124        // `register_index` re-populates RANGE buckets and keeps the
125        // `populate_index_data` invariant aligned with the catalog —
126        // skipping it would leave RANGE indexes registered but never populated.
127        // Constraint-owned backing indexes are restored by re-registering the
128        // owning constraint below, which keeps catalog ownership explicit.
129        for def in payload.indexes {
130            if constraint_owned_indexes.contains(&def.name) {
131                continue;
132            }
133            // Errors here would mean the snapshot itself is corrupt or
134            // ambiguous; map them into Decode rather than panicking.
135            rebuilt
136                .register_index(
137                    crate::memory::IndexRequest {
138                        explicit_name: Some(def.name.clone()),
139                        kind: def.kind,
140                        entity: def.entity,
141                        label: def.label.clone(),
142                        additional_labels: def.additional_labels.clone(),
143                        properties: def.properties.clone(),
144                        options: def.options.clone(),
145                    },
146                    /*if_not_exists*/ true,
147                )
148                .map_err(|e| SnapshotError::Decode(format!("index `{}`: {e}", def.name)))?;
149        }
150
151        // Overlay persisted HNSW snapshots over the freshly-registered
152        // (and freshly-backfilled) vector indexes. This is the
153        // post-step that gives Phase 5 its raison d'être: instead of
154        // paying O(n log n) to re-insert every vector through the
155        // HNSW algorithm, we install the graph topology byte-for-byte.
156        // Snapshots from versions before this trailer round-trip with
157        // `vector_indexes = []` so the fallback path (the backfill
158        // that already ran inside `register_index`) handles them
159        // correctly.
160        for snap in payload.vector_indexes {
161            let entity = snap.entity;
162            let mut registry = rebuilt.vector_indexes_write(entity);
163            if !registry.restore_snapshot(snap) {
164                // Catalog/snapshot mismatch — registry already
165                // contains the populate-built backend, which is the
166                // safe fallback. No further action.
167            }
168        }
169
170        // Re-register constraints. Uniqueness / key constraints recreate
171        // their own backing indexes as part of registration.
172        for def in payload.constraints {
173            rebuilt
174                .register_constraint(
175                    crate::memory::ConstraintRequest {
176                        name: def.name.clone(),
177                        kind: def.kind.clone(),
178                        entity: def.entity,
179                        label: def.label.clone(),
180                        properties: def.properties.clone(),
181                    },
182                    /*if_not_exists*/ true,
183                )
184                .map_err(|e| SnapshotError::Decode(format!("constraint `{}`: {e}", def.name)))?;
185        }
186
187        // Preserve the existing recorder across the swap — observers of the
188        // store's identity should not be silently detached by a restore,
189        // same policy as `clear()`.
190        rebuilt.recorder = self.recorder.take();
191        *self = rebuilt;
192
193        Ok(meta)
194    }
195}
196
197fn validate_payload_ids(payload: &SnapshotPayload) -> Result<(), SnapshotError> {
198    validate_next_id("node", payload.next_node_id)?;
199    validate_next_id("relationship", payload.next_rel_id)?;
200
201    for node in &payload.nodes {
202        validate_entity_id("node", node.id, payload.next_node_id)?;
203    }
204    for rel in &payload.relationships {
205        validate_entity_id("relationship", rel.id, payload.next_rel_id)?;
206        validate_slot_id("relationship source node", rel.src)?;
207        validate_slot_id("relationship target node", rel.dst)?;
208    }
209
210    Ok(())
211}
212
213fn validate_next_id(kind: &str, next_id: u64) -> Result<(), SnapshotError> {
214    validate_slot_id(&format!("next {kind} id"), next_id)?;
215    if next_id == u64::MAX {
216        return Err(SnapshotError::Decode(format!(
217            "next {kind} id {next_id} leaves no allocatable id"
218        )));
219    }
220    Ok(())
221}
222
223fn validate_entity_id(kind: &str, id: u64, next_id: u64) -> Result<(), SnapshotError> {
224    validate_slot_id(kind, id)?;
225    if id >= next_id {
226        return Err(SnapshotError::Decode(format!(
227            "{kind} id {id} is not below next {kind} id {next_id}"
228        )));
229    }
230    Ok(())
231}
232
233fn validate_slot_id(label: &str, id: u64) -> Result<(), SnapshotError> {
234    let idx = usize::try_from(id).map_err(|_| {
235        SnapshotError::Decode(format!(
236            "{label} {id} does not fit in usize on this platform"
237        ))
238    })?;
239    idx.checked_add(1)
240        .ok_or_else(|| SnapshotError::Decode(format!("{label} {id} leaves no valid slab slot")))?;
241    Ok(())
242}