Skip to main content

hashtree_webrtc/
types.rs

1//! Mesh transport types for peer-to-peer data exchange.
2//!
3//! Defines signaling frames, negotiated peer-link messages, and shared mesh
4//! constants used across Nostr websocket transports, local buses, WebRTC, and
5//! simulation.
6
7use hashtree_core::Hash;
8use nostr_sdk::nostr::{Event, Kind};
9use serde::{Deserialize, Serialize};
10use std::collections::{HashMap, VecDeque};
11use std::time::{Duration, Instant};
12
13use crate::generic_store::RequestDispatchConfig;
14use crate::peer_selector::SelectionStrategy;
15
16/// Unique identifier for a peer in the network
17#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
18pub struct PeerId {
19    /// Nostr public key (hex encoded)
20    pub pubkey: String,
21    /// Unique session identifier
22    pub uuid: String,
23}
24
25impl PeerId {
26    pub fn new(pubkey: String, uuid: String) -> Self {
27        Self { pubkey, uuid }
28    }
29
30    pub fn to_peer_string(&self) -> String {
31        format!("{}:{}", self.pubkey, self.uuid)
32    }
33
34    pub fn from_peer_string(s: &str) -> Option<Self> {
35        let parts: Vec<&str> = s.split(':').collect();
36        if parts.len() == 2 {
37            Some(Self {
38                pubkey: parts[0].to_string(),
39                uuid: parts[1].to_string(),
40            })
41        } else {
42            None
43        }
44    }
45}
46
/// Signaling message types exchanged over mesh signaling transports.
///
/// Serialized with an internal `"type"` tag; field names are renamed to
/// camelCase on the wire via `serde(rename)`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum SignalingMessage {
    /// Initial hello message to discover peers
    #[serde(rename = "hello")]
    Hello {
        #[serde(rename = "peerId")]
        peer_id: String,
        /// Root hashes the sender advertises.
        roots: Vec<String>,
    },

    /// Negotiation offer payload (currently SDP for WebRTC links).
    #[serde(rename = "offer")]
    Offer {
        #[serde(rename = "peerId")]
        peer_id: String,
        #[serde(rename = "targetPeerId")]
        target_peer_id: String,
        sdp: String,
    },

    /// Negotiation answer payload (currently SDP for WebRTC links).
    #[serde(rename = "answer")]
    Answer {
        #[serde(rename = "peerId")]
        peer_id: String,
        #[serde(rename = "targetPeerId")]
        target_peer_id: String,
        sdp: String,
    },

    /// Single candidate update for transports that need trickle negotiation.
    #[serde(rename = "candidate")]
    Candidate {
        #[serde(rename = "peerId")]
        peer_id: String,
        #[serde(rename = "targetPeerId")]
        target_peer_id: String,
        candidate: String,
        #[serde(rename = "sdpMLineIndex")]
        sdp_m_line_index: Option<u16>,
        #[serde(rename = "sdpMid")]
        sdp_mid: Option<String>,
    },

    /// Batched candidate updates.
    #[serde(rename = "candidates")]
    Candidates {
        #[serde(rename = "peerId")]
        peer_id: String,
        #[serde(rename = "targetPeerId")]
        target_peer_id: String,
        candidates: Vec<IceCandidate>,
    },
}
103
104impl SignalingMessage {
105    /// Get the sender's peer ID
106    pub fn peer_id(&self) -> &str {
107        match self {
108            Self::Hello { peer_id, .. } => peer_id,
109            Self::Offer { peer_id, .. } => peer_id,
110            Self::Answer { peer_id, .. } => peer_id,
111            Self::Candidate { peer_id, .. } => peer_id,
112            Self::Candidates { peer_id, .. } => peer_id,
113        }
114    }
115
116    /// Get the target peer ID (if applicable)
117    pub fn target_peer_id(&self) -> Option<&str> {
118        match self {
119            Self::Hello { .. } => None, // Broadcast
120            Self::Offer { target_peer_id, .. } => Some(target_peer_id),
121            Self::Answer { target_peer_id, .. } => Some(target_peer_id),
122            Self::Candidate { target_peer_id, .. } => Some(target_peer_id),
123            Self::Candidates { target_peer_id, .. } => Some(target_peer_id),
124        }
125    }
126
127    /// Check if this message is addressed to a specific peer
128    pub fn is_for(&self, my_peer_id: &str) -> bool {
129        match self.target_peer_id() {
130            Some(target) => target == my_peer_id,
131            None => true, // Broadcasts are for everyone
132        }
133    }
134}
135
/// Perfect negotiation: determine if we are the "polite" peer
///
/// Both sides may send offers simultaneously. On a collision (an offer
/// arrives while we have one pending), the "polite" peer — the one with the
/// lexicographically smaller ID — abandons its own offer and accepts the
/// incoming one, while the "impolite" peer keeps its offer and ignores the
/// incoming one.
///
/// This guarantees a connection forms even when only one side is actively
/// seeking peers: the unsatisfied peer initiates and the satisfied peer
/// accepts.
#[inline]
pub fn is_polite_peer(local_peer_id: &str, remote_peer_id: &str) -> bool {
    // Lower ID backs off on collision.
    matches!(local_peer_id.cmp(remote_peer_id), std::cmp::Ordering::Less)
}
151
/// Candidate metadata for transports that use ICE-style negotiation.
///
/// Field names are serialized in camelCase (via `serde(rename)`) to match
/// the signaling wire format.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IceCandidate {
    /// Raw candidate string.
    pub candidate: String,
    #[serde(rename = "sdpMLineIndex")]
    pub sdp_m_line_index: Option<u16>,
    #[serde(rename = "sdpMid")]
    pub sdp_mid: Option<String>,
}
161
/// Direct peer-link message types for hash-based data exchange.
///
/// Serialized with an internal `"type"` tag using short wire names
/// (`req`, `res`, `push`, `have`, `want`, `root`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum DataMessage {
    /// Request data by hash
    #[serde(rename = "req")]
    Request {
        /// Correlation id; the matching `res` frame carries the same id.
        id: u32,
        hash: String,
        /// Hops To Live - decremented on each forward hop
        /// When htl reaches 0, request is not forwarded further
        #[serde(skip_serializing_if = "Option::is_none")]
        htl: Option<u8>,
    },

    /// Response with data (binary payload follows)
    #[serde(rename = "res")]
    Response {
        /// Correlation id of the originating `req`.
        id: u32,
        hash: String,
        /// Whether the responder had the data.
        found: bool,
        #[serde(skip_serializing_if = "Option::is_none")]
        size: Option<u64>,
    },

    /// Push data for a hash the peer previously requested but we didn't have
    /// This happens when we get it later from another peer
    #[serde(rename = "push")]
    Push { hash: String },

    /// Announce available hashes
    #[serde(rename = "have")]
    Have { hashes: Vec<String> },

    /// Request list of wanted hashes
    #[serde(rename = "want")]
    Want { hashes: Vec<String> },

    /// Notify about root hash update
    #[serde(rename = "root")]
    RootUpdate {
        hash: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        size: Option<u64>,
    },
}
208
/// HTL (Hops To Live) constants - Freenet-style probabilistic decrement
/// These defaults feed `BLOB_REQUEST_POLICY` below.
pub const MAX_HTL: u8 = 10;
/// Probability to decrement at max HTL (50%)
pub const DECREMENT_AT_MAX_PROB: f64 = 0.5;
/// Probability to decrement at min HTL=1 (25%)
pub const DECREMENT_AT_MIN_PROB: f64 = 0.25;
215
/// HTL decrement mode.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HtlMode {
    /// Freenet-style probabilistic decrement at the HTL boundaries.
    Probabilistic,
}

/// HTL policy parameters for a traffic class.
#[derive(Debug, Clone, Copy)]
pub struct HtlPolicy {
    /// How decrements are decided.
    pub mode: HtlMode,
    /// Ceiling for HTL values under this policy.
    pub max_htl: u8,
    /// Probability of decrementing when HTL equals `max_htl`.
    pub p_at_max: f64,
    /// Probability of dropping to zero when HTL is 1.
    pub p_at_min: f64,
}
230
/// Default policy for blob/content requests.
///
/// Built from the shared `MAX_HTL` / `DECREMENT_AT_*` constants above.
pub const BLOB_REQUEST_POLICY: HtlPolicy = HtlPolicy {
    mode: HtlMode::Probabilistic,
    max_htl: MAX_HTL,
    p_at_max: DECREMENT_AT_MAX_PROB,
    p_at_min: DECREMENT_AT_MIN_PROB,
};

/// Policy for mesh signaling/event forwarding.
///
/// Shorter reach (4 hops max) with more aggressive decrement probabilities
/// than blob requests.
pub const MESH_EVENT_POLICY: HtlPolicy = HtlPolicy {
    mode: HtlMode::Probabilistic,
    max_htl: 4,
    p_at_max: 0.75,
    p_at_min: 0.5,
};
246
/// Per-peer HTL configuration (Freenet-style probabilistic decrement)
/// Generated once per peer connection, stays fixed for connection lifetime
///
/// Samples are clamped into `[0, 1]` by `from_samples` and compared against
/// a policy's probabilities when decrementing.
#[derive(Debug, Clone, Copy)]
pub struct PeerHTLConfig {
    /// Random sample used to decide decrement at max HTL.
    pub at_max_sample: f64,
    /// Random sample used to decide decrement at min HTL.
    pub at_min_sample: f64,
}
256
257impl PeerHTLConfig {
258    /// Generate random HTL config for a new peer connection
259    pub fn random() -> Self {
260        use rand::Rng;
261        let mut rng = rand::thread_rng();
262        Self::from_samples(rng.gen_range(0.0..1.0), rng.gen_range(0.0..1.0))
263    }
264
265    /// Construct config from explicit random samples.
266    pub fn from_samples(at_max_sample: f64, at_min_sample: f64) -> Self {
267        Self {
268            at_max_sample: at_max_sample.clamp(0.0, 1.0),
269            at_min_sample: at_min_sample.clamp(0.0, 1.0),
270        }
271    }
272
273    /// Construct config from explicit decrement choices.
274    pub fn from_flags(decrement_at_max: bool, decrement_at_min: bool) -> Self {
275        Self {
276            at_max_sample: if decrement_at_max { 0.0 } else { 1.0 },
277            at_min_sample: if decrement_at_min { 0.0 } else { 1.0 },
278        }
279    }
280
281    /// Decrement HTL using this peer's config (Freenet-style probabilistic)
282    /// Called when SENDING to a peer, not on receive
283    pub fn decrement(&self, htl: u8) -> u8 {
284        decrement_htl_with_policy(htl, &BLOB_REQUEST_POLICY, self)
285    }
286
287    /// Decrement HTL using the provided traffic policy.
288    pub fn decrement_with_policy(&self, htl: u8, policy: &HtlPolicy) -> u8 {
289        decrement_htl_with_policy(htl, policy, self)
290    }
291}
292
impl Default for PeerHTLConfig {
    /// Defaults to a freshly randomized per-connection profile.
    fn default() -> Self {
        Self::random()
    }
}
298
299/// Decrement HTL according to policy and per-peer randomness profile.
300pub fn decrement_htl_with_policy(htl: u8, policy: &HtlPolicy, config: &PeerHTLConfig) -> u8 {
301    let htl = htl.min(policy.max_htl);
302    if htl == 0 {
303        return 0;
304    }
305
306    match policy.mode {
307        HtlMode::Probabilistic => {
308            let p_at_max = policy.p_at_max.clamp(0.0, 1.0);
309            let p_at_min = policy.p_at_min.clamp(0.0, 1.0);
310
311            if htl == policy.max_htl {
312                return if config.at_max_sample < p_at_max {
313                    htl.saturating_sub(1)
314                } else {
315                    htl
316                };
317            }
318
319            if htl == 1 {
320                return if config.at_min_sample < p_at_min {
321                    0
322                } else {
323                    htl
324                };
325            }
326
327            htl.saturating_sub(1)
328        }
329    }
330}
331
/// Check if a request should be forwarded based on HTL.
///
/// A request with zero hops remaining must not travel further.
pub fn should_forward_htl(htl: u8) -> bool {
    htl != 0
}
336
/// Backward-compatible helper.
///
/// Same semantics as `should_forward_htl`: forward only while HTL is
/// non-zero.
pub fn should_forward(htl: u8) -> bool {
    htl != 0
}
341
342use tokio::sync::{mpsc, oneshot};
343
/// Request to forward a data request to other peers
///
/// Carries the wanted hash, the peer to skip, the remaining HTL, and a
/// oneshot channel over which the (optional) data bytes come back.
pub struct ForwardRequest {
    /// Hash being requested
    pub hash: Hash,
    /// Peer ID to exclude (the one who sent the request)
    pub exclude_peer_id: String,
    /// HTL for forwarded request
    pub htl: u8,
    /// Channel to send result back
    pub response: oneshot::Sender<Option<Vec<u8>>>,
}
355
/// Sender half of the bounded forward-request channel
pub type ForwardTx = mpsc::Sender<ForwardRequest>;
/// Receiver half of the bounded forward-request channel
pub type ForwardRx = mpsc::Receiver<ForwardRequest>;
360
/// Peer pool classification
///
/// A classifier answers `ClassifyRequest`s with one of these variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PeerPool {
    /// Users in social graph (followed or followers)
    Follows,
    /// Everyone else
    Other,
}
369
/// Settings for a peer pool
#[derive(Debug, Clone, Copy)]
pub struct PoolConfig {
    /// Maximum connections in this pool
    pub max_connections: usize,
    /// Number of connections to consider "satisfied"
    pub satisfied_connections: usize,
}

impl PoolConfig {
    /// True while the pool still has room for another connection.
    #[inline]
    pub fn can_accept(&self, current_count: usize) -> bool {
        self.max_connections > current_count
    }

    /// True while the pool is below its satisfaction threshold.
    #[inline]
    pub fn needs_peers(&self, current_count: usize) -> bool {
        self.satisfied_connections > current_count
    }

    /// True once the satisfaction threshold has been reached or exceeded.
    #[inline]
    pub fn is_satisfied(&self, current_count: usize) -> bool {
        !self.needs_peers(current_count)
    }
}

impl Default for PoolConfig {
    /// Default pool: up to 16 connections, satisfied at 8.
    fn default() -> Self {
        Self {
            max_connections: 16,
            satisfied_connections: 8,
        }
    }
}
407
408/// Pool settings for both pools
409#[derive(Debug, Clone)]
410pub struct PoolSettings {
411    pub follows: PoolConfig,
412    pub other: PoolConfig,
413}
414
415impl Default for PoolSettings {
416    fn default() -> Self {
417        Self {
418            follows: PoolConfig {
419                max_connections: 16,
420                satisfied_connections: 8,
421            },
422            other: PoolConfig {
423                max_connections: 16,
424                satisfied_connections: 8,
425            },
426        }
427    }
428}
429
/// Request to classify a peer by pubkey
///
/// The classifier replies with a [`PeerPool`] over the oneshot channel.
pub struct ClassifyRequest {
    /// Pubkey to classify (hex)
    pub pubkey: String,
    /// Channel to send result back
    pub response: oneshot::Sender<PeerPool>,
}
437
/// Sender for peer classification requests
pub type ClassifierTx = mpsc::Sender<ClassifyRequest>;

/// Receiver for peer classification requests (implement this to provide classification)
pub type ClassifierRx = mpsc::Receiver<ClassifyRequest>;

/// Create a classifier channel pair
///
/// `buffer` is the bounded capacity of the underlying mpsc channel.
pub fn classifier_channel(buffer: usize) -> (ClassifierTx, ClassifierRx) {
    mpsc::channel(buffer)
}
448
/// Configuration for the default mesh-backed store composition.
///
/// See the `Default` impl below for the baseline values used when the
/// caller supplies none.
#[derive(Clone)]
pub struct WebRTCStoreConfig {
    /// Nostr relays for signaling
    pub relays: Vec<String>,
    /// Root hashes to advertise
    pub roots: Vec<Hash>,
    /// Timeout for data requests (ms)
    pub request_timeout_ms: u64,
    /// Interval for sending hello messages (ms)
    pub hello_interval_ms: u64,
    /// Enable verbose logging
    pub debug: bool,
    /// Pool settings for follows and other peers
    pub pools: PoolSettings,
    /// Channel for peer classification (optional)
    /// If None, all peers go to "Other" pool
    pub classifier_tx: Option<ClassifierTx>,
    /// Retrieval peer selection strategy (shared with GenericStore/CLI/sim).
    pub request_selection_strategy: SelectionStrategy,
    /// Whether fairness constraints are enabled for retrieval selection.
    pub request_fairness_enabled: bool,
    /// Hedged request dispatch policy for retrieval.
    pub request_dispatch: RequestDispatchConfig,
}
474
impl Default for WebRTCStoreConfig {
    /// Baseline configuration: two public relays, 10 s request timeout,
    /// 30 s hello interval, default pools, tit-for-tat retrieval with
    /// fairness enabled.
    fn default() -> Self {
        Self {
            relays: vec![
                "wss://temp.iris.to".to_string(),
                "wss://relay.damus.io".to_string(),
            ],
            roots: Vec::new(),
            request_timeout_ms: 10000,
            hello_interval_ms: 30000,
            debug: false,
            pools: PoolSettings::default(),
            classifier_tx: None,
            request_selection_strategy: SelectionStrategy::TitForTat,
            request_fairness_enabled: true,
            // NOTE(review): semantics inferred from field names — start with
            // 2 parallel requests, hedge +1 every 120 ms up to 8; confirm
            // against RequestDispatchConfig's documentation.
            request_dispatch: RequestDispatchConfig {
                initial_fanout: 2,
                hedge_fanout: 1,
                max_fanout: 8,
                hedge_interval_ms: 120,
            },
        }
    }
}

/// Backward-compatible alias for the generic mesh store configuration.
pub type MeshStoreConfig = WebRTCStoreConfig;
502
/// Connection state for a peer
///
/// States generally progress `New -> Connecting -> Connected -> Ready`,
/// with `Disconnected` as the failure/closure state.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PeerState {
    /// Initial state
    New,
    /// Connecting via signaling.
    Connecting,
    /// Negotiated direct link established.
    Connected,
    /// Direct peer link open and ready for data.
    Ready,
    /// Connection failed or closed
    Disconnected,
}
517
/// Statistics for the default mesh-backed store composition.
#[derive(Debug, Clone, Default)]
pub struct WebRTCStats {
    /// Number of currently connected peers.
    pub connected_peers: usize,
    /// Requests still awaiting a response.
    pub pending_requests: usize,
    /// Total bytes sent.
    pub bytes_sent: u64,
    /// Total bytes received.
    pub bytes_received: u64,
    /// Total requests issued.
    pub requests_made: u64,
    /// Requests that completed with data.
    pub requests_fulfilled: u64,
}

/// Backward-compatible alias for generic mesh store statistics.
pub type MeshStats = WebRTCStats;
531
/// Nostr event kind for hashtree signaling envelopes (ephemeral, NIP-17 style).
pub const NOSTR_KIND_HASHTREE: u16 = 25050;
/// Alias used by mesh signaling code paths.
pub const MESH_SIGNALING_EVENT_KIND: u16 = NOSTR_KIND_HASHTREE;

/// Relayless mesh protocol constants for forwarding signaling events over data channels.
pub const MESH_PROTOCOL: &str = "htree.nostr.mesh.v1";
pub const MESH_PROTOCOL_VERSION: u8 = 1;
/// Default HTL for new mesh frames (tied to `MESH_EVENT_POLICY.max_htl`).
pub const MESH_DEFAULT_HTL: u8 = MESH_EVENT_POLICY.max_htl;
/// Upper bound accepted by `validate_mesh_frame`.
pub const MESH_MAX_HTL: u8 = 6;
541
/// TTL/capacity dedupe structure for forwarded mesh frames/events.
///
/// Entries expire after `ttl` and the set is trimmed (oldest first) whenever
/// it exceeds `capacity`. The insertion order is tracked in a deque alongside
/// the per-key timestamp, so stale deque entries for re-inserted keys can be
/// detected and skipped.
#[derive(Debug)]
pub struct TimedSeenSet {
    entries: HashMap<String, Instant>,
    order: VecDeque<(String, Instant)>,
    ttl: Duration,
    capacity: usize,
}

impl TimedSeenSet {
    /// Create an empty set with the given capacity and entry TTL.
    pub fn new(capacity: usize, ttl: Duration) -> Self {
        Self {
            entries: HashMap::new(),
            order: VecDeque::new(),
            ttl,
            capacity,
        }
    }

    /// Remove `key` from the map only if its stored timestamp matches
    /// `inserted_at` — otherwise the deque entry is stale (the key was
    /// evicted and re-inserted) and the live entry must be kept.
    fn evict_if_current(&mut self, key: &str, inserted_at: Instant) {
        if self.entries.get(key).copied() == Some(inserted_at) {
            self.entries.remove(key);
        }
    }

    /// Drop expired entries, then enforce the capacity bound (oldest first).
    fn prune(&mut self, now: Instant) {
        // Phase 1: expire by TTL from the front (oldest) of the deque.
        while let Some((key, inserted_at)) = self.order.front().cloned() {
            if now.duration_since(inserted_at) < self.ttl {
                break;
            }
            self.order.pop_front();
            self.evict_if_current(&key, inserted_at);
        }

        // Phase 2: shed the oldest survivors until within capacity.
        while self.entries.len() > self.capacity {
            match self.order.pop_front() {
                Some((key, inserted_at)) => self.evict_if_current(&key, inserted_at),
                None => break,
            }
        }
    }

    /// Insert `key`; returns `true` if it was not already present
    /// (i.e. the caller is seeing it for the first time within the TTL).
    pub fn insert_if_new(&mut self, key: String) -> bool {
        let now = Instant::now();
        self.prune(now);
        if self.entries.contains_key(&key) {
            return false;
        }
        self.entries.insert(key.clone(), now);
        self.order.push_back((key, now));
        // Re-prune so an insert never leaves the set over capacity.
        self.prune(now);
        true
    }

    /// Number of live (unexpired, unevicted) entries.
    pub fn len(&self) -> usize {
        self.entries.len()
    }

    /// True when no live entries remain.
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }
}
613
/// Nostr payload carried inside a mesh frame (tagged `"type"` on the wire).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum MeshNostrPayload {
    /// A Nostr event being forwarded through the mesh.
    #[serde(rename = "EVENT")]
    Event { event: Event },
}

/// Envelope for forwarding Nostr events across direct peer links.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MeshNostrFrame {
    /// Protocol identifier; must equal `MESH_PROTOCOL` to validate.
    pub protocol: String,
    /// Protocol version; must equal `MESH_PROTOCOL_VERSION` to validate.
    pub version: u8,
    /// Unique frame identifier (a random UUID for locally created frames).
    pub frame_id: String,
    /// Hops To Live remaining for this frame.
    pub htl: u8,
    /// Peer id of the frame's sender.
    pub sender_peer_id: String,
    /// The wrapped payload.
    pub payload: MeshNostrPayload,
}
630
631impl MeshNostrFrame {
632    pub fn new_event(event: Event, sender_peer_id: &str, htl: u8) -> Self {
633        Self::new_event_with_id(
634            event,
635            sender_peer_id,
636            &uuid::Uuid::new_v4().to_string(),
637            htl,
638        )
639    }
640
641    pub fn new_event_with_id(event: Event, sender_peer_id: &str, frame_id: &str, htl: u8) -> Self {
642        Self {
643            protocol: MESH_PROTOCOL.to_string(),
644            version: MESH_PROTOCOL_VERSION,
645            frame_id: frame_id.to_string(),
646            htl,
647            sender_peer_id: sender_peer_id.to_string(),
648            payload: MeshNostrPayload::Event { event },
649        }
650    }
651
652    pub fn event(&self) -> &Event {
653        match &self.payload {
654            MeshNostrPayload::Event { event } => event,
655        }
656    }
657}
658
659pub fn validate_mesh_frame(frame: &MeshNostrFrame) -> Result<(), &'static str> {
660    if frame.protocol != MESH_PROTOCOL {
661        return Err("invalid protocol");
662    }
663    if frame.version != MESH_PROTOCOL_VERSION {
664        return Err("invalid version");
665    }
666    if frame.frame_id.is_empty() {
667        return Err("missing frame id");
668    }
669    if frame.sender_peer_id.is_empty() {
670        return Err("missing sender peer id");
671    }
672    if frame.htl == 0 || frame.htl > MESH_MAX_HTL {
673        return Err("invalid htl");
674    }
675
676    let event = frame.event();
677    if event.kind != Kind::Custom(NOSTR_KIND_HASHTREE)
678        && event.kind != Kind::Ephemeral(NOSTR_KIND_HASHTREE)
679    {
680        return Err("unsupported event kind");
681    }
682
683    Ok(())
684}
685
/// Default WebRTC data channel label.
///
/// NOTE(review): presumably the label under which the hashtree data channel
/// is opened on each peer connection — confirm against the channel setup code.
pub const DATA_CHANNEL_LABEL: &str = "hashtree";