Skip to main content

modelvault_core/
checkpoint.rs

1//! Checkpoint payloads: persisted logical state snapshots to accelerate open/replay.
2
3use std::collections::{BTreeMap, HashMap};
4
5use crate::catalog::{encode_catalog_payload, Catalog, CatalogRecordWire};
6use crate::db::build_non_pk_values_in_schema_order;
7use crate::error::{DbError, FormatError, SchemaError};
8use crate::index::{decode_index_payload, encode_index_payload, IndexEntry, IndexState};
9use crate::record::{
10    encode_record_payload_v2, encode_record_payload_v3, non_pk_defs_in_order, RowValue, ScalarValue,
11};
12use crate::schema::CollectionId;
13
14use crate::db::LatestMap;
15
/// Version tag of the v0 checkpoint wire format.
///
/// It is written as the first two bytes (little-endian `u16`) of every payload produced by
/// `encode_checkpoint_payload_v0`, and it is the only version `decode_checkpoint_payload` accepts.
pub const CHECKPOINT_VERSION_V0: u16 = 0;
17
18#[derive(Debug, Clone)]
19pub struct CheckpointV0 {
20    pub replay_from_offset: u64,
21    pub catalog_records: Vec<CatalogRecordWire>,
22    pub record_payloads: Vec<Vec<u8>>,
23    pub index_entries: Vec<IndexEntry>,
24}
25
26pub fn encode_checkpoint_payload_v0(cp: &CheckpointV0) -> Vec<u8> {
27    #[cfg(feature = "tracing")]
28    tracing::debug!(
29        catalog_records = cp.catalog_records.len(),
30        record_payloads = cp.record_payloads.len(),
31        index_entries = cp.index_entries.len(),
32        "encode_checkpoint_payload_v0"
33    );
34    // NOTE: caller is responsible for setting replay_from_offset (we may patch it later).
35    let mut out = Vec::new();
36    out.extend_from_slice(&CHECKPOINT_VERSION_V0.to_le_bytes());
37    out.extend_from_slice(&cp.replay_from_offset.to_le_bytes());
38
39    out.extend_from_slice(&(cp.catalog_records.len() as u32).to_le_bytes());
40    for r in &cp.catalog_records {
41        let b = encode_catalog_payload(r);
42        out.extend_from_slice(&(b.len() as u32).to_le_bytes());
43        out.extend_from_slice(b.as_slice());
44    }
45
46    out.extend_from_slice(&(cp.record_payloads.len() as u32).to_le_bytes());
47    for b in &cp.record_payloads {
48        out.extend_from_slice(&(b.len() as u32).to_le_bytes());
49        out.extend_from_slice(b.as_slice());
50    }
51
52    let idx_blob = encode_index_payload(&cp.index_entries);
53    out.extend_from_slice(&(idx_blob.len() as u32).to_le_bytes());
54    out.extend_from_slice(&idx_blob);
55
56    out
57}
58
59pub fn decode_checkpoint_payload(bytes: &[u8]) -> Result<CheckpointV0, DbError> {
60    #[cfg(feature = "tracing")]
61    let _span = tracing::info_span!("decode_checkpoint_payload", bytes = bytes.len()).entered();
62    let mut cur = Cursor::new(bytes);
63    let ver = cur.take_u16()?;
64    if ver != CHECKPOINT_VERSION_V0 {
65        return Err(DbError::Format(FormatError::UnsupportedVersion {
66            major: 0,
67            minor: ver,
68        }));
69    }
70    let replay_from_offset = cur.take_u64()?;
71
72    let n_catalog = cur.take_u32()? as usize;
73    let mut catalog_records = Vec::with_capacity(n_catalog.min(1024));
74    for _ in 0..n_catalog {
75        let n = cur.take_u32()? as usize;
76        let b = cur.take_bytes(n)?;
77        let rec = crate::catalog::decode_catalog_payload(&b)?;
78        catalog_records.push(rec);
79    }
80
81    let n_records = cur.take_u32()? as usize;
82    let mut record_payloads = Vec::with_capacity(n_records.min(1024));
83    for _ in 0..n_records {
84        let n = cur.take_u32()? as usize;
85        record_payloads.push(cur.take_bytes(n)?);
86    }
87
88    let idx_blob_len = cur.take_u32()? as usize;
89    let idx_blob = cur.take_bytes(idx_blob_len)?;
90    let index_entries = decode_index_payload(&idx_blob)?;
91
92    if cur.remaining() != 0 {
93        return Err(DbError::Format(FormatError::InvalidCatalogPayload {
94            message: "trailing bytes in checkpoint payload".to_string(),
95        }));
96    }
97
98    let cp = CheckpointV0 {
99        replay_from_offset,
100        catalog_records,
101        record_payloads,
102        index_entries,
103    };
104    #[cfg(feature = "tracing")]
105    tracing::info!(
106        replay_from_offset = cp.replay_from_offset,
107        catalog_records = cp.catalog_records.len(),
108        record_payloads = cp.record_payloads.len(),
109        index_entries = cp.index_entries.len(),
110        "decode_checkpoint_payload_ok"
111    );
112    Ok(cp)
113}
114
115/// Build a checkpoint representation from current in-memory engine state.
116///
117/// This encodes the *current* catalog/schema only (ModelVault validates record segments against the
118/// current schema version during replay).
119pub fn checkpoint_from_state(
120    catalog: &Catalog,
121    latest: &LatestMap,
122    indexes: &IndexState,
123) -> Result<CheckpointV0, DbError> {
124    #[cfg(feature = "tracing")]
125    let _span = tracing::info_span!("checkpoint_from_state").entered();
126    let mut catalog_records: Vec<CatalogRecordWire> = Vec::new();
127    let mut cols = catalog.collections();
128    cols.sort_by_key(|c| c.id.0);
129    for c in &cols {
130        let pk = c
131            .primary_field
132            .as_deref()
133            .ok_or(DbError::Schema(SchemaError::NoPrimaryKey {
134                collection_id: c.id.0,
135            }))?;
136        catalog_records.push(CatalogRecordWire::CreateCollection {
137            collection_id: c.id.0,
138            name: c.name.clone(),
139            schema_version: 1,
140            fields: c.fields.clone(),
141            indexes: c.indexes.clone(),
142            primary_field: Some(pk.to_string()),
143        });
144        for v in 2..=c.current_version.0 {
145            catalog_records.push(CatalogRecordWire::NewSchemaVersion {
146                collection_id: c.id.0,
147                schema_version: v,
148                fields: c.fields.clone(),
149                indexes: c.indexes.clone(),
150            });
151        }
152    }
153
154    // Encode latest rows as v2/v3 record payloads (insert op semantics).
155    let mut record_payloads: Vec<Vec<u8>> = Vec::with_capacity(latest.len().min(1_000_000));
156    for ((cid, _pk_key), row) in latest.iter() {
157        let col = catalog
158            .get(CollectionId(*cid))
159            .ok_or(DbError::Schema(SchemaError::UnknownCollection { id: *cid }))?;
160        let pk_name =
161            col.primary_field
162                .as_deref()
163                .ok_or(DbError::Schema(SchemaError::NoPrimaryKey {
164                    collection_id: col.id.0,
165                }))?;
166        let pk_def = col
167            .fields
168            .iter()
169            .find(|f| f.path.0.len() == 1 && f.path.0[0] == pk_name)
170            .ok_or(DbError::Schema(SchemaError::PrimaryFieldNotFound {
171                name: pk_name.to_string(),
172            }))?;
173        let pk_cell = row
174            .get(pk_name)
175            .ok_or(DbError::Schema(SchemaError::RowMissingPrimary {
176                name: pk_name.to_string(),
177            }))?;
178        let pk_scalar: ScalarValue = pk_cell.clone().into_scalar()?;
179
180        let has_multi_segment_schema = col.fields.iter().any(|f| f.path.0.len() != 1);
181        let non_pk_defs = if has_multi_segment_schema {
182            col.fields
183                .iter()
184                .filter(|f| !(f.path.0.len() == 1 && f.path.0[0] == pk_name))
185                .collect::<Vec<_>>()
186        } else {
187            non_pk_defs_in_order(&col.fields, pk_name)
188        };
189        let ordered = build_non_pk_values_in_schema_order(row, &non_pk_defs)?;
190
191        record_payloads.push(if has_multi_segment_schema {
192            encode_record_payload_v3(
193                *cid,
194                col.current_version.0,
195                &pk_scalar,
196                &pk_def.ty,
197                &ordered,
198            )?
199        } else {
200            encode_record_payload_v2(
201                *cid,
202                col.current_version.0,
203                &pk_scalar,
204                &pk_def.ty,
205                &ordered,
206            )?
207        });
208    }
209
210    let index_entries = indexes.entries_for_checkpoint();
211
212    let cp = CheckpointV0 {
213        replay_from_offset: 0,
214        catalog_records,
215        record_payloads,
216        index_entries,
217    };
218    #[cfg(feature = "tracing")]
219    tracing::info!(
220        catalog_records = cp.catalog_records.len(),
221        record_payloads = cp.record_payloads.len(),
222        index_entries = cp.index_entries.len(),
223        "checkpoint_from_state_ok"
224    );
225    Ok(cp)
226}
227
228/// Decode a checkpoint payload into engine state (catalog/latest/indexes).
229pub fn state_from_checkpoint_payload(
230    payload: &[u8],
231) -> Result<(u64, Catalog, LatestMap, IndexState), DbError> {
232    let cp = decode_checkpoint_payload(payload)?;
233
234    let mut catalog = Catalog::default();
235    for r in &cp.catalog_records {
236        catalog.apply_record(r.clone())?;
237    }
238
239    let mut latest: LatestMap = HashMap::new();
240    for rec in &cp.record_payloads {
241        apply_checkpoint_record_payload(rec, &catalog, &mut latest)?;
242    }
243
244    let mut indexes = IndexState::default();
245    for e in cp.index_entries {
246        indexes.apply(e)?;
247    }
248
249    Ok((cp.replay_from_offset, catalog, latest, indexes))
250}
251
252fn apply_checkpoint_record_payload(
253    payload: &[u8],
254    catalog: &Catalog,
255    latest: &mut LatestMap,
256) -> Result<(), DbError> {
257    if payload.len() < 6 {
258        return Err(DbError::Format(FormatError::TruncatedRecordPayload));
259    }
260    let collection_id = u32::from_le_bytes([payload[2], payload[3], payload[4], payload[5]]);
261    let col = catalog
262        .get(CollectionId(collection_id))
263        .ok_or(DbError::Schema(SchemaError::UnknownCollection {
264            id: collection_id,
265        }))?;
266    let pk_name =
267        col.primary_field
268            .as_deref()
269            .ok_or(DbError::Schema(SchemaError::NoPrimaryKey {
270                collection_id: col.id.0,
271            }))?;
272    let pk_ty = col
273        .fields
274        .iter()
275        .find(|f| f.path.0.len() == 1 && f.path.0[0] == pk_name)
276        .map(|f| &f.ty)
277        .ok_or(DbError::Schema(SchemaError::PrimaryFieldNotFound {
278            name: pk_name.to_string(),
279        }))?;
280
281    let decoded = crate::record::decode_record_payload(payload, pk_name, pk_ty, &col.fields)?;
282    let pk_key = decoded.pk.canonical_key_bytes();
283    let mut full: BTreeMap<String, RowValue> = BTreeMap::new();
284    full.insert(pk_name.to_string(), RowValue::from_scalar(decoded.pk));
285    for (k, v) in decoded.fields {
286        full.insert(k, v);
287    }
288    latest.insert((collection_id, pk_key), full);
289    Ok(())
290}
291
292struct Cursor<'a> {
293    bytes: &'a [u8],
294    pos: usize,
295}
296
297impl<'a> Cursor<'a> {
298    fn new(bytes: &'a [u8]) -> Self {
299        Self { bytes, pos: 0 }
300    }
301
302    fn remaining(&self) -> usize {
303        self.bytes.len().saturating_sub(self.pos)
304    }
305
306    fn take_u16(&mut self) -> Result<u16, DbError> {
307        if self.remaining() < 2 {
308            return Err(DbError::Format(FormatError::InvalidCatalogPayload {
309                message: "unexpected eof".to_string(),
310            }));
311        }
312        let v = u16::from_le_bytes([self.bytes[self.pos], self.bytes[self.pos + 1]]);
313        self.pos += 2;
314        Ok(v)
315    }
316
317    fn take_u32(&mut self) -> Result<u32, DbError> {
318        if self.remaining() < 4 {
319            return Err(DbError::Format(FormatError::InvalidCatalogPayload {
320                message: "unexpected eof".to_string(),
321            }));
322        }
323        let v = u32::from_le_bytes([
324            self.bytes[self.pos],
325            self.bytes[self.pos + 1],
326            self.bytes[self.pos + 2],
327            self.bytes[self.pos + 3],
328        ]);
329        self.pos += 4;
330        Ok(v)
331    }
332
333    fn take_u64(&mut self) -> Result<u64, DbError> {
334        if self.remaining() < 8 {
335            return Err(DbError::Format(FormatError::InvalidCatalogPayload {
336                message: "unexpected eof".to_string(),
337            }));
338        }
339        let v = u64::from_le_bytes(self.bytes[self.pos..self.pos + 8].try_into().unwrap());
340        self.pos += 8;
341        Ok(v)
342    }
343
344    fn take_bytes(&mut self, n: usize) -> Result<Vec<u8>, DbError> {
345        if self.remaining() < n {
346            return Err(DbError::Format(FormatError::InvalidCatalogPayload {
347                message: "unexpected eof".to_string(),
348            }));
349        }
350        let slice = &self.bytes[self.pos..self.pos + n];
351        self.pos += n;
352        Ok(slice.to_vec())
353    }
354}
355
// The unit tests are kept in a separate file under the crate's `tests/unit/` directory and are
// textually included into this module when building with `cfg(test)`.
#[cfg(test)]
mod tests {
    include!(concat!(env!("CARGO_MANIFEST_DIR"), "/tests/unit/src_checkpoint_tests.rs"));
}