Skip to main content

hashtree_network/
types.rs

1//! Mesh transport types for peer-to-peer data exchange.
2//!
3//! Defines signaling frames, negotiated peer-link messages, and shared mesh
4//! constants used across Nostr websocket transports, local buses, direct-link
5//! transports such as WebRTC, and simulation.
6
7use hashtree_core::Hash;
8use nostr_sdk::nostr::{Event, Kind};
9use serde::{Deserialize, Serialize};
10use std::collections::{HashMap, VecDeque};
11use std::time::Duration;
12use tokio::time::Instant;
13
14use crate::mesh_store_core::RequestDispatchConfig;
15use crate::peer_selector::SelectionStrategy;
16
/// Serde default for the `hashGet` field of a hello message.
///
/// Always yields `true`, so a hello payload that omits `hashGet` is
/// deserialized with the flag switched on.
fn default_hash_get_enabled() -> bool {
    true
}
20
21/// Unique identifier for a peer in the network
22#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
23pub struct PeerId {
24    /// Nostr public key / endpoint identity
25    pub pubkey: String,
26}
27
28impl PeerId {
29    pub fn new(pubkey: String) -> Self {
30        Self { pubkey }
31    }
32
33    pub fn to_peer_string(&self) -> String {
34        self.pubkey.clone()
35    }
36
37    pub fn from_peer_string(s: &str) -> Option<Self> {
38        let pubkey = s.trim();
39        if pubkey.is_empty() || pubkey.contains(':') {
40            return None;
41        }
42        Some(Self {
43            pubkey: pubkey.to_string(),
44        })
45    }
46
47    pub fn from_string(s: &str) -> Option<Self> {
48        Self::from_peer_string(s)
49    }
50
51    pub fn short(&self) -> String {
52        self.pubkey[..8.min(self.pubkey.len())].to_string()
53    }
54}
55
56impl std::fmt::Display for PeerId {
57    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58        f.write_str(&self.pubkey)
59    }
60}
61
62/// Signaling message types exchanged over mesh signaling transports.
63#[derive(Debug, Clone, Serialize, Deserialize)]
64#[serde(tag = "type")]
65pub enum SignalingMessage {
66    /// Initial hello message to discover peers
67    #[serde(rename = "hello")]
68    Hello {
69        #[serde(rename = "peerId")]
70        peer_id: String,
71        roots: Vec<String>,
72        #[serde(rename = "hashGet", default = "default_hash_get_enabled")]
73        hash_get: bool,
74    },
75
76    /// Negotiation offer payload (currently SDP for WebRTC links).
77    #[serde(rename = "offer")]
78    Offer {
79        #[serde(rename = "peerId")]
80        peer_id: String,
81        #[serde(rename = "targetPeerId")]
82        target_peer_id: String,
83        sdp: String,
84    },
85
86    /// Negotiation answer payload (currently SDP for WebRTC links).
87    #[serde(rename = "answer")]
88    Answer {
89        #[serde(rename = "peerId")]
90        peer_id: String,
91        #[serde(rename = "targetPeerId")]
92        target_peer_id: String,
93        sdp: String,
94    },
95
96    /// Single candidate update for transports that need trickle negotiation.
97    #[serde(rename = "candidate")]
98    Candidate {
99        #[serde(rename = "peerId")]
100        peer_id: String,
101        #[serde(rename = "targetPeerId")]
102        target_peer_id: String,
103        candidate: String,
104        #[serde(rename = "sdpMLineIndex")]
105        sdp_m_line_index: Option<u16>,
106        #[serde(rename = "sdpMid")]
107        sdp_mid: Option<String>,
108    },
109
110    /// Batched candidate updates.
111    #[serde(rename = "candidates")]
112    Candidates {
113        #[serde(rename = "peerId")]
114        peer_id: String,
115        #[serde(rename = "targetPeerId")]
116        target_peer_id: String,
117        candidates: Vec<IceCandidate>,
118    },
119}
120
121impl SignalingMessage {
122    /// Get the message type label.
123    pub fn msg_type(&self) -> &str {
124        match self {
125            Self::Hello { .. } => "hello",
126            Self::Offer { .. } => "offer",
127            Self::Answer { .. } => "answer",
128            Self::Candidate { .. } => "candidate",
129            Self::Candidates { .. } => "candidates",
130        }
131    }
132
133    /// Get the sender's peer ID
134    pub fn peer_id(&self) -> &str {
135        match self {
136            Self::Hello { peer_id, .. } => peer_id,
137            Self::Offer { peer_id, .. } => peer_id,
138            Self::Answer { peer_id, .. } => peer_id,
139            Self::Candidate { peer_id, .. } => peer_id,
140            Self::Candidates { peer_id, .. } => peer_id,
141        }
142    }
143
144    /// Get the target peer ID (if applicable)
145    pub fn target_peer_id(&self) -> Option<&str> {
146        match self {
147            Self::Hello { .. } => None, // Broadcast
148            Self::Offer { target_peer_id, .. } => Some(target_peer_id),
149            Self::Answer { target_peer_id, .. } => Some(target_peer_id),
150            Self::Candidate { target_peer_id, .. } => Some(target_peer_id),
151            Self::Candidates { target_peer_id, .. } => Some(target_peer_id),
152        }
153    }
154
155    /// Check if this message is addressed to a specific peer
156    pub fn is_for(&self, my_peer_id: &str) -> bool {
157        match self.target_peer_id() {
158            Some(target) => target == my_peer_id,
159            None => true, // Broadcasts are for everyone
160        }
161    }
162}
163
/// Decides whether the local side is the "polite" peer in concurrent-offer
/// negotiation.
///
/// Both peers may send offers at the same time. When an offer arrives while
/// one of our own is still pending, the polite peer abandons its offer and
/// accepts the incoming one, while the impolite peer keeps its offer and
/// ignores the incoming one.
///
/// Politeness is decided by plain string ordering of the ids: the peer with
/// the lower id is polite. Two identical ids are both impolite.
///
/// This lets connections form even when one peer is already "satisfied": the
/// unsatisfied peer can still initiate and the satisfied peer will accept.
#[inline]
pub fn is_polite_peer(local_peer_id: &str, remote_peer_id: &str) -> bool {
    remote_peer_id > local_peer_id
}
179
180/// Candidate metadata for transports that use ICE-style negotiation.
181#[derive(Debug, Clone, Serialize, Deserialize)]
182pub struct IceCandidate {
183    pub candidate: String,
184    #[serde(rename = "sdpMLineIndex")]
185    pub sdp_m_line_index: Option<u16>,
186    #[serde(rename = "sdpMid")]
187    pub sdp_mid: Option<String>,
188}
189
190/// Direct peer-link message types for hash-based data exchange.
191#[derive(Debug, Clone, Serialize, Deserialize)]
192#[serde(tag = "type")]
193pub enum DataMessage {
194    /// Request data by hash
195    #[serde(rename = "req")]
196    Request {
197        id: u32,
198        hash: String,
199        /// Hops To Live - decremented on each forward hop
200        /// When htl reaches 0, request is not forwarded further
201        #[serde(skip_serializing_if = "Option::is_none")]
202        htl: Option<u8>,
203    },
204
205    /// Response with data (binary payload follows)
206    #[serde(rename = "res")]
207    Response {
208        id: u32,
209        hash: String,
210        found: bool,
211        #[serde(skip_serializing_if = "Option::is_none")]
212        size: Option<u64>,
213    },
214
215    /// Push data for a hash the peer previously requested but we didn't have
216    /// This happens when we get it later from another peer
217    #[serde(rename = "push")]
218    Push { hash: String },
219
220    /// Announce available hashes
221    #[serde(rename = "have")]
222    Have { hashes: Vec<String> },
223
224    /// Request list of wanted hashes
225    #[serde(rename = "want")]
226    Want { hashes: Vec<String> },
227
228    /// Notify about root hash update
229    #[serde(rename = "root")]
230    RootUpdate {
231        hash: String,
232        #[serde(skip_serializing_if = "Option::is_none")]
233        size: Option<u64>,
234    },
235}
236
/// Maximum HTL (Hops To Live) for Freenet-style probabilistic decrement.
pub const MAX_HTL: u8 = 10;
/// Probability (50%) of decrementing when the HTL is at its maximum.
pub const DECREMENT_AT_MAX_PROB: f64 = 0.5;
/// Probability (25%) of decrementing when the HTL is at its minimum of 1.
pub const DECREMENT_AT_MIN_PROB: f64 = 0.25;
243
/// Strategy used to decrement an HTL value.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum HtlMode {
    /// Decrement at the maximum and minimum HTL is decided by comparing a
    /// per-peer sample against the policy's probabilities.
    Probabilistic,
}
249
250/// HTL policy parameters for a traffic class.
251#[derive(Debug, Clone, Copy)]
252pub struct HtlPolicy {
253    pub mode: HtlMode,
254    pub max_htl: u8,
255    pub p_at_max: f64,
256    pub p_at_min: f64,
257}
258
259/// Default policy for blob/content requests.
260pub const BLOB_REQUEST_POLICY: HtlPolicy = HtlPolicy {
261    mode: HtlMode::Probabilistic,
262    max_htl: MAX_HTL,
263    p_at_max: DECREMENT_AT_MAX_PROB,
264    p_at_min: DECREMENT_AT_MIN_PROB,
265};
266
267/// Policy for mesh signaling/event forwarding.
268pub const MESH_EVENT_POLICY: HtlPolicy = HtlPolicy {
269    mode: HtlMode::Probabilistic,
270    max_htl: 4,
271    p_at_max: 0.75,
272    p_at_min: 0.5,
273};
274
/// Per-peer HTL configuration (Freenet-style probabilistic decrement).
///
/// Generated once per peer connection and kept fixed for the lifetime of
/// that connection.
#[derive(Clone, Copy, Debug)]
pub struct PeerHTLConfig {
    /// Random sample compared against the policy's at-max probability.
    pub at_max_sample: f64,
    /// Random sample compared against the policy's at-min probability.
    pub at_min_sample: f64,
}
284
285impl PeerHTLConfig {
286    /// Generate random HTL config for a new peer connection
287    pub fn random() -> Self {
288        use rand::Rng;
289        let mut rng = rand::thread_rng();
290        Self::from_samples(rng.gen_range(0.0..1.0), rng.gen_range(0.0..1.0))
291    }
292
293    /// Construct config from explicit random samples.
294    pub fn from_samples(at_max_sample: f64, at_min_sample: f64) -> Self {
295        Self {
296            at_max_sample: at_max_sample.clamp(0.0, 1.0),
297            at_min_sample: at_min_sample.clamp(0.0, 1.0),
298        }
299    }
300
301    /// Construct config from explicit decrement choices.
302    pub fn from_flags(decrement_at_max: bool, decrement_at_min: bool) -> Self {
303        Self {
304            at_max_sample: if decrement_at_max { 0.0 } else { 1.0 },
305            at_min_sample: if decrement_at_min { 0.0 } else { 1.0 },
306        }
307    }
308
309    /// Decrement HTL using this peer's config (Freenet-style probabilistic)
310    /// Called when SENDING to a peer, not on receive
311    pub fn decrement(&self, htl: u8) -> u8 {
312        decrement_htl_with_policy(htl, &BLOB_REQUEST_POLICY, self)
313    }
314
315    /// Decrement HTL using the provided traffic policy.
316    pub fn decrement_with_policy(&self, htl: u8, policy: &HtlPolicy) -> u8 {
317        decrement_htl_with_policy(htl, policy, self)
318    }
319}
320
321impl Default for PeerHTLConfig {
322    fn default() -> Self {
323        Self::random()
324    }
325}
326
327/// Decrement HTL according to policy and per-peer randomness profile.
328pub fn decrement_htl_with_policy(htl: u8, policy: &HtlPolicy, config: &PeerHTLConfig) -> u8 {
329    let htl = htl.min(policy.max_htl);
330    if htl == 0 {
331        return 0;
332    }
333
334    match policy.mode {
335        HtlMode::Probabilistic => {
336            let p_at_max = policy.p_at_max.clamp(0.0, 1.0);
337            let p_at_min = policy.p_at_min.clamp(0.0, 1.0);
338
339            if htl == policy.max_htl {
340                return if config.at_max_sample < p_at_max {
341                    htl.saturating_sub(1)
342                } else {
343                    htl
344                };
345            }
346
347            if htl == 1 {
348                return if config.at_min_sample < p_at_min {
349                    0
350                } else {
351                    htl
352                };
353            }
354
355            htl.saturating_sub(1)
356        }
357    }
358}
359
/// Reports whether a request carrying this HTL may still be forwarded, which
/// is the case for any non-zero HTL.
pub fn should_forward_htl(htl: u8) -> bool {
    htl != 0
}
364
365/// Backward-compatible helper.
366pub fn should_forward(htl: u8) -> bool {
367    should_forward_htl(htl)
368}
369
370use tokio::sync::{mpsc, oneshot};
371
/// Pool a peer is classified into.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum PeerPool {
    /// Users in the social graph (followed or followers).
    Follows,
    /// Everyone else.
    Other,
}
380
/// Connection limits for one peer pool.
#[derive(Clone, Copy, Debug)]
pub struct PoolConfig {
    /// Maximum connections in this pool.
    pub max_connections: usize,
    /// Number of connections at which the pool counts as "satisfied".
    pub satisfied_connections: usize,
}
389
390impl PoolConfig {
391    /// Check if we can accept more peers (below max)
392    #[inline]
393    pub fn can_accept(&self, current_count: usize) -> bool {
394        current_count < self.max_connections
395    }
396
397    /// Check if we need more peers (below satisfied)
398    #[inline]
399    pub fn needs_peers(&self, current_count: usize) -> bool {
400        current_count < self.satisfied_connections
401    }
402
403    /// Check if we're satisfied (at or above satisfied threshold)
404    #[inline]
405    pub fn is_satisfied(&self, current_count: usize) -> bool {
406        current_count >= self.satisfied_connections
407    }
408}
409
410impl Default for PoolConfig {
411    fn default() -> Self {
412        Self {
413            max_connections: 16,
414            satisfied_connections: 8,
415        }
416    }
417}
418
419/// Pool settings for both pools
420#[derive(Debug, Clone)]
421pub struct PoolSettings {
422    pub follows: PoolConfig,
423    pub other: PoolConfig,
424}
425
426impl Default for PoolSettings {
427    fn default() -> Self {
428        Self {
429            follows: PoolConfig {
430                max_connections: 16,
431                satisfied_connections: 8,
432            },
433            other: PoolConfig {
434                max_connections: 16,
435                satisfied_connections: 8,
436            },
437        }
438    }
439}
440
441/// Request to classify a peer by pubkey
442pub struct ClassifyRequest {
443    /// Pubkey to classify (hex)
444    pub pubkey: String,
445    /// Channel to send result back
446    pub response: oneshot::Sender<PeerPool>,
447}
448
449/// Sender for peer classification requests
450pub type ClassifierTx = mpsc::Sender<ClassifyRequest>;
451
452/// Receiver for peer classification requests (implement this to provide classification)
453pub type ClassifierRx = mpsc::Receiver<ClassifyRequest>;
454
455/// Create a classifier channel pair
456pub fn classifier_channel(buffer: usize) -> (ClassifierTx, ClassifierRx) {
457    mpsc::channel(buffer)
458}
459
460/// Configuration for the default mesh-backed store composition.
461#[derive(Clone)]
462pub struct MeshStoreConfig {
463    /// Nostr relays for signaling
464    pub relays: Vec<String>,
465    /// Root hashes to advertise
466    pub roots: Vec<Hash>,
467    /// Timeout for data requests (ms)
468    pub request_timeout_ms: u64,
469    /// Interval for sending hello messages (ms)
470    pub hello_interval_ms: u64,
471    /// Enable verbose logging
472    pub debug: bool,
473    /// Pool settings for follows and other peers
474    pub pools: PoolSettings,
475    /// Channel for peer classification (optional)
476    /// If None, all peers go to "Other" pool
477    pub classifier_tx: Option<ClassifierTx>,
478    /// Retrieval peer selection strategy (shared with MeshStoreCore/CLI/sim).
479    pub request_selection_strategy: SelectionStrategy,
480    /// Whether fairness constraints are enabled for retrieval selection.
481    pub request_fairness_enabled: bool,
482    /// Hedged request dispatch policy for retrieval.
483    pub request_dispatch: RequestDispatchConfig,
484    /// Upstream Blossom servers used as adaptive fallback read sources.
485    pub upstream_blossom_servers: Vec<String>,
486}
487
488impl Default for MeshStoreConfig {
489    fn default() -> Self {
490        Self {
491            relays: vec![
492                "wss://temp.iris.to".to_string(),
493                "wss://relay.damus.io".to_string(),
494            ],
495            roots: Vec::new(),
496            request_timeout_ms: 10000,
497            hello_interval_ms: 30000,
498            debug: false,
499            pools: PoolSettings::default(),
500            classifier_tx: None,
501            request_selection_strategy: SelectionStrategy::Weighted,
502            request_fairness_enabled: true,
503            request_dispatch: RequestDispatchConfig {
504                initial_fanout: 2,
505                hedge_fanout: 1,
506                max_fanout: 8,
507                hedge_interval_ms: 120,
508            },
509            upstream_blossom_servers: Vec::new(),
510        }
511    }
512}
513
/// Connection state for a peer.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PeerState {
    /// Initial state.
    New,
    /// Connecting via signaling.
    Connecting,
    /// Negotiated direct link established.
    Connected,
    /// Direct peer link open and ready for data.
    Ready,
    /// Connection failed or closed.
    Disconnected,
}
528
/// Statistics for the default mesh-backed store composition.
///
/// Every counter starts at zero in the derived `Default`.
#[derive(Clone, Debug, Default)]
pub struct MeshStats {
    pub connected_peers: usize,
    pub pending_requests: usize,
    pub bytes_sent: u64,
    pub bytes_received: u64,
    pub requests_made: u64,
    pub requests_fulfilled: u64,
}
539
540/// Backward-compatible alias for the previous production-specific name.
541pub type WebRTCStats = MeshStats;
542
/// Nostr event kind for hashtree signaling envelopes (ephemeral, NIP-17 style).
pub const NOSTR_KIND_HASHTREE: u16 = 25050;
/// Event kind used for mesh signaling; the same value as
/// [`NOSTR_KIND_HASHTREE`].
pub const MESH_SIGNALING_EVENT_KIND: u16 = NOSTR_KIND_HASHTREE;
546
547/// Relayless mesh protocol constants for forwarding signaling events over data channels.
548pub const MESH_PROTOCOL: &str = "htree.nostr.mesh.v1";
549pub const MESH_PROTOCOL_VERSION: u8 = 1;
550pub const MESH_DEFAULT_HTL: u8 = MESH_EVENT_POLICY.max_htl;
551pub const MESH_MAX_HTL: u8 = 6;
552
/// Dedupe set for forwarded mesh frames/events, bounded by both a TTL and a
/// capacity.
#[derive(Debug)]
pub struct TimedSeenSet {
    /// Live keys mapped to their insertion time.
    entries: HashMap<String, Instant>,
    /// The same keys in insertion order, oldest at the front.
    order: VecDeque<(String, Instant)>,
    /// Age at which an entry expires.
    ttl: Duration,
    /// Maximum number of live entries kept after pruning.
    capacity: usize,
}
561
562impl TimedSeenSet {
563    pub fn new(capacity: usize, ttl: Duration) -> Self {
564        Self {
565            entries: HashMap::new(),
566            order: VecDeque::new(),
567            ttl,
568            capacity,
569        }
570    }
571
572    fn prune(&mut self, now: Instant) {
573        while let Some((key, inserted_at)) = self.order.front().cloned() {
574            if now.duration_since(inserted_at) < self.ttl {
575                break;
576            }
577            self.order.pop_front();
578            if self
579                .entries
580                .get(&key)
581                .map(|ts| *ts == inserted_at)
582                .unwrap_or(false)
583            {
584                self.entries.remove(&key);
585            }
586        }
587
588        while self.entries.len() > self.capacity {
589            if let Some((key, inserted_at)) = self.order.pop_front() {
590                if self
591                    .entries
592                    .get(&key)
593                    .map(|ts| *ts == inserted_at)
594                    .unwrap_or(false)
595                {
596                    self.entries.remove(&key);
597                }
598            } else {
599                break;
600            }
601        }
602    }
603
604    pub fn insert_if_new(&mut self, key: String) -> bool {
605        let now = Instant::now();
606        self.prune(now);
607        if self.entries.contains_key(&key) {
608            return false;
609        }
610        self.entries.insert(key.clone(), now);
611        self.order.push_back((key, now));
612        self.prune(now);
613        true
614    }
615
616    pub fn contains(&mut self, key: &str) -> bool {
617        let now = Instant::now();
618        self.prune(now);
619        self.entries.contains_key(key)
620    }
621
622    pub fn len(&self) -> usize {
623        self.entries.len()
624    }
625
626    pub fn is_empty(&self) -> bool {
627        self.entries.is_empty()
628    }
629}
630
631#[derive(Debug, Clone, Serialize, Deserialize)]
632#[serde(tag = "type")]
633pub enum MeshNostrPayload {
634    #[serde(rename = "EVENT")]
635    Event { event: Event },
636}
637
638#[derive(Debug, Clone, Serialize, Deserialize)]
639pub struct MeshNostrFrame {
640    pub protocol: String,
641    pub version: u8,
642    pub frame_id: String,
643    pub htl: u8,
644    pub sender_peer_id: String,
645    pub payload: MeshNostrPayload,
646}
647
648impl MeshNostrFrame {
649    pub fn new_event(event: Event, sender_peer_id: &str, htl: u8) -> Self {
650        Self::new_event_with_id(
651            event,
652            sender_peer_id,
653            &uuid::Uuid::new_v4().to_string(),
654            htl,
655        )
656    }
657
658    pub fn new_event_with_id(event: Event, sender_peer_id: &str, frame_id: &str, htl: u8) -> Self {
659        Self {
660            protocol: MESH_PROTOCOL.to_string(),
661            version: MESH_PROTOCOL_VERSION,
662            frame_id: frame_id.to_string(),
663            htl,
664            sender_peer_id: sender_peer_id.to_string(),
665            payload: MeshNostrPayload::Event { event },
666        }
667    }
668
669    pub fn event(&self) -> &Event {
670        match &self.payload {
671            MeshNostrPayload::Event { event } => event,
672        }
673    }
674}
675
676pub fn validate_mesh_frame(frame: &MeshNostrFrame) -> Result<(), &'static str> {
677    if frame.protocol != MESH_PROTOCOL {
678        return Err("invalid protocol");
679    }
680    if frame.version != MESH_PROTOCOL_VERSION {
681        return Err("invalid version");
682    }
683    if frame.frame_id.is_empty() {
684        return Err("missing frame id");
685    }
686    if frame.sender_peer_id.is_empty() {
687        return Err("missing sender peer id");
688    }
689    if frame.sender_peer_id.contains(':') {
690        return Err("invalid sender peer id");
691    }
692    if frame.htl == 0 || frame.htl > MESH_MAX_HTL {
693        return Err("invalid htl");
694    }
695
696    let event = frame.event();
697    if event.kind != Kind::Custom(NOSTR_KIND_HASHTREE)
698        && event.kind != Kind::Ephemeral(NOSTR_KIND_HASHTREE)
699    {
700        return Err("unsupported event kind");
701    }
702
703    Ok(())
704}
705
/// Label the WebRTC peer-link implementation gives its data channel by
/// default.
pub const DATA_CHANNEL_LABEL: &str = "hashtree";