lora_store/memory/
snapshot.rs1use std::collections::BTreeSet;
9
10use crate::{SnapshotError, SnapshotMeta, SnapshotPayload};
11
12use super::index_catalog::StoredIndexEntity;
13use super::InMemoryGraph;
14
15pub(super) const PAYLOAD_FORMAT_VERSION: u32 = 1;
20
21impl InMemoryGraph {
22 pub fn snapshot_payload(&self) -> SnapshotPayload {
26 let mut vector_indexes = self
27 .vector_indexes_read(StoredIndexEntity::Node)
28 .to_snapshots(StoredIndexEntity::Node);
29 vector_indexes.extend(
30 self.vector_indexes_read(StoredIndexEntity::Relationship)
31 .to_snapshots(StoredIndexEntity::Relationship),
32 );
33 SnapshotPayload {
34 next_node_id: self.next_node_id,
35 next_rel_id: self.next_rel_id,
36 nodes: self.iter_node_records().cloned().collect(),
37 relationships: self.iter_rel_records().cloned().collect(),
38 indexes: self.index_catalog_read().list(),
39 constraints: self.constraint_catalog_read().list(),
40 vector_indexes,
41 }
42 }
43
44 pub fn load_snapshot_payload(
47 &mut self,
48 payload: SnapshotPayload,
49 ) -> Result<SnapshotMeta, SnapshotError> {
50 let meta = SnapshotMeta {
51 format_version: PAYLOAD_FORMAT_VERSION,
52 node_count: payload.nodes.len(),
53 relationship_count: payload.relationships.len(),
54 wal_lsn: None,
55 };
56
57 validate_payload_ids(&payload)?;
58
59 let node_capacity = payload.nodes.len();
65 let relationship_capacity = payload.relationships.len();
66 let mut rebuilt = Self::with_capacity_hint(node_capacity, relationship_capacity);
67 rebuilt.next_node_id = payload.next_node_id;
68 rebuilt.next_rel_id = payload.next_rel_id;
69
70 for node in payload.nodes {
71 let id = node.id;
72 let labels = node.labels.clone();
73 if rebuilt.node_at(id).is_some() {
74 return Err(SnapshotError::Decode(format!(
75 "duplicate node id {id} in snapshot payload"
76 )));
77 }
78 rebuilt
79 .put_node_checked(id, node)
80 .map_err(SnapshotError::Decode)?;
81 for label in &labels {
82 rebuilt.insert_node_label_index(id, label);
83 }
84 }
85
86 for rel in payload.relationships {
87 if rebuilt.rel_at(rel.id).is_some() {
88 return Err(SnapshotError::Decode(format!(
89 "duplicate relationship id {} in snapshot payload",
90 rel.id
91 )));
92 }
93 if rebuilt.node_at(rel.src).is_none() {
94 return Err(SnapshotError::Decode(format!(
95 "relationship {} references missing source node {}",
96 rel.id, rel.src
97 )));
98 }
99 if rebuilt.node_at(rel.dst).is_none() {
100 return Err(SnapshotError::Decode(format!(
101 "relationship {} references missing target node {}",
102 rel.id, rel.dst
103 )));
104 }
105 let id = rel.id;
106 rebuilt
107 .put_rel_checked(id, rel.clone())
108 .map_err(SnapshotError::Decode)?;
109 rebuilt.attach_relationship(&rel);
110 }
111 rebuilt.rebuild_property_indexes();
112
113 let constraint_owned_indexes: BTreeSet<String> = payload
114 .constraints
115 .iter()
116 .filter_map(|def| {
117 def.owned_index
118 .clone()
119 .or_else(|| def.kind.requires_backing_index().then(|| def.name.clone()))
120 })
121 .collect();
122
123 for def in payload.indexes {
130 if constraint_owned_indexes.contains(&def.name) {
131 continue;
132 }
133 rebuilt
136 .register_index(
137 crate::memory::IndexRequest {
138 explicit_name: Some(def.name.clone()),
139 kind: def.kind,
140 entity: def.entity,
141 label: def.label.clone(),
142 additional_labels: def.additional_labels.clone(),
143 properties: def.properties.clone(),
144 options: def.options.clone(),
145 },
146 true,
147 )
148 .map_err(|e| SnapshotError::Decode(format!("index `{}`: {e}", def.name)))?;
149 }
150
151 for snap in payload.vector_indexes {
161 let entity = snap.entity;
162 let mut registry = rebuilt.vector_indexes_write(entity);
163 if !registry.restore_snapshot(snap) {
164 }
168 }
169
170 for def in payload.constraints {
173 rebuilt
174 .register_constraint(
175 crate::memory::ConstraintRequest {
176 name: def.name.clone(),
177 kind: def.kind.clone(),
178 entity: def.entity,
179 label: def.label.clone(),
180 properties: def.properties.clone(),
181 },
182 true,
183 )
184 .map_err(|e| SnapshotError::Decode(format!("constraint `{}`: {e}", def.name)))?;
185 }
186
187 rebuilt.recorder = self.recorder.take();
191 *self = rebuilt;
192
193 Ok(meta)
194 }
195}
196
197fn validate_payload_ids(payload: &SnapshotPayload) -> Result<(), SnapshotError> {
198 validate_next_id("node", payload.next_node_id)?;
199 validate_next_id("relationship", payload.next_rel_id)?;
200
201 for node in &payload.nodes {
202 validate_entity_id("node", node.id, payload.next_node_id)?;
203 }
204 for rel in &payload.relationships {
205 validate_entity_id("relationship", rel.id, payload.next_rel_id)?;
206 validate_slot_id("relationship source node", rel.src)?;
207 validate_slot_id("relationship target node", rel.dst)?;
208 }
209
210 Ok(())
211}
212
213fn validate_next_id(kind: &str, next_id: u64) -> Result<(), SnapshotError> {
214 validate_slot_id(&format!("next {kind} id"), next_id)?;
215 if next_id == u64::MAX {
216 return Err(SnapshotError::Decode(format!(
217 "next {kind} id {next_id} leaves no allocatable id"
218 )));
219 }
220 Ok(())
221}
222
223fn validate_entity_id(kind: &str, id: u64, next_id: u64) -> Result<(), SnapshotError> {
224 validate_slot_id(kind, id)?;
225 if id >= next_id {
226 return Err(SnapshotError::Decode(format!(
227 "{kind} id {id} is not below next {kind} id {next_id}"
228 )));
229 }
230 Ok(())
231}
232
233fn validate_slot_id(label: &str, id: u64) -> Result<(), SnapshotError> {
234 let idx = usize::try_from(id).map_err(|_| {
235 SnapshotError::Decode(format!(
236 "{label} {id} does not fit in usize on this platform"
237 ))
238 })?;
239 idx.checked_add(1)
240 .ok_or_else(|| SnapshotError::Decode(format!("{label} {id} leaves no valid slab slot")))?;
241 Ok(())
242}