Skip to main content

hashtree_network/
types.rs

1//! Mesh transport types for peer-to-peer data exchange.
2//!
3//! Defines signaling frames, negotiated peer-link messages, and shared mesh
4//! constants used across Nostr websocket transports, local buses, direct-link
5//! transports, and simulation.
6
7use hashtree_core::Hash;
8use nostr_sdk::nostr::{Event, Kind};
9use serde::{Deserialize, Serialize};
10use std::collections::{HashMap, VecDeque};
11use std::time::Duration;
12use tokio::time::Instant;
13
14use crate::mesh_store_core::{PubsubDeliveryMode, RequestDispatchConfig};
15use crate::peer_selector::SelectionStrategy;
16
/// Serde default for the `hashGet` field of a `hello` frame: when the field is
/// missing from the input, hash-get is treated as enabled.
fn default_hash_get_enabled() -> bool {
    true
}
20
/// Unique identifier for a peer in the network.
///
/// A thin wrapper around the peer's public key string; equality, hashing and
/// (de)serialization are all derived from that single field.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct PeerId {
    /// Nostr public key / endpoint identity
    pub pubkey: String,
}
27
28impl PeerId {
29    pub fn new(pubkey: String) -> Self {
30        Self { pubkey }
31    }
32
33    pub fn to_peer_string(&self) -> String {
34        self.pubkey.clone()
35    }
36
37    pub fn from_peer_string(s: &str) -> Option<Self> {
38        let pubkey = s.trim();
39        if pubkey.is_empty() || pubkey.contains(':') {
40            return None;
41        }
42        Some(Self {
43            pubkey: pubkey.to_string(),
44        })
45    }
46
47    pub fn from_string(s: &str) -> Option<Self> {
48        Self::from_peer_string(s)
49    }
50
51    pub fn short(&self) -> String {
52        self.pubkey[..8.min(self.pubkey.len())].to_string()
53    }
54}
55
/// Persisted private direct-link signaling hints for a stable peer identity.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct KnownPeerRecord {
    /// Identifier of the peer this record describes.
    pub peer_id: String,
    /// Signaling URLs recorded for the peer.
    ///
    /// When deserializing, the field is also accepted under the names
    /// `direct_urls` and `addresses`, and defaults to empty when absent. It is
    /// left out of the serialized form when empty.
    #[serde(
        default,
        alias = "direct_urls",
        alias = "addresses",
        skip_serializing_if = "Vec::is_empty"
    )]
    pub signal_urls: Vec<String>,
    /// Last-seen timestamp; defaults to 0 when absent from the input.
    /// NOTE(review): the name implies Unix epoch milliseconds — confirm with writers.
    #[serde(default)]
    pub last_seen_unix_ms: u64,
    /// Optional label for the last source of this record; omitted from the
    /// serialized form when `None`.
    /// NOTE(review): the set of valid labels is not visible here — confirm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_source: Option<String>,
}
72
/// Snapshot of private peer signaling hints learned over established peer links.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct KnownPeerSnapshot {
    /// Snapshot format version. Required in the input (no serde default).
    pub version: u32,
    /// Known peer records; defaults to an empty list when absent from the input.
    #[serde(default)]
    pub peers: Vec<KnownPeerRecord>,
}
80
81impl Default for KnownPeerSnapshot {
82    fn default() -> Self {
83        Self {
84            version: 1,
85            peers: Vec::new(),
86        }
87    }
88}
89
90impl std::fmt::Display for PeerId {
91    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
92        f.write_str(&self.pubkey)
93    }
94}
95
/// Signaling message types exchanged over mesh signaling transports.
///
/// Serialized as an internally tagged object: the `type` field holds the
/// variant label (`hello`, `offer`, `answer`, `candidate`, `candidates`) and
/// the variant's fields sit next to it, using the camelCase names given in the
/// `rename` attributes.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum SignalingMessage {
    /// Initial hello message to discover peers
    #[serde(rename = "hello")]
    Hello {
        /// Sender's peer ID (`peerId` on the wire).
        #[serde(rename = "peerId")]
        peer_id: String,
        /// Root identifiers announced by the sender.
        /// NOTE(review): presumably encoded root hashes — confirm the encoding.
        roots: Vec<String>,
        /// `hashGet` on the wire; treated as `true` when the field is absent.
        #[serde(rename = "hashGet", default = "default_hash_get_enabled")]
        hash_get: bool,
    },

    /// Opaque negotiation offer payload.
    ///
    /// Serialized as `sdp` for compatibility with older signaling envelopes.
    #[serde(rename = "offer")]
    Offer {
        /// Sender's peer ID (`peerId` on the wire).
        #[serde(rename = "peerId")]
        peer_id: String,
        /// Intended recipient (`targetPeerId` on the wire).
        #[serde(rename = "targetPeerId")]
        target_peer_id: String,
        /// The offer payload.
        sdp: String,
    },

    /// Opaque negotiation answer payload.
    ///
    /// Serialized as `sdp` for compatibility with older signaling envelopes.
    #[serde(rename = "answer")]
    Answer {
        /// Sender's peer ID (`peerId` on the wire).
        #[serde(rename = "peerId")]
        peer_id: String,
        /// Intended recipient (`targetPeerId` on the wire).
        #[serde(rename = "targetPeerId")]
        target_peer_id: String,
        /// The answer payload.
        sdp: String,
    },

    /// Single candidate update for transports that need trickle negotiation.
    #[serde(rename = "candidate")]
    Candidate {
        /// Sender's peer ID (`peerId` on the wire).
        #[serde(rename = "peerId")]
        peer_id: String,
        /// Intended recipient (`targetPeerId` on the wire).
        #[serde(rename = "targetPeerId")]
        target_peer_id: String,
        /// The candidate string itself.
        candidate: String,
        /// `sdpMLineIndex` on the wire; optional.
        #[serde(rename = "sdpMLineIndex")]
        sdp_m_line_index: Option<u16>,
        /// `sdpMid` on the wire; optional.
        #[serde(rename = "sdpMid")]
        sdp_mid: Option<String>,
    },

    /// Batched candidate updates.
    #[serde(rename = "candidates")]
    Candidates {
        /// Sender's peer ID (`peerId` on the wire).
        #[serde(rename = "peerId")]
        peer_id: String,
        /// Intended recipient (`targetPeerId` on the wire).
        #[serde(rename = "targetPeerId")]
        target_peer_id: String,
        /// The batched candidates.
        candidates: Vec<IceCandidate>,
    },
}
158
159impl SignalingMessage {
160    /// Get the message type label.
161    pub fn msg_type(&self) -> &str {
162        match self {
163            Self::Hello { .. } => "hello",
164            Self::Offer { .. } => "offer",
165            Self::Answer { .. } => "answer",
166            Self::Candidate { .. } => "candidate",
167            Self::Candidates { .. } => "candidates",
168        }
169    }
170
171    /// Get the sender's peer ID
172    pub fn peer_id(&self) -> &str {
173        match self {
174            Self::Hello { peer_id, .. } => peer_id,
175            Self::Offer { peer_id, .. } => peer_id,
176            Self::Answer { peer_id, .. } => peer_id,
177            Self::Candidate { peer_id, .. } => peer_id,
178            Self::Candidates { peer_id, .. } => peer_id,
179        }
180    }
181
182    /// Get the target peer ID (if applicable)
183    pub fn target_peer_id(&self) -> Option<&str> {
184        match self {
185            Self::Hello { .. } => None, // Broadcast
186            Self::Offer { target_peer_id, .. } => Some(target_peer_id),
187            Self::Answer { target_peer_id, .. } => Some(target_peer_id),
188            Self::Candidate { target_peer_id, .. } => Some(target_peer_id),
189            Self::Candidates { target_peer_id, .. } => Some(target_peer_id),
190        }
191    }
192
193    /// Check if this message is addressed to a specific peer
194    pub fn is_for(&self, my_peer_id: &str) -> bool {
195        match self.target_peer_id() {
196            Some(target) => target == my_peer_id,
197            None => true, // Broadcasts are for everyone
198        }
199    }
200}
201
/// Decide which side yields when two peers send offers at the same time.
///
/// Both peers may initiate an offer. If an offer arrives while a local offer
/// is still pending, the "polite" peer drops its own offer and accepts the
/// incoming one, while the "impolite" peer keeps its offer and ignores the
/// incoming one. Because the roles are derived from the two IDs alone, both
/// sides reach the same decision without extra coordination, and a peer that
/// is already "satisfied" will still accept an offer from one that is not.
///
/// Returns `true` when `local_peer_id` orders strictly before
/// `remote_peer_id` (string comparison), i.e. when the local side is polite.
/// Equal IDs yield `false`.
#[inline]
pub fn is_polite_peer(local_peer_id: &str, remote_peer_id: &str) -> bool {
    // The lower ID is the polite one.
    matches!(
        local_peer_id.cmp(remote_peer_id),
        std::cmp::Ordering::Less
    )
}
217
/// Candidate metadata for transports that use ICE-style negotiation.
///
/// Element type of the batched `candidates` signaling message.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IceCandidate {
    /// The candidate string itself.
    pub candidate: String,
    /// `sdpMLineIndex` on the wire; optional.
    #[serde(rename = "sdpMLineIndex")]
    pub sdp_m_line_index: Option<u16>,
    /// `sdpMid` on the wire; optional.
    #[serde(rename = "sdpMid")]
    pub sdp_mid: Option<String>,
}
227
/// Direct peer-link message types for hash-based data exchange.
///
/// Serialized as an internally tagged object: the `type` field holds the
/// variant label (`req`, `res`, `push`, `have`, `want`, `root`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum DataMessage {
    /// Request data by hash
    #[serde(rename = "req")]
    Request {
        /// Request identifier.
        /// NOTE(review): presumably echoed back in the matching `res` — confirm.
        id: u32,
        /// Hash of the requested data, as a string.
        hash: String,
        /// Hops To Live - decremented on each forward hop
        /// When htl reaches 0, request is not forwarded further
        /// Omitted from the serialized form when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        htl: Option<u8>,
    },

    /// Response with data (binary payload follows)
    #[serde(rename = "res")]
    Response {
        /// Identifier carried by the response.
        /// NOTE(review): presumably the `id` of the request being answered — confirm.
        id: u32,
        /// Hash the response refers to.
        hash: String,
        /// Whether the responder had the data.
        found: bool,
        /// Optional size; omitted from the serialized form when `None`.
        /// NOTE(review): presumably the payload length in bytes — confirm.
        #[serde(skip_serializing_if = "Option::is_none")]
        size: Option<u64>,
    },

    /// Push data for a hash the peer previously requested but we didn't have
    /// This happens when we get it later from another peer
    #[serde(rename = "push")]
    Push { hash: String },

    /// Announce available hashes
    #[serde(rename = "have")]
    Have { hashes: Vec<String> },

    /// Request list of wanted hashes
    #[serde(rename = "want")]
    Want { hashes: Vec<String> },

    /// Notify about root hash update
    #[serde(rename = "root")]
    RootUpdate {
        /// The new root hash.
        hash: String,
        /// Optional size; omitted from the serialized form when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        size: Option<u64>,
    },
}
274
// HTL (Hops To Live) constants - Freenet-style probabilistic decrement

/// Maximum HTL for blob/content requests (used by `BLOB_REQUEST_POLICY`).
pub const MAX_HTL: u8 = 10;
/// Probability to decrement at max HTL (50%)
pub const DECREMENT_AT_MAX_PROB: f64 = 0.5;
/// Probability to decrement at min HTL=1 (25%)
pub const DECREMENT_AT_MIN_PROB: f64 = 0.25;
281
/// HTL decrement mode.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HtlMode {
    /// Decrement is decided by comparing a per-peer random sample with the
    /// policy's probability at the maximum HTL and at HTL 1; every value in
    /// between is always decremented.
    Probabilistic,
}
287
/// HTL policy parameters for a traffic class.
#[derive(Debug, Clone, Copy)]
pub struct HtlPolicy {
    /// How the decrement decision is made.
    pub mode: HtlMode,
    /// Upper bound for HTL; larger incoming values are capped to this.
    pub max_htl: u8,
    /// Probability of decrementing when HTL equals `max_htl`.
    /// Clamped to `0.0..=1.0` when applied.
    pub p_at_max: f64,
    /// Probability of decrementing when HTL equals 1.
    /// Clamped to `0.0..=1.0` when applied.
    pub p_at_min: f64,
}
296
/// Default policy for blob/content requests.
///
/// Built from `MAX_HTL`, `DECREMENT_AT_MAX_PROB` and `DECREMENT_AT_MIN_PROB`.
pub const BLOB_REQUEST_POLICY: HtlPolicy = HtlPolicy {
    mode: HtlMode::Probabilistic,
    max_htl: MAX_HTL,
    p_at_max: DECREMENT_AT_MAX_PROB,
    p_at_min: DECREMENT_AT_MIN_PROB,
};
304
/// Policy for mesh signaling/event forwarding.
///
/// Lower HTL ceiling (4) and higher decrement probabilities (75% at the
/// ceiling, 50% at HTL 1) than the blob request policy.
pub const MESH_EVENT_POLICY: HtlPolicy = HtlPolicy {
    mode: HtlMode::Probabilistic,
    max_htl: 4,
    p_at_max: 0.75,
    p_at_min: 0.5,
};
312
/// Per-peer HTL configuration (Freenet-style probabilistic decrement)
/// Generated once per peer connection, stays fixed for connection lifetime
///
/// Each sample is compared with the matching policy probability: the HTL is
/// decremented when the sample is strictly less than that probability.
#[derive(Debug, Clone, Copy)]
pub struct PeerHTLConfig {
    /// Random sample used to decide decrement at max HTL.
    pub at_max_sample: f64,
    /// Random sample used to decide decrement at min HTL.
    pub at_min_sample: f64,
}
322
323impl PeerHTLConfig {
324    /// Generate random HTL config for a new peer connection
325    pub fn random() -> Self {
326        use rand::Rng;
327        let mut rng = rand::thread_rng();
328        Self::from_samples(rng.gen_range(0.0..1.0), rng.gen_range(0.0..1.0))
329    }
330
331    /// Construct config from explicit random samples.
332    pub fn from_samples(at_max_sample: f64, at_min_sample: f64) -> Self {
333        Self {
334            at_max_sample: at_max_sample.clamp(0.0, 1.0),
335            at_min_sample: at_min_sample.clamp(0.0, 1.0),
336        }
337    }
338
339    /// Construct config from explicit decrement choices.
340    pub fn from_flags(decrement_at_max: bool, decrement_at_min: bool) -> Self {
341        Self {
342            at_max_sample: if decrement_at_max { 0.0 } else { 1.0 },
343            at_min_sample: if decrement_at_min { 0.0 } else { 1.0 },
344        }
345    }
346
347    /// Decrement HTL using this peer's config (Freenet-style probabilistic)
348    /// Called when SENDING to a peer, not on receive
349    pub fn decrement(&self, htl: u8) -> u8 {
350        decrement_htl_with_policy(htl, &BLOB_REQUEST_POLICY, self)
351    }
352
353    /// Decrement HTL using the provided traffic policy.
354    pub fn decrement_with_policy(&self, htl: u8, policy: &HtlPolicy) -> u8 {
355        decrement_htl_with_policy(htl, policy, self)
356    }
357}
358
359impl Default for PeerHTLConfig {
360    fn default() -> Self {
361        Self::random()
362    }
363}
364
365/// Decrement HTL according to policy and per-peer randomness profile.
366pub fn decrement_htl_with_policy(htl: u8, policy: &HtlPolicy, config: &PeerHTLConfig) -> u8 {
367    let htl = htl.min(policy.max_htl);
368    if htl == 0 {
369        return 0;
370    }
371
372    match policy.mode {
373        HtlMode::Probabilistic => {
374            let p_at_max = policy.p_at_max.clamp(0.0, 1.0);
375            let p_at_min = policy.p_at_min.clamp(0.0, 1.0);
376
377            if htl == policy.max_htl {
378                return if config.at_max_sample < p_at_max {
379                    htl.saturating_sub(1)
380                } else {
381                    htl
382                };
383            }
384
385            if htl == 1 {
386                return if config.at_min_sample < p_at_min {
387                    0
388                } else {
389                    htl
390                };
391            }
392
393            htl.saturating_sub(1)
394        }
395    }
396}
397
/// Whether a request carrying this HTL may still be forwarded: true for any
/// non-zero value.
pub fn should_forward_htl(htl: u8) -> bool {
    htl != 0
}
402
403/// Backward-compatible helper.
404pub fn should_forward(htl: u8) -> bool {
405    should_forward_htl(htl)
406}
407
408use tokio::sync::{mpsc, oneshot};
409
/// Peer pool classification
///
/// Every peer is assigned to exactly one of these pools; each pool has its own
/// connection limits (see `PoolSettings`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PeerPool {
    /// Users in social graph (followed or followers)
    Follows,
    /// Everyone else
    Other,
}
418
/// Settings for a peer pool
///
/// NOTE(review): nothing here enforces `satisfied_connections <= max_connections`;
/// confirm whether callers rely on that ordering.
#[derive(Debug, Clone, Copy)]
pub struct PoolConfig {
    /// Maximum connections in this pool
    pub max_connections: usize,
    /// Number of connections to consider "satisfied"
    pub satisfied_connections: usize,
}
427
428impl PoolConfig {
429    /// Check if we can accept more peers (below max)
430    #[inline]
431    pub fn can_accept(&self, current_count: usize) -> bool {
432        current_count < self.max_connections
433    }
434
435    /// Check if we need more peers (below satisfied)
436    #[inline]
437    pub fn needs_peers(&self, current_count: usize) -> bool {
438        current_count < self.satisfied_connections
439    }
440
441    /// Check if we're satisfied (at or above satisfied threshold)
442    #[inline]
443    pub fn is_satisfied(&self, current_count: usize) -> bool {
444        current_count >= self.satisfied_connections
445    }
446}
447
448impl Default for PoolConfig {
449    fn default() -> Self {
450        Self {
451            max_connections: 16,
452            satisfied_connections: 8,
453        }
454    }
455}
456
/// Pool settings for both pools
#[derive(Debug, Clone)]
pub struct PoolSettings {
    /// Limits for the `PeerPool::Follows` pool.
    pub follows: PoolConfig,
    /// Limits for the `PeerPool::Other` pool.
    pub other: PoolConfig,
}
463
464impl Default for PoolSettings {
465    fn default() -> Self {
466        Self {
467            follows: PoolConfig {
468                max_connections: 16,
469                satisfied_connections: 8,
470            },
471            other: PoolConfig {
472                max_connections: 16,
473                satisfied_connections: 8,
474            },
475        }
476    }
477}
478
/// Request to classify a peer by pubkey
///
/// Sent over a `ClassifierTx`; the receiving side answers by sending a
/// `PeerPool` through `response`.
pub struct ClassifyRequest {
    /// Pubkey to classify (hex)
    pub pubkey: String,
    /// Channel to send result back
    pub response: oneshot::Sender<PeerPool>,
}
486
/// Sender for peer classification requests
///
/// Sending half of the channel created by `classifier_channel`.
pub type ClassifierTx = mpsc::Sender<ClassifyRequest>;

/// Receiver for peer classification requests (implement this to provide classification)
///
/// Receiving half of the channel created by `classifier_channel`.
pub type ClassifierRx = mpsc::Receiver<ClassifyRequest>;
492
493/// Create a classifier channel pair
494pub fn classifier_channel(buffer: usize) -> (ClassifierTx, ClassifierRx) {
495    mpsc::channel(buffer)
496}
497
/// Configuration for the default mesh-backed store composition.
///
/// Derives `Clone` only (no `Debug`). See the `Default` impl for the values
/// used when nothing is overridden.
#[derive(Clone)]
pub struct MeshStoreConfig {
    /// Nostr relays for signaling
    pub relays: Vec<String>,
    /// Root hashes to advertise
    pub roots: Vec<Hash>,
    /// Timeout for data requests (ms)
    pub request_timeout_ms: u64,
    /// Interval for sending hello messages (ms)
    pub hello_interval_ms: u64,
    /// Enable verbose logging
    pub debug: bool,
    /// Pool settings for follows and other peers
    pub pools: PoolSettings,
    /// Channel for peer classification (optional)
    /// If None, all peers go to "Other" pool
    pub classifier_tx: Option<ClassifierTx>,
    /// Retrieval peer selection strategy (shared with MeshStoreCore/CLI/sim).
    pub request_selection_strategy: SelectionStrategy,
    /// Whether fairness constraints are enabled for retrieval selection.
    pub request_fairness_enabled: bool,
    /// Hedged request dispatch policy for retrieval.
    pub request_dispatch: RequestDispatchConfig,
    /// Pubsub delivery strategy used by the production mesh store.
    pub pubsub_delivery_mode: PubsubDeliveryMode,
    /// Upstream Blossom servers used as adaptive fallback read sources.
    pub upstream_blossom_servers: Vec<String>,
}
527
528impl Default for MeshStoreConfig {
529    fn default() -> Self {
530        Self {
531            relays: vec![
532                "wss://temp.iris.to".to_string(),
533                "wss://relay.damus.io".to_string(),
534            ],
535            roots: Vec::new(),
536            request_timeout_ms: 10000,
537            hello_interval_ms: 30000,
538            debug: false,
539            pools: PoolSettings::default(),
540            classifier_tx: None,
541            request_selection_strategy: SelectionStrategy::Weighted,
542            request_fairness_enabled: true,
543            request_dispatch: RequestDispatchConfig {
544                initial_fanout: 2,
545                hedge_fanout: 1,
546                max_fanout: 8,
547                hedge_interval_ms: 120,
548            },
549            pubsub_delivery_mode: PubsubDeliveryMode::HtlInvWant,
550            upstream_blossom_servers: Vec::new(),
551        }
552    }
553}
554
/// Connection state for a peer
///
/// NOTE(review): the variants read as a lifecycle (New -> Connecting ->
/// Connected -> Ready, with Disconnected as the end state), but no transition
/// rules are enforced by this type — confirm against the state machine that
/// uses it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PeerState {
    /// Initial state
    New,
    /// Connecting via signaling.
    Connecting,
    /// Negotiated direct link established.
    Connected,
    /// Direct peer link open and ready for data.
    Ready,
    /// Connection failed or closed
    Disconnected,
}
569
/// Statistics for the default mesh-backed store composition.
///
/// All counters start at zero via the derived `Default`.
/// NOTE(review): field meanings below are read from the names; how and when
/// each counter is updated is not visible in this file — confirm.
#[derive(Debug, Clone, Default)]
pub struct MeshStats {
    /// Number of connected peers.
    pub connected_peers: usize,
    /// Number of requests still awaiting a response.
    pub pending_requests: usize,
    /// Bytes sent.
    pub bytes_sent: u64,
    /// Bytes received.
    pub bytes_received: u64,
    /// Requests issued.
    pub requests_made: u64,
    /// Requests that were fulfilled.
    pub requests_fulfilled: u64,
}
580
/// Nostr event kind for hashtree signaling envelopes (ephemeral, NIP-17 style).
pub const NOSTR_KIND_HASHTREE: u16 = 25050;
/// Event kind used for mesh signaling; identical to `NOSTR_KIND_HASHTREE`.
pub const MESH_SIGNALING_EVENT_KIND: u16 = NOSTR_KIND_HASHTREE;

// Relayless mesh protocol constants for forwarding signaling events over data channels.

/// Protocol identifier expected in a mesh frame's `protocol` field.
pub const MESH_PROTOCOL: &str = "htree.nostr.mesh.v1";
/// Frame format version expected in a mesh frame's `version` field.
pub const MESH_PROTOCOL_VERSION: u8 = 1;
/// Default HTL for mesh frames: the ceiling of `MESH_EVENT_POLICY`.
pub const MESH_DEFAULT_HTL: u8 = MESH_EVENT_POLICY.max_htl;
/// Largest HTL accepted by `validate_mesh_frame`.
pub const MESH_MAX_HTL: u8 = 6;
590
/// TTL/capacity dedupe structure for forwarded mesh frames/events.
///
/// Remembers string keys until they are older than `ttl` or until more than
/// `capacity` keys are held, in which case the oldest keys are dropped first.
#[derive(Debug)]
pub struct TimedSeenSet {
    /// Key -> insertion time, for membership checks.
    entries: HashMap<String, Instant>,
    /// Keys in insertion order (oldest at the front), with their insertion time.
    order: VecDeque<(String, Instant)>,
    /// Age at which a key is considered expired.
    ttl: Duration,
    /// Maximum number of keys kept after pruning.
    capacity: usize,
}
599
600impl TimedSeenSet {
601    pub fn new(capacity: usize, ttl: Duration) -> Self {
602        Self {
603            entries: HashMap::new(),
604            order: VecDeque::new(),
605            ttl,
606            capacity,
607        }
608    }
609
610    fn prune(&mut self, now: Instant) {
611        while let Some((key, inserted_at)) = self.order.front().cloned() {
612            if now.duration_since(inserted_at) < self.ttl {
613                break;
614            }
615            self.order.pop_front();
616            if self
617                .entries
618                .get(&key)
619                .map(|ts| *ts == inserted_at)
620                .unwrap_or(false)
621            {
622                self.entries.remove(&key);
623            }
624        }
625
626        while self.entries.len() > self.capacity {
627            if let Some((key, inserted_at)) = self.order.pop_front() {
628                if self
629                    .entries
630                    .get(&key)
631                    .map(|ts| *ts == inserted_at)
632                    .unwrap_or(false)
633                {
634                    self.entries.remove(&key);
635                }
636            } else {
637                break;
638            }
639        }
640    }
641
642    pub fn insert_if_new(&mut self, key: String) -> bool {
643        let now = Instant::now();
644        self.prune(now);
645        if self.entries.contains_key(&key) {
646            return false;
647        }
648        self.entries.insert(key.clone(), now);
649        self.order.push_back((key, now));
650        self.prune(now);
651        true
652    }
653
654    pub fn contains(&mut self, key: &str) -> bool {
655        let now = Instant::now();
656        self.prune(now);
657        self.entries.contains_key(key)
658    }
659
660    pub fn len(&self) -> usize {
661        self.entries.len()
662    }
663
664    pub fn is_empty(&self) -> bool {
665        self.entries.is_empty()
666    }
667}
668
/// Payload carried by a mesh frame.
///
/// Serialized as an internally tagged object whose `type` field is `EVENT`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum MeshNostrPayload {
    /// A single Nostr event.
    #[serde(rename = "EVENT")]
    Event { event: Event },
}
675
/// Envelope for forwarding a Nostr event between mesh peers.
///
/// `validate_mesh_frame` checks the invariants on these fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MeshNostrFrame {
    /// Protocol identifier; expected to equal `MESH_PROTOCOL`.
    pub protocol: String,
    /// Frame format version; expected to equal `MESH_PROTOCOL_VERSION`.
    pub version: u8,
    /// Identifier of this frame; must be non-empty.
    /// NOTE(review): presumably the dedupe key for forwarded frames — confirm.
    pub frame_id: String,
    /// Hops to live; valid range is `1..=MESH_MAX_HTL`.
    pub htl: u8,
    /// Peer ID of the sender; must be non-empty and contain no `:`.
    pub sender_peer_id: String,
    /// The wrapped payload.
    pub payload: MeshNostrPayload,
}
685
686impl MeshNostrFrame {
687    pub fn new_event(event: Event, sender_peer_id: &str, htl: u8) -> Self {
688        Self::new_event_with_id(
689            event,
690            sender_peer_id,
691            &uuid::Uuid::new_v4().to_string(),
692            htl,
693        )
694    }
695
696    pub fn new_event_with_id(event: Event, sender_peer_id: &str, frame_id: &str, htl: u8) -> Self {
697        Self {
698            protocol: MESH_PROTOCOL.to_string(),
699            version: MESH_PROTOCOL_VERSION,
700            frame_id: frame_id.to_string(),
701            htl,
702            sender_peer_id: sender_peer_id.to_string(),
703            payload: MeshNostrPayload::Event { event },
704        }
705    }
706
707    pub fn event(&self) -> &Event {
708        match &self.payload {
709            MeshNostrPayload::Event { event } => event,
710        }
711    }
712}
713
714pub fn validate_mesh_frame(frame: &MeshNostrFrame) -> Result<(), &'static str> {
715    if frame.protocol != MESH_PROTOCOL {
716        return Err("invalid protocol");
717    }
718    if frame.version != MESH_PROTOCOL_VERSION {
719        return Err("invalid version");
720    }
721    if frame.frame_id.is_empty() {
722        return Err("missing frame id");
723    }
724    if frame.sender_peer_id.is_empty() {
725        return Err("missing sender peer id");
726    }
727    if frame.sender_peer_id.contains(':') {
728        return Err("invalid sender peer id");
729    }
730    if frame.htl == 0 || frame.htl > MESH_MAX_HTL {
731        return Err("invalid htl");
732    }
733
734    let event = frame.event();
735    if event.kind != Kind::Custom(NOSTR_KIND_HASHTREE)
736        && event.kind != Kind::Ephemeral(NOSTR_KIND_HASHTREE)
737    {
738        return Err("unsupported event kind");
739    }
740
741    Ok(())
742}