use loro::{LoroDoc, LoroMap, LoroValue};
use crate::error::{ArrayError, ArrayResult};
use crate::schema::array_schema::ArraySchema;
use crate::sync::hlc::{Hlc, HlcGenerator};
use crate::sync::replica_id::ReplicaId;
/// Version tag for the snapshot envelope.
///
/// `SchemaDoc::export_snapshot` writes this value as the first byte of every
/// exported snapshot, and the import paths reject any snapshot whose first
/// byte differs from it.
pub const LORO_FORMAT_VERSION: u8 = 1;
/// An array schema stored inside a Loro document, paired with the HLC of the
/// most recent schema write or import.
///
/// The schema is kept as a MessagePack blob under the `"content"` key of the
/// Loro map named `"root"`.
pub struct SchemaDoc {
/// Loro document holding the encoded schema.
doc: LoroDoc,
/// HLC recorded by the last schema write or snapshot import; `Hlc::ZERO`
/// for a freshly created, empty document.
schema_hlc: Hlc,
/// Replica identifier supplied at construction and returned by `replica_id()`.
replica_id: ReplicaId,
}
impl SchemaDoc {
    /// Creates an empty document for `replica_id`.
    ///
    /// Nothing is written to the underlying Loro document and the schema HLC
    /// starts at [`Hlc::ZERO`].
    pub fn new(replica_id: ReplicaId) -> Self {
        let doc = LoroDoc::new();
        Self {
            doc,
            schema_hlc: Hlc::ZERO,
            replica_id,
        }
    }

    /// Creates a document that already contains `schema`.
    ///
    /// The schema is written first and the schema HLC is then taken from
    /// `generator.next()`.
    ///
    /// # Errors
    ///
    /// Fails if the schema cannot be encoded or stored, or if the generator
    /// cannot produce a timestamp.
    pub fn from_schema(
        replica_id: ReplicaId,
        schema: &ArraySchema,
        generator: &HlcGenerator,
    ) -> ArrayResult<Self> {
        // Writing into a fresh document is the same "write, then stamp"
        // sequence that `replace_schema` performs.
        let mut fresh = Self::new(replica_id);
        fresh.replace_schema(schema, generator)?;
        Ok(fresh)
    }

    /// Returns the HLC recorded by the latest schema write or import.
    pub fn schema_hlc(&self) -> Hlc {
        self.schema_hlc
    }

    /// Returns the replica identifier this document was created with.
    pub fn replica_id(&self) -> ReplicaId {
        self.replica_id
    }

    /// Decodes the schema currently stored in the document.
    ///
    /// # Errors
    ///
    /// Returns [`ArrayError::SegmentCorruption`] when `root["content"]` is
    /// missing, is not a binary value, or does not decode as a schema.
    pub fn to_schema(&self) -> ArrayResult<ArraySchema> {
        let encoded = self.read_content_bytes(&self.doc.get_map("root"))?;
        match zerompk::from_msgpack(&encoded) {
            Ok(schema) => Ok(schema),
            Err(e) => Err(ArrayError::SegmentCorruption {
                detail: format!("schema decode failed: {e}"),
            }),
        }
    }

    /// Exports the document as a Loro snapshot wrapped in a one-byte envelope.
    ///
    /// The first byte of the result is [`LORO_FORMAT_VERSION`]; the remaining
    /// bytes are the Loro snapshot.
    ///
    /// # Errors
    ///
    /// Returns [`ArrayError::SegmentCorruption`] if Loro fails to export.
    pub fn export_snapshot(&self) -> ArrayResult<Vec<u8>> {
        let payload = match self.doc.export(loro::ExportMode::Snapshot) {
            Ok(bytes) => bytes,
            Err(e) => {
                return Err(ArrayError::SegmentCorruption {
                    detail: format!("loro snapshot export failed: {e}"),
                });
            }
        };

        let mut framed = Vec::with_capacity(payload.len() + 1);
        framed.push(LORO_FORMAT_VERSION);
        framed.extend_from_slice(&payload);
        Ok(framed)
    }

    /// Imports an enveloped snapshot received from another replica.
    ///
    /// After the import, `remote_hlc` is fed to `generator.observe` and the
    /// schema HLC is set to a fresh value from `generator.next()`.
    ///
    /// # Errors
    ///
    /// Fails on an empty envelope, a version mismatch, a Loro import error
    /// ([`ArrayError::LoroError`]), or an error from the generator.
    ///
    /// NOTE(review): the Loro import runs before the generator calls, so a
    /// generator error leaves the imported content in place while
    /// `schema_hlc` keeps its previous value — confirm this is acceptable.
    pub fn import_snapshot(
        &mut self,
        bytes: &[u8],
        remote_hlc: Hlc,
        generator: &HlcGenerator,
    ) -> ArrayResult<()> {
        let payload = strip_envelope(bytes)?;
        if let Err(e) = self.doc.import(payload) {
            return Err(ArrayError::LoroError {
                detail: format!("loro import failed: {e}"),
            });
        }

        generator.observe(remote_hlc)?;
        let stamped = generator.next()?;
        self.schema_hlc = stamped;
        Ok(())
    }

    /// Imports an enveloped snapshot and adopts `committed_hlc` verbatim.
    ///
    /// Unlike [`SchemaDoc::import_snapshot`], no generator is involved: the
    /// schema HLC is simply overwritten with `committed_hlc`.
    ///
    /// # Errors
    ///
    /// Fails on an empty envelope, a version mismatch, or a Loro import error
    /// ([`ArrayError::LoroError`]).
    pub fn import_snapshot_replicated(
        &mut self,
        bytes: &[u8],
        committed_hlc: Hlc,
    ) -> ArrayResult<()> {
        let payload = strip_envelope(bytes)?;
        if let Err(e) = self.doc.import(payload) {
            return Err(ArrayError::LoroError {
                detail: format!("loro import (replicated) failed: {e}"),
            });
        }

        self.schema_hlc = committed_hlc;
        Ok(())
    }

    /// Overwrites the stored schema and stamps it with a new HLC.
    ///
    /// # Errors
    ///
    /// Fails if the schema cannot be encoded or stored, or if the generator
    /// cannot produce a timestamp. The HLC is only updated after the write
    /// succeeded.
    pub fn replace_schema(
        &mut self,
        schema: &ArraySchema,
        generator: &HlcGenerator,
    ) -> ArrayResult<()> {
        self.write_schema_to_doc(schema)?;
        let stamped = generator.next()?;
        self.schema_hlc = stamped;
        Ok(())
    }

    /// Encodes `schema` as MessagePack and stores it at `root["content"]`.
    ///
    /// Encoding failures map to [`ArrayError::SegmentCorruption`]; a failed
    /// map insert maps to [`ArrayError::LoroError`].
    fn write_schema_to_doc(&self, schema: &ArraySchema) -> ArrayResult<()> {
        let encoded = match zerompk::to_msgpack_vec(schema) {
            Ok(bytes) => bytes,
            Err(e) => {
                return Err(ArrayError::SegmentCorruption {
                    detail: format!("schema encode failed: {e}"),
                });
            }
        };

        self.doc
            .get_map("root")
            .insert("content", LoroValue::Binary(encoded.into()))
            .map_err(|e| ArrayError::LoroError {
                detail: format!("loro map insert failed: {e}"),
            })
    }

    /// Fetches the raw schema bytes stored under `"content"` in `root`.
    ///
    /// Returns [`ArrayError::SegmentCorruption`] when the key is absent or
    /// holds anything other than a binary value.
    fn read_content_bytes(&self, root: &LoroMap) -> ArrayResult<Vec<u8>> {
        let entry = root
            .get("content")
            .ok_or_else(|| ArrayError::SegmentCorruption {
                detail: "root[\"content\"] not found".into(),
            })?;

        match entry {
            loro::ValueOrContainer::Value(LoroValue::Binary(blob)) => Ok(blob.to_vec()),
            other => Err(ArrayError::SegmentCorruption {
                detail: format!("expected Binary at root[\"content\"], got {:?}", other),
            }),
        }
    }
}
/// Validates the one-byte version envelope and returns the payload after it.
///
/// # Errors
///
/// * [`ArrayError::SegmentCorruption`] when `bytes` is empty.
/// * [`ArrayError::LoroSnapshotVersionMismatch`] when the leading byte is not
///   [`LORO_FORMAT_VERSION`].
fn strip_envelope(bytes: &[u8]) -> ArrayResult<&[u8]> {
    match bytes.split_first() {
        Some((&version, payload)) if version == LORO_FORMAT_VERSION => Ok(payload),
        Some((&version, _)) => Err(ArrayError::LoroSnapshotVersionMismatch {
            expected: LORO_FORMAT_VERSION,
            got: version,
        }),
        None => Err(ArrayError::SegmentCorruption {
            detail: "loro snapshot envelope is empty".into(),
        }),
    }
}
#[cfg(test)]
mod tests {
    //! Unit tests for `SchemaDoc`: schema round-trips, HLC bookkeeping, and
    //! snapshot envelope validation.

    use super::*;
    use crate::schema::array_schema::ArraySchema;
    use crate::schema::attr_spec::{AttrSpec, AttrType};
    use crate::schema::cell_order::{CellOrder, TileOrder};
    use crate::schema::dim_spec::{DimSpec, DimType};
    use crate::sync::hlc::HlcGenerator;
    use crate::sync::replica_id::ReplicaId;
    use crate::types::domain::{Domain, DomainBound};

    /// Shorthand for building a replica id from a raw integer.
    fn replica(id: u64) -> ReplicaId {
        ReplicaId::new(id)
    }

    /// Builds an HLC generator bound to replica `id`.
    fn generator(id: u64) -> HlcGenerator {
        HlcGenerator::new(replica(id))
    }

    /// Minimal one-dimensional schema; only `name` varies between tests.
    fn simple_schema(name: &str) -> ArraySchema {
        ArraySchema {
            name: name.into(),
            dims: vec![DimSpec::new(
                "x",
                DimType::Int64,
                Domain::new(DomainBound::Int64(0), DomainBound::Int64(99)),
            )],
            attrs: vec![AttrSpec::new("v", AttrType::Float64, true)],
            tile_extents: vec![10],
            cell_order: CellOrder::RowMajor,
            tile_order: TileOrder::RowMajor,
        }
    }

    #[test]
    fn from_schema_then_to_schema_roundtrips() {
        let g = generator(1);
        let schema = simple_schema("arr");
        let doc = SchemaDoc::from_schema(replica(1), &schema, &g).unwrap();
        let back = doc.to_schema().unwrap();
        assert_eq!(schema, back);
        assert!(doc.schema_hlc() > Hlc::ZERO);
    }

    #[test]
    fn to_schema_on_empty_doc_errors() {
        // A document that never had a schema written has no root["content"].
        let doc = SchemaDoc::new(replica(1));
        let err = doc.to_schema().unwrap_err();
        assert!(
            matches!(err, ArrayError::SegmentCorruption { .. }),
            "unexpected error: {err}"
        );
    }

    #[test]
    fn replace_schema_bumps_hlc() {
        let g = generator(1);
        let schema = simple_schema("arr");
        let mut doc = SchemaDoc::from_schema(replica(1), &schema, &g).unwrap();
        let hlc_before = doc.schema_hlc();
        let schema2 = simple_schema("arr2");
        doc.replace_schema(&schema2, &g).unwrap();
        assert!(doc.schema_hlc() > hlc_before);
        assert_eq!(doc.to_schema().unwrap(), schema2);
    }

    #[test]
    fn export_then_import_converges() {
        let g_a = generator(1);
        let schema = simple_schema("shared");
        let doc_a = SchemaDoc::from_schema(replica(1), &schema, &g_a).unwrap();
        let snapshot = doc_a.export_snapshot().unwrap();
        let g_b = generator(2);
        let mut doc_b = SchemaDoc::new(replica(2));
        doc_b
            .import_snapshot(&snapshot, doc_a.schema_hlc(), &g_b)
            .unwrap();
        assert_eq!(doc_a.to_schema().unwrap(), doc_b.to_schema().unwrap());
    }

    #[test]
    fn import_observes_remote_hlc() {
        let g_a = generator(1);
        let schema = simple_schema("x");
        let doc_a = SchemaDoc::from_schema(replica(1), &schema, &g_a).unwrap();
        let remote_hlc = doc_a.schema_hlc();
        let snapshot = doc_a.export_snapshot().unwrap();
        let g_b = generator(2);
        let mut doc_b = SchemaDoc::new(replica(2));
        doc_b.import_snapshot(&snapshot, remote_hlc, &g_b).unwrap();
        doc_b.replace_schema(&simple_schema("x2"), &g_b).unwrap();
        assert!(doc_b.schema_hlc() > remote_hlc);
    }

    #[test]
    fn import_snapshot_replicated_adopts_committed_hlc() {
        let g_a = generator(1);
        let schema = simple_schema("rep");
        let doc_a = SchemaDoc::from_schema(replica(1), &schema, &g_a).unwrap();
        let committed = doc_a.schema_hlc();
        let snapshot = doc_a.export_snapshot().unwrap();
        let mut doc_b = SchemaDoc::new(replica(2));
        doc_b
            .import_snapshot_replicated(&snapshot, committed)
            .unwrap();
        // Compared with `==` inside `assert!` so the test does not require
        // `Hlc: Debug`.
        assert!(doc_b.schema_hlc() == committed);
        assert_eq!(doc_b.to_schema().unwrap(), schema);
    }

    #[test]
    fn import_garbage_errors() {
        let g = generator(1);
        let mut doc = SchemaDoc::new(replica(1));
        let mut bad = vec![LORO_FORMAT_VERSION];
        bad.extend_from_slice(b"not valid loro data");
        let result = doc.import_snapshot(&bad, Hlc::ZERO, &g);
        assert!(result.is_err());
    }

    #[test]
    fn import_rejects_empty_envelope() {
        // Zero bytes means there is not even a version tag to inspect; both
        // import entry points must report corruption rather than panic.
        let g = generator(1);
        let mut doc = SchemaDoc::new(replica(1));
        let err = doc.import_snapshot(&[], Hlc::ZERO, &g).unwrap_err();
        assert!(
            matches!(err, ArrayError::SegmentCorruption { .. }),
            "unexpected error: {err}"
        );
        let err = doc
            .import_snapshot_replicated(&[], Hlc::ZERO)
            .unwrap_err();
        assert!(
            matches!(err, ArrayError::SegmentCorruption { .. }),
            "unexpected error: {err}"
        );
    }

    #[test]
    fn export_snapshot_has_version_prefix() {
        let g = generator(1);
        let schema = simple_schema("arr");
        let doc = SchemaDoc::from_schema(replica(1), &schema, &g).unwrap();
        let snapshot = doc.export_snapshot().unwrap();
        assert!(!snapshot.is_empty());
        assert_eq!(snapshot[0], LORO_FORMAT_VERSION);
    }

    #[test]
    fn import_snapshot_rejects_wrong_version() {
        let g_a = generator(1);
        let schema = simple_schema("v");
        let doc_a = SchemaDoc::from_schema(replica(1), &schema, &g_a).unwrap();
        let mut snapshot = doc_a.export_snapshot().unwrap();
        snapshot[0] = LORO_FORMAT_VERSION.wrapping_add(1);
        let g_b = generator(2);
        let mut doc_b = SchemaDoc::new(replica(2));
        let err = doc_b
            .import_snapshot(&snapshot, doc_a.schema_hlc(), &g_b)
            .unwrap_err();
        assert!(
            matches!(
                err,
                ArrayError::LoroSnapshotVersionMismatch { expected, got }
                    if expected == LORO_FORMAT_VERSION && got == LORO_FORMAT_VERSION.wrapping_add(1)
            ),
            "unexpected error: {err}"
        );
    }

    #[test]
    fn import_snapshot_replicated_rejects_wrong_version() {
        let g_a = generator(1);
        let schema = simple_schema("v2");
        let doc_a = SchemaDoc::from_schema(replica(1), &schema, &g_a).unwrap();
        let mut snapshot = doc_a.export_snapshot().unwrap();
        snapshot[0] = 0;
        let mut doc_b = SchemaDoc::new(replica(2));
        let err = doc_b
            .import_snapshot_replicated(&snapshot, doc_a.schema_hlc())
            .unwrap_err();
        assert!(
            matches!(
                err,
                ArrayError::LoroSnapshotVersionMismatch { expected, got }
                    if expected == LORO_FORMAT_VERSION && got == 0
            ),
            "unexpected error: {err}"
        );
    }
}