1use hashtree_core::Hash;
8use nostr_sdk::nostr::{Event, Kind};
9use serde::{Deserialize, Serialize};
10use std::collections::{HashMap, VecDeque};
11use std::time::{Duration, Instant};
12
13use crate::mesh_store_core::RequestDispatchConfig;
14use crate::peer_selector::SelectionStrategy;
15
16#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
18pub struct PeerId {
19 pub pubkey: String,
21}
22
23impl PeerId {
24 pub fn new(pubkey: String) -> Self {
25 Self { pubkey }
26 }
27
28 pub fn to_peer_string(&self) -> String {
29 self.pubkey.clone()
30 }
31
32 pub fn from_peer_string(s: &str) -> Option<Self> {
33 let pubkey = s.trim();
34 if pubkey.is_empty() || pubkey.contains(':') {
35 return None;
36 }
37 Some(Self {
38 pubkey: pubkey.to_string(),
39 })
40 }
41
42 pub fn from_string(s: &str) -> Option<Self> {
43 Self::from_peer_string(s)
44 }
45
46 pub fn short(&self) -> String {
47 self.pubkey[..8.min(self.pubkey.len())].to_string()
48 }
49}
50
51impl std::fmt::Display for PeerId {
52 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
53 f.write_str(&self.pubkey)
54 }
55}
56
57#[derive(Debug, Clone, Serialize, Deserialize)]
59#[serde(tag = "type")]
60pub enum SignalingMessage {
61 #[serde(rename = "hello")]
63 Hello {
64 #[serde(rename = "peerId")]
65 peer_id: String,
66 roots: Vec<String>,
67 },
68
69 #[serde(rename = "offer")]
71 Offer {
72 #[serde(rename = "peerId")]
73 peer_id: String,
74 #[serde(rename = "targetPeerId")]
75 target_peer_id: String,
76 sdp: String,
77 },
78
79 #[serde(rename = "answer")]
81 Answer {
82 #[serde(rename = "peerId")]
83 peer_id: String,
84 #[serde(rename = "targetPeerId")]
85 target_peer_id: String,
86 sdp: String,
87 },
88
89 #[serde(rename = "candidate")]
91 Candidate {
92 #[serde(rename = "peerId")]
93 peer_id: String,
94 #[serde(rename = "targetPeerId")]
95 target_peer_id: String,
96 candidate: String,
97 #[serde(rename = "sdpMLineIndex")]
98 sdp_m_line_index: Option<u16>,
99 #[serde(rename = "sdpMid")]
100 sdp_mid: Option<String>,
101 },
102
103 #[serde(rename = "candidates")]
105 Candidates {
106 #[serde(rename = "peerId")]
107 peer_id: String,
108 #[serde(rename = "targetPeerId")]
109 target_peer_id: String,
110 candidates: Vec<IceCandidate>,
111 },
112}
113
114impl SignalingMessage {
115 pub fn msg_type(&self) -> &str {
117 match self {
118 Self::Hello { .. } => "hello",
119 Self::Offer { .. } => "offer",
120 Self::Answer { .. } => "answer",
121 Self::Candidate { .. } => "candidate",
122 Self::Candidates { .. } => "candidates",
123 }
124 }
125
126 pub fn peer_id(&self) -> &str {
128 match self {
129 Self::Hello { peer_id, .. } => peer_id,
130 Self::Offer { peer_id, .. } => peer_id,
131 Self::Answer { peer_id, .. } => peer_id,
132 Self::Candidate { peer_id, .. } => peer_id,
133 Self::Candidates { peer_id, .. } => peer_id,
134 }
135 }
136
137 pub fn target_peer_id(&self) -> Option<&str> {
139 match self {
140 Self::Hello { .. } => None, Self::Offer { target_peer_id, .. } => Some(target_peer_id),
142 Self::Answer { target_peer_id, .. } => Some(target_peer_id),
143 Self::Candidate { target_peer_id, .. } => Some(target_peer_id),
144 Self::Candidates { target_peer_id, .. } => Some(target_peer_id),
145 }
146 }
147
148 pub fn is_for(&self, my_peer_id: &str) -> bool {
150 match self.target_peer_id() {
151 Some(target) => target == my_peer_id,
152 None => true, }
154 }
155}
156
157#[inline]
168pub fn is_polite_peer(local_peer_id: &str, remote_peer_id: &str) -> bool {
169 local_peer_id < remote_peer_id
171}
172
173#[derive(Debug, Clone, Serialize, Deserialize)]
175pub struct IceCandidate {
176 pub candidate: String,
177 #[serde(rename = "sdpMLineIndex")]
178 pub sdp_m_line_index: Option<u16>,
179 #[serde(rename = "sdpMid")]
180 pub sdp_mid: Option<String>,
181}
182
183#[derive(Debug, Clone, Serialize, Deserialize)]
185#[serde(tag = "type")]
186pub enum DataMessage {
187 #[serde(rename = "req")]
189 Request {
190 id: u32,
191 hash: String,
192 #[serde(skip_serializing_if = "Option::is_none")]
195 htl: Option<u8>,
196 },
197
198 #[serde(rename = "res")]
200 Response {
201 id: u32,
202 hash: String,
203 found: bool,
204 #[serde(skip_serializing_if = "Option::is_none")]
205 size: Option<u64>,
206 },
207
208 #[serde(rename = "push")]
211 Push { hash: String },
212
213 #[serde(rename = "have")]
215 Have { hashes: Vec<String> },
216
217 #[serde(rename = "want")]
219 Want { hashes: Vec<String> },
220
221 #[serde(rename = "root")]
223 RootUpdate {
224 hash: String,
225 #[serde(skip_serializing_if = "Option::is_none")]
226 size: Option<u64>,
227 },
228}
229
230pub const MAX_HTL: u8 = 10;
232pub const DECREMENT_AT_MAX_PROB: f64 = 0.5;
234pub const DECREMENT_AT_MIN_PROB: f64 = 0.25;
236
237#[derive(Debug, Clone, Copy, PartialEq, Eq)]
239pub enum HtlMode {
240 Probabilistic,
241}
242
243#[derive(Debug, Clone, Copy)]
245pub struct HtlPolicy {
246 pub mode: HtlMode,
247 pub max_htl: u8,
248 pub p_at_max: f64,
249 pub p_at_min: f64,
250}
251
252pub const BLOB_REQUEST_POLICY: HtlPolicy = HtlPolicy {
254 mode: HtlMode::Probabilistic,
255 max_htl: MAX_HTL,
256 p_at_max: DECREMENT_AT_MAX_PROB,
257 p_at_min: DECREMENT_AT_MIN_PROB,
258};
259
260pub const MESH_EVENT_POLICY: HtlPolicy = HtlPolicy {
262 mode: HtlMode::Probabilistic,
263 max_htl: 4,
264 p_at_max: 0.75,
265 p_at_min: 0.5,
266};
267
268#[derive(Debug, Clone, Copy)]
271pub struct PeerHTLConfig {
272 pub at_max_sample: f64,
274 pub at_min_sample: f64,
276}
277
278impl PeerHTLConfig {
279 pub fn random() -> Self {
281 use rand::Rng;
282 let mut rng = rand::thread_rng();
283 Self::from_samples(rng.gen_range(0.0..1.0), rng.gen_range(0.0..1.0))
284 }
285
286 pub fn from_samples(at_max_sample: f64, at_min_sample: f64) -> Self {
288 Self {
289 at_max_sample: at_max_sample.clamp(0.0, 1.0),
290 at_min_sample: at_min_sample.clamp(0.0, 1.0),
291 }
292 }
293
294 pub fn from_flags(decrement_at_max: bool, decrement_at_min: bool) -> Self {
296 Self {
297 at_max_sample: if decrement_at_max { 0.0 } else { 1.0 },
298 at_min_sample: if decrement_at_min { 0.0 } else { 1.0 },
299 }
300 }
301
302 pub fn decrement(&self, htl: u8) -> u8 {
305 decrement_htl_with_policy(htl, &BLOB_REQUEST_POLICY, self)
306 }
307
308 pub fn decrement_with_policy(&self, htl: u8, policy: &HtlPolicy) -> u8 {
310 decrement_htl_with_policy(htl, policy, self)
311 }
312}
313
314impl Default for PeerHTLConfig {
315 fn default() -> Self {
316 Self::random()
317 }
318}
319
320pub fn decrement_htl_with_policy(htl: u8, policy: &HtlPolicy, config: &PeerHTLConfig) -> u8 {
322 let htl = htl.min(policy.max_htl);
323 if htl == 0 {
324 return 0;
325 }
326
327 match policy.mode {
328 HtlMode::Probabilistic => {
329 let p_at_max = policy.p_at_max.clamp(0.0, 1.0);
330 let p_at_min = policy.p_at_min.clamp(0.0, 1.0);
331
332 if htl == policy.max_htl {
333 return if config.at_max_sample < p_at_max {
334 htl.saturating_sub(1)
335 } else {
336 htl
337 };
338 }
339
340 if htl == 1 {
341 return if config.at_min_sample < p_at_min {
342 0
343 } else {
344 htl
345 };
346 }
347
348 htl.saturating_sub(1)
349 }
350 }
351}
352
353pub fn should_forward_htl(htl: u8) -> bool {
355 htl > 0
356}
357
358pub fn should_forward(htl: u8) -> bool {
360 should_forward_htl(htl)
361}
362
363use tokio::sync::{mpsc, oneshot};
364
365#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
367pub enum PeerPool {
368 Follows,
370 Other,
372}
373
374#[derive(Debug, Clone, Copy)]
376pub struct PoolConfig {
377 pub max_connections: usize,
379 pub satisfied_connections: usize,
381}
382
383impl PoolConfig {
384 #[inline]
386 pub fn can_accept(&self, current_count: usize) -> bool {
387 current_count < self.max_connections
388 }
389
390 #[inline]
392 pub fn needs_peers(&self, current_count: usize) -> bool {
393 current_count < self.satisfied_connections
394 }
395
396 #[inline]
398 pub fn is_satisfied(&self, current_count: usize) -> bool {
399 current_count >= self.satisfied_connections
400 }
401}
402
403impl Default for PoolConfig {
404 fn default() -> Self {
405 Self {
406 max_connections: 16,
407 satisfied_connections: 8,
408 }
409 }
410}
411
412#[derive(Debug, Clone)]
414pub struct PoolSettings {
415 pub follows: PoolConfig,
416 pub other: PoolConfig,
417}
418
419impl Default for PoolSettings {
420 fn default() -> Self {
421 Self {
422 follows: PoolConfig {
423 max_connections: 16,
424 satisfied_connections: 8,
425 },
426 other: PoolConfig {
427 max_connections: 16,
428 satisfied_connections: 8,
429 },
430 }
431 }
432}
433
434pub struct ClassifyRequest {
436 pub pubkey: String,
438 pub response: oneshot::Sender<PeerPool>,
440}
441
442pub type ClassifierTx = mpsc::Sender<ClassifyRequest>;
444
445pub type ClassifierRx = mpsc::Receiver<ClassifyRequest>;
447
448pub fn classifier_channel(buffer: usize) -> (ClassifierTx, ClassifierRx) {
450 mpsc::channel(buffer)
451}
452
453#[derive(Clone)]
455pub struct MeshStoreConfig {
456 pub relays: Vec<String>,
458 pub roots: Vec<Hash>,
460 pub request_timeout_ms: u64,
462 pub hello_interval_ms: u64,
464 pub debug: bool,
466 pub pools: PoolSettings,
468 pub classifier_tx: Option<ClassifierTx>,
471 pub request_selection_strategy: SelectionStrategy,
473 pub request_fairness_enabled: bool,
475 pub request_dispatch: RequestDispatchConfig,
477}
478
479impl Default for MeshStoreConfig {
480 fn default() -> Self {
481 Self {
482 relays: vec![
483 "wss://temp.iris.to".to_string(),
484 "wss://relay.damus.io".to_string(),
485 ],
486 roots: Vec::new(),
487 request_timeout_ms: 10000,
488 hello_interval_ms: 30000,
489 debug: false,
490 pools: PoolSettings::default(),
491 classifier_tx: None,
492 request_selection_strategy: SelectionStrategy::TitForTat,
493 request_fairness_enabled: true,
494 request_dispatch: RequestDispatchConfig {
495 initial_fanout: 2,
496 hedge_fanout: 1,
497 max_fanout: 8,
498 hedge_interval_ms: 120,
499 },
500 }
501 }
502}
503
504#[derive(Debug, Clone, Copy, PartialEq, Eq)]
506pub enum PeerState {
507 New,
509 Connecting,
511 Connected,
513 Ready,
515 Disconnected,
517}
518
519#[derive(Debug, Clone, Default)]
521pub struct MeshStats {
522 pub connected_peers: usize,
523 pub pending_requests: usize,
524 pub bytes_sent: u64,
525 pub bytes_received: u64,
526 pub requests_made: u64,
527 pub requests_fulfilled: u64,
528}
529
530pub type WebRTCStats = MeshStats;
532
533pub const NOSTR_KIND_HASHTREE: u16 = 25050;
535pub const MESH_SIGNALING_EVENT_KIND: u16 = NOSTR_KIND_HASHTREE;
536
537pub const MESH_PROTOCOL: &str = "htree.nostr.mesh.v1";
539pub const MESH_PROTOCOL_VERSION: u8 = 1;
540pub const MESH_DEFAULT_HTL: u8 = MESH_EVENT_POLICY.max_htl;
541pub const MESH_MAX_HTL: u8 = 6;
542
543#[derive(Debug)]
545pub struct TimedSeenSet {
546 entries: HashMap<String, Instant>,
547 order: VecDeque<(String, Instant)>,
548 ttl: Duration,
549 capacity: usize,
550}
551
552impl TimedSeenSet {
553 pub fn new(capacity: usize, ttl: Duration) -> Self {
554 Self {
555 entries: HashMap::new(),
556 order: VecDeque::new(),
557 ttl,
558 capacity,
559 }
560 }
561
562 fn prune(&mut self, now: Instant) {
563 while let Some((key, inserted_at)) = self.order.front().cloned() {
564 if now.duration_since(inserted_at) < self.ttl {
565 break;
566 }
567 self.order.pop_front();
568 if self
569 .entries
570 .get(&key)
571 .map(|ts| *ts == inserted_at)
572 .unwrap_or(false)
573 {
574 self.entries.remove(&key);
575 }
576 }
577
578 while self.entries.len() > self.capacity {
579 if let Some((key, inserted_at)) = self.order.pop_front() {
580 if self
581 .entries
582 .get(&key)
583 .map(|ts| *ts == inserted_at)
584 .unwrap_or(false)
585 {
586 self.entries.remove(&key);
587 }
588 } else {
589 break;
590 }
591 }
592 }
593
594 pub fn insert_if_new(&mut self, key: String) -> bool {
595 let now = Instant::now();
596 self.prune(now);
597 if self.entries.contains_key(&key) {
598 return false;
599 }
600 self.entries.insert(key.clone(), now);
601 self.order.push_back((key, now));
602 self.prune(now);
603 true
604 }
605
606 pub fn len(&self) -> usize {
607 self.entries.len()
608 }
609
610 pub fn is_empty(&self) -> bool {
611 self.entries.is_empty()
612 }
613}
614
615#[derive(Debug, Clone, Serialize, Deserialize)]
616#[serde(tag = "type")]
617pub enum MeshNostrPayload {
618 #[serde(rename = "EVENT")]
619 Event { event: Event },
620}
621
622#[derive(Debug, Clone, Serialize, Deserialize)]
623pub struct MeshNostrFrame {
624 pub protocol: String,
625 pub version: u8,
626 pub frame_id: String,
627 pub htl: u8,
628 pub sender_peer_id: String,
629 pub payload: MeshNostrPayload,
630}
631
632impl MeshNostrFrame {
633 pub fn new_event(event: Event, sender_peer_id: &str, htl: u8) -> Self {
634 Self::new_event_with_id(
635 event,
636 sender_peer_id,
637 &uuid::Uuid::new_v4().to_string(),
638 htl,
639 )
640 }
641
642 pub fn new_event_with_id(event: Event, sender_peer_id: &str, frame_id: &str, htl: u8) -> Self {
643 Self {
644 protocol: MESH_PROTOCOL.to_string(),
645 version: MESH_PROTOCOL_VERSION,
646 frame_id: frame_id.to_string(),
647 htl,
648 sender_peer_id: sender_peer_id.to_string(),
649 payload: MeshNostrPayload::Event { event },
650 }
651 }
652
653 pub fn event(&self) -> &Event {
654 match &self.payload {
655 MeshNostrPayload::Event { event } => event,
656 }
657 }
658}
659
660pub fn validate_mesh_frame(frame: &MeshNostrFrame) -> Result<(), &'static str> {
661 if frame.protocol != MESH_PROTOCOL {
662 return Err("invalid protocol");
663 }
664 if frame.version != MESH_PROTOCOL_VERSION {
665 return Err("invalid version");
666 }
667 if frame.frame_id.is_empty() {
668 return Err("missing frame id");
669 }
670 if frame.sender_peer_id.is_empty() {
671 return Err("missing sender peer id");
672 }
673 if frame.sender_peer_id.contains(':') {
674 return Err("invalid sender peer id");
675 }
676 if frame.htl == 0 || frame.htl > MESH_MAX_HTL {
677 return Err("invalid htl");
678 }
679
680 let event = frame.event();
681 if event.kind != Kind::Custom(NOSTR_KIND_HASHTREE)
682 && event.kind != Kind::Ephemeral(NOSTR_KIND_HASHTREE)
683 {
684 return Err("unsupported event kind");
685 }
686
687 Ok(())
688}
689
690pub const DATA_CHANNEL_LABEL: &str = "hashtree";