1use std::collections::{BTreeMap, HashMap};
4
5use crate::catalog::{encode_catalog_payload, Catalog, CatalogRecordWire};
6use crate::db::build_non_pk_values_in_schema_order;
7use crate::error::{DbError, FormatError, SchemaError};
8use crate::index::{decode_index_payload, encode_index_payload, IndexEntry, IndexState};
9use crate::record::{
10 encode_record_payload_v2, encode_record_payload_v3, non_pk_defs_in_order, RowValue, ScalarValue,
11};
12use crate::schema::CollectionId;
13
14use crate::db::LatestMap;
15
/// Version tag written as the first two bytes (little-endian `u16`) of a checkpoint payload.
/// It is the only version `decode_checkpoint_payload` accepts.
16pub const CHECKPOINT_VERSION_V0: u16 = 0;
17
/// Decoded (in-memory) form of a version-0 checkpoint payload.
///
/// Produced by `checkpoint_from_state` / `decode_checkpoint_payload` and serialized by
/// `encode_checkpoint_payload_v0`.
18#[derive(Debug, Clone)]
19pub struct CheckpointV0 {
    /// Offset stored right after the version tag. `checkpoint_from_state` always sets it to 0.
    /// NOTE(review): presumably the log offset at which replay resumes after loading the
    /// checkpoint — confirm against the caller.
20 pub replay_from_offset: u64,
    /// Catalog records that are applied in order to rebuild the `Catalog`.
21 pub catalog_records: Vec<CatalogRecordWire>,
    /// Encoded row payloads, one per entry of the latest-row map.
22 pub record_payloads: Vec<Vec<u8>>,
    /// Index entries that are applied in order to rebuild the `IndexState`.
23 pub index_entries: Vec<IndexEntry>,
24}
25
26pub fn encode_checkpoint_payload_v0(cp: &CheckpointV0) -> Vec<u8> {
27 #[cfg(feature = "tracing")]
28 tracing::debug!(
29 catalog_records = cp.catalog_records.len(),
30 record_payloads = cp.record_payloads.len(),
31 index_entries = cp.index_entries.len(),
32 "encode_checkpoint_payload_v0"
33 );
34 let mut out = Vec::new();
36 out.extend_from_slice(&CHECKPOINT_VERSION_V0.to_le_bytes());
37 out.extend_from_slice(&cp.replay_from_offset.to_le_bytes());
38
39 out.extend_from_slice(&(cp.catalog_records.len() as u32).to_le_bytes());
40 for r in &cp.catalog_records {
41 let b = encode_catalog_payload(r);
42 out.extend_from_slice(&(b.len() as u32).to_le_bytes());
43 out.extend_from_slice(b.as_slice());
44 }
45
46 out.extend_from_slice(&(cp.record_payloads.len() as u32).to_le_bytes());
47 for b in &cp.record_payloads {
48 out.extend_from_slice(&(b.len() as u32).to_le_bytes());
49 out.extend_from_slice(b.as_slice());
50 }
51
52 let idx_blob = encode_index_payload(&cp.index_entries);
53 out.extend_from_slice(&(idx_blob.len() as u32).to_le_bytes());
54 out.extend_from_slice(&idx_blob);
55
56 out
57}
58
59pub fn decode_checkpoint_payload(bytes: &[u8]) -> Result<CheckpointV0, DbError> {
60 #[cfg(feature = "tracing")]
61 let _span = tracing::info_span!("decode_checkpoint_payload", bytes = bytes.len()).entered();
62 let mut cur = Cursor::new(bytes);
63 let ver = cur.take_u16()?;
64 if ver != CHECKPOINT_VERSION_V0 {
65 return Err(DbError::Format(FormatError::UnsupportedVersion {
66 major: 0,
67 minor: ver,
68 }));
69 }
70 let replay_from_offset = cur.take_u64()?;
71
72 let n_catalog = cur.take_u32()? as usize;
73 let mut catalog_records = Vec::with_capacity(n_catalog.min(1024));
74 for _ in 0..n_catalog {
75 let n = cur.take_u32()? as usize;
76 let b = cur.take_bytes(n)?;
77 let rec = crate::catalog::decode_catalog_payload(&b)?;
78 catalog_records.push(rec);
79 }
80
81 let n_records = cur.take_u32()? as usize;
82 let mut record_payloads = Vec::with_capacity(n_records.min(1024));
83 for _ in 0..n_records {
84 let n = cur.take_u32()? as usize;
85 record_payloads.push(cur.take_bytes(n)?);
86 }
87
88 let idx_blob_len = cur.take_u32()? as usize;
89 let idx_blob = cur.take_bytes(idx_blob_len)?;
90 let index_entries = decode_index_payload(&idx_blob)?;
91
92 if cur.remaining() != 0 {
93 return Err(DbError::Format(FormatError::InvalidCatalogPayload {
94 message: "trailing bytes in checkpoint payload".to_string(),
95 }));
96 }
97
98 let cp = CheckpointV0 {
99 replay_from_offset,
100 catalog_records,
101 record_payloads,
102 index_entries,
103 };
104 #[cfg(feature = "tracing")]
105 tracing::info!(
106 replay_from_offset = cp.replay_from_offset,
107 catalog_records = cp.catalog_records.len(),
108 record_payloads = cp.record_payloads.len(),
109 index_entries = cp.index_entries.len(),
110 "decode_checkpoint_payload_ok"
111 );
112 Ok(cp)
113}
114
115pub fn checkpoint_from_state(
120 catalog: &Catalog,
121 latest: &LatestMap,
122 indexes: &IndexState,
123) -> Result<CheckpointV0, DbError> {
124 #[cfg(feature = "tracing")]
125 let _span = tracing::info_span!("checkpoint_from_state").entered();
126 let mut catalog_records: Vec<CatalogRecordWire> = Vec::new();
127 let mut cols = catalog.collections();
128 cols.sort_by_key(|c| c.id.0);
129 for c in &cols {
130 let pk = c
131 .primary_field
132 .as_deref()
133 .ok_or(DbError::Schema(SchemaError::NoPrimaryKey {
134 collection_id: c.id.0,
135 }))?;
136 catalog_records.push(CatalogRecordWire::CreateCollection {
137 collection_id: c.id.0,
138 name: c.name.clone(),
139 schema_version: 1,
140 fields: c.fields.clone(),
141 indexes: c.indexes.clone(),
142 primary_field: Some(pk.to_string()),
143 });
144 for v in 2..=c.current_version.0 {
145 catalog_records.push(CatalogRecordWire::NewSchemaVersion {
146 collection_id: c.id.0,
147 schema_version: v,
148 fields: c.fields.clone(),
149 indexes: c.indexes.clone(),
150 });
151 }
152 }
153
154 let mut record_payloads: Vec<Vec<u8>> = Vec::with_capacity(latest.len().min(1_000_000));
156 for ((cid, _pk_key), row) in latest.iter() {
157 let col = catalog
158 .get(CollectionId(*cid))
159 .ok_or(DbError::Schema(SchemaError::UnknownCollection { id: *cid }))?;
160 let pk_name =
161 col.primary_field
162 .as_deref()
163 .ok_or(DbError::Schema(SchemaError::NoPrimaryKey {
164 collection_id: col.id.0,
165 }))?;
166 let pk_def = col
167 .fields
168 .iter()
169 .find(|f| f.path.0.len() == 1 && f.path.0[0] == pk_name)
170 .ok_or(DbError::Schema(SchemaError::PrimaryFieldNotFound {
171 name: pk_name.to_string(),
172 }))?;
173 let pk_cell = row
174 .get(pk_name)
175 .ok_or(DbError::Schema(SchemaError::RowMissingPrimary {
176 name: pk_name.to_string(),
177 }))?;
178 let pk_scalar: ScalarValue = pk_cell.clone().into_scalar()?;
179
180 let has_multi_segment_schema = col.fields.iter().any(|f| f.path.0.len() != 1);
181 let non_pk_defs = if has_multi_segment_schema {
182 col.fields
183 .iter()
184 .filter(|f| !(f.path.0.len() == 1 && f.path.0[0] == pk_name))
185 .collect::<Vec<_>>()
186 } else {
187 non_pk_defs_in_order(&col.fields, pk_name)
188 };
189 let ordered = build_non_pk_values_in_schema_order(row, &non_pk_defs)?;
190
191 record_payloads.push(if has_multi_segment_schema {
192 encode_record_payload_v3(
193 *cid,
194 col.current_version.0,
195 &pk_scalar,
196 &pk_def.ty,
197 &ordered,
198 )?
199 } else {
200 encode_record_payload_v2(
201 *cid,
202 col.current_version.0,
203 &pk_scalar,
204 &pk_def.ty,
205 &ordered,
206 )?
207 });
208 }
209
210 let index_entries = indexes.entries_for_checkpoint();
211
212 let cp = CheckpointV0 {
213 replay_from_offset: 0,
214 catalog_records,
215 record_payloads,
216 index_entries,
217 };
218 #[cfg(feature = "tracing")]
219 tracing::info!(
220 catalog_records = cp.catalog_records.len(),
221 record_payloads = cp.record_payloads.len(),
222 index_entries = cp.index_entries.len(),
223 "checkpoint_from_state_ok"
224 );
225 Ok(cp)
226}
227
228pub fn state_from_checkpoint_payload(
230 payload: &[u8],
231) -> Result<(u64, Catalog, LatestMap, IndexState), DbError> {
232 let cp = decode_checkpoint_payload(payload)?;
233
234 let mut catalog = Catalog::default();
235 for r in &cp.catalog_records {
236 catalog.apply_record(r.clone())?;
237 }
238
239 let mut latest: LatestMap = HashMap::new();
240 for rec in &cp.record_payloads {
241 apply_checkpoint_record_payload(rec, &catalog, &mut latest)?;
242 }
243
244 let mut indexes = IndexState::default();
245 for e in cp.index_entries {
246 indexes.apply(e)?;
247 }
248
249 Ok((cp.replay_from_offset, catalog, latest, indexes))
250}
251
252fn apply_checkpoint_record_payload(
253 payload: &[u8],
254 catalog: &Catalog,
255 latest: &mut LatestMap,
256) -> Result<(), DbError> {
257 if payload.len() < 6 {
258 return Err(DbError::Format(FormatError::TruncatedRecordPayload));
259 }
260 let collection_id = u32::from_le_bytes([payload[2], payload[3], payload[4], payload[5]]);
261 let col = catalog
262 .get(CollectionId(collection_id))
263 .ok_or(DbError::Schema(SchemaError::UnknownCollection {
264 id: collection_id,
265 }))?;
266 let pk_name =
267 col.primary_field
268 .as_deref()
269 .ok_or(DbError::Schema(SchemaError::NoPrimaryKey {
270 collection_id: col.id.0,
271 }))?;
272 let pk_ty = col
273 .fields
274 .iter()
275 .find(|f| f.path.0.len() == 1 && f.path.0[0] == pk_name)
276 .map(|f| &f.ty)
277 .ok_or(DbError::Schema(SchemaError::PrimaryFieldNotFound {
278 name: pk_name.to_string(),
279 }))?;
280
281 let decoded = crate::record::decode_record_payload(payload, pk_name, pk_ty, &col.fields)?;
282 let pk_key = decoded.pk.canonical_key_bytes();
283 let mut full: BTreeMap<String, RowValue> = BTreeMap::new();
284 full.insert(pk_name.to_string(), RowValue::from_scalar(decoded.pk));
285 for (k, v) in decoded.fields {
286 full.insert(k, v);
287 }
288 latest.insert((collection_id, pk_key), full);
289 Ok(())
290}
291
292struct Cursor<'a> {
293 bytes: &'a [u8],
294 pos: usize,
295}
296
297impl<'a> Cursor<'a> {
298 fn new(bytes: &'a [u8]) -> Self {
299 Self { bytes, pos: 0 }
300 }
301
302 fn remaining(&self) -> usize {
303 self.bytes.len().saturating_sub(self.pos)
304 }
305
306 fn take_u16(&mut self) -> Result<u16, DbError> {
307 if self.remaining() < 2 {
308 return Err(DbError::Format(FormatError::InvalidCatalogPayload {
309 message: "unexpected eof".to_string(),
310 }));
311 }
312 let v = u16::from_le_bytes([self.bytes[self.pos], self.bytes[self.pos + 1]]);
313 self.pos += 2;
314 Ok(v)
315 }
316
317 fn take_u32(&mut self) -> Result<u32, DbError> {
318 if self.remaining() < 4 {
319 return Err(DbError::Format(FormatError::InvalidCatalogPayload {
320 message: "unexpected eof".to_string(),
321 }));
322 }
323 let v = u32::from_le_bytes([
324 self.bytes[self.pos],
325 self.bytes[self.pos + 1],
326 self.bytes[self.pos + 2],
327 self.bytes[self.pos + 3],
328 ]);
329 self.pos += 4;
330 Ok(v)
331 }
332
333 fn take_u64(&mut self) -> Result<u64, DbError> {
334 if self.remaining() < 8 {
335 return Err(DbError::Format(FormatError::InvalidCatalogPayload {
336 message: "unexpected eof".to_string(),
337 }));
338 }
339 let v = u64::from_le_bytes(self.bytes[self.pos..self.pos + 8].try_into().unwrap());
340 self.pos += 8;
341 Ok(v)
342 }
343
344 fn take_bytes(&mut self, n: usize) -> Result<Vec<u8>, DbError> {
345 if self.remaining() < n {
346 return Err(DbError::Format(FormatError::InvalidCatalogPayload {
347 message: "unexpected eof".to_string(),
348 }));
349 }
350 let slice = &self.bytes[self.pos..self.pos + n];
351 self.pos += n;
352 Ok(slice.to_vec())
353 }
354}
355
// Unit tests for this module are stored in a separate file under the crate root
// (`tests/unit/src_checkpoint_tests.rs`) and are textually included here, so they are compiled
// as a child module of this file. The module only exists in test builds.
356#[cfg(test)]
357mod tests {
358 include!(concat!(
359 env!("CARGO_MANIFEST_DIR"),
360 "/tests/unit/src_checkpoint_tests.rs"
361 ));
362}