Skip to main content

hashtree_network/
types.rs

1//! Mesh transport types for peer-to-peer data exchange.
2//!
3//! Defines signaling frames, negotiated peer-link messages, and shared mesh
4//! constants used across Nostr websocket transports, local buses, direct-link
5//! transports such as WebRTC, and simulation.
6
7use hashtree_core::Hash;
8use nostr_sdk::nostr::{Event, Kind};
9use serde::{Deserialize, Serialize};
10use std::collections::{HashMap, VecDeque};
11use std::time::Duration;
12use tokio::time::Instant;
13
14use crate::mesh_store_core::{PubsubDeliveryMode, RequestDispatchConfig};
15use crate::peer_selector::SelectionStrategy;
16
17fn default_hash_get_enabled() -> bool {
18    true
19}
20
21/// Unique identifier for a peer in the network
22#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
23pub struct PeerId {
24    /// Nostr public key / endpoint identity
25    pub pubkey: String,
26}
27
28impl PeerId {
29    pub fn new(pubkey: String) -> Self {
30        Self { pubkey }
31    }
32
33    pub fn to_peer_string(&self) -> String {
34        self.pubkey.clone()
35    }
36
37    pub fn from_peer_string(s: &str) -> Option<Self> {
38        let pubkey = s.trim();
39        if pubkey.is_empty() || pubkey.contains(':') {
40            return None;
41        }
42        Some(Self {
43            pubkey: pubkey.to_string(),
44        })
45    }
46
47    pub fn from_string(s: &str) -> Option<Self> {
48        Self::from_peer_string(s)
49    }
50
51    pub fn short(&self) -> String {
52        self.pubkey[..8.min(self.pubkey.len())].to_string()
53    }
54}
55
56/// Persisted private WebRTC signaling hints for a stable peer identity.
57#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
58pub struct KnownPeerRecord {
59    pub peer_id: String,
60    #[serde(
61        default,
62        alias = "direct_urls",
63        alias = "addresses",
64        skip_serializing_if = "Vec::is_empty"
65    )]
66    pub signal_urls: Vec<String>,
67    #[serde(default)]
68    pub last_seen_unix_ms: u64,
69    #[serde(default, skip_serializing_if = "Option::is_none")]
70    pub last_source: Option<String>,
71}
72
73/// Snapshot of private peer signaling hints learned over established peer links.
74#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
75pub struct KnownPeerSnapshot {
76    pub version: u32,
77    #[serde(default)]
78    pub peers: Vec<KnownPeerRecord>,
79}
80
81impl Default for KnownPeerSnapshot {
82    fn default() -> Self {
83        Self {
84            version: 1,
85            peers: Vec::new(),
86        }
87    }
88}
89
90impl std::fmt::Display for PeerId {
91    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
92        f.write_str(&self.pubkey)
93    }
94}
95
96/// Signaling message types exchanged over mesh signaling transports.
97#[derive(Debug, Clone, Serialize, Deserialize)]
98#[serde(tag = "type")]
99pub enum SignalingMessage {
100    /// Initial hello message to discover peers
101    #[serde(rename = "hello")]
102    Hello {
103        #[serde(rename = "peerId")]
104        peer_id: String,
105        roots: Vec<String>,
106        #[serde(rename = "hashGet", default = "default_hash_get_enabled")]
107        hash_get: bool,
108    },
109
110    /// Negotiation offer payload (currently SDP for WebRTC links).
111    #[serde(rename = "offer")]
112    Offer {
113        #[serde(rename = "peerId")]
114        peer_id: String,
115        #[serde(rename = "targetPeerId")]
116        target_peer_id: String,
117        sdp: String,
118    },
119
120    /// Negotiation answer payload (currently SDP for WebRTC links).
121    #[serde(rename = "answer")]
122    Answer {
123        #[serde(rename = "peerId")]
124        peer_id: String,
125        #[serde(rename = "targetPeerId")]
126        target_peer_id: String,
127        sdp: String,
128    },
129
130    /// Single candidate update for transports that need trickle negotiation.
131    #[serde(rename = "candidate")]
132    Candidate {
133        #[serde(rename = "peerId")]
134        peer_id: String,
135        #[serde(rename = "targetPeerId")]
136        target_peer_id: String,
137        candidate: String,
138        #[serde(rename = "sdpMLineIndex")]
139        sdp_m_line_index: Option<u16>,
140        #[serde(rename = "sdpMid")]
141        sdp_mid: Option<String>,
142    },
143
144    /// Batched candidate updates.
145    #[serde(rename = "candidates")]
146    Candidates {
147        #[serde(rename = "peerId")]
148        peer_id: String,
149        #[serde(rename = "targetPeerId")]
150        target_peer_id: String,
151        candidates: Vec<IceCandidate>,
152    },
153}
154
155impl SignalingMessage {
156    /// Get the message type label.
157    pub fn msg_type(&self) -> &str {
158        match self {
159            Self::Hello { .. } => "hello",
160            Self::Offer { .. } => "offer",
161            Self::Answer { .. } => "answer",
162            Self::Candidate { .. } => "candidate",
163            Self::Candidates { .. } => "candidates",
164        }
165    }
166
167    /// Get the sender's peer ID
168    pub fn peer_id(&self) -> &str {
169        match self {
170            Self::Hello { peer_id, .. } => peer_id,
171            Self::Offer { peer_id, .. } => peer_id,
172            Self::Answer { peer_id, .. } => peer_id,
173            Self::Candidate { peer_id, .. } => peer_id,
174            Self::Candidates { peer_id, .. } => peer_id,
175        }
176    }
177
178    /// Get the target peer ID (if applicable)
179    pub fn target_peer_id(&self) -> Option<&str> {
180        match self {
181            Self::Hello { .. } => None, // Broadcast
182            Self::Offer { target_peer_id, .. } => Some(target_peer_id),
183            Self::Answer { target_peer_id, .. } => Some(target_peer_id),
184            Self::Candidate { target_peer_id, .. } => Some(target_peer_id),
185            Self::Candidates { target_peer_id, .. } => Some(target_peer_id),
186        }
187    }
188
189    /// Check if this message is addressed to a specific peer
190    pub fn is_for(&self, my_peer_id: &str) -> bool {
191        match self.target_peer_id() {
192            Some(target) => target == my_peer_id,
193            None => true, // Broadcasts are for everyone
194        }
195    }
196}
197
198/// Shared concurrent-offer negotiation: determine if we are the "polite" peer.
199///
200/// In this negotiation flow, both peers can send offers simultaneously.
201/// When a collision occurs (we receive an offer while we have a pending offer),
202/// the "polite" peer backs off and accepts the incoming offer instead.
203///
204/// The "impolite" peer (higher ID) keeps their offer and ignores the incoming one.
205///
206/// This pattern ensures connections form even when one peer is "satisfied" -
207/// the unsatisfied peer can still initiate and the satisfied peer will accept.
208#[inline]
209pub fn is_polite_peer(local_peer_id: &str, remote_peer_id: &str) -> bool {
210    // Lower ID is "polite" - they back off on collision
211    local_peer_id < remote_peer_id
212}
213
214/// Candidate metadata for transports that use ICE-style negotiation.
215#[derive(Debug, Clone, Serialize, Deserialize)]
216pub struct IceCandidate {
217    pub candidate: String,
218    #[serde(rename = "sdpMLineIndex")]
219    pub sdp_m_line_index: Option<u16>,
220    #[serde(rename = "sdpMid")]
221    pub sdp_mid: Option<String>,
222}
223
224/// Direct peer-link message types for hash-based data exchange.
225#[derive(Debug, Clone, Serialize, Deserialize)]
226#[serde(tag = "type")]
227pub enum DataMessage {
228    /// Request data by hash
229    #[serde(rename = "req")]
230    Request {
231        id: u32,
232        hash: String,
233        /// Hops To Live - decremented on each forward hop
234        /// When htl reaches 0, request is not forwarded further
235        #[serde(skip_serializing_if = "Option::is_none")]
236        htl: Option<u8>,
237    },
238
239    /// Response with data (binary payload follows)
240    #[serde(rename = "res")]
241    Response {
242        id: u32,
243        hash: String,
244        found: bool,
245        #[serde(skip_serializing_if = "Option::is_none")]
246        size: Option<u64>,
247    },
248
249    /// Push data for a hash the peer previously requested but we didn't have
250    /// This happens when we get it later from another peer
251    #[serde(rename = "push")]
252    Push { hash: String },
253
254    /// Announce available hashes
255    #[serde(rename = "have")]
256    Have { hashes: Vec<String> },
257
258    /// Request list of wanted hashes
259    #[serde(rename = "want")]
260    Want { hashes: Vec<String> },
261
262    /// Notify about root hash update
263    #[serde(rename = "root")]
264    RootUpdate {
265        hash: String,
266        #[serde(skip_serializing_if = "Option::is_none")]
267        size: Option<u64>,
268    },
269}
270
271/// HTL (Hops To Live) constants - Freenet-style probabilistic decrement
272pub const MAX_HTL: u8 = 10;
273/// Probability to decrement at max HTL (50%)
274pub const DECREMENT_AT_MAX_PROB: f64 = 0.5;
275/// Probability to decrement at min HTL=1 (25%)
276pub const DECREMENT_AT_MIN_PROB: f64 = 0.25;
277
278/// HTL decrement mode.
279#[derive(Debug, Clone, Copy, PartialEq, Eq)]
280pub enum HtlMode {
281    Probabilistic,
282}
283
284/// HTL policy parameters for a traffic class.
285#[derive(Debug, Clone, Copy)]
286pub struct HtlPolicy {
287    pub mode: HtlMode,
288    pub max_htl: u8,
289    pub p_at_max: f64,
290    pub p_at_min: f64,
291}
292
293/// Default policy for blob/content requests.
294pub const BLOB_REQUEST_POLICY: HtlPolicy = HtlPolicy {
295    mode: HtlMode::Probabilistic,
296    max_htl: MAX_HTL,
297    p_at_max: DECREMENT_AT_MAX_PROB,
298    p_at_min: DECREMENT_AT_MIN_PROB,
299};
300
301/// Policy for mesh signaling/event forwarding.
302pub const MESH_EVENT_POLICY: HtlPolicy = HtlPolicy {
303    mode: HtlMode::Probabilistic,
304    max_htl: 4,
305    p_at_max: 0.75,
306    p_at_min: 0.5,
307};
308
309/// Per-peer HTL configuration (Freenet-style probabilistic decrement)
310/// Generated once per peer connection, stays fixed for connection lifetime
311#[derive(Debug, Clone, Copy)]
312pub struct PeerHTLConfig {
313    /// Random sample used to decide decrement at max HTL.
314    pub at_max_sample: f64,
315    /// Random sample used to decide decrement at min HTL.
316    pub at_min_sample: f64,
317}
318
319impl PeerHTLConfig {
320    /// Generate random HTL config for a new peer connection
321    pub fn random() -> Self {
322        use rand::Rng;
323        let mut rng = rand::thread_rng();
324        Self::from_samples(rng.gen_range(0.0..1.0), rng.gen_range(0.0..1.0))
325    }
326
327    /// Construct config from explicit random samples.
328    pub fn from_samples(at_max_sample: f64, at_min_sample: f64) -> Self {
329        Self {
330            at_max_sample: at_max_sample.clamp(0.0, 1.0),
331            at_min_sample: at_min_sample.clamp(0.0, 1.0),
332        }
333    }
334
335    /// Construct config from explicit decrement choices.
336    pub fn from_flags(decrement_at_max: bool, decrement_at_min: bool) -> Self {
337        Self {
338            at_max_sample: if decrement_at_max { 0.0 } else { 1.0 },
339            at_min_sample: if decrement_at_min { 0.0 } else { 1.0 },
340        }
341    }
342
343    /// Decrement HTL using this peer's config (Freenet-style probabilistic)
344    /// Called when SENDING to a peer, not on receive
345    pub fn decrement(&self, htl: u8) -> u8 {
346        decrement_htl_with_policy(htl, &BLOB_REQUEST_POLICY, self)
347    }
348
349    /// Decrement HTL using the provided traffic policy.
350    pub fn decrement_with_policy(&self, htl: u8, policy: &HtlPolicy) -> u8 {
351        decrement_htl_with_policy(htl, policy, self)
352    }
353}
354
355impl Default for PeerHTLConfig {
356    fn default() -> Self {
357        Self::random()
358    }
359}
360
361/// Decrement HTL according to policy and per-peer randomness profile.
362pub fn decrement_htl_with_policy(htl: u8, policy: &HtlPolicy, config: &PeerHTLConfig) -> u8 {
363    let htl = htl.min(policy.max_htl);
364    if htl == 0 {
365        return 0;
366    }
367
368    match policy.mode {
369        HtlMode::Probabilistic => {
370            let p_at_max = policy.p_at_max.clamp(0.0, 1.0);
371            let p_at_min = policy.p_at_min.clamp(0.0, 1.0);
372
373            if htl == policy.max_htl {
374                return if config.at_max_sample < p_at_max {
375                    htl.saturating_sub(1)
376                } else {
377                    htl
378                };
379            }
380
381            if htl == 1 {
382                return if config.at_min_sample < p_at_min {
383                    0
384                } else {
385                    htl
386                };
387            }
388
389            htl.saturating_sub(1)
390        }
391    }
392}
393
394/// Check if a request should be forwarded based on HTL.
395pub fn should_forward_htl(htl: u8) -> bool {
396    htl > 0
397}
398
399/// Backward-compatible helper.
400pub fn should_forward(htl: u8) -> bool {
401    should_forward_htl(htl)
402}
403
404use tokio::sync::{mpsc, oneshot};
405
406/// Peer pool classification
407#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
408pub enum PeerPool {
409    /// Users in social graph (followed or followers)
410    Follows,
411    /// Everyone else
412    Other,
413}
414
415/// Settings for a peer pool
416#[derive(Debug, Clone, Copy)]
417pub struct PoolConfig {
418    /// Maximum connections in this pool
419    pub max_connections: usize,
420    /// Number of connections to consider "satisfied"
421    pub satisfied_connections: usize,
422}
423
424impl PoolConfig {
425    /// Check if we can accept more peers (below max)
426    #[inline]
427    pub fn can_accept(&self, current_count: usize) -> bool {
428        current_count < self.max_connections
429    }
430
431    /// Check if we need more peers (below satisfied)
432    #[inline]
433    pub fn needs_peers(&self, current_count: usize) -> bool {
434        current_count < self.satisfied_connections
435    }
436
437    /// Check if we're satisfied (at or above satisfied threshold)
438    #[inline]
439    pub fn is_satisfied(&self, current_count: usize) -> bool {
440        current_count >= self.satisfied_connections
441    }
442}
443
444impl Default for PoolConfig {
445    fn default() -> Self {
446        Self {
447            max_connections: 16,
448            satisfied_connections: 8,
449        }
450    }
451}
452
453/// Pool settings for both pools
454#[derive(Debug, Clone)]
455pub struct PoolSettings {
456    pub follows: PoolConfig,
457    pub other: PoolConfig,
458}
459
460impl Default for PoolSettings {
461    fn default() -> Self {
462        Self {
463            follows: PoolConfig {
464                max_connections: 16,
465                satisfied_connections: 8,
466            },
467            other: PoolConfig {
468                max_connections: 16,
469                satisfied_connections: 8,
470            },
471        }
472    }
473}
474
475/// Request to classify a peer by pubkey
476pub struct ClassifyRequest {
477    /// Pubkey to classify (hex)
478    pub pubkey: String,
479    /// Channel to send result back
480    pub response: oneshot::Sender<PeerPool>,
481}
482
483/// Sender for peer classification requests
484pub type ClassifierTx = mpsc::Sender<ClassifyRequest>;
485
486/// Receiver for peer classification requests (implement this to provide classification)
487pub type ClassifierRx = mpsc::Receiver<ClassifyRequest>;
488
489/// Create a classifier channel pair
490pub fn classifier_channel(buffer: usize) -> (ClassifierTx, ClassifierRx) {
491    mpsc::channel(buffer)
492}
493
494/// Configuration for the default mesh-backed store composition.
495#[derive(Clone)]
496pub struct MeshStoreConfig {
497    /// Nostr relays for signaling
498    pub relays: Vec<String>,
499    /// Root hashes to advertise
500    pub roots: Vec<Hash>,
501    /// Timeout for data requests (ms)
502    pub request_timeout_ms: u64,
503    /// Interval for sending hello messages (ms)
504    pub hello_interval_ms: u64,
505    /// Enable verbose logging
506    pub debug: bool,
507    /// Pool settings for follows and other peers
508    pub pools: PoolSettings,
509    /// Channel for peer classification (optional)
510    /// If None, all peers go to "Other" pool
511    pub classifier_tx: Option<ClassifierTx>,
512    /// Retrieval peer selection strategy (shared with MeshStoreCore/CLI/sim).
513    pub request_selection_strategy: SelectionStrategy,
514    /// Whether fairness constraints are enabled for retrieval selection.
515    pub request_fairness_enabled: bool,
516    /// Hedged request dispatch policy for retrieval.
517    pub request_dispatch: RequestDispatchConfig,
518    /// Pubsub delivery strategy used by the production mesh store.
519    pub pubsub_delivery_mode: PubsubDeliveryMode,
520    /// Upstream Blossom servers used as adaptive fallback read sources.
521    pub upstream_blossom_servers: Vec<String>,
522}
523
524impl Default for MeshStoreConfig {
525    fn default() -> Self {
526        Self {
527            relays: vec![
528                "wss://temp.iris.to".to_string(),
529                "wss://relay.damus.io".to_string(),
530            ],
531            roots: Vec::new(),
532            request_timeout_ms: 10000,
533            hello_interval_ms: 30000,
534            debug: false,
535            pools: PoolSettings::default(),
536            classifier_tx: None,
537            request_selection_strategy: SelectionStrategy::Weighted,
538            request_fairness_enabled: true,
539            request_dispatch: RequestDispatchConfig {
540                initial_fanout: 2,
541                hedge_fanout: 1,
542                max_fanout: 8,
543                hedge_interval_ms: 120,
544            },
545            pubsub_delivery_mode: PubsubDeliveryMode::HtlInvWant,
546            upstream_blossom_servers: Vec::new(),
547        }
548    }
549}
550
551/// Connection state for a peer
552#[derive(Debug, Clone, Copy, PartialEq, Eq)]
553pub enum PeerState {
554    /// Initial state
555    New,
556    /// Connecting via signaling.
557    Connecting,
558    /// Negotiated direct link established.
559    Connected,
560    /// Direct peer link open and ready for data.
561    Ready,
562    /// Connection failed or closed
563    Disconnected,
564}
565
566/// Statistics for the default mesh-backed store composition.
567#[derive(Debug, Clone, Default)]
568pub struct MeshStats {
569    pub connected_peers: usize,
570    pub pending_requests: usize,
571    pub bytes_sent: u64,
572    pub bytes_received: u64,
573    pub requests_made: u64,
574    pub requests_fulfilled: u64,
575}
576
577/// Backward-compatible alias for the previous production-specific name.
578pub type WebRTCStats = MeshStats;
579
580/// Nostr event kind for hashtree signaling envelopes (ephemeral, NIP-17 style).
581pub const NOSTR_KIND_HASHTREE: u16 = 25050;
582pub const MESH_SIGNALING_EVENT_KIND: u16 = NOSTR_KIND_HASHTREE;
583
584/// Relayless mesh protocol constants for forwarding signaling events over data channels.
585pub const MESH_PROTOCOL: &str = "htree.nostr.mesh.v1";
586pub const MESH_PROTOCOL_VERSION: u8 = 1;
587pub const MESH_DEFAULT_HTL: u8 = MESH_EVENT_POLICY.max_htl;
588pub const MESH_MAX_HTL: u8 = 6;
589
590/// TTL/capacity dedupe structure for forwarded mesh frames/events.
591#[derive(Debug)]
592pub struct TimedSeenSet {
593    entries: HashMap<String, Instant>,
594    order: VecDeque<(String, Instant)>,
595    ttl: Duration,
596    capacity: usize,
597}
598
599impl TimedSeenSet {
600    pub fn new(capacity: usize, ttl: Duration) -> Self {
601        Self {
602            entries: HashMap::new(),
603            order: VecDeque::new(),
604            ttl,
605            capacity,
606        }
607    }
608
609    fn prune(&mut self, now: Instant) {
610        while let Some((key, inserted_at)) = self.order.front().cloned() {
611            if now.duration_since(inserted_at) < self.ttl {
612                break;
613            }
614            self.order.pop_front();
615            if self
616                .entries
617                .get(&key)
618                .map(|ts| *ts == inserted_at)
619                .unwrap_or(false)
620            {
621                self.entries.remove(&key);
622            }
623        }
624
625        while self.entries.len() > self.capacity {
626            if let Some((key, inserted_at)) = self.order.pop_front() {
627                if self
628                    .entries
629                    .get(&key)
630                    .map(|ts| *ts == inserted_at)
631                    .unwrap_or(false)
632                {
633                    self.entries.remove(&key);
634                }
635            } else {
636                break;
637            }
638        }
639    }
640
641    pub fn insert_if_new(&mut self, key: String) -> bool {
642        let now = Instant::now();
643        self.prune(now);
644        if self.entries.contains_key(&key) {
645            return false;
646        }
647        self.entries.insert(key.clone(), now);
648        self.order.push_back((key, now));
649        self.prune(now);
650        true
651    }
652
653    pub fn contains(&mut self, key: &str) -> bool {
654        let now = Instant::now();
655        self.prune(now);
656        self.entries.contains_key(key)
657    }
658
659    pub fn len(&self) -> usize {
660        self.entries.len()
661    }
662
663    pub fn is_empty(&self) -> bool {
664        self.entries.is_empty()
665    }
666}
667
668#[derive(Debug, Clone, Serialize, Deserialize)]
669#[serde(tag = "type")]
670pub enum MeshNostrPayload {
671    #[serde(rename = "EVENT")]
672    Event { event: Event },
673}
674
675#[derive(Debug, Clone, Serialize, Deserialize)]
676pub struct MeshNostrFrame {
677    pub protocol: String,
678    pub version: u8,
679    pub frame_id: String,
680    pub htl: u8,
681    pub sender_peer_id: String,
682    pub payload: MeshNostrPayload,
683}
684
685impl MeshNostrFrame {
686    pub fn new_event(event: Event, sender_peer_id: &str, htl: u8) -> Self {
687        Self::new_event_with_id(
688            event,
689            sender_peer_id,
690            &uuid::Uuid::new_v4().to_string(),
691            htl,
692        )
693    }
694
695    pub fn new_event_with_id(event: Event, sender_peer_id: &str, frame_id: &str, htl: u8) -> Self {
696        Self {
697            protocol: MESH_PROTOCOL.to_string(),
698            version: MESH_PROTOCOL_VERSION,
699            frame_id: frame_id.to_string(),
700            htl,
701            sender_peer_id: sender_peer_id.to_string(),
702            payload: MeshNostrPayload::Event { event },
703        }
704    }
705
706    pub fn event(&self) -> &Event {
707        match &self.payload {
708            MeshNostrPayload::Event { event } => event,
709        }
710    }
711}
712
713pub fn validate_mesh_frame(frame: &MeshNostrFrame) -> Result<(), &'static str> {
714    if frame.protocol != MESH_PROTOCOL {
715        return Err("invalid protocol");
716    }
717    if frame.version != MESH_PROTOCOL_VERSION {
718        return Err("invalid version");
719    }
720    if frame.frame_id.is_empty() {
721        return Err("missing frame id");
722    }
723    if frame.sender_peer_id.is_empty() {
724        return Err("missing sender peer id");
725    }
726    if frame.sender_peer_id.contains(':') {
727        return Err("invalid sender peer id");
728    }
729    if frame.htl == 0 || frame.htl > MESH_MAX_HTL {
730        return Err("invalid htl");
731    }
732
733    let event = frame.event();
734    if event.kind != Kind::Custom(NOSTR_KIND_HASHTREE)
735        && event.kind != Kind::Ephemeral(NOSTR_KIND_HASHTREE)
736    {
737        return Err("unsupported event kind");
738    }
739
740    Ok(())
741}
742
743/// Default data channel label used by the WebRTC peer-link implementation.
744pub const DATA_CHANNEL_LABEL: &str = "hashtree";