1use loro::{LoroDoc, LoroMap, LoroValue};
15
16use crate::error::{ArrayError, ArrayResult};
17use crate::schema::array_schema::ArraySchema;
18use crate::sync::hlc::{Hlc, HlcGenerator};
19use crate::sync::replica_id::ReplicaId;
20
21pub const LORO_FORMAT_VERSION: u8 = 1;
31
32pub struct SchemaDoc {
41 doc: LoroDoc,
42 schema_hlc: Hlc,
43 replica_id: ReplicaId,
44}
45
46impl SchemaDoc {
47 pub fn new(replica_id: ReplicaId) -> Self {
52 Self {
53 doc: LoroDoc::new(),
54 schema_hlc: Hlc::ZERO,
55 replica_id,
56 }
57 }
58
59 pub fn from_schema(
65 replica_id: ReplicaId,
66 schema: &ArraySchema,
67 generator: &HlcGenerator,
68 ) -> ArrayResult<Self> {
69 let mut doc_self = Self::new(replica_id);
70 doc_self.write_schema_to_doc(schema)?;
71 doc_self.schema_hlc = generator.next()?;
72 Ok(doc_self)
73 }
74
75 pub fn schema_hlc(&self) -> Hlc {
77 self.schema_hlc
78 }
79
80 pub fn replica_id(&self) -> ReplicaId {
82 self.replica_id
83 }
84
85 pub fn to_schema(&self) -> ArrayResult<ArraySchema> {
90 let root = self.doc.get_map("root");
91 let bytes = self.read_content_bytes(&root)?;
92 zerompk::from_msgpack(&bytes).map_err(|e| ArrayError::SegmentCorruption {
93 detail: format!("schema decode failed: {e}"),
94 })
95 }
96
97 pub fn export_snapshot(&self) -> ArrayResult<Vec<u8>> {
106 let loro_bytes = self.doc.export(loro::ExportMode::Snapshot).map_err(|e| {
107 ArrayError::SegmentCorruption {
108 detail: format!("loro snapshot export failed: {e}"),
109 }
110 })?;
111 let mut envelope = Vec::with_capacity(1 + loro_bytes.len());
112 envelope.push(LORO_FORMAT_VERSION);
113 envelope.extend_from_slice(&loro_bytes);
114 Ok(envelope)
115 }
116
117 pub fn import_snapshot(
125 &mut self,
126 bytes: &[u8],
127 remote_hlc: Hlc,
128 generator: &HlcGenerator,
129 ) -> ArrayResult<()> {
130 let loro_bytes = strip_envelope(bytes)?;
131 self.doc
132 .import(loro_bytes)
133 .map_err(|e| ArrayError::LoroError {
134 detail: format!("loro import failed: {e}"),
135 })?;
136 generator.observe(remote_hlc)?;
137 self.schema_hlc = generator.next()?;
138 Ok(())
139 }
140
141 pub fn import_snapshot_replicated(
152 &mut self,
153 bytes: &[u8],
154 committed_hlc: Hlc,
155 ) -> ArrayResult<()> {
156 let loro_bytes = strip_envelope(bytes)?;
157 self.doc
158 .import(loro_bytes)
159 .map_err(|e| ArrayError::LoroError {
160 detail: format!("loro import (replicated) failed: {e}"),
161 })?;
162 self.schema_hlc = committed_hlc;
163 Ok(())
164 }
165
166 pub fn replace_schema(
174 &mut self,
175 schema: &ArraySchema,
176 generator: &HlcGenerator,
177 ) -> ArrayResult<()> {
178 self.write_schema_to_doc(schema)?;
179 self.schema_hlc = generator.next()?;
180 Ok(())
181 }
182
183 fn write_schema_to_doc(&self, schema: &ArraySchema) -> ArrayResult<()> {
186 let schema_bytes =
187 zerompk::to_msgpack_vec(schema).map_err(|e| ArrayError::SegmentCorruption {
188 detail: format!("schema encode failed: {e}"),
189 })?;
190 let root: LoroMap = self.doc.get_map("root");
191 root.insert("content", LoroValue::Binary(schema_bytes.into()))
192 .map_err(|e| ArrayError::LoroError {
193 detail: format!("loro map insert failed: {e}"),
194 })?;
195 Ok(())
196 }
197
198 fn read_content_bytes(&self, root: &LoroMap) -> ArrayResult<Vec<u8>> {
199 match root.get("content") {
200 Some(loro::ValueOrContainer::Value(LoroValue::Binary(b))) => Ok(b.to_vec()),
201 Some(other) => Err(ArrayError::SegmentCorruption {
202 detail: format!("expected Binary at root[\"content\"], got {:?}", other),
203 }),
204 None => Err(ArrayError::SegmentCorruption {
205 detail: "root[\"content\"] not found".into(),
206 }),
207 }
208 }
209}
210
211fn strip_envelope(bytes: &[u8]) -> ArrayResult<&[u8]> {
218 match bytes.first() {
219 None => Err(ArrayError::SegmentCorruption {
220 detail: "loro snapshot envelope is empty".into(),
221 }),
222 Some(&v) if v != LORO_FORMAT_VERSION => Err(ArrayError::LoroSnapshotVersionMismatch {
223 expected: LORO_FORMAT_VERSION,
224 got: v,
225 }),
226 Some(_) => Ok(&bytes[1..]),
227 }
228}
229
230#[cfg(test)]
233mod tests {
234 use super::*;
235 use crate::schema::array_schema::ArraySchema;
236 use crate::schema::attr_spec::{AttrSpec, AttrType};
237 use crate::schema::cell_order::{CellOrder, TileOrder};
238 use crate::schema::dim_spec::{DimSpec, DimType};
239 use crate::sync::hlc::HlcGenerator;
240 use crate::sync::replica_id::ReplicaId;
241 use crate::types::domain::{Domain, DomainBound};
242
243 fn replica(id: u64) -> ReplicaId {
244 ReplicaId::new(id)
245 }
246
247 fn generator(id: u64) -> HlcGenerator {
248 HlcGenerator::new(replica(id))
249 }
250
251 fn simple_schema(name: &str) -> ArraySchema {
252 ArraySchema {
253 name: name.into(),
254 dims: vec![DimSpec::new(
255 "x",
256 DimType::Int64,
257 Domain::new(DomainBound::Int64(0), DomainBound::Int64(99)),
258 )],
259 attrs: vec![AttrSpec::new("v", AttrType::Float64, true)],
260 tile_extents: vec![10],
261 cell_order: CellOrder::RowMajor,
262 tile_order: TileOrder::RowMajor,
263 }
264 }
265
266 #[test]
267 fn from_schema_then_to_schema_roundtrips() {
268 let g = generator(1);
269 let schema = simple_schema("arr");
270 let doc = SchemaDoc::from_schema(replica(1), &schema, &g).unwrap();
271 let back = doc.to_schema().unwrap();
272 assert_eq!(schema, back);
273 assert!(doc.schema_hlc() > Hlc::ZERO);
274 }
275
276 #[test]
277 fn replace_schema_bumps_hlc() {
278 let g = generator(1);
279 let schema = simple_schema("arr");
280 let mut doc = SchemaDoc::from_schema(replica(1), &schema, &g).unwrap();
281 let hlc_before = doc.schema_hlc();
282
283 let schema2 = simple_schema("arr2");
284 doc.replace_schema(&schema2, &g).unwrap();
285 assert!(doc.schema_hlc() > hlc_before);
286 assert_eq!(doc.to_schema().unwrap(), schema2);
287 }
288
289 #[test]
290 fn export_then_import_converges() {
291 let g_a = generator(1);
292 let schema = simple_schema("shared");
293 let doc_a = SchemaDoc::from_schema(replica(1), &schema, &g_a).unwrap();
294 let snapshot = doc_a.export_snapshot().unwrap();
295
296 let g_b = generator(2);
297 let mut doc_b = SchemaDoc::new(replica(2));
298 doc_b
299 .import_snapshot(&snapshot, doc_a.schema_hlc(), &g_b)
300 .unwrap();
301
302 assert_eq!(doc_a.to_schema().unwrap(), doc_b.to_schema().unwrap());
303 }
304
305 #[test]
306 fn import_observes_remote_hlc() {
307 let g_a = generator(1);
308 let schema = simple_schema("x");
309 let doc_a = SchemaDoc::from_schema(replica(1), &schema, &g_a).unwrap();
310 let remote_hlc = doc_a.schema_hlc();
311 let snapshot = doc_a.export_snapshot().unwrap();
312
313 let g_b = generator(2);
314 let mut doc_b = SchemaDoc::new(replica(2));
315 doc_b.import_snapshot(&snapshot, remote_hlc, &g_b).unwrap();
316
317 doc_b.replace_schema(&simple_schema("x2"), &g_b).unwrap();
319 assert!(doc_b.schema_hlc() > remote_hlc);
320 }
321
322 #[test]
323 fn import_garbage_errors() {
324 let g = generator(1);
325 let mut doc = SchemaDoc::new(replica(1));
326 let mut bad = vec![LORO_FORMAT_VERSION];
329 bad.extend_from_slice(b"not valid loro data");
330 let result = doc.import_snapshot(&bad, Hlc::ZERO, &g);
331 assert!(result.is_err());
332 }
333
334 #[test]
335 fn export_snapshot_has_version_prefix() {
336 let g = generator(1);
337 let schema = simple_schema("arr");
338 let doc = SchemaDoc::from_schema(replica(1), &schema, &g).unwrap();
339 let snapshot = doc.export_snapshot().unwrap();
340 assert!(!snapshot.is_empty());
341 assert_eq!(snapshot[0], LORO_FORMAT_VERSION);
342 }
343
344 #[test]
345 fn import_snapshot_rejects_wrong_version() {
346 let g_a = generator(1);
347 let schema = simple_schema("v");
348 let doc_a = SchemaDoc::from_schema(replica(1), &schema, &g_a).unwrap();
349 let mut snapshot = doc_a.export_snapshot().unwrap();
350
351 snapshot[0] = LORO_FORMAT_VERSION.wrapping_add(1);
353
354 let g_b = generator(2);
355 let mut doc_b = SchemaDoc::new(replica(2));
356 let err = doc_b
357 .import_snapshot(&snapshot, doc_a.schema_hlc(), &g_b)
358 .unwrap_err();
359 assert!(
360 matches!(
361 err,
362 ArrayError::LoroSnapshotVersionMismatch { expected, got }
363 if expected == LORO_FORMAT_VERSION && got == LORO_FORMAT_VERSION.wrapping_add(1)
364 ),
365 "unexpected error: {err}"
366 );
367 }
368
369 #[test]
370 fn import_snapshot_replicated_rejects_wrong_version() {
371 let g_a = generator(1);
372 let schema = simple_schema("v2");
373 let doc_a = SchemaDoc::from_schema(replica(1), &schema, &g_a).unwrap();
374 let mut snapshot = doc_a.export_snapshot().unwrap();
375
376 snapshot[0] = 0; let mut doc_b = SchemaDoc::new(replica(2));
379 let err = doc_b
380 .import_snapshot_replicated(&snapshot, doc_a.schema_hlc())
381 .unwrap_err();
382 assert!(
383 matches!(
384 err,
385 ArrayError::LoroSnapshotVersionMismatch { expected, got }
386 if expected == LORO_FORMAT_VERSION && got == 0
387 ),
388 "unexpected error: {err}"
389 );
390 }
391}