// hashtree_webrtc/types.rs

1//! WebRTC transport types for P2P data exchange
2//!
3//! Defines message types for WebRTC signaling via Nostr relays
4//! and the data channel protocol for hash-based data requests.
5
6use hashtree_core::Hash;
7use nostr_sdk::nostr::{Event, Kind};
8use serde::{Deserialize, Serialize};
9use std::collections::{HashMap, VecDeque};
10use std::time::{Duration, Instant};
11
12use crate::generic_store::RequestDispatchConfig;
13use crate::peer_selector::SelectionStrategy;
14
/// Unique identifier for a peer in the network
///
/// Combines the long-lived Nostr identity (`pubkey`) with a per-session
/// `uuid`, so the same user running several sessions produces distinct
/// peers (equality/hashing cover both fields). Rendered on the wire as
/// `"<pubkey>:<uuid>"` by `to_peer_string`.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct PeerId {
    /// Nostr public key (hex encoded)
    pub pubkey: String,
    /// Unique session identifier
    pub uuid: String,
}
23
24impl PeerId {
25    pub fn new(pubkey: String, uuid: String) -> Self {
26        Self { pubkey, uuid }
27    }
28
29    pub fn to_peer_string(&self) -> String {
30        format!("{}:{}", self.pubkey, self.uuid)
31    }
32
33    pub fn from_peer_string(s: &str) -> Option<Self> {
34        let parts: Vec<&str> = s.split(':').collect();
35        if parts.len() == 2 {
36            Some(Self {
37                pubkey: parts[0].to_string(),
38                uuid: parts[1].to_string(),
39            })
40        } else {
41            None
42        }
43    }
44}
45
/// Signaling message types sent via Nostr relays
///
/// Externally tagged with a `type` field; struct fields are renamed to
/// camelCase on the wire (see the serde attributes below). `Hello` is a
/// broadcast; the other variants carry a `targetPeerId` addressee.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum SignalingMessage {
    /// Initial hello message to discover peers
    #[serde(rename = "hello")]
    Hello {
        #[serde(rename = "peerId")]
        peer_id: String,
        /// Root hashes the sender advertises.
        /// NOTE(review): presumably hex-encoded — confirm at the producer.
        roots: Vec<String>,
    },

    /// WebRTC offer (SDP)
    #[serde(rename = "offer")]
    Offer {
        #[serde(rename = "peerId")]
        peer_id: String,
        #[serde(rename = "targetPeerId")]
        target_peer_id: String,
        /// Session description (SDP) text.
        sdp: String,
    },

    /// WebRTC answer (SDP)
    #[serde(rename = "answer")]
    Answer {
        #[serde(rename = "peerId")]
        peer_id: String,
        #[serde(rename = "targetPeerId")]
        target_peer_id: String,
        /// Session description (SDP) text.
        sdp: String,
    },

    /// Single ICE candidate
    #[serde(rename = "candidate")]
    Candidate {
        #[serde(rename = "peerId")]
        peer_id: String,
        #[serde(rename = "targetPeerId")]
        target_peer_id: String,
        candidate: String,
        #[serde(rename = "sdpMLineIndex")]
        sdp_m_line_index: Option<u16>,
        #[serde(rename = "sdpMid")]
        sdp_mid: Option<String>,
    },

    /// Batched ICE candidates
    #[serde(rename = "candidates")]
    Candidates {
        #[serde(rename = "peerId")]
        peer_id: String,
        #[serde(rename = "targetPeerId")]
        target_peer_id: String,
        candidates: Vec<IceCandidate>,
    },
}
102
103impl SignalingMessage {
104    /// Get the sender's peer ID
105    pub fn peer_id(&self) -> &str {
106        match self {
107            Self::Hello { peer_id, .. } => peer_id,
108            Self::Offer { peer_id, .. } => peer_id,
109            Self::Answer { peer_id, .. } => peer_id,
110            Self::Candidate { peer_id, .. } => peer_id,
111            Self::Candidates { peer_id, .. } => peer_id,
112        }
113    }
114
115    /// Get the target peer ID (if applicable)
116    pub fn target_peer_id(&self) -> Option<&str> {
117        match self {
118            Self::Hello { .. } => None, // Broadcast
119            Self::Offer { target_peer_id, .. } => Some(target_peer_id),
120            Self::Answer { target_peer_id, .. } => Some(target_peer_id),
121            Self::Candidate { target_peer_id, .. } => Some(target_peer_id),
122            Self::Candidates { target_peer_id, .. } => Some(target_peer_id),
123        }
124    }
125
126    /// Check if this message is addressed to a specific peer
127    pub fn is_for(&self, my_peer_id: &str) -> bool {
128        match self.target_peer_id() {
129            Some(target) => target == my_peer_id,
130            None => true, // Broadcasts are for everyone
131        }
132    }
133}
134
/// Perfect negotiation: determine if we are the "polite" peer
///
/// In WebRTC perfect negotiation both sides may send offers simultaneously.
/// On a collision (we receive an offer while ours is still pending), the
/// "polite" peer abandons its own offer and accepts the incoming one; the
/// "impolite" peer keeps its offer and ignores the incoming one.
///
/// Politeness is decided by lexicographic comparison: the lower peer ID is
/// polite. Exactly one side backs off, so connections still form even when
/// one peer is already "satisfied" — the unsatisfied peer can initiate and
/// the satisfied peer will accept.
#[inline]
pub fn is_polite_peer(local_peer_id: &str, remote_peer_id: &str) -> bool {
    matches!(local_peer_id.cmp(remote_peer_id), std::cmp::Ordering::Less)
}
150
/// ICE candidate for WebRTC connection establishment
///
/// Wire field names use camelCase (see the serde renames), matching the
/// browser-style ICE candidate JSON shape.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IceCandidate {
    /// Candidate string as produced by the local WebRTC stack.
    pub candidate: String,
    #[serde(rename = "sdpMLineIndex")]
    pub sdp_m_line_index: Option<u16>,
    #[serde(rename = "sdpMid")]
    pub sdp_mid: Option<String>,
}
160
/// Data channel message types for hash-based data exchange
///
/// Externally tagged with a `type` field. `Request`/`Response` pairs are
/// correlated via the numeric `id` field.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum DataMessage {
    /// Request data by hash
    #[serde(rename = "req")]
    Request {
        /// Correlates the eventual `Response` with this request.
        id: u32,
        hash: String,
        /// Hops To Live - decremented on each forward hop
        /// When htl reaches 0, request is not forwarded further
        #[serde(skip_serializing_if = "Option::is_none")]
        htl: Option<u8>,
    },

    /// Response with data (binary payload follows)
    #[serde(rename = "res")]
    Response {
        /// Matches the `id` of the originating `Request`.
        id: u32,
        hash: String,
        /// Whether the responder holds the requested data.
        found: bool,
        /// Size of the binary payload that follows, when present.
        #[serde(skip_serializing_if = "Option::is_none")]
        size: Option<u64>,
    },

    /// Push data for a hash the peer previously requested but we didn't have
    /// This happens when we get it later from another peer
    #[serde(rename = "push")]
    Push { hash: String },

    /// Announce available hashes
    #[serde(rename = "have")]
    Have { hashes: Vec<String> },

    /// Request list of wanted hashes
    #[serde(rename = "want")]
    Want { hashes: Vec<String> },

    /// Notify about root hash update
    #[serde(rename = "root")]
    RootUpdate {
        hash: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        size: Option<u64>,
    },
}
207
/// HTL (Hops To Live) constants - Freenet-style probabilistic decrement
pub const MAX_HTL: u8 = 10;
/// Probability to decrement at max HTL (50%)
pub const DECREMENT_AT_MAX_PROB: f64 = 0.5;
/// Probability to decrement at min HTL=1 (25%)
pub const DECREMENT_AT_MIN_PROB: f64 = 0.25;

/// HTL decrement mode.
///
/// Single-variant today; kept as an enum so alternative decrement schemes
/// can be added without changing `HtlPolicy`'s shape.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HtlMode {
    Probabilistic,
}

/// HTL policy parameters for a traffic class.
///
/// Consumed by `decrement_htl_with_policy`: incoming HTL values are capped
/// at `max_htl`, and the two probabilities govern the edge cases at the
/// ceiling and at HTL=1.
#[derive(Debug, Clone, Copy)]
pub struct HtlPolicy {
    /// Decrement mode (currently only probabilistic).
    pub mode: HtlMode,
    /// Ceiling applied to incoming HTL values.
    pub max_htl: u8,
    /// Probability of decrementing when HTL equals `max_htl`.
    pub p_at_max: f64,
    /// Probability of dropping to 0 when HTL equals 1.
    pub p_at_min: f64,
}

/// Default policy for blob/content requests.
pub const BLOB_REQUEST_POLICY: HtlPolicy = HtlPolicy {
    mode: HtlMode::Probabilistic,
    max_htl: MAX_HTL,
    p_at_max: DECREMENT_AT_MAX_PROB,
    p_at_min: DECREMENT_AT_MIN_PROB,
};

/// Policy for mesh signaling/event forwarding.
///
/// Tighter hop budget and higher decrement probabilities than blob
/// traffic, so signaling fan-out dies off faster.
pub const MESH_EVENT_POLICY: HtlPolicy = HtlPolicy {
    mode: HtlMode::Probabilistic,
    max_htl: 4,
    p_at_max: 0.75,
    p_at_min: 0.5,
};
245
/// Per-peer HTL configuration (Freenet-style probabilistic decrement)
/// Generated once per peer connection, stays fixed for connection lifetime
///
/// The samples are compared against a policy's probabilities in
/// `decrement_htl_with_policy`: a sample strictly below the probability
/// means "decrement". Because the samples are fixed per connection, each
/// link behaves deterministically for its whole lifetime.
#[derive(Debug, Clone, Copy)]
pub struct PeerHTLConfig {
    /// Random sample used to decide decrement at max HTL.
    pub at_max_sample: f64,
    /// Random sample used to decide decrement at min HTL.
    pub at_min_sample: f64,
}
255
256impl PeerHTLConfig {
257    /// Generate random HTL config for a new peer connection
258    pub fn random() -> Self {
259        use rand::Rng;
260        let mut rng = rand::thread_rng();
261        Self::from_samples(rng.gen_range(0.0..1.0), rng.gen_range(0.0..1.0))
262    }
263
264    /// Construct config from explicit random samples.
265    pub fn from_samples(at_max_sample: f64, at_min_sample: f64) -> Self {
266        Self {
267            at_max_sample: at_max_sample.clamp(0.0, 1.0),
268            at_min_sample: at_min_sample.clamp(0.0, 1.0),
269        }
270    }
271
272    /// Construct config from explicit decrement choices.
273    pub fn from_flags(decrement_at_max: bool, decrement_at_min: bool) -> Self {
274        Self {
275            at_max_sample: if decrement_at_max { 0.0 } else { 1.0 },
276            at_min_sample: if decrement_at_min { 0.0 } else { 1.0 },
277        }
278    }
279
280    /// Decrement HTL using this peer's config (Freenet-style probabilistic)
281    /// Called when SENDING to a peer, not on receive
282    pub fn decrement(&self, htl: u8) -> u8 {
283        decrement_htl_with_policy(htl, &BLOB_REQUEST_POLICY, self)
284    }
285
286    /// Decrement HTL using the provided traffic policy.
287    pub fn decrement_with_policy(&self, htl: u8, policy: &HtlPolicy) -> u8 {
288        decrement_htl_with_policy(htl, policy, self)
289    }
290}
291
impl Default for PeerHTLConfig {
    /// Defaults to a freshly randomized per-connection profile.
    fn default() -> Self {
        Self::random()
    }
}
297
298/// Decrement HTL according to policy and per-peer randomness profile.
299pub fn decrement_htl_with_policy(htl: u8, policy: &HtlPolicy, config: &PeerHTLConfig) -> u8 {
300    let htl = htl.min(policy.max_htl);
301    if htl == 0 {
302        return 0;
303    }
304
305    match policy.mode {
306        HtlMode::Probabilistic => {
307            let p_at_max = policy.p_at_max.clamp(0.0, 1.0);
308            let p_at_min = policy.p_at_min.clamp(0.0, 1.0);
309
310            if htl == policy.max_htl {
311                return if config.at_max_sample < p_at_max {
312                    htl.saturating_sub(1)
313                } else {
314                    htl
315                };
316            }
317
318            if htl == 1 {
319                return if config.at_min_sample < p_at_min {
320                    0
321                } else {
322                    htl
323                };
324            }
325
326            htl.saturating_sub(1)
327        }
328    }
329}
330
/// Check if a request should be forwarded based on HTL.
///
/// A request may only travel onward while it still has hops left.
pub fn should_forward_htl(htl: u8) -> bool {
    htl != 0
}

/// Backward-compatible alias for [`should_forward_htl`].
pub fn should_forward(htl: u8) -> bool {
    should_forward_htl(htl)
}
340
341use tokio::sync::{mpsc, oneshot};
342
/// Request to forward a data request to other peers
///
/// Sent over a [`ForwardTx`] channel; the handler replies on `response`
/// with the fetched bytes (`None` when the data could not be obtained).
pub struct ForwardRequest {
    /// Hash being requested
    pub hash: Hash,
    /// Peer ID to exclude (the one who sent the request)
    pub exclude_peer_id: String,
    /// HTL for forwarded request
    pub htl: u8,
    /// Channel to send result back
    pub response: oneshot::Sender<Option<Vec<u8>>>,
}

/// Sender for forward requests
pub type ForwardTx = mpsc::Sender<ForwardRequest>;
/// Receiver for forward requests
pub type ForwardRx = mpsc::Receiver<ForwardRequest>;
359
/// Peer pool classification
///
/// Paired with per-pool [`PoolConfig`] limits so social-graph peers and
/// strangers are budgeted separately (see `PoolSettings`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PeerPool {
    /// Users in social graph (followed or followers)
    Follows,
    /// Everyone else
    Other,
}
368
/// Settings for a peer pool
///
/// `satisfied_connections` is the soft target; `max_connections` is the
/// hard cap.
#[derive(Debug, Clone, Copy)]
pub struct PoolConfig {
    /// Maximum connections in this pool
    pub max_connections: usize,
    /// Number of connections to consider "satisfied"
    pub satisfied_connections: usize,
}

impl PoolConfig {
    /// Room for another peer? True strictly below `max_connections`.
    #[inline]
    pub fn can_accept(&self, current_count: usize) -> bool {
        self.max_connections > current_count
    }

    /// Still hungry for peers? True strictly below `satisfied_connections`.
    #[inline]
    pub fn needs_peers(&self, current_count: usize) -> bool {
        self.satisfied_connections > current_count
    }

    /// Satisfied once the count reaches `satisfied_connections`.
    #[inline]
    pub fn is_satisfied(&self, current_count: usize) -> bool {
        !self.needs_peers(current_count)
    }
}

impl Default for PoolConfig {
    /// Baseline limits: up to 16 connections, satisfied at 8.
    fn default() -> Self {
        PoolConfig {
            max_connections: 16,
            satisfied_connections: 8,
        }
    }
}
406
407/// Pool settings for both pools
408#[derive(Debug, Clone)]
409pub struct PoolSettings {
410    pub follows: PoolConfig,
411    pub other: PoolConfig,
412}
413
414impl Default for PoolSettings {
415    fn default() -> Self {
416        Self {
417            follows: PoolConfig {
418                max_connections: 16,
419                satisfied_connections: 8,
420            },
421            other: PoolConfig {
422                max_connections: 16,
423                satisfied_connections: 8,
424            },
425        }
426    }
427}
428
/// Request to classify a peer by pubkey
///
/// Sent over a [`ClassifierTx`] channel; the classifier replies on
/// `response` with the pool the peer belongs to.
pub struct ClassifyRequest {
    /// Pubkey to classify (hex)
    pub pubkey: String,
    /// Channel to send result back
    pub response: oneshot::Sender<PeerPool>,
}

/// Sender for peer classification requests
pub type ClassifierTx = mpsc::Sender<ClassifyRequest>;

/// Receiver for peer classification requests (implement this to provide classification)
pub type ClassifierRx = mpsc::Receiver<ClassifyRequest>;

/// Create a classifier channel pair
///
/// `buffer` is the bounded capacity passed straight through to
/// `tokio::sync::mpsc::channel`.
pub fn classifier_channel(buffer: usize) -> (ClassifierTx, ClassifierRx) {
    mpsc::channel(buffer)
}
447
/// Configuration for WebRTC store
///
/// All timing fields are in milliseconds. `Default` supplies working
/// public-relay settings (see the `Default` impl below).
#[derive(Clone)]
pub struct WebRTCStoreConfig {
    /// Nostr relays for signaling
    pub relays: Vec<String>,
    /// Root hashes to advertise
    pub roots: Vec<Hash>,
    /// Timeout for data requests (ms)
    pub request_timeout_ms: u64,
    /// Interval for sending hello messages (ms)
    pub hello_interval_ms: u64,
    /// Enable verbose logging
    pub debug: bool,
    /// Pool settings for follows and other peers
    pub pools: PoolSettings,
    /// Channel for peer classification (optional)
    /// If None, all peers go to "Other" pool
    pub classifier_tx: Option<ClassifierTx>,
    /// Retrieval peer selection strategy (shared with GenericStore/CLI/sim).
    pub request_selection_strategy: SelectionStrategy,
    /// Whether fairness constraints are enabled for retrieval selection.
    pub request_fairness_enabled: bool,
    /// Hedged request dispatch policy for retrieval.
    pub request_dispatch: RequestDispatchConfig,
}
473
impl Default for WebRTCStoreConfig {
    fn default() -> Self {
        Self {
            // Public relays used for signaling out of the box.
            relays: vec![
                "wss://temp.iris.to".to_string(),
                "wss://relay.damus.io".to_string(),
            ],
            roots: Vec::new(),
            // 10 s request timeout; hello broadcast every 30 s.
            request_timeout_ms: 10000,
            hello_interval_ms: 30000,
            debug: false,
            pools: PoolSettings::default(),
            // No classifier: every peer lands in the "Other" pool.
            classifier_tx: None,
            request_selection_strategy: SelectionStrategy::TitForTat,
            request_fairness_enabled: true,
            // Hedged dispatch: start at fanout 2, add 1 every 120 ms,
            // capped at 8 (exact semantics defined by RequestDispatchConfig).
            request_dispatch: RequestDispatchConfig {
                initial_fanout: 2,
                hedge_fanout: 1,
                max_fanout: 8,
                hedge_interval_ms: 120,
            },
        }
    }
}
498
/// Connection state for a peer
///
/// NOTE(review): states read as a progression New → Connecting →
/// Connected → Ready, with Disconnected on failure/close — confirm the
/// actual transitions against the connection driver.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PeerState {
    /// Initial state
    New,
    /// Connecting via signaling
    Connecting,
    /// WebRTC connection established
    Connected,
    /// Data channel open and ready
    Ready,
    /// Connection failed or closed
    Disconnected,
}
513
/// Statistics for WebRTC store
///
/// NOTE(review): the `bytes_*`/`requests_*` fields read as cumulative
/// counters and `connected_peers`/`pending_requests` as point-in-time
/// gauges — confirm where they are updated.
#[derive(Debug, Clone, Default)]
pub struct WebRTCStats {
    pub connected_peers: usize,
    pub pending_requests: usize,
    pub bytes_sent: u64,
    pub bytes_received: u64,
    pub requests_made: u64,
    pub requests_fulfilled: u64,
}
524
/// Nostr event kind for WebRTC signaling (ephemeral, NIP-17 style)
pub const NOSTR_KIND_HASHTREE: u16 = 25050;

/// Relayless mesh protocol constants for forwarding signaling events over data channels.
pub const MESH_PROTOCOL: &str = "htree.nostr.mesh.v1";
/// Frame-format version carried in (and checked on) every mesh frame.
pub const MESH_PROTOCOL_VERSION: u8 = 1;
/// Default hop budget for new frames, tied to `MESH_EVENT_POLICY`.
pub const MESH_DEFAULT_HTL: u8 = MESH_EVENT_POLICY.max_htl;
/// Upper bound accepted by `validate_mesh_frame`; higher HTLs are rejected.
pub const MESH_MAX_HTL: u8 = 6;
533
/// TTL/capacity dedupe structure for forwarded mesh frames/events.
///
/// Keys are remembered for at most `ttl`, and the set never tracks more
/// than `capacity` live keys: when the bound is exceeded, the oldest keys
/// are evicted first. `order` is an insertion-ordered queue used to find
/// expiry/eviction victims without scanning the map.
#[derive(Debug)]
pub struct TimedSeenSet {
    /// Live keys mapped to their insertion time.
    entries: HashMap<String, Instant>,
    /// Insertion-ordered queue of (key, inserted_at); may contain stale
    /// entries for keys that were evicted and re-inserted.
    order: VecDeque<(String, Instant)>,
    ttl: Duration,
    capacity: usize,
}

impl TimedSeenSet {
    /// Create a set remembering at most `capacity` keys for up to `ttl`.
    pub fn new(capacity: usize, ttl: Duration) -> Self {
        Self {
            entries: HashMap::new(),
            order: VecDeque::new(),
            ttl,
            capacity,
        }
    }

    /// Remove `key` from the map only if it still carries `inserted_at`.
    ///
    /// A queue entry can be stale when the same key was evicted and later
    /// re-inserted with a fresh timestamp; the timestamp check prevents a
    /// stale queue entry from evicting the newer insertion.
    fn remove_if_current(&mut self, key: &str, inserted_at: Instant) {
        if self.entries.get(key) == Some(&inserted_at) {
            self.entries.remove(key);
        }
    }

    /// Drop expired keys, then enforce the capacity bound (oldest first).
    fn prune(&mut self, now: Instant) {
        // TTL pass: the queue is in insertion order, so stop at the first
        // still-fresh entry. Peek by reference — no per-iteration clone.
        while self
            .order
            .front()
            .map_or(false, |(_, t)| now.duration_since(*t) >= self.ttl)
        {
            if let Some((key, inserted_at)) = self.order.pop_front() {
                self.remove_if_current(&key, inserted_at);
            }
        }

        // Capacity pass: evict oldest live entries until we fit.
        while self.entries.len() > self.capacity {
            match self.order.pop_front() {
                Some((key, inserted_at)) => self.remove_if_current(&key, inserted_at),
                None => break,
            }
        }
    }

    /// Insert `key`; returns `true` if it was not already present.
    pub fn insert_if_new(&mut self, key: String) -> bool {
        use std::collections::hash_map::Entry;

        let now = Instant::now();
        self.prune(now);
        // Entry API: one hash lookup instead of contains_key + insert.
        match self.entries.entry(key.clone()) {
            Entry::Occupied(_) => false,
            Entry::Vacant(slot) => {
                slot.insert(now);
                self.order.push_back((key, now));
                // Re-prune so the capacity bound holds right after insert.
                self.prune(now);
                true
            }
        }
    }

    /// Number of live (non-expired, non-evicted) keys.
    pub fn len(&self) -> usize {
        self.entries.len()
    }

    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }
}
605
/// Nostr payload carried inside a mesh frame.
///
/// Tagged with `type`; currently only relay-style `EVENT` payloads exist.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum MeshNostrPayload {
    #[serde(rename = "EVENT")]
    Event { event: Event },
}

/// Envelope for forwarding Nostr events peer-to-peer over data channels.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MeshNostrFrame {
    /// Protocol identifier; must equal [`MESH_PROTOCOL`] to validate.
    pub protocol: String,
    /// Frame format version; must equal [`MESH_PROTOCOL_VERSION`].
    pub version: u8,
    /// Unique frame id (used to de-duplicate forwarded frames).
    pub frame_id: String,
    /// Remaining hop budget; must lie in `1..=MESH_MAX_HTL` to validate.
    pub htl: u8,
    /// Peer ID of the sending peer.
    pub sender_peer_id: String,
    /// The wrapped Nostr payload.
    pub payload: MeshNostrPayload,
}
622
623impl MeshNostrFrame {
624    pub fn new_event(event: Event, sender_peer_id: &str, htl: u8) -> Self {
625        Self::new_event_with_id(
626            event,
627            sender_peer_id,
628            &uuid::Uuid::new_v4().to_string(),
629            htl,
630        )
631    }
632
633    pub fn new_event_with_id(event: Event, sender_peer_id: &str, frame_id: &str, htl: u8) -> Self {
634        Self {
635            protocol: MESH_PROTOCOL.to_string(),
636            version: MESH_PROTOCOL_VERSION,
637            frame_id: frame_id.to_string(),
638            htl,
639            sender_peer_id: sender_peer_id.to_string(),
640            payload: MeshNostrPayload::Event { event },
641        }
642    }
643
644    pub fn event(&self) -> &Event {
645        match &self.payload {
646            MeshNostrPayload::Event { event } => event,
647        }
648    }
649}
650
651pub fn validate_mesh_frame(frame: &MeshNostrFrame) -> Result<(), &'static str> {
652    if frame.protocol != MESH_PROTOCOL {
653        return Err("invalid protocol");
654    }
655    if frame.version != MESH_PROTOCOL_VERSION {
656        return Err("invalid version");
657    }
658    if frame.frame_id.is_empty() {
659        return Err("missing frame id");
660    }
661    if frame.sender_peer_id.is_empty() {
662        return Err("missing sender peer id");
663    }
664    if frame.htl == 0 || frame.htl > MESH_MAX_HTL {
665        return Err("invalid htl");
666    }
667
668    let event = frame.event();
669    if event.kind != Kind::Custom(NOSTR_KIND_HASHTREE)
670        && event.kind != Kind::Ephemeral(NOSTR_KIND_HASHTREE)
671    {
672        return Err("unsupported event kind");
673    }
674
675    Ok(())
676}
677
/// Data channel label
///
/// Label used to identify the hashtree WebRTC data channel.
pub const DATA_CHANNEL_LABEL: &str = "hashtree";