1use alloc::string::{String, ToString};
28use alloc::vec::Vec;
29use core::fmt;
30
31use crdt_migrate::{MigrationConfig, MigrationEngine, MigrationStep, VersionedEnvelope};
32use serde::{de::DeserializeOwned, Serialize};
33
34use crate::traits::{EventStore, Snapshot, StateStore, StoredEvent};
35
36const DEFAULT_NAMESPACE: &str = "default";
38
39#[derive(Debug)]
41pub enum DbError<E: fmt::Debug + fmt::Display> {
42 Store(E),
44 Serialize(String),
46 Deserialize(String),
48 Envelope(String),
50 Migration(String),
52}
53
54impl<E: fmt::Debug + fmt::Display> fmt::Display for DbError<E> {
55 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
56 match self {
57 Self::Store(e) => write!(f, "store error: {e}"),
58 Self::Serialize(msg) => write!(f, "serialization error: {msg}"),
59 Self::Deserialize(msg) => write!(f, "deserialization error: {msg}"),
60 Self::Envelope(msg) => write!(f, "envelope error: {msg}"),
61 Self::Migration(msg) => write!(f, "migration error: {msg}"),
62 }
63 }
64}
65
66pub struct CrdtDb<S: StateStore> {
72 store: S,
73 migration_engine: MigrationEngine,
74 config: CrdtDbConfig,
75}
76
77#[derive(Debug, Clone)]
79pub struct CrdtDbConfig {
80 pub migration: MigrationConfig,
82 pub default_namespace: String,
84}
85
86impl Default for CrdtDbConfig {
87 fn default() -> Self {
88 Self {
89 migration: MigrationConfig::default(),
90 default_namespace: DEFAULT_NAMESPACE.to_string(),
91 }
92 }
93}
94
95pub struct CrdtDbBuilder<S: StateStore> {
97 store: S,
98 migration_engine: MigrationEngine,
99 config: CrdtDbConfig,
100}
101
102impl<S: StateStore> CrdtDbBuilder<S> {
103 pub fn migration_config(mut self, config: MigrationConfig) -> Self {
105 self.config.migration = config;
106 self
107 }
108
109 pub fn default_namespace(mut self, ns: &str) -> Self {
111 self.config.default_namespace = ns.to_string();
112 self
113 }
114
115 pub fn register_migration(mut self, step: alloc::boxed::Box<dyn MigrationStep>) -> Self {
117 self.migration_engine.register(step);
118 self
119 }
120
121 pub fn build(self) -> CrdtDb<S> {
123 CrdtDb {
124 store: self.store,
125 migration_engine: self.migration_engine,
126 config: self.config,
127 }
128 }
129}
130
131impl<S: StateStore> CrdtDb<S> {
132 pub fn with_store(store: S) -> Self {
136 Self {
137 store,
138 migration_engine: MigrationEngine::new(1),
139 config: CrdtDbConfig::default(),
140 }
141 }
142
143 pub fn builder(store: S, current_version: u32) -> CrdtDbBuilder<S> {
147 CrdtDbBuilder {
148 store,
149 migration_engine: MigrationEngine::new(current_version),
150 config: CrdtDbConfig::default(),
151 }
152 }
153
154 pub fn store(&self) -> &S {
156 &self.store
157 }
158
159 pub fn store_mut(&mut self) -> &mut S {
161 &mut self.store
162 }
163
164 pub fn migration_engine(&self) -> &MigrationEngine {
166 &self.migration_engine
167 }
168
169 pub fn save<T: Serialize + CrdtVersioned>(
173 &mut self,
174 key: &str,
175 value: &T,
176 ) -> Result<(), DbError<S::Error>> {
177 let ns = self.config.default_namespace.clone();
178 self.save_ns(&ns, key, value)
179 }
180
181 pub fn save_ns<T: Serialize + CrdtVersioned>(
183 &mut self,
184 namespace: &str,
185 key: &str,
186 value: &T,
187 ) -> Result<(), DbError<S::Error>> {
188 let payload =
189 postcard::to_allocvec(value).map_err(|e| DbError::Serialize(e.to_string()))?;
190
191 let envelope =
192 VersionedEnvelope::new(T::SCHEMA_VERSION, crdt_migrate::CrdtType::Custom, payload);
193
194 self.store
195 .put(namespace, key, &envelope.to_bytes())
196 .map_err(DbError::Store)
197 }
198
199 pub fn load<T: DeserializeOwned + CrdtVersioned>(
204 &mut self,
205 key: &str,
206 ) -> Result<Option<T>, DbError<S::Error>> {
207 let ns = self.config.default_namespace.clone();
208 self.load_ns(&ns, key)
209 }
210
211 pub fn load_ns<T: DeserializeOwned + CrdtVersioned>(
213 &mut self,
214 namespace: &str,
215 key: &str,
216 ) -> Result<Option<T>, DbError<S::Error>> {
217 let raw = self.store.get(namespace, key).map_err(DbError::Store)?;
218
219 let raw = match raw {
220 Some(data) => data,
221 None => return Ok(None),
222 };
223
224 let payload = if VersionedEnvelope::is_versioned(&raw) {
225 let envelope = VersionedEnvelope::from_bytes(&raw)
226 .map_err(|e| DbError::Envelope(e.to_string()))?;
227
228 let stored_version = envelope.version as u32;
229 let current_version = T::SCHEMA_VERSION as u32;
230
231 if stored_version != current_version
232 && self.migration_engine.needs_migration(stored_version)
233 {
234 let migrated = self
235 .migration_engine
236 .migrate_to_current(&envelope.payload, stored_version)
237 .map_err(|e| DbError::Migration(e.to_string()))?;
238
239 if self.config.migration.write_back_on_read {
241 let new_envelope = VersionedEnvelope::new(
242 T::SCHEMA_VERSION,
243 envelope.crdt_type,
244 migrated.clone(),
245 );
246 self.store
247 .put(namespace, key, &new_envelope.to_bytes())
248 .map_err(DbError::Store)?;
249 }
250
251 migrated
252 } else {
253 envelope.payload
254 }
255 } else {
256 raw
258 };
259
260 let value: T =
261 postcard::from_bytes(&payload).map_err(|e| DbError::Deserialize(e.to_string()))?;
262 Ok(Some(value))
263 }
264
265 pub fn delete(&mut self, key: &str) -> Result<(), DbError<S::Error>> {
267 let ns = self.config.default_namespace.clone();
268 self.delete_ns(&ns, key)
269 }
270
271 pub fn delete_ns(&mut self, namespace: &str, key: &str) -> Result<(), DbError<S::Error>> {
273 self.store.delete(namespace, key).map_err(DbError::Store)
274 }
275
276 pub fn list_keys(&self) -> Result<Vec<String>, DbError<S::Error>> {
278 self.store
279 .list_keys(&self.config.default_namespace)
280 .map_err(DbError::Store)
281 }
282
283 pub fn list_keys_ns(&self, namespace: &str) -> Result<Vec<String>, DbError<S::Error>> {
285 self.store.list_keys(namespace).map_err(DbError::Store)
286 }
287
288 pub fn exists(&self, key: &str) -> Result<bool, DbError<S::Error>> {
290 self.store
291 .exists(&self.config.default_namespace, key)
292 .map_err(DbError::Store)
293 }
294
295 pub fn exists_ns(&self, namespace: &str, key: &str) -> Result<bool, DbError<S::Error>> {
297 self.store.exists(namespace, key).map_err(DbError::Store)
298 }
299}
300
301impl<S: EventStore> CrdtDb<S> {
303 pub fn append_event<T: Serialize>(
307 &mut self,
308 namespace: &str,
309 entity_id: &str,
310 event: &T,
311 timestamp: u64,
312 node_id: &str,
313 ) -> Result<u64, DbError<S::Error>> {
314 let data = postcard::to_allocvec(event).map_err(|e| DbError::Serialize(e.to_string()))?;
315
316 self.store
317 .append_event(namespace, entity_id, &data, timestamp, node_id)
318 .map_err(DbError::Store)
319 }
320
321 pub fn events_since(
323 &self,
324 namespace: &str,
325 entity_id: &str,
326 since_sequence: u64,
327 ) -> Result<Vec<StoredEvent>, DbError<S::Error>> {
328 self.store
329 .events_since(namespace, entity_id, since_sequence)
330 .map_err(DbError::Store)
331 }
332
333 pub fn event_count(&self, namespace: &str, entity_id: &str) -> Result<u64, DbError<S::Error>> {
335 self.store
336 .event_count(namespace, entity_id)
337 .map_err(DbError::Store)
338 }
339
340 pub fn save_snapshot(
342 &mut self,
343 namespace: &str,
344 entity_id: &str,
345 state: &[u8],
346 at_sequence: u64,
347 version: u8,
348 ) -> Result<(), DbError<S::Error>> {
349 self.store
350 .save_snapshot(namespace, entity_id, state, at_sequence, version)
351 .map_err(DbError::Store)
352 }
353
354 pub fn load_snapshot(
356 &self,
357 namespace: &str,
358 entity_id: &str,
359 ) -> Result<Option<Snapshot>, DbError<S::Error>> {
360 self.store
361 .load_snapshot(namespace, entity_id)
362 .map_err(DbError::Store)
363 }
364
365 pub fn compact(
370 &mut self,
371 namespace: &str,
372 entity_id: &str,
373 state: &[u8],
374 version: u8,
375 ) -> Result<u64, DbError<S::Error>> {
376 let events = self
378 .store
379 .events_since(namespace, entity_id, 0)
380 .map_err(DbError::Store)?;
381
382 let max_seq = events.last().map(|e| e.sequence).unwrap_or(0);
383
384 if max_seq == 0 {
385 return Ok(0);
386 }
387
388 self.store
390 .save_snapshot(namespace, entity_id, state, max_seq, version)
391 .map_err(DbError::Store)?;
392
393 self.store
395 .truncate_events_before(namespace, entity_id, max_seq)
396 .map_err(DbError::Store)
397 }
398}
399
400pub trait CrdtVersioned {
421 const SCHEMA_VERSION: u8;
423}
424
425impl<T: crdt_kit::Versioned> CrdtVersioned for T {
427 const SCHEMA_VERSION: u8 = T::CURRENT_VERSION;
428}
429
430pub fn deserialize_event<T: DeserializeOwned>(event: &StoredEvent) -> Result<T, String> {
432 postcard::from_bytes(&event.data).map_err(|e| e.to_string())
433}
434
435#[cfg(test)]
436mod tests {
437 use super::*;
438 use crate::MemoryStore;
439 use serde::{Deserialize, Serialize};
440
441 #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
442 struct SensorV1 {
443 temperature: f32,
444 }
445
446 impl CrdtVersioned for SensorV1 {
447 const SCHEMA_VERSION: u8 = 1;
448 }
449
450 #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
451 struct SensorV2 {
452 temperature: f32,
453 humidity: Option<f32>,
454 }
455
456 impl CrdtVersioned for SensorV2 {
457 const SCHEMA_VERSION: u8 = 2;
458 }
459
460 #[test]
461 fn save_and_load_basic() {
462 let mut db = CrdtDb::with_store(MemoryStore::new());
463
464 let sensor = SensorV1 { temperature: 22.5 };
465 db.save("s1", &sensor).unwrap();
466
467 let loaded: Option<SensorV1> = db.load("s1").unwrap();
468 assert_eq!(loaded, Some(sensor));
469 }
470
471 #[test]
472 fn load_nonexistent_returns_none() {
473 let mut db = CrdtDb::with_store(MemoryStore::new());
474
475 let loaded: Option<SensorV1> = db.load("nope").unwrap();
476 assert_eq!(loaded, None);
477 }
478
479 #[test]
480 fn save_overwrites() {
481 let mut db = CrdtDb::with_store(MemoryStore::new());
482
483 db.save("s1", &SensorV1 { temperature: 20.0 }).unwrap();
484 db.save("s1", &SensorV1 { temperature: 25.0 }).unwrap();
485
486 let loaded: Option<SensorV1> = db.load("s1").unwrap();
487 assert_eq!(loaded.unwrap().temperature, 25.0);
488 }
489
490 #[test]
491 fn namespace_isolation() {
492 let mut db = CrdtDb::with_store(MemoryStore::new());
493
494 let s1 = SensorV1 { temperature: 10.0 };
495 let s2 = SensorV1 { temperature: 20.0 };
496
497 db.save_ns("indoor", "s1", &s1).unwrap();
498 db.save_ns("outdoor", "s1", &s2).unwrap();
499
500 let indoor: SensorV1 = db.load_ns("indoor", "s1").unwrap().unwrap();
501 let outdoor: SensorV1 = db.load_ns("outdoor", "s1").unwrap().unwrap();
502
503 assert_eq!(indoor.temperature, 10.0);
504 assert_eq!(outdoor.temperature, 20.0);
505 }
506
507 #[test]
508 fn delete_removes_value() {
509 let mut db = CrdtDb::with_store(MemoryStore::new());
510
511 db.save("s1", &SensorV1 { temperature: 22.5 }).unwrap();
512 assert!(db.exists("s1").unwrap());
513
514 db.delete("s1").unwrap();
515 assert!(!db.exists("s1").unwrap());
516
517 let loaded: Option<SensorV1> = db.load("s1").unwrap();
518 assert_eq!(loaded, None);
519 }
520
521 #[test]
522 fn list_keys_works() {
523 let mut db = CrdtDb::with_store(MemoryStore::new());
524
525 db.save("b", &SensorV1 { temperature: 1.0 }).unwrap();
526 db.save("a", &SensorV1 { temperature: 2.0 }).unwrap();
527
528 let keys = db.list_keys().unwrap();
529 assert_eq!(keys.len(), 2);
530 }
531
532 #[test]
533 fn migration_on_load() {
534 use alloc::boxed::Box;
535 use crdt_migrate::MigrationError;
536
537 struct SensorMigration;
539
540 impl MigrationStep for SensorMigration {
541 fn source_version(&self) -> u32 {
542 1
543 }
544 fn target_version(&self) -> u32 {
545 2
546 }
547 fn migrate(&self, data: &[u8]) -> Result<Vec<u8>, MigrationError> {
548 let v1: SensorV1 = postcard::from_bytes(data)
549 .map_err(|e| MigrationError::Deserialization(e.to_string()))?;
550 let v2 = SensorV2 {
551 temperature: v1.temperature,
552 humidity: None,
553 };
554 postcard::to_allocvec(&v2).map_err(|e| MigrationError::Serialization(e.to_string()))
555 }
556 }
557
558 let mut db = CrdtDb::builder(MemoryStore::new(), 2)
559 .register_migration(Box::new(SensorMigration))
560 .build();
561
562 let v1 = SensorV1 { temperature: 22.5 };
564 let payload = postcard::to_allocvec(&v1).unwrap();
565 let envelope = VersionedEnvelope::new(1, crdt_migrate::CrdtType::Custom, payload);
566 db.store_mut()
567 .put("default", "s1", &envelope.to_bytes())
568 .unwrap();
569
570 let loaded: Option<SensorV2> = db.load("s1").unwrap();
572 let v2 = loaded.unwrap();
573 assert_eq!(v2.temperature, 22.5);
574 assert_eq!(v2.humidity, None);
575
576 let raw = db.store().get("default", "s1").unwrap().unwrap();
578 let env = VersionedEnvelope::from_bytes(&raw).unwrap();
579 assert_eq!(env.version, 2);
580 }
581
582 #[test]
583 fn event_sourcing_roundtrip() {
584 let mut db = CrdtDb::with_store(MemoryStore::new());
585
586 #[derive(Debug, Serialize, Deserialize, PartialEq)]
587 enum Op {
588 SetTemp(f32),
589 SetHumidity(f32),
590 }
591
592 db.append_event("sensors", "s1", &Op::SetTemp(22.5), 1000, "node-1")
593 .unwrap();
594 db.append_event("sensors", "s1", &Op::SetHumidity(55.0), 1001, "node-1")
595 .unwrap();
596
597 let events = db.events_since("sensors", "s1", 0).unwrap();
598 assert_eq!(events.len(), 2);
599
600 let op1: Op = deserialize_event(&events[0]).unwrap();
601 let op2: Op = deserialize_event(&events[1]).unwrap();
602
603 assert_eq!(op1, Op::SetTemp(22.5));
604 assert_eq!(op2, Op::SetHumidity(55.0));
605 }
606
607 #[test]
608 fn compact_saves_snapshot_and_truncates() {
609 let mut db = CrdtDb::with_store(MemoryStore::new());
610
611 db.append_event("ns", "e1", &"op1", 100, "n").unwrap();
612 db.append_event("ns", "e1", &"op2", 101, "n").unwrap();
613 db.append_event("ns", "e1", &"op3", 102, "n").unwrap();
614
615 assert_eq!(db.event_count("ns", "e1").unwrap(), 3);
616
617 let removed = db.compact("ns", "e1", b"snapshot-state", 1).unwrap();
618 assert_eq!(removed, 2); let snap = db.load_snapshot("ns", "e1").unwrap().unwrap();
621 assert_eq!(snap.state, b"snapshot-state");
622
623 assert_eq!(db.event_count("ns", "e1").unwrap(), 1);
624 }
625
626 #[test]
627 fn versioned_envelope_is_stored() {
628 let mut db = CrdtDb::with_store(MemoryStore::new());
629
630 let sensor = SensorV1 { temperature: 22.5 };
631 db.save("s1", &sensor).unwrap();
632
633 let raw = db.store().get("default", "s1").unwrap().unwrap();
634 assert!(VersionedEnvelope::is_versioned(&raw));
635
636 let envelope = VersionedEnvelope::from_bytes(&raw).unwrap();
637 assert_eq!(envelope.version, 1);
638 }
639
640 #[test]
641 fn builder_with_config() {
642 let mut db = CrdtDb::builder(MemoryStore::new(), 1)
643 .default_namespace("sensors")
644 .migration_config(MigrationConfig {
645 write_back_on_read: false,
646 eager_migration: false,
647 })
648 .build();
649
650 let sensor = SensorV1 { temperature: 22.5 };
651 db.save("s1", &sensor).unwrap();
652
653 let raw = db.store().get("sensors", "s1").unwrap().unwrap();
654 assert!(VersionedEnvelope::is_versioned(&raw));
655 }
656}