dwbase_swarm/
lib.rs

1//! Swarm coordination scaffold for DWBase; orchestration logic will be added later.
2
3use dwbase_core::{Atom, AtomId, WorldKey};
4use dwbase_engine::{SummaryAdvert, TimeWindow};
5use serde::{Deserialize, Serialize};
6use siphasher::sip::SipHasher13;
7use std::collections::HashMap;
8use std::fmt;
9use std::hash::Hasher;
10use std::net::SocketAddr;
11use std::time::{Duration, Instant, SystemTime};
12use thiserror::Error;
13
14pub type Result<T> = std::result::Result<T, SwarmError>;
15
16/// Errors surfaced by the swarm skeleton.
17#[derive(Debug, Error)]
18pub enum SwarmError {
19    #[error("io error: {0}")]
20    Io(#[from] std::io::Error),
21    #[error("encode error: {0}")]
22    Encode(String),
23    #[error("decode error: {0}")]
24    Decode(String),
25    #[error("datagram too large ({0} > {1})")]
26    Oversize(usize, usize),
27}
28
29/// Identifier for a peer in the swarm.
30#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
31pub struct PeerId(pub String);
32
33impl PeerId {
34    pub fn new<S: Into<String>>(s: S) -> Self {
35        Self(s.into())
36    }
37}
38
39impl fmt::Display for PeerId {
40    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
41        self.0.fmt(f)
42    }
43}
44
45/// Metadata describing a peer.
46#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
47pub struct PeerInfo {
48    pub id: PeerId,
49    pub addr: SocketAddr,
50    pub trust_score: f32,
51    pub worlds: Vec<String>,
52    pub subscriptions: Vec<WorldSubscription>,
53    pub summary_adverts: Vec<SummaryAdvert>,
54}
55
56/// A peer's subscription intent for a world (or pattern).
57#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
58pub struct WorldSubscription {
59    pub world_pattern: String,
60    pub kinds: Vec<InterestKind>,
61}
62
63#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
64pub enum InterestKind {
65    Atoms,
66    Summaries,
67}
68
69/// Initial handshake: declares identity, served worlds, and subscription intents.
70#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
71pub struct Hello {
72    pub peer: PeerInfo,
73    pub subscriptions: Vec<WorldSubscription>,
74}
75
76/// Envelope wire format exchanged between peers.
77#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
78pub struct Envelope {
79    pub kind: EnvelopeKind,
80    pub from: PeerId,
81    pub to: Option<PeerId>,
82    pub payload_bytes: Vec<u8>,
83    pub sent_at: SystemTime,
84}
85
86impl Envelope {
87    pub fn new(
88        kind: EnvelopeKind,
89        from: PeerId,
90        to: Option<PeerId>,
91        payload_bytes: Vec<u8>,
92    ) -> Self {
93        Self {
94            kind,
95            from,
96            to,
97            payload_bytes,
98            sent_at: SystemTime::now(),
99        }
100    }
101
102    pub fn encode(&self) -> Result<Vec<u8>> {
103        bincode::serde::encode_to_vec(self, bincode::config::standard())
104            .map_err(|e: bincode::error::EncodeError| SwarmError::Encode(e.to_string()))
105    }
106
107    pub fn decode(bytes: &[u8]) -> Result<Self> {
108        bincode::serde::decode_from_slice(bytes, bincode::config::standard())
109            .map(|(v, _)| v)
110            .map_err(|e: bincode::error::DecodeError| SwarmError::Decode(e.to_string()))
111    }
112}
113
114/// Basic envelope category marker.
115#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
116pub enum EnvelopeKind {
117    Ping,
118    Membership,
119    Hello,
120    SummaryAdvert,
121    BloomOffer,
122    MissingRequest,
123    AtomBatch,
124    Gossip,
125    Custom(String),
126}
127
128/// Bloom filter wrapper for message payloads.
129#[derive(Debug, Clone, Serialize, Deserialize)]
130pub struct BloomFilter {
131    bits: Vec<u8>,
132    k: u32,
133    m: u32,
134}
135
136impl BloomFilter {
137    pub fn from_ids(ids: &[AtomId], fp_rate: f64) -> Self {
138        let n = ids.len().max(1);
139        let fp = fp_rate.clamp(0.0001, 0.5);
140        let m = ((-(n as f64) * fp.ln()) / (std::f64::consts::LN_2.powi(2))).ceil() as u32;
141        let m = m.max(256);
142        let k = (((m as f64 / n as f64) * std::f64::consts::LN_2).ceil() as u32).max(1);
143        let mut bf = Self {
144            bits: vec![0; m.div_ceil(8) as usize],
145            k,
146            m,
147        };
148        for id in ids {
149            bf.set(id);
150        }
151        bf
152    }
153
154    fn hash_pair(id: &AtomId) -> (u64, u64) {
155        let mut h1 = SipHasher13::new_with_keys(0, 1);
156        h1.write(id.0.as_bytes());
157        let mut h2 = SipHasher13::new_with_keys(2, 3);
158        h2.write(id.0.as_bytes());
159        (h1.finish(), h2.finish())
160    }
161
162    fn indices(&self, id: &AtomId) -> impl Iterator<Item = u32> + '_ {
163        let (h1, h2) = Self::hash_pair(id);
164        (0..self.k).map(move |i| {
165            let step = (i as u64).wrapping_mul(h2);
166            ((h1.wrapping_add(step)) % self.m as u64) as u32
167        })
168    }
169
170    fn set(&mut self, id: &AtomId) {
171        let idxs: Vec<u32> = self.indices(id).collect();
172        for idx in idxs {
173            let byte = (idx / 8) as usize;
174            let bit = idx % 8;
175            self.bits[byte] |= 1 << bit;
176        }
177    }
178
179    pub fn contains(&self, id: &AtomId) -> bool {
180        for idx in self.indices(id) {
181            let byte = (idx / 8) as usize;
182            let bit = idx % 8;
183            if self.bits[byte] & (1 << bit) == 0 {
184                return false;
185            }
186        }
187        true
188    }
189
190    pub fn missing(&self, ids: &[AtomId]) -> Vec<AtomId> {
191        ids.iter()
192            .filter(|id| !self.contains(id))
193            .cloned()
194            .collect()
195    }
196}
197
198/// Offer of bloom filter for a world and window.
199#[derive(Debug, Clone, Serialize, Deserialize)]
200pub struct BloomOffer {
201    pub world: WorldKey,
202    pub window: TimeWindow,
203    pub bloom: BloomFilter,
204}
205
206/// Request for specific missing atoms.
207#[derive(Debug, Clone, Serialize, Deserialize)]
208pub struct MissingRequest {
209    pub world: WorldKey,
210    pub atom_ids: Vec<AtomId>,
211}
212
213/// Batch of atoms to ship to a peer.
214#[derive(Debug, Clone, Serialize, Deserialize)]
215pub struct AtomBatch {
216    pub world: WorldKey,
217    pub atoms: Vec<Atom>,
218}
219
220/// Clock abstraction to make membership logic testable.
221pub trait Clock: Clone + Send + Sync + 'static {
222    fn now(&self) -> Instant;
223}
224
225#[derive(Clone, Default)]
226pub struct SystemClock;
227
228impl Clock for SystemClock {
229    fn now(&self) -> Instant {
230        Instant::now()
231    }
232}
233
234#[derive(Debug, Clone)]
235struct PeerState {
236    info: PeerInfo,
237    last_seen: Instant,
238    failure_count: u32,
239    backoff_until: Option<Instant>,
240}
241
242/// Tracks known peers, last-seen timestamps, and backoff windows.
243pub struct Membership<C: Clock = SystemClock> {
244    peers: HashMap<PeerId, PeerState>,
245    expiry: Duration,
246    base_backoff: Duration,
247    clock: C,
248}
249
250impl Membership<SystemClock> {
251    pub fn new(expiry: Duration, base_backoff: Duration) -> Self {
252        Self::with_clock(SystemClock, expiry, base_backoff)
253    }
254}
255
256impl<C: Clock> Membership<C> {
257    pub fn with_clock(clock: C, expiry: Duration, base_backoff: Duration) -> Self {
258        Self {
259            peers: HashMap::new(),
260            expiry,
261            base_backoff,
262            clock,
263        }
264    }
265
266    /// Insert or update a peer record; returns true if it was newly added.
267    pub fn upsert(&mut self, info: PeerInfo) -> bool {
268        let now = self.clock.now();
269        let id = info.id.clone();
270        let is_new = !self.peers.contains_key(&id);
271        let entry = self.peers.entry(id.clone()).or_insert(PeerState {
272            info: info.clone(),
273            last_seen: now,
274            failure_count: 0,
275            backoff_until: None,
276        });
277
278        entry.info = info;
279        entry.last_seen = now;
280        entry.failure_count = 0;
281        entry.backoff_until = None;
282        is_new
283    }
284
285    /// Merge a Hello message into membership, resetting backoff/last_seen.
286    pub fn merge_hello(&mut self, hello: Hello) -> bool {
287        let mut info = hello.peer;
288        info.subscriptions = hello.subscriptions.clone();
289        self.upsert(info)
290    }
291
292    /// Apply a summary advert from a known peer, storing it in their record.
293    pub fn apply_summary_advert(&mut self, from: &PeerId, advert: SummaryAdvert) {
294        if let Some(state) = self.peers.get_mut(from) {
295            state
296                .info
297                .summary_adverts
298                .retain(|a| a.digest != advert.digest || a.world != advert.world);
299            state.info.summary_adverts.push(advert);
300            state.last_seen = self.clock.now();
301        }
302    }
303
304    /// Mark a peer as seen now (resets failures/backoff).
305    pub fn mark_seen(&mut self, id: &PeerId) {
306        if let Some(state) = self.peers.get_mut(id) {
307            state.last_seen = self.clock.now();
308            state.failure_count = 0;
309            state.backoff_until = None;
310        }
311    }
312
313    /// Increment failure count and set an exponential backoff window.
314    pub fn mark_failure(&mut self, id: &PeerId) {
315        if let Some(state) = self.peers.get_mut(id) {
316            state.failure_count = state.failure_count.saturating_add(1);
317            let shift = state.failure_count.saturating_sub(1).min(16);
318            let factor = 1u32 << shift;
319            let backoff = self
320                .base_backoff
321                .saturating_mul(factor)
322                .max(self.base_backoff);
323            state.backoff_until = Some(self.clock.now() + backoff);
324        }
325    }
326
327    /// Remove peers whose last_seen exceeds the expiry window; returns removed ids.
328    pub fn expire(&mut self) -> Vec<PeerId> {
329        let now = self.clock.now();
330        let expiry = self.expiry;
331        let mut removed = Vec::new();
332        self.peers.retain(|id, state| {
333            let keep = now.duration_since(state.last_seen) <= expiry;
334            if !keep {
335                removed.push(id.clone());
336            }
337            keep
338        });
339        removed
340    }
341
342    pub fn is_backing_off(&self, id: &PeerId) -> bool {
343        let now = self.clock.now();
344        self.peers
345            .get(id)
346            .and_then(|s| s.backoff_until)
347            .map(|until| until > now)
348            .unwrap_or(false)
349    }
350
351    pub fn peers(&self) -> Vec<PeerInfo> {
352        self.peers.values().map(|s| s.info.clone()).collect()
353    }
354
355    pub fn eligible_peers(&self) -> Vec<PeerInfo> {
356        let now = self.clock.now();
357        self.peers
358            .values()
359            .filter(|s| s.backoff_until.map(|b| b <= now).unwrap_or(true))
360            .map(|s| s.info.clone())
361            .collect()
362    }
363
364    pub fn len(&self) -> usize {
365        self.peers.len()
366    }
367
368    pub fn is_empty(&self) -> bool {
369        self.peers.is_empty()
370    }
371}
372
373#[cfg(any(feature = "swarm", test))]
374use async_trait::async_trait;
375#[cfg(any(feature = "swarm", test))]
376use std::sync::Arc;
377#[cfg(any(feature = "swarm", test))]
378use tokio::net::UdpSocket;
379
380/// Transport abstraction used by the gossip loop.
381#[cfg(any(feature = "swarm", test))]
382#[async_trait]
383pub trait Transport: Send + Sync {
384    async fn send(&self, target: SocketAddr, envelope: Envelope) -> Result<()>;
385    async fn recv(&self) -> Result<(SocketAddr, Envelope)>;
386}
387
388/// Minimal UDP transport stub for local/dev scenarios.
389#[cfg(any(feature = "swarm", test))]
390#[derive(Clone)]
391pub struct UdpTransport {
392    socket: Arc<UdpSocket>,
393    max_datagram: usize,
394}
395
396#[cfg(any(feature = "swarm", test))]
397impl UdpTransport {
398    pub async fn bind(addr: SocketAddr, max_datagram: usize) -> Result<Self> {
399        let socket = UdpSocket::bind(addr).await?;
400        Ok(Self {
401            socket: Arc::new(socket),
402            max_datagram,
403        })
404    }
405
406    pub fn local_addr(&self) -> Result<SocketAddr> {
407        self.socket.local_addr().map_err(SwarmError::from)
408    }
409}
410
411#[cfg(any(feature = "swarm", test))]
412#[async_trait]
413impl Transport for UdpTransport {
414    async fn send(&self, target: SocketAddr, envelope: Envelope) -> Result<()> {
415        let bytes = envelope.encode()?;
416        if bytes.len() > self.max_datagram {
417            return Err(SwarmError::Oversize(bytes.len(), self.max_datagram));
418        }
419        self.socket.send_to(&bytes, target).await?;
420        Ok(())
421    }
422
423    async fn recv(&self) -> Result<(SocketAddr, Envelope)> {
424        let mut buf = vec![0u8; self.max_datagram];
425        let (len, addr) = self.socket.recv_from(&mut buf).await?;
426        let env = Envelope::decode(&buf[..len])?;
427        Ok((addr, env))
428    }
429}
430
431/// Tick-based gossip loop skeleton.
432#[cfg(any(feature = "swarm", test))]
433pub struct GossipLoop<T, C: Clock = SystemClock> {
434    _transport: T,
435    membership: Membership<C>,
436    tick_interval: Duration,
437}
438
439#[cfg(any(feature = "swarm", test))]
440impl<T, C> GossipLoop<T, C>
441where
442    T: Transport,
443    C: Clock,
444{
445    pub fn new(transport: T, membership: Membership<C>, tick_interval: Duration) -> Self {
446        Self {
447            _transport: transport,
448            membership,
449            tick_interval,
450        }
451    }
452
453    pub async fn tick(&mut self) -> Result<()> {
454        let expired = self.membership.expire();
455        if !expired.is_empty() {
456            // placeholder hook for future metrics/logging
457        }
458        // Future work: pull from recv/send queues, drive membership gossip.
459        tokio::time::sleep(self.tick_interval).await;
460        Ok(())
461    }
462
463    pub async fn run_once(&mut self) -> Result<()> {
464        self.tick().await
465    }
466}
467
468#[cfg(test)]
469mod tests {
470    use super::*;
471    use async_trait::async_trait;
472    use dwbase_core::{AtomId, AtomKind, Importance, Timestamp, WorkerKey, WorldKey};
473    use dwbase_engine::{SummaryCatalog, SummaryWindow};
474    use std::net::{IpAddr, Ipv4Addr};
475    use std::sync::Mutex;
476    use time::{format_description::well_known::Rfc3339, OffsetDateTime};
477    use tokio::sync::{mpsc, Mutex as AsyncMutex};
478
479    #[derive(Clone)]
480    struct FakeClock {
481        now: std::sync::Arc<Mutex<Instant>>,
482    }
483
484    impl FakeClock {
485        fn new() -> Self {
486            Self {
487                now: std::sync::Arc::new(Mutex::new(Instant::now())),
488            }
489        }
490
491        fn advance(&self, duration: Duration) {
492            let mut guard = self.now.lock().unwrap();
493            *guard = (*guard) + duration;
494        }
495    }
496
497    impl Clock for FakeClock {
498        fn now(&self) -> Instant {
499            *self.now.lock().unwrap()
500        }
501    }
502
503    #[derive(Clone)]
504    struct LoopbackTransport {
505        addr: SocketAddr,
506        rx: std::sync::Arc<AsyncMutex<mpsc::UnboundedReceiver<(SocketAddr, Envelope)>>>,
507        tx: mpsc::UnboundedSender<(SocketAddr, Envelope)>,
508    }
509
510    fn loopback_pair() -> (LoopbackTransport, LoopbackTransport) {
511        let (tx_a, rx_a) = mpsc::unbounded_channel();
512        let (tx_b, rx_b) = mpsc::unbounded_channel();
513        let addr_a = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 9000);
514        let addr_b = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 9001);
515        (
516            LoopbackTransport {
517                addr: addr_a,
518                rx: std::sync::Arc::new(AsyncMutex::new(rx_a)),
519                tx: tx_b,
520            },
521            LoopbackTransport {
522                addr: addr_b,
523                rx: std::sync::Arc::new(AsyncMutex::new(rx_b)),
524                tx: tx_a,
525            },
526        )
527    }
528
529    #[async_trait]
530    impl Transport for LoopbackTransport {
531        async fn send(&self, _target: SocketAddr, envelope: Envelope) -> Result<()> {
532            self.tx
533                .send((self.addr, envelope))
534                .map_err(|e| SwarmError::Encode(e.to_string()))
535        }
536
537        async fn recv(&self) -> Result<(SocketAddr, Envelope)> {
538            let mut rx = self.rx.lock().await;
539            rx.recv()
540                .await
541                .ok_or_else(|| SwarmError::Decode("loopback channel closed".into()))
542        }
543    }
544
545    fn ts_from_ms(ms: i64) -> Timestamp {
546        let secs = ms / 1000;
547        let nanos = (ms % 1000) * 1_000_000;
548        let total = (secs as i128 * 1_000_000_000i128) + nanos as i128;
549        let dt =
550            OffsetDateTime::from_unix_timestamp_nanos(total).unwrap_or(OffsetDateTime::UNIX_EPOCH);
551        Timestamp(dt.format(&Rfc3339).unwrap())
552    }
553
554    fn atom_with(id: &str, world: &str, ms: i64) -> Atom {
555        Atom::builder(
556            AtomId::new(id),
557            WorldKey::new(world),
558            WorkerKey::new("worker"),
559            AtomKind::Observation,
560            ts_from_ms(ms),
561            Importance::new(1.0).unwrap(),
562            "{}",
563        )
564        .build()
565    }
566
567    struct SimpleNode {
568        world: WorldKey,
569        atoms: Vec<Atom>,
570    }
571
572    impl SimpleNode {
573        fn new(world: &str, atoms: Vec<Atom>) -> Self {
574            Self {
575                world: WorldKey::new(world),
576                atoms,
577            }
578        }
579
580        fn ids_in_window(&self, window: &TimeWindow) -> Vec<AtomId> {
581            self.atoms
582                .iter()
583                .filter_map(|a| {
584                    if let Ok(dt) = OffsetDateTime::parse(a.timestamp().0.as_str(), &Rfc3339) {
585                        let ms = (dt.unix_timestamp_nanos() / 1_000_000) as i64;
586                        if ms >= window.start_ms && ms <= window.end_ms {
587                            return Some(a.id().clone());
588                        }
589                    }
590                    None
591                })
592                .collect()
593        }
594
595        fn bloom_offer(&self, window: TimeWindow, fp: f64) -> BloomOffer {
596            let ids = self.ids_in_window(&window);
597            BloomOffer {
598                world: self.world.clone(),
599                window,
600                bloom: BloomFilter::from_ids(&ids, fp),
601            }
602        }
603
604        fn missing_atoms(&self, bloom: &BloomFilter, window: &TimeWindow) -> Vec<Atom> {
605            let ids = self.ids_in_window(window);
606            let missing = bloom.missing(&ids);
607            self.atoms
608                .iter()
609                .filter(|a| missing.contains(a.id()))
610                .cloned()
611                .collect()
612        }
613
614        fn ingest_batch(&mut self, batch: AtomBatch) {
615            for atom in batch.atoms {
616                if !self.atoms.iter().any(|a| a.id() == atom.id()) {
617                    self.atoms.push(atom);
618                }
619            }
620        }
621    }
622
623    fn peer(id: &str, port: u16) -> PeerInfo {
624        PeerInfo {
625            id: PeerId::new(id),
626            addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port),
627            trust_score: 1.0,
628            worlds: vec!["default".into()],
629            subscriptions: vec![],
630            summary_adverts: vec![],
631        }
632    }
633
634    #[test]
635    fn membership_add_update_and_expire() {
636        let clock = FakeClock::new();
637        let mut membership = Membership::with_clock(
638            clock.clone(),
639            Duration::from_secs(30),
640            Duration::from_secs(5),
641        );
642
643        let mut p1 = peer("node-a", 7000);
644        assert!(membership.upsert(p1.clone()));
645        assert_eq!(membership.len(), 1);
646
647        // Update trust score and ensure it overwrites.
648        p1.trust_score = 0.6;
649        assert!(!membership.upsert(p1.clone()));
650        assert_eq!(membership.peers()[0].trust_score, 0.6);
651
652        // Expire after window passes.
653        clock.advance(Duration::from_secs(31));
654        let expired = membership.expire();
655        assert_eq!(expired, vec![p1.id.clone()]);
656        assert!(membership.is_empty());
657    }
658
659    #[test]
660    fn membership_backoff_and_recovery() {
661        let clock = FakeClock::new();
662        let mut membership = Membership::with_clock(
663            clock.clone(),
664            Duration::from_secs(30),
665            Duration::from_secs(5),
666        );
667        let p1 = peer("node-b", 7001);
668        membership.upsert(p1.clone());
669
670        membership.mark_failure(&p1.id);
671        assert!(membership.is_backing_off(&p1.id));
672
673        // Backoff expires then mark seen clears failures.
674        clock.advance(Duration::from_secs(6));
675        assert!(!membership.is_backing_off(&p1.id));
676        membership.mark_seen(&p1.id);
677        assert!(!membership.is_backing_off(&p1.id));
678    }
679
680    #[test]
681    fn envelope_roundtrip() {
682        let env = Envelope::new(
683            EnvelopeKind::Ping,
684            PeerId::new("node-a"),
685            Some(PeerId::new("node-b")),
686            vec![1, 2, 3],
687        );
688        let bytes = env.encode().unwrap();
689        let decoded = Envelope::decode(&bytes).unwrap();
690        assert_eq!(env.kind, decoded.kind);
691        assert_eq!(env.from, decoded.from);
692        assert_eq!(env.to, decoded.to);
693        assert_eq!(env.payload_bytes, decoded.payload_bytes);
694    }
695
696    #[test]
697    fn hello_merge_is_idempotent() {
698        let clock = FakeClock::new();
699        let mut membership = Membership::with_clock(
700            clock.clone(),
701            Duration::from_secs(30),
702            Duration::from_secs(5),
703        );
704        let mut p1 = peer("node-hello", 7100);
705        p1.worlds = vec!["w1".into()];
706        let hello = Hello {
707            peer: p1.clone(),
708            subscriptions: vec![WorldSubscription {
709                world_pattern: "w*".into(),
710                kinds: vec![InterestKind::Summaries],
711            }],
712        };
713        assert!(membership.merge_hello(hello.clone()));
714        assert_eq!(membership.len(), 1);
715
716        // Second hello updates worlds/subscriptions without duplicating.
717        p1.worlds.push("w2".into());
718        let updated = Hello {
719            peer: p1.clone(),
720            subscriptions: hello.subscriptions.clone(),
721        };
722        assert!(!membership.merge_hello(updated));
723        let worlds: Vec<String> = membership.peers()[0].worlds.clone();
724        assert_eq!(worlds, vec!["w1".to_string(), "w2".to_string()]);
725    }
726
727    #[test]
728    fn summary_advert_updates_catalog_and_membership() {
729        let clock = FakeClock::new();
730        let mut membership = Membership::with_clock(
731            clock.clone(),
732            Duration::from_secs(30),
733            Duration::from_secs(5),
734        );
735        let mut catalog = SummaryCatalog::new();
736        let p1 = peer("node-sum", 7200);
737        membership.upsert(p1.clone());
738
739        let advert = SummaryAdvert::new(
740            WorldKey::new("w-sum"),
741            vec![SummaryWindow::new(0, 100)],
742            "digest-1",
743        );
744        membership.apply_summary_advert(&p1.id, advert.clone());
745        catalog.upsert(advert.clone());
746
747        let stored = membership.peers()[0].summary_adverts.clone();
748        assert_eq!(stored.len(), 1);
749        assert_eq!(stored[0].digest, "digest-1");
750
751        let known = catalog.list(&WorldKey::new("w-sum"));
752        assert_eq!(known.len(), 1);
753        assert_eq!(known[0].digest, "digest-1");
754    }
755
756    #[tokio::test]
757    async fn exchange_hello_and_summary_advert() -> Result<()> {
758        let (t1, t2) = loopback_pair();
759
760        let addr1 = t1.addr;
761        let addr2 = t2.addr;
762
763        let mut m1 = Membership::new(Duration::from_secs(60), Duration::from_secs(5));
764        let mut m2 = Membership::new(Duration::from_secs(60), Duration::from_secs(5));
765        let mut catalog = SummaryCatalog::new();
766
767        let hello = Hello {
768            peer: PeerInfo {
769                id: PeerId::new("node-a"),
770                addr: addr1,
771                trust_score: 1.0,
772                worlds: vec!["w1".into()],
773                subscriptions: vec![],
774                summary_adverts: vec![],
775            },
776            subscriptions: vec![WorldSubscription {
777                world_pattern: "w1".into(),
778                kinds: vec![InterestKind::Atoms, InterestKind::Summaries],
779            }],
780        };
781
782        // Send hello from node A to node B.
783        t1.send(
784            addr2,
785            Envelope::new(
786                EnvelopeKind::Hello,
787                hello.peer.id.clone(),
788                None,
789                bincode::serde::encode_to_vec(&hello, bincode::config::standard()).unwrap(),
790            ),
791        )
792        .await?;
793
794        let (_, env) = t2.recv().await?;
795        assert_eq!(env.kind, EnvelopeKind::Hello);
796        let decoded: Hello =
797            bincode::serde::decode_from_slice(&env.payload_bytes, bincode::config::standard())
798                .map(|(v, _)| v)
799                .unwrap();
800        m2.merge_hello(decoded.clone());
801        assert_eq!(m2.len(), 1);
802
803        // Send summary advert from node B back to A.
804        m1.upsert(PeerInfo {
805            id: PeerId::new("node-b"),
806            addr: addr2,
807            trust_score: 1.0,
808            worlds: vec!["w1".into()],
809            subscriptions: vec![],
810            summary_adverts: vec![],
811        });
812        let advert = SummaryAdvert::new(
813            WorldKey::new("w1"),
814            vec![SummaryWindow::new(0, 1000)],
815            "digest-hello",
816        );
817        t2.send(
818            addr1,
819            Envelope::new(
820                EnvelopeKind::SummaryAdvert,
821                PeerId::new("node-b"),
822                Some(PeerId::new("node-a")),
823                bincode::serde::encode_to_vec(&advert, bincode::config::standard()).unwrap(),
824            ),
825        )
826        .await?;
827        let (_, env2) = t1.recv().await?;
828        assert_eq!(env2.kind, EnvelopeKind::SummaryAdvert);
829        let decoded_adv: SummaryAdvert =
830            bincode::serde::decode_from_slice(&env2.payload_bytes, bincode::config::standard())
831                .map(|(v, _)| v)
832                .unwrap();
833        catalog.upsert(decoded_adv.clone());
834        m1.apply_summary_advert(&hello.peer.id, decoded_adv.clone());
835
836        assert_eq!(catalog.list(&WorldKey::new("w1")).len(), 1);
837        assert_eq!(m1.peers().len(), 1);
838        Ok(())
839    }
840
841    #[tokio::test]
842    async fn delta_sync_via_bloom_and_atom_batch() -> Result<()> {
843        let window = TimeWindow::new(0, 10_000);
844        let node_a = SimpleNode::new(
845            "w-delta",
846            vec![
847                atom_with("a1", "w-delta", 1_000),
848                atom_with("a2", "w-delta", 2_000),
849            ],
850        );
851        let mut node_b = SimpleNode::new("w-delta", vec![atom_with("a1", "w-delta", 1_000)]);
852
853        let (t_a, t_b) = loopback_pair();
854
855        // B sends bloom of what it has to A.
856        let offer = node_b.bloom_offer(window, 0.01);
857        t_b.send(
858            t_a.addr,
859            Envelope::new(
860                EnvelopeKind::BloomOffer,
861                PeerId::new("node-b"),
862                Some(PeerId::new("node-a")),
863                bincode::serde::encode_to_vec(&offer, bincode::config::standard()).unwrap(),
864            ),
865        )
866        .await?;
867
868        // A receives offer and responds with AtomBatch of missing atoms.
869        let (_, env) = t_a.recv().await?;
870        let decoded: BloomOffer =
871            bincode::serde::decode_from_slice(&env.payload_bytes, bincode::config::standard())
872                .map(|(v, _)| v)
873                .unwrap();
874        let missing_atoms = node_a.missing_atoms(&decoded.bloom, &decoded.window);
875        let batch = AtomBatch {
876            world: decoded.world.clone(),
877            atoms: missing_atoms,
878        };
879        t_a.send(
880            t_b.addr,
881            Envelope::new(
882                EnvelopeKind::AtomBatch,
883                PeerId::new("node-a"),
884                Some(PeerId::new("node-b")),
885                bincode::serde::encode_to_vec(&batch, bincode::config::standard()).unwrap(),
886            ),
887        )
888        .await?;
889
890        // B ingests the batch and ends up in sync.
891        let (_, env2) = t_b.recv().await?;
892        let decoded_batch: AtomBatch =
893            bincode::serde::decode_from_slice(&env2.payload_bytes, bincode::config::standard())
894                .map(|(v, _)| v)
895                .unwrap();
896        node_b.ingest_batch(decoded_batch);
897
898        assert_eq!(node_b.atoms.len(), node_a.atoms.len());
899        Ok(())
900    }
901}