// hashtree_network/types.rs
//! Mesh transport types for peer-to-peer data exchange.
//!
//! Defines signaling frames, negotiated peer-link messages, and shared mesh
//! constants used across Nostr websocket transports, local buses, direct-link
//! transports such as WebRTC, and simulation.
7use hashtree_core::Hash;
8use nostr_sdk::nostr::{Event, Kind};
9use serde::{Deserialize, Serialize};
10use std::collections::{HashMap, VecDeque};
11use std::time::{Duration, Instant};
12
13use crate::mesh_store_core::RequestDispatchConfig;
14use crate::peer_selector::SelectionStrategy;
15
/// Unique identifier for a peer in the network.
///
/// Currently just a wrapper around the peer's Nostr public key / endpoint
/// identity string; the newtype exists so peer identity can gain structure
/// later without touching call sites.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct PeerId {
    /// Nostr public key / endpoint identity (hex string in practice —
    /// NOTE(review): not validated here; confirm format at the producers).
    pub pubkey: String,
}
22
23impl PeerId {
24    pub fn new(pubkey: String) -> Self {
25        Self { pubkey }
26    }
27
28    pub fn to_peer_string(&self) -> String {
29        self.pubkey.clone()
30    }
31
32    pub fn from_peer_string(s: &str) -> Option<Self> {
33        let pubkey = s.trim();
34        if pubkey.is_empty() || pubkey.contains(':') {
35            return None;
36        }
37        Some(Self {
38            pubkey: pubkey.to_string(),
39        })
40    }
41
42    pub fn from_string(s: &str) -> Option<Self> {
43        Self::from_peer_string(s)
44    }
45
46    pub fn short(&self) -> String {
47        self.pubkey[..8.min(self.pubkey.len())].to_string()
48    }
49}
50
51impl std::fmt::Display for PeerId {
52    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
53        f.write_str(&self.pubkey)
54    }
55}
56
/// Signaling message types exchanged over mesh signaling transports.
///
/// Serialized with an external `"type"` tag and camelCase field renames,
/// which together define the JSON wire format.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum SignalingMessage {
    /// Initial hello message to discover peers (broadcast — no target).
    #[serde(rename = "hello")]
    Hello {
        #[serde(rename = "peerId")]
        peer_id: String,
        /// Root hashes the sender advertises (see `MeshStoreConfig::roots`).
        roots: Vec<String>,
    },

    /// Negotiation offer payload (currently SDP for WebRTC links).
    #[serde(rename = "offer")]
    Offer {
        #[serde(rename = "peerId")]
        peer_id: String,
        #[serde(rename = "targetPeerId")]
        target_peer_id: String,
        /// Session description blob.
        sdp: String,
    },

    /// Negotiation answer payload (currently SDP for WebRTC links).
    #[serde(rename = "answer")]
    Answer {
        #[serde(rename = "peerId")]
        peer_id: String,
        #[serde(rename = "targetPeerId")]
        target_peer_id: String,
        /// Session description blob.
        sdp: String,
    },

    /// Single candidate update for transports that need trickle negotiation.
    #[serde(rename = "candidate")]
    Candidate {
        #[serde(rename = "peerId")]
        peer_id: String,
        #[serde(rename = "targetPeerId")]
        target_peer_id: String,
        candidate: String,
        #[serde(rename = "sdpMLineIndex")]
        sdp_m_line_index: Option<u16>,
        #[serde(rename = "sdpMid")]
        sdp_mid: Option<String>,
    },

    /// Batched candidate updates (same data as `Candidate`, many at once).
    #[serde(rename = "candidates")]
    Candidates {
        #[serde(rename = "peerId")]
        peer_id: String,
        #[serde(rename = "targetPeerId")]
        target_peer_id: String,
        candidates: Vec<IceCandidate>,
    },
}
113
114impl SignalingMessage {
115    /// Get the message type label.
116    pub fn msg_type(&self) -> &str {
117        match self {
118            Self::Hello { .. } => "hello",
119            Self::Offer { .. } => "offer",
120            Self::Answer { .. } => "answer",
121            Self::Candidate { .. } => "candidate",
122            Self::Candidates { .. } => "candidates",
123        }
124    }
125
126    /// Get the sender's peer ID
127    pub fn peer_id(&self) -> &str {
128        match self {
129            Self::Hello { peer_id, .. } => peer_id,
130            Self::Offer { peer_id, .. } => peer_id,
131            Self::Answer { peer_id, .. } => peer_id,
132            Self::Candidate { peer_id, .. } => peer_id,
133            Self::Candidates { peer_id, .. } => peer_id,
134        }
135    }
136
137    /// Get the target peer ID (if applicable)
138    pub fn target_peer_id(&self) -> Option<&str> {
139        match self {
140            Self::Hello { .. } => None, // Broadcast
141            Self::Offer { target_peer_id, .. } => Some(target_peer_id),
142            Self::Answer { target_peer_id, .. } => Some(target_peer_id),
143            Self::Candidate { target_peer_id, .. } => Some(target_peer_id),
144            Self::Candidates { target_peer_id, .. } => Some(target_peer_id),
145        }
146    }
147
148    /// Check if this message is addressed to a specific peer
149    pub fn is_for(&self, my_peer_id: &str) -> bool {
150        match self.target_peer_id() {
151            Some(target) => target == my_peer_id,
152            None => true, // Broadcasts are for everyone
153        }
154    }
155}
156
/// Shared concurrent-offer negotiation: determine if we are the "polite" peer.
///
/// In this negotiation flow, both peers can send offers simultaneously.
/// When a collision occurs (we receive an offer while we have a pending offer),
/// the "polite" peer backs off and accepts the incoming offer instead.
///
/// The "impolite" peer (higher ID) keeps their offer and ignores the incoming one.
///
/// This pattern ensures connections form even when one peer is "satisfied" -
/// the unsatisfied peer can still initiate and the satisfied peer will accept.
#[inline]
pub fn is_polite_peer(local_peer_id: &str, remote_peer_id: &str) -> bool {
    // Lexicographically lower ID is "polite": it backs off on collision.
    matches!(
        local_peer_id.cmp(remote_peer_id),
        std::cmp::Ordering::Less
    )
}
172
/// Candidate metadata for transports that use ICE-style negotiation.
///
/// Field renames match the browser `RTCIceCandidateInit` spelling on the wire.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IceCandidate {
    /// The candidate line itself.
    pub candidate: String,
    /// Media-line index within the SDP, when known.
    #[serde(rename = "sdpMLineIndex")]
    pub sdp_m_line_index: Option<u16>,
    /// Media stream identification tag, when known.
    #[serde(rename = "sdpMid")]
    pub sdp_mid: Option<String>,
}
182
/// Direct peer-link message types for hash-based data exchange.
///
/// Externally tagged with `"type"`; short rename strings keep frames compact.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum DataMessage {
    /// Request data by hash.
    #[serde(rename = "req")]
    Request {
        /// Correlation id; the matching `Response` echoes it.
        id: u32,
        hash: String,
        /// Hops To Live - decremented on each forward hop.
        /// When htl reaches 0, request is not forwarded further.
        /// Omitted from the wire when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        htl: Option<u8>,
    },

    /// Response with data (binary payload follows on the link).
    #[serde(rename = "res")]
    Response {
        /// Correlation id copied from the originating `Request`.
        id: u32,
        hash: String,
        /// Whether the responder holds the data.
        found: bool,
        /// Payload size in bytes, when known/found.
        #[serde(skip_serializing_if = "Option::is_none")]
        size: Option<u64>,
    },

    /// Push data for a hash the peer previously requested but we didn't have.
    /// This happens when we get it later from another peer.
    #[serde(rename = "push")]
    Push { hash: String },

    /// Announce available hashes.
    #[serde(rename = "have")]
    Have { hashes: Vec<String> },

    /// Request list of wanted hashes.
    #[serde(rename = "want")]
    Want { hashes: Vec<String> },

    /// Notify about a root hash update.
    #[serde(rename = "root")]
    RootUpdate {
        hash: String,
        /// Total size in bytes, when known.
        #[serde(skip_serializing_if = "Option::is_none")]
        size: Option<u64>,
    },
}
229
/// HTL (Hops To Live) constants - Freenet-style probabilistic decrement.
/// Ceiling for blob-request HTL values (see `BLOB_REQUEST_POLICY`).
pub const MAX_HTL: u8 = 10;
/// Probability to decrement while still at max HTL (50%).
pub const DECREMENT_AT_MAX_PROB: f64 = 0.5;
/// Probability to decrement (i.e. drop to 0) once HTL=1 (25%).
pub const DECREMENT_AT_MIN_PROB: f64 = 0.25;
236
/// HTL decrement mode.
///
/// Only the probabilistic (Freenet-style) mode exists today; the enum leaves
/// room for deterministic modes without changing `HtlPolicy`'s shape.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HtlMode {
    /// Decrement decisions at the max/min boundaries depend on per-peer
    /// random samples (see `PeerHTLConfig`).
    Probabilistic,
}
242
/// HTL policy parameters for a traffic class.
#[derive(Debug, Clone, Copy)]
pub struct HtlPolicy {
    /// Decrement mode (currently always probabilistic).
    pub mode: HtlMode,
    /// Ceiling clamped onto incoming HTL values before decrementing.
    pub max_htl: u8,
    /// Probability of decrementing while at `max_htl` (clamped to [0,1] at use).
    pub p_at_max: f64,
    /// Probability of dropping to 0 once HTL reaches 1 (clamped to [0,1] at use).
    pub p_at_min: f64,
}
251
/// Default policy for blob/content requests.
///
/// Built from the module-level `MAX_HTL` / `DECREMENT_*` constants, so the
/// legacy `PeerHTLConfig::decrement` path keeps its original behavior.
pub const BLOB_REQUEST_POLICY: HtlPolicy = HtlPolicy {
    mode: HtlMode::Probabilistic,
    max_htl: MAX_HTL,
    p_at_max: DECREMENT_AT_MAX_PROB,
    p_at_min: DECREMENT_AT_MIN_PROB,
};
259
/// Policy for mesh signaling/event forwarding.
///
/// Tighter than blob requests: a lower ceiling (4 hops) and higher decrement
/// probabilities, so forwarded signaling gossip dies out quickly.
pub const MESH_EVENT_POLICY: HtlPolicy = HtlPolicy {
    mode: HtlMode::Probabilistic,
    max_htl: 4,
    p_at_max: 0.75,
    p_at_min: 0.5,
};
267
/// Per-peer HTL configuration (Freenet-style probabilistic decrement).
/// Generated once per peer connection, stays fixed for connection lifetime —
/// fixing the samples per peer makes a given peer's boundary behavior
/// deterministic instead of re-rolling dice per message.
#[derive(Debug, Clone, Copy)]
pub struct PeerHTLConfig {
    /// Random sample in [0,1] used at max HTL: decrement iff sample < p_at_max.
    pub at_max_sample: f64,
    /// Random sample in [0,1] used at HTL=1: drop to 0 iff sample < p_at_min.
    pub at_min_sample: f64,
}
277
278impl PeerHTLConfig {
279    /// Generate random HTL config for a new peer connection
280    pub fn random() -> Self {
281        use rand::Rng;
282        let mut rng = rand::thread_rng();
283        Self::from_samples(rng.gen_range(0.0..1.0), rng.gen_range(0.0..1.0))
284    }
285
286    /// Construct config from explicit random samples.
287    pub fn from_samples(at_max_sample: f64, at_min_sample: f64) -> Self {
288        Self {
289            at_max_sample: at_max_sample.clamp(0.0, 1.0),
290            at_min_sample: at_min_sample.clamp(0.0, 1.0),
291        }
292    }
293
294    /// Construct config from explicit decrement choices.
295    pub fn from_flags(decrement_at_max: bool, decrement_at_min: bool) -> Self {
296        Self {
297            at_max_sample: if decrement_at_max { 0.0 } else { 1.0 },
298            at_min_sample: if decrement_at_min { 0.0 } else { 1.0 },
299        }
300    }
301
302    /// Decrement HTL using this peer's config (Freenet-style probabilistic)
303    /// Called when SENDING to a peer, not on receive
304    pub fn decrement(&self, htl: u8) -> u8 {
305        decrement_htl_with_policy(htl, &BLOB_REQUEST_POLICY, self)
306    }
307
308    /// Decrement HTL using the provided traffic policy.
309    pub fn decrement_with_policy(&self, htl: u8, policy: &HtlPolicy) -> u8 {
310        decrement_htl_with_policy(htl, policy, self)
311    }
312}
313
314impl Default for PeerHTLConfig {
315    fn default() -> Self {
316        Self::random()
317    }
318}
319
320/// Decrement HTL according to policy and per-peer randomness profile.
321pub fn decrement_htl_with_policy(htl: u8, policy: &HtlPolicy, config: &PeerHTLConfig) -> u8 {
322    let htl = htl.min(policy.max_htl);
323    if htl == 0 {
324        return 0;
325    }
326
327    match policy.mode {
328        HtlMode::Probabilistic => {
329            let p_at_max = policy.p_at_max.clamp(0.0, 1.0);
330            let p_at_min = policy.p_at_min.clamp(0.0, 1.0);
331
332            if htl == policy.max_htl {
333                return if config.at_max_sample < p_at_max {
334                    htl.saturating_sub(1)
335                } else {
336                    htl
337                };
338            }
339
340            if htl == 1 {
341                return if config.at_min_sample < p_at_min {
342                    0
343                } else {
344                    htl
345                };
346            }
347
348            htl.saturating_sub(1)
349        }
350    }
351}
352
/// A request is forwardable while its HTL has not reached zero.
pub fn should_forward_htl(htl: u8) -> bool {
    htl != 0
}
357
/// Backward-compatible helper; same contract as [`should_forward_htl`]:
/// forward only while HTL is positive.
pub fn should_forward(htl: u8) -> bool {
    htl > 0
}
362
363use tokio::sync::{mpsc, oneshot};
364
/// Peer pool classification.
///
/// Used to give social-graph peers their own connection budget separate
/// from strangers (see `PoolSettings`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PeerPool {
    /// Users in social graph (followed or followers).
    Follows,
    /// Everyone else.
    Other,
}
373
/// Settings for a peer pool.
///
/// Invariant (assumed, not enforced here): `satisfied_connections` should be
/// <= `max_connections` — TODO confirm at construction sites.
#[derive(Debug, Clone, Copy)]
pub struct PoolConfig {
    /// Maximum connections in this pool (hard cap).
    pub max_connections: usize,
    /// Number of connections to consider "satisfied" (soft target).
    pub satisfied_connections: usize,
}
382
383impl PoolConfig {
384    /// Check if we can accept more peers (below max)
385    #[inline]
386    pub fn can_accept(&self, current_count: usize) -> bool {
387        current_count < self.max_connections
388    }
389
390    /// Check if we need more peers (below satisfied)
391    #[inline]
392    pub fn needs_peers(&self, current_count: usize) -> bool {
393        current_count < self.satisfied_connections
394    }
395
396    /// Check if we're satisfied (at or above satisfied threshold)
397    #[inline]
398    pub fn is_satisfied(&self, current_count: usize) -> bool {
399        current_count >= self.satisfied_connections
400    }
401}
402
403impl Default for PoolConfig {
404    fn default() -> Self {
405        Self {
406            max_connections: 16,
407            satisfied_connections: 8,
408        }
409    }
410}
411
/// Pool settings for both pools (social-graph peers and everyone else).
#[derive(Debug, Clone)]
pub struct PoolSettings {
    /// Budget for peers in the social graph.
    pub follows: PoolConfig,
    /// Budget for all other peers.
    pub other: PoolConfig,
}
418
419impl Default for PoolSettings {
420    fn default() -> Self {
421        Self {
422            follows: PoolConfig {
423                max_connections: 16,
424                satisfied_connections: 8,
425            },
426            other: PoolConfig {
427                max_connections: 16,
428                satisfied_connections: 8,
429            },
430        }
431    }
432}
433
/// Request to classify a peer by pubkey.
///
/// Sent over a `ClassifierTx` channel; whoever holds the `ClassifierRx`
/// answers on the embedded oneshot with the pool the peer belongs to.
pub struct ClassifyRequest {
    /// Pubkey to classify (hex).
    pub pubkey: String,
    /// Channel to send the resulting pool back; dropping it without
    /// sending signals the requester that classification failed.
    pub response: oneshot::Sender<PeerPool>,
}
441
/// Sender for peer classification requests.
pub type ClassifierTx = mpsc::Sender<ClassifyRequest>;

/// Receiver for peer classification requests (implement the consumer side of
/// this channel to provide classification).
pub type ClassifierRx = mpsc::Receiver<ClassifyRequest>;
447
448/// Create a classifier channel pair
449pub fn classifier_channel(buffer: usize) -> (ClassifierTx, ClassifierRx) {
450    mpsc::channel(buffer)
451}
452
/// Configuration for the default mesh-backed store composition.
///
/// `Clone` only (no `Debug`): the struct carries a live channel handle
/// (`classifier_tx`) and project types — NOTE(review): confirm whether a
/// manual `Debug` impl is wanted for diagnostics.
#[derive(Clone)]
pub struct MeshStoreConfig {
    /// Nostr relays for signaling.
    pub relays: Vec<String>,
    /// Root hashes to advertise.
    pub roots: Vec<Hash>,
    /// Timeout for data requests (ms).
    pub request_timeout_ms: u64,
    /// Interval for sending hello messages (ms).
    pub hello_interval_ms: u64,
    /// Enable verbose logging.
    pub debug: bool,
    /// Pool settings for follows and other peers.
    pub pools: PoolSettings,
    /// Channel for peer classification (optional).
    /// If None, all peers go to the "Other" pool.
    pub classifier_tx: Option<ClassifierTx>,
    /// Retrieval peer selection strategy (shared with MeshStoreCore/CLI/sim).
    pub request_selection_strategy: SelectionStrategy,
    /// Whether fairness constraints are enabled for retrieval selection.
    pub request_fairness_enabled: bool,
    /// Hedged request dispatch policy for retrieval.
    pub request_dispatch: RequestDispatchConfig,
}
478
479impl Default for MeshStoreConfig {
480    fn default() -> Self {
481        Self {
482            relays: vec![
483                "wss://temp.iris.to".to_string(),
484                "wss://relay.damus.io".to_string(),
485            ],
486            roots: Vec::new(),
487            request_timeout_ms: 10000,
488            hello_interval_ms: 30000,
489            debug: false,
490            pools: PoolSettings::default(),
491            classifier_tx: None,
492            request_selection_strategy: SelectionStrategy::TitForTat,
493            request_fairness_enabled: true,
494            request_dispatch: RequestDispatchConfig {
495                initial_fanout: 2,
496                hedge_fanout: 1,
497                max_fanout: 8,
498                hedge_interval_ms: 120,
499            },
500        }
501    }
502}
503
/// Connection state for a peer.
///
/// Roughly linear progression: New -> Connecting -> Connected -> Ready,
/// with Disconnected reachable from any state — presumably; transition
/// rules live in the transport code, confirm there.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PeerState {
    /// Initial state, before any signaling.
    New,
    /// Connecting via signaling.
    Connecting,
    /// Negotiated direct link established.
    Connected,
    /// Direct peer link open and ready for data.
    Ready,
    /// Connection failed or closed.
    Disconnected,
}
518
/// Statistics for the default mesh-backed store composition.
#[derive(Debug, Clone, Default)]
pub struct MeshStats {
    /// Peers with an established direct link.
    pub connected_peers: usize,
    /// Requests currently awaiting a response.
    pub pending_requests: usize,
    /// Total payload bytes sent over peer links.
    pub bytes_sent: u64,
    /// Total payload bytes received over peer links.
    pub bytes_received: u64,
    /// Total requests issued by this node.
    pub requests_made: u64,
    /// Requests that completed with data found.
    pub requests_fulfilled: u64,
}
529
/// Backward-compatible alias for the previous production-specific name.
pub type WebRTCStats = MeshStats;

/// Nostr event kind for hashtree signaling envelopes (ephemeral, NIP-17 style).
/// 25050 falls in Nostr's ephemeral kind range (20000-29999), matching the
/// `Kind::Ephemeral` acceptance in `validate_mesh_frame`.
pub const NOSTR_KIND_HASHTREE: u16 = 25050;
/// Alias used by mesh signaling code paths.
pub const MESH_SIGNALING_EVENT_KIND: u16 = NOSTR_KIND_HASHTREE;

/// Relayless mesh protocol constants for forwarding signaling events over data channels.
pub const MESH_PROTOCOL: &str = "htree.nostr.mesh.v1";
/// Frame format version carried in every `MeshNostrFrame`.
pub const MESH_PROTOCOL_VERSION: u8 = 1;
/// HTL assigned to locally originated frames (taken from MESH_EVENT_POLICY, i.e. 4).
pub const MESH_DEFAULT_HTL: u8 = MESH_EVENT_POLICY.max_htl;
/// Hard upper bound accepted by `validate_mesh_frame`; above the default to
/// tolerate peers configured with a slightly larger policy ceiling.
pub const MESH_MAX_HTL: u8 = 6;
542
/// TTL/capacity dedupe structure for forwarded mesh frames/events.
///
/// Remembers string keys for up to `ttl`, and evicts oldest-first once more
/// than `capacity` keys are live. `order` may hold stale `(key, timestamp)`
/// pairs after a key is evicted and re-inserted; eviction compares the stored
/// timestamp so a stale pair never removes a newer entry.
#[derive(Debug)]
pub struct TimedSeenSet {
    /// Key -> most recent insertion time.
    entries: HashMap<String, Instant>,
    /// FIFO of (key, insertion time) driving TTL and capacity eviction.
    order: VecDeque<(String, Instant)>,
    ttl: Duration,
    capacity: usize,
}

impl TimedSeenSet {
    /// Create an empty set with the given capacity bound and entry lifetime.
    pub fn new(capacity: usize, ttl: Duration) -> Self {
        Self {
            entries: HashMap::new(),
            order: VecDeque::new(),
            ttl,
            capacity,
        }
    }

    /// Remove `key` from the map only if it still carries exactly
    /// `inserted_at` — otherwise the (key, timestamp) pair popped from
    /// `order` is stale and the live entry must be kept.
    fn remove_if_current(&mut self, key: &str, inserted_at: Instant) {
        if self.entries.get(key).copied() == Some(inserted_at) {
            self.entries.remove(key);
        }
    }

    /// Drop expired entries, then enforce the capacity bound.
    ///
    /// Peeks the deque front before popping, so no key `String` is cloned
    /// per pruned entry (the previous version cloned every front pair).
    fn prune(&mut self, now: Instant) {
        // TTL pass: `order` is insertion-ordered, so stop at the first
        // still-fresh pair.
        while let Some((_, inserted_at)) = self.order.front() {
            if now.duration_since(*inserted_at) < self.ttl {
                break;
            }
            let (key, inserted_at) = self.order.pop_front().expect("front was Some");
            self.remove_if_current(&key, inserted_at);
        }

        // Capacity pass: evict oldest until within bounds.
        while self.entries.len() > self.capacity {
            match self.order.pop_front() {
                Some((key, inserted_at)) => self.remove_if_current(&key, inserted_at),
                None => break,
            }
        }
    }

    /// Insert `key`; returns `true` iff it was not already present (i.e.
    /// the caller should process/forward the associated frame).
    pub fn insert_if_new(&mut self, key: String) -> bool {
        let now = Instant::now();
        self.prune(now);
        if self.entries.contains_key(&key) {
            return false;
        }
        self.entries.insert(key.clone(), now);
        self.order.push_back((key, now));
        // Re-prune so the capacity bound holds immediately after insertion.
        self.prune(now);
        true
    }

    /// Number of live (non-evicted, non-expired-at-last-prune) keys.
    pub fn len(&self) -> usize {
        self.entries.len()
    }

    /// True if no live keys remain.
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }
}
614
/// Payload of a relayless mesh frame.
///
/// Tagged `"type"` with an `"EVENT"` variant, mirroring the Nostr relay
/// message spelling on the wire.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum MeshNostrPayload {
    /// A full signed Nostr event being forwarded peer-to-peer.
    #[serde(rename = "EVENT")]
    Event { event: Event },
}
621
/// Envelope for a Nostr event forwarded over direct peer links
/// (relayless mesh); validated by `validate_mesh_frame`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MeshNostrFrame {
    /// Protocol identifier; must equal `MESH_PROTOCOL`.
    pub protocol: String,
    /// Frame format version; must equal `MESH_PROTOCOL_VERSION`.
    pub version: u8,
    /// Unique id used for forwarding dedupe (UUID when locally created).
    pub frame_id: String,
    /// Hops To Live; must be in 1..=MESH_MAX_HTL to be accepted.
    pub htl: u8,
    /// Peer id of the node that forwarded this frame to us.
    pub sender_peer_id: String,
    /// The wrapped event.
    pub payload: MeshNostrPayload,
}
631
632impl MeshNostrFrame {
633    pub fn new_event(event: Event, sender_peer_id: &str, htl: u8) -> Self {
634        Self::new_event_with_id(
635            event,
636            sender_peer_id,
637            &uuid::Uuid::new_v4().to_string(),
638            htl,
639        )
640    }
641
642    pub fn new_event_with_id(event: Event, sender_peer_id: &str, frame_id: &str, htl: u8) -> Self {
643        Self {
644            protocol: MESH_PROTOCOL.to_string(),
645            version: MESH_PROTOCOL_VERSION,
646            frame_id: frame_id.to_string(),
647            htl,
648            sender_peer_id: sender_peer_id.to_string(),
649            payload: MeshNostrPayload::Event { event },
650        }
651    }
652
653    pub fn event(&self) -> &Event {
654        match &self.payload {
655            MeshNostrPayload::Event { event } => event,
656        }
657    }
658}
659
660pub fn validate_mesh_frame(frame: &MeshNostrFrame) -> Result<(), &'static str> {
661    if frame.protocol != MESH_PROTOCOL {
662        return Err("invalid protocol");
663    }
664    if frame.version != MESH_PROTOCOL_VERSION {
665        return Err("invalid version");
666    }
667    if frame.frame_id.is_empty() {
668        return Err("missing frame id");
669    }
670    if frame.sender_peer_id.is_empty() {
671        return Err("missing sender peer id");
672    }
673    if frame.sender_peer_id.contains(':') {
674        return Err("invalid sender peer id");
675    }
676    if frame.htl == 0 || frame.htl > MESH_MAX_HTL {
677        return Err("invalid htl");
678    }
679
680    let event = frame.event();
681    if event.kind != Kind::Custom(NOSTR_KIND_HASHTREE)
682        && event.kind != Kind::Ephemeral(NOSTR_KIND_HASHTREE)
683    {
684        return Err("unsupported event kind");
685    }
686
687    Ok(())
688}
689
/// Default data channel label used by the WebRTC peer-link implementation.
pub const DATA_CHANNEL_LABEL: &str = "hashtree";