1use dwbase_core::{Atom, AtomId, WorldKey};
4use dwbase_engine::{SummaryAdvert, TimeWindow};
5use serde::{Deserialize, Serialize};
6use siphasher::sip::SipHasher13;
7use std::collections::HashMap;
8use std::fmt;
9use std::hash::Hasher;
10use std::net::SocketAddr;
11use std::time::{Duration, Instant, SystemTime};
12use thiserror::Error;
13
14pub type Result<T> = std::result::Result<T, SwarmError>;
15
16#[derive(Debug, Error)]
18pub enum SwarmError {
19 #[error("io error: {0}")]
20 Io(#[from] std::io::Error),
21 #[error("encode error: {0}")]
22 Encode(String),
23 #[error("decode error: {0}")]
24 Decode(String),
25 #[error("datagram too large ({0} > {1})")]
26 Oversize(usize, usize),
27}
28
29#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
31pub struct PeerId(pub String);
32
33impl PeerId {
34 pub fn new<S: Into<String>>(s: S) -> Self {
35 Self(s.into())
36 }
37}
38
39impl fmt::Display for PeerId {
40 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
41 self.0.fmt(f)
42 }
43}
44
45#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
47pub struct PeerInfo {
48 pub id: PeerId,
49 pub addr: SocketAddr,
50 pub trust_score: f32,
51 pub worlds: Vec<String>,
52 pub subscriptions: Vec<WorldSubscription>,
53 pub summary_adverts: Vec<SummaryAdvert>,
54}
55
56#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
58pub struct WorldSubscription {
59 pub world_pattern: String,
60 pub kinds: Vec<InterestKind>,
61}
62
63#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
64pub enum InterestKind {
65 Atoms,
66 Summaries,
67}
68
69#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
71pub struct Hello {
72 pub peer: PeerInfo,
73 pub subscriptions: Vec<WorldSubscription>,
74}
75
76#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
78pub struct Envelope {
79 pub kind: EnvelopeKind,
80 pub from: PeerId,
81 pub to: Option<PeerId>,
82 pub payload_bytes: Vec<u8>,
83 pub sent_at: SystemTime,
84}
85
86impl Envelope {
87 pub fn new(
88 kind: EnvelopeKind,
89 from: PeerId,
90 to: Option<PeerId>,
91 payload_bytes: Vec<u8>,
92 ) -> Self {
93 Self {
94 kind,
95 from,
96 to,
97 payload_bytes,
98 sent_at: SystemTime::now(),
99 }
100 }
101
102 pub fn encode(&self) -> Result<Vec<u8>> {
103 bincode::serde::encode_to_vec(self, bincode::config::standard())
104 .map_err(|e: bincode::error::EncodeError| SwarmError::Encode(e.to_string()))
105 }
106
107 pub fn decode(bytes: &[u8]) -> Result<Self> {
108 bincode::serde::decode_from_slice(bytes, bincode::config::standard())
109 .map(|(v, _)| v)
110 .map_err(|e: bincode::error::DecodeError| SwarmError::Decode(e.to_string()))
111 }
112}
113
114#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
116pub enum EnvelopeKind {
117 Ping,
118 Membership,
119 Hello,
120 SummaryAdvert,
121 BloomOffer,
122 MissingRequest,
123 AtomBatch,
124 Gossip,
125 Custom(String),
126}
127
128#[derive(Debug, Clone, Serialize, Deserialize)]
130pub struct BloomFilter {
131 bits: Vec<u8>,
132 k: u32,
133 m: u32,
134}
135
136impl BloomFilter {
137 pub fn from_ids(ids: &[AtomId], fp_rate: f64) -> Self {
138 let n = ids.len().max(1);
139 let fp = fp_rate.clamp(0.0001, 0.5);
140 let m = ((-(n as f64) * fp.ln()) / (std::f64::consts::LN_2.powi(2))).ceil() as u32;
141 let m = m.max(256);
142 let k = (((m as f64 / n as f64) * std::f64::consts::LN_2).ceil() as u32).max(1);
143 let mut bf = Self {
144 bits: vec![0; m.div_ceil(8) as usize],
145 k,
146 m,
147 };
148 for id in ids {
149 bf.set(id);
150 }
151 bf
152 }
153
154 fn hash_pair(id: &AtomId) -> (u64, u64) {
155 let mut h1 = SipHasher13::new_with_keys(0, 1);
156 h1.write(id.0.as_bytes());
157 let mut h2 = SipHasher13::new_with_keys(2, 3);
158 h2.write(id.0.as_bytes());
159 (h1.finish(), h2.finish())
160 }
161
162 fn indices(&self, id: &AtomId) -> impl Iterator<Item = u32> + '_ {
163 let (h1, h2) = Self::hash_pair(id);
164 (0..self.k).map(move |i| {
165 let step = (i as u64).wrapping_mul(h2);
166 ((h1.wrapping_add(step)) % self.m as u64) as u32
167 })
168 }
169
170 fn set(&mut self, id: &AtomId) {
171 let idxs: Vec<u32> = self.indices(id).collect();
172 for idx in idxs {
173 let byte = (idx / 8) as usize;
174 let bit = idx % 8;
175 self.bits[byte] |= 1 << bit;
176 }
177 }
178
179 pub fn contains(&self, id: &AtomId) -> bool {
180 for idx in self.indices(id) {
181 let byte = (idx / 8) as usize;
182 let bit = idx % 8;
183 if self.bits[byte] & (1 << bit) == 0 {
184 return false;
185 }
186 }
187 true
188 }
189
190 pub fn missing(&self, ids: &[AtomId]) -> Vec<AtomId> {
191 ids.iter()
192 .filter(|id| !self.contains(id))
193 .cloned()
194 .collect()
195 }
196}
197
198#[derive(Debug, Clone, Serialize, Deserialize)]
200pub struct BloomOffer {
201 pub world: WorldKey,
202 pub window: TimeWindow,
203 pub bloom: BloomFilter,
204}
205
206#[derive(Debug, Clone, Serialize, Deserialize)]
208pub struct MissingRequest {
209 pub world: WorldKey,
210 pub atom_ids: Vec<AtomId>,
211}
212
213#[derive(Debug, Clone, Serialize, Deserialize)]
215pub struct AtomBatch {
216 pub world: WorldKey,
217 pub atoms: Vec<Atom>,
218}
219
/// Source of monotonic time, abstracted so a deterministic clock can be substituted.
pub trait Clock: Clone + Send + Sync + 'static {
    /// Returns the current instant according to this clock.
    fn now(&self) -> Instant;
}
224
/// Stateless marker type for the clock backed by the operating system.
#[derive(Clone, Default)]
pub struct SystemClock;
227
228impl Clock for SystemClock {
229 fn now(&self) -> Instant {
230 Instant::now()
231 }
232}
233
234#[derive(Debug, Clone)]
235struct PeerState {
236 info: PeerInfo,
237 last_seen: Instant,
238 failure_count: u32,
239 backoff_until: Option<Instant>,
240}
241
242pub struct Membership<C: Clock = SystemClock> {
244 peers: HashMap<PeerId, PeerState>,
245 expiry: Duration,
246 base_backoff: Duration,
247 clock: C,
248}
249
250impl Membership<SystemClock> {
251 pub fn new(expiry: Duration, base_backoff: Duration) -> Self {
252 Self::with_clock(SystemClock, expiry, base_backoff)
253 }
254}
255
256impl<C: Clock> Membership<C> {
257 pub fn with_clock(clock: C, expiry: Duration, base_backoff: Duration) -> Self {
258 Self {
259 peers: HashMap::new(),
260 expiry,
261 base_backoff,
262 clock,
263 }
264 }
265
266 pub fn upsert(&mut self, info: PeerInfo) -> bool {
268 let now = self.clock.now();
269 let id = info.id.clone();
270 let is_new = !self.peers.contains_key(&id);
271 let entry = self.peers.entry(id.clone()).or_insert(PeerState {
272 info: info.clone(),
273 last_seen: now,
274 failure_count: 0,
275 backoff_until: None,
276 });
277
278 entry.info = info;
279 entry.last_seen = now;
280 entry.failure_count = 0;
281 entry.backoff_until = None;
282 is_new
283 }
284
285 pub fn merge_hello(&mut self, hello: Hello) -> bool {
287 let mut info = hello.peer;
288 info.subscriptions = hello.subscriptions.clone();
289 self.upsert(info)
290 }
291
292 pub fn apply_summary_advert(&mut self, from: &PeerId, advert: SummaryAdvert) {
294 if let Some(state) = self.peers.get_mut(from) {
295 state
296 .info
297 .summary_adverts
298 .retain(|a| a.digest != advert.digest || a.world != advert.world);
299 state.info.summary_adverts.push(advert);
300 state.last_seen = self.clock.now();
301 }
302 }
303
304 pub fn mark_seen(&mut self, id: &PeerId) {
306 if let Some(state) = self.peers.get_mut(id) {
307 state.last_seen = self.clock.now();
308 state.failure_count = 0;
309 state.backoff_until = None;
310 }
311 }
312
313 pub fn mark_failure(&mut self, id: &PeerId) {
315 if let Some(state) = self.peers.get_mut(id) {
316 state.failure_count = state.failure_count.saturating_add(1);
317 let shift = state.failure_count.saturating_sub(1).min(16);
318 let factor = 1u32 << shift;
319 let backoff = self
320 .base_backoff
321 .saturating_mul(factor)
322 .max(self.base_backoff);
323 state.backoff_until = Some(self.clock.now() + backoff);
324 }
325 }
326
327 pub fn expire(&mut self) -> Vec<PeerId> {
329 let now = self.clock.now();
330 let expiry = self.expiry;
331 let mut removed = Vec::new();
332 self.peers.retain(|id, state| {
333 let keep = now.duration_since(state.last_seen) <= expiry;
334 if !keep {
335 removed.push(id.clone());
336 }
337 keep
338 });
339 removed
340 }
341
342 pub fn is_backing_off(&self, id: &PeerId) -> bool {
343 let now = self.clock.now();
344 self.peers
345 .get(id)
346 .and_then(|s| s.backoff_until)
347 .map(|until| until > now)
348 .unwrap_or(false)
349 }
350
351 pub fn peers(&self) -> Vec<PeerInfo> {
352 self.peers.values().map(|s| s.info.clone()).collect()
353 }
354
355 pub fn eligible_peers(&self) -> Vec<PeerInfo> {
356 let now = self.clock.now();
357 self.peers
358 .values()
359 .filter(|s| s.backoff_until.map(|b| b <= now).unwrap_or(true))
360 .map(|s| s.info.clone())
361 .collect()
362 }
363
364 pub fn len(&self) -> usize {
365 self.peers.len()
366 }
367
368 pub fn is_empty(&self) -> bool {
369 self.peers.is_empty()
370 }
371}
372
373#[cfg(any(feature = "swarm", test))]
374use async_trait::async_trait;
375#[cfg(any(feature = "swarm", test))]
376use std::sync::Arc;
377#[cfg(any(feature = "swarm", test))]
378use tokio::net::UdpSocket;
379
/// Asynchronous channel for exchanging [`Envelope`]s with other nodes.
#[cfg(any(feature = "swarm", test))]
#[async_trait]
pub trait Transport: Send + Sync {
    /// Sends `envelope` to `target`.
    async fn send(&self, target: SocketAddr, envelope: Envelope) -> Result<()>;
    /// Waits for the next envelope and returns it together with the address reported as its
    /// source.
    async fn recv(&self) -> Result<(SocketAddr, Envelope)>;
}
387
/// [`Transport`] implementation backed by a shared tokio UDP socket.
#[cfg(any(feature = "swarm", test))]
#[derive(Clone)]
pub struct UdpTransport {
    /// Socket shared between clones of the transport.
    socket: Arc<UdpSocket>,
    /// Largest encoded envelope that will be sent; also the size of the receive buffer.
    max_datagram: usize,
}
395
#[cfg(any(feature = "swarm", test))]
impl UdpTransport {
    /// Binds a UDP socket to `addr` and wraps it in a transport limited to `max_datagram`
    /// bytes per datagram.
    ///
    /// # Errors
    /// Returns [`SwarmError::Io`] when the socket cannot be bound.
    pub async fn bind(addr: SocketAddr, max_datagram: usize) -> Result<Self> {
        let socket = Arc::new(UdpSocket::bind(addr).await?);
        Ok(Self {
            socket,
            max_datagram,
        })
    }

    /// Returns the local address the socket is bound to.
    ///
    /// # Errors
    /// Returns [`SwarmError::Io`] when the address cannot be queried.
    pub fn local_addr(&self) -> Result<SocketAddr> {
        let bound = self.socket.local_addr()?;
        Ok(bound)
    }
}
410
#[cfg(any(feature = "swarm", test))]
#[async_trait]
impl Transport for UdpTransport {
    /// Encodes `envelope` and sends it to `target` as a single datagram.
    ///
    /// # Errors
    /// Returns [`SwarmError::Oversize`] when the encoded envelope exceeds `max_datagram`,
    /// [`SwarmError::Encode`] when encoding fails, or [`SwarmError::Io`] when the send fails.
    async fn send(&self, target: SocketAddr, envelope: Envelope) -> Result<()> {
        let datagram = envelope.encode()?;
        let size = datagram.len();
        if size > self.max_datagram {
            return Err(SwarmError::Oversize(size, self.max_datagram));
        }
        self.socket
            .send_to(&datagram, target)
            .await
            .map(|_sent| ())
            .map_err(SwarmError::from)
    }

    /// Receives one datagram into a `max_datagram`-sized buffer and decodes it.
    ///
    /// # Errors
    /// Returns [`SwarmError::Io`] when the receive fails or [`SwarmError::Decode`] when the
    /// received bytes are not a valid envelope.
    async fn recv(&self) -> Result<(SocketAddr, Envelope)> {
        let mut scratch = vec![0u8; self.max_datagram];
        let (received, source) = self.socket.recv_from(&mut scratch).await?;
        let envelope = Envelope::decode(&scratch[..received])?;
        Ok((source, envelope))
    }
}
430
/// Periodic maintenance loop over a [`Membership`] table.
#[cfg(any(feature = "swarm", test))]
pub struct GossipLoop<T, C: Clock = SystemClock> {
    /// Transport held by the loop; not used by the code visible here.
    _transport: T,
    /// Membership table maintained on every tick.
    membership: Membership<C>,
    /// Time slept at the end of each tick.
    tick_interval: Duration,
}
438
#[cfg(any(feature = "swarm", test))]
impl<T, C> GossipLoop<T, C>
where
    T: Transport,
    C: Clock,
{
    /// Creates a loop over `membership` that sleeps `tick_interval` per tick.
    pub fn new(transport: T, membership: Membership<C>, tick_interval: Duration) -> Self {
        GossipLoop {
            _transport: transport,
            membership,
            tick_interval,
        }
    }

    /// Runs one maintenance step: expires stale peers, then sleeps for the tick interval.
    pub async fn tick(&mut self) -> Result<()> {
        // The list of expired peer ids is not consumed by anything yet.
        let _expired = self.membership.expire();
        tokio::time::sleep(self.tick_interval).await;
        Ok(())
    }

    /// Runs exactly one tick.
    pub async fn run_once(&mut self) -> Result<()> {
        let outcome = self.tick().await;
        outcome
    }
}
467
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use dwbase_core::{AtomId, AtomKind, Importance, Timestamp, WorkerKey, WorldKey};
    use dwbase_engine::{SummaryCatalog, SummaryWindow};
    use std::net::{IpAddr, Ipv4Addr};
    use std::sync::Mutex;
    use time::{format_description::well_known::Rfc3339, OffsetDateTime};
    use tokio::sync::{mpsc, Mutex as AsyncMutex};

    /// Manually advanced clock so expiry and backoff can be tested deterministically.
    #[derive(Clone)]
    struct FakeClock {
        now: std::sync::Arc<Mutex<Instant>>,
    }

    impl FakeClock {
        fn new() -> Self {
            Self {
                now: std::sync::Arc::new(Mutex::new(Instant::now())),
            }
        }

        /// Moves the clock forward by `duration`.
        fn advance(&self, duration: Duration) {
            let mut guard = self.now.lock().unwrap();
            *guard += duration;
        }
    }

    impl Clock for FakeClock {
        fn now(&self) -> Instant {
            *self.now.lock().unwrap()
        }
    }

    /// In-memory transport: whatever one end sends arrives at the other end.
    #[derive(Clone)]
    struct LoopbackTransport {
        addr: SocketAddr,
        rx: std::sync::Arc<AsyncMutex<mpsc::UnboundedReceiver<(SocketAddr, Envelope)>>>,
        tx: mpsc::UnboundedSender<(SocketAddr, Envelope)>,
    }

    /// Builds two cross-connected loopback transports.
    fn loopback_pair() -> (LoopbackTransport, LoopbackTransport) {
        let (tx_a, rx_a) = mpsc::unbounded_channel();
        let (tx_b, rx_b) = mpsc::unbounded_channel();
        let addr_a = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 9000);
        let addr_b = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 9001);
        (
            LoopbackTransport {
                addr: addr_a,
                rx: std::sync::Arc::new(AsyncMutex::new(rx_a)),
                tx: tx_b,
            },
            LoopbackTransport {
                addr: addr_b,
                rx: std::sync::Arc::new(AsyncMutex::new(rx_b)),
                tx: tx_a,
            },
        )
    }

    #[async_trait]
    impl Transport for LoopbackTransport {
        // The target is ignored: each end is wired to exactly one peer.
        async fn send(&self, _target: SocketAddr, envelope: Envelope) -> Result<()> {
            self.tx
                .send((self.addr, envelope))
                .map_err(|e| SwarmError::Encode(e.to_string()))
        }

        async fn recv(&self) -> Result<(SocketAddr, Envelope)> {
            let mut rx = self.rx.lock().await;
            rx.recv()
                .await
                .ok_or_else(|| SwarmError::Decode("loopback channel closed".into()))
        }
    }

    /// Converts milliseconds since the Unix epoch into an RFC 3339 `Timestamp`.
    fn ts_from_ms(ms: i64) -> Timestamp {
        let secs = ms / 1000;
        let nanos = (ms % 1000) * 1_000_000;
        let total = (secs as i128 * 1_000_000_000i128) + nanos as i128;
        let dt =
            OffsetDateTime::from_unix_timestamp_nanos(total).unwrap_or(OffsetDateTime::UNIX_EPOCH);
        Timestamp(dt.format(&Rfc3339).unwrap())
    }

    /// Builds an observation atom with the given id, world and timestamp.
    fn atom_with(id: &str, world: &str, ms: i64) -> Atom {
        Atom::builder(
            AtomId::new(id),
            WorldKey::new(world),
            WorkerKey::new("worker"),
            AtomKind::Observation,
            ts_from_ms(ms),
            Importance::new(1.0).unwrap(),
            "{}",
        )
        .build()
    }

    /// Minimal node model holding the atoms of a single world.
    struct SimpleNode {
        world: WorldKey,
        atoms: Vec<Atom>,
    }

    impl SimpleNode {
        fn new(world: &str, atoms: Vec<Atom>) -> Self {
            Self {
                world: WorldKey::new(world),
                atoms,
            }
        }

        /// Ids of the atoms whose timestamp falls inside `window` (inclusive on both ends).
        fn ids_in_window(&self, window: &TimeWindow) -> Vec<AtomId> {
            self.atoms
                .iter()
                .filter_map(|a| {
                    if let Ok(dt) = OffsetDateTime::parse(a.timestamp().0.as_str(), &Rfc3339) {
                        let ms = (dt.unix_timestamp_nanos() / 1_000_000) as i64;
                        if ms >= window.start_ms && ms <= window.end_ms {
                            return Some(a.id().clone());
                        }
                    }
                    None
                })
                .collect()
        }

        /// Offers a Bloom filter over the ids this node holds in `window`.
        fn bloom_offer(&self, window: TimeWindow, fp: f64) -> BloomOffer {
            let ids = self.ids_in_window(&window);
            BloomOffer {
                world: self.world.clone(),
                window,
                bloom: BloomFilter::from_ids(&ids, fp),
            }
        }

        /// Atoms this node holds in `window` that the remote filter does not contain.
        fn missing_atoms(&self, bloom: &BloomFilter, window: &TimeWindow) -> Vec<Atom> {
            let ids = self.ids_in_window(window);
            let missing = bloom.missing(&ids);
            self.atoms
                .iter()
                .filter(|a| missing.contains(a.id()))
                .cloned()
                .collect()
        }

        /// Adds the atoms of `batch` that are not already present.
        fn ingest_batch(&mut self, batch: AtomBatch) {
            for atom in batch.atoms {
                if !self.atoms.iter().any(|a| a.id() == atom.id()) {
                    self.atoms.push(atom);
                }
            }
        }
    }

    /// Builds a localhost peer with default trust and a single "default" world.
    fn peer(id: &str, port: u16) -> PeerInfo {
        PeerInfo {
            id: PeerId::new(id),
            addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port),
            trust_score: 1.0,
            worlds: vec!["default".into()],
            subscriptions: vec![],
            summary_adverts: vec![],
        }
    }

    #[test]
    fn membership_add_update_and_expire() {
        let clock = FakeClock::new();
        let mut membership = Membership::with_clock(
            clock.clone(),
            Duration::from_secs(30),
            Duration::from_secs(5),
        );

        let mut p1 = peer("node-a", 7000);
        assert!(membership.upsert(p1.clone()));
        assert_eq!(membership.len(), 1);

        // Upserting a known peer updates it in place and reports "not new".
        p1.trust_score = 0.6;
        assert!(!membership.upsert(p1.clone()));
        assert_eq!(membership.peers()[0].trust_score, 0.6);

        // One second past the 30 s expiry the peer must be dropped.
        clock.advance(Duration::from_secs(31));
        let expired = membership.expire();
        assert_eq!(expired, vec![p1.id.clone()]);
        assert!(membership.is_empty());
    }

    #[test]
    fn membership_backoff_and_recovery() {
        let clock = FakeClock::new();
        let mut membership = Membership::with_clock(
            clock.clone(),
            Duration::from_secs(30),
            Duration::from_secs(5),
        );
        let p1 = peer("node-b", 7001);
        membership.upsert(p1.clone());

        membership.mark_failure(&p1.id);
        assert!(membership.is_backing_off(&p1.id));

        // The first failure backs off for the base 5 s, so 6 s later it has elapsed.
        clock.advance(Duration::from_secs(6));
        assert!(!membership.is_backing_off(&p1.id));
        membership.mark_seen(&p1.id);
        assert!(!membership.is_backing_off(&p1.id));
    }

    #[test]
    fn envelope_roundtrip() {
        let env = Envelope::new(
            EnvelopeKind::Ping,
            PeerId::new("node-a"),
            Some(PeerId::new("node-b")),
            vec![1, 2, 3],
        );
        let bytes = env.encode().unwrap();
        let decoded = Envelope::decode(&bytes).unwrap();
        assert_eq!(env.kind, decoded.kind);
        assert_eq!(env.from, decoded.from);
        assert_eq!(env.to, decoded.to);
        assert_eq!(env.payload_bytes, decoded.payload_bytes);
    }

    #[test]
    fn hello_merge_is_idempotent() {
        let clock = FakeClock::new();
        let mut membership = Membership::with_clock(
            clock.clone(),
            Duration::from_secs(30),
            Duration::from_secs(5),
        );
        let mut p1 = peer("node-hello", 7100);
        p1.worlds = vec!["w1".into()];
        let hello = Hello {
            peer: p1.clone(),
            subscriptions: vec![WorldSubscription {
                world_pattern: "w*".into(),
                kinds: vec![InterestKind::Summaries],
            }],
        };
        assert!(membership.merge_hello(hello.clone()));
        assert_eq!(membership.len(), 1);

        // A second hello from the same peer updates the entry instead of adding one.
        p1.worlds.push("w2".into());
        let updated = Hello {
            peer: p1.clone(),
            subscriptions: hello.subscriptions.clone(),
        };
        assert!(!membership.merge_hello(updated));
        let worlds: Vec<String> = membership.peers()[0].worlds.clone();
        assert_eq!(worlds, vec!["w1".to_string(), "w2".to_string()]);
    }

    #[test]
    fn summary_advert_updates_catalog_and_membership() {
        let clock = FakeClock::new();
        let mut membership = Membership::with_clock(
            clock.clone(),
            Duration::from_secs(30),
            Duration::from_secs(5),
        );
        let mut catalog = SummaryCatalog::new();
        let p1 = peer("node-sum", 7200);
        membership.upsert(p1.clone());

        let advert = SummaryAdvert::new(
            WorldKey::new("w-sum"),
            vec![SummaryWindow::new(0, 100)],
            "digest-1",
        );
        membership.apply_summary_advert(&p1.id, advert.clone());
        catalog.upsert(advert.clone());

        let stored = membership.peers()[0].summary_adverts.clone();
        assert_eq!(stored.len(), 1);
        assert_eq!(stored[0].digest, "digest-1");

        let known = catalog.list(&WorldKey::new("w-sum"));
        assert_eq!(known.len(), 1);
        assert_eq!(known[0].digest, "digest-1");
    }

    #[tokio::test]
    async fn exchange_hello_and_summary_advert() -> Result<()> {
        let (t1, t2) = loopback_pair();

        let addr1 = t1.addr;
        let addr2 = t2.addr;

        let mut m1 = Membership::new(Duration::from_secs(60), Duration::from_secs(5));
        let mut m2 = Membership::new(Duration::from_secs(60), Duration::from_secs(5));
        let mut catalog = SummaryCatalog::new();

        let hello = Hello {
            peer: PeerInfo {
                id: PeerId::new("node-a"),
                addr: addr1,
                trust_score: 1.0,
                worlds: vec!["w1".into()],
                subscriptions: vec![],
                summary_adverts: vec![],
            },
            subscriptions: vec![WorldSubscription {
                world_pattern: "w1".into(),
                kinds: vec![InterestKind::Atoms, InterestKind::Summaries],
            }],
        };

        // node-a introduces itself to node-b.
        t1.send(
            addr2,
            Envelope::new(
                EnvelopeKind::Hello,
                hello.peer.id.clone(),
                None,
                bincode::serde::encode_to_vec(&hello, bincode::config::standard()).unwrap(),
            ),
        )
        .await?;

        let (_, env) = t2.recv().await?;
        assert_eq!(env.kind, EnvelopeKind::Hello);
        let decoded: Hello =
            bincode::serde::decode_from_slice(&env.payload_bytes, bincode::config::standard())
                .map(|(v, _)| v)
                .unwrap();
        m2.merge_hello(decoded.clone());
        assert_eq!(m2.len(), 1);

        // node-a already knows node-b, which now advertises a summary.
        m1.upsert(PeerInfo {
            id: PeerId::new("node-b"),
            addr: addr2,
            trust_score: 1.0,
            worlds: vec!["w1".into()],
            subscriptions: vec![],
            summary_adverts: vec![],
        });
        let advert = SummaryAdvert::new(
            WorldKey::new("w1"),
            vec![SummaryWindow::new(0, 1000)],
            "digest-hello",
        );
        t2.send(
            addr1,
            Envelope::new(
                EnvelopeKind::SummaryAdvert,
                PeerId::new("node-b"),
                Some(PeerId::new("node-a")),
                bincode::serde::encode_to_vec(&advert, bincode::config::standard()).unwrap(),
            ),
        )
        .await?;
        let (_, env2) = t1.recv().await?;
        assert_eq!(env2.kind, EnvelopeKind::SummaryAdvert);
        let decoded_adv: SummaryAdvert =
            bincode::serde::decode_from_slice(&env2.payload_bytes, bincode::config::standard())
                .map(|(v, _)| v)
                .unwrap();
        catalog.upsert(decoded_adv.clone());
        // Attribute the advert to its sender (node-b). Using node-a's own id here would be
        // a silent no-op, because node-a is not a member of its own table.
        m1.apply_summary_advert(&env2.from, decoded_adv);

        assert_eq!(catalog.list(&WorldKey::new("w1")).len(), 1);
        let peers = m1.peers();
        assert_eq!(peers.len(), 1);
        assert_eq!(peers[0].summary_adverts.len(), 1);
        assert_eq!(peers[0].summary_adverts[0].digest, "digest-hello");
        Ok(())
    }

    #[tokio::test]
    async fn delta_sync_via_bloom_and_atom_batch() -> Result<()> {
        let window = TimeWindow::new(0, 10_000);
        let node_a = SimpleNode::new(
            "w-delta",
            vec![
                atom_with("a1", "w-delta", 1_000),
                atom_with("a2", "w-delta", 2_000),
            ],
        );
        let mut node_b = SimpleNode::new("w-delta", vec![atom_with("a1", "w-delta", 1_000)]);

        let (t_a, t_b) = loopback_pair();

        // node-b advertises what it already has.
        let offer = node_b.bloom_offer(window, 0.01);
        t_b.send(
            t_a.addr,
            Envelope::new(
                EnvelopeKind::BloomOffer,
                PeerId::new("node-b"),
                Some(PeerId::new("node-a")),
                bincode::serde::encode_to_vec(&offer, bincode::config::standard()).unwrap(),
            ),
        )
        .await?;

        // node-a answers with the atoms the filter does not contain.
        let (_, env) = t_a.recv().await?;
        let decoded: BloomOffer =
            bincode::serde::decode_from_slice(&env.payload_bytes, bincode::config::standard())
                .map(|(v, _)| v)
                .unwrap();
        let missing_atoms = node_a.missing_atoms(&decoded.bloom, &decoded.window);
        let batch = AtomBatch {
            world: decoded.world.clone(),
            atoms: missing_atoms,
        };
        t_a.send(
            t_b.addr,
            Envelope::new(
                EnvelopeKind::AtomBatch,
                PeerId::new("node-a"),
                Some(PeerId::new("node-b")),
                bincode::serde::encode_to_vec(&batch, bincode::config::standard()).unwrap(),
            ),
        )
        .await?;

        // node-b ingests the batch and ends up with the same atom count as node-a.
        let (_, env2) = t_b.recv().await?;
        let decoded_batch: AtomBatch =
            bincode::serde::decode_from_slice(&env2.payload_bytes, bincode::config::standard())
                .map(|(v, _)| v)
                .unwrap();
        node_b.ingest_batch(decoded_batch);

        assert_eq!(node_b.atoms.len(), node_a.atoms.len());
        Ok(())
    }
}