// hashtree_network/types.rs
//! Mesh transport types for peer-to-peer data exchange.
//!
//! Defines signaling frames, negotiated peer-link messages, and shared mesh
//! constants used across Nostr websocket transports, local buses, WebRTC, and
//! simulation.

7use hashtree_core::Hash;
8use nostr_sdk::nostr::{Event, Kind};
9use serde::{Deserialize, Serialize};
10use std::collections::{HashMap, VecDeque};
11use std::time::{Duration, Instant};
12
13use crate::generic_store::RequestDispatchConfig;
14use crate::peer_selector::SelectionStrategy;
15
/// Unique identifier for a peer in the network.
///
/// Pairs the peer's Nostr identity with a per-session uuid, so one identity
/// running several sessions maps to several distinct peers. The canonical
/// wire form is `"<pubkey>:<uuid>"` (see `to_peer_string`).
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct PeerId {
    /// Nostr public key (hex encoded)
    pub pubkey: String,
    /// Unique session identifier
    pub uuid: String,
}
24
25impl PeerId {
26    pub fn new(pubkey: String, uuid: String) -> Self {
27        Self { pubkey, uuid }
28    }
29
30    pub fn to_peer_string(&self) -> String {
31        format!("{}:{}", self.pubkey, self.uuid)
32    }
33
34    pub fn from_peer_string(s: &str) -> Option<Self> {
35        let parts: Vec<&str> = s.split(':').collect();
36        if parts.len() == 2 {
37            Some(Self {
38                pubkey: parts[0].to_string(),
39                uuid: parts[1].to_string(),
40            })
41        } else {
42            None
43        }
44    }
45}
46
/// Signaling message types exchanged over mesh signaling transports.
///
/// Serialized as an internally tagged enum (the `type` field carries the
/// variant tag) with camelCase field names on the wire.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum SignalingMessage {
    /// Initial hello message to discover peers. The only broadcast variant:
    /// it carries no target peer id.
    #[serde(rename = "hello")]
    Hello {
        #[serde(rename = "peerId")]
        peer_id: String,
        /// Root hashes the sender advertises.
        roots: Vec<String>,
    },

    /// Negotiation offer payload (currently SDP for WebRTC links).
    #[serde(rename = "offer")]
    Offer {
        /// Sender's peer id.
        #[serde(rename = "peerId")]
        peer_id: String,
        /// Addressee's peer id.
        #[serde(rename = "targetPeerId")]
        target_peer_id: String,
        /// Session description payload.
        sdp: String,
    },

    /// Negotiation answer payload (currently SDP for WebRTC links).
    #[serde(rename = "answer")]
    Answer {
        /// Sender's peer id.
        #[serde(rename = "peerId")]
        peer_id: String,
        /// Addressee's peer id.
        #[serde(rename = "targetPeerId")]
        target_peer_id: String,
        /// Session description payload.
        sdp: String,
    },

    /// Single candidate update for transports that need trickle negotiation.
    #[serde(rename = "candidate")]
    Candidate {
        #[serde(rename = "peerId")]
        peer_id: String,
        #[serde(rename = "targetPeerId")]
        target_peer_id: String,
        /// Raw candidate line.
        candidate: String,
        /// Media line index the candidate belongs to, if known.
        #[serde(rename = "sdpMLineIndex")]
        sdp_m_line_index: Option<u16>,
        /// Media stream identification tag, if known.
        #[serde(rename = "sdpMid")]
        sdp_mid: Option<String>,
    },

    /// Batched candidate updates.
    #[serde(rename = "candidates")]
    Candidates {
        #[serde(rename = "peerId")]
        peer_id: String,
        #[serde(rename = "targetPeerId")]
        target_peer_id: String,
        candidates: Vec<IceCandidate>,
    },
}
103
104impl SignalingMessage {
105    /// Get the message type label.
106    pub fn msg_type(&self) -> &str {
107        match self {
108            Self::Hello { .. } => "hello",
109            Self::Offer { .. } => "offer",
110            Self::Answer { .. } => "answer",
111            Self::Candidate { .. } => "candidate",
112            Self::Candidates { .. } => "candidates",
113        }
114    }
115
116    /// Get the sender's peer ID
117    pub fn peer_id(&self) -> &str {
118        match self {
119            Self::Hello { peer_id, .. } => peer_id,
120            Self::Offer { peer_id, .. } => peer_id,
121            Self::Answer { peer_id, .. } => peer_id,
122            Self::Candidate { peer_id, .. } => peer_id,
123            Self::Candidates { peer_id, .. } => peer_id,
124        }
125    }
126
127    /// Get the target peer ID (if applicable)
128    pub fn target_peer_id(&self) -> Option<&str> {
129        match self {
130            Self::Hello { .. } => None, // Broadcast
131            Self::Offer { target_peer_id, .. } => Some(target_peer_id),
132            Self::Answer { target_peer_id, .. } => Some(target_peer_id),
133            Self::Candidate { target_peer_id, .. } => Some(target_peer_id),
134            Self::Candidates { target_peer_id, .. } => Some(target_peer_id),
135        }
136    }
137
138    /// Check if this message is addressed to a specific peer
139    pub fn is_for(&self, my_peer_id: &str) -> bool {
140        match self.target_peer_id() {
141            Some(target) => target == my_peer_id,
142            None => true, // Broadcasts are for everyone
143        }
144    }
145}
146
/// Perfect negotiation: decide whether the local peer is the "polite" one.
///
/// Both sides may send offers simultaneously. On a collision (an offer
/// arrives while our own offer is pending), the polite peer abandons its
/// offer and accepts the incoming one; the "impolite" peer (the
/// lexicographically larger id) keeps its offer and ignores the incoming one.
///
/// This pattern ensures connections form even when one peer is "satisfied" -
/// the unsatisfied peer can still initiate and the satisfied peer will accept.
#[inline]
pub fn is_polite_peer(local_peer_id: &str, remote_peer_id: &str) -> bool {
    // The lexicographically smaller id yields on collision.
    remote_peer_id > local_peer_id
}
162
/// Candidate metadata for transports that use ICE-style negotiation.
///
/// Serializes with the camelCase field names WebRTC uses on the wire
/// (`sdpMLineIndex`, `sdpMid`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IceCandidate {
    /// Raw candidate line.
    pub candidate: String,
    /// Media line index this candidate belongs to, if known.
    #[serde(rename = "sdpMLineIndex")]
    pub sdp_m_line_index: Option<u16>,
    /// Media stream identification tag, if known.
    #[serde(rename = "sdpMid")]
    pub sdp_mid: Option<String>,
}
172
/// Direct peer-link message types for hash-based data exchange.
///
/// Serialized as an internally tagged enum with short wire tags
/// (`req`, `res`, `push`, `have`, `want`, `root`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum DataMessage {
    /// Request data by hash.
    #[serde(rename = "req")]
    Request {
        /// Request id, matched against the `id` of the corresponding response.
        id: u32,
        /// Requested content hash.
        hash: String,
        /// Hops To Live - decremented on each forward hop.
        /// When htl reaches 0, request is not forwarded further.
        /// Omitted from the wire when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        htl: Option<u8>,
    },

    /// Response with data (binary payload follows).
    #[serde(rename = "res")]
    Response {
        /// Id of the request being answered.
        id: u32,
        /// Hash the response refers to.
        hash: String,
        /// Whether the responder has the data.
        found: bool,
        /// Size of the accompanying payload, when present.
        #[serde(skip_serializing_if = "Option::is_none")]
        size: Option<u64>,
    },

    /// Push data for a hash the peer previously requested but we didn't have.
    /// This happens when we get it later from another peer.
    #[serde(rename = "push")]
    Push { hash: String },

    /// Announce available hashes.
    #[serde(rename = "have")]
    Have { hashes: Vec<String> },

    /// Request list of wanted hashes.
    #[serde(rename = "want")]
    Want { hashes: Vec<String> },

    /// Notify about a root hash update.
    #[serde(rename = "root")]
    RootUpdate {
        /// New root hash.
        hash: String,
        /// Size in bytes, when known.
        #[serde(skip_serializing_if = "Option::is_none")]
        size: Option<u64>,
    },
}
219
/// HTL (Hops To Live) constants - Freenet-style probabilistic decrement.
/// Default ceiling for blob request HTL values.
pub const MAX_HTL: u8 = 10;
/// Probability to decrement at max HTL (50%); randomized per peer connection.
pub const DECREMENT_AT_MAX_PROB: f64 = 0.5;
/// Probability to decrement at min HTL=1 (25%); randomized per peer connection.
pub const DECREMENT_AT_MIN_PROB: f64 = 0.25;
226
/// HTL decrement mode.
///
/// Only probabilistic decrement exists today; the enum leaves room for
/// additional modes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HtlMode {
    /// Per-peer randomized decrement at the max/min HTL boundaries.
    Probabilistic,
}

/// HTL policy parameters for a traffic class.
#[derive(Debug, Clone, Copy)]
pub struct HtlPolicy {
    /// Decrement mode to apply.
    pub mode: HtlMode,
    /// Upper bound; incoming HTL values are clamped to this before decrement.
    pub max_htl: u8,
    /// Probability of decrementing while HTL sits at `max_htl`.
    pub p_at_max: f64,
    /// Probability of dropping to 0 when HTL is 1.
    pub p_at_min: f64,
}
241
/// Default policy for blob/content requests: up to 10 hops, 50% per-peer
/// decrement chance at the maximum and 25% termination chance at HTL 1.
pub const BLOB_REQUEST_POLICY: HtlPolicy = HtlPolicy {
    mode: HtlMode::Probabilistic,
    max_htl: MAX_HTL,
    p_at_max: DECREMENT_AT_MAX_PROB,
    p_at_min: DECREMENT_AT_MIN_PROB,
};

/// Policy for mesh signaling/event forwarding: shorter reach (4 hops) with
/// more aggressive decrement than blob requests.
pub const MESH_EVENT_POLICY: HtlPolicy = HtlPolicy {
    mode: HtlMode::Probabilistic,
    max_htl: 4,
    p_at_max: 0.75,
    p_at_min: 0.5,
};
257
/// Per-peer HTL configuration (Freenet-style probabilistic decrement).
///
/// Generated once per peer connection and kept fixed for the connection
/// lifetime, so a given peer's decrement behavior is stable across requests.
#[derive(Debug, Clone, Copy)]
pub struct PeerHTLConfig {
    /// Random sample in [0, 1] compared against `p_at_max` to decide whether
    /// to decrement while HTL is at the policy maximum.
    pub at_max_sample: f64,
    /// Random sample in [0, 1] compared against `p_at_min` to decide whether
    /// HTL 1 drops to zero.
    pub at_min_sample: f64,
}
267
268impl PeerHTLConfig {
269    /// Generate random HTL config for a new peer connection
270    pub fn random() -> Self {
271        use rand::Rng;
272        let mut rng = rand::thread_rng();
273        Self::from_samples(rng.gen_range(0.0..1.0), rng.gen_range(0.0..1.0))
274    }
275
276    /// Construct config from explicit random samples.
277    pub fn from_samples(at_max_sample: f64, at_min_sample: f64) -> Self {
278        Self {
279            at_max_sample: at_max_sample.clamp(0.0, 1.0),
280            at_min_sample: at_min_sample.clamp(0.0, 1.0),
281        }
282    }
283
284    /// Construct config from explicit decrement choices.
285    pub fn from_flags(decrement_at_max: bool, decrement_at_min: bool) -> Self {
286        Self {
287            at_max_sample: if decrement_at_max { 0.0 } else { 1.0 },
288            at_min_sample: if decrement_at_min { 0.0 } else { 1.0 },
289        }
290    }
291
292    /// Decrement HTL using this peer's config (Freenet-style probabilistic)
293    /// Called when SENDING to a peer, not on receive
294    pub fn decrement(&self, htl: u8) -> u8 {
295        decrement_htl_with_policy(htl, &BLOB_REQUEST_POLICY, self)
296    }
297
298    /// Decrement HTL using the provided traffic policy.
299    pub fn decrement_with_policy(&self, htl: u8, policy: &HtlPolicy) -> u8 {
300        decrement_htl_with_policy(htl, policy, self)
301    }
302}
303
impl Default for PeerHTLConfig {
    /// Defaults to a freshly randomized profile (one per connection).
    fn default() -> Self {
        Self::random()
    }
}
309
310/// Decrement HTL according to policy and per-peer randomness profile.
311pub fn decrement_htl_with_policy(htl: u8, policy: &HtlPolicy, config: &PeerHTLConfig) -> u8 {
312    let htl = htl.min(policy.max_htl);
313    if htl == 0 {
314        return 0;
315    }
316
317    match policy.mode {
318        HtlMode::Probabilistic => {
319            let p_at_max = policy.p_at_max.clamp(0.0, 1.0);
320            let p_at_min = policy.p_at_min.clamp(0.0, 1.0);
321
322            if htl == policy.max_htl {
323                return if config.at_max_sample < p_at_max {
324                    htl.saturating_sub(1)
325                } else {
326                    htl
327                };
328            }
329
330            if htl == 1 {
331                return if config.at_min_sample < p_at_min {
332                    0
333                } else {
334                    htl
335                };
336            }
337
338            htl.saturating_sub(1)
339        }
340    }
341}
342
/// A request may still be forwarded while its HTL has not reached zero.
pub fn should_forward_htl(htl: u8) -> bool {
    htl != 0
}

/// Backward-compatible alias for [`should_forward_htl`].
pub fn should_forward(htl: u8) -> bool {
    should_forward_htl(htl)
}
352
353use tokio::sync::{mpsc, oneshot};
354
/// Request to forward a data request to other peers.
pub struct ForwardRequest {
    /// Hash being requested.
    pub hash: Hash,
    /// Peer ID to exclude from forwarding (the one who sent the request).
    pub exclude_peer_id: String,
    /// HTL to attach to the forwarded request.
    pub htl: u8,
    /// Channel the result is sent back on; `None` means no data was obtained.
    pub response: oneshot::Sender<Option<Vec<u8>>>,
}

/// Sender side of the forward-request channel.
pub type ForwardTx = mpsc::Sender<ForwardRequest>;
/// Receiver side of the forward-request channel.
pub type ForwardRx = mpsc::Receiver<ForwardRequest>;
371
/// Peer pool classification.
///
/// Each pool is sized independently via [`PoolConfig`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PeerPool {
    /// Users in the social graph (followed or followers).
    Follows,
    /// Everyone else.
    Other,
}
380
/// Settings for a peer pool.
#[derive(Debug, Clone, Copy)]
pub struct PoolConfig {
    /// Maximum connections in this pool
    pub max_connections: usize,
    /// Number of connections to consider "satisfied"
    pub satisfied_connections: usize,
}

impl PoolConfig {
    /// True while another peer can still be admitted (strictly below the cap).
    #[inline]
    pub fn can_accept(&self, current_count: usize) -> bool {
        self.max_connections > current_count
    }

    /// True while the pool is still hungry for peers, i.e. not yet satisfied.
    #[inline]
    pub fn needs_peers(&self, current_count: usize) -> bool {
        !self.is_satisfied(current_count)
    }

    /// True once the satisfied threshold has been reached or exceeded.
    #[inline]
    pub fn is_satisfied(&self, current_count: usize) -> bool {
        current_count >= self.satisfied_connections
    }
}

impl Default for PoolConfig {
    /// Default shape: up to 16 connections, satisfied at 8.
    fn default() -> Self {
        PoolConfig {
            max_connections: 16,
            satisfied_connections: 8,
        }
    }
}
418
419/// Pool settings for both pools
420#[derive(Debug, Clone)]
421pub struct PoolSettings {
422    pub follows: PoolConfig,
423    pub other: PoolConfig,
424}
425
426impl Default for PoolSettings {
427    fn default() -> Self {
428        Self {
429            follows: PoolConfig {
430                max_connections: 16,
431                satisfied_connections: 8,
432            },
433            other: PoolConfig {
434                max_connections: 16,
435                satisfied_connections: 8,
436            },
437        }
438    }
439}
440
/// Request to classify a peer by pubkey.
///
/// The receiving side decides which [`PeerPool`] the pubkey belongs to and
/// answers over the oneshot channel.
pub struct ClassifyRequest {
    /// Pubkey to classify (hex)
    pub pubkey: String,
    /// Channel to send result back
    pub response: oneshot::Sender<PeerPool>,
}

/// Sender for peer classification requests.
pub type ClassifierTx = mpsc::Sender<ClassifyRequest>;

/// Receiver for peer classification requests (implement this to provide classification).
pub type ClassifierRx = mpsc::Receiver<ClassifyRequest>;
454
/// Create a classifier channel pair with the given bounded buffer size.
pub fn classifier_channel(buffer: usize) -> (ClassifierTx, ClassifierRx) {
    mpsc::channel(buffer)
}
459
/// Configuration for the default mesh-backed store composition.
///
/// `MeshStoreConfig` is the backward-compatible alias for this type.
#[derive(Clone)]
pub struct WebRTCStoreConfig {
    /// Nostr relays for signaling.
    pub relays: Vec<String>,
    /// Root hashes to advertise.
    pub roots: Vec<Hash>,
    /// Timeout for data requests (ms).
    pub request_timeout_ms: u64,
    /// Interval for sending hello messages (ms).
    pub hello_interval_ms: u64,
    /// Enable verbose logging.
    pub debug: bool,
    /// Pool settings for follows and other peers.
    pub pools: PoolSettings,
    /// Channel for peer classification (optional).
    /// If `None`, all peers go to the "Other" pool.
    pub classifier_tx: Option<ClassifierTx>,
    /// Retrieval peer selection strategy (shared with GenericStore/CLI/sim).
    pub request_selection_strategy: SelectionStrategy,
    /// Whether fairness constraints are enabled for retrieval selection.
    pub request_fairness_enabled: bool,
    /// Hedged request dispatch policy for retrieval.
    pub request_dispatch: RequestDispatchConfig,
}
485
486impl Default for WebRTCStoreConfig {
487    fn default() -> Self {
488        Self {
489            relays: vec![
490                "wss://temp.iris.to".to_string(),
491                "wss://relay.damus.io".to_string(),
492            ],
493            roots: Vec::new(),
494            request_timeout_ms: 10000,
495            hello_interval_ms: 30000,
496            debug: false,
497            pools: PoolSettings::default(),
498            classifier_tx: None,
499            request_selection_strategy: SelectionStrategy::TitForTat,
500            request_fairness_enabled: true,
501            request_dispatch: RequestDispatchConfig {
502                initial_fanout: 2,
503                hedge_fanout: 1,
504                max_fanout: 8,
505                hedge_interval_ms: 120,
506            },
507        }
508    }
509}
510
/// Backward-compatible alias for the generic mesh store configuration.
pub type MeshStoreConfig = WebRTCStoreConfig;

/// Connection state for a peer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PeerState {
    /// Initial state, before any negotiation is attempted.
    New,
    /// Connecting via signaling.
    Connecting,
    /// Negotiated direct link established.
    Connected,
    /// Direct peer link open and ready for data.
    Ready,
    /// Connection failed or closed.
    Disconnected,
}
528
/// Statistics for the default mesh-backed store composition.
///
/// `MeshStats` is the backward-compatible alias for this type.
#[derive(Debug, Clone, Default)]
pub struct WebRTCStats {
    /// Peers currently connected.
    pub connected_peers: usize,
    /// Outstanding data requests.
    pub pending_requests: usize,
    /// Cumulative bytes sent to peers.
    pub bytes_sent: u64,
    /// Cumulative bytes received from peers.
    pub bytes_received: u64,
    /// Total requests issued.
    pub requests_made: u64,
    /// Requests that completed with data.
    pub requests_fulfilled: u64,
}

/// Backward-compatible alias for generic mesh store statistics.
pub type MeshStats = WebRTCStats;
542
/// Nostr event kind for hashtree signaling envelopes (ephemeral, NIP-17 style).
pub const NOSTR_KIND_HASHTREE: u16 = 25050;
/// Alias naming the same kind from the mesh-signaling perspective.
pub const MESH_SIGNALING_EVENT_KIND: u16 = NOSTR_KIND_HASHTREE;

/// Relayless mesh protocol constants for forwarding signaling events over data channels.
pub const MESH_PROTOCOL: &str = "htree.nostr.mesh.v1";
/// Protocol version stamped into every `MeshNostrFrame`.
pub const MESH_PROTOCOL_VERSION: u8 = 1;
/// Default HTL for freshly created mesh frames (tracks `MESH_EVENT_POLICY.max_htl`).
pub const MESH_DEFAULT_HTL: u8 = MESH_EVENT_POLICY.max_htl;
/// Hard upper bound accepted when validating inbound mesh frames.
pub const MESH_MAX_HTL: u8 = 6;
552
/// TTL/capacity dedupe structure for forwarded mesh frames/events.
///
/// Keys expire after `ttl`, and the set is bounded to `capacity` live
/// entries by evicting the oldest insertions first.
#[derive(Debug)]
pub struct TimedSeenSet {
    entries: HashMap<String, Instant>,
    order: VecDeque<(String, Instant)>,
    ttl: Duration,
    capacity: usize,
}

impl TimedSeenSet {
    /// Create an empty set bounded by `capacity` entries and `ttl` age.
    pub fn new(capacity: usize, ttl: Duration) -> Self {
        Self {
            entries: HashMap::new(),
            order: VecDeque::new(),
            ttl,
            capacity,
        }
    }

    /// Remove the map entry for a record popped off the order queue, but
    /// only when the recorded timestamp still matches (guards against any
    /// stale queue record shadowing a newer insertion of the same key).
    fn drop_front_record(&mut self, key: String, inserted_at: Instant) {
        if self.entries.get(&key) == Some(&inserted_at) {
            self.entries.remove(&key);
        }
    }

    /// Expire entries past `ttl`, then evict the oldest until the set fits
    /// within `capacity`.
    fn prune(&mut self, now: Instant) {
        loop {
            let expired = match self.order.front() {
                Some((_, inserted_at)) => now.duration_since(*inserted_at) >= self.ttl,
                None => false,
            };
            if !expired {
                break;
            }
            if let Some((key, inserted_at)) = self.order.pop_front() {
                self.drop_front_record(key, inserted_at);
            }
        }

        while self.entries.len() > self.capacity {
            match self.order.pop_front() {
                Some((key, inserted_at)) => self.drop_front_record(key, inserted_at),
                None => break,
            }
        }
    }

    /// Record `key` if unseen; returns `true` exactly when it was new.
    /// Expired keys count as unseen and may be re-inserted.
    pub fn insert_if_new(&mut self, key: String) -> bool {
        let now = Instant::now();
        self.prune(now);
        let is_new = !self.entries.contains_key(&key);
        if is_new {
            self.entries.insert(key.clone(), now);
            self.order.push_back((key, now));
            // Re-prune so the set never exceeds `capacity` after the insert.
            self.prune(now);
        }
        is_new
    }

    /// Number of tracked (not yet pruned) keys.
    pub fn len(&self) -> usize {
        self.entries.len()
    }

    /// Whether no keys are currently tracked.
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }
}
624
/// Payload carried inside a [`MeshNostrFrame`].
///
/// Tagged with `type = "EVENT"` on the wire.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum MeshNostrPayload {
    /// A signed Nostr event being forwarded through the mesh.
    #[serde(rename = "EVENT")]
    Event { event: Event },
}

/// Envelope for forwarding Nostr events peer-to-peer over data channels.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MeshNostrFrame {
    /// Protocol identifier; must equal `MESH_PROTOCOL` to be accepted.
    pub protocol: String,
    /// Protocol version; must equal `MESH_PROTOCOL_VERSION` to be accepted.
    pub version: u8,
    /// Unique frame id, used to dedupe frames as they propagate.
    pub frame_id: String,
    /// Hops To Live; validation requires `1..=MESH_MAX_HTL`.
    pub htl: u8,
    /// Peer id of the sender of this frame.
    pub sender_peer_id: String,
    /// The carried payload (currently always a Nostr event).
    pub payload: MeshNostrPayload,
}
641
642impl MeshNostrFrame {
643    pub fn new_event(event: Event, sender_peer_id: &str, htl: u8) -> Self {
644        Self::new_event_with_id(
645            event,
646            sender_peer_id,
647            &uuid::Uuid::new_v4().to_string(),
648            htl,
649        )
650    }
651
652    pub fn new_event_with_id(event: Event, sender_peer_id: &str, frame_id: &str, htl: u8) -> Self {
653        Self {
654            protocol: MESH_PROTOCOL.to_string(),
655            version: MESH_PROTOCOL_VERSION,
656            frame_id: frame_id.to_string(),
657            htl,
658            sender_peer_id: sender_peer_id.to_string(),
659            payload: MeshNostrPayload::Event { event },
660        }
661    }
662
663    pub fn event(&self) -> &Event {
664        match &self.payload {
665            MeshNostrPayload::Event { event } => event,
666        }
667    }
668}
669
670pub fn validate_mesh_frame(frame: &MeshNostrFrame) -> Result<(), &'static str> {
671    if frame.protocol != MESH_PROTOCOL {
672        return Err("invalid protocol");
673    }
674    if frame.version != MESH_PROTOCOL_VERSION {
675        return Err("invalid version");
676    }
677    if frame.frame_id.is_empty() {
678        return Err("missing frame id");
679    }
680    if frame.sender_peer_id.is_empty() {
681        return Err("missing sender peer id");
682    }
683    if frame.htl == 0 || frame.htl > MESH_MAX_HTL {
684        return Err("invalid htl");
685    }
686
687    let event = frame.event();
688    if event.kind != Kind::Custom(NOSTR_KIND_HASHTREE)
689        && event.kind != Kind::Ephemeral(NOSTR_KIND_HASHTREE)
690    {
691        return Err("unsupported event kind");
692    }
693
694    Ok(())
695}
696
/// Default WebRTC data channel label used for hashtree peer links.
pub const DATA_CHANNEL_LABEL: &str = "hashtree";