1use crate::storage::traits::{Collection, DocumentPredicate};
8use crate::storage::ttl_manager::TtlManager;
9use automerge::{transaction::Transactable, Automerge, ReadDoc};
10use lru::LruCache;
11use redb::{Builder, Database, ReadableTable, ReadableTableMetadata, TableDefinition};
12use std::hash::{Hash, Hasher};
13use std::num::NonZeroUsize;
14use std::path::Path;
15use std::sync::{Arc, RwLock};
16use tokio::sync::broadcast;
17
18use anyhow::{Context, Result};
19
20const DOCUMENTS_TABLE: TableDefinition<&[u8], &[u8]> = TableDefinition::new("documents");
24
25const DEFAULT_REDB_CACHE_SIZE: usize = 16 * 1024 * 1024; const TOMBSTONES_TABLE: TableDefinition<&[u8], &[u8]> = TableDefinition::new("tombstones");
38
39const DOC_LOCK_STRIPES: usize = 64;
55
/// Document store for Automerge docs, backed by redb when opened on disk or by
/// the LRU cache alone in memory-only mode, with broadcast change notification.
pub struct AutomergeStore {
    // `None` means memory-only mode: the cache below is the sole storage.
    db: Option<Arc<Database>>,
    // LRU cache of decoded documents, keyed by the full store key string.
    cache: Arc<RwLock<LruCache<String, Automerge>>>,
    // Receives keys written by notifying puts (see `put_inner`'s `notify` flag).
    change_tx: broadcast::Sender<String>,
    // Receives keys for every write, including `put_without_notify` writes.
    observer_tx: broadcast::Sender<String>,
    // Striped per-document mutexes; a key hashes to one stripe (see `lock_doc`).
    doc_locks: Box<[std::sync::Mutex<()>]>,
}
71
72impl AutomergeStore {
73 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
75 let path = path.as_ref();
77 if let Some(parent) = path.parent() {
78 std::fs::create_dir_all(parent).ok();
79 }
80
81 let db_path = if path.is_dir() || !path.exists() {
83 std::fs::create_dir_all(path).ok();
84 path.join("automerge.redb")
85 } else {
86 path.to_path_buf()
87 };
88
89 if db_path.exists() {
92 if let Ok(metadata) = std::fs::metadata(&db_path) {
93 if metadata.len() == 0 {
94 tracing::warn!("Removing corrupted 0-byte redb database at {:?}", db_path);
95 std::fs::remove_file(&db_path).ok();
96 }
97 }
98 }
99
100 let cache_size = std::env::var("CAP_REDB_CACHE_SIZE")
104 .ok()
105 .and_then(|s| s.parse::<usize>().ok())
106 .unwrap_or(DEFAULT_REDB_CACHE_SIZE);
107
108 tracing::debug!("Opening redb database with cache_size={} bytes", cache_size);
109
110 let db = Builder::new()
111 .set_cache_size(cache_size)
112 .create(&db_path)
113 .context("Failed to open redb database")?;
114
115 {
117 let write_txn = db
118 .begin_write()
119 .context("Failed to begin write transaction")?;
120 let _ = write_txn.open_table(DOCUMENTS_TABLE);
122 let _ = write_txn.open_table(TOMBSTONES_TABLE); write_txn
124 .commit()
125 .context("Failed to commit table creation")?;
126 }
127
128 let cache = LruCache::new(NonZeroUsize::new(1000).unwrap());
129
130 let (change_tx, _) = broadcast::channel(8192);
135
136 let (observer_tx, _) = broadcast::channel(8192);
140
141 Ok(Self {
142 db: Some(Arc::new(db)),
143 cache: Arc::new(RwLock::new(cache)),
144 change_tx,
145 observer_tx,
146 doc_locks: (0..DOC_LOCK_STRIPES)
147 .map(|_| std::sync::Mutex::new(()))
148 .collect::<Vec<_>>()
149 .into_boxed_slice(),
150 })
151 }
152
153 pub fn in_memory() -> Self {
161 let cache = LruCache::new(NonZeroUsize::new(10000).unwrap());
163 let (change_tx, _) = broadcast::channel(8192);
164 let (observer_tx, _) = broadcast::channel(8192);
165
166 tracing::info!("AutomergeStore: Running in MEMORY-ONLY mode (no disk persistence)");
167
168 Self {
169 db: None,
170 cache: Arc::new(RwLock::new(cache)),
171 change_tx,
172 observer_tx,
173 doc_locks: (0..DOC_LOCK_STRIPES)
174 .map(|_| std::sync::Mutex::new(()))
175 .collect::<Vec<_>>()
176 .into_boxed_slice(),
177 }
178 }
179
180 pub fn lock_doc(&self, key: &str) -> std::sync::MutexGuard<'_, ()> {
189 let mut hasher = std::collections::hash_map::DefaultHasher::new();
190 key.hash(&mut hasher);
191 let idx = (hasher.finish() as usize) % DOC_LOCK_STRIPES;
192 self.doc_locks[idx]
193 .lock()
194 .unwrap_or_else(|e| e.into_inner())
195 }
196
    /// True when the store was created via `in_memory` (no backing database).
    pub fn is_in_memory(&self) -> bool {
        self.db.is_none()
    }
201
    /// Persist `doc` under `key`, refresh the cache, and notify both the
    /// observer channel and the change channel.
    pub fn put(&self, key: &str, doc: &Automerge) -> Result<()> {
        self.put_inner(key, doc, true)
    }
211
    /// Like `put`, but suppresses the change-channel notification; the
    /// observer channel still fires. Used by `compact` so compaction does not
    /// look like a content change to subscribers.
    pub fn put_without_notify(&self, key: &str, doc: &Automerge) -> Result<()> {
        self.put_inner(key, doc, false)
    }
220
221 pub fn put_with_ttl(&self, key: &str, doc: &Automerge, ttl_manager: &TtlManager) -> Result<()> {
227 self.put(key, doc)?;
228
229 let collection = key.find(['/', ':']).map(|pos| &key[..pos]).unwrap_or(key);
231
232 if let Some(ttl) = ttl_manager.config().get_collection_ttl(collection) {
233 ttl_manager.set_ttl(key, ttl)?;
234 }
235
236 Ok(())
237 }
238
239 fn put_inner(&self, key: &str, doc: &Automerge, notify: bool) -> Result<()> {
241 if let Some(ref db) = self.db {
243 let bytes = doc.save();
244
245 let write_txn = db
246 .begin_write()
247 .context("Failed to begin write transaction")?;
248 {
249 let mut table = write_txn
250 .open_table(DOCUMENTS_TABLE)
251 .context("Failed to open documents table")?;
252 table
253 .insert(key.as_bytes(), bytes.as_slice())
254 .context("Failed to insert document")?;
255 }
256 write_txn.commit().context("Failed to commit write")?;
257 }
258
259 self.cache
260 .write()
261 .unwrap()
262 .put(key.to_string(), doc.clone());
263
264 let _ = self.observer_tx.send(key.to_string());
267
268 if notify {
271 let _ = self.change_tx.send(key.to_string());
273 }
274
275 Ok(())
276 }
277
278 pub fn get(&self, key: &str) -> Result<Option<Automerge>> {
280 {
282 let mut cache = self.cache.write().unwrap_or_else(|e| e.into_inner());
283 if let Some(doc) = cache.get(key) {
284 return Ok(Some(doc.clone()));
285 }
286 }
287
288 let Some(ref db) = self.db else {
290 return Ok(None);
291 };
292
293 let read_txn = db
294 .begin_read()
295 .context("Failed to begin read transaction")?;
296 let table = read_txn
297 .open_table(DOCUMENTS_TABLE)
298 .context("Failed to open documents table")?;
299
300 match table.get(key.as_bytes())? {
301 Some(value) => {
302 let bytes = value.value();
303 let doc = Automerge::load(bytes).context("Failed to load Automerge document")?;
304
305 self.cache
306 .write()
307 .unwrap()
308 .put(key.to_string(), doc.clone());
309
310 Ok(Some(doc))
311 }
312 None => Ok(None),
313 }
314 }
315
316 pub fn delete(&self, key: &str) -> Result<()> {
318 if let Some(ref db) = self.db {
320 let write_txn = db
321 .begin_write()
322 .context("Failed to begin write transaction")?;
323 {
324 let mut table = write_txn
325 .open_table(DOCUMENTS_TABLE)
326 .context("Failed to open documents table")?;
327 table.remove(key.as_bytes())?;
328 }
329 write_txn.commit().context("Failed to commit delete")?;
330 }
331
332 self.cache
333 .write()
334 .unwrap_or_else(|e| e.into_inner())
335 .pop(key);
336 Ok(())
337 }
338
    /// Return every `(key, document)` pair whose key starts with `prefix`.
    ///
    /// In memory-only mode this scans the LRU cache; otherwise it performs a
    /// key-ordered redb range scan starting at `prefix` and stops at the first
    /// key that no longer matches.
    pub fn scan_prefix(&self, prefix: &str) -> Result<Vec<(String, Automerge)>> {
        if self.db.is_none() {
            // Memory-only mode: the cache is the entire data set. Note that
            // only entries still resident in the LRU can be returned.
            let cache = self.cache.read().unwrap_or_else(|e| e.into_inner());
            let results: Vec<(String, Automerge)> = cache
                .iter()
                .filter(|(k, _)| k.starts_with(prefix))
                .map(|(k, v)| (k.clone(), v.clone()))
                .collect();
            return Ok(results);
        }

        let mut results = Vec::new();

        let read_txn = self
            .db
            .as_ref()
            .unwrap()
            .begin_read()
            .context("Failed to begin read transaction")?;
        let table = read_txn
            .open_table(DOCUMENTS_TABLE)
            .context("Failed to open documents table")?;

        let prefix_bytes = prefix.as_bytes();
        // Keys are stored in byte order, so all matches are contiguous from
        // `prefix` onward; break at the first key outside the prefix.
        for entry in table.range(prefix_bytes..)? {
            let (key, value) = entry?;
            let key_bytes = key.value();

            if !key_bytes.starts_with(prefix_bytes) {
                break;
            }

            let key_str = String::from_utf8_lossy(key_bytes).to_string();
            let doc = Automerge::load(value.value())?;
            results.push((key_str, doc));
        }

        Ok(results)
    }
382
383 pub fn count(&self) -> usize {
385 let Some(ref db) = self.db else {
387 return self.cache.read().unwrap_or_else(|e| e.into_inner()).len();
388 };
389
390 let read_txn = match db.begin_read() {
391 Ok(txn) => txn,
392 Err(_) => return 0,
393 };
394 let table = match read_txn.open_table(DOCUMENTS_TABLE) {
395 Ok(t) => t,
396 Err(_) => return 0,
397 };
398
399 table.len().unwrap_or(0) as usize
400 }
401
    /// Subscribe to keys written by notifying puts (`put`, `put_with_ttl`).
    /// Writes via `put_without_notify` (e.g. compaction) are not delivered here.
    pub fn subscribe_to_changes(&self) -> broadcast::Receiver<String> {
        self.change_tx.subscribe()
    }
419
    /// Subscribe to every write, including `put_without_notify` writes.
    pub fn subscribe_to_observer_changes(&self) -> broadcast::Receiver<String> {
        self.observer_tx.subscribe()
    }
432
433 pub fn collection(self: &Arc<Self>, name: &str) -> Arc<dyn Collection> {
435 Arc::new(AutomergeCollection {
436 store: Arc::clone(self),
437 prefix: format!("{}:", name),
438 })
439 }
440
441 pub fn put_tombstone(&self, tombstone: &crate::qos::Tombstone) -> Result<()> {
448 let Some(ref db) = self.db else {
449 return Ok(()); };
451
452 let key = format!("{}:{}", tombstone.collection, tombstone.document_id);
453 let bytes = serde_json::to_vec(tombstone).context("Failed to serialize tombstone")?;
454
455 let write_txn = db
456 .begin_write()
457 .context("Failed to begin write transaction")?;
458 {
459 let mut table = write_txn
460 .open_table(TOMBSTONES_TABLE)
461 .context("Failed to open tombstones table")?;
462 table
463 .insert(key.as_bytes(), bytes.as_slice())
464 .context("Failed to insert tombstone")?;
465 }
466 write_txn
467 .commit()
468 .context("Failed to commit tombstone write")?;
469
470 tracing::debug!(
471 "Stored tombstone for document {} in collection {}",
472 tombstone.document_id,
473 tombstone.collection
474 );
475
476 Ok(())
477 }
478
479 pub fn get_tombstone(
481 &self,
482 collection: &str,
483 document_id: &str,
484 ) -> Result<Option<crate::qos::Tombstone>> {
485 let Some(ref db) = self.db else {
486 return Ok(None); };
488
489 let key = format!("{}:{}", collection, document_id);
490
491 let read_txn = db
492 .begin_read()
493 .context("Failed to begin read transaction")?;
494 let table = read_txn
495 .open_table(TOMBSTONES_TABLE)
496 .context("Failed to open tombstones table")?;
497
498 match table.get(key.as_bytes())? {
499 Some(value) => {
500 let bytes = value.value();
501 let tombstone: crate::qos::Tombstone =
502 serde_json::from_slice(bytes).context("Failed to deserialize tombstone")?;
503 Ok(Some(tombstone))
504 }
505 None => Ok(None),
506 }
507 }
508
509 pub fn get_tombstones_for_collection(
511 &self,
512 collection: &str,
513 ) -> Result<Vec<crate::qos::Tombstone>> {
514 let Some(ref db) = self.db else {
515 return Ok(Vec::new()); };
517
518 let prefix = format!("{}:", collection);
519 let mut tombstones = Vec::new();
520
521 let read_txn = db
522 .begin_read()
523 .context("Failed to begin read transaction")?;
524 let table = read_txn
525 .open_table(TOMBSTONES_TABLE)
526 .context("Failed to open tombstones table")?;
527
528 for entry in table.iter()? {
530 let (key, value) = entry?;
531 let key_str = String::from_utf8_lossy(key.value());
532 if key_str.starts_with(&prefix) {
533 let tombstone: crate::qos::Tombstone = serde_json::from_slice(value.value())
534 .context("Failed to deserialize tombstone")?;
535 tombstones.push(tombstone);
536 }
537 }
538
539 Ok(tombstones)
540 }
541
542 pub fn get_all_tombstones(&self) -> Result<Vec<crate::qos::Tombstone>> {
544 let Some(ref db) = self.db else {
545 return Ok(Vec::new()); };
547
548 let mut tombstones = Vec::new();
549
550 let read_txn = db
551 .begin_read()
552 .context("Failed to begin read transaction")?;
553 let table = read_txn
554 .open_table(TOMBSTONES_TABLE)
555 .context("Failed to open tombstones table")?;
556
557 for entry in table.iter()? {
558 let (_key, value) = entry?;
559 let tombstone: crate::qos::Tombstone =
560 serde_json::from_slice(value.value()).context("Failed to deserialize tombstone")?;
561 tombstones.push(tombstone);
562 }
563
564 Ok(tombstones)
565 }
566
567 pub fn remove_tombstone(&self, collection: &str, document_id: &str) -> Result<bool> {
569 let Some(ref db) = self.db else {
570 return Ok(false); };
572
573 let key = format!("{}:{}", collection, document_id);
574
575 let write_txn = db
576 .begin_write()
577 .context("Failed to begin write transaction")?;
578 let existed = {
579 let mut table = write_txn
580 .open_table(TOMBSTONES_TABLE)
581 .context("Failed to open tombstones table")?;
582 let result = table.remove(key.as_bytes())?;
583 result.is_some()
584 };
585 write_txn
586 .commit()
587 .context("Failed to commit tombstone removal")?;
588
589 if existed {
590 tracing::debug!(
591 "Removed tombstone for document {} in collection {}",
592 document_id,
593 collection
594 );
595 }
596
597 Ok(existed)
598 }
599
    /// True when a tombstone exists for `collection`/`document_id`.
    /// Always false in memory-only mode (see `get_tombstone`).
    pub fn has_tombstone(&self, collection: &str, document_id: &str) -> Result<bool> {
        Ok(self.get_tombstone(collection, document_id)?.is_some())
    }
604
605 pub fn list_collections(&self) -> Result<Vec<String>> {
609 let mut collections = std::collections::HashSet::new();
610
611 if self.db.is_none() {
613 let cache = self.cache.read().unwrap_or_else(|e| e.into_inner());
614 for key in cache.iter().map(|(k, _)| k) {
615 if let Some(colon_pos) = key.find(':') {
616 let collection = &key[..colon_pos];
617 collections.insert(collection.to_string());
618 }
619 }
620 return Ok(collections.into_iter().collect());
621 }
622
623 let read_txn = self
624 .db
625 .as_ref()
626 .unwrap()
627 .begin_read()
628 .context("Failed to begin read transaction")?;
629 let table = read_txn
630 .open_table(DOCUMENTS_TABLE)
631 .context("Failed to open documents table")?;
632
633 for result in table.iter().context("Failed to iterate documents")? {
634 let (key, _) = result.context("Failed to read document entry")?;
635 let key_str =
636 std::str::from_utf8(key.value()).context("Invalid UTF-8 in document key")?;
637
638 if let Some(colon_pos) = key_str.find(':') {
640 let collection = &key_str[..colon_pos];
641 collections.insert(collection.to_string());
642 }
643 }
644
645 Ok(collections.into_iter().collect())
646 }
647
648 pub fn get_expired_documents(
653 &self,
654 collection: &str,
655 cutoff: std::time::SystemTime,
656 ) -> Result<Vec<String>> {
657 let prefix = format!("{}:", collection);
658 let docs = self.scan_prefix(&prefix)?;
659 let mut expired = Vec::new();
660
661 let cutoff_ms = cutoff
662 .duration_since(std::time::UNIX_EPOCH)
663 .unwrap_or_default()
664 .as_millis() as u64;
665
666 for (key, doc) in docs {
667 if let Ok(Some((automerge::Value::Scalar(scalar), _))) =
669 doc.get(automerge::ROOT, "_created_at")
670 {
671 if let automerge::ScalarValue::Uint(created_at) = scalar.as_ref() {
672 if *created_at < cutoff_ms {
673 if let Some(doc_id) = key.strip_prefix(&prefix) {
675 expired.push(doc_id.to_string());
676 }
677 }
678 }
679 }
680 }
681
682 Ok(expired)
683 }
684
685 pub fn hard_delete(&self, collection: &str, document_id: &str) -> Result<()> {
690 let key = format!("{}:{}", collection, document_id);
691 self.delete(&key)?;
692 tracing::debug!(
693 "Hard deleted document {} from collection {}",
694 document_id,
695 collection
696 );
697 Ok(())
698 }
699
    /// Rewrite the document under `key` in its most compact form.
    ///
    /// Returns `None` when the key does not exist, otherwise
    /// `Some((old_size, new_size))` in serialized bytes. The rewrite uses
    /// `put_without_notify` so compaction is invisible to change subscribers.
    pub fn compact(&self, key: &str) -> Result<Option<(usize, usize)>> {
        // Serialize concurrent compaction/mutation of the same document.
        let _guard = self.lock_doc(key);
        let doc = match self.get(key)? {
            Some(d) => d,
            None => return Ok(None),
        };

        let old_size = doc.save().len();

        // NOTE(review): relies on fork() yielding an in-memory copy whose
        // save() is a fully re-encoded (compacted) document — confirm against
        // automerge's fork/save semantics before changing this.
        let compacted = doc.fork();
        let new_size = compacted.save().len();

        self.put_without_notify(key, &compacted)?;

        tracing::debug!(
            "Compacted document {}: {} -> {} bytes ({:.1}% reduction)",
            key,
            old_size,
            new_size,
            if old_size > 0 {
                100.0 - (new_size as f64 * 100.0 / old_size as f64)
            } else {
                0.0
            }
        );

        Ok(Some((old_size, new_size)))
    }
760
761 pub fn compact_prefix(&self, prefix: &str) -> Result<(usize, usize, usize)> {
777 let docs = self.scan_prefix(prefix)?;
778 let mut count = 0;
779 let mut total_before = 0;
780 let mut total_after = 0;
781
782 for (key, _) in docs {
783 if let Some((before, after)) = self.compact(&key)? {
784 count += 1;
785 total_before += before;
786 total_after += after;
787 }
788 }
789
790 if count > 0 {
791 tracing::info!(
792 "Compacted {} documents with prefix '{}': {} -> {} bytes ({:.1}% reduction)",
793 count,
794 prefix,
795 total_before,
796 total_after,
797 if total_before > 0 {
798 100.0 - (total_after as f64 * 100.0 / total_before as f64)
799 } else {
800 0.0
801 }
802 );
803 }
804
805 Ok((count, total_before, total_after))
806 }
807
808 pub fn compact_all(&self) -> Result<(usize, usize, usize)> {
814 if self.db.is_none() {
816 let keys: Vec<String> = {
817 let cache = self.cache.read().unwrap_or_else(|e| e.into_inner());
818 cache.iter().map(|(k, _)| k.clone()).collect()
819 };
820
821 let mut count = 0;
822 let mut total_before = 0;
823 let mut total_after = 0;
824
825 for key in keys {
826 if let Some((before, after)) = self.compact(&key)? {
827 count += 1;
828 total_before += before;
829 total_after += after;
830 }
831 }
832
833 return Ok((count, total_before, total_after));
834 }
835
836 let read_txn = self
838 .db
839 .as_ref()
840 .unwrap()
841 .begin_read()
842 .context("Failed to begin read transaction")?;
843 let table = read_txn
844 .open_table(DOCUMENTS_TABLE)
845 .context("Failed to open documents table")?;
846
847 let keys: Vec<String> = table
848 .iter()?
849 .filter_map(|entry| {
850 entry
851 .ok()
852 .map(|(k, _)| String::from_utf8_lossy(k.value()).to_string())
853 })
854 .collect();
855
856 drop(table);
857 drop(read_txn);
858
859 let mut count = 0;
860 let mut total_before = 0;
861 let mut total_after = 0;
862
863 for key in keys {
864 if let Some((before, after)) = self.compact(&key)? {
865 count += 1;
866 total_before += before;
867 total_after += after;
868 }
869 }
870
871 if count > 0 {
872 tracing::info!(
873 "Compacted {} documents: {} -> {} bytes ({:.1}% reduction)",
874 count,
875 total_before,
876 total_after,
877 if total_before > 0 {
878 100.0 - (total_after as f64 * 100.0 / total_before as f64)
879 } else {
880 0.0
881 }
882 );
883 }
884
885 Ok((count, total_before, total_after))
886 }
887
    /// Spawn a tokio task that periodically compacts oversized documents in
    /// the given `collections` until `token` is cancelled.
    ///
    /// Compaction errors are logged and do not stop the loop.
    pub fn start_background_compaction(
        self: &Arc<Self>,
        interval: std::time::Duration,
        size_threshold_bytes: usize,
        collections: Vec<String>,
        token: tokio_util::sync::CancellationToken,
    ) {
        let store = Arc::clone(self);
        tokio::spawn(async move {
            let mut timer = tokio::time::interval(interval);
            // Consume the interval's immediate first tick so the first real
            // compaction runs one full `interval` after startup.
            timer.tick().await;

            loop {
                tokio::select! {
                    _ = token.cancelled() => {
                        tracing::info!("background compaction cancelled");
                        break;
                    }
                    _ = timer.tick() => {
                        match store.compact_collections_above_threshold(&collections, size_threshold_bytes) {
                            Ok((count, before, after)) => {
                                if count > 0 {
                                    tracing::info!(count, before, after, "background compaction complete");
                                }
                            }
                            Err(e) => {
                                tracing::warn!("background compaction failed: {e}");
                            }
                        }
                    }
                }
            }
        });
    }
932
933 pub fn compact_collections_above_threshold(
940 &self,
941 collections: &[String],
942 threshold_bytes: usize,
943 ) -> Result<(usize, usize, usize)> {
944 let mut count = 0;
945 let mut total_before = 0;
946 let mut total_after = 0;
947
948 for collection in collections {
949 let prefix = format!("{}:", collection);
950 let docs = self.scan_prefix(&prefix)?;
951 for (key, _) in docs {
952 let size = self.document_size(&key)?.unwrap_or(0);
953 if size >= threshold_bytes {
954 if let Some((before, after)) = self.compact(&key)? {
955 count += 1;
956 total_before += before;
957 total_after += after;
958 }
959 }
960 }
961 }
962
963 Ok((count, total_before, total_after))
964 }
965
966 pub fn compact_above_threshold(&self, threshold_bytes: usize) -> Result<(usize, usize, usize)> {
970 let keys = self.all_keys()?;
971 let mut count = 0;
972 let mut total_before = 0;
973 let mut total_after = 0;
974
975 for key in keys {
976 let size = self.document_size(&key)?.unwrap_or(0);
977 if size >= threshold_bytes {
978 if let Some((before, after)) = self.compact(&key)? {
979 count += 1;
980 total_before += before;
981 total_after += after;
982 }
983 }
984 }
985
986 Ok((count, total_before, total_after))
987 }
988
989 fn all_keys(&self) -> Result<Vec<String>> {
991 if self.db.is_none() {
992 let cache = self.cache.read().unwrap_or_else(|e| e.into_inner());
993 return Ok(cache.iter().map(|(k, _)| k.clone()).collect());
994 }
995
996 let read_txn = self
997 .db
998 .as_ref()
999 .unwrap()
1000 .begin_read()
1001 .context("Failed to begin read transaction")?;
1002 let table = read_txn
1003 .open_table(DOCUMENTS_TABLE)
1004 .context("Failed to open documents table")?;
1005
1006 let keys: Vec<String> = table
1007 .iter()?
1008 .filter_map(|entry| {
1009 entry
1010 .ok()
1011 .map(|(k, _)| String::from_utf8_lossy(k.value()).to_string())
1012 })
1013 .collect();
1014
1015 Ok(keys)
1016 }
1017
    /// Build a typed, serde-backed view over the collection named `name`.
    ///
    /// Serialization of `T` is handled by the `TypedCollection` wrapper; the
    /// underlying storage is this store.
    pub fn typed_collection<T: serde::Serialize + serde::de::DeserializeOwned>(
        self: &Arc<Self>,
        name: &str,
    ) -> super::typed_collection::TypedCollection<T> {
        super::typed_collection::TypedCollection::new(Arc::clone(self), name)
    }
1037
1038 pub fn document_size(&self, key: &str) -> Result<Option<usize>> {
1040 match self.get(key)? {
1041 Some(doc) => Ok(Some(doc.save().len())),
1042 None => Ok(None),
1043 }
1044 }
1045}
1046
/// A named view over an `AutomergeStore` key space: every document id is
/// stored under `"{prefix}{doc_id}"`, where `prefix` is `"{name}:"`.
pub struct AutomergeCollection {
    // Shared handle to the backing store.
    store: Arc<AutomergeStore>,
    // Key prefix including the trailing ':' (see `AutomergeStore::collection`).
    prefix: String,
}
1055
1056impl AutomergeCollection {
1057 fn prefixed_key(&self, doc_id: &str) -> String {
1058 format!("{}{}", self.prefix, doc_id)
1059 }
1060
1061 fn strip_prefix<'b>(&self, key: &'b str) -> Option<&'b str> {
1062 key.strip_prefix(&self.prefix)
1063 }
1064}
1065
/// Byte-blob `Collection` facade: each document is an Automerge doc holding a
/// single `"data"` bytes scalar at the root.
impl Collection for AutomergeCollection {
    /// Insert or replace the raw bytes stored under `doc_id`.
    fn upsert(&self, doc_id: &str, data: Vec<u8>) -> Result<()> {
        let key = self.prefixed_key(doc_id);
        // Fork the stored doc so the transaction below does not mutate the
        // instance held by the store's cache.
        let mut doc = match self.store.get(&key)? {
            Some(existing) => {
                existing.fork()
            }
            None => {
                Automerge::new()
            }
        };

        match doc.transact(|tx| {
            tx.put(
                automerge::ROOT,
                "data",
                automerge::ScalarValue::Bytes(data.clone()),
            )?;
            Ok::<(), automerge::AutomergeError>(())
        }) {
            Ok(_) => self.store.put(&key, &doc),
            Err(e) => Err(anyhow::anyhow!(
                "Failed to update Automerge document: {:?}",
                e
            )),
        }
    }

    /// Read the raw bytes for `doc_id`.
    ///
    /// Returns `None` when the document is missing, or when its `"data"`
    /// field is absent or not a bytes scalar.
    fn get(&self, doc_id: &str) -> Result<Option<Vec<u8>>> {
        match self.store.get(&self.prefixed_key(doc_id))? {
            Some(doc) => {
                if let Ok(Some((automerge::Value::Scalar(scalar), _))) =
                    doc.get(automerge::ROOT, "data")
                {
                    if let automerge::ScalarValue::Bytes(bytes) = scalar.as_ref() {
                        return Ok(Some(bytes.to_vec()));
                    }
                }
                Ok(None)
            }
            None => Ok(None),
        }
    }

    /// Remove the document under `doc_id` from the store.
    fn delete(&self, doc_id: &str) -> Result<()> {
        self.store.delete(&self.prefixed_key(doc_id))
    }

    /// List every `(doc_id, bytes)` pair in this collection.
    ///
    /// Documents whose `"data"` field is missing or not a bytes scalar are
    /// skipped; each skip is explained at debug level.
    fn scan(&self) -> Result<Vec<(String, Vec<u8>)>> {
        let docs = self.store.scan_prefix(&self.prefix)?;
        tracing::debug!(
            "AutomergeCollection.scan: prefix={}, found {} docs",
            self.prefix,
            docs.len()
        );
        let mut results = Vec::new();

        for (key, doc) in docs {
            tracing::debug!(
                "AutomergeCollection.scan: processing key={}, doc_len={}",
                key,
                doc.save().len()
            );
            if let Some(doc_id) = self.strip_prefix(&key) {
                match doc.get(automerge::ROOT, "data") {
                    Ok(Some((automerge::Value::Scalar(scalar), _))) => {
                        if let automerge::ScalarValue::Bytes(bytes) = scalar.as_ref() {
                            tracing::debug!(
                                "AutomergeCollection.scan: found data bytes, doc_id={}, len={}",
                                doc_id,
                                bytes.len()
                            );
                            results.push((doc_id.to_string(), bytes.to_vec()));
                        } else {
                            tracing::debug!(
                                "AutomergeCollection.scan: data is not Bytes, doc_id={}",
                                doc_id
                            );
                        }
                    }
                    Ok(Some((value, _))) => {
                        tracing::debug!(
                            "AutomergeCollection.scan: data is not Scalar, doc_id={}, value_type={:?}",
                            doc_id,
                            value
                        );
                    }
                    Ok(None) => {
                        tracing::debug!(
                            "AutomergeCollection.scan: no 'data' field, doc_id={}",
                            doc_id
                        );
                    }
                    Err(e) => {
                        tracing::debug!(
                            "AutomergeCollection.scan: error getting 'data', doc_id={}, err={}",
                            doc_id,
                            e
                        );
                    }
                }
            }
        }

        Ok(results)
    }

    /// Full scan filtered by an arbitrary predicate over the raw bytes.
    fn find(&self, predicate: DocumentPredicate) -> Result<Vec<(String, Vec<u8>)>> {
        let all_docs = self.scan()?;
        Ok(all_docs
            .into_iter()
            .filter(|(_, bytes)| predicate(bytes))
            .collect())
    }

    /// Documents whose id starts with `geohash_prefix`.
    // NOTE(review): filters on the doc-id string; assumes ids in this
    // collection are geohash strings — confirm with callers.
    fn query_geohash_prefix(&self, geohash_prefix: &str) -> Result<Vec<(String, Vec<u8>)>> {
        let all_docs = self.scan()?;
        Ok(all_docs
            .into_iter()
            .filter(|(id, _)| id.starts_with(geohash_prefix))
            .collect())
    }

    /// Number of documents with readable data (performs a full scan).
    fn count(&self) -> Result<usize> {
        Ok(self.scan()?.len())
    }
}
1203
/// Expose the store's tombstone and expiry primitives through the `GcStore`
/// trait so the QoS garbage collector can drive cleanup. Each method simply
/// delegates to the inherent method of the same name (inherent methods take
/// precedence in resolution, so these calls do not recurse).
impl crate::qos::GcStore for AutomergeStore {
    fn get_all_tombstones(&self) -> anyhow::Result<Vec<crate::qos::Tombstone>> {
        self.get_all_tombstones()
    }

    fn remove_tombstone(&self, collection: &str, document_id: &str) -> anyhow::Result<bool> {
        self.remove_tombstone(collection, document_id)
    }

    fn has_tombstone(&self, collection: &str, document_id: &str) -> anyhow::Result<bool> {
        self.has_tombstone(collection, document_id)
    }

    fn get_expired_documents(
        &self,
        collection: &str,
        cutoff: std::time::SystemTime,
    ) -> anyhow::Result<Vec<String>> {
        self.get_expired_documents(collection, cutoff)
    }

    fn hard_delete(&self, collection: &str, document_id: &str) -> anyhow::Result<()> {
        self.hard_delete(collection, document_id)
    }

    fn list_collections(&self) -> anyhow::Result<Vec<String>> {
        self.list_collections()
    }
}
1238
1239#[cfg(test)]
1240mod tests {
1241 use super::*;
1242 use tempfile::TempDir;
1243
1244 fn create_test_store() -> (Arc<AutomergeStore>, TempDir) {
1245 let temp_dir = TempDir::new().unwrap();
1246 let store = Arc::new(AutomergeStore::open(temp_dir.path()).unwrap());
1247 (store, temp_dir)
1248 }
1249
1250 #[test]
1251 fn test_collection_upsert_and_get() {
1252 let (store, _temp) = create_test_store();
1253 let collection = store.collection("test");
1254
1255 let data = b"test data".to_vec();
1256 collection.upsert("doc1", data.clone()).unwrap();
1257
1258 let retrieved = collection.get("doc1").unwrap().unwrap();
1259 assert_eq!(retrieved, data);
1260 }
1261
1262 #[test]
1263 fn test_collection_scan() {
1264 let (store, _temp) = create_test_store();
1265 let collection = store.collection("test");
1266
1267 collection.upsert("doc1", b"data1".to_vec()).unwrap();
1268 collection.upsert("doc2", b"data2".to_vec()).unwrap();
1269
1270 let results = collection.scan().unwrap();
1271 assert_eq!(results.len(), 2);
1272
1273 let ids: Vec<String> = results.iter().map(|(id, _)| id.clone()).collect();
1274 assert!(ids.contains(&"doc1".to_string()));
1275 assert!(ids.contains(&"doc2".to_string()));
1276 }
1277
1278 #[test]
1279 fn test_collection_delete() {
1280 let (store, _temp) = create_test_store();
1281 let collection = store.collection("test");
1282
1283 collection.upsert("doc1", b"data1".to_vec()).unwrap();
1284 assert!(collection.get("doc1").unwrap().is_some());
1285
1286 collection.delete("doc1").unwrap();
1287 assert!(collection.get("doc1").unwrap().is_none());
1288 }
1289
1290 #[test]
1291 fn test_collection_count() {
1292 let (store, _temp) = create_test_store();
1293 let collection = store.collection("test");
1294
1295 assert_eq!(collection.count().unwrap(), 0);
1296
1297 collection.upsert("doc1", b"data1".to_vec()).unwrap();
1298 collection.upsert("doc2", b"data2".to_vec()).unwrap();
1299
1300 assert_eq!(collection.count().unwrap(), 2);
1301 }
1302
1303 #[test]
1304 fn test_collection_find_with_predicate() {
1305 let (store, _temp) = create_test_store();
1306 let collection = store.collection("test");
1307
1308 collection.upsert("doc1", b"hello".to_vec()).unwrap();
1309 collection.upsert("doc2", b"world".to_vec()).unwrap();
1310 collection.upsert("doc3", b"hello world".to_vec()).unwrap();
1311
1312 let results = collection
1313 .find(Box::new(|bytes| {
1314 String::from_utf8_lossy(bytes).contains("hello")
1315 }))
1316 .unwrap();
1317
1318 assert_eq!(results.len(), 2);
1319 }
1320
1321 #[test]
1322 fn test_collection_namespace_isolation() {
1323 let (store, _temp) = create_test_store();
1324 let collection1 = store.collection("coll1");
1325 let collection2 = store.collection("coll2");
1326
1327 collection1.upsert("doc1", b"data1".to_vec()).unwrap();
1328 collection2.upsert("doc1", b"data2".to_vec()).unwrap();
1329
1330 let data1 = collection1.get("doc1").unwrap().unwrap();
1331 let data2 = collection2.get("doc1").unwrap().unwrap();
1332
1333 assert_eq!(data1, b"data1");
1334 assert_eq!(data2, b"data2");
1335 assert_ne!(data1, data2);
1336 }
1337
1338 #[test]
1339 fn test_direct_put_and_get() {
1340 let (store, _temp) = create_test_store();
1341
1342 let mut doc = Automerge::new();
1343 doc.transact::<_, _, automerge::AutomergeError>(|tx| {
1344 tx.put(automerge::ROOT, "key", "value")?;
1345 Ok(())
1346 })
1347 .unwrap();
1348
1349 store.put("test-doc", &doc).unwrap();
1350
1351 let loaded = store.get("test-doc").unwrap().unwrap();
1352 let value: String = loaded
1353 .get(automerge::ROOT, "key")
1354 .unwrap()
1355 .unwrap()
1356 .0
1357 .to_string();
1358 assert!(value.contains("value"));
1359 }
1360
1361 #[test]
1362 fn test_scan_prefix() {
1363 let (store, _temp) = create_test_store();
1364
1365 let mut doc1 = Automerge::new();
1366 doc1.transact::<_, _, automerge::AutomergeError>(|tx| {
1367 tx.put(automerge::ROOT, "n", "1")?;
1368 Ok(())
1369 })
1370 .unwrap();
1371
1372 let mut doc2 = Automerge::new();
1373 doc2.transact::<_, _, automerge::AutomergeError>(|tx| {
1374 tx.put(automerge::ROOT, "n", "2")?;
1375 Ok(())
1376 })
1377 .unwrap();
1378
1379 store.put("prefix:a", &doc1).unwrap();
1380 store.put("prefix:b", &doc2).unwrap();
1381 store.put("other:c", &doc1).unwrap();
1382
1383 let results = store.scan_prefix("prefix:").unwrap();
1384 assert_eq!(results.len(), 2);
1385 }
1386
1387 #[test]
1390 fn test_compact_document() {
1391 let (store, _temp) = create_test_store();
1392
1393 let mut doc = Automerge::new();
1395 for i in 0..100 {
1396 doc.transact::<_, _, automerge::AutomergeError>(|tx| {
1397 tx.put(automerge::ROOT, "counter", i as i64)?;
1398 Ok(())
1399 })
1400 .unwrap();
1401 }
1402
1403 store.put("test-doc", &doc).unwrap();
1404 let size_before = store.document_size("test-doc").unwrap().unwrap();
1405
1406 let result = store.compact("test-doc").unwrap();
1408 assert!(result.is_some());
1409 let (old_size, new_size) = result.unwrap();
1410
1411 assert_eq!(old_size, size_before);
1412 assert!(
1413 new_size <= old_size,
1414 "Compaction should reduce or maintain size"
1415 );
1416
1417 let loaded = store.get("test-doc").unwrap().unwrap();
1419 let value = loaded.get(automerge::ROOT, "counter").unwrap().unwrap();
1420 assert_eq!(value.0.to_i64(), Some(99));
1421 }
1422
1423 #[test]
1424 fn test_compact_nonexistent_document() {
1425 let (store, _temp) = create_test_store();
1426 let result = store.compact("nonexistent").unwrap();
1427 assert!(result.is_none());
1428 }
1429
1430 #[test]
1431 fn test_compact_prefix() {
1432 let (store, _temp) = create_test_store();
1433
1434 for doc_num in 0..5 {
1436 let mut doc = Automerge::new();
1437 for i in 0..50 {
1438 doc.transact::<_, _, automerge::AutomergeError>(|tx| {
1439 tx.put(automerge::ROOT, "counter", i as i64)?;
1440 Ok(())
1441 })
1442 .unwrap();
1443 }
1444 store.put(&format!("test:{}", doc_num), &doc).unwrap();
1445 }
1446
1447 let mut other_doc = Automerge::new();
1449 other_doc
1450 .transact::<_, _, automerge::AutomergeError>(|tx| {
1451 tx.put(automerge::ROOT, "other", "value")?;
1452 Ok(())
1453 })
1454 .unwrap();
1455 store.put("other:1", &other_doc).unwrap();
1456
1457 let (count, before, after) = store.compact_prefix("test:").unwrap();
1459 assert_eq!(count, 5);
1460 assert!(before > 0);
1461 assert!(after <= before);
1462
1463 let other = store.get("other:1").unwrap().unwrap();
1465 let value = other.get(automerge::ROOT, "other").unwrap().unwrap();
1466 assert!(value.0.to_str().unwrap().contains("value"));
1467 }
1468
1469 #[test]
1470 fn test_compact_all() {
1471 let (store, _temp) = create_test_store();
1472
1473 for prefix in &["a", "b", "c"] {
1475 let mut doc = Automerge::new();
1476 for i in 0..30 {
1477 doc.transact::<_, _, automerge::AutomergeError>(|tx| {
1478 tx.put(automerge::ROOT, "counter", i as i64)?;
1479 Ok(())
1480 })
1481 .unwrap();
1482 }
1483 store.put(&format!("{}:doc", prefix), &doc).unwrap();
1484 }
1485
1486 let (count, before, after) = store.compact_all().unwrap();
1487 assert_eq!(count, 3);
1488 assert!(before > 0);
1489 assert!(after <= before);
1490 }
1491
1492 #[test]
1493 fn test_compact_in_memory_store() {
1494 let store = Arc::new(AutomergeStore::in_memory());
1495
1496 let mut doc = Automerge::new();
1502 for i in 0..100 {
1503 doc.transact::<_, _, automerge::AutomergeError>(|tx| {
1504 tx.put(automerge::ROOT, "counter", i as i64)?;
1505 Ok(())
1506 })
1507 .unwrap();
1508 }
1509
1510 let mut doc2 = doc.fork();
1513 for i in 0..100 {
1514 doc2.transact::<_, _, automerge::AutomergeError>(|tx| {
1515 tx.put(automerge::ROOT, "peer2_counter", i as i64)?;
1516 Ok(())
1517 })
1518 .unwrap();
1519 }
1520 doc.merge(&mut doc2).unwrap();
1521
1522 store.put("test-doc", &doc).unwrap();
1523
1524 let result = store.compact("test-doc").unwrap();
1525 assert!(result.is_some());
1526 let (old_size, new_size) = result.unwrap();
1527 assert!(
1528 new_size <= old_size,
1529 "compaction should not increase size, got {} -> {}",
1530 old_size,
1531 new_size
1532 );
1533
1534 let loaded = store.get("test-doc").unwrap().unwrap();
1536 let counter = loaded.get(automerge::ROOT, "counter").unwrap().unwrap();
1537 assert_eq!(counter.0.to_i64(), Some(99));
1538 let peer2 = loaded
1539 .get(automerge::ROOT, "peer2_counter")
1540 .unwrap()
1541 .unwrap();
1542 assert_eq!(peer2.0.to_i64(), Some(99));
1543 }
1544
1545 #[test]
1546 fn test_compact_above_threshold() {
1547 let store = Arc::new(AutomergeStore::in_memory());
1548
1549 let mut small_doc = Automerge::new();
1551 small_doc
1552 .transact::<_, _, automerge::AutomergeError>(|tx| {
1553 tx.put(automerge::ROOT, "key", "value")?;
1554 Ok(())
1555 })
1556 .unwrap();
1557 store.put("small-doc", &small_doc).unwrap();
1558
1559 let mut big_doc = Automerge::new();
1561 for i in 0..200 {
1562 big_doc
1563 .transact::<_, _, automerge::AutomergeError>(|tx| {
1564 tx.put(automerge::ROOT, "counter", i as i64)?;
1565 Ok(())
1566 })
1567 .unwrap();
1568 }
1569 let mut big_doc2 = big_doc.fork();
1571 for i in 0..200 {
1572 big_doc2
1573 .transact::<_, _, automerge::AutomergeError>(|tx| {
1574 tx.put(automerge::ROOT, "peer2", i as i64)?;
1575 Ok(())
1576 })
1577 .unwrap();
1578 }
1579 big_doc.merge(&mut big_doc2).unwrap();
1580 store.put("big-doc", &big_doc).unwrap();
1581
1582 let small_size = small_doc.save().len();
1584 let threshold = small_size + 1;
1585 let (count, before, after) = store.compact_above_threshold(threshold).unwrap();
1586
1587 assert_eq!(count, 1);
1589 assert!(
1590 after <= before,
1591 "compaction should not increase size, got {} -> {}",
1592 before,
1593 after
1594 );
1595
1596 let loaded = store.get("big-doc").unwrap().unwrap();
1598 let value = loaded.get(automerge::ROOT, "counter").unwrap().unwrap();
1599 assert_eq!(value.0.to_i64(), Some(199));
1600 }
1601
1602 #[tokio::test]
1603 async fn test_background_compaction_cancellation() {
1604 let store = Arc::new(AutomergeStore::in_memory());
1605 let token = tokio_util::sync::CancellationToken::new();
1606
1607 store.start_background_compaction(
1608 std::time::Duration::from_millis(50),
1609 1024,
1610 vec!["test".to_string()],
1611 token.clone(),
1612 );
1613
1614 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1616
1617 token.cancel();
1619 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1620 }
1621
1622 #[test]
1623 fn test_compact_collections_above_threshold() {
1624 let store = Arc::new(AutomergeStore::in_memory());
1625
1626 let mut beacons_doc = Automerge::new();
1628 for i in 0..100 {
1629 beacons_doc
1630 .transact::<_, _, automerge::AutomergeError>(|tx| {
1631 tx.put(automerge::ROOT, "lat", i as i64)?;
1632 Ok(())
1633 })
1634 .unwrap();
1635 }
1636 store.put("beacons:beacon-1", &beacons_doc).unwrap();
1637
1638 let mut commands_doc = Automerge::new();
1639 for i in 0..100 {
1640 commands_doc
1641 .transact::<_, _, automerge::AutomergeError>(|tx| {
1642 tx.put(automerge::ROOT, "cmd", i as i64)?;
1643 Ok(())
1644 })
1645 .unwrap();
1646 }
1647 store.put("commands:cmd-1", &commands_doc).unwrap();
1648
1649 let before_commands = store.document_size("commands:cmd-1").unwrap().unwrap();
1650
1651 let (count, _, _) = store
1653 .compact_collections_above_threshold(&["beacons".to_string()], 1)
1654 .unwrap();
1655
1656 assert_eq!(count, 1, "only the beacons doc should be compacted");
1657
1658 let after_commands = store.document_size("commands:cmd-1").unwrap().unwrap();
1660 assert_eq!(
1661 before_commands, after_commands,
1662 "commands doc should not be touched"
1663 );
1664 }
1665
1666 #[test]
1667 fn test_compact_collections_respects_threshold() {
1668 let store = Arc::new(AutomergeStore::in_memory());
1669
1670 let mut small_doc = Automerge::new();
1672 small_doc
1673 .transact::<_, _, automerge::AutomergeError>(|tx| {
1674 tx.put(automerge::ROOT, "key", "value")?;
1675 Ok(())
1676 })
1677 .unwrap();
1678 store.put("beacons:small", &small_doc).unwrap();
1679
1680 let size = store.document_size("beacons:small").unwrap().unwrap();
1681
1682 let (count, _, _) = store
1684 .compact_collections_above_threshold(&["beacons".to_string()], size + 1)
1685 .unwrap();
1686
1687 assert_eq!(count, 0, "small doc below threshold should be skipped");
1688 }
1689}