Skip to main content

peat_mesh/storage/
automerge_store.rs

1//! Automerge document storage with redb persistence
2//!
3//! This module provides persistent storage for Automerge CRDT documents using redb,
4//! a pure Rust embedded database. This replaces the previous RocksDB implementation
5//! to eliminate C/C++ build dependencies and align with Iroh's storage layer.
6
7use crate::storage::traits::{Collection, DocumentPredicate};
8use crate::storage::ttl_manager::TtlManager;
9use automerge::{transaction::Transactable, Automerge, ReadDoc};
10use lru::LruCache;
11use redb::{Builder, Database, ReadableTable, ReadableTableMetadata, TableDefinition};
12use std::hash::{Hash, Hasher};
13use std::num::NonZeroUsize;
14use std::path::Path;
15use std::sync::{Arc, RwLock};
16use tokio::sync::broadcast;
17
18use anyhow::{Context, Result};
19
/// Table definition for document storage
///
/// This is the redb table that `AutomergeStore` persists documents into.
///
/// Key: document key as string bytes
/// Value: serialized Automerge document bytes
const DOCUMENTS_TABLE: TableDefinition<&[u8], &[u8]> = TableDefinition::new("documents");
24
/// Fallback size, in bytes, of the redb page cache (Issue #446)
///
/// redb's own default of 1 GiB is far more than this store needs, so a
/// 16 MiB cache is used unless the `CAP_REDB_CACHE_SIZE` environment
/// variable supplies a different byte count.
const DEFAULT_REDB_CACHE_SIZE: usize = 16 << 20; // 16 MiB (16 * 2^20 bytes)
33
/// Table definition for tombstone storage (ADR-034 Phase 2)
///
/// Kept separate from the documents table so tombstones can be listed and
/// removed without touching document data.
///
/// Key: "collection:document_id" as string bytes
/// Value: serialized Tombstone bytes (JSON)
const TOMBSTONES_TABLE: TableDefinition<&[u8], &[u8]> = TableDefinition::new("tombstones");
38
39/// Storage layer for Automerge documents with redb persistence
40///
41/// # Change Notifications (Phase 6.3)
42///
43/// The store emits change notifications when documents are modified via `put()`.
44/// Subscribers can listen for these notifications to trigger automatic sync.
45///
46/// # In-Memory Mode
47///
48/// For high-throughput testing, the store can operate in pure in-memory mode
49/// where all documents are stored only in the LRU cache (no disk persistence).
50/// Enable via `AutomergeStore::in_memory()` constructor.
51/// Number of striped lock buckets for per-document concurrency control.
52/// Documents are hashed into buckets, so operations on different documents
53/// rarely contend while operations on the same document serialize correctly.
54const DOC_LOCK_STRIPES: usize = 64;
55
56pub struct AutomergeStore {
57    /// Database handle - None when running in memory-only mode
58    db: Option<Arc<Database>>,
59    cache: Arc<RwLock<LruCache<String, Automerge>>>,
60    /// Broadcast channel for sync coordinator - used to trigger P2P sync
61    /// Only notified for local puts (not synced documents)
62    change_tx: broadcast::Sender<String>,
63    /// Broadcast channel for observers - used for hierarchical aggregation (Issue #377)
64    /// Notified for ALL document changes (local and synced) so observers can react
65    observer_tx: broadcast::Sender<String>,
66    /// Striped locks for per-document concurrency control (Issue #74).
67    /// Serializes read-modify-write operations (compact, merge) on the
68    /// same document key to prevent silent data loss.
69    doc_locks: Box<[std::sync::Mutex<()>]>,
70}
71
72impl AutomergeStore {
73    /// Open or create storage at the given path
74    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
75        // Ensure the directory exists
76        let path = path.as_ref();
77        if let Some(parent) = path.parent() {
78            std::fs::create_dir_all(parent).ok();
79        }
80
81        // redb stores in a single file, append .redb extension if it's a directory path
82        let db_path = if path.is_dir() || !path.exists() {
83            std::fs::create_dir_all(path).ok();
84            path.join("automerge.redb")
85        } else {
86            path.to_path_buf()
87        };
88
89        // Check for corrupted (0-byte) database file and remove it
90        // This can happen on Android if the previous initialization was interrupted
91        if db_path.exists() {
92            if let Ok(metadata) = std::fs::metadata(&db_path) {
93                if metadata.len() == 0 {
94                    tracing::warn!("Removing corrupted 0-byte redb database at {:?}", db_path);
95                    std::fs::remove_file(&db_path).ok();
96                }
97            }
98        }
99
100        // Configure redb cache size (Issue #446)
101        // Default redb cache is 1 GiB which causes excessive memory growth.
102        // Use a smaller cache (default 16 MiB) or allow override via environment variable.
103        let cache_size = std::env::var("CAP_REDB_CACHE_SIZE")
104            .ok()
105            .and_then(|s| s.parse::<usize>().ok())
106            .unwrap_or(DEFAULT_REDB_CACHE_SIZE);
107
108        tracing::debug!("Opening redb database with cache_size={} bytes", cache_size);
109
110        let db = Builder::new()
111            .set_cache_size(cache_size)
112            .create(&db_path)
113            .context("Failed to open redb database")?;
114
115        // Initialize the tables (redb requires this on first use)
116        {
117            let write_txn = db
118                .begin_write()
119                .context("Failed to begin write transaction")?;
120            // Creating the tables if they don't exist
121            let _ = write_txn.open_table(DOCUMENTS_TABLE);
122            let _ = write_txn.open_table(TOMBSTONES_TABLE); // ADR-034 Phase 2
123            write_txn
124                .commit()
125                .context("Failed to commit table creation")?;
126        }
127
128        let cache = LruCache::new(NonZeroUsize::new(1000).unwrap());
129
130        // Create broadcast channel for sync coordinator notifications
131        // Issue #346: Increased from 1024 to 8192 to reduce lagging under high load.
132        // When this channel lags, the auto_sync_task must do a full resync which is expensive.
133        // A larger buffer trades memory (8KB per doc_key) for reduced resync frequency.
134        let (change_tx, _) = broadcast::channel(8192);
135
136        // Create broadcast channel for observer notifications (Issue #377)
137        // This channel notifies ALL document changes including synced documents
138        // so hierarchical aggregation can react to remotely synced platoon summaries
139        let (observer_tx, _) = broadcast::channel(8192);
140
141        Ok(Self {
142            db: Some(Arc::new(db)),
143            cache: Arc::new(RwLock::new(cache)),
144            change_tx,
145            observer_tx,
146            doc_locks: (0..DOC_LOCK_STRIPES)
147                .map(|_| std::sync::Mutex::new(()))
148                .collect::<Vec<_>>()
149                .into_boxed_slice(),
150        })
151    }
152
153    /// Create an in-memory store (no disk persistence)
154    ///
155    /// Documents are stored only in the LRU cache. This mode is useful for
156    /// high-throughput testing where persistence is not required.
157    ///
158    /// Note: Cache size is 10,000 documents in memory mode (vs 1,000 for disk mode)
159    /// to accommodate larger working sets.
160    pub fn in_memory() -> Self {
161        // Larger cache for in-memory mode since we have no disk backing
162        let cache = LruCache::new(NonZeroUsize::new(10000).unwrap());
163        let (change_tx, _) = broadcast::channel(8192);
164        let (observer_tx, _) = broadcast::channel(8192);
165
166        tracing::info!("AutomergeStore: Running in MEMORY-ONLY mode (no disk persistence)");
167
168        Self {
169            db: None,
170            cache: Arc::new(RwLock::new(cache)),
171            change_tx,
172            observer_tx,
173            doc_locks: (0..DOC_LOCK_STRIPES)
174                .map(|_| std::sync::Mutex::new(()))
175                .collect::<Vec<_>>()
176                .into_boxed_slice(),
177        }
178    }
179
180    /// Acquire the striped lock for a document key.
181    ///
182    /// Returns a `MutexGuard` that serializes read-modify-write operations
183    /// on documents hashing to the same stripe. Hold this guard across
184    /// the entire get→modify→put sequence.
185    ///
186    /// Used by `compact()` and `AutomergeSyncCoordinator::receive_sync_message()`
187    /// to prevent concurrent modifications from silently dropping changes.
188    pub fn lock_doc(&self, key: &str) -> std::sync::MutexGuard<'_, ()> {
189        let mut hasher = std::collections::hash_map::DefaultHasher::new();
190        key.hash(&mut hasher);
191        let idx = (hasher.finish() as usize) % DOC_LOCK_STRIPES;
192        self.doc_locks[idx]
193            .lock()
194            .unwrap_or_else(|e| e.into_inner())
195    }
196
197    /// Check if the store is running in memory-only mode
198    pub fn is_in_memory(&self) -> bool {
199        self.db.is_none()
200    }
201
202    /// Save an Automerge document
203    ///
204    /// # Change Notifications (Phase 6.3)
205    ///
206    /// This method emits a change notification after successfully persisting the document.
207    /// Subscribers will receive the document key to trigger automatic sync.
208    pub fn put(&self, key: &str, doc: &Automerge) -> Result<()> {
209        self.put_inner(key, doc, true)
210    }
211
212    /// Save an Automerge document without emitting change notification (Issue #346)
213    ///
214    /// Use this method when storing documents received via sync to avoid
215    /// triggering a sync-back that would be blocked by cooldown and waste resources.
216    /// The sending peer already has this document, so syncing back is unnecessary.
217    pub fn put_without_notify(&self, key: &str, doc: &Automerge) -> Result<()> {
218        self.put_inner(key, doc, false)
219    }
220
221    /// Save an Automerge document with TTL-based expiration.
222    ///
223    /// Delegates to `put()` and then schedules TTL cleanup via the TtlManager
224    /// if the collection has a configured TTL. The collection name is extracted
225    /// from the key prefix (before the first '/' or ':').
226    pub fn put_with_ttl(&self, key: &str, doc: &Automerge, ttl_manager: &TtlManager) -> Result<()> {
227        self.put(key, doc)?;
228
229        // Extract collection name from key prefix (handles both "col/id" and "col:id")
230        let collection = key.find(['/', ':']).map(|pos| &key[..pos]).unwrap_or(key);
231
232        if let Some(ttl) = ttl_manager.config().get_collection_ttl(collection) {
233            ttl_manager.set_ttl(key, ttl)?;
234        }
235
236        Ok(())
237    }
238
239    /// Internal put implementation
240    fn put_inner(&self, key: &str, doc: &Automerge, notify: bool) -> Result<()> {
241        // Only persist to disk if we have a database (not in-memory mode)
242        if let Some(ref db) = self.db {
243            let bytes = doc.save();
244
245            let write_txn = db
246                .begin_write()
247                .context("Failed to begin write transaction")?;
248            {
249                let mut table = write_txn
250                    .open_table(DOCUMENTS_TABLE)
251                    .context("Failed to open documents table")?;
252                table
253                    .insert(key.as_bytes(), bytes.as_slice())
254                    .context("Failed to insert document")?;
255            }
256            write_txn.commit().context("Failed to commit write")?;
257        }
258
259        self.cache
260            .write()
261            .unwrap()
262            .put(key.to_string(), doc.clone());
263
264        // Always notify observers for ALL document changes (Issue #377)
265        // This enables hierarchical aggregation to react to remotely synced docs
266        let _ = self.observer_tx.send(key.to_string());
267
268        // Notify sync coordinator only for local changes (Phase 6.3, Issue #346)
269        // Skip notification for documents received via sync to avoid sync-back loops
270        if notify {
271            // Ignore send errors - if no one is listening, that's fine
272            let _ = self.change_tx.send(key.to_string());
273        }
274
275        Ok(())
276    }
277
278    /// Load an Automerge document
279    pub fn get(&self, key: &str) -> Result<Option<Automerge>> {
280        // Always check cache first
281        {
282            let mut cache = self.cache.write().unwrap_or_else(|e| e.into_inner());
283            if let Some(doc) = cache.get(key) {
284                return Ok(Some(doc.clone()));
285            }
286        }
287
288        // In memory-only mode, cache miss means document doesn't exist
289        let Some(ref db) = self.db else {
290            return Ok(None);
291        };
292
293        let read_txn = db
294            .begin_read()
295            .context("Failed to begin read transaction")?;
296        let table = read_txn
297            .open_table(DOCUMENTS_TABLE)
298            .context("Failed to open documents table")?;
299
300        match table.get(key.as_bytes())? {
301            Some(value) => {
302                let bytes = value.value();
303                let doc = Automerge::load(bytes).context("Failed to load Automerge document")?;
304
305                self.cache
306                    .write()
307                    .unwrap()
308                    .put(key.to_string(), doc.clone());
309
310                Ok(Some(doc))
311            }
312            None => Ok(None),
313        }
314    }
315
316    /// Delete a document
317    pub fn delete(&self, key: &str) -> Result<()> {
318        // Only delete from disk if we have a database
319        if let Some(ref db) = self.db {
320            let write_txn = db
321                .begin_write()
322                .context("Failed to begin write transaction")?;
323            {
324                let mut table = write_txn
325                    .open_table(DOCUMENTS_TABLE)
326                    .context("Failed to open documents table")?;
327                table.remove(key.as_bytes())?;
328            }
329            write_txn.commit().context("Failed to commit delete")?;
330        }
331
332        self.cache
333            .write()
334            .unwrap_or_else(|e| e.into_inner())
335            .pop(key);
336        Ok(())
337    }
338
339    /// Scan documents with prefix
340    pub fn scan_prefix(&self, prefix: &str) -> Result<Vec<(String, Automerge)>> {
341        // In memory-only mode, scan the cache
342        if self.db.is_none() {
343            let cache = self.cache.read().unwrap_or_else(|e| e.into_inner());
344            let results: Vec<(String, Automerge)> = cache
345                .iter()
346                .filter(|(k, _)| k.starts_with(prefix))
347                .map(|(k, v)| (k.clone(), v.clone()))
348                .collect();
349            return Ok(results);
350        }
351
352        let mut results = Vec::new();
353
354        let read_txn = self
355            .db
356            .as_ref()
357            .unwrap()
358            .begin_read()
359            .context("Failed to begin read transaction")?;
360        let table = read_txn
361            .open_table(DOCUMENTS_TABLE)
362            .context("Failed to open documents table")?;
363
364        // Use range to scan from prefix onwards
365        let prefix_bytes = prefix.as_bytes();
366        for entry in table.range(prefix_bytes..)? {
367            let (key, value) = entry?;
368            let key_bytes = key.value();
369
370            // Stop if we've passed the prefix
371            if !key_bytes.starts_with(prefix_bytes) {
372                break;
373            }
374
375            let key_str = String::from_utf8_lossy(key_bytes).to_string();
376            let doc = Automerge::load(value.value())?;
377            results.push((key_str, doc));
378        }
379
380        Ok(results)
381    }
382
383    /// Count total documents
384    pub fn count(&self) -> usize {
385        // In memory-only mode, count cache entries
386        let Some(ref db) = self.db else {
387            return self.cache.read().unwrap_or_else(|e| e.into_inner()).len();
388        };
389
390        let read_txn = match db.begin_read() {
391            Ok(txn) => txn,
392            Err(_) => return 0,
393        };
394        let table = match read_txn.open_table(DOCUMENTS_TABLE) {
395            Ok(t) => t,
396            Err(_) => return 0,
397        };
398
399        table.len().unwrap_or(0) as usize
400    }
401
402    /// Subscribe to document change notifications (Phase 6.3)
403    ///
404    /// Returns a receiver that will receive document keys whenever documents are modified.
405    /// Multiple subscribers are supported - each gets their own receiver.
406    ///
407    /// # Example
408    ///
409    /// ```ignore
410    /// let store = AutomergeStore::open("./data")?;
411    /// let mut rx = store.subscribe_to_changes();
412    /// while let Ok(doc_key) = rx.recv().await {
413    ///     println!("Document changed: {}", doc_key);
414    /// }
415    /// ```
416    pub fn subscribe_to_changes(&self) -> broadcast::Receiver<String> {
417        self.change_tx.subscribe()
418    }
419
420    /// Subscribe to observer notifications (Issue #377)
421    ///
422    /// Returns a receiver that receives document keys for ALL changes, including
423    /// documents received via sync. Use this for hierarchical aggregation where
424    /// you need to react to remotely synced documents (e.g., company commander
425    /// reacting to platoon summaries synced from platoon leaders).
426    ///
427    /// Unlike `subscribe_to_changes()` which only fires for local puts,
428    /// this fires for ALL document changes.
429    pub fn subscribe_to_observer_changes(&self) -> broadcast::Receiver<String> {
430        self.observer_tx.subscribe()
431    }
432
433    /// Get a collection handle for a specific namespace
434    pub fn collection(self: &Arc<Self>, name: &str) -> Arc<dyn Collection> {
435        Arc::new(AutomergeCollection {
436            store: Arc::clone(self),
437            prefix: format!("{}:", name),
438        })
439    }
440
441    // === Tombstone storage methods (ADR-034 Phase 2) ===
442
443    /// Store a tombstone
444    ///
445    /// Tombstones are stored with key format "collection:document_id"
446    /// In memory-only mode, this is a no-op (tombstones aren't needed without persistence)
447    pub fn put_tombstone(&self, tombstone: &crate::qos::Tombstone) -> Result<()> {
448        let Some(ref db) = self.db else {
449            return Ok(()); // No-op in memory mode
450        };
451
452        let key = format!("{}:{}", tombstone.collection, tombstone.document_id);
453        let bytes = serde_json::to_vec(tombstone).context("Failed to serialize tombstone")?;
454
455        let write_txn = db
456            .begin_write()
457            .context("Failed to begin write transaction")?;
458        {
459            let mut table = write_txn
460                .open_table(TOMBSTONES_TABLE)
461                .context("Failed to open tombstones table")?;
462            table
463                .insert(key.as_bytes(), bytes.as_slice())
464                .context("Failed to insert tombstone")?;
465        }
466        write_txn
467            .commit()
468            .context("Failed to commit tombstone write")?;
469
470        tracing::debug!(
471            "Stored tombstone for document {} in collection {}",
472            tombstone.document_id,
473            tombstone.collection
474        );
475
476        Ok(())
477    }
478
479    /// Get a tombstone by collection and document ID
480    pub fn get_tombstone(
481        &self,
482        collection: &str,
483        document_id: &str,
484    ) -> Result<Option<crate::qos::Tombstone>> {
485        let Some(ref db) = self.db else {
486            return Ok(None); // No tombstones in memory mode
487        };
488
489        let key = format!("{}:{}", collection, document_id);
490
491        let read_txn = db
492            .begin_read()
493            .context("Failed to begin read transaction")?;
494        let table = read_txn
495            .open_table(TOMBSTONES_TABLE)
496            .context("Failed to open tombstones table")?;
497
498        match table.get(key.as_bytes())? {
499            Some(value) => {
500                let bytes = value.value();
501                let tombstone: crate::qos::Tombstone =
502                    serde_json::from_slice(bytes).context("Failed to deserialize tombstone")?;
503                Ok(Some(tombstone))
504            }
505            None => Ok(None),
506        }
507    }
508
509    /// Get all tombstones for a collection
510    pub fn get_tombstones_for_collection(
511        &self,
512        collection: &str,
513    ) -> Result<Vec<crate::qos::Tombstone>> {
514        let Some(ref db) = self.db else {
515            return Ok(Vec::new()); // No tombstones in memory mode
516        };
517
518        let prefix = format!("{}:", collection);
519        let mut tombstones = Vec::new();
520
521        let read_txn = db
522            .begin_read()
523            .context("Failed to begin read transaction")?;
524        let table = read_txn
525            .open_table(TOMBSTONES_TABLE)
526            .context("Failed to open tombstones table")?;
527
528        // Iterate all entries and filter by prefix
529        for entry in table.iter()? {
530            let (key, value) = entry?;
531            let key_str = String::from_utf8_lossy(key.value());
532            if key_str.starts_with(&prefix) {
533                let tombstone: crate::qos::Tombstone = serde_json::from_slice(value.value())
534                    .context("Failed to deserialize tombstone")?;
535                tombstones.push(tombstone);
536            }
537        }
538
539        Ok(tombstones)
540    }
541
542    /// Get all tombstones across all collections
543    pub fn get_all_tombstones(&self) -> Result<Vec<crate::qos::Tombstone>> {
544        let Some(ref db) = self.db else {
545            return Ok(Vec::new()); // No tombstones in memory mode
546        };
547
548        let mut tombstones = Vec::new();
549
550        let read_txn = db
551            .begin_read()
552            .context("Failed to begin read transaction")?;
553        let table = read_txn
554            .open_table(TOMBSTONES_TABLE)
555            .context("Failed to open tombstones table")?;
556
557        for entry in table.iter()? {
558            let (_key, value) = entry?;
559            let tombstone: crate::qos::Tombstone =
560                serde_json::from_slice(value.value()).context("Failed to deserialize tombstone")?;
561            tombstones.push(tombstone);
562        }
563
564        Ok(tombstones)
565    }
566
567    /// Remove a tombstone
568    pub fn remove_tombstone(&self, collection: &str, document_id: &str) -> Result<bool> {
569        let Some(ref db) = self.db else {
570            return Ok(false); // No tombstones in memory mode
571        };
572
573        let key = format!("{}:{}", collection, document_id);
574
575        let write_txn = db
576            .begin_write()
577            .context("Failed to begin write transaction")?;
578        let existed = {
579            let mut table = write_txn
580                .open_table(TOMBSTONES_TABLE)
581                .context("Failed to open tombstones table")?;
582            let result = table.remove(key.as_bytes())?;
583            result.is_some()
584        };
585        write_txn
586            .commit()
587            .context("Failed to commit tombstone removal")?;
588
589        if existed {
590            tracing::debug!(
591                "Removed tombstone for document {} in collection {}",
592                document_id,
593                collection
594            );
595        }
596
597        Ok(existed)
598    }
599
600    /// Check if a tombstone exists
601    pub fn has_tombstone(&self, collection: &str, document_id: &str) -> Result<bool> {
602        Ok(self.get_tombstone(collection, document_id)?.is_some())
603    }
604
605    // === Garbage Collection support methods (ADR-034 Phase 3) ===
606
607    /// Get list of all collections (by scanning document key prefixes)
608    pub fn list_collections(&self) -> Result<Vec<String>> {
609        let mut collections = std::collections::HashSet::new();
610
611        // In memory-only mode, scan the cache
612        if self.db.is_none() {
613            let cache = self.cache.read().unwrap_or_else(|e| e.into_inner());
614            for key in cache.iter().map(|(k, _)| k) {
615                if let Some(colon_pos) = key.find(':') {
616                    let collection = &key[..colon_pos];
617                    collections.insert(collection.to_string());
618                }
619            }
620            return Ok(collections.into_iter().collect());
621        }
622
623        let read_txn = self
624            .db
625            .as_ref()
626            .unwrap()
627            .begin_read()
628            .context("Failed to begin read transaction")?;
629        let table = read_txn
630            .open_table(DOCUMENTS_TABLE)
631            .context("Failed to open documents table")?;
632
633        for result in table.iter().context("Failed to iterate documents")? {
634            let (key, _) = result.context("Failed to read document entry")?;
635            let key_str =
636                std::str::from_utf8(key.value()).context("Invalid UTF-8 in document key")?;
637
638            // Keys are formatted as "collection:document_id"
639            if let Some(colon_pos) = key_str.find(':') {
640                let collection = &key_str[..colon_pos];
641                collections.insert(collection.to_string());
642            }
643        }
644
645        Ok(collections.into_iter().collect())
646    }
647
648    /// Get documents in a collection that were created before the cutoff time
649    ///
650    /// This checks the _created_at field stored in the Automerge document.
651    /// Used for ImplicitTTL garbage collection.
652    pub fn get_expired_documents(
653        &self,
654        collection: &str,
655        cutoff: std::time::SystemTime,
656    ) -> Result<Vec<String>> {
657        let prefix = format!("{}:", collection);
658        let docs = self.scan_prefix(&prefix)?;
659        let mut expired = Vec::new();
660
661        let cutoff_ms = cutoff
662            .duration_since(std::time::UNIX_EPOCH)
663            .unwrap_or_default()
664            .as_millis() as u64;
665
666        for (key, doc) in docs {
667            // Extract _created_at timestamp if present
668            if let Ok(Some((automerge::Value::Scalar(scalar), _))) =
669                doc.get(automerge::ROOT, "_created_at")
670            {
671                if let automerge::ScalarValue::Uint(created_at) = scalar.as_ref() {
672                    if *created_at < cutoff_ms {
673                        // Document is older than cutoff
674                        if let Some(doc_id) = key.strip_prefix(&prefix) {
675                            expired.push(doc_id.to_string());
676                        }
677                    }
678                }
679            }
680        }
681
682        Ok(expired)
683    }
684
685    /// Hard delete a document (permanent removal, no tombstone created)
686    ///
687    /// Used by garbage collection for ImplicitTTL collections where
688    /// tombstones are not needed.
689    pub fn hard_delete(&self, collection: &str, document_id: &str) -> Result<()> {
690        let key = format!("{}:{}", collection, document_id);
691        self.delete(&key)?;
692        tracing::debug!(
693            "Hard deleted document {} from collection {}",
694            document_id,
695            collection
696        );
697        Ok(())
698    }
699
700    // === Document Compaction (Issue #401 - Memory Blowout Fix) ===
701
702    /// Compact a document by discarding CRDT history
703    ///
704    /// Automerge documents accumulate operation history with every change.
705    /// This method replaces the document with a forked copy that contains
706    /// only the current state, freeing memory used by historical operations.
707    ///
708    /// # When to use
709    ///
710    /// - After many updates to high-frequency documents (beacons, node_states)
711    /// - When memory pressure is detected
712    /// - Periodically for long-running simulations
713    ///
714    /// # Returns
715    ///
716    /// - `Ok(Some(old_size, new_size))` - Document was compacted, returns sizes before/after
717    /// - `Ok(None)` - Document not found
718    /// - `Err(_)` - Compaction failed
719    ///
720    /// # Example
721    ///
722    /// ```ignore
723    /// let store = AutomergeStore::open("./data")?;
724    /// if let Some((old, new)) = store.compact("node_states:soldier-1")? {
725    ///     tracing::info!("Compacted {} -> {} bytes ({}% reduction)", old, new, 100 - (new * 100 / old));
726    /// }
727    /// ```
728    pub fn compact(&self, key: &str) -> Result<Option<(usize, usize)>> {
729        // Hold per-document striped lock across the entire get→fork→put
730        // sequence to prevent concurrent sync writes from being lost (Issue #74).
731        let _guard = self.lock_doc(key);
732        let doc = match self.get(key)? {
733            Some(d) => d,
734            None => return Ok(None),
735        };
736
737        let old_size = doc.save().len();
738
739        // Fork creates a new document with current state but no history
740        let compacted = doc.fork();
741        let new_size = compacted.save().len();
742
743        // Save the compacted document (without triggering sync notification)
744        self.put_without_notify(key, &compacted)?;
745
746        tracing::debug!(
747            "Compacted document {}: {} -> {} bytes ({:.1}% reduction)",
748            key,
749            old_size,
750            new_size,
751            if old_size > 0 {
752                100.0 - (new_size as f64 * 100.0 / old_size as f64)
753            } else {
754                0.0
755            }
756        );
757
758        Ok(Some((old_size, new_size)))
759    }
760
761    /// Compact all documents with a given prefix
762    ///
763    /// Useful for batch-compacting all documents in a collection.
764    ///
765    /// # Returns
766    ///
767    /// `(documents_compacted, total_bytes_before, total_bytes_after)`
768    ///
769    /// # Example
770    ///
771    /// ```ignore
772    /// let store = AutomergeStore::open("./data")?;
773    /// let (count, before, after) = store.compact_prefix("node_states:")?;
774    /// tracing::info!("Compacted {} documents: {} -> {} bytes", count, before, after);
775    /// ```
776    pub fn compact_prefix(&self, prefix: &str) -> Result<(usize, usize, usize)> {
777        let docs = self.scan_prefix(prefix)?;
778        let mut count = 0;
779        let mut total_before = 0;
780        let mut total_after = 0;
781
782        for (key, _) in docs {
783            if let Some((before, after)) = self.compact(&key)? {
784                count += 1;
785                total_before += before;
786                total_after += after;
787            }
788        }
789
790        if count > 0 {
791            tracing::info!(
792                "Compacted {} documents with prefix '{}': {} -> {} bytes ({:.1}% reduction)",
793                count,
794                prefix,
795                total_before,
796                total_after,
797                if total_before > 0 {
798                    100.0 - (total_after as f64 * 100.0 / total_before as f64)
799                } else {
800                    0.0
801                }
802            );
803        }
804
805        Ok((count, total_before, total_after))
806    }
807
808    /// Compact all documents in the store
809    ///
810    /// # Returns
811    ///
812    /// `(documents_compacted, total_bytes_before, total_bytes_after)`
813    pub fn compact_all(&self) -> Result<(usize, usize, usize)> {
814        // In memory-only mode, iterate cache
815        if self.db.is_none() {
816            let keys: Vec<String> = {
817                let cache = self.cache.read().unwrap_or_else(|e| e.into_inner());
818                cache.iter().map(|(k, _)| k.clone()).collect()
819            };
820
821            let mut count = 0;
822            let mut total_before = 0;
823            let mut total_after = 0;
824
825            for key in keys {
826                if let Some((before, after)) = self.compact(&key)? {
827                    count += 1;
828                    total_before += before;
829                    total_after += after;
830                }
831            }
832
833            return Ok((count, total_before, total_after));
834        }
835
836        // With disk persistence, scan all documents
837        let read_txn = self
838            .db
839            .as_ref()
840            .unwrap()
841            .begin_read()
842            .context("Failed to begin read transaction")?;
843        let table = read_txn
844            .open_table(DOCUMENTS_TABLE)
845            .context("Failed to open documents table")?;
846
847        let keys: Vec<String> = table
848            .iter()?
849            .filter_map(|entry| {
850                entry
851                    .ok()
852                    .map(|(k, _)| String::from_utf8_lossy(k.value()).to_string())
853            })
854            .collect();
855
856        drop(table);
857        drop(read_txn);
858
859        let mut count = 0;
860        let mut total_before = 0;
861        let mut total_after = 0;
862
863        for key in keys {
864            if let Some((before, after)) = self.compact(&key)? {
865                count += 1;
866                total_before += before;
867                total_after += after;
868            }
869        }
870
871        if count > 0 {
872            tracing::info!(
873                "Compacted {} documents: {} -> {} bytes ({:.1}% reduction)",
874                count,
875                total_before,
876                total_after,
877                if total_before > 0 {
878                    100.0 - (total_after as f64 * 100.0 / total_before as f64)
879                } else {
880                    0.0
881                }
882            );
883        }
884
885        Ok((count, total_before, total_after))
886    }
887
888    /// Start a background task that periodically compacts documents in
889    /// specific collections that exceed a size threshold.
890    ///
891    /// Only `LatestOnly` sync-mode collections should be compacted — compacting
892    /// `FullHistory` collections destroys change history needed for delta sync.
893    ///
894    /// - `interval`: How often to run compaction (default: 5 minutes)
895    /// - `size_threshold_bytes`: Only compact documents larger than this (default: 64 KiB)
896    /// - `collections`: Collection prefixes to compact (e.g., `["beacons", "platforms"]`)
897    pub fn start_background_compaction(
898        self: &Arc<Self>,
899        interval: std::time::Duration,
900        size_threshold_bytes: usize,
901        collections: Vec<String>,
902        token: tokio_util::sync::CancellationToken,
903    ) {
904        let store = Arc::clone(self);
905        tokio::spawn(async move {
906            let mut timer = tokio::time::interval(interval);
907            // Don't run immediately on startup
908            timer.tick().await;
909
910            loop {
911                tokio::select! {
912                    _ = token.cancelled() => {
913                        tracing::info!("background compaction cancelled");
914                        break;
915                    }
916                    _ = timer.tick() => {
917                        match store.compact_collections_above_threshold(&collections, size_threshold_bytes) {
918                            Ok((count, before, after)) => {
919                                if count > 0 {
920                                    tracing::info!(count, before, after, "background compaction complete");
921                                }
922                            }
923                            Err(e) => {
924                                tracing::warn!("background compaction failed: {e}");
925                            }
926                        }
927                    }
928                }
929            }
930        });
931    }
932
933    /// Compact documents in specific collections that exceed a size threshold.
934    ///
935    /// Only scans documents matching the given collection prefixes (e.g.,
936    /// `"beacons"` matches `"beacons:beacon-1"`, `"beacons:beacon-2"`, etc.).
937    ///
938    /// Returns `(documents_compacted, total_bytes_before, total_bytes_after)`.
939    pub fn compact_collections_above_threshold(
940        &self,
941        collections: &[String],
942        threshold_bytes: usize,
943    ) -> Result<(usize, usize, usize)> {
944        let mut count = 0;
945        let mut total_before = 0;
946        let mut total_after = 0;
947
948        for collection in collections {
949            let prefix = format!("{}:", collection);
950            let docs = self.scan_prefix(&prefix)?;
951            for (key, _) in docs {
952                let size = self.document_size(&key)?.unwrap_or(0);
953                if size >= threshold_bytes {
954                    if let Some((before, after)) = self.compact(&key)? {
955                        count += 1;
956                        total_before += before;
957                        total_after += after;
958                    }
959                }
960            }
961        }
962
963        Ok((count, total_before, total_after))
964    }
965
966    /// Compact only documents exceeding a size threshold.
967    ///
968    /// Returns `(documents_compacted, total_bytes_before, total_bytes_after)`.
969    pub fn compact_above_threshold(&self, threshold_bytes: usize) -> Result<(usize, usize, usize)> {
970        let keys = self.all_keys()?;
971        let mut count = 0;
972        let mut total_before = 0;
973        let mut total_after = 0;
974
975        for key in keys {
976            let size = self.document_size(&key)?.unwrap_or(0);
977            if size >= threshold_bytes {
978                if let Some((before, after)) = self.compact(&key)? {
979                    count += 1;
980                    total_before += before;
981                    total_after += after;
982                }
983            }
984        }
985
986        Ok((count, total_before, total_after))
987    }
988
989    /// List all document keys in the store.
990    fn all_keys(&self) -> Result<Vec<String>> {
991        if self.db.is_none() {
992            let cache = self.cache.read().unwrap_or_else(|e| e.into_inner());
993            return Ok(cache.iter().map(|(k, _)| k.clone()).collect());
994        }
995
996        let read_txn = self
997            .db
998            .as_ref()
999            .unwrap()
1000            .begin_read()
1001            .context("Failed to begin read transaction")?;
1002        let table = read_txn
1003            .open_table(DOCUMENTS_TABLE)
1004            .context("Failed to open documents table")?;
1005
1006        let keys: Vec<String> = table
1007            .iter()?
1008            .filter_map(|entry| {
1009                entry
1010                    .ok()
1011                    .map(|(k, _)| String::from_utf8_lossy(k.value()).to_string())
1012            })
1013            .collect();
1014
1015        Ok(keys)
1016    }
1017
1018    /// Get a typed collection handle for serde-based access.
1019    ///
1020    /// Returns a `TypedCollection<T>` that provides automatic serde
1021    /// serialization/deserialization, integrated queries, and
1022    /// prefix-filtered change subscriptions.
1023    ///
1024    /// # Example
1025    ///
1026    /// ```ignore
1027    /// let sensors = store.typed_collection::<SensorReading>("sensors");
1028    /// sensors.upsert("r001", &reading)?;
1029    /// let result = sensors.get("r001")?;
1030    /// ```
1031    pub fn typed_collection<T: serde::Serialize + serde::de::DeserializeOwned>(
1032        self: &Arc<Self>,
1033        name: &str,
1034    ) -> super::typed_collection::TypedCollection<T> {
1035        super::typed_collection::TypedCollection::new(Arc::clone(self), name)
1036    }
1037
1038    /// Get the serialized size of a document (for monitoring)
1039    pub fn document_size(&self, key: &str) -> Result<Option<usize>> {
1040        match self.get(key)? {
1041            Some(doc) => Ok(Some(doc.save().len())),
1042            None => Ok(None),
1043        }
1044    }
1045}
1046
1047/// Collection implementation for AutomergeStore
1048///
1049/// Wraps AutomergeStore and provides Collection trait implementation.
1050/// Uses key prefixing to namespace collections (e.g., "cells:cell-1", "nodes:node-1").
1051pub struct AutomergeCollection {
1052    store: Arc<AutomergeStore>,
1053    prefix: String,
1054}
1055
1056impl AutomergeCollection {
1057    fn prefixed_key(&self, doc_id: &str) -> String {
1058        format!("{}{}", self.prefix, doc_id)
1059    }
1060
1061    fn strip_prefix<'b>(&self, key: &'b str) -> Option<&'b str> {
1062        key.strip_prefix(&self.prefix)
1063    }
1064}
1065
1066impl Collection for AutomergeCollection {
1067    fn upsert(&self, doc_id: &str, data: Vec<u8>) -> Result<()> {
1068        // Get existing document or create a new one
1069        // This is critical for CRDT sync: we must UPDATE the existing document
1070        // rather than replacing it with a new one. If we create a new document,
1071        // Automerge will see it as a conflicting branch and may pick the wrong
1072        // value during merge.
1073        let key = self.prefixed_key(doc_id);
1074        let mut doc = match self.store.get(&key)? {
1075            Some(existing) => {
1076                // Fork the existing document to update it
1077                existing.fork()
1078            }
1079            None => {
1080                // No existing doc, create a new one
1081                Automerge::new()
1082            }
1083        };
1084
1085        match doc.transact(|tx| {
1086            tx.put(
1087                automerge::ROOT,
1088                "data",
1089                automerge::ScalarValue::Bytes(data.clone()),
1090            )?;
1091            Ok::<(), automerge::AutomergeError>(())
1092        }) {
1093            Ok(_) => self.store.put(&key, &doc),
1094            Err(e) => Err(anyhow::anyhow!(
1095                "Failed to update Automerge document: {:?}",
1096                e
1097            )),
1098        }
1099    }
1100
1101    fn get(&self, doc_id: &str) -> Result<Option<Vec<u8>>> {
1102        match self.store.get(&self.prefixed_key(doc_id))? {
1103            Some(doc) => {
1104                // Extract bytes from Automerge document
1105                if let Ok(Some((automerge::Value::Scalar(scalar), _))) =
1106                    doc.get(automerge::ROOT, "data")
1107                {
1108                    if let automerge::ScalarValue::Bytes(bytes) = scalar.as_ref() {
1109                        return Ok(Some(bytes.to_vec()));
1110                    }
1111                }
1112                Ok(None)
1113            }
1114            None => Ok(None),
1115        }
1116    }
1117
1118    fn delete(&self, doc_id: &str) -> Result<()> {
1119        self.store.delete(&self.prefixed_key(doc_id))
1120    }
1121
1122    fn scan(&self) -> Result<Vec<(String, Vec<u8>)>> {
1123        let docs = self.store.scan_prefix(&self.prefix)?;
1124        tracing::debug!(
1125            "AutomergeCollection.scan: prefix={}, found {} docs",
1126            self.prefix,
1127            docs.len()
1128        );
1129        let mut results = Vec::new();
1130
1131        for (key, doc) in docs {
1132            tracing::debug!(
1133                "AutomergeCollection.scan: processing key={}, doc_len={}",
1134                key,
1135                doc.save().len()
1136            );
1137            if let Some(doc_id) = self.strip_prefix(&key) {
1138                match doc.get(automerge::ROOT, "data") {
1139                    Ok(Some((automerge::Value::Scalar(scalar), _))) => {
1140                        if let automerge::ScalarValue::Bytes(bytes) = scalar.as_ref() {
1141                            tracing::debug!(
1142                                "AutomergeCollection.scan: found data bytes, doc_id={}, len={}",
1143                                doc_id,
1144                                bytes.len()
1145                            );
1146                            results.push((doc_id.to_string(), bytes.to_vec()));
1147                        } else {
1148                            tracing::debug!(
1149                                "AutomergeCollection.scan: data is not Bytes, doc_id={}",
1150                                doc_id
1151                            );
1152                        }
1153                    }
1154                    Ok(Some((value, _))) => {
1155                        tracing::debug!(
1156                            "AutomergeCollection.scan: data is not Scalar, doc_id={}, value_type={:?}",
1157                            doc_id,
1158                            value
1159                        );
1160                    }
1161                    Ok(None) => {
1162                        tracing::debug!(
1163                            "AutomergeCollection.scan: no 'data' field, doc_id={}",
1164                            doc_id
1165                        );
1166                    }
1167                    Err(e) => {
1168                        tracing::debug!(
1169                            "AutomergeCollection.scan: error getting 'data', doc_id={}, err={}",
1170                            doc_id,
1171                            e
1172                        );
1173                    }
1174                }
1175            }
1176        }
1177
1178        Ok(results)
1179    }
1180
1181    fn find(&self, predicate: DocumentPredicate) -> Result<Vec<(String, Vec<u8>)>> {
1182        let all_docs = self.scan()?;
1183        Ok(all_docs
1184            .into_iter()
1185            .filter(|(_, bytes)| predicate(bytes))
1186            .collect())
1187    }
1188
1189    fn query_geohash_prefix(&self, geohash_prefix: &str) -> Result<Vec<(String, Vec<u8>)>> {
1190        // For AutomergeStore, geohash queries require the geohash to be in the key
1191        // This is a simplified implementation - in Phase 2 we'll add proper indexing
1192        let all_docs = self.scan()?;
1193        Ok(all_docs
1194            .into_iter()
1195            .filter(|(id, _)| id.starts_with(geohash_prefix))
1196            .collect())
1197    }
1198
1199    fn count(&self) -> Result<usize> {
1200        Ok(self.scan()?.len())
1201    }
1202}
1203
1204// === GcStore trait implementation for AutomergeStore (ADR-034 Phase 3) ===
1205//
1206// Both GcStore trait and AutomergeStore type are in peat-mesh,
1207// so the impl lives here (orphan rule satisfied).
1208
1209impl crate::qos::GcStore for AutomergeStore {
1210    fn get_all_tombstones(&self) -> anyhow::Result<Vec<crate::qos::Tombstone>> {
1211        self.get_all_tombstones()
1212    }
1213
1214    fn remove_tombstone(&self, collection: &str, document_id: &str) -> anyhow::Result<bool> {
1215        self.remove_tombstone(collection, document_id)
1216    }
1217
1218    fn has_tombstone(&self, collection: &str, document_id: &str) -> anyhow::Result<bool> {
1219        self.has_tombstone(collection, document_id)
1220    }
1221
1222    fn get_expired_documents(
1223        &self,
1224        collection: &str,
1225        cutoff: std::time::SystemTime,
1226    ) -> anyhow::Result<Vec<String>> {
1227        self.get_expired_documents(collection, cutoff)
1228    }
1229
1230    fn hard_delete(&self, collection: &str, document_id: &str) -> anyhow::Result<()> {
1231        self.hard_delete(collection, document_id)
1232    }
1233
1234    fn list_collections(&self) -> anyhow::Result<Vec<String>> {
1235        self.list_collections()
1236    }
1237}
1238
1239#[cfg(test)]
1240mod tests {
1241    use super::*;
1242    use tempfile::TempDir;
1243
1244    fn create_test_store() -> (Arc<AutomergeStore>, TempDir) {
1245        let temp_dir = TempDir::new().unwrap();
1246        let store = Arc::new(AutomergeStore::open(temp_dir.path()).unwrap());
1247        (store, temp_dir)
1248    }
1249
1250    #[test]
1251    fn test_collection_upsert_and_get() {
1252        let (store, _temp) = create_test_store();
1253        let collection = store.collection("test");
1254
1255        let data = b"test data".to_vec();
1256        collection.upsert("doc1", data.clone()).unwrap();
1257
1258        let retrieved = collection.get("doc1").unwrap().unwrap();
1259        assert_eq!(retrieved, data);
1260    }
1261
1262    #[test]
1263    fn test_collection_scan() {
1264        let (store, _temp) = create_test_store();
1265        let collection = store.collection("test");
1266
1267        collection.upsert("doc1", b"data1".to_vec()).unwrap();
1268        collection.upsert("doc2", b"data2".to_vec()).unwrap();
1269
1270        let results = collection.scan().unwrap();
1271        assert_eq!(results.len(), 2);
1272
1273        let ids: Vec<String> = results.iter().map(|(id, _)| id.clone()).collect();
1274        assert!(ids.contains(&"doc1".to_string()));
1275        assert!(ids.contains(&"doc2".to_string()));
1276    }
1277
1278    #[test]
1279    fn test_collection_delete() {
1280        let (store, _temp) = create_test_store();
1281        let collection = store.collection("test");
1282
1283        collection.upsert("doc1", b"data1".to_vec()).unwrap();
1284        assert!(collection.get("doc1").unwrap().is_some());
1285
1286        collection.delete("doc1").unwrap();
1287        assert!(collection.get("doc1").unwrap().is_none());
1288    }
1289
1290    #[test]
1291    fn test_collection_count() {
1292        let (store, _temp) = create_test_store();
1293        let collection = store.collection("test");
1294
1295        assert_eq!(collection.count().unwrap(), 0);
1296
1297        collection.upsert("doc1", b"data1".to_vec()).unwrap();
1298        collection.upsert("doc2", b"data2".to_vec()).unwrap();
1299
1300        assert_eq!(collection.count().unwrap(), 2);
1301    }
1302
1303    #[test]
1304    fn test_collection_find_with_predicate() {
1305        let (store, _temp) = create_test_store();
1306        let collection = store.collection("test");
1307
1308        collection.upsert("doc1", b"hello".to_vec()).unwrap();
1309        collection.upsert("doc2", b"world".to_vec()).unwrap();
1310        collection.upsert("doc3", b"hello world".to_vec()).unwrap();
1311
1312        let results = collection
1313            .find(Box::new(|bytes| {
1314                String::from_utf8_lossy(bytes).contains("hello")
1315            }))
1316            .unwrap();
1317
1318        assert_eq!(results.len(), 2);
1319    }
1320
1321    #[test]
1322    fn test_collection_namespace_isolation() {
1323        let (store, _temp) = create_test_store();
1324        let collection1 = store.collection("coll1");
1325        let collection2 = store.collection("coll2");
1326
1327        collection1.upsert("doc1", b"data1".to_vec()).unwrap();
1328        collection2.upsert("doc1", b"data2".to_vec()).unwrap();
1329
1330        let data1 = collection1.get("doc1").unwrap().unwrap();
1331        let data2 = collection2.get("doc1").unwrap().unwrap();
1332
1333        assert_eq!(data1, b"data1");
1334        assert_eq!(data2, b"data2");
1335        assert_ne!(data1, data2);
1336    }
1337
1338    #[test]
1339    fn test_direct_put_and_get() {
1340        let (store, _temp) = create_test_store();
1341
1342        let mut doc = Automerge::new();
1343        doc.transact::<_, _, automerge::AutomergeError>(|tx| {
1344            tx.put(automerge::ROOT, "key", "value")?;
1345            Ok(())
1346        })
1347        .unwrap();
1348
1349        store.put("test-doc", &doc).unwrap();
1350
1351        let loaded = store.get("test-doc").unwrap().unwrap();
1352        let value: String = loaded
1353            .get(automerge::ROOT, "key")
1354            .unwrap()
1355            .unwrap()
1356            .0
1357            .to_string();
1358        assert!(value.contains("value"));
1359    }
1360
1361    #[test]
1362    fn test_scan_prefix() {
1363        let (store, _temp) = create_test_store();
1364
1365        let mut doc1 = Automerge::new();
1366        doc1.transact::<_, _, automerge::AutomergeError>(|tx| {
1367            tx.put(automerge::ROOT, "n", "1")?;
1368            Ok(())
1369        })
1370        .unwrap();
1371
1372        let mut doc2 = Automerge::new();
1373        doc2.transact::<_, _, automerge::AutomergeError>(|tx| {
1374            tx.put(automerge::ROOT, "n", "2")?;
1375            Ok(())
1376        })
1377        .unwrap();
1378
1379        store.put("prefix:a", &doc1).unwrap();
1380        store.put("prefix:b", &doc2).unwrap();
1381        store.put("other:c", &doc1).unwrap();
1382
1383        let results = store.scan_prefix("prefix:").unwrap();
1384        assert_eq!(results.len(), 2);
1385    }
1386
1387    // === Compaction Tests (Issue #401 - Memory Blowout Fix) ===
1388
1389    #[test]
1390    fn test_compact_document() {
1391        let (store, _temp) = create_test_store();
1392
1393        // Create a document and update it many times to build up history
1394        let mut doc = Automerge::new();
1395        for i in 0..100 {
1396            doc.transact::<_, _, automerge::AutomergeError>(|tx| {
1397                tx.put(automerge::ROOT, "counter", i as i64)?;
1398                Ok(())
1399            })
1400            .unwrap();
1401        }
1402
1403        store.put("test-doc", &doc).unwrap();
1404        let size_before = store.document_size("test-doc").unwrap().unwrap();
1405
1406        // Compact the document
1407        let result = store.compact("test-doc").unwrap();
1408        assert!(result.is_some());
1409        let (old_size, new_size) = result.unwrap();
1410
1411        assert_eq!(old_size, size_before);
1412        assert!(
1413            new_size <= old_size,
1414            "Compaction should reduce or maintain size"
1415        );
1416
1417        // Verify the document still has the correct value
1418        let loaded = store.get("test-doc").unwrap().unwrap();
1419        let value = loaded.get(automerge::ROOT, "counter").unwrap().unwrap();
1420        assert_eq!(value.0.to_i64(), Some(99));
1421    }
1422
1423    #[test]
1424    fn test_compact_nonexistent_document() {
1425        let (store, _temp) = create_test_store();
1426        let result = store.compact("nonexistent").unwrap();
1427        assert!(result.is_none());
1428    }
1429
1430    #[test]
1431    fn test_compact_prefix() {
1432        let (store, _temp) = create_test_store();
1433
1434        // Create multiple documents with history
1435        for doc_num in 0..5 {
1436            let mut doc = Automerge::new();
1437            for i in 0..50 {
1438                doc.transact::<_, _, automerge::AutomergeError>(|tx| {
1439                    tx.put(automerge::ROOT, "counter", i as i64)?;
1440                    Ok(())
1441                })
1442                .unwrap();
1443            }
1444            store.put(&format!("test:{}", doc_num), &doc).unwrap();
1445        }
1446
1447        // Add a document with different prefix
1448        let mut other_doc = Automerge::new();
1449        other_doc
1450            .transact::<_, _, automerge::AutomergeError>(|tx| {
1451                tx.put(automerge::ROOT, "other", "value")?;
1452                Ok(())
1453            })
1454            .unwrap();
1455        store.put("other:1", &other_doc).unwrap();
1456
1457        // Compact only "test:" prefix
1458        let (count, before, after) = store.compact_prefix("test:").unwrap();
1459        assert_eq!(count, 5);
1460        assert!(before > 0);
1461        assert!(after <= before);
1462
1463        // Verify other document was not affected
1464        let other = store.get("other:1").unwrap().unwrap();
1465        let value = other.get(automerge::ROOT, "other").unwrap().unwrap();
1466        assert!(value.0.to_str().unwrap().contains("value"));
1467    }
1468
1469    #[test]
1470    fn test_compact_all() {
1471        let (store, _temp) = create_test_store();
1472
1473        // Create documents with history
1474        for prefix in &["a", "b", "c"] {
1475            let mut doc = Automerge::new();
1476            for i in 0..30 {
1477                doc.transact::<_, _, automerge::AutomergeError>(|tx| {
1478                    tx.put(automerge::ROOT, "counter", i as i64)?;
1479                    Ok(())
1480                })
1481                .unwrap();
1482            }
1483            store.put(&format!("{}:doc", prefix), &doc).unwrap();
1484        }
1485
1486        let (count, before, after) = store.compact_all().unwrap();
1487        assert_eq!(count, 3);
1488        assert!(before > 0);
1489        assert!(after <= before);
1490    }
1491
1492    #[test]
1493    fn test_compact_in_memory_store() {
1494        let store = Arc::new(AutomergeStore::in_memory());
1495
1496        // Create a document with multi-actor history to exercise compaction.
1497        // fork() discards the change log (individual ops from each actor),
1498        // which is where real savings come from in multi-peer CRDT sync.
1499        // Single-actor docs may not shrink much since the compressed binary
1500        // format is already efficient.
1501        let mut doc = Automerge::new();
1502        for i in 0..100 {
1503            doc.transact::<_, _, automerge::AutomergeError>(|tx| {
1504                tx.put(automerge::ROOT, "counter", i as i64)?;
1505                Ok(())
1506            })
1507            .unwrap();
1508        }
1509
1510        // Simulate a second actor merging in (this is the pattern that
1511        // causes history bloat in production — many peers contributing changes)
1512        let mut doc2 = doc.fork();
1513        for i in 0..100 {
1514            doc2.transact::<_, _, automerge::AutomergeError>(|tx| {
1515                tx.put(automerge::ROOT, "peer2_counter", i as i64)?;
1516                Ok(())
1517            })
1518            .unwrap();
1519        }
1520        doc.merge(&mut doc2).unwrap();
1521
1522        store.put("test-doc", &doc).unwrap();
1523
1524        let result = store.compact("test-doc").unwrap();
1525        assert!(result.is_some());
1526        let (old_size, new_size) = result.unwrap();
1527        assert!(
1528            new_size <= old_size,
1529            "compaction should not increase size, got {} -> {}",
1530            old_size,
1531            new_size
1532        );
1533
1534        // Verify both actor values are preserved
1535        let loaded = store.get("test-doc").unwrap().unwrap();
1536        let counter = loaded.get(automerge::ROOT, "counter").unwrap().unwrap();
1537        assert_eq!(counter.0.to_i64(), Some(99));
1538        let peer2 = loaded
1539            .get(automerge::ROOT, "peer2_counter")
1540            .unwrap()
1541            .unwrap();
1542        assert_eq!(peer2.0.to_i64(), Some(99));
1543    }
1544
1545    #[test]
1546    fn test_compact_above_threshold() {
1547        let store = Arc::new(AutomergeStore::in_memory());
1548
1549        // Create a small document (below threshold)
1550        let mut small_doc = Automerge::new();
1551        small_doc
1552            .transact::<_, _, automerge::AutomergeError>(|tx| {
1553                tx.put(automerge::ROOT, "key", "value")?;
1554                Ok(())
1555            })
1556            .unwrap();
1557        store.put("small-doc", &small_doc).unwrap();
1558
1559        // Create a large document with multi-actor history
1560        let mut big_doc = Automerge::new();
1561        for i in 0..200 {
1562            big_doc
1563                .transact::<_, _, automerge::AutomergeError>(|tx| {
1564                    tx.put(automerge::ROOT, "counter", i as i64)?;
1565                    Ok(())
1566                })
1567                .unwrap();
1568        }
1569        // Add a second actor to simulate peer sync history
1570        let mut big_doc2 = big_doc.fork();
1571        for i in 0..200 {
1572            big_doc2
1573                .transact::<_, _, automerge::AutomergeError>(|tx| {
1574                    tx.put(automerge::ROOT, "peer2", i as i64)?;
1575                    Ok(())
1576                })
1577                .unwrap();
1578        }
1579        big_doc.merge(&mut big_doc2).unwrap();
1580        store.put("big-doc", &big_doc).unwrap();
1581
1582        // Compact with threshold above small doc size but below big doc
1583        let small_size = small_doc.save().len();
1584        let threshold = small_size + 1;
1585        let (count, before, after) = store.compact_above_threshold(threshold).unwrap();
1586
1587        // Only the big doc should have been compacted (small is below threshold)
1588        assert_eq!(count, 1);
1589        assert!(
1590            after <= before,
1591            "compaction should not increase size, got {} -> {}",
1592            before,
1593            after
1594        );
1595
1596        // Verify the big doc value is preserved
1597        let loaded = store.get("big-doc").unwrap().unwrap();
1598        let value = loaded.get(automerge::ROOT, "counter").unwrap().unwrap();
1599        assert_eq!(value.0.to_i64(), Some(199));
1600    }
1601
1602    #[tokio::test]
1603    async fn test_background_compaction_cancellation() {
1604        let store = Arc::new(AutomergeStore::in_memory());
1605        let token = tokio_util::sync::CancellationToken::new();
1606
1607        store.start_background_compaction(
1608            std::time::Duration::from_millis(50),
1609            1024,
1610            vec!["test".to_string()],
1611            token.clone(),
1612        );
1613
1614        // Let it run briefly
1615        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1616
1617        // Cancel and verify it stops (no panic, no hang)
1618        token.cancel();
1619        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1620    }
1621
1622    #[test]
1623    fn test_compact_collections_above_threshold() {
1624        let store = Arc::new(AutomergeStore::in_memory());
1625
1626        // Create docs in two collections
1627        let mut beacons_doc = Automerge::new();
1628        for i in 0..100 {
1629            beacons_doc
1630                .transact::<_, _, automerge::AutomergeError>(|tx| {
1631                    tx.put(automerge::ROOT, "lat", i as i64)?;
1632                    Ok(())
1633                })
1634                .unwrap();
1635        }
1636        store.put("beacons:beacon-1", &beacons_doc).unwrap();
1637
1638        let mut commands_doc = Automerge::new();
1639        for i in 0..100 {
1640            commands_doc
1641                .transact::<_, _, automerge::AutomergeError>(|tx| {
1642                    tx.put(automerge::ROOT, "cmd", i as i64)?;
1643                    Ok(())
1644                })
1645                .unwrap();
1646        }
1647        store.put("commands:cmd-1", &commands_doc).unwrap();
1648
1649        let before_commands = store.document_size("commands:cmd-1").unwrap().unwrap();
1650
1651        // Only compact beacons collection
1652        let (count, _, _) = store
1653            .compact_collections_above_threshold(&["beacons".to_string()], 1)
1654            .unwrap();
1655
1656        assert_eq!(count, 1, "only the beacons doc should be compacted");
1657
1658        // Commands doc should be unchanged
1659        let after_commands = store.document_size("commands:cmd-1").unwrap().unwrap();
1660        assert_eq!(
1661            before_commands, after_commands,
1662            "commands doc should not be touched"
1663        );
1664    }
1665
1666    #[test]
1667    fn test_compact_collections_respects_threshold() {
1668        let store = Arc::new(AutomergeStore::in_memory());
1669
1670        // Create a small doc in an opted-in collection
1671        let mut small_doc = Automerge::new();
1672        small_doc
1673            .transact::<_, _, automerge::AutomergeError>(|tx| {
1674                tx.put(automerge::ROOT, "key", "value")?;
1675                Ok(())
1676            })
1677            .unwrap();
1678        store.put("beacons:small", &small_doc).unwrap();
1679
1680        let size = store.document_size("beacons:small").unwrap().unwrap();
1681
1682        // Compact with threshold above the doc size
1683        let (count, _, _) = store
1684            .compact_collections_above_threshold(&["beacons".to_string()], size + 1)
1685            .unwrap();
1686
1687        assert_eq!(count, 0, "small doc below threshold should be skipped");
1688    }
1689}