Skip to main content

peat_mesh/storage/
negentropy_sync.rs

//! Negentropy-based set reconciliation for efficient sync (ADR-040, Issue #435)
//!
//! This module discovers which documents differ between two peers in O(log n)
//! rounds using the Negentropy protocol, replacing unbounded multi-round
//! Automerge sync, which accumulates per-peer state.
//!
//! # Architecture
//!
//! ```text
//! ┌──────────────┐                     ┌──────────────┐
//! │    Node A    │                     │    Node B    │
//! │              │                     │              │
//! │ ┌──────────┐ │   1. Initiate       │ ┌──────────┐ │
//! │ │Negentropy│─┼────────────────────►│ │Negentropy│ │
//! │ │ Storage  │ │                     │ │ Storage  │ │
//! │ └──────────┘ │   2. Reconcile      │ └──────────┘ │
//! │              │◄────────────────────┼              │
//! │              │                     │              │
//! │ have_ids ────┼──► Send to B        │              │
//! │ need_ids ◄───┼─── Request from B   │              │
//! └──────────────┘                     └──────────────┘
//! ```
//!
//! # Benefits over Automerge multi-round sync
//!
//! - **O(log n) rounds** instead of unbounded rounds
//! - **No state accumulation**: no `sent_hashes` BTreeSet that keeps growing
//! - **Bandwidth efficient**: peers exchange range fingerprints (and ID lists
//!   for small ranges), not full document state
//! - **Stateless**: each sync session is independent
29
30use anyhow::{Context, Result};
31use negentropy::{Id as NegentropyId, Negentropy, NegentropyStorageVector};
32use std::collections::HashMap;
33use std::sync::{Arc, RwLock};
34
35use iroh::EndpointId;
36
37/// Size of document ID for Negentropy (32 bytes = SHA256)
38pub const DOC_ID_SIZE: usize = 32;
39
40/// Represents a document for Negentropy reconciliation
41#[derive(Debug, Clone)]
42pub struct SyncItem {
43    /// Document key (collection::id format)
44    pub doc_key: String,
45    /// Timestamp (created_at or last_modified)
46    pub timestamp: u64,
47    /// 32-byte document ID (SHA256 of content or unique identifier)
48    pub id: [u8; DOC_ID_SIZE],
49}
50
51impl SyncItem {
52    /// Create a new sync item from document key and content hash
53    pub fn new(doc_key: String, timestamp: u64, content_hash: [u8; DOC_ID_SIZE]) -> Self {
54        Self {
55            doc_key,
56            timestamp,
57            id: content_hash,
58        }
59    }
60
61    /// Create sync item from doc_key, deriving ID from the key itself
62    pub fn from_doc_key(doc_key: &str, timestamp: u64) -> Self {
63        use sha2::{Digest, Sha256};
64        let mut hasher = Sha256::new();
65        hasher.update(doc_key.as_bytes());
66        let hash: [u8; 32] = hasher.finalize().into();
67        Self {
68            doc_key: doc_key.to_string(),
69            timestamp,
70            id: hash,
71        }
72    }
73}
74
75/// Result of a Negentropy reconciliation
76#[derive(Debug, Default)]
77pub struct ReconcileResult {
78    /// Document keys I have that peer needs
79    pub have_keys: Vec<String>,
80    /// Document keys peer has that I need
81    pub need_keys: Vec<String>,
82    /// Whether reconciliation is complete
83    pub is_complete: bool,
84    /// Next message to send (if not complete)
85    pub next_message: Option<Vec<u8>>,
86}
87
88/// Active sync session with a peer
89pub struct SyncSession {
90    /// Peer we're syncing with
91    pub peer_id: EndpointId,
92    /// Local storage snapshot (kept for responder initialization)
93    storage: Option<NegentropyStorageVector>,
94    /// Negentropy instance (owned storage variant)
95    negentropy: Option<Negentropy<'static, NegentropyStorageVector>>,
96    /// Mapping from Negentropy ID to doc_key
97    id_to_key: HashMap<[u8; DOC_ID_SIZE], String>,
98    /// Whether we initiated or responded
99    pub is_initiator: bool,
100}
101
102impl SyncSession {
103    /// Create a new sync session as initiator
104    fn new_initiator(peer_id: EndpointId, items: Vec<SyncItem>) -> Result<Self> {
105        let mut storage = NegentropyStorageVector::new();
106        let mut id_to_key = HashMap::new();
107
108        for item in items {
109            let neg_id =
110                NegentropyId::from_slice(&item.id).context("Invalid ID size for Negentropy")?;
111            storage.insert(item.timestamp, neg_id)?;
112            id_to_key.insert(item.id, item.doc_key);
113        }
114        storage.seal()?;
115
116        Ok(Self {
117            peer_id,
118            storage: Some(storage),
119            negentropy: None,
120            id_to_key,
121            is_initiator: true,
122        })
123    }
124
125    /// Create a new sync session as responder
126    fn new_responder(peer_id: EndpointId, items: Vec<SyncItem>) -> Result<Self> {
127        let mut storage = NegentropyStorageVector::new();
128        let mut id_to_key = HashMap::new();
129
130        for item in items {
131            let neg_id =
132                NegentropyId::from_slice(&item.id).context("Invalid ID size for Negentropy")?;
133            storage.insert(item.timestamp, neg_id)?;
134            id_to_key.insert(item.id, item.doc_key);
135        }
136        storage.seal()?;
137
138        Ok(Self {
139            peer_id,
140            storage: Some(storage),
141            negentropy: None,
142            id_to_key,
143            is_initiator: false,
144        })
145    }
146
147    /// Generate initial message (initiator only)
148    pub fn initiate(&mut self) -> Result<Vec<u8>> {
149        let storage = self.storage.take().context("Storage already consumed")?;
150
151        let mut neg =
152            Negentropy::owned(storage, 0).context("Failed to create Negentropy instance")?;
153
154        let init_msg = neg
155            .initiate()
156            .context("Failed to initiate Negentropy sync")?;
157
158        self.negentropy = Some(neg);
159        Ok(init_msg)
160    }
161
162    /// Process received message and generate response
163    pub fn reconcile(&mut self, peer_msg: &[u8]) -> Result<ReconcileResult> {
164        if self.is_initiator {
165            self.reconcile_initiator(peer_msg)
166        } else {
167            self.reconcile_responder(peer_msg)
168        }
169    }
170
171    fn reconcile_initiator(&mut self, peer_msg: &[u8]) -> Result<ReconcileResult> {
172        let neg = self.negentropy.as_mut().context("Session not initiated")?;
173
174        let mut have_ids: Vec<NegentropyId> = Vec::new();
175        let mut need_ids: Vec<NegentropyId> = Vec::new();
176
177        // reconcile_with_ids returns Option<Vec<u8>>: None when complete, Some when more rounds needed
178        let response = neg
179            .reconcile_with_ids(peer_msg, &mut have_ids, &mut need_ids)
180            .context("Failed to reconcile")?;
181
182        // Convert IDs back to doc keys
183        let have_keys: Vec<String> = have_ids
184            .iter()
185            .filter_map(|id| {
186                let bytes: &[u8; 32] = id.as_bytes();
187                self.id_to_key.get(bytes).cloned()
188            })
189            .collect();
190
191        let need_keys: Vec<String> = need_ids
192            .iter()
193            .filter_map(|id| {
194                let bytes: &[u8; 32] = id.as_bytes();
195                self.id_to_key.get(bytes).cloned()
196            })
197            .collect();
198
199        let is_complete = response.is_none();
200
201        Ok(ReconcileResult {
202            have_keys,
203            need_keys,
204            is_complete,
205            next_message: response,
206        })
207    }
208
209    fn reconcile_responder(&mut self, peer_msg: &[u8]) -> Result<ReconcileResult> {
210        // Create negentropy if first message
211        if self.negentropy.is_none() {
212            let storage = self.storage.take().context("Storage already consumed")?;
213
214            let neg =
215                Negentropy::owned(storage, 0).context("Failed to create Negentropy instance")?;
216
217            self.negentropy = Some(neg);
218        }
219
220        let neg = self.negentropy.as_mut().unwrap();
221
222        let response = neg.reconcile(peer_msg).context("Failed to reconcile")?;
223
224        // Responder doesn't get have/need until reconciliation completes
225        // The response message encodes the differences
226        Ok(ReconcileResult {
227            have_keys: Vec::new(),
228            need_keys: Vec::new(),
229            is_complete: false,
230            next_message: Some(response),
231        })
232    }
233}
234
235/// Manager for Negentropy-based document sync
236///
237/// Provides efficient O(log n) set reconciliation to discover which
238/// documents need to be synced, avoiding unbounded Automerge sync state.
239pub struct NegentropySync {
240    /// Active sync sessions per peer
241    sessions: Arc<RwLock<HashMap<EndpointId, SyncSession>>>,
242    /// Statistics
243    stats: Arc<RwLock<NegentropyStats>>,
244}
245
/// Cumulative counters describing Negentropy sync activity.
#[derive(Clone, Debug, Default)]
pub struct NegentropyStats {
    /// Number of sessions this node started as initiator.
    pub sessions_initiated: u64,
    /// Number of sessions that reached completion.
    pub sessions_completed: u64,
    /// Document keys reported as held locally but missing on the peer.
    pub docs_have: u64,
    /// Document keys reported as held by the peer but missing locally.
    pub docs_need: u64,
    /// Bytes of Negentropy messages produced and consumed.
    pub bytes_exchanged: u64,
    /// Number of peer messages processed.
    pub round_trips: u64,
}
262
263impl NegentropySync {
264    /// Create a new NegentropySync manager
265    pub fn new() -> Self {
266        Self {
267            sessions: Arc::new(RwLock::new(HashMap::new())),
268            stats: Arc::new(RwLock::new(NegentropyStats::default())),
269        }
270    }
271
272    /// Start a sync session with a peer as initiator
273    ///
274    /// Returns the initial message to send to the peer.
275    pub fn initiate_sync(
276        &self,
277        peer_id: EndpointId,
278        local_items: Vec<SyncItem>,
279    ) -> Result<Vec<u8>> {
280        let mut session = SyncSession::new_initiator(peer_id, local_items)?;
281        let init_msg = session.initiate()?;
282
283        // Store session
284        {
285            let mut sessions = self.sessions.write().unwrap_or_else(|e| e.into_inner());
286            sessions.insert(peer_id, session);
287        }
288
289        // Update stats
290        {
291            let mut stats = self.stats.write().unwrap_or_else(|e| e.into_inner());
292            stats.sessions_initiated += 1;
293            stats.bytes_exchanged += init_msg.len() as u64;
294        }
295
296        tracing::debug!(
297            "Initiated Negentropy sync with peer {:?}, msg_len={}",
298            peer_id,
299            init_msg.len()
300        );
301
302        Ok(init_msg)
303    }
304
305    /// Handle incoming sync message from peer
306    ///
307    /// If we have an active session, this is a response to our initiation.
308    /// Otherwise, we're the responder and should create a new session.
309    pub fn handle_message(
310        &self,
311        peer_id: EndpointId,
312        message: &[u8],
313        local_items: Vec<SyncItem>,
314    ) -> Result<ReconcileResult> {
315        let mut sessions = self.sessions.write().unwrap_or_else(|e| e.into_inner());
316
317        let session = if let Some(existing) = sessions.get_mut(&peer_id) {
318            existing
319        } else {
320            // Create responder session
321            let session = SyncSession::new_responder(peer_id, local_items)?;
322            sessions.insert(peer_id, session);
323            sessions.get_mut(&peer_id).unwrap()
324        };
325
326        let result = session.reconcile(message)?;
327
328        // Update stats
329        {
330            let mut stats = self.stats.write().unwrap_or_else(|e| e.into_inner());
331            stats.bytes_exchanged += message.len() as u64;
332            stats.round_trips += 1;
333            if let Some(next) = &result.next_message {
334                stats.bytes_exchanged += next.len() as u64;
335            }
336            stats.docs_have += result.have_keys.len() as u64;
337            stats.docs_need += result.need_keys.len() as u64;
338            if result.is_complete {
339                stats.sessions_completed += 1;
340            }
341        }
342
343        // Clean up completed session
344        if result.is_complete {
345            sessions.remove(&peer_id);
346            tracing::debug!(
347                "Negentropy sync complete with {:?}: have={}, need={}",
348                peer_id,
349                result.have_keys.len(),
350                result.need_keys.len()
351            );
352        }
353
354        Ok(result)
355    }
356
357    /// Get current statistics
358    pub fn stats(&self) -> NegentropyStats {
359        self.stats.read().unwrap_or_else(|e| e.into_inner()).clone()
360    }
361
362    /// Check if there's an active session with a peer
363    pub fn has_session(&self, peer_id: &EndpointId) -> bool {
364        self.sessions
365            .read()
366            .unwrap_or_else(|e| e.into_inner())
367            .contains_key(peer_id)
368    }
369
370    /// Cancel a sync session
371    pub fn cancel_session(&self, peer_id: &EndpointId) {
372        self.sessions
373            .write()
374            .unwrap_or_else(|e| e.into_inner())
375            .remove(peer_id);
376    }
377}
378
379impl Default for NegentropySync {
380    fn default() -> Self {
381        Self::new()
382    }
383}
384
#[cfg(test)]
mod tests {
    use super::*;

    /// Upper bound on reconciliation rounds in tests, so a protocol bug fails
    /// the test instead of looping forever.
    const MAX_ROUNDS: usize = 32;

    fn make_test_items(keys: &[&str], base_timestamp: u64) -> Vec<SyncItem> {
        keys.iter()
            .enumerate()
            .map(|(i, key)| SyncItem::from_doc_key(key, base_timestamp + i as u64))
            .collect()
    }

    /// Create a test EndpointId from a seed byte
    fn test_peer_id(seed: u8) -> EndpointId {
        use iroh::SecretKey;
        let mut key_bytes = [0u8; 32];
        key_bytes[0] = seed;
        let secret = SecretKey::from_bytes(&key_bytes);
        secret.public()
    }

    #[test]
    fn test_sync_item_from_doc_key() {
        let item1 = SyncItem::from_doc_key("nodes::node-1", 1000);
        let item2 = SyncItem::from_doc_key("nodes::node-1", 1000);
        let item3 = SyncItem::from_doc_key("nodes::node-2", 1000);

        // Same key should produce same ID
        assert_eq!(item1.id, item2.id);
        // Different keys should produce different IDs
        assert_ne!(item1.id, item3.id);
    }

    #[test]
    fn test_identical_sets_no_differences() {
        let peer_a = test_peer_id(1);
        let peer_b = test_peer_id(2);

        let items = make_test_items(&["doc-1", "doc-2", "doc-3"], 1000);

        let sync_a = NegentropySync::new();
        let sync_b = NegentropySync::new();

        // A initiates
        let msg1 = sync_a.initiate_sync(peer_b, items.clone()).unwrap();

        // B responds
        let result_b = sync_b.handle_message(peer_a, &msg1, items.clone()).unwrap();
        assert!(result_b.next_message.is_some());

        // A processes response
        let result_a = sync_a
            .handle_message(peer_b, &result_b.next_message.unwrap(), items)
            .unwrap();

        // Should complete with no differences, and the session is cleaned up
        assert!(result_a.is_complete);
        assert!(result_a.have_keys.is_empty());
        assert!(result_a.need_keys.is_empty());
        assert!(!sync_a.has_session(&peer_b));
    }

    #[test]
    fn test_different_sets_finds_differences() {
        let peer_a = test_peer_id(1);
        let peer_b = test_peer_id(2);

        // A has doc-1, doc-2
        let items_a = make_test_items(&["doc-1", "doc-2"], 1000);
        // B has doc-2, doc-3
        let items_b = make_test_items(&["doc-2", "doc-3"], 1000);

        let sync_a = NegentropySync::new();
        let sync_b = NegentropySync::new();

        // A initiates
        let msg1 = sync_a.initiate_sync(peer_b, items_a.clone()).unwrap();

        // B responds
        let result_b = sync_b
            .handle_message(peer_a, &msg1, items_b.clone())
            .unwrap();

        // Results are per round, so collect A's keys from every round.
        let mut all_have: Vec<String> = Vec::new();
        let mut completed = false;
        let mut rounds = 0;
        let mut current_msg = result_b.next_message;

        while let Some(msg) = current_msg.take() {
            rounds += 1;
            assert!(rounds <= MAX_ROUNDS, "reconciliation did not converge");

            let result = sync_a
                .handle_message(peer_b, &msg, items_a.clone())
                .unwrap();
            all_have.extend(result.have_keys);
            if result.is_complete {
                completed = true;
                break;
            }

            let next = result
                .next_message
                .expect("an incomplete initiator round must produce a message");
            let resp = sync_b
                .handle_message(peer_a, &next, items_b.clone())
                .unwrap();
            current_msg = resp.next_message;
        }

        assert!(completed, "reconciliation should complete");
        assert!(!sync_a.has_session(&peer_b));
        // doc-1 exists only on A, so A must report it as something B lacks.
        assert!(
            all_have.iter().any(|k| k == "doc-1"),
            "have_keys across rounds: {all_have:?}"
        );
    }

    #[test]
    fn test_stats_tracking() {
        let peer_b = test_peer_id(2);
        let items = make_test_items(&["doc-1"], 1000);

        let sync = NegentropySync::new();
        let _msg = sync.initiate_sync(peer_b, items).unwrap();

        let stats = sync.stats();
        assert_eq!(stats.sessions_initiated, 1);
        assert!(stats.bytes_exchanged > 0);
    }
}