Skip to main content

hashtree_network/
types.rs

1//! Mesh transport types for peer-to-peer data exchange.
2//!
3//! Defines signaling frames, negotiated peer-link messages, and shared mesh
4//! constants used across Nostr websocket transports, local buses, direct-link
5//! transports such as WebRTC, and simulation.
6
7use hashtree_core::Hash;
8use nostr_sdk::nostr::{Event, Kind};
9use serde::{Deserialize, Serialize};
10use std::collections::{HashMap, VecDeque};
11use std::time::Duration;
12use tokio::time::Instant;
13
14use crate::mesh_store_core::RequestDispatchConfig;
15use crate::peer_selector::SelectionStrategy;
16
/// Serde default for the `hashGet` field of `SignalingMessage::Hello`.
///
/// Always yields `true`, so a hello frame that omits `hashGet` deserializes
/// with the flag switched on.
fn default_hash_get_enabled() -> bool {
    true
}
20
21/// Unique identifier for a peer in the network
22#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
23pub struct PeerId {
24    /// Nostr public key / endpoint identity
25    pub pubkey: String,
26}
27
28impl PeerId {
29    pub fn new(pubkey: String) -> Self {
30        Self { pubkey }
31    }
32
33    pub fn to_peer_string(&self) -> String {
34        self.pubkey.clone()
35    }
36
37    pub fn from_peer_string(s: &str) -> Option<Self> {
38        let pubkey = s.trim();
39        if pubkey.is_empty() || pubkey.contains(':') {
40            return None;
41        }
42        Some(Self {
43            pubkey: pubkey.to_string(),
44        })
45    }
46
47    pub fn from_string(s: &str) -> Option<Self> {
48        Self::from_peer_string(s)
49    }
50
51    pub fn short(&self) -> String {
52        self.pubkey[..8.min(self.pubkey.len())].to_string()
53    }
54}
55
56/// Persisted private WebRTC signaling hints for a stable peer identity.
57#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
58pub struct KnownPeerRecord {
59    pub peer_id: String,
60    #[serde(
61        default,
62        alias = "direct_urls",
63        alias = "addresses",
64        skip_serializing_if = "Vec::is_empty"
65    )]
66    pub signal_urls: Vec<String>,
67    #[serde(default)]
68    pub last_seen_unix_ms: u64,
69    #[serde(default, skip_serializing_if = "Option::is_none")]
70    pub last_source: Option<String>,
71}
72
73/// Snapshot of private peer signaling hints learned over established peer links.
74#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
75pub struct KnownPeerSnapshot {
76    pub version: u32,
77    #[serde(default)]
78    pub peers: Vec<KnownPeerRecord>,
79}
80
81impl Default for KnownPeerSnapshot {
82    fn default() -> Self {
83        Self {
84            version: 1,
85            peers: Vec::new(),
86        }
87    }
88}
89
90impl std::fmt::Display for PeerId {
91    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
92        f.write_str(&self.pubkey)
93    }
94}
95
96/// Signaling message types exchanged over mesh signaling transports.
97#[derive(Debug, Clone, Serialize, Deserialize)]
98#[serde(tag = "type")]
99pub enum SignalingMessage {
100    /// Initial hello message to discover peers
101    #[serde(rename = "hello")]
102    Hello {
103        #[serde(rename = "peerId")]
104        peer_id: String,
105        roots: Vec<String>,
106        #[serde(rename = "hashGet", default = "default_hash_get_enabled")]
107        hash_get: bool,
108    },
109
110    /// Negotiation offer payload (currently SDP for WebRTC links).
111    #[serde(rename = "offer")]
112    Offer {
113        #[serde(rename = "peerId")]
114        peer_id: String,
115        #[serde(rename = "targetPeerId")]
116        target_peer_id: String,
117        sdp: String,
118    },
119
120    /// Negotiation answer payload (currently SDP for WebRTC links).
121    #[serde(rename = "answer")]
122    Answer {
123        #[serde(rename = "peerId")]
124        peer_id: String,
125        #[serde(rename = "targetPeerId")]
126        target_peer_id: String,
127        sdp: String,
128    },
129
130    /// Single candidate update for transports that need trickle negotiation.
131    #[serde(rename = "candidate")]
132    Candidate {
133        #[serde(rename = "peerId")]
134        peer_id: String,
135        #[serde(rename = "targetPeerId")]
136        target_peer_id: String,
137        candidate: String,
138        #[serde(rename = "sdpMLineIndex")]
139        sdp_m_line_index: Option<u16>,
140        #[serde(rename = "sdpMid")]
141        sdp_mid: Option<String>,
142    },
143
144    /// Batched candidate updates.
145    #[serde(rename = "candidates")]
146    Candidates {
147        #[serde(rename = "peerId")]
148        peer_id: String,
149        #[serde(rename = "targetPeerId")]
150        target_peer_id: String,
151        candidates: Vec<IceCandidate>,
152    },
153}
154
155impl SignalingMessage {
156    /// Get the message type label.
157    pub fn msg_type(&self) -> &str {
158        match self {
159            Self::Hello { .. } => "hello",
160            Self::Offer { .. } => "offer",
161            Self::Answer { .. } => "answer",
162            Self::Candidate { .. } => "candidate",
163            Self::Candidates { .. } => "candidates",
164        }
165    }
166
167    /// Get the sender's peer ID
168    pub fn peer_id(&self) -> &str {
169        match self {
170            Self::Hello { peer_id, .. } => peer_id,
171            Self::Offer { peer_id, .. } => peer_id,
172            Self::Answer { peer_id, .. } => peer_id,
173            Self::Candidate { peer_id, .. } => peer_id,
174            Self::Candidates { peer_id, .. } => peer_id,
175        }
176    }
177
178    /// Get the target peer ID (if applicable)
179    pub fn target_peer_id(&self) -> Option<&str> {
180        match self {
181            Self::Hello { .. } => None, // Broadcast
182            Self::Offer { target_peer_id, .. } => Some(target_peer_id),
183            Self::Answer { target_peer_id, .. } => Some(target_peer_id),
184            Self::Candidate { target_peer_id, .. } => Some(target_peer_id),
185            Self::Candidates { target_peer_id, .. } => Some(target_peer_id),
186        }
187    }
188
189    /// Check if this message is addressed to a specific peer
190    pub fn is_for(&self, my_peer_id: &str) -> bool {
191        match self.target_peer_id() {
192            Some(target) => target == my_peer_id,
193            None => true, // Broadcasts are for everyone
194        }
195    }
196}
197
/// Decides whether the local side is the "polite" peer in concurrent-offer
/// negotiation.
///
/// Both peers may send offers at the same time. On a collision (an offer
/// arrives while our own offer is pending) the polite peer backs off and
/// accepts the incoming offer, while the impolite peer keeps its own offer
/// and ignores the incoming one.
///
/// The peer with the lexicographically lower id is the polite one, so for two
/// distinct ids exactly one side is polite. Equal ids are never polite.
#[inline]
pub fn is_polite_peer(local_peer_id: &str, remote_peer_id: &str) -> bool {
    // Strict comparison: the higher id is "impolite" and wins the collision.
    remote_peer_id > local_peer_id
}
213
214/// Candidate metadata for transports that use ICE-style negotiation.
215#[derive(Debug, Clone, Serialize, Deserialize)]
216pub struct IceCandidate {
217    pub candidate: String,
218    #[serde(rename = "sdpMLineIndex")]
219    pub sdp_m_line_index: Option<u16>,
220    #[serde(rename = "sdpMid")]
221    pub sdp_mid: Option<String>,
222}
223
224/// Direct peer-link message types for hash-based data exchange.
225#[derive(Debug, Clone, Serialize, Deserialize)]
226#[serde(tag = "type")]
227pub enum DataMessage {
228    /// Request data by hash
229    #[serde(rename = "req")]
230    Request {
231        id: u32,
232        hash: String,
233        /// Hops To Live - decremented on each forward hop
234        /// When htl reaches 0, request is not forwarded further
235        #[serde(skip_serializing_if = "Option::is_none")]
236        htl: Option<u8>,
237    },
238
239    /// Response with data (binary payload follows)
240    #[serde(rename = "res")]
241    Response {
242        id: u32,
243        hash: String,
244        found: bool,
245        #[serde(skip_serializing_if = "Option::is_none")]
246        size: Option<u64>,
247    },
248
249    /// Push data for a hash the peer previously requested but we didn't have
250    /// This happens when we get it later from another peer
251    #[serde(rename = "push")]
252    Push { hash: String },
253
254    /// Announce available hashes
255    #[serde(rename = "have")]
256    Have { hashes: Vec<String> },
257
258    /// Request list of wanted hashes
259    #[serde(rename = "want")]
260    Want { hashes: Vec<String> },
261
262    /// Notify about root hash update
263    #[serde(rename = "root")]
264    RootUpdate {
265        hash: String,
266        #[serde(skip_serializing_if = "Option::is_none")]
267        size: Option<u64>,
268    },
269}
270
/// Maximum HTL (Hops To Live) for Freenet-style probabilistic decrement.
pub const MAX_HTL: u8 = 10;
/// Probability (50%) of decrementing a request that is at the maximum HTL.
pub const DECREMENT_AT_MAX_PROB: f64 = 0.5;
/// Probability (25%) of decrementing a request that is at the minimum HTL of 1.
pub const DECREMENT_AT_MIN_PROB: f64 = 0.25;
277
/// Strategy used to decrement HTL.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HtlMode {
    /// Decrement with a probability at the maximum and minimum HTL values.
    Probabilistic,
}
283
284/// HTL policy parameters for a traffic class.
285#[derive(Debug, Clone, Copy)]
286pub struct HtlPolicy {
287    pub mode: HtlMode,
288    pub max_htl: u8,
289    pub p_at_max: f64,
290    pub p_at_min: f64,
291}
292
293/// Default policy for blob/content requests.
294pub const BLOB_REQUEST_POLICY: HtlPolicy = HtlPolicy {
295    mode: HtlMode::Probabilistic,
296    max_htl: MAX_HTL,
297    p_at_max: DECREMENT_AT_MAX_PROB,
298    p_at_min: DECREMENT_AT_MIN_PROB,
299};
300
301/// Policy for mesh signaling/event forwarding.
302pub const MESH_EVENT_POLICY: HtlPolicy = HtlPolicy {
303    mode: HtlMode::Probabilistic,
304    max_htl: 4,
305    p_at_max: 0.75,
306    p_at_min: 0.5,
307};
308
/// Per-peer HTL configuration (Freenet-style probabilistic decrement).
///
/// Generated once per peer connection and kept fixed for the lifetime of that
/// connection. Each sample is compared against the policy's probability to
/// decide whether HTL is decremented at the corresponding boundary.
#[derive(Debug, Clone, Copy)]
pub struct PeerHTLConfig {
    /// Random sample deciding whether to decrement at the maximum HTL.
    pub at_max_sample: f64,
    /// Random sample deciding whether to decrement at the minimum HTL.
    pub at_min_sample: f64,
}
318
319impl PeerHTLConfig {
320    /// Generate random HTL config for a new peer connection
321    pub fn random() -> Self {
322        use rand::Rng;
323        let mut rng = rand::thread_rng();
324        Self::from_samples(rng.gen_range(0.0..1.0), rng.gen_range(0.0..1.0))
325    }
326
327    /// Construct config from explicit random samples.
328    pub fn from_samples(at_max_sample: f64, at_min_sample: f64) -> Self {
329        Self {
330            at_max_sample: at_max_sample.clamp(0.0, 1.0),
331            at_min_sample: at_min_sample.clamp(0.0, 1.0),
332        }
333    }
334
335    /// Construct config from explicit decrement choices.
336    pub fn from_flags(decrement_at_max: bool, decrement_at_min: bool) -> Self {
337        Self {
338            at_max_sample: if decrement_at_max { 0.0 } else { 1.0 },
339            at_min_sample: if decrement_at_min { 0.0 } else { 1.0 },
340        }
341    }
342
343    /// Decrement HTL using this peer's config (Freenet-style probabilistic)
344    /// Called when SENDING to a peer, not on receive
345    pub fn decrement(&self, htl: u8) -> u8 {
346        decrement_htl_with_policy(htl, &BLOB_REQUEST_POLICY, self)
347    }
348
349    /// Decrement HTL using the provided traffic policy.
350    pub fn decrement_with_policy(&self, htl: u8, policy: &HtlPolicy) -> u8 {
351        decrement_htl_with_policy(htl, policy, self)
352    }
353}
354
355impl Default for PeerHTLConfig {
356    fn default() -> Self {
357        Self::random()
358    }
359}
360
361/// Decrement HTL according to policy and per-peer randomness profile.
362pub fn decrement_htl_with_policy(htl: u8, policy: &HtlPolicy, config: &PeerHTLConfig) -> u8 {
363    let htl = htl.min(policy.max_htl);
364    if htl == 0 {
365        return 0;
366    }
367
368    match policy.mode {
369        HtlMode::Probabilistic => {
370            let p_at_max = policy.p_at_max.clamp(0.0, 1.0);
371            let p_at_min = policy.p_at_min.clamp(0.0, 1.0);
372
373            if htl == policy.max_htl {
374                return if config.at_max_sample < p_at_max {
375                    htl.saturating_sub(1)
376                } else {
377                    htl
378                };
379            }
380
381            if htl == 1 {
382                return if config.at_min_sample < p_at_min {
383                    0
384                } else {
385                    htl
386                };
387            }
388
389            htl.saturating_sub(1)
390        }
391    }
392}
393
/// Returns `true` when a request with this HTL may still be forwarded, i.e.
/// when `htl` is non-zero.
pub fn should_forward_htl(htl: u8) -> bool {
    htl != 0
}
398
399/// Backward-compatible helper.
400pub fn should_forward(htl: u8) -> bool {
401    should_forward_htl(htl)
402}
403
404use tokio::sync::{mpsc, oneshot};
405
/// Pool a peer is classified into.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PeerPool {
    /// Users in the social graph (followed or followers).
    Follows,
    /// Everyone else.
    Other,
}
414
/// Connection limits for one peer pool.
#[derive(Debug, Clone, Copy)]
pub struct PoolConfig {
    /// Hard cap on connections in this pool.
    pub max_connections: usize,
    /// Connection count at which the pool counts as "satisfied".
    pub satisfied_connections: usize,
}

impl PoolConfig {
    /// True while `current_count` is strictly below `max_connections`.
    #[inline]
    pub fn can_accept(&self, current_count: usize) -> bool {
        self.max_connections > current_count
    }

    /// True while `current_count` is strictly below `satisfied_connections`.
    #[inline]
    pub fn needs_peers(&self, current_count: usize) -> bool {
        self.satisfied_connections > current_count
    }

    /// True once `current_count` has reached `satisfied_connections`; the
    /// exact negation of [`PoolConfig::needs_peers`].
    #[inline]
    pub fn is_satisfied(&self, current_count: usize) -> bool {
        !self.needs_peers(current_count)
    }
}

impl Default for PoolConfig {
    /// Up to 16 connections, satisfied at 8.
    fn default() -> Self {
        let (max_connections, satisfied_connections) = (16, 8);
        Self {
            max_connections,
            satisfied_connections,
        }
    }
}
452
453/// Pool settings for both pools
454#[derive(Debug, Clone)]
455pub struct PoolSettings {
456    pub follows: PoolConfig,
457    pub other: PoolConfig,
458}
459
460impl Default for PoolSettings {
461    fn default() -> Self {
462        Self {
463            follows: PoolConfig {
464                max_connections: 16,
465                satisfied_connections: 8,
466            },
467            other: PoolConfig {
468                max_connections: 16,
469                satisfied_connections: 8,
470            },
471        }
472    }
473}
474
475/// Request to classify a peer by pubkey
476pub struct ClassifyRequest {
477    /// Pubkey to classify (hex)
478    pub pubkey: String,
479    /// Channel to send result back
480    pub response: oneshot::Sender<PeerPool>,
481}
482
483/// Sender for peer classification requests
484pub type ClassifierTx = mpsc::Sender<ClassifyRequest>;
485
486/// Receiver for peer classification requests (implement this to provide classification)
487pub type ClassifierRx = mpsc::Receiver<ClassifyRequest>;
488
489/// Create a classifier channel pair
490pub fn classifier_channel(buffer: usize) -> (ClassifierTx, ClassifierRx) {
491    mpsc::channel(buffer)
492}
493
494/// Configuration for the default mesh-backed store composition.
495#[derive(Clone)]
496pub struct MeshStoreConfig {
497    /// Nostr relays for signaling
498    pub relays: Vec<String>,
499    /// Root hashes to advertise
500    pub roots: Vec<Hash>,
501    /// Timeout for data requests (ms)
502    pub request_timeout_ms: u64,
503    /// Interval for sending hello messages (ms)
504    pub hello_interval_ms: u64,
505    /// Enable verbose logging
506    pub debug: bool,
507    /// Pool settings for follows and other peers
508    pub pools: PoolSettings,
509    /// Channel for peer classification (optional)
510    /// If None, all peers go to "Other" pool
511    pub classifier_tx: Option<ClassifierTx>,
512    /// Retrieval peer selection strategy (shared with MeshStoreCore/CLI/sim).
513    pub request_selection_strategy: SelectionStrategy,
514    /// Whether fairness constraints are enabled for retrieval selection.
515    pub request_fairness_enabled: bool,
516    /// Hedged request dispatch policy for retrieval.
517    pub request_dispatch: RequestDispatchConfig,
518    /// Upstream Blossom servers used as adaptive fallback read sources.
519    pub upstream_blossom_servers: Vec<String>,
520}
521
522impl Default for MeshStoreConfig {
523    fn default() -> Self {
524        Self {
525            relays: vec![
526                "wss://temp.iris.to".to_string(),
527                "wss://relay.damus.io".to_string(),
528            ],
529            roots: Vec::new(),
530            request_timeout_ms: 10000,
531            hello_interval_ms: 30000,
532            debug: false,
533            pools: PoolSettings::default(),
534            classifier_tx: None,
535            request_selection_strategy: SelectionStrategy::Weighted,
536            request_fairness_enabled: true,
537            request_dispatch: RequestDispatchConfig {
538                initial_fanout: 2,
539                hedge_fanout: 1,
540                max_fanout: 8,
541                hedge_interval_ms: 120,
542            },
543            upstream_blossom_servers: Vec::new(),
544        }
545    }
546}
547
/// Lifecycle state of a peer connection.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PeerState {
    /// Initial state.
    New,
    /// Connecting via signaling.
    Connecting,
    /// Negotiated direct link established.
    Connected,
    /// Direct peer link open and ready for data.
    Ready,
    /// Connection failed or closed.
    Disconnected,
}
562
/// Counters reported by the default mesh-backed store composition; all
/// fields start at zero via `Default`.
#[derive(Debug, Clone, Default)]
pub struct MeshStats {
    /// Number of connected peers.
    pub connected_peers: usize,
    /// Number of pending requests.
    pub pending_requests: usize,
    /// Bytes sent.
    pub bytes_sent: u64,
    /// Bytes received.
    pub bytes_received: u64,
    /// Requests made.
    pub requests_made: u64,
    /// Requests fulfilled.
    pub requests_fulfilled: u64,
}

/// Backward-compatible alias for the previous, production-specific name.
pub type WebRTCStats = MeshStats;
576
577/// Nostr event kind for hashtree signaling envelopes (ephemeral, NIP-17 style).
578pub const NOSTR_KIND_HASHTREE: u16 = 25050;
579pub const MESH_SIGNALING_EVENT_KIND: u16 = NOSTR_KIND_HASHTREE;
580
581/// Relayless mesh protocol constants for forwarding signaling events over data channels.
582pub const MESH_PROTOCOL: &str = "htree.nostr.mesh.v1";
583pub const MESH_PROTOCOL_VERSION: u8 = 1;
584pub const MESH_DEFAULT_HTL: u8 = MESH_EVENT_POLICY.max_htl;
585pub const MESH_MAX_HTL: u8 = 6;
586
/// TTL/capacity dedupe structure for forwarded mesh frames/events.
///
/// Keys are remembered until they are at least `ttl` old or until the set
/// holds more than `capacity` keys, in which case the oldest keys are dropped
/// first. Expiry is applied lazily, on `insert_if_new` and `contains`.
#[derive(Debug)]
pub struct TimedSeenSet {
    /// Key -> insertion time of the live entry for that key.
    entries: HashMap<String, Instant>,
    /// Insertion-ordered queue of `(key, inserted_at)`, oldest at the front.
    order: VecDeque<(String, Instant)>,
    /// Age at which an entry expires.
    ttl: Duration,
    /// Maximum number of keys retained.
    capacity: usize,
}

impl TimedSeenSet {
    /// Creates an empty set retaining at most `capacity` keys for `ttl` each.
    pub fn new(capacity: usize, ttl: Duration) -> Self {
        Self {
            entries: HashMap::new(),
            order: VecDeque::new(),
            ttl,
            capacity,
        }
    }

    /// Pops the oldest queue record and removes its map entry, provided the
    /// map still holds that same insertion (matching timestamp).
    ///
    /// Returns `false` when the queue was already empty.
    fn evict_oldest(&mut self) -> bool {
        match self.order.pop_front() {
            Some((key, inserted_at)) => {
                if self.entries.get(&key) == Some(&inserted_at) {
                    self.entries.remove(&key);
                }
                true
            }
            None => false,
        }
    }

    /// Drops expired entries, then evicts oldest-first until the set fits
    /// within `capacity`.
    fn prune(&mut self, now: Instant) {
        // Peek only the (Copy) timestamp so the key is not cloned per call.
        while let Some(inserted_at) = self.order.front().map(|(_, at)| *at) {
            if now.duration_since(inserted_at) < self.ttl {
                break;
            }
            self.evict_oldest();
        }

        while self.entries.len() > self.capacity && self.evict_oldest() {}
    }

    /// Records `key` if it is not currently remembered.
    ///
    /// Returns `true` when the key was new (not present after pruning) and
    /// `false` when it was already present. A new key may itself be pruned
    /// immediately when `capacity` is 0 or `ttl` is zero.
    pub fn insert_if_new(&mut self, key: String) -> bool {
        let now = Instant::now();
        self.prune(now);
        if self.entries.contains_key(&key) {
            return false;
        }
        self.entries.insert(key.clone(), now);
        self.order.push_back((key, now));
        // Enforce the capacity bound for the entry just added.
        self.prune(now);
        true
    }

    /// Returns whether `key` is still remembered; prunes expired entries
    /// first, hence `&mut self`.
    pub fn contains(&mut self, key: &str) -> bool {
        let now = Instant::now();
        self.prune(now);
        self.entries.contains_key(key)
    }

    /// Number of keys currently stored (entries not yet pruned are counted).
    pub fn len(&self) -> usize {
        self.entries.len()
    }

    /// Whether no keys are currently stored.
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }
}
664
665#[derive(Debug, Clone, Serialize, Deserialize)]
666#[serde(tag = "type")]
667pub enum MeshNostrPayload {
668    #[serde(rename = "EVENT")]
669    Event { event: Event },
670}
671
672#[derive(Debug, Clone, Serialize, Deserialize)]
673pub struct MeshNostrFrame {
674    pub protocol: String,
675    pub version: u8,
676    pub frame_id: String,
677    pub htl: u8,
678    pub sender_peer_id: String,
679    pub payload: MeshNostrPayload,
680}
681
682impl MeshNostrFrame {
683    pub fn new_event(event: Event, sender_peer_id: &str, htl: u8) -> Self {
684        Self::new_event_with_id(
685            event,
686            sender_peer_id,
687            &uuid::Uuid::new_v4().to_string(),
688            htl,
689        )
690    }
691
692    pub fn new_event_with_id(event: Event, sender_peer_id: &str, frame_id: &str, htl: u8) -> Self {
693        Self {
694            protocol: MESH_PROTOCOL.to_string(),
695            version: MESH_PROTOCOL_VERSION,
696            frame_id: frame_id.to_string(),
697            htl,
698            sender_peer_id: sender_peer_id.to_string(),
699            payload: MeshNostrPayload::Event { event },
700        }
701    }
702
703    pub fn event(&self) -> &Event {
704        match &self.payload {
705            MeshNostrPayload::Event { event } => event,
706        }
707    }
708}
709
710pub fn validate_mesh_frame(frame: &MeshNostrFrame) -> Result<(), &'static str> {
711    if frame.protocol != MESH_PROTOCOL {
712        return Err("invalid protocol");
713    }
714    if frame.version != MESH_PROTOCOL_VERSION {
715        return Err("invalid version");
716    }
717    if frame.frame_id.is_empty() {
718        return Err("missing frame id");
719    }
720    if frame.sender_peer_id.is_empty() {
721        return Err("missing sender peer id");
722    }
723    if frame.sender_peer_id.contains(':') {
724        return Err("invalid sender peer id");
725    }
726    if frame.htl == 0 || frame.htl > MESH_MAX_HTL {
727        return Err("invalid htl");
728    }
729
730    let event = frame.event();
731    if event.kind != Kind::Custom(NOSTR_KIND_HASHTREE)
732        && event.kind != Kind::Ephemeral(NOSTR_KIND_HASHTREE)
733    {
734        return Err("unsupported event kind");
735    }
736
737    Ok(())
738}
739
/// Label of the data channel opened by the WebRTC peer-link implementation
/// by default.
pub const DATA_CHANNEL_LABEL: &str = "hashtree";