1use anyhow::{Context, Result};
16use automerge::sync::State as SyncState;
17use iroh::EndpointId;
18use redb::{Database, TableDefinition};
19use serde::{Deserialize, Serialize};
20use std::collections::HashMap;
21use std::path::Path;
22use std::sync::Arc;
23use std::time::{Duration, SystemTime, UNIX_EPOCH};
24
25const SYNC_STATE_TABLE: TableDefinition<&[u8], &[u8]> = TableDefinition::new("sync_states");
27
28const CHECKPOINT_TABLE: TableDefinition<&[u8], &[u8]> = TableDefinition::new("checkpoints");
30
31const META_TABLE: TableDefinition<&[u8], &[u8]> = TableDefinition::new("meta");
33
34const SYNC_STATE_PREFIX: &str = "sync_state:";
36const CHECKPOINT_PREFIX: &str = "checkpoint:";
37const META_LAST_CHECKPOINT: &str = "last_checkpoint";
38
39#[derive(Debug, Clone, Serialize, Deserialize)]
44pub struct PersistedSyncState {
45 pub state_bytes: Vec<u8>,
47 pub peer_id_hex: String,
49 pub doc_key: String,
51 pub saved_at: u64,
53 pub sync_count: u64,
55}
56
57impl PersistedSyncState {
58 pub fn from_sync_state(
60 state: &SyncState,
61 peer_id: &EndpointId,
62 doc_key: &str,
63 sync_count: u64,
64 ) -> Self {
65 Self {
66 state_bytes: state.encode(),
67 peer_id_hex: hex::encode(peer_id.as_bytes()),
68 doc_key: doc_key.to_string(),
69 saved_at: SystemTime::now()
70 .duration_since(UNIX_EPOCH)
71 .expect("system clock before UNIX epoch")
72 .as_secs(),
73 sync_count,
74 }
75 }
76
77 pub fn to_sync_state(&self) -> Result<SyncState> {
79 SyncState::decode(&self.state_bytes).context("Failed to decode sync state")
80 }
81}
82
83#[derive(Debug, Clone, Serialize, Deserialize)]
85pub struct Checkpoint {
86 pub timestamp: u64,
88 pub state_count: usize,
90 pub total_bytes: usize,
92 pub peer_ids: Vec<String>,
94}
95
96#[derive(Debug, Clone, Default)]
98pub struct PersistenceStats {
99 pub state_count: usize,
101 pub total_bytes: usize,
103 pub peer_count: usize,
105 pub last_checkpoint: Option<u64>,
107 pub checkpoint_count: usize,
109}
110
111pub struct SyncStatePersistence {
116 db: Arc<Database>,
118 checkpoint_interval: Duration,
120}
121
122impl SyncStatePersistence {
123 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
125 let path = path.as_ref();
126
127 if let Some(parent) = path.parent() {
129 std::fs::create_dir_all(parent).ok();
130 }
131
132 let db_path = if path.is_dir() || !path.exists() {
134 std::fs::create_dir_all(path).ok();
135 path.join("sync_state.redb")
136 } else {
137 path.to_path_buf()
138 };
139
140 let db = Database::create(&db_path).context("Failed to open sync state redb")?;
141
142 {
144 let write_txn = db
145 .begin_write()
146 .context("Failed to begin write transaction")?;
147 let _ = write_txn.open_table(SYNC_STATE_TABLE);
148 let _ = write_txn.open_table(CHECKPOINT_TABLE);
149 let _ = write_txn.open_table(META_TABLE);
150 write_txn
151 .commit()
152 .context("Failed to commit table creation")?;
153 }
154
155 Ok(Self {
156 db: Arc::new(db),
157 checkpoint_interval: Duration::from_secs(60), })
159 }
160
161 pub fn open_with_interval(
163 path: impl AsRef<Path>,
164 checkpoint_interval: Duration,
165 ) -> Result<Self> {
166 let mut persistence = Self::open(path)?;
167 persistence.checkpoint_interval = checkpoint_interval;
168 Ok(persistence)
169 }
170
171 fn sync_state_key(peer_id: &EndpointId, doc_key: &str) -> String {
173 format!(
174 "{}{}:{}",
175 SYNC_STATE_PREFIX,
176 hex::encode(peer_id.as_bytes()),
177 doc_key
178 )
179 }
180
181 pub fn save_sync_state(
183 &self,
184 peer_id: &EndpointId,
185 doc_key: &str,
186 state: &SyncState,
187 sync_count: u64,
188 ) -> Result<()> {
189 let key = Self::sync_state_key(peer_id, doc_key);
190 let persisted = PersistedSyncState::from_sync_state(state, peer_id, doc_key, sync_count);
191
192 let value = serde_json::to_vec(&persisted).context("Failed to serialize sync state")?;
193
194 let write_txn = self
195 .db
196 .begin_write()
197 .context("Failed to begin write transaction")?;
198 {
199 let mut table = write_txn
200 .open_table(SYNC_STATE_TABLE)
201 .context("Failed to open sync state table")?;
202 table
203 .insert(key.as_bytes(), value.as_slice())
204 .context("Failed to write sync state")?;
205 }
206 write_txn.commit().context("Failed to commit write")?;
207
208 tracing::trace!(
209 "Saved sync state for peer {} doc {}: {} bytes",
210 persisted.peer_id_hex,
211 doc_key,
212 value.len()
213 );
214
215 Ok(())
216 }
217
218 pub fn load_sync_state(
220 &self,
221 peer_id: &EndpointId,
222 doc_key: &str,
223 ) -> Result<Option<(SyncState, u64)>> {
224 let key = Self::sync_state_key(peer_id, doc_key);
225
226 let read_txn = self
227 .db
228 .begin_read()
229 .context("Failed to begin read transaction")?;
230 let table = read_txn
231 .open_table(SYNC_STATE_TABLE)
232 .context("Failed to open sync state table")?;
233
234 match table.get(key.as_bytes())? {
235 Some(value) => {
236 let bytes = value.value();
237 let persisted: PersistedSyncState =
238 serde_json::from_slice(bytes).context("Failed to deserialize sync state")?;
239
240 let state = persisted.to_sync_state()?;
241
242 tracing::trace!(
243 "Loaded sync state for peer {} doc {}: sync_count={}",
244 persisted.peer_id_hex,
245 doc_key,
246 persisted.sync_count
247 );
248
249 Ok(Some((state, persisted.sync_count)))
250 }
251 None => Ok(None),
252 }
253 }
254
255 pub fn delete_sync_state(&self, peer_id: &EndpointId, doc_key: &str) -> Result<()> {
257 let key = Self::sync_state_key(peer_id, doc_key);
258
259 let write_txn = self
260 .db
261 .begin_write()
262 .context("Failed to begin write transaction")?;
263 {
264 let mut table = write_txn
265 .open_table(SYNC_STATE_TABLE)
266 .context("Failed to open sync state table")?;
267 table.remove(key.as_bytes())?;
268 }
269 write_txn.commit().context("Failed to commit delete")?;
270
271 Ok(())
272 }
273
274 pub fn load_all_for_peer(&self, peer_id: &EndpointId) -> Result<HashMap<String, SyncState>> {
276 let prefix = format!("{}{}:", SYNC_STATE_PREFIX, hex::encode(peer_id.as_bytes()));
277 let mut results = HashMap::new();
278
279 let read_txn = self
280 .db
281 .begin_read()
282 .context("Failed to begin read transaction")?;
283 let table = read_txn
284 .open_table(SYNC_STATE_TABLE)
285 .context("Failed to open sync state table")?;
286
287 for entry in table.range(prefix.as_bytes()..)? {
288 let (key, value) = entry?;
289 let key_bytes = key.value();
290 let key_str = String::from_utf8_lossy(key_bytes);
291
292 if !key_str.starts_with(&prefix) {
293 break;
294 }
295
296 let persisted: PersistedSyncState = serde_json::from_slice(value.value())?;
297 let state = persisted.to_sync_state()?;
298 results.insert(persisted.doc_key.clone(), state);
299 }
300
301 Ok(results)
302 }
303
304 pub fn load_all(&self) -> Result<HashMap<(EndpointId, String), SyncState>> {
306 let mut results = HashMap::new();
307
308 let read_txn = self
309 .db
310 .begin_read()
311 .context("Failed to begin read transaction")?;
312 let table = read_txn
313 .open_table(SYNC_STATE_TABLE)
314 .context("Failed to open sync state table")?;
315
316 for entry in table.range(SYNC_STATE_PREFIX.as_bytes()..)? {
317 let (key, value) = entry?;
318 let key_bytes = key.value();
319 let key_str = String::from_utf8_lossy(key_bytes);
320
321 if !key_str.starts_with(SYNC_STATE_PREFIX) {
322 break;
323 }
324
325 let persisted: PersistedSyncState = serde_json::from_slice(value.value())?;
326
327 let peer_id_bytes =
329 hex::decode(&persisted.peer_id_hex).context("Invalid peer ID hex")?;
330 if peer_id_bytes.len() != 32 {
331 continue; }
333 let mut arr = [0u8; 32];
334 arr.copy_from_slice(&peer_id_bytes);
335 let public_key = iroh::PublicKey::from_bytes(&arr)
336 .map_err(|e| anyhow::anyhow!("Invalid public key: {}", e))?;
337 let peer_id: EndpointId = public_key;
338
339 let state = persisted.to_sync_state()?;
340 results.insert((peer_id, persisted.doc_key.clone()), state);
341 }
342
343 tracing::info!("Loaded {} sync states from persistence", results.len());
344
345 Ok(results)
346 }
347
348 pub fn create_checkpoint(&self) -> Result<Checkpoint> {
350 let timestamp = SystemTime::now()
351 .duration_since(UNIX_EPOCH)
352 .expect("system clock before UNIX epoch")
353 .as_millis() as u64;
354
355 let mut state_count = 0;
357 let mut total_bytes = 0;
358 let mut peer_ids = std::collections::HashSet::new();
359
360 {
361 let read_txn = self
362 .db
363 .begin_read()
364 .context("Failed to begin read transaction")?;
365 let table = read_txn
366 .open_table(SYNC_STATE_TABLE)
367 .context("Failed to open sync state table")?;
368
369 for entry in table.range(SYNC_STATE_PREFIX.as_bytes()..)? {
370 let (key, value) = entry?;
371 let key_bytes = key.value();
372 let key_str = String::from_utf8_lossy(key_bytes);
373
374 if !key_str.starts_with(SYNC_STATE_PREFIX) {
375 break;
376 }
377
378 state_count += 1;
379 total_bytes += value.value().len();
380
381 if let Some(rest) = key_str.strip_prefix(SYNC_STATE_PREFIX) {
383 if let Some(peer_id) = rest.split(':').next() {
384 peer_ids.insert(peer_id.to_string());
385 }
386 }
387 }
388 }
389
390 let checkpoint = Checkpoint {
391 timestamp,
392 state_count,
393 total_bytes,
394 peer_ids: peer_ids.into_iter().collect(),
395 };
396
397 let checkpoint_key = format!("{}{}", CHECKPOINT_PREFIX, timestamp);
399 let checkpoint_bytes = serde_json::to_vec(&checkpoint)?;
400
401 let write_txn = self
402 .db
403 .begin_write()
404 .context("Failed to begin write transaction")?;
405 {
406 let mut table = write_txn
407 .open_table(CHECKPOINT_TABLE)
408 .context("Failed to open checkpoint table")?;
409 table.insert(checkpoint_key.as_bytes(), checkpoint_bytes.as_slice())?;
410 }
411 {
412 let mut meta_table = write_txn
414 .open_table(META_TABLE)
415 .context("Failed to open meta table")?;
416 meta_table.insert(
417 META_LAST_CHECKPOINT.as_bytes(),
418 ×tamp.to_be_bytes()[..],
419 )?;
420 }
421 write_txn.commit().context("Failed to commit checkpoint")?;
422
423 tracing::info!(
424 "Created checkpoint: {} states, {} bytes, {} peers",
425 state_count,
426 total_bytes,
427 checkpoint.peer_ids.len()
428 );
429
430 Ok(checkpoint)
431 }
432
433 pub fn get_last_checkpoint(&self) -> Result<Option<Checkpoint>> {
435 let read_txn = self
436 .db
437 .begin_read()
438 .context("Failed to begin read transaction")?;
439
440 let meta_table = read_txn
442 .open_table(META_TABLE)
443 .context("Failed to open meta table")?;
444
445 let timestamp_bytes = match meta_table.get(META_LAST_CHECKPOINT.as_bytes())? {
446 Some(value) => value.value().to_vec(),
447 None => return Ok(None),
448 };
449
450 if timestamp_bytes.len() != 8 {
451 return Ok(None);
452 }
453
454 let mut arr = [0u8; 8];
455 arr.copy_from_slice(×tamp_bytes);
456 let timestamp = u64::from_be_bytes(arr);
457
458 let checkpoint_key = format!("{}{}", CHECKPOINT_PREFIX, timestamp);
460 let checkpoint_table = read_txn
461 .open_table(CHECKPOINT_TABLE)
462 .context("Failed to open checkpoint table")?;
463
464 match checkpoint_table.get(checkpoint_key.as_bytes())? {
465 Some(value) => {
466 let checkpoint: Checkpoint = serde_json::from_slice(value.value())?;
467 Ok(Some(checkpoint))
468 }
469 None => Ok(None),
470 }
471 }
472
473 pub fn stats(&self) -> Result<PersistenceStats> {
475 let mut stats = PersistenceStats::default();
476 let mut peer_ids = std::collections::HashSet::new();
477
478 let read_txn = self
479 .db
480 .begin_read()
481 .context("Failed to begin read transaction")?;
482
483 {
485 let table = read_txn
486 .open_table(SYNC_STATE_TABLE)
487 .context("Failed to open sync state table")?;
488
489 for entry in table.range(SYNC_STATE_PREFIX.as_bytes()..)? {
490 let (key, value) = entry?;
491 let key_bytes = key.value();
492 let key_str = String::from_utf8_lossy(key_bytes);
493
494 if !key_str.starts_with(SYNC_STATE_PREFIX) {
495 break;
496 }
497
498 stats.state_count += 1;
499 stats.total_bytes += value.value().len();
500
501 if let Some(rest) = key_str.strip_prefix(SYNC_STATE_PREFIX) {
502 if let Some(peer_id) = rest.split(':').next() {
503 peer_ids.insert(peer_id.to_string());
504 }
505 }
506 }
507 }
508
509 stats.peer_count = peer_ids.len();
510
511 {
513 let checkpoint_table = read_txn
514 .open_table(CHECKPOINT_TABLE)
515 .context("Failed to open checkpoint table")?;
516
517 for entry in checkpoint_table.range(CHECKPOINT_PREFIX.as_bytes()..)? {
518 let (key, _) = entry?;
519 if !key.value().starts_with(CHECKPOINT_PREFIX.as_bytes()) {
520 break;
521 }
522 stats.checkpoint_count += 1;
523 }
524 }
525
526 if let Ok(Some(checkpoint)) = self.get_last_checkpoint() {
528 stats.last_checkpoint = Some(checkpoint.timestamp);
529 }
530
531 Ok(stats)
532 }
533
534 pub fn cleanup_old_checkpoints(&self, keep_count: usize) -> Result<usize> {
536 let mut checkpoints: Vec<u64> = Vec::new();
537
538 {
539 let read_txn = self
540 .db
541 .begin_read()
542 .context("Failed to begin read transaction")?;
543 let table = read_txn
544 .open_table(CHECKPOINT_TABLE)
545 .context("Failed to open checkpoint table")?;
546
547 for entry in table.range(CHECKPOINT_PREFIX.as_bytes()..)? {
548 let (key, _) = entry?;
549 let key_bytes = key.value();
550 let key_str = String::from_utf8_lossy(key_bytes);
551
552 if !key_str.starts_with(CHECKPOINT_PREFIX) {
553 break;
554 }
555
556 if let Some(ts_str) = key_str.strip_prefix(CHECKPOINT_PREFIX) {
557 if let Ok(ts) = ts_str.parse::<u64>() {
558 checkpoints.push(ts);
559 }
560 }
561 }
562 }
563
564 checkpoints.sort_by(|a, b| b.cmp(a));
566
567 let mut deleted = 0;
569 let to_delete: Vec<_> = checkpoints.iter().skip(keep_count).cloned().collect();
570
571 if !to_delete.is_empty() {
572 let write_txn = self
573 .db
574 .begin_write()
575 .context("Failed to begin write transaction")?;
576 {
577 let mut table = write_txn
578 .open_table(CHECKPOINT_TABLE)
579 .context("Failed to open checkpoint table")?;
580
581 for ts in to_delete {
582 let key = format!("{}{}", CHECKPOINT_PREFIX, ts);
583 table.remove(key.as_bytes())?;
584 deleted += 1;
585 }
586 }
587 write_txn.commit().context("Failed to commit cleanup")?;
588 }
589
590 if deleted > 0 {
591 tracing::info!("Cleaned up {} old checkpoints", deleted);
592 }
593
594 Ok(deleted)
595 }
596
597 pub fn delete_peer(&self, peer_id: &EndpointId) -> Result<usize> {
599 let prefix = format!("{}{}:", SYNC_STATE_PREFIX, hex::encode(peer_id.as_bytes()));
600 let mut keys_to_delete = Vec::new();
601
602 {
604 let read_txn = self
605 .db
606 .begin_read()
607 .context("Failed to begin read transaction")?;
608 let table = read_txn
609 .open_table(SYNC_STATE_TABLE)
610 .context("Failed to open sync state table")?;
611
612 for entry in table.range(prefix.as_bytes()..)? {
613 let (key, _) = entry?;
614 let key_bytes = key.value();
615 if !key_bytes.starts_with(prefix.as_bytes()) {
616 break;
617 }
618 keys_to_delete.push(key_bytes.to_vec());
619 }
620 }
621
622 let deleted = keys_to_delete.len();
624 if !keys_to_delete.is_empty() {
625 let write_txn = self
626 .db
627 .begin_write()
628 .context("Failed to begin write transaction")?;
629 {
630 let mut table = write_txn
631 .open_table(SYNC_STATE_TABLE)
632 .context("Failed to open sync state table")?;
633
634 for key in keys_to_delete {
635 table.remove(key.as_slice())?;
636 }
637 }
638 write_txn.commit().context("Failed to commit delete")?;
639 }
640
641 if deleted > 0 {
642 tracing::info!(
643 "Deleted {} sync states for peer {}",
644 deleted,
645 hex::encode(peer_id.as_bytes())
646 );
647 }
648
649 Ok(deleted)
650 }
651
652 pub fn checkpoint_interval(&self) -> Duration {
654 self.checkpoint_interval
655 }
656}
657
658#[cfg(test)]
659mod tests {
660 use super::*;
661 use tempfile::TempDir;
662
663 fn create_test_persistence() -> (SyncStatePersistence, TempDir) {
664 let temp_dir = TempDir::new().unwrap();
665 let persistence = SyncStatePersistence::open(temp_dir.path()).unwrap();
666 (persistence, temp_dir)
667 }
668
669 fn create_test_peer_id() -> EndpointId {
670 use iroh::SecretKey;
671 let mut rng = rand::rng();
672 SecretKey::generate(&mut rng).public()
673 }
674
675 #[test]
676 fn test_save_and_load_sync_state() {
677 let (persistence, _temp) = create_test_persistence();
678 let peer_id = create_test_peer_id();
679 let state = SyncState::new();
680
681 persistence
683 .save_sync_state(&peer_id, "doc1", &state, 5)
684 .unwrap();
685
686 let (loaded_state, sync_count) = persistence
688 .load_sync_state(&peer_id, "doc1")
689 .unwrap()
690 .expect("State should exist");
691
692 assert_eq!(sync_count, 5);
693 assert_eq!(loaded_state.encode(), state.encode());
695 }
696
697 #[test]
698 fn test_load_nonexistent_state() {
699 let (persistence, _temp) = create_test_persistence();
700 let peer_id = create_test_peer_id();
701
702 let result = persistence
703 .load_sync_state(&peer_id, "nonexistent")
704 .unwrap();
705 assert!(result.is_none());
706 }
707
708 #[test]
709 fn test_delete_sync_state() {
710 let (persistence, _temp) = create_test_persistence();
711 let peer_id = create_test_peer_id();
712 let state = SyncState::new();
713
714 persistence
715 .save_sync_state(&peer_id, "doc1", &state, 1)
716 .unwrap();
717 assert!(persistence
718 .load_sync_state(&peer_id, "doc1")
719 .unwrap()
720 .is_some());
721
722 persistence.delete_sync_state(&peer_id, "doc1").unwrap();
723 assert!(persistence
724 .load_sync_state(&peer_id, "doc1")
725 .unwrap()
726 .is_none());
727 }
728
729 #[test]
730 fn test_load_all_for_peer() {
731 let (persistence, _temp) = create_test_persistence();
732 let peer_id = create_test_peer_id();
733 let peer_id2 = create_test_peer_id();
734 let state = SyncState::new();
735
736 persistence
738 .save_sync_state(&peer_id, "doc1", &state, 1)
739 .unwrap();
740 persistence
741 .save_sync_state(&peer_id, "doc2", &state, 2)
742 .unwrap();
743
744 persistence
746 .save_sync_state(&peer_id2, "doc1", &state, 3)
747 .unwrap();
748
749 let peer1_states = persistence.load_all_for_peer(&peer_id).unwrap();
751 assert_eq!(peer1_states.len(), 2);
752 assert!(peer1_states.contains_key("doc1"));
753 assert!(peer1_states.contains_key("doc2"));
754
755 let peer2_states = persistence.load_all_for_peer(&peer_id2).unwrap();
757 assert_eq!(peer2_states.len(), 1);
758 }
759
760 #[test]
761 fn test_load_all() {
762 let (persistence, _temp) = create_test_persistence();
763 let peer_id1 = create_test_peer_id();
764 let peer_id2 = create_test_peer_id();
765 let state = SyncState::new();
766
767 persistence
768 .save_sync_state(&peer_id1, "doc1", &state, 1)
769 .unwrap();
770 persistence
771 .save_sync_state(&peer_id2, "doc2", &state, 2)
772 .unwrap();
773
774 let all_states = persistence.load_all().unwrap();
775 assert_eq!(all_states.len(), 2);
776 }
777
778 #[test]
779 fn test_checkpoint() {
780 let (persistence, _temp) = create_test_persistence();
781 let peer_id = create_test_peer_id();
782 let state = SyncState::new();
783
784 persistence
786 .save_sync_state(&peer_id, "doc1", &state, 1)
787 .unwrap();
788 persistence
789 .save_sync_state(&peer_id, "doc2", &state, 2)
790 .unwrap();
791
792 let checkpoint = persistence.create_checkpoint().unwrap();
794 assert_eq!(checkpoint.state_count, 2);
795 assert_eq!(checkpoint.peer_ids.len(), 1);
796
797 let loaded = persistence.get_last_checkpoint().unwrap().unwrap();
799 assert_eq!(loaded.timestamp, checkpoint.timestamp);
800 assert_eq!(loaded.state_count, 2);
801 }
802
803 #[test]
804 fn test_stats() {
805 let (persistence, _temp) = create_test_persistence();
806 let peer_id1 = create_test_peer_id();
807 let peer_id2 = create_test_peer_id();
808 let state = SyncState::new();
809
810 persistence
811 .save_sync_state(&peer_id1, "doc1", &state, 1)
812 .unwrap();
813 persistence
814 .save_sync_state(&peer_id2, "doc2", &state, 2)
815 .unwrap();
816
817 let stats = persistence.stats().unwrap();
818 assert_eq!(stats.state_count, 2);
819 assert_eq!(stats.peer_count, 2);
820 assert!(stats.total_bytes > 0);
821 }
822
823 #[test]
824 fn test_cleanup_old_checkpoints() {
825 let (persistence, _temp) = create_test_persistence();
826 let peer_id = create_test_peer_id();
827 let state = SyncState::new();
828
829 persistence
830 .save_sync_state(&peer_id, "doc1", &state, 1)
831 .unwrap();
832
833 for _ in 0..5 {
835 persistence.create_checkpoint().unwrap();
836 std::thread::sleep(std::time::Duration::from_millis(10));
837 }
838
839 let stats_before = persistence.stats().unwrap();
840 assert_eq!(stats_before.checkpoint_count, 5);
841
842 let deleted = persistence.cleanup_old_checkpoints(2).unwrap();
844 assert_eq!(deleted, 3);
845
846 let stats_after = persistence.stats().unwrap();
847 assert_eq!(stats_after.checkpoint_count, 2);
848 }
849
850 #[test]
851 fn test_delete_peer() {
852 let (persistence, _temp) = create_test_persistence();
853 let peer_id1 = create_test_peer_id();
854 let peer_id2 = create_test_peer_id();
855 let state = SyncState::new();
856
857 persistence
859 .save_sync_state(&peer_id1, "doc1", &state, 1)
860 .unwrap();
861 persistence
862 .save_sync_state(&peer_id1, "doc2", &state, 2)
863 .unwrap();
864 persistence
865 .save_sync_state(&peer_id2, "doc1", &state, 3)
866 .unwrap();
867
868 let deleted = persistence.delete_peer(&peer_id1).unwrap();
870 assert_eq!(deleted, 2);
871
872 assert!(persistence
874 .load_sync_state(&peer_id1, "doc1")
875 .unwrap()
876 .is_none());
877 assert!(persistence
878 .load_sync_state(&peer_id1, "doc2")
879 .unwrap()
880 .is_none());
881
882 assert!(persistence
884 .load_sync_state(&peer_id2, "doc1")
885 .unwrap()
886 .is_some());
887 }
888
889 #[test]
890 fn test_persisted_sync_state_roundtrip() {
891 let peer_id = create_test_peer_id();
892 let state = SyncState::new();
893
894 let persisted = PersistedSyncState::from_sync_state(&state, &peer_id, "test_doc", 42);
895
896 assert_eq!(persisted.doc_key, "test_doc");
897 assert_eq!(persisted.sync_count, 42);
898 assert!(!persisted.state_bytes.is_empty());
899
900 let restored = persisted.to_sync_state().unwrap();
901 assert_eq!(restored.encode(), state.encode());
902 }
903}