Skip to main content

hashtree_network/
types.rs

1//! Mesh transport types for peer-to-peer data exchange.
2//!
3//! Defines signaling frames, negotiated peer-link messages, and shared mesh
4//! constants used across Nostr websocket transports, local buses, direct-link
5//! transports such as WebRTC, and simulation.
6
7use hashtree_core::Hash;
8use nostr_sdk::nostr::{Event, Kind};
9use serde::{Deserialize, Serialize};
10use std::collections::{HashMap, VecDeque};
11use std::time::{Duration, Instant};
12
13use crate::mesh_store_core::RequestDispatchConfig;
14use crate::peer_selector::SelectionStrategy;
15
/// Serde default for the `hashGet` field of a `hello` message: when the
/// field is absent from the payload, hash-get is treated as enabled.
fn default_hash_get_enabled() -> bool {
    true
}
19
20/// Unique identifier for a peer in the network
21#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
22pub struct PeerId {
23    /// Nostr public key / endpoint identity
24    pub pubkey: String,
25}
26
27impl PeerId {
28    pub fn new(pubkey: String) -> Self {
29        Self { pubkey }
30    }
31
32    pub fn to_peer_string(&self) -> String {
33        self.pubkey.clone()
34    }
35
36    pub fn from_peer_string(s: &str) -> Option<Self> {
37        let pubkey = s.trim();
38        if pubkey.is_empty() || pubkey.contains(':') {
39            return None;
40        }
41        Some(Self {
42            pubkey: pubkey.to_string(),
43        })
44    }
45
46    pub fn from_string(s: &str) -> Option<Self> {
47        Self::from_peer_string(s)
48    }
49
50    pub fn short(&self) -> String {
51        self.pubkey[..8.min(self.pubkey.len())].to_string()
52    }
53}
54
55impl std::fmt::Display for PeerId {
56    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
57        f.write_str(&self.pubkey)
58    }
59}
60
61/// Signaling message types exchanged over mesh signaling transports.
62#[derive(Debug, Clone, Serialize, Deserialize)]
63#[serde(tag = "type")]
64pub enum SignalingMessage {
65    /// Initial hello message to discover peers
66    #[serde(rename = "hello")]
67    Hello {
68        #[serde(rename = "peerId")]
69        peer_id: String,
70        roots: Vec<String>,
71        #[serde(rename = "hashGet", default = "default_hash_get_enabled")]
72        hash_get: bool,
73    },
74
75    /// Negotiation offer payload (currently SDP for WebRTC links).
76    #[serde(rename = "offer")]
77    Offer {
78        #[serde(rename = "peerId")]
79        peer_id: String,
80        #[serde(rename = "targetPeerId")]
81        target_peer_id: String,
82        sdp: String,
83    },
84
85    /// Negotiation answer payload (currently SDP for WebRTC links).
86    #[serde(rename = "answer")]
87    Answer {
88        #[serde(rename = "peerId")]
89        peer_id: String,
90        #[serde(rename = "targetPeerId")]
91        target_peer_id: String,
92        sdp: String,
93    },
94
95    /// Single candidate update for transports that need trickle negotiation.
96    #[serde(rename = "candidate")]
97    Candidate {
98        #[serde(rename = "peerId")]
99        peer_id: String,
100        #[serde(rename = "targetPeerId")]
101        target_peer_id: String,
102        candidate: String,
103        #[serde(rename = "sdpMLineIndex")]
104        sdp_m_line_index: Option<u16>,
105        #[serde(rename = "sdpMid")]
106        sdp_mid: Option<String>,
107    },
108
109    /// Batched candidate updates.
110    #[serde(rename = "candidates")]
111    Candidates {
112        #[serde(rename = "peerId")]
113        peer_id: String,
114        #[serde(rename = "targetPeerId")]
115        target_peer_id: String,
116        candidates: Vec<IceCandidate>,
117    },
118}
119
120impl SignalingMessage {
121    /// Get the message type label.
122    pub fn msg_type(&self) -> &str {
123        match self {
124            Self::Hello { .. } => "hello",
125            Self::Offer { .. } => "offer",
126            Self::Answer { .. } => "answer",
127            Self::Candidate { .. } => "candidate",
128            Self::Candidates { .. } => "candidates",
129        }
130    }
131
132    /// Get the sender's peer ID
133    pub fn peer_id(&self) -> &str {
134        match self {
135            Self::Hello { peer_id, .. } => peer_id,
136            Self::Offer { peer_id, .. } => peer_id,
137            Self::Answer { peer_id, .. } => peer_id,
138            Self::Candidate { peer_id, .. } => peer_id,
139            Self::Candidates { peer_id, .. } => peer_id,
140        }
141    }
142
143    /// Get the target peer ID (if applicable)
144    pub fn target_peer_id(&self) -> Option<&str> {
145        match self {
146            Self::Hello { .. } => None, // Broadcast
147            Self::Offer { target_peer_id, .. } => Some(target_peer_id),
148            Self::Answer { target_peer_id, .. } => Some(target_peer_id),
149            Self::Candidate { target_peer_id, .. } => Some(target_peer_id),
150            Self::Candidates { target_peer_id, .. } => Some(target_peer_id),
151        }
152    }
153
154    /// Check if this message is addressed to a specific peer
155    pub fn is_for(&self, my_peer_id: &str) -> bool {
156        match self.target_peer_id() {
157            Some(target) => target == my_peer_id,
158            None => true, // Broadcasts are for everyone
159        }
160    }
161}
162
/// Collision rule for concurrent offers: decides whether the local side is
/// the "polite" peer.
///
/// Both peers may send offers at the same time. When we receive an offer
/// while one of ours is still pending, the polite peer backs off and accepts
/// the incoming offer; the impolite peer (the higher id) keeps its own offer
/// and ignores the incoming one. This lets connections form even when one
/// side is already "satisfied": the unsatisfied peer can still initiate and
/// the satisfied peer will accept.
///
/// Ids are compared as strings; the strictly lower id is polite. Two equal
/// ids are both impolite (the function returns `false` in both directions).
#[inline]
pub fn is_polite_peer(local_peer_id: &str, remote_peer_id: &str) -> bool {
    matches!(
        local_peer_id.cmp(remote_peer_id),
        std::cmp::Ordering::Less
    )
}
178
179/// Candidate metadata for transports that use ICE-style negotiation.
180#[derive(Debug, Clone, Serialize, Deserialize)]
181pub struct IceCandidate {
182    pub candidate: String,
183    #[serde(rename = "sdpMLineIndex")]
184    pub sdp_m_line_index: Option<u16>,
185    #[serde(rename = "sdpMid")]
186    pub sdp_mid: Option<String>,
187}
188
189/// Direct peer-link message types for hash-based data exchange.
190#[derive(Debug, Clone, Serialize, Deserialize)]
191#[serde(tag = "type")]
192pub enum DataMessage {
193    /// Request data by hash
194    #[serde(rename = "req")]
195    Request {
196        id: u32,
197        hash: String,
198        /// Hops To Live - decremented on each forward hop
199        /// When htl reaches 0, request is not forwarded further
200        #[serde(skip_serializing_if = "Option::is_none")]
201        htl: Option<u8>,
202    },
203
204    /// Response with data (binary payload follows)
205    #[serde(rename = "res")]
206    Response {
207        id: u32,
208        hash: String,
209        found: bool,
210        #[serde(skip_serializing_if = "Option::is_none")]
211        size: Option<u64>,
212    },
213
214    /// Push data for a hash the peer previously requested but we didn't have
215    /// This happens when we get it later from another peer
216    #[serde(rename = "push")]
217    Push { hash: String },
218
219    /// Announce available hashes
220    #[serde(rename = "have")]
221    Have { hashes: Vec<String> },
222
223    /// Request list of wanted hashes
224    #[serde(rename = "want")]
225    Want { hashes: Vec<String> },
226
227    /// Notify about root hash update
228    #[serde(rename = "root")]
229    RootUpdate {
230        hash: String,
231        #[serde(skip_serializing_if = "Option::is_none")]
232        size: Option<u64>,
233    },
234}
235
/// Maximum HTL (Hops To Live) for requests; HTL is decremented
/// probabilistically, Freenet-style.
pub const MAX_HTL: u8 = 10;
/// Probability (50%) of decrementing when HTL is at its maximum.
pub const DECREMENT_AT_MAX_PROB: f64 = 0.5;
/// Probability (25%) of decrementing when HTL is at its minimum of 1.
pub const DECREMENT_AT_MIN_PROB: f64 = 0.25;
242
/// Strategy used to decrement HTL.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum HtlMode {
    /// Decrement is decided by comparing a per-peer random sample against
    /// the policy's probabilities at the HTL boundaries.
    Probabilistic,
}
248
249/// HTL policy parameters for a traffic class.
250#[derive(Debug, Clone, Copy)]
251pub struct HtlPolicy {
252    pub mode: HtlMode,
253    pub max_htl: u8,
254    pub p_at_max: f64,
255    pub p_at_min: f64,
256}
257
258/// Default policy for blob/content requests.
259pub const BLOB_REQUEST_POLICY: HtlPolicy = HtlPolicy {
260    mode: HtlMode::Probabilistic,
261    max_htl: MAX_HTL,
262    p_at_max: DECREMENT_AT_MAX_PROB,
263    p_at_min: DECREMENT_AT_MIN_PROB,
264};
265
266/// Policy for mesh signaling/event forwarding.
267pub const MESH_EVENT_POLICY: HtlPolicy = HtlPolicy {
268    mode: HtlMode::Probabilistic,
269    max_htl: 4,
270    p_at_max: 0.75,
271    p_at_min: 0.5,
272};
273
/// Per-peer HTL configuration (Freenet-style probabilistic decrement).
///
/// Generated once per peer connection and kept fixed for the lifetime of
/// that connection. Each sample is compared against the matching policy
/// probability: a decrement happens when the sample is below it.
#[derive(Clone, Copy, Debug)]
pub struct PeerHTLConfig {
    /// Random sample used to decide decrement at max HTL.
    pub at_max_sample: f64,
    /// Random sample used to decide decrement at min HTL.
    pub at_min_sample: f64,
}
283
284impl PeerHTLConfig {
285    /// Generate random HTL config for a new peer connection
286    pub fn random() -> Self {
287        use rand::Rng;
288        let mut rng = rand::thread_rng();
289        Self::from_samples(rng.gen_range(0.0..1.0), rng.gen_range(0.0..1.0))
290    }
291
292    /// Construct config from explicit random samples.
293    pub fn from_samples(at_max_sample: f64, at_min_sample: f64) -> Self {
294        Self {
295            at_max_sample: at_max_sample.clamp(0.0, 1.0),
296            at_min_sample: at_min_sample.clamp(0.0, 1.0),
297        }
298    }
299
300    /// Construct config from explicit decrement choices.
301    pub fn from_flags(decrement_at_max: bool, decrement_at_min: bool) -> Self {
302        Self {
303            at_max_sample: if decrement_at_max { 0.0 } else { 1.0 },
304            at_min_sample: if decrement_at_min { 0.0 } else { 1.0 },
305        }
306    }
307
308    /// Decrement HTL using this peer's config (Freenet-style probabilistic)
309    /// Called when SENDING to a peer, not on receive
310    pub fn decrement(&self, htl: u8) -> u8 {
311        decrement_htl_with_policy(htl, &BLOB_REQUEST_POLICY, self)
312    }
313
314    /// Decrement HTL using the provided traffic policy.
315    pub fn decrement_with_policy(&self, htl: u8, policy: &HtlPolicy) -> u8 {
316        decrement_htl_with_policy(htl, policy, self)
317    }
318}
319
320impl Default for PeerHTLConfig {
321    fn default() -> Self {
322        Self::random()
323    }
324}
325
326/// Decrement HTL according to policy and per-peer randomness profile.
327pub fn decrement_htl_with_policy(htl: u8, policy: &HtlPolicy, config: &PeerHTLConfig) -> u8 {
328    let htl = htl.min(policy.max_htl);
329    if htl == 0 {
330        return 0;
331    }
332
333    match policy.mode {
334        HtlMode::Probabilistic => {
335            let p_at_max = policy.p_at_max.clamp(0.0, 1.0);
336            let p_at_min = policy.p_at_min.clamp(0.0, 1.0);
337
338            if htl == policy.max_htl {
339                return if config.at_max_sample < p_at_max {
340                    htl.saturating_sub(1)
341                } else {
342                    htl
343                };
344            }
345
346            if htl == 1 {
347                return if config.at_min_sample < p_at_min {
348                    0
349                } else {
350                    htl
351                };
352            }
353
354            htl.saturating_sub(1)
355        }
356    }
357}
358
/// Returns `true` when a request with this HTL may still be forwarded,
/// i.e. when `htl` is non-zero.
pub fn should_forward_htl(htl: u8) -> bool {
    htl != 0
}
363
364/// Backward-compatible helper.
365pub fn should_forward(htl: u8) -> bool {
366    should_forward_htl(htl)
367}
368
369use tokio::sync::{mpsc, oneshot};
370
/// Pool a peer is classified into.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum PeerPool {
    /// Users in the social graph (followed or followers).
    Follows,
    /// Everyone else.
    Other,
}
379
/// Connection limits for one peer pool.
#[derive(Clone, Copy, Debug)]
pub struct PoolConfig {
    /// Maximum connections in this pool.
    pub max_connections: usize,
    /// Number of connections at which the pool counts as "satisfied".
    pub satisfied_connections: usize,
}
388
389impl PoolConfig {
390    /// Check if we can accept more peers (below max)
391    #[inline]
392    pub fn can_accept(&self, current_count: usize) -> bool {
393        current_count < self.max_connections
394    }
395
396    /// Check if we need more peers (below satisfied)
397    #[inline]
398    pub fn needs_peers(&self, current_count: usize) -> bool {
399        current_count < self.satisfied_connections
400    }
401
402    /// Check if we're satisfied (at or above satisfied threshold)
403    #[inline]
404    pub fn is_satisfied(&self, current_count: usize) -> bool {
405        current_count >= self.satisfied_connections
406    }
407}
408
409impl Default for PoolConfig {
410    fn default() -> Self {
411        Self {
412            max_connections: 16,
413            satisfied_connections: 8,
414        }
415    }
416}
417
418/// Pool settings for both pools
419#[derive(Debug, Clone)]
420pub struct PoolSettings {
421    pub follows: PoolConfig,
422    pub other: PoolConfig,
423}
424
425impl Default for PoolSettings {
426    fn default() -> Self {
427        Self {
428            follows: PoolConfig {
429                max_connections: 16,
430                satisfied_connections: 8,
431            },
432            other: PoolConfig {
433                max_connections: 16,
434                satisfied_connections: 8,
435            },
436        }
437    }
438}
439
440/// Request to classify a peer by pubkey
441pub struct ClassifyRequest {
442    /// Pubkey to classify (hex)
443    pub pubkey: String,
444    /// Channel to send result back
445    pub response: oneshot::Sender<PeerPool>,
446}
447
448/// Sender for peer classification requests
449pub type ClassifierTx = mpsc::Sender<ClassifyRequest>;
450
451/// Receiver for peer classification requests (implement this to provide classification)
452pub type ClassifierRx = mpsc::Receiver<ClassifyRequest>;
453
454/// Create a classifier channel pair
455pub fn classifier_channel(buffer: usize) -> (ClassifierTx, ClassifierRx) {
456    mpsc::channel(buffer)
457}
458
459/// Configuration for the default mesh-backed store composition.
460#[derive(Clone)]
461pub struct MeshStoreConfig {
462    /// Nostr relays for signaling
463    pub relays: Vec<String>,
464    /// Root hashes to advertise
465    pub roots: Vec<Hash>,
466    /// Timeout for data requests (ms)
467    pub request_timeout_ms: u64,
468    /// Interval for sending hello messages (ms)
469    pub hello_interval_ms: u64,
470    /// Enable verbose logging
471    pub debug: bool,
472    /// Pool settings for follows and other peers
473    pub pools: PoolSettings,
474    /// Channel for peer classification (optional)
475    /// If None, all peers go to "Other" pool
476    pub classifier_tx: Option<ClassifierTx>,
477    /// Retrieval peer selection strategy (shared with MeshStoreCore/CLI/sim).
478    pub request_selection_strategy: SelectionStrategy,
479    /// Whether fairness constraints are enabled for retrieval selection.
480    pub request_fairness_enabled: bool,
481    /// Hedged request dispatch policy for retrieval.
482    pub request_dispatch: RequestDispatchConfig,
483    /// Upstream Blossom servers used as adaptive fallback read sources.
484    pub upstream_blossom_servers: Vec<String>,
485}
486
487impl Default for MeshStoreConfig {
488    fn default() -> Self {
489        Self {
490            relays: vec![
491                "wss://temp.iris.to".to_string(),
492                "wss://relay.damus.io".to_string(),
493            ],
494            roots: Vec::new(),
495            request_timeout_ms: 10000,
496            hello_interval_ms: 30000,
497            debug: false,
498            pools: PoolSettings::default(),
499            classifier_tx: None,
500            request_selection_strategy: SelectionStrategy::Weighted,
501            request_fairness_enabled: true,
502            request_dispatch: RequestDispatchConfig {
503                initial_fanout: 2,
504                hedge_fanout: 1,
505                max_fanout: 8,
506                hedge_interval_ms: 120,
507            },
508            upstream_blossom_servers: Vec::new(),
509        }
510    }
511}
512
/// Connection state of a peer.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PeerState {
    /// Initial state.
    New,
    /// Connecting via signaling.
    Connecting,
    /// Negotiated direct link established.
    Connected,
    /// Direct peer link open and ready for data.
    Ready,
    /// Connection failed or closed.
    Disconnected,
}
527
/// Counters for the default mesh-backed store composition.
///
/// `Default` yields all-zero counters.
#[derive(Clone, Debug, Default)]
pub struct MeshStats {
    /// Number of connected peers.
    pub connected_peers: usize,
    /// Number of pending requests.
    pub pending_requests: usize,
    /// Bytes sent.
    pub bytes_sent: u64,
    /// Bytes received.
    pub bytes_received: u64,
    /// Requests made.
    pub requests_made: u64,
    /// Requests fulfilled.
    pub requests_fulfilled: u64,
}
538
539/// Backward-compatible alias for the previous production-specific name.
540pub type WebRTCStats = MeshStats;
541
542/// Nostr event kind for hashtree signaling envelopes (ephemeral, NIP-17 style).
543pub const NOSTR_KIND_HASHTREE: u16 = 25050;
544pub const MESH_SIGNALING_EVENT_KIND: u16 = NOSTR_KIND_HASHTREE;
545
546/// Relayless mesh protocol constants for forwarding signaling events over data channels.
547pub const MESH_PROTOCOL: &str = "htree.nostr.mesh.v1";
548pub const MESH_PROTOCOL_VERSION: u8 = 1;
549pub const MESH_DEFAULT_HTL: u8 = MESH_EVENT_POLICY.max_htl;
550pub const MESH_MAX_HTL: u8 = 6;
551
/// TTL/capacity dedupe structure for forwarded mesh frames/events.
///
/// Remembers string keys for at most `ttl` and holds at most `capacity` of
/// them; when either limit is exceeded the oldest keys are dropped first.
#[derive(Debug)]
pub struct TimedSeenSet {
    /// Key -> insertion time of every key currently remembered.
    entries: HashMap<String, Instant>,
    /// The same (key, insertion time) records in insertion order, oldest
    /// first.
    order: VecDeque<(String, Instant)>,
    /// How long a key stays remembered.
    ttl: Duration,
    /// Maximum number of keys kept after pruning.
    capacity: usize,
}

impl TimedSeenSet {
    /// Creates an empty set that keeps at most `capacity` keys, each for at
    /// most `ttl`.
    pub fn new(capacity: usize, ttl: Duration) -> Self {
        Self {
            entries: HashMap::new(),
            order: VecDeque::new(),
            ttl,
            capacity,
        }
    }

    /// Drops the oldest queue record and, when the map still holds that key
    /// with the same timestamp, the map entry too.
    ///
    /// Returns `false` when the queue was already empty.
    fn evict_oldest(&mut self) -> bool {
        match self.order.pop_front() {
            Some((key, inserted_at)) => {
                if self.entries.get(&key) == Some(&inserted_at) {
                    self.entries.remove(&key);
                }
                true
            }
            None => false,
        }
    }

    /// Removes expired keys, then trims the oldest keys until the set fits
    /// within `capacity`.
    fn prune(&mut self, now: Instant) {
        // Peek only at the timestamp: the key is taken by value when (and
        // only when) the record is actually evicted, so a prune that finds
        // nothing to expire never clones a key.
        while let Some(&(_, inserted_at)) = self.order.front() {
            if now.duration_since(inserted_at) < self.ttl {
                break;
            }
            self.evict_oldest();
        }

        while self.entries.len() > self.capacity && self.evict_oldest() {}
    }

    /// Records `key` and reports whether it was new.
    ///
    /// Returns `false` (leaving the set unchanged apart from pruning) when
    /// the key is still remembered, `true` otherwise. Pruning runs before the
    /// lookup and again after the insert, so with a zero TTL or zero capacity
    /// the key is dropped again immediately and the call still returns `true`.
    pub fn insert_if_new(&mut self, key: String) -> bool {
        let now = Instant::now();
        self.prune(now);
        if self.entries.contains_key(&key) {
            return false;
        }
        self.entries.insert(key.clone(), now);
        self.order.push_back((key, now));
        self.prune(now);
        true
    }

    /// Number of keys currently remembered (as of the last prune).
    pub fn len(&self) -> usize {
        self.entries.len()
    }

    /// Returns `true` when no keys are currently remembered.
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }
}
623
624#[derive(Debug, Clone, Serialize, Deserialize)]
625#[serde(tag = "type")]
626pub enum MeshNostrPayload {
627    #[serde(rename = "EVENT")]
628    Event { event: Event },
629}
630
631#[derive(Debug, Clone, Serialize, Deserialize)]
632pub struct MeshNostrFrame {
633    pub protocol: String,
634    pub version: u8,
635    pub frame_id: String,
636    pub htl: u8,
637    pub sender_peer_id: String,
638    pub payload: MeshNostrPayload,
639}
640
641impl MeshNostrFrame {
642    pub fn new_event(event: Event, sender_peer_id: &str, htl: u8) -> Self {
643        Self::new_event_with_id(
644            event,
645            sender_peer_id,
646            &uuid::Uuid::new_v4().to_string(),
647            htl,
648        )
649    }
650
651    pub fn new_event_with_id(event: Event, sender_peer_id: &str, frame_id: &str, htl: u8) -> Self {
652        Self {
653            protocol: MESH_PROTOCOL.to_string(),
654            version: MESH_PROTOCOL_VERSION,
655            frame_id: frame_id.to_string(),
656            htl,
657            sender_peer_id: sender_peer_id.to_string(),
658            payload: MeshNostrPayload::Event { event },
659        }
660    }
661
662    pub fn event(&self) -> &Event {
663        match &self.payload {
664            MeshNostrPayload::Event { event } => event,
665        }
666    }
667}
668
669pub fn validate_mesh_frame(frame: &MeshNostrFrame) -> Result<(), &'static str> {
670    if frame.protocol != MESH_PROTOCOL {
671        return Err("invalid protocol");
672    }
673    if frame.version != MESH_PROTOCOL_VERSION {
674        return Err("invalid version");
675    }
676    if frame.frame_id.is_empty() {
677        return Err("missing frame id");
678    }
679    if frame.sender_peer_id.is_empty() {
680        return Err("missing sender peer id");
681    }
682    if frame.sender_peer_id.contains(':') {
683        return Err("invalid sender peer id");
684    }
685    if frame.htl == 0 || frame.htl > MESH_MAX_HTL {
686        return Err("invalid htl");
687    }
688
689    let event = frame.event();
690    if event.kind != Kind::Custom(NOSTR_KIND_HASHTREE)
691        && event.kind != Kind::Ephemeral(NOSTR_KIND_HASHTREE)
692    {
693        return Err("unsupported event kind");
694    }
695
696    Ok(())
697}
698
/// Label the WebRTC peer-link implementation uses by default for its data
/// channel.
pub const DATA_CHANNEL_LABEL: &str = "hashtree";