// peat_mesh/storage/sync_persistence.rs
1//! Sync state persistence with redb
2//!
3//! This module provides durable storage for Automerge sync state, enabling:
4//! - Persist sync heads per peer
5//! - Recovery on restart (resume from last position)
6//! - Periodic checkpointing
7//!
8//! # Storage Schema
9//!
10//! Uses redb with key prefixes:
11//! - `sync_state:{peer_id}:{doc_key}` → serialized automerge::sync::State
12//! - `checkpoint:{timestamp}` → snapshot metadata
13//! - `meta:last_checkpoint` → timestamp of last checkpoint
14
15use anyhow::{Context, Result};
16use automerge::sync::State as SyncState;
17use iroh::EndpointId;
18use redb::{Database, TableDefinition};
19use serde::{Deserialize, Serialize};
20use std::collections::HashMap;
21use std::path::Path;
22use std::sync::Arc;
23use std::time::{Duration, SystemTime, UNIX_EPOCH};
24
/// Table for sync state storage.
/// Values are JSON-serialized [`PersistedSyncState`] records.
const SYNC_STATE_TABLE: TableDefinition<&[u8], &[u8]> = TableDefinition::new("sync_states");

/// Table for checkpoint storage.
/// Values are JSON-serialized [`Checkpoint`] records.
const CHECKPOINT_TABLE: TableDefinition<&[u8], &[u8]> = TableDefinition::new("checkpoints");

/// Table for metadata (currently only the last-checkpoint timestamp).
const META_TABLE: TableDefinition<&[u8], &[u8]> = TableDefinition::new("meta");

/// Key prefixes for storage.
/// Full key layouts: `sync_state:{peer_hex}:{doc_key}` and `checkpoint:{timestamp_ms}`.
const SYNC_STATE_PREFIX: &str = "sync_state:";
const CHECKPOINT_PREFIX: &str = "checkpoint:";
const META_LAST_CHECKPOINT: &str = "last_checkpoint";
38
/// Serializable wrapper for automerge::sync::State
///
/// The Automerge SyncState isn't directly serializable, so we store
/// the encoded bytes along with metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PersistedSyncState {
    /// Encoded sync state bytes (produced by `SyncState::encode`)
    pub state_bytes: Vec<u8>,
    /// Peer ID (hex encoded for serialization)
    pub peer_id_hex: String,
    /// Document key
    pub doc_key: String,
    /// Timestamp when state was saved (seconds since UNIX epoch)
    pub saved_at: u64,
    /// Number of syncs completed
    pub sync_count: u64,
}
56
57impl PersistedSyncState {
58    /// Create from SyncState and metadata
59    pub fn from_sync_state(
60        state: &SyncState,
61        peer_id: &EndpointId,
62        doc_key: &str,
63        sync_count: u64,
64    ) -> Self {
65        Self {
66            state_bytes: state.encode(),
67            peer_id_hex: hex::encode(peer_id.as_bytes()),
68            doc_key: doc_key.to_string(),
69            saved_at: SystemTime::now()
70                .duration_since(UNIX_EPOCH)
71                .expect("system clock before UNIX epoch")
72                .as_secs(),
73            sync_count,
74        }
75    }
76
77    /// Restore SyncState from persisted data
78    pub fn to_sync_state(&self) -> Result<SyncState> {
79        SyncState::decode(&self.state_bytes).context("Failed to decode sync state")
80    }
81}
82
/// Checkpoint metadata
///
/// A point-in-time summary of the sync-state table, stored under
/// `checkpoint:{timestamp}`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Checkpoint {
    /// Timestamp of checkpoint (milliseconds since UNIX epoch)
    pub timestamp: u64,
    /// Number of sync states saved
    pub state_count: usize,
    /// Total bytes stored
    pub total_bytes: usize,
    /// Peer IDs included (hex encoded)
    pub peer_ids: Vec<String>,
}
95
/// Statistics about persisted sync state
///
/// Produced on demand by [`SyncStatePersistence::stats`].
#[derive(Debug, Clone, Default)]
pub struct PersistenceStats {
    /// Number of sync states stored
    pub state_count: usize,
    /// Total bytes used
    pub total_bytes: usize,
    /// Number of peers with stored state
    pub peer_count: usize,
    /// Last checkpoint timestamp (milliseconds since UNIX epoch)
    pub last_checkpoint: Option<u64>,
    /// Number of checkpoints
    pub checkpoint_count: usize,
}
110
/// Sync state persistence manager
///
/// Provides durable storage for Automerge sync state to enable
/// fast recovery after restart without full resync.
pub struct SyncStatePersistence {
    /// redb instance (shared via `Arc` so handles can be cloned across tasks)
    db: Arc<Database>,
    /// Checkpoint interval (how often to create checkpoints).
    /// NOTE(review): only exposed via `checkpoint_interval()`; this type does
    /// not schedule checkpoints itself — callers drive `create_checkpoint`.
    checkpoint_interval: Duration,
}
121
122impl SyncStatePersistence {
    /// Open or create sync state storage at the given path.
    ///
    /// `path` may be an existing file (used directly), or a directory /
    /// nonexistent path (a `sync_state.redb` file is created inside it).
    ///
    /// # Errors
    /// Fails if the redb database cannot be created/opened or the initial
    /// table-creation transaction cannot be committed.
    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
        let path = path.as_ref();

        // Ensure parent directory exists
        // Errors are deliberately ignored: Database::create below will
        // surface any real problem with the location.
        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent).ok();
        }

        // redb stores in a single file, append .redb extension if it's a directory path
        // NOTE(review): a *nonexistent* path is treated as a directory, so a
        // caller passing a fresh file path gets a directory containing
        // sync_state.redb — confirm this matches caller expectations.
        let db_path = if path.is_dir() || !path.exists() {
            std::fs::create_dir_all(path).ok();
            path.join("sync_state.redb")
        } else {
            path.to_path_buf()
        };

        let db = Database::create(&db_path).context("Failed to open sync state redb")?;

        // Initialize tables
        // Opening a table inside a committed write transaction creates it if
        // missing; individual open results are ignored since each accessor
        // re-opens (and re-checks) its table per call.
        {
            let write_txn = db
                .begin_write()
                .context("Failed to begin write transaction")?;
            let _ = write_txn.open_table(SYNC_STATE_TABLE);
            let _ = write_txn.open_table(CHECKPOINT_TABLE);
            let _ = write_txn.open_table(META_TABLE);
            write_txn
                .commit()
                .context("Failed to commit table creation")?;
        }

        Ok(Self {
            db: Arc::new(db),
            checkpoint_interval: Duration::from_secs(60), // Default: checkpoint every 60 seconds
        })
    }
160
161    /// Open with custom checkpoint interval
162    pub fn open_with_interval(
163        path: impl AsRef<Path>,
164        checkpoint_interval: Duration,
165    ) -> Result<Self> {
166        let mut persistence = Self::open(path)?;
167        persistence.checkpoint_interval = checkpoint_interval;
168        Ok(persistence)
169    }
170
171    /// Build storage key for sync state
172    fn sync_state_key(peer_id: &EndpointId, doc_key: &str) -> String {
173        format!(
174            "{}{}:{}",
175            SYNC_STATE_PREFIX,
176            hex::encode(peer_id.as_bytes()),
177            doc_key
178        )
179    }
180
181    /// Save sync state for a peer and document
182    pub fn save_sync_state(
183        &self,
184        peer_id: &EndpointId,
185        doc_key: &str,
186        state: &SyncState,
187        sync_count: u64,
188    ) -> Result<()> {
189        let key = Self::sync_state_key(peer_id, doc_key);
190        let persisted = PersistedSyncState::from_sync_state(state, peer_id, doc_key, sync_count);
191
192        let value = serde_json::to_vec(&persisted).context("Failed to serialize sync state")?;
193
194        let write_txn = self
195            .db
196            .begin_write()
197            .context("Failed to begin write transaction")?;
198        {
199            let mut table = write_txn
200                .open_table(SYNC_STATE_TABLE)
201                .context("Failed to open sync state table")?;
202            table
203                .insert(key.as_bytes(), value.as_slice())
204                .context("Failed to write sync state")?;
205        }
206        write_txn.commit().context("Failed to commit write")?;
207
208        tracing::trace!(
209            "Saved sync state for peer {} doc {}: {} bytes",
210            persisted.peer_id_hex,
211            doc_key,
212            value.len()
213        );
214
215        Ok(())
216    }
217
218    /// Load sync state for a peer and document
219    pub fn load_sync_state(
220        &self,
221        peer_id: &EndpointId,
222        doc_key: &str,
223    ) -> Result<Option<(SyncState, u64)>> {
224        let key = Self::sync_state_key(peer_id, doc_key);
225
226        let read_txn = self
227            .db
228            .begin_read()
229            .context("Failed to begin read transaction")?;
230        let table = read_txn
231            .open_table(SYNC_STATE_TABLE)
232            .context("Failed to open sync state table")?;
233
234        match table.get(key.as_bytes())? {
235            Some(value) => {
236                let bytes = value.value();
237                let persisted: PersistedSyncState =
238                    serde_json::from_slice(bytes).context("Failed to deserialize sync state")?;
239
240                let state = persisted.to_sync_state()?;
241
242                tracing::trace!(
243                    "Loaded sync state for peer {} doc {}: sync_count={}",
244                    persisted.peer_id_hex,
245                    doc_key,
246                    persisted.sync_count
247                );
248
249                Ok(Some((state, persisted.sync_count)))
250            }
251            None => Ok(None),
252        }
253    }
254
255    /// Delete sync state for a peer and document
256    pub fn delete_sync_state(&self, peer_id: &EndpointId, doc_key: &str) -> Result<()> {
257        let key = Self::sync_state_key(peer_id, doc_key);
258
259        let write_txn = self
260            .db
261            .begin_write()
262            .context("Failed to begin write transaction")?;
263        {
264            let mut table = write_txn
265                .open_table(SYNC_STATE_TABLE)
266                .context("Failed to open sync state table")?;
267            table.remove(key.as_bytes())?;
268        }
269        write_txn.commit().context("Failed to commit delete")?;
270
271        Ok(())
272    }
273
274    /// Load all sync states for a peer
275    pub fn load_all_for_peer(&self, peer_id: &EndpointId) -> Result<HashMap<String, SyncState>> {
276        let prefix = format!("{}{}:", SYNC_STATE_PREFIX, hex::encode(peer_id.as_bytes()));
277        let mut results = HashMap::new();
278
279        let read_txn = self
280            .db
281            .begin_read()
282            .context("Failed to begin read transaction")?;
283        let table = read_txn
284            .open_table(SYNC_STATE_TABLE)
285            .context("Failed to open sync state table")?;
286
287        for entry in table.range(prefix.as_bytes()..)? {
288            let (key, value) = entry?;
289            let key_bytes = key.value();
290            let key_str = String::from_utf8_lossy(key_bytes);
291
292            if !key_str.starts_with(&prefix) {
293                break;
294            }
295
296            let persisted: PersistedSyncState = serde_json::from_slice(value.value())?;
297            let state = persisted.to_sync_state()?;
298            results.insert(persisted.doc_key.clone(), state);
299        }
300
301        Ok(results)
302    }
303
304    /// Load all sync states (for full recovery)
305    pub fn load_all(&self) -> Result<HashMap<(EndpointId, String), SyncState>> {
306        let mut results = HashMap::new();
307
308        let read_txn = self
309            .db
310            .begin_read()
311            .context("Failed to begin read transaction")?;
312        let table = read_txn
313            .open_table(SYNC_STATE_TABLE)
314            .context("Failed to open sync state table")?;
315
316        for entry in table.range(SYNC_STATE_PREFIX.as_bytes()..)? {
317            let (key, value) = entry?;
318            let key_bytes = key.value();
319            let key_str = String::from_utf8_lossy(key_bytes);
320
321            if !key_str.starts_with(SYNC_STATE_PREFIX) {
322                break;
323            }
324
325            let persisted: PersistedSyncState = serde_json::from_slice(value.value())?;
326
327            // Parse peer ID from hex
328            let peer_id_bytes =
329                hex::decode(&persisted.peer_id_hex).context("Invalid peer ID hex")?;
330            if peer_id_bytes.len() != 32 {
331                continue; // Skip invalid entries
332            }
333            let mut arr = [0u8; 32];
334            arr.copy_from_slice(&peer_id_bytes);
335            let public_key = iroh::PublicKey::from_bytes(&arr)
336                .map_err(|e| anyhow::anyhow!("Invalid public key: {}", e))?;
337            let peer_id: EndpointId = public_key;
338
339            let state = persisted.to_sync_state()?;
340            results.insert((peer_id, persisted.doc_key.clone()), state);
341        }
342
343        tracing::info!("Loaded {} sync states from persistence", results.len());
344
345        Ok(results)
346    }
347
    /// Create a checkpoint
    ///
    /// Scans the sync-state table to summarize what is currently stored
    /// (entry count, total bytes, distinct peers), writes the summary as a
    /// [`Checkpoint`] keyed by millisecond timestamp, and records that
    /// timestamp under `meta:last_checkpoint`.
    pub fn create_checkpoint(&self) -> Result<Checkpoint> {
        // Millisecond timestamp doubles as the checkpoint's storage key.
        let timestamp = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("system clock before UNIX epoch")
            .as_millis() as u64;

        // Count current states
        let mut state_count = 0;
        let mut total_bytes = 0;
        let mut peer_ids = std::collections::HashSet::new();

        {
            let read_txn = self
                .db
                .begin_read()
                .context("Failed to begin read transaction")?;
            let table = read_txn
                .open_table(SYNC_STATE_TABLE)
                .context("Failed to open sync state table")?;

            for entry in table.range(SYNC_STATE_PREFIX.as_bytes()..)? {
                let (key, value) = entry?;
                let key_bytes = key.value();
                let key_str = String::from_utf8_lossy(key_bytes);

                // Keys are sorted, so the first non-matching key ends the scan.
                if !key_str.starts_with(SYNC_STATE_PREFIX) {
                    break;
                }

                state_count += 1;
                total_bytes += value.value().len();

                // Extract peer ID from key
                // Key layout: "sync_state:{peer_hex}:{doc_key}".
                if let Some(rest) = key_str.strip_prefix(SYNC_STATE_PREFIX) {
                    if let Some(peer_id) = rest.split(':').next() {
                        peer_ids.insert(peer_id.to_string());
                    }
                }
            }
        }

        let checkpoint = Checkpoint {
            timestamp,
            state_count,
            total_bytes,
            peer_ids: peer_ids.into_iter().collect(),
        };

        // Save checkpoint
        let checkpoint_key = format!("{}{}", CHECKPOINT_PREFIX, timestamp);
        let checkpoint_bytes = serde_json::to_vec(&checkpoint)?;

        // The checkpoint record and the meta pointer are written in a single
        // transaction so they can never disagree on disk.
        let write_txn = self
            .db
            .begin_write()
            .context("Failed to begin write transaction")?;
        {
            let mut table = write_txn
                .open_table(CHECKPOINT_TABLE)
                .context("Failed to open checkpoint table")?;
            table.insert(checkpoint_key.as_bytes(), checkpoint_bytes.as_slice())?;
        }
        {
            // Update last checkpoint timestamp
            let mut meta_table = write_txn
                .open_table(META_TABLE)
                .context("Failed to open meta table")?;
            meta_table.insert(
                META_LAST_CHECKPOINT.as_bytes(),
                &timestamp.to_be_bytes()[..],
            )?;
        }
        write_txn.commit().context("Failed to commit checkpoint")?;

        tracing::info!(
            "Created checkpoint: {} states, {} bytes, {} peers",
            state_count,
            total_bytes,
            checkpoint.peer_ids.len()
        );

        Ok(checkpoint)
    }
432
433    /// Get the last checkpoint
434    pub fn get_last_checkpoint(&self) -> Result<Option<Checkpoint>> {
435        let read_txn = self
436            .db
437            .begin_read()
438            .context("Failed to begin read transaction")?;
439
440        // Get last checkpoint timestamp
441        let meta_table = read_txn
442            .open_table(META_TABLE)
443            .context("Failed to open meta table")?;
444
445        let timestamp_bytes = match meta_table.get(META_LAST_CHECKPOINT.as_bytes())? {
446            Some(value) => value.value().to_vec(),
447            None => return Ok(None),
448        };
449
450        if timestamp_bytes.len() != 8 {
451            return Ok(None);
452        }
453
454        let mut arr = [0u8; 8];
455        arr.copy_from_slice(&timestamp_bytes);
456        let timestamp = u64::from_be_bytes(arr);
457
458        // Load checkpoint
459        let checkpoint_key = format!("{}{}", CHECKPOINT_PREFIX, timestamp);
460        let checkpoint_table = read_txn
461            .open_table(CHECKPOINT_TABLE)
462            .context("Failed to open checkpoint table")?;
463
464        match checkpoint_table.get(checkpoint_key.as_bytes())? {
465            Some(value) => {
466                let checkpoint: Checkpoint = serde_json::from_slice(value.value())?;
467                Ok(Some(checkpoint))
468            }
469            None => Ok(None),
470        }
471    }
472
473    /// Get persistence statistics
474    pub fn stats(&self) -> Result<PersistenceStats> {
475        let mut stats = PersistenceStats::default();
476        let mut peer_ids = std::collections::HashSet::new();
477
478        let read_txn = self
479            .db
480            .begin_read()
481            .context("Failed to begin read transaction")?;
482
483        // Count sync states
484        {
485            let table = read_txn
486                .open_table(SYNC_STATE_TABLE)
487                .context("Failed to open sync state table")?;
488
489            for entry in table.range(SYNC_STATE_PREFIX.as_bytes()..)? {
490                let (key, value) = entry?;
491                let key_bytes = key.value();
492                let key_str = String::from_utf8_lossy(key_bytes);
493
494                if !key_str.starts_with(SYNC_STATE_PREFIX) {
495                    break;
496                }
497
498                stats.state_count += 1;
499                stats.total_bytes += value.value().len();
500
501                if let Some(rest) = key_str.strip_prefix(SYNC_STATE_PREFIX) {
502                    if let Some(peer_id) = rest.split(':').next() {
503                        peer_ids.insert(peer_id.to_string());
504                    }
505                }
506            }
507        }
508
509        stats.peer_count = peer_ids.len();
510
511        // Count checkpoints
512        {
513            let checkpoint_table = read_txn
514                .open_table(CHECKPOINT_TABLE)
515                .context("Failed to open checkpoint table")?;
516
517            for entry in checkpoint_table.range(CHECKPOINT_PREFIX.as_bytes()..)? {
518                let (key, _) = entry?;
519                if !key.value().starts_with(CHECKPOINT_PREFIX.as_bytes()) {
520                    break;
521                }
522                stats.checkpoint_count += 1;
523            }
524        }
525
526        // Get last checkpoint timestamp
527        if let Ok(Some(checkpoint)) = self.get_last_checkpoint() {
528            stats.last_checkpoint = Some(checkpoint.timestamp);
529        }
530
531        Ok(stats)
532    }
533
534    /// Clean up old checkpoints, keeping only the last N
535    pub fn cleanup_old_checkpoints(&self, keep_count: usize) -> Result<usize> {
536        let mut checkpoints: Vec<u64> = Vec::new();
537
538        {
539            let read_txn = self
540                .db
541                .begin_read()
542                .context("Failed to begin read transaction")?;
543            let table = read_txn
544                .open_table(CHECKPOINT_TABLE)
545                .context("Failed to open checkpoint table")?;
546
547            for entry in table.range(CHECKPOINT_PREFIX.as_bytes()..)? {
548                let (key, _) = entry?;
549                let key_bytes = key.value();
550                let key_str = String::from_utf8_lossy(key_bytes);
551
552                if !key_str.starts_with(CHECKPOINT_PREFIX) {
553                    break;
554                }
555
556                if let Some(ts_str) = key_str.strip_prefix(CHECKPOINT_PREFIX) {
557                    if let Ok(ts) = ts_str.parse::<u64>() {
558                        checkpoints.push(ts);
559                    }
560                }
561            }
562        }
563
564        // Sort descending (newest first)
565        checkpoints.sort_by(|a, b| b.cmp(a));
566
567        // Delete old ones
568        let mut deleted = 0;
569        let to_delete: Vec<_> = checkpoints.iter().skip(keep_count).cloned().collect();
570
571        if !to_delete.is_empty() {
572            let write_txn = self
573                .db
574                .begin_write()
575                .context("Failed to begin write transaction")?;
576            {
577                let mut table = write_txn
578                    .open_table(CHECKPOINT_TABLE)
579                    .context("Failed to open checkpoint table")?;
580
581                for ts in to_delete {
582                    let key = format!("{}{}", CHECKPOINT_PREFIX, ts);
583                    table.remove(key.as_bytes())?;
584                    deleted += 1;
585                }
586            }
587            write_txn.commit().context("Failed to commit cleanup")?;
588        }
589
590        if deleted > 0 {
591            tracing::info!("Cleaned up {} old checkpoints", deleted);
592        }
593
594        Ok(deleted)
595    }
596
597    /// Delete all sync state for a peer (when peer is removed from mesh)
598    pub fn delete_peer(&self, peer_id: &EndpointId) -> Result<usize> {
599        let prefix = format!("{}{}:", SYNC_STATE_PREFIX, hex::encode(peer_id.as_bytes()));
600        let mut keys_to_delete = Vec::new();
601
602        // First, collect keys to delete
603        {
604            let read_txn = self
605                .db
606                .begin_read()
607                .context("Failed to begin read transaction")?;
608            let table = read_txn
609                .open_table(SYNC_STATE_TABLE)
610                .context("Failed to open sync state table")?;
611
612            for entry in table.range(prefix.as_bytes()..)? {
613                let (key, _) = entry?;
614                let key_bytes = key.value();
615                if !key_bytes.starts_with(prefix.as_bytes()) {
616                    break;
617                }
618                keys_to_delete.push(key_bytes.to_vec());
619            }
620        }
621
622        // Then delete them
623        let deleted = keys_to_delete.len();
624        if !keys_to_delete.is_empty() {
625            let write_txn = self
626                .db
627                .begin_write()
628                .context("Failed to begin write transaction")?;
629            {
630                let mut table = write_txn
631                    .open_table(SYNC_STATE_TABLE)
632                    .context("Failed to open sync state table")?;
633
634                for key in keys_to_delete {
635                    table.remove(key.as_slice())?;
636                }
637            }
638            write_txn.commit().context("Failed to commit delete")?;
639        }
640
641        if deleted > 0 {
642            tracing::info!(
643                "Deleted {} sync states for peer {}",
644                deleted,
645                hex::encode(peer_id.as_bytes())
646            );
647        }
648
649        Ok(deleted)
650    }
651
    /// Get checkpoint interval
    ///
    /// How often callers are expected to invoke [`Self::create_checkpoint`];
    /// this type does not schedule checkpoints on its own.
    pub fn checkpoint_interval(&self) -> Duration {
        self.checkpoint_interval
    }
656}
657
658#[cfg(test)]
659mod tests {
660    use super::*;
661    use tempfile::TempDir;
662
    // Fresh persistence backed by a temp dir; the TempDir must be kept
    // alive for the duration of the test or the backing file disappears.
    fn create_test_persistence() -> (SyncStatePersistence, TempDir) {
        let temp_dir = TempDir::new().unwrap();
        let persistence = SyncStatePersistence::open(temp_dir.path()).unwrap();
        (persistence, temp_dir)
    }

    // Random peer identity: the public key of a freshly generated secret key.
    fn create_test_peer_id() -> EndpointId {
        use iroh::SecretKey;
        let mut rng = rand::rng();
        SecretKey::generate(&mut rng).public()
    }
674
    // Round-trip: a saved state (and its sync count) is loaded back intact.
    #[test]
    fn test_save_and_load_sync_state() {
        let (persistence, _temp) = create_test_persistence();
        let peer_id = create_test_peer_id();
        let state = SyncState::new();

        // Save
        persistence
            .save_sync_state(&peer_id, "doc1", &state, 5)
            .unwrap();

        // Load
        let (loaded_state, sync_count) = persistence
            .load_sync_state(&peer_id, "doc1")
            .unwrap()
            .expect("State should exist");

        assert_eq!(sync_count, 5);
        // Sync states should be functionally equivalent (both empty/initial)
        assert_eq!(loaded_state.encode(), state.encode());
    }
696
697    #[test]
698    fn test_load_nonexistent_state() {
699        let (persistence, _temp) = create_test_persistence();
700        let peer_id = create_test_peer_id();
701
702        let result = persistence
703            .load_sync_state(&peer_id, "nonexistent")
704            .unwrap();
705        assert!(result.is_none());
706    }
707
    // Deleting an entry makes subsequent loads return None.
    #[test]
    fn test_delete_sync_state() {
        let (persistence, _temp) = create_test_persistence();
        let peer_id = create_test_peer_id();
        let state = SyncState::new();

        persistence
            .save_sync_state(&peer_id, "doc1", &state, 1)
            .unwrap();
        assert!(persistence
            .load_sync_state(&peer_id, "doc1")
            .unwrap()
            .is_some());

        persistence.delete_sync_state(&peer_id, "doc1").unwrap();
        assert!(persistence
            .load_sync_state(&peer_id, "doc1")
            .unwrap()
            .is_none());
    }
728
    // Per-peer bulk load returns only that peer's documents.
    #[test]
    fn test_load_all_for_peer() {
        let (persistence, _temp) = create_test_persistence();
        let peer_id = create_test_peer_id();
        let peer_id2 = create_test_peer_id();
        let state = SyncState::new();

        // Save states for peer 1
        persistence
            .save_sync_state(&peer_id, "doc1", &state, 1)
            .unwrap();
        persistence
            .save_sync_state(&peer_id, "doc2", &state, 2)
            .unwrap();

        // Save state for peer 2
        persistence
            .save_sync_state(&peer_id2, "doc1", &state, 3)
            .unwrap();

        // Load all for peer 1
        let peer1_states = persistence.load_all_for_peer(&peer_id).unwrap();
        assert_eq!(peer1_states.len(), 2);
        assert!(peer1_states.contains_key("doc1"));
        assert!(peer1_states.contains_key("doc2"));

        // Load all for peer 2
        let peer2_states = persistence.load_all_for_peer(&peer_id2).unwrap();
        assert_eq!(peer2_states.len(), 1);
    }
759
760    #[test]
761    fn test_load_all() {
762        let (persistence, _temp) = create_test_persistence();
763        let peer_id1 = create_test_peer_id();
764        let peer_id2 = create_test_peer_id();
765        let state = SyncState::new();
766
767        persistence
768            .save_sync_state(&peer_id1, "doc1", &state, 1)
769            .unwrap();
770        persistence
771            .save_sync_state(&peer_id2, "doc2", &state, 2)
772            .unwrap();
773
774        let all_states = persistence.load_all().unwrap();
775        assert_eq!(all_states.len(), 2);
776    }
777
    // A checkpoint summarizes stored states and is retrievable as "last".
    #[test]
    fn test_checkpoint() {
        let (persistence, _temp) = create_test_persistence();
        let peer_id = create_test_peer_id();
        let state = SyncState::new();

        // Save some states
        persistence
            .save_sync_state(&peer_id, "doc1", &state, 1)
            .unwrap();
        persistence
            .save_sync_state(&peer_id, "doc2", &state, 2)
            .unwrap();

        // Create checkpoint
        let checkpoint = persistence.create_checkpoint().unwrap();
        assert_eq!(checkpoint.state_count, 2);
        assert_eq!(checkpoint.peer_ids.len(), 1);

        // Get last checkpoint
        let loaded = persistence.get_last_checkpoint().unwrap().unwrap();
        assert_eq!(loaded.timestamp, checkpoint.timestamp);
        assert_eq!(loaded.state_count, 2);
    }
802
    // Stats reflect state count, distinct peer count, and nonzero byte usage.
    #[test]
    fn test_stats() {
        let (persistence, _temp) = create_test_persistence();
        let peer_id1 = create_test_peer_id();
        let peer_id2 = create_test_peer_id();
        let state = SyncState::new();

        persistence
            .save_sync_state(&peer_id1, "doc1", &state, 1)
            .unwrap();
        persistence
            .save_sync_state(&peer_id2, "doc2", &state, 2)
            .unwrap();

        let stats = persistence.stats().unwrap();
        assert_eq!(stats.state_count, 2);
        assert_eq!(stats.peer_count, 2);
        assert!(stats.total_bytes > 0);
    }
822
    // Cleanup keeps only the newest N checkpoints and reports the delta.
    #[test]
    fn test_cleanup_old_checkpoints() {
        let (persistence, _temp) = create_test_persistence();
        let peer_id = create_test_peer_id();
        let state = SyncState::new();

        persistence
            .save_sync_state(&peer_id, "doc1", &state, 1)
            .unwrap();

        // Create multiple checkpoints
        // (sleep keeps millisecond timestamps — the storage keys — distinct)
        for _ in 0..5 {
            persistence.create_checkpoint().unwrap();
            std::thread::sleep(std::time::Duration::from_millis(10));
        }

        let stats_before = persistence.stats().unwrap();
        assert_eq!(stats_before.checkpoint_count, 5);

        // Keep only 2
        let deleted = persistence.cleanup_old_checkpoints(2).unwrap();
        assert_eq!(deleted, 3);

        let stats_after = persistence.stats().unwrap();
        assert_eq!(stats_after.checkpoint_count, 2);
    }
849
    // delete_peer removes only the target peer's entries, leaving others.
    #[test]
    fn test_delete_peer() {
        let (persistence, _temp) = create_test_persistence();
        let peer_id1 = create_test_peer_id();
        let peer_id2 = create_test_peer_id();
        let state = SyncState::new();

        // Save states for both peers
        persistence
            .save_sync_state(&peer_id1, "doc1", &state, 1)
            .unwrap();
        persistence
            .save_sync_state(&peer_id1, "doc2", &state, 2)
            .unwrap();
        persistence
            .save_sync_state(&peer_id2, "doc1", &state, 3)
            .unwrap();

        // Delete peer 1
        let deleted = persistence.delete_peer(&peer_id1).unwrap();
        assert_eq!(deleted, 2);

        // Verify peer 1 states are gone
        assert!(persistence
            .load_sync_state(&peer_id1, "doc1")
            .unwrap()
            .is_none());
        assert!(persistence
            .load_sync_state(&peer_id1, "doc2")
            .unwrap()
            .is_none());

        // Verify peer 2 state remains
        assert!(persistence
            .load_sync_state(&peer_id2, "doc1")
            .unwrap()
            .is_some());
    }
888
    // PersistedSyncState wrapper preserves metadata and decodes back to an
    // equivalent SyncState.
    #[test]
    fn test_persisted_sync_state_roundtrip() {
        let peer_id = create_test_peer_id();
        let state = SyncState::new();

        let persisted = PersistedSyncState::from_sync_state(&state, &peer_id, "test_doc", 42);

        assert_eq!(persisted.doc_key, "test_doc");
        assert_eq!(persisted.sync_count, 42);
        assert!(!persisted.state_bytes.is_empty());

        let restored = persisted.to_sync_state().unwrap();
        assert_eq!(restored.encode(), state.encode());
    }
903}