1use anyhow::{Context, Result};
31use negentropy::{Id as NegentropyId, Negentropy, NegentropyStorageVector};
32use std::collections::HashMap;
33use std::sync::{Arc, RwLock};
34
35use iroh::EndpointId;
36
37pub const DOC_ID_SIZE: usize = 32;
39
40#[derive(Debug, Clone)]
42pub struct SyncItem {
43 pub doc_key: String,
45 pub timestamp: u64,
47 pub id: [u8; DOC_ID_SIZE],
49}
50
51impl SyncItem {
52 pub fn new(doc_key: String, timestamp: u64, content_hash: [u8; DOC_ID_SIZE]) -> Self {
54 Self {
55 doc_key,
56 timestamp,
57 id: content_hash,
58 }
59 }
60
61 pub fn from_doc_key(doc_key: &str, timestamp: u64) -> Self {
63 use sha2::{Digest, Sha256};
64 let mut hasher = Sha256::new();
65 hasher.update(doc_key.as_bytes());
66 let hash: [u8; 32] = hasher.finalize().into();
67 Self {
68 doc_key: doc_key.to_string(),
69 timestamp,
70 id: hash,
71 }
72 }
73}
74
75#[derive(Debug, Default)]
77pub struct ReconcileResult {
78 pub have_keys: Vec<String>,
80 pub need_keys: Vec<String>,
82 pub is_complete: bool,
84 pub next_message: Option<Vec<u8>>,
86}
87
88pub struct SyncSession {
90 pub peer_id: EndpointId,
92 storage: Option<NegentropyStorageVector>,
94 negentropy: Option<Negentropy<'static, NegentropyStorageVector>>,
96 id_to_key: HashMap<[u8; DOC_ID_SIZE], String>,
98 pub is_initiator: bool,
100}
101
102impl SyncSession {
103 fn new_initiator(peer_id: EndpointId, items: Vec<SyncItem>) -> Result<Self> {
105 let mut storage = NegentropyStorageVector::new();
106 let mut id_to_key = HashMap::new();
107
108 for item in items {
109 let neg_id =
110 NegentropyId::from_slice(&item.id).context("Invalid ID size for Negentropy")?;
111 storage.insert(item.timestamp, neg_id)?;
112 id_to_key.insert(item.id, item.doc_key);
113 }
114 storage.seal()?;
115
116 Ok(Self {
117 peer_id,
118 storage: Some(storage),
119 negentropy: None,
120 id_to_key,
121 is_initiator: true,
122 })
123 }
124
125 fn new_responder(peer_id: EndpointId, items: Vec<SyncItem>) -> Result<Self> {
127 let mut storage = NegentropyStorageVector::new();
128 let mut id_to_key = HashMap::new();
129
130 for item in items {
131 let neg_id =
132 NegentropyId::from_slice(&item.id).context("Invalid ID size for Negentropy")?;
133 storage.insert(item.timestamp, neg_id)?;
134 id_to_key.insert(item.id, item.doc_key);
135 }
136 storage.seal()?;
137
138 Ok(Self {
139 peer_id,
140 storage: Some(storage),
141 negentropy: None,
142 id_to_key,
143 is_initiator: false,
144 })
145 }
146
147 pub fn initiate(&mut self) -> Result<Vec<u8>> {
149 let storage = self.storage.take().context("Storage already consumed")?;
150
151 let mut neg =
152 Negentropy::owned(storage, 0).context("Failed to create Negentropy instance")?;
153
154 let init_msg = neg
155 .initiate()
156 .context("Failed to initiate Negentropy sync")?;
157
158 self.negentropy = Some(neg);
159 Ok(init_msg)
160 }
161
162 pub fn reconcile(&mut self, peer_msg: &[u8]) -> Result<ReconcileResult> {
164 if self.is_initiator {
165 self.reconcile_initiator(peer_msg)
166 } else {
167 self.reconcile_responder(peer_msg)
168 }
169 }
170
171 fn reconcile_initiator(&mut self, peer_msg: &[u8]) -> Result<ReconcileResult> {
172 let neg = self.negentropy.as_mut().context("Session not initiated")?;
173
174 let mut have_ids: Vec<NegentropyId> = Vec::new();
175 let mut need_ids: Vec<NegentropyId> = Vec::new();
176
177 let response = neg
179 .reconcile_with_ids(peer_msg, &mut have_ids, &mut need_ids)
180 .context("Failed to reconcile")?;
181
182 let have_keys: Vec<String> = have_ids
184 .iter()
185 .filter_map(|id| {
186 let bytes: &[u8; 32] = id.as_bytes();
187 self.id_to_key.get(bytes).cloned()
188 })
189 .collect();
190
191 let need_keys: Vec<String> = need_ids
192 .iter()
193 .filter_map(|id| {
194 let bytes: &[u8; 32] = id.as_bytes();
195 self.id_to_key.get(bytes).cloned()
196 })
197 .collect();
198
199 let is_complete = response.is_none();
200
201 Ok(ReconcileResult {
202 have_keys,
203 need_keys,
204 is_complete,
205 next_message: response,
206 })
207 }
208
209 fn reconcile_responder(&mut self, peer_msg: &[u8]) -> Result<ReconcileResult> {
210 if self.negentropy.is_none() {
212 let storage = self.storage.take().context("Storage already consumed")?;
213
214 let neg =
215 Negentropy::owned(storage, 0).context("Failed to create Negentropy instance")?;
216
217 self.negentropy = Some(neg);
218 }
219
220 let neg = self.negentropy.as_mut().unwrap();
221
222 let response = neg.reconcile(peer_msg).context("Failed to reconcile")?;
223
224 Ok(ReconcileResult {
227 have_keys: Vec::new(),
228 need_keys: Vec::new(),
229 is_complete: false,
230 next_message: Some(response),
231 })
232 }
233}
234
235pub struct NegentropySync {
240 sessions: Arc<RwLock<HashMap<EndpointId, SyncSession>>>,
242 stats: Arc<RwLock<NegentropyStats>>,
244}
245
246#[derive(Debug, Default, Clone)]
248pub struct NegentropyStats {
249 pub sessions_initiated: u64,
251 pub sessions_completed: u64,
253 pub docs_have: u64,
255 pub docs_need: u64,
257 pub bytes_exchanged: u64,
259 pub round_trips: u64,
261}
262
263impl NegentropySync {
264 pub fn new() -> Self {
266 Self {
267 sessions: Arc::new(RwLock::new(HashMap::new())),
268 stats: Arc::new(RwLock::new(NegentropyStats::default())),
269 }
270 }
271
272 pub fn initiate_sync(
276 &self,
277 peer_id: EndpointId,
278 local_items: Vec<SyncItem>,
279 ) -> Result<Vec<u8>> {
280 let mut session = SyncSession::new_initiator(peer_id, local_items)?;
281 let init_msg = session.initiate()?;
282
283 {
285 let mut sessions = self.sessions.write().unwrap_or_else(|e| e.into_inner());
286 sessions.insert(peer_id, session);
287 }
288
289 {
291 let mut stats = self.stats.write().unwrap_or_else(|e| e.into_inner());
292 stats.sessions_initiated += 1;
293 stats.bytes_exchanged += init_msg.len() as u64;
294 }
295
296 tracing::debug!(
297 "Initiated Negentropy sync with peer {:?}, msg_len={}",
298 peer_id,
299 init_msg.len()
300 );
301
302 Ok(init_msg)
303 }
304
305 pub fn handle_message(
310 &self,
311 peer_id: EndpointId,
312 message: &[u8],
313 local_items: Vec<SyncItem>,
314 ) -> Result<ReconcileResult> {
315 let mut sessions = self.sessions.write().unwrap_or_else(|e| e.into_inner());
316
317 let session = if let Some(existing) = sessions.get_mut(&peer_id) {
318 existing
319 } else {
320 let session = SyncSession::new_responder(peer_id, local_items)?;
322 sessions.insert(peer_id, session);
323 sessions.get_mut(&peer_id).unwrap()
324 };
325
326 let result = session.reconcile(message)?;
327
328 {
330 let mut stats = self.stats.write().unwrap_or_else(|e| e.into_inner());
331 stats.bytes_exchanged += message.len() as u64;
332 stats.round_trips += 1;
333 if let Some(next) = &result.next_message {
334 stats.bytes_exchanged += next.len() as u64;
335 }
336 stats.docs_have += result.have_keys.len() as u64;
337 stats.docs_need += result.need_keys.len() as u64;
338 if result.is_complete {
339 stats.sessions_completed += 1;
340 }
341 }
342
343 if result.is_complete {
345 sessions.remove(&peer_id);
346 tracing::debug!(
347 "Negentropy sync complete with {:?}: have={}, need={}",
348 peer_id,
349 result.have_keys.len(),
350 result.need_keys.len()
351 );
352 }
353
354 Ok(result)
355 }
356
357 pub fn stats(&self) -> NegentropyStats {
359 self.stats.read().unwrap_or_else(|e| e.into_inner()).clone()
360 }
361
362 pub fn has_session(&self, peer_id: &EndpointId) -> bool {
364 self.sessions
365 .read()
366 .unwrap_or_else(|e| e.into_inner())
367 .contains_key(peer_id)
368 }
369
370 pub fn cancel_session(&self, peer_id: &EndpointId) {
372 self.sessions
373 .write()
374 .unwrap_or_else(|e| e.into_inner())
375 .remove(peer_id);
376 }
377}
378
379impl Default for NegentropySync {
380 fn default() -> Self {
381 Self::new()
382 }
383}
384
385#[cfg(test)]
386mod tests {
387 use super::*;
388
389 fn make_test_items(keys: &[&str], base_timestamp: u64) -> Vec<SyncItem> {
390 keys.iter()
391 .enumerate()
392 .map(|(i, key)| SyncItem::from_doc_key(key, base_timestamp + i as u64))
393 .collect()
394 }
395
396 fn test_peer_id(seed: u8) -> EndpointId {
398 use iroh::SecretKey;
399 let mut key_bytes = [0u8; 32];
400 key_bytes[0] = seed;
401 let secret = SecretKey::from_bytes(&key_bytes);
402 secret.public()
403 }
404
405 #[test]
406 fn test_sync_item_from_doc_key() {
407 let item1 = SyncItem::from_doc_key("nodes::node-1", 1000);
408 let item2 = SyncItem::from_doc_key("nodes::node-1", 1000);
409 let item3 = SyncItem::from_doc_key("nodes::node-2", 1000);
410
411 assert_eq!(item1.id, item2.id);
413 assert_ne!(item1.id, item3.id);
415 }
416
417 #[test]
418 fn test_identical_sets_no_differences() {
419 let peer_a = test_peer_id(1);
420 let peer_b = test_peer_id(2);
421
422 let items = make_test_items(&["doc-1", "doc-2", "doc-3"], 1000);
423
424 let sync_a = NegentropySync::new();
425 let sync_b = NegentropySync::new();
426
427 let msg1 = sync_a.initiate_sync(peer_b, items.clone()).unwrap();
429
430 let result_b = sync_b.handle_message(peer_a, &msg1, items.clone()).unwrap();
432 assert!(result_b.next_message.is_some());
433
434 let result_a = sync_a
436 .handle_message(peer_b, &result_b.next_message.unwrap(), items)
437 .unwrap();
438
439 assert!(result_a.is_complete);
441 assert!(result_a.have_keys.is_empty());
442 assert!(result_a.need_keys.is_empty());
443 }
444
445 #[test]
446 fn test_different_sets_finds_differences() {
447 let peer_a = test_peer_id(1);
448 let peer_b = test_peer_id(2);
449
450 let items_a = make_test_items(&["doc-1", "doc-2"], 1000);
452 let items_b = make_test_items(&["doc-2", "doc-3"], 1000);
454
455 let sync_a = NegentropySync::new();
456 let sync_b = NegentropySync::new();
457
458 let msg1 = sync_a.initiate_sync(peer_b, items_a.clone()).unwrap();
460
461 let result_b = sync_b
463 .handle_message(peer_a, &msg1, items_b.clone())
464 .unwrap();
465
466 let mut current_msg = result_b.next_message;
468 let mut final_result = None;
469
470 while let Some(msg) = current_msg {
471 let result = sync_a
472 .handle_message(peer_b, &msg, items_a.clone())
473 .unwrap();
474 if result.is_complete {
475 final_result = Some(result);
476 break;
477 }
478
479 if let Some(next) = result.next_message {
480 let resp = sync_b
481 .handle_message(peer_a, &next, items_b.clone())
482 .unwrap();
483 current_msg = resp.next_message;
484 } else {
485 break;
486 }
487 }
488
489 if let Some(result) = final_result {
493 assert!(result.is_complete);
496 }
497 }
498
499 #[test]
500 fn test_stats_tracking() {
501 let peer_b = test_peer_id(2);
502 let items = make_test_items(&["doc-1"], 1000);
503
504 let sync = NegentropySync::new();
505 let _msg = sync.initiate_sync(peer_b, items).unwrap();
506
507 let stats = sync.stats();
508 assert_eq!(stats.sessions_initiated, 1);
509 assert!(stats.bytes_exchanged > 0);
510 }
511}