Skip to main content

crdt_store/
db.rs

1//! High-level database API for CRDT persistence.
2//!
3//! `CrdtDb` wraps a storage backend with automatic versioning and migration.
4//! It serializes CRDTs with a version envelope so that schema changes are
5//! handled transparently on read.
6//!
7//! # Example
8//!
9//! ```
10//! use crdt_store::{CrdtDb, CrdtVersioned, MemoryStore};
11//! use serde::{Serialize, Deserialize};
12//!
13//! #[derive(Debug, PartialEq, Serialize, Deserialize)]
14//! struct Sensor { temperature: f32 }
15//!
16//! impl CrdtVersioned for Sensor {
17//!     const SCHEMA_VERSION: u8 = 1;
18//! }
19//!
20//! let mut db = CrdtDb::with_store(MemoryStore::new());
21//! db.save("s1", &Sensor { temperature: 22.5 }).unwrap();
22//!
23//! let loaded: Option<Sensor> = db.load("s1").unwrap();
24//! assert_eq!(loaded.unwrap().temperature, 22.5);
25//! ```
26
27use alloc::string::{String, ToString};
28use alloc::vec::Vec;
29use core::fmt;
30
31use crdt_migrate::{MigrationConfig, MigrationEngine, MigrationStep, VersionedEnvelope};
32use serde::{de::DeserializeOwned, Serialize};
33
34use crate::traits::{EventStore, Snapshot, StateStore, StoredEvent};
35
/// Namespace that a default [`CrdtDbConfig`] uses for `save`/`load` calls
/// that do not name one explicitly.
const DEFAULT_NAMESPACE: &str = "default";
38
/// Failure modes reported by `CrdtDb` operations.
///
/// `E` is the error type of the wrapped storage backend. Every other variant
/// carries a textual description of what went wrong.
#[derive(Debug)]
pub enum DbError<E: fmt::Debug + fmt::Display> {
    /// The storage backend reported an error.
    Store(E),
    /// A value could not be serialized.
    Serialize(String),
    /// Stored bytes could not be deserialized into the requested type.
    Deserialize(String),
    /// The version envelope could not be decoded.
    Envelope(String),
    /// A migration failed.
    Migration(String),
}
53
54impl<E: fmt::Debug + fmt::Display> fmt::Display for DbError<E> {
55    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
56        match self {
57            Self::Store(e) => write!(f, "store error: {e}"),
58            Self::Serialize(msg) => write!(f, "serialization error: {msg}"),
59            Self::Deserialize(msg) => write!(f, "deserialization error: {msg}"),
60            Self::Envelope(msg) => write!(f, "envelope error: {msg}"),
61            Self::Migration(msg) => write!(f, "migration error: {msg}"),
62        }
63    }
64}
65
66/// High-level CRDT database with automatic versioning and migration.
67///
68/// Wraps any [`StateStore`] backend. When data is saved, it is serialized
69/// with postcard and wrapped in a [`VersionedEnvelope`]. When loaded, the
70/// version is checked and migrations run automatically if needed.
71pub struct CrdtDb<S: StateStore> {
72    store: S,
73    migration_engine: MigrationEngine,
74    config: CrdtDbConfig,
75}
76
77/// Configuration for `CrdtDb`.
78#[derive(Debug, Clone)]
79pub struct CrdtDbConfig {
80    /// Migration configuration.
81    pub migration: MigrationConfig,
82    /// Default namespace for save/load without explicit namespace.
83    pub default_namespace: String,
84}
85
86impl Default for CrdtDbConfig {
87    fn default() -> Self {
88        Self {
89            migration: MigrationConfig::default(),
90            default_namespace: DEFAULT_NAMESPACE.to_string(),
91        }
92    }
93}
94
95/// Builder for constructing a `CrdtDb` with custom configuration.
96pub struct CrdtDbBuilder<S: StateStore> {
97    store: S,
98    migration_engine: MigrationEngine,
99    config: CrdtDbConfig,
100}
101
102impl<S: StateStore> CrdtDbBuilder<S> {
103    /// Set the migration configuration.
104    pub fn migration_config(mut self, config: MigrationConfig) -> Self {
105        self.config.migration = config;
106        self
107    }
108
109    /// Set the default namespace.
110    pub fn default_namespace(mut self, ns: &str) -> Self {
111        self.config.default_namespace = ns.to_string();
112        self
113    }
114
115    /// Register a migration step.
116    pub fn register_migration(mut self, step: alloc::boxed::Box<dyn MigrationStep>) -> Self {
117        self.migration_engine.register(step);
118        self
119    }
120
121    /// Build the `CrdtDb`.
122    pub fn build(self) -> CrdtDb<S> {
123        CrdtDb {
124            store: self.store,
125            migration_engine: self.migration_engine,
126            config: self.config,
127        }
128    }
129}
130
131impl<S: StateStore> CrdtDb<S> {
132    /// Create a `CrdtDb` wrapping the given store with default config.
133    ///
134    /// The current schema version defaults to 1 (no migrations registered).
135    pub fn with_store(store: S) -> Self {
136        Self {
137            store,
138            migration_engine: MigrationEngine::new(1),
139            config: CrdtDbConfig::default(),
140        }
141    }
142
143    /// Create a builder for advanced configuration.
144    ///
145    /// `current_version` is the schema version your app currently writes.
146    pub fn builder(store: S, current_version: u32) -> CrdtDbBuilder<S> {
147        CrdtDbBuilder {
148            store,
149            migration_engine: MigrationEngine::new(current_version),
150            config: CrdtDbConfig::default(),
151        }
152    }
153
154    /// Get a reference to the underlying store.
155    pub fn store(&self) -> &S {
156        &self.store
157    }
158
159    /// Get a mutable reference to the underlying store.
160    pub fn store_mut(&mut self) -> &mut S {
161        &mut self.store
162    }
163
164    /// Get the migration engine.
165    pub fn migration_engine(&self) -> &MigrationEngine {
166        &self.migration_engine
167    }
168
169    /// Save a serializable value in the default namespace.
170    ///
171    /// The value is serialized with postcard and wrapped in a version envelope.
172    pub fn save<T: Serialize + CrdtVersioned>(
173        &mut self,
174        key: &str,
175        value: &T,
176    ) -> Result<(), DbError<S::Error>> {
177        let ns = self.config.default_namespace.clone();
178        self.save_ns(&ns, key, value)
179    }
180
181    /// Save a serializable value in a specific namespace.
182    pub fn save_ns<T: Serialize + CrdtVersioned>(
183        &mut self,
184        namespace: &str,
185        key: &str,
186        value: &T,
187    ) -> Result<(), DbError<S::Error>> {
188        let payload =
189            postcard::to_allocvec(value).map_err(|e| DbError::Serialize(e.to_string()))?;
190
191        let envelope =
192            VersionedEnvelope::new(T::SCHEMA_VERSION, crdt_migrate::CrdtType::Custom, payload);
193
194        self.store
195            .put(namespace, key, &envelope.to_bytes())
196            .map_err(DbError::Store)
197    }
198
199    /// Load a deserializable value from the default namespace.
200    ///
201    /// If the stored data has an older version, migrations are applied
202    /// automatically before deserialization.
203    pub fn load<T: DeserializeOwned + CrdtVersioned>(
204        &mut self,
205        key: &str,
206    ) -> Result<Option<T>, DbError<S::Error>> {
207        let ns = self.config.default_namespace.clone();
208        self.load_ns(&ns, key)
209    }
210
211    /// Load a deserializable value from a specific namespace.
212    pub fn load_ns<T: DeserializeOwned + CrdtVersioned>(
213        &mut self,
214        namespace: &str,
215        key: &str,
216    ) -> Result<Option<T>, DbError<S::Error>> {
217        let raw = self.store.get(namespace, key).map_err(DbError::Store)?;
218
219        let raw = match raw {
220            Some(data) => data,
221            None => return Ok(None),
222        };
223
224        let payload = if VersionedEnvelope::is_versioned(&raw) {
225            let envelope = VersionedEnvelope::from_bytes(&raw)
226                .map_err(|e| DbError::Envelope(e.to_string()))?;
227
228            let stored_version = envelope.version as u32;
229            let current_version = T::SCHEMA_VERSION as u32;
230
231            if stored_version != current_version
232                && self.migration_engine.needs_migration(stored_version)
233            {
234                let migrated = self
235                    .migration_engine
236                    .migrate_to_current(&envelope.payload, stored_version)
237                    .map_err(|e| DbError::Migration(e.to_string()))?;
238
239                // Write back migrated data if configured
240                if self.config.migration.write_back_on_read {
241                    let new_envelope = VersionedEnvelope::new(
242                        T::SCHEMA_VERSION,
243                        envelope.crdt_type,
244                        migrated.clone(),
245                    );
246                    self.store
247                        .put(namespace, key, &new_envelope.to_bytes())
248                        .map_err(DbError::Store)?;
249                }
250
251                migrated
252            } else {
253                envelope.payload
254            }
255        } else {
256            // Raw data without envelope — treat as current version
257            raw
258        };
259
260        let value: T =
261            postcard::from_bytes(&payload).map_err(|e| DbError::Deserialize(e.to_string()))?;
262        Ok(Some(value))
263    }
264
265    /// Delete a value from the default namespace.
266    pub fn delete(&mut self, key: &str) -> Result<(), DbError<S::Error>> {
267        let ns = self.config.default_namespace.clone();
268        self.delete_ns(&ns, key)
269    }
270
271    /// Delete a value from a specific namespace.
272    pub fn delete_ns(&mut self, namespace: &str, key: &str) -> Result<(), DbError<S::Error>> {
273        self.store.delete(namespace, key).map_err(DbError::Store)
274    }
275
276    /// List all keys in the default namespace.
277    pub fn list_keys(&self) -> Result<Vec<String>, DbError<S::Error>> {
278        self.store
279            .list_keys(&self.config.default_namespace)
280            .map_err(DbError::Store)
281    }
282
283    /// List all keys in a specific namespace.
284    pub fn list_keys_ns(&self, namespace: &str) -> Result<Vec<String>, DbError<S::Error>> {
285        self.store.list_keys(namespace).map_err(DbError::Store)
286    }
287
288    /// Check if a key exists in the default namespace.
289    pub fn exists(&self, key: &str) -> Result<bool, DbError<S::Error>> {
290        self.store
291            .exists(&self.config.default_namespace, key)
292            .map_err(DbError::Store)
293    }
294
295    /// Check if a key exists in a specific namespace.
296    pub fn exists_ns(&self, namespace: &str, key: &str) -> Result<bool, DbError<S::Error>> {
297        self.store.exists(namespace, key).map_err(DbError::Store)
298    }
299}
300
301/// Event sourcing methods — available when the backend supports event logs.
302impl<S: EventStore> CrdtDb<S> {
303    /// Append a serializable event to the log.
304    ///
305    /// Returns the assigned sequence number.
306    pub fn append_event<T: Serialize>(
307        &mut self,
308        namespace: &str,
309        entity_id: &str,
310        event: &T,
311        timestamp: u64,
312        node_id: &str,
313    ) -> Result<u64, DbError<S::Error>> {
314        let data = postcard::to_allocvec(event).map_err(|e| DbError::Serialize(e.to_string()))?;
315
316        self.store
317            .append_event(namespace, entity_id, &data, timestamp, node_id)
318            .map_err(DbError::Store)
319    }
320
321    /// Read events since a given sequence number.
322    pub fn events_since(
323        &self,
324        namespace: &str,
325        entity_id: &str,
326        since_sequence: u64,
327    ) -> Result<Vec<StoredEvent>, DbError<S::Error>> {
328        self.store
329            .events_since(namespace, entity_id, since_sequence)
330            .map_err(DbError::Store)
331    }
332
333    /// Get the event count for an entity.
334    pub fn event_count(&self, namespace: &str, entity_id: &str) -> Result<u64, DbError<S::Error>> {
335        self.store
336            .event_count(namespace, entity_id)
337            .map_err(DbError::Store)
338    }
339
340    /// Save a snapshot for an entity.
341    pub fn save_snapshot(
342        &mut self,
343        namespace: &str,
344        entity_id: &str,
345        state: &[u8],
346        at_sequence: u64,
347        version: u8,
348    ) -> Result<(), DbError<S::Error>> {
349        self.store
350            .save_snapshot(namespace, entity_id, state, at_sequence, version)
351            .map_err(DbError::Store)
352    }
353
354    /// Load the latest snapshot for an entity.
355    pub fn load_snapshot(
356        &self,
357        namespace: &str,
358        entity_id: &str,
359    ) -> Result<Option<Snapshot>, DbError<S::Error>> {
360        self.store
361            .load_snapshot(namespace, entity_id)
362            .map_err(DbError::Store)
363    }
364
365    /// Compact an entity: save snapshot + truncate old events.
366    ///
367    /// `state` is the current serialized state. Events before the latest
368    /// sequence number will be removed.
369    pub fn compact(
370        &mut self,
371        namespace: &str,
372        entity_id: &str,
373        state: &[u8],
374        version: u8,
375    ) -> Result<u64, DbError<S::Error>> {
376        // Get current max sequence
377        let events = self
378            .store
379            .events_since(namespace, entity_id, 0)
380            .map_err(DbError::Store)?;
381
382        let max_seq = events.last().map(|e| e.sequence).unwrap_or(0);
383
384        if max_seq == 0 {
385            return Ok(0);
386        }
387
388        // Save snapshot at current sequence
389        self.store
390            .save_snapshot(namespace, entity_id, state, max_seq, version)
391            .map_err(DbError::Store)?;
392
393        // Truncate old events (keep the latest one as boundary)
394        self.store
395            .truncate_events_before(namespace, entity_id, max_seq)
396            .map_err(DbError::Store)
397    }
398}
399
/// Associates a schema version with a type.
///
/// Implement this on the types you store through `CrdtDb`. The version is
/// written into the envelope on save and compared against the stored version
/// on load to decide whether a migration is needed.
///
/// # Example
///
/// ```
/// use crdt_store::CrdtVersioned;
/// use serde::{Serialize, Deserialize};
///
/// #[derive(Serialize, Deserialize)]
/// struct SensorReading {
///     temperature: f32,
/// }
///
/// impl CrdtVersioned for SensorReading {
///     const SCHEMA_VERSION: u8 = 1;
/// }
/// ```
pub trait CrdtVersioned {
    /// Schema version that values of this type are written with.
    const SCHEMA_VERSION: u8;
}
424
425// Blanket impl for crdt-kit types that implement Versioned
426impl<T: crdt_kit::Versioned> CrdtVersioned for T {
427    const SCHEMA_VERSION: u8 = T::CURRENT_VERSION;
428}
429
430/// Helper to deserialize an event payload.
431pub fn deserialize_event<T: DeserializeOwned>(event: &StoredEvent) -> Result<T, String> {
432    postcard::from_bytes(&event.data).map_err(|e| e.to_string())
433}
434
#[cfg(test)]
mod tests {
    use super::*;
    use crate::MemoryStore;
    use serde::{Deserialize, Serialize};

    /// Version 1 of the sensor schema.
    #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
    struct SensorV1 {
        temperature: f32,
    }

    impl CrdtVersioned for SensorV1 {
        const SCHEMA_VERSION: u8 = 1;
    }

    /// Version 2 of the sensor schema: adds an optional humidity reading.
    #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
    struct SensorV2 {
        temperature: f32,
        humidity: Option<f32>,
    }

    impl CrdtVersioned for SensorV2 {
        const SCHEMA_VERSION: u8 = 2;
    }

    /// Database over an empty in-memory store with default configuration.
    fn fresh_db() -> CrdtDb<MemoryStore> {
        CrdtDb::with_store(MemoryStore::new())
    }

    /// Shorthand for a v1 reading.
    fn reading(temperature: f32) -> SensorV1 {
        SensorV1 { temperature }
    }

    #[test]
    fn save_and_load_basic() {
        let mut db = fresh_db();
        let original = reading(22.5);

        db.save("s1", &original).unwrap();

        let roundtrip: Option<SensorV1> = db.load("s1").unwrap();
        assert_eq!(roundtrip, Some(original));
    }

    #[test]
    fn load_nonexistent_returns_none() {
        let mut db = fresh_db();

        let missing: Option<SensorV1> = db.load("nope").unwrap();
        assert_eq!(missing, None);
    }

    #[test]
    fn save_overwrites() {
        let mut db = fresh_db();

        for temperature in [20.0, 25.0].iter().copied() {
            db.save("s1", &reading(temperature)).unwrap();
        }

        let latest: Option<SensorV1> = db.load("s1").unwrap();
        assert_eq!(latest.unwrap().temperature, 25.0);
    }

    #[test]
    fn namespace_isolation() {
        let mut db = fresh_db();

        db.save_ns("indoor", "s1", &reading(10.0)).unwrap();
        db.save_ns("outdoor", "s1", &reading(20.0)).unwrap();

        let inside: SensorV1 = db.load_ns("indoor", "s1").unwrap().unwrap();
        let outside: SensorV1 = db.load_ns("outdoor", "s1").unwrap().unwrap();

        assert_eq!(inside.temperature, 10.0);
        assert_eq!(outside.temperature, 20.0);
    }

    #[test]
    fn delete_removes_value() {
        let mut db = fresh_db();

        db.save("s1", &reading(22.5)).unwrap();
        assert!(db.exists("s1").unwrap());

        db.delete("s1").unwrap();
        assert!(!db.exists("s1").unwrap());

        let after_delete: Option<SensorV1> = db.load("s1").unwrap();
        assert_eq!(after_delete, None);
    }

    #[test]
    fn list_keys_works() {
        let mut db = fresh_db();

        db.save("b", &reading(1.0)).unwrap();
        db.save("a", &reading(2.0)).unwrap();

        let listed = db.list_keys().unwrap();
        assert_eq!(listed.len(), 2);
    }

    #[test]
    fn migration_on_load() {
        use alloc::boxed::Box;
        use crdt_migrate::MigrationError;

        // Migration: SensorV1 -> SensorV2 (add humidity=None)
        struct SensorMigration;

        impl MigrationStep for SensorMigration {
            fn source_version(&self) -> u32 {
                1
            }

            fn target_version(&self) -> u32 {
                2
            }

            fn migrate(&self, data: &[u8]) -> Result<Vec<u8>, MigrationError> {
                let old: SensorV1 = postcard::from_bytes(data)
                    .map_err(|e| MigrationError::Deserialization(e.to_string()))?;
                let upgraded = SensorV2 {
                    temperature: old.temperature,
                    humidity: None,
                };
                postcard::to_allocvec(&upgraded)
                    .map_err(|e| MigrationError::Serialization(e.to_string()))
            }
        }

        let mut db = CrdtDb::builder(MemoryStore::new(), 2)
            .register_migration(Box::new(SensorMigration))
            .build();

        // Save as v1 by writing raw envelope bytes
        let legacy_payload = postcard::to_allocvec(&reading(22.5)).unwrap();
        let legacy_envelope =
            VersionedEnvelope::new(1, crdt_migrate::CrdtType::Custom, legacy_payload);
        db.store_mut()
            .put("default", "s1", &legacy_envelope.to_bytes())
            .unwrap();

        // Load as v2 — should migrate automatically
        let upgraded: Option<SensorV2> = db.load("s1").unwrap();
        let upgraded = upgraded.unwrap();
        assert_eq!(upgraded.temperature, 22.5);
        assert_eq!(upgraded.humidity, None);

        // Verify write-back: reading raw bytes again should show v2 envelope
        let stored_bytes = db.store().get("default", "s1").unwrap().unwrap();
        let stored_envelope = VersionedEnvelope::from_bytes(&stored_bytes).unwrap();
        assert_eq!(stored_envelope.version, 2);
    }

    #[test]
    fn event_sourcing_roundtrip() {
        #[derive(Debug, Serialize, Deserialize, PartialEq)]
        enum Op {
            SetTemp(f32),
            SetHumidity(f32),
        }

        let mut db = fresh_db();

        db.append_event("sensors", "s1", &Op::SetTemp(22.5), 1000, "node-1")
            .unwrap();
        db.append_event("sensors", "s1", &Op::SetHumidity(55.0), 1001, "node-1")
            .unwrap();

        let log = db.events_since("sensors", "s1", 0).unwrap();
        assert_eq!(log.len(), 2);

        let first: Op = deserialize_event(&log[0]).unwrap();
        let second: Op = deserialize_event(&log[1]).unwrap();

        assert_eq!(first, Op::SetTemp(22.5));
        assert_eq!(second, Op::SetHumidity(55.0));
    }

    #[test]
    fn compact_saves_snapshot_and_truncates() {
        let mut db = fresh_db();

        // Three events with timestamps 100, 101 and 102.
        for (op, timestamp) in ["op1", "op2", "op3"].iter().zip(100u64..) {
            db.append_event("ns", "e1", op, timestamp, "n").unwrap();
        }

        assert_eq!(db.event_count("ns", "e1").unwrap(), 3);

        let removed = db.compact("ns", "e1", b"snapshot-state", 1).unwrap();
        assert_eq!(removed, 2); // first 2 events removed

        let snapshot = db.load_snapshot("ns", "e1").unwrap().unwrap();
        assert_eq!(snapshot.state, b"snapshot-state");

        assert_eq!(db.event_count("ns", "e1").unwrap(), 1);
    }

    #[test]
    fn versioned_envelope_is_stored() {
        let mut db = fresh_db();

        db.save("s1", &reading(22.5)).unwrap();

        let stored_bytes = db.store().get("default", "s1").unwrap().unwrap();
        assert!(VersionedEnvelope::is_versioned(&stored_bytes));

        let stored_envelope = VersionedEnvelope::from_bytes(&stored_bytes).unwrap();
        assert_eq!(stored_envelope.version, 1);
    }

    #[test]
    fn builder_with_config() {
        let settings = MigrationConfig {
            write_back_on_read: false,
            eager_migration: false,
        };
        let mut db = CrdtDb::builder(MemoryStore::new(), 1)
            .default_namespace("sensors")
            .migration_config(settings)
            .build();

        db.save("s1", &reading(22.5)).unwrap();

        let stored_bytes = db.store().get("sensors", "s1").unwrap().unwrap();
        assert!(VersionedEnvelope::is_versioned(&stored_bytes));
    }
}