Skip to main content

lora_store/memory/
snapshot.rs

1//! Snapshot payload helpers for the in-memory graph.
2//!
3//! `lora-store` no longer ships its own on-disk codec. The byte-level
4//! columnar format lives in `lora-snapshot`; this module just bridges
5//! between [`InMemoryGraph`] and the portable [`SnapshotPayload`]
6//! vocabulary.
7
8use std::collections::BTreeSet;
9
10use crate::{SnapshotError, SnapshotMeta, SnapshotPayload};
11
12use super::InMemoryGraph;
13
/// Format-version stamp surfaced through [`SnapshotMeta::format_version`]
/// for payloads produced via the inherent helpers below. Kept stable
/// across `lora-snapshot` codec versions because the payload shape
/// itself has not changed; only the on-disk encoding has.
///
/// [`InMemoryGraph::load_snapshot_payload`] copies this value into the
/// [`SnapshotMeta`] it returns.
pub(super) const PAYLOAD_FORMAT_VERSION: u32 = 1;
19
20impl InMemoryGraph {
21    /// Return the portable graph-state payload. Callers downstream of
22    /// `lora-store` (typically `lora-database`) feed this into
23    /// `lora-snapshot` for byte-level encoding.
24    pub fn snapshot_payload(&self) -> SnapshotPayload {
25        SnapshotPayload {
26            next_node_id: self.next_node_id,
27            next_rel_id: self.next_rel_id,
28            nodes: self.iter_node_records().cloned().collect(),
29            relationships: self.iter_rel_records().cloned().collect(),
30            indexes: self.index_catalog_read().list(),
31            constraints: self.constraint_catalog_read().list(),
32        }
33    }
34
35    /// Replace the graph from a portable graph-state payload, preserving the
36    /// currently installed mutation recorder across the swap.
37    pub fn load_snapshot_payload(
38        &mut self,
39        payload: SnapshotPayload,
40    ) -> Result<SnapshotMeta, SnapshotError> {
41        let meta = SnapshotMeta {
42            format_version: PAYLOAD_FORMAT_VERSION,
43            node_count: payload.nodes.len(),
44            relationship_count: payload.relationships.len(),
45            wal_lsn: None,
46        };
47
48        validate_payload_ids(&payload)?;
49
50        // Build the restored graph in a fresh local instance and only
51        // commit it into `self` at the very end. Capacity is based on live
52        // entity count, not `next_*_id`: snapshots may contain tombstone gaps,
53        // and hostile next-id values must not force huge allocations before
54        // the checked slab-growth path validates each concrete record id.
55        let node_capacity = payload.nodes.len();
56        let relationship_capacity = payload.relationships.len();
57        let mut rebuilt = Self::with_capacity_hint(node_capacity, relationship_capacity);
58        rebuilt.next_node_id = payload.next_node_id;
59        rebuilt.next_rel_id = payload.next_rel_id;
60
61        for node in payload.nodes {
62            let id = node.id;
63            let labels = node.labels.clone();
64            if rebuilt.node_at(id).is_some() {
65                return Err(SnapshotError::Decode(format!(
66                    "duplicate node id {id} in snapshot payload"
67                )));
68            }
69            rebuilt
70                .put_node_checked(id, node)
71                .map_err(SnapshotError::Decode)?;
72            for label in &labels {
73                rebuilt.insert_node_label_index(id, label);
74            }
75        }
76
77        for rel in payload.relationships {
78            if rebuilt.rel_at(rel.id).is_some() {
79                return Err(SnapshotError::Decode(format!(
80                    "duplicate relationship id {} in snapshot payload",
81                    rel.id
82                )));
83            }
84            if rebuilt.node_at(rel.src).is_none() {
85                return Err(SnapshotError::Decode(format!(
86                    "relationship {} references missing source node {}",
87                    rel.id, rel.src
88                )));
89            }
90            if rebuilt.node_at(rel.dst).is_none() {
91                return Err(SnapshotError::Decode(format!(
92                    "relationship {} references missing target node {}",
93                    rel.id, rel.dst
94                )));
95            }
96            let id = rel.id;
97            rebuilt
98                .put_rel_checked(id, rel.clone())
99                .map_err(SnapshotError::Decode)?;
100            rebuilt.attach_relationship(&rel);
101        }
102        rebuilt.rebuild_property_indexes();
103
104        let constraint_owned_indexes: BTreeSet<String> = payload
105            .constraints
106            .iter()
107            .filter_map(|def| {
108                def.owned_index
109                    .clone()
110                    .or_else(|| def.kind.requires_backing_index().then(|| def.name.clone()))
111            })
112            .collect();
113
114        // Re-register every user-visible index in the catalog. Going through
115        // `register_index` re-populates RANGE buckets and keeps the
116        // `populate_index_data` invariant aligned with the catalog —
117        // skipping it would leave RANGE indexes registered but never populated.
118        // Constraint-owned backing indexes are restored by re-registering the
119        // owning constraint below, which keeps catalog ownership explicit.
120        for def in payload.indexes {
121            if constraint_owned_indexes.contains(&def.name) {
122                continue;
123            }
124            // Errors here would mean the snapshot itself is corrupt or
125            // ambiguous; map them into Decode rather than panicking.
126            rebuilt
127                .register_index(
128                    crate::memory::IndexRequest {
129                        explicit_name: Some(def.name.clone()),
130                        kind: def.kind,
131                        entity: def.entity,
132                        label: def.label.clone(),
133                        additional_labels: def.additional_labels.clone(),
134                        properties: def.properties.clone(),
135                        options: def.options.clone(),
136                    },
137                    /*if_not_exists*/ true,
138                )
139                .map_err(|e| SnapshotError::Decode(format!("index `{}`: {e}", def.name)))?;
140        }
141
142        // Re-register constraints. Uniqueness / key constraints recreate
143        // their own backing indexes as part of registration.
144        for def in payload.constraints {
145            rebuilt
146                .register_constraint(
147                    crate::memory::ConstraintRequest {
148                        name: def.name.clone(),
149                        kind: def.kind.clone(),
150                        entity: def.entity,
151                        label: def.label.clone(),
152                        properties: def.properties.clone(),
153                    },
154                    /*if_not_exists*/ true,
155                )
156                .map_err(|e| SnapshotError::Decode(format!("constraint `{}`: {e}", def.name)))?;
157        }
158
159        // Preserve the existing recorder across the swap — observers of the
160        // store's identity should not be silently detached by a restore,
161        // same policy as `clear()`.
162        rebuilt.recorder = self.recorder.take();
163        *self = rebuilt;
164
165        Ok(meta)
166    }
167}
168
169fn validate_payload_ids(payload: &SnapshotPayload) -> Result<(), SnapshotError> {
170    validate_next_id("node", payload.next_node_id)?;
171    validate_next_id("relationship", payload.next_rel_id)?;
172
173    for node in &payload.nodes {
174        validate_entity_id("node", node.id, payload.next_node_id)?;
175    }
176    for rel in &payload.relationships {
177        validate_entity_id("relationship", rel.id, payload.next_rel_id)?;
178        validate_slot_id("relationship source node", rel.src)?;
179        validate_slot_id("relationship target node", rel.dst)?;
180    }
181
182    Ok(())
183}
184
185fn validate_next_id(kind: &str, next_id: u64) -> Result<(), SnapshotError> {
186    validate_slot_id(&format!("next {kind} id"), next_id)?;
187    if next_id == u64::MAX {
188        return Err(SnapshotError::Decode(format!(
189            "next {kind} id {next_id} leaves no allocatable id"
190        )));
191    }
192    Ok(())
193}
194
195fn validate_entity_id(kind: &str, id: u64, next_id: u64) -> Result<(), SnapshotError> {
196    validate_slot_id(kind, id)?;
197    if id >= next_id {
198        return Err(SnapshotError::Decode(format!(
199            "{kind} id {id} is not below next {kind} id {next_id}"
200        )));
201    }
202    Ok(())
203}
204
205fn validate_slot_id(label: &str, id: u64) -> Result<(), SnapshotError> {
206    let idx = usize::try_from(id).map_err(|_| {
207        SnapshotError::Decode(format!(
208            "{label} {id} does not fit in usize on this platform"
209        ))
210    })?;
211    idx.checked_add(1)
212        .ok_or_else(|| SnapshotError::Decode(format!("{label} {id} leaves no valid slab slot")))?;
213    Ok(())
214}