1use hashtree_core::Hash;
8use nostr_sdk::nostr::{Event, Kind};
9use serde::{Deserialize, Serialize};
10use std::collections::{HashMap, VecDeque};
11use std::time::{Duration, Instant};
12
13use crate::mesh_store_core::RequestDispatchConfig;
14use crate::peer_selector::SelectionStrategy;
15
/// Identifier for a mesh peer.
///
/// Wraps the peer's pubkey string; the wire form is the bare pubkey
/// (see `to_peer_string` / `from_peer_string`).
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct PeerId {
    /// The peer's pubkey. When parsed via `from_peer_string` it is
    /// guaranteed non-empty and colon-free.
    pub pubkey: String,
}
22
23impl PeerId {
24 pub fn new(pubkey: String) -> Self {
25 Self { pubkey }
26 }
27
28 pub fn to_peer_string(&self) -> String {
29 self.pubkey.clone()
30 }
31
32 pub fn from_peer_string(s: &str) -> Option<Self> {
33 let pubkey = s.trim();
34 if pubkey.is_empty() || pubkey.contains(':') {
35 return None;
36 }
37 Some(Self {
38 pubkey: pubkey.to_string(),
39 })
40 }
41
42 pub fn from_string(s: &str) -> Option<Self> {
43 Self::from_peer_string(s)
44 }
45
46 pub fn short(&self) -> String {
47 self.pubkey[..8.min(self.pubkey.len())].to_string()
48 }
49}
50
51impl std::fmt::Display for PeerId {
52 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
53 f.write_str(&self.pubkey)
54 }
55}
56
/// Session-setup signaling messages, tagged by `type` and using camelCase
/// field names on the wire (via the serde renames).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum SignalingMessage {
    /// Presence announcement; the only variant without a target peer, so
    /// `is_for` treats it as a broadcast.
    #[serde(rename = "hello")]
    Hello {
        #[serde(rename = "peerId")]
        peer_id: String,
        /// Root hashes the sender announces.
        roots: Vec<String>,
    },

    /// SDP offer directed at `target_peer_id`.
    #[serde(rename = "offer")]
    Offer {
        #[serde(rename = "peerId")]
        peer_id: String,
        #[serde(rename = "targetPeerId")]
        target_peer_id: String,
        sdp: String,
    },

    /// SDP answer directed at `target_peer_id`.
    #[serde(rename = "answer")]
    Answer {
        #[serde(rename = "peerId")]
        peer_id: String,
        #[serde(rename = "targetPeerId")]
        target_peer_id: String,
        sdp: String,
    },

    /// A single ICE candidate directed at `target_peer_id`.
    #[serde(rename = "candidate")]
    Candidate {
        #[serde(rename = "peerId")]
        peer_id: String,
        #[serde(rename = "targetPeerId")]
        target_peer_id: String,
        candidate: String,
        #[serde(rename = "sdpMLineIndex")]
        sdp_m_line_index: Option<u16>,
        #[serde(rename = "sdpMid")]
        sdp_mid: Option<String>,
    },

    /// A batch of ICE candidates directed at `target_peer_id`.
    #[serde(rename = "candidates")]
    Candidates {
        #[serde(rename = "peerId")]
        peer_id: String,
        #[serde(rename = "targetPeerId")]
        target_peer_id: String,
        candidates: Vec<IceCandidate>,
    },
}
113
114impl SignalingMessage {
115 pub fn msg_type(&self) -> &str {
117 match self {
118 Self::Hello { .. } => "hello",
119 Self::Offer { .. } => "offer",
120 Self::Answer { .. } => "answer",
121 Self::Candidate { .. } => "candidate",
122 Self::Candidates { .. } => "candidates",
123 }
124 }
125
126 pub fn peer_id(&self) -> &str {
128 match self {
129 Self::Hello { peer_id, .. } => peer_id,
130 Self::Offer { peer_id, .. } => peer_id,
131 Self::Answer { peer_id, .. } => peer_id,
132 Self::Candidate { peer_id, .. } => peer_id,
133 Self::Candidates { peer_id, .. } => peer_id,
134 }
135 }
136
137 pub fn target_peer_id(&self) -> Option<&str> {
139 match self {
140 Self::Hello { .. } => None, Self::Offer { target_peer_id, .. } => Some(target_peer_id),
142 Self::Answer { target_peer_id, .. } => Some(target_peer_id),
143 Self::Candidate { target_peer_id, .. } => Some(target_peer_id),
144 Self::Candidates { target_peer_id, .. } => Some(target_peer_id),
145 }
146 }
147
148 pub fn is_for(&self, my_peer_id: &str) -> bool {
150 match self.target_peer_id() {
151 Some(target) => target == my_peer_id,
152 None => true, }
154 }
155}
156
/// Deterministic "polite peer" tie-break for session negotiation: for any
/// pair of distinct ids, exactly one side compares lexicographically smaller
/// and is therefore polite. A peer is never polite toward itself (equal ids
/// yield `false`).
#[inline]
pub fn is_polite_peer(local_peer_id: &str, remote_peer_id: &str) -> bool {
    matches!(
        local_peer_id.cmp(remote_peer_id),
        std::cmp::Ordering::Less
    )
}
172
/// A single ICE candidate, serialized with camelCase field names on the wire
/// (via the serde renames).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IceCandidate {
    /// The candidate line itself.
    pub candidate: String,
    /// Media line index the candidate belongs to, when known.
    #[serde(rename = "sdpMLineIndex")]
    pub sdp_m_line_index: Option<u16>,
    /// Media stream identification tag, when known.
    #[serde(rename = "sdpMid")]
    pub sdp_mid: Option<String>,
}
182
/// Messages exchanged between connected peers, tagged by `type` on the wire.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum DataMessage {
    /// Request for the content identified by `hash` ("req" on the wire).
    #[serde(rename = "req")]
    Request {
        /// Request id, echoed back in the matching `Response`.
        id: u32,
        hash: String,
        /// Optional hops-to-live budget for forwarding (see the HTL policies
        /// below); omitted from the JSON when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        htl: Option<u8>,
    },

    /// Reply to a `Request` with the same `id` ("res" on the wire).
    #[serde(rename = "res")]
    Response {
        id: u32,
        hash: String,
        /// Whether the responder can serve the requested hash.
        found: bool,
        /// Size of the content when known — presumably in bytes; confirm
        /// against the channel handler.
        #[serde(skip_serializing_if = "Option::is_none")]
        size: Option<u64>,
    },

    /// Announcement referencing a single hash ("push" on the wire).
    /// NOTE(review): exact push semantics live in the handler — confirm there.
    #[serde(rename = "push")]
    Push { hash: String },

    /// Advertises hashes the sender holds ("have" on the wire).
    #[serde(rename = "have")]
    Have { hashes: Vec<String> },

    /// Asks the receiver for the listed hashes ("want" on the wire).
    #[serde(rename = "want")]
    Want { hashes: Vec<String> },

    /// Announces a root hash ("root" on the wire), with optional size.
    #[serde(rename = "root")]
    RootUpdate {
        hash: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        size: Option<u64>,
    },
}
229
/// Default maximum hops-to-live for blob requests (see `BLOB_REQUEST_POLICY`).
pub const MAX_HTL: u8 = 10;
/// Probability of decrementing when HTL equals the policy maximum
/// (compared as `sample < p` in `decrement_htl_with_policy`).
pub const DECREMENT_AT_MAX_PROB: f64 = 0.5;
/// Probability of dropping HTL from 1 to 0.
pub const DECREMENT_AT_MIN_PROB: f64 = 0.25;
236
/// Strategy for applying hops-to-live decrements.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HtlMode {
    /// Probabilistic decrement at the endpoints (see
    /// `decrement_htl_with_policy`): decrement with probability `p_at_max`
    /// while at the maximum, drop to 0 with probability `p_at_min` while at
    /// 1, and decrement unconditionally in between.
    Probabilistic,
}

/// Per-message-class HTL parameters consumed by `decrement_htl_with_policy`.
#[derive(Debug, Clone, Copy)]
pub struct HtlPolicy {
    pub mode: HtlMode,
    /// Upper bound; incoming HTL values above this are capped down to it.
    pub max_htl: u8,
    /// Probability of decrementing when HTL equals `max_htl`.
    pub p_at_max: f64,
    /// Probability of dropping to 0 when HTL equals 1.
    pub p_at_min: f64,
}
251
/// HTL policy applied to blob requests (max 10 hops, 50%/25% endpoint
/// decrement probabilities).
pub const BLOB_REQUEST_POLICY: HtlPolicy = HtlPolicy {
    mode: HtlMode::Probabilistic,
    max_htl: MAX_HTL,
    p_at_max: DECREMENT_AT_MAX_PROB,
    p_at_min: DECREMENT_AT_MIN_PROB,
};

/// HTL policy applied to forwarded mesh events: shorter reach (max 4 hops)
/// with more aggressive endpoint decrements than blob requests.
pub const MESH_EVENT_POLICY: HtlPolicy = HtlPolicy {
    mode: HtlMode::Probabilistic,
    max_htl: 4,
    p_at_max: 0.75,
    p_at_min: 0.5,
};
267
/// Per-peer random samples in [0, 1] that fix how this peer resolves the
/// probabilistic HTL endpoints (compared as `sample < p` in
/// `decrement_htl_with_policy`).
#[derive(Debug, Clone, Copy)]
pub struct PeerHTLConfig {
    /// Sample compared against `HtlPolicy::p_at_max` when HTL is at maximum.
    pub at_max_sample: f64,
    /// Sample compared against `HtlPolicy::p_at_min` when HTL is 1.
    pub at_min_sample: f64,
}
277
278impl PeerHTLConfig {
279 pub fn random() -> Self {
281 use rand::Rng;
282 let mut rng = rand::thread_rng();
283 Self::from_samples(rng.gen_range(0.0..1.0), rng.gen_range(0.0..1.0))
284 }
285
286 pub fn from_samples(at_max_sample: f64, at_min_sample: f64) -> Self {
288 Self {
289 at_max_sample: at_max_sample.clamp(0.0, 1.0),
290 at_min_sample: at_min_sample.clamp(0.0, 1.0),
291 }
292 }
293
294 pub fn from_flags(decrement_at_max: bool, decrement_at_min: bool) -> Self {
296 Self {
297 at_max_sample: if decrement_at_max { 0.0 } else { 1.0 },
298 at_min_sample: if decrement_at_min { 0.0 } else { 1.0 },
299 }
300 }
301
302 pub fn decrement(&self, htl: u8) -> u8 {
305 decrement_htl_with_policy(htl, &BLOB_REQUEST_POLICY, self)
306 }
307
308 pub fn decrement_with_policy(&self, htl: u8, policy: &HtlPolicy) -> u8 {
310 decrement_htl_with_policy(htl, policy, self)
311 }
312}
313
314impl Default for PeerHTLConfig {
315 fn default() -> Self {
316 Self::random()
317 }
318}
319
320pub fn decrement_htl_with_policy(htl: u8, policy: &HtlPolicy, config: &PeerHTLConfig) -> u8 {
322 let htl = htl.min(policy.max_htl);
323 if htl == 0 {
324 return 0;
325 }
326
327 match policy.mode {
328 HtlMode::Probabilistic => {
329 let p_at_max = policy.p_at_max.clamp(0.0, 1.0);
330 let p_at_min = policy.p_at_min.clamp(0.0, 1.0);
331
332 if htl == policy.max_htl {
333 return if config.at_max_sample < p_at_max {
334 htl.saturating_sub(1)
335 } else {
336 htl
337 };
338 }
339
340 if htl == 1 {
341 return if config.at_min_sample < p_at_min {
342 0
343 } else {
344 htl
345 };
346 }
347
348 htl.saturating_sub(1)
349 }
350 }
351}
352
/// True while a message still has hops-to-live remaining and may be
/// forwarded.
pub fn should_forward_htl(htl: u8) -> bool {
    htl != 0
}

/// Backwards-compatible alias for [`should_forward_htl`].
pub fn should_forward(htl: u8) -> bool {
    should_forward_htl(htl)
}
362
363use tokio::sync::{mpsc, oneshot};
364
/// Connection pool a remote peer is assigned to by the classifier
/// (see `ClassifyRequest`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PeerPool {
    /// Peers on the local user's follow list.
    Follows,
    /// Every other peer.
    Other,
}
373
/// Connection limits for a single peer pool.
#[derive(Debug, Clone, Copy)]
pub struct PoolConfig {
    /// Hard cap on simultaneous connections in this pool.
    pub max_connections: usize,
    /// Target below which the pool keeps seeking new peers.
    pub satisfied_connections: usize,
}
382
383impl PoolConfig {
384 #[inline]
386 pub fn can_accept(&self, current_count: usize) -> bool {
387 current_count < self.max_connections
388 }
389
390 #[inline]
392 pub fn needs_peers(&self, current_count: usize) -> bool {
393 current_count < self.satisfied_connections
394 }
395
396 #[inline]
398 pub fn is_satisfied(&self, current_count: usize) -> bool {
399 current_count >= self.satisfied_connections
400 }
401}
402
403impl Default for PoolConfig {
404 fn default() -> Self {
405 Self {
406 max_connections: 16,
407 satisfied_connections: 8,
408 }
409 }
410}
411
/// Per-pool connection limits for both peer pools.
#[derive(Debug, Clone)]
pub struct PoolSettings {
    /// Limits for the follows pool.
    pub follows: PoolConfig,
    /// Limits for all remaining peers.
    pub other: PoolConfig,
}
418
419impl Default for PoolSettings {
420 fn default() -> Self {
421 Self {
422 follows: PoolConfig {
423 max_connections: 16,
424 satisfied_connections: 8,
425 },
426 other: PoolConfig {
427 max_connections: 16,
428 satisfied_connections: 8,
429 },
430 }
431 }
432}
433
/// Request to classify a peer's pubkey into a [`PeerPool`], answered over a
/// oneshot channel.
pub struct ClassifyRequest {
    /// Pubkey of the peer to classify.
    pub pubkey: String,
    /// Channel on which the classifier sends back the assigned pool.
    pub response: oneshot::Sender<PeerPool>,
}
441
/// Sending half of the classifier channel.
pub type ClassifierTx = mpsc::Sender<ClassifyRequest>;

/// Receiving half of the classifier channel.
pub type ClassifierRx = mpsc::Receiver<ClassifyRequest>;

/// Creates a bounded classifier channel with the given buffer size.
pub fn classifier_channel(buffer: usize) -> (ClassifierTx, ClassifierRx) {
    mpsc::channel(buffer)
}
452
/// Configuration for the mesh store.
#[derive(Clone)]
pub struct MeshStoreConfig {
    /// Relay URLs (wss) used by the store — see `Default` for the built-ins.
    pub relays: Vec<String>,
    /// Root hashes this node starts with.
    pub roots: Vec<Hash>,
    /// Timeout for a single request, in milliseconds.
    pub request_timeout_ms: u64,
    /// Interval between hello announcements, in milliseconds.
    pub hello_interval_ms: u64,
    /// Enables debug output.
    pub debug: bool,
    /// Per-pool connection limits.
    pub pools: PoolSettings,
    /// Optional channel to an external peer classifier (follows vs other).
    pub classifier_tx: Option<ClassifierTx>,
    /// Strategy used to select peers for requests.
    pub request_selection_strategy: SelectionStrategy,
    /// Enables fairness accounting in request peer selection.
    pub request_fairness_enabled: bool,
    /// Fan-out / hedging parameters for dispatching requests.
    pub request_dispatch: RequestDispatchConfig,
    /// Upstream Blossom server URLs — presumably a fallback source for
    /// blobs; confirm against the store implementation.
    pub upstream_blossom_servers: Vec<String>,
}
480
481impl Default for MeshStoreConfig {
482 fn default() -> Self {
483 Self {
484 relays: vec![
485 "wss://temp.iris.to".to_string(),
486 "wss://relay.damus.io".to_string(),
487 ],
488 roots: Vec::new(),
489 request_timeout_ms: 10000,
490 hello_interval_ms: 30000,
491 debug: false,
492 pools: PoolSettings::default(),
493 classifier_tx: None,
494 request_selection_strategy: SelectionStrategy::TitForTat,
495 request_fairness_enabled: true,
496 request_dispatch: RequestDispatchConfig {
497 initial_fanout: 2,
498 hedge_fanout: 1,
499 max_fanout: 8,
500 hedge_interval_ms: 120,
501 },
502 upstream_blossom_servers: Vec::new(),
503 }
504 }
505}
506
/// Lifecycle state of a peer connection.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PeerState {
    /// Known but no connection attempt made yet.
    New,
    /// Connection attempt in progress.
    Connecting,
    /// Connection established.
    /// NOTE(review): the Connected/Ready distinction is defined by the
    /// connection manager — confirm there.
    Connected,
    /// Fully ready to exchange data.
    Ready,
    /// Connection lost or closed.
    Disconnected,
}
521
/// Aggregate counters describing mesh activity.
#[derive(Debug, Clone, Default)]
pub struct MeshStats {
    /// Number of currently connected peers.
    pub connected_peers: usize,
    /// Requests still awaiting a response.
    pub pending_requests: usize,
    /// Total bytes sent.
    pub bytes_sent: u64,
    /// Total bytes received.
    pub bytes_received: u64,
    /// Total requests issued.
    pub requests_made: u64,
    /// Requests that completed successfully.
    pub requests_fulfilled: u64,
}
532
/// Legacy alias for [`MeshStats`].
pub type WebRTCStats = MeshStats;

/// Nostr event kind used for hashtree mesh traffic.
pub const NOSTR_KIND_HASHTREE: u16 = 25050;
/// The same kind is used for mesh signaling events.
pub const MESH_SIGNALING_EVENT_KIND: u16 = NOSTR_KIND_HASHTREE;

/// Protocol tag carried in every `MeshNostrFrame`.
pub const MESH_PROTOCOL: &str = "htree.nostr.mesh.v1";
/// Current frame format version.
pub const MESH_PROTOCOL_VERSION: u8 = 1;
/// Initial HTL for locally originated mesh frames (from
/// `MESH_EVENT_POLICY.max_htl`).
pub const MESH_DEFAULT_HTL: u8 = MESH_EVENT_POLICY.max_htl;
/// Upper HTL bound accepted by `validate_mesh_frame` for inbound frames.
pub const MESH_MAX_HTL: u8 = 6;
545
/// Deduplication set bounded by both a TTL and a capacity.
///
/// `entries` maps key -> insertion time; `order` records insertion order so
/// eviction is oldest-first. Each `order` record also carries its insertion
/// timestamp so `prune` can tell a live record from a stale one.
#[derive(Debug)]
pub struct TimedSeenSet {
    /// Live keys and when each was inserted.
    entries: HashMap<String, Instant>,
    /// Insertion-ordered queue of (key, inserted_at) used for eviction.
    order: VecDeque<(String, Instant)>,
    /// How long an entry counts as "seen".
    ttl: Duration,
    /// Maximum number of entries retained after pruning.
    capacity: usize,
}
554
impl TimedSeenSet {
    /// Creates an empty set with the given capacity bound and entry TTL.
    pub fn new(capacity: usize, ttl: Duration) -> Self {
        Self {
            entries: HashMap::new(),
            order: VecDeque::new(),
            ttl,
            capacity,
        }
    }

    /// Evicts expired entries (oldest first), then evicts further oldest
    /// entries until at most `capacity` remain.
    fn prune(&mut self, now: Instant) {
        // Phase 1: TTL expiry. `order` is insertion-ordered, so once the
        // front record is younger than the TTL, all later ones are too.
        while let Some((key, inserted_at)) = self.order.front().cloned() {
            if now.duration_since(inserted_at) < self.ttl {
                break;
            }
            self.order.pop_front();
            // Only drop the map entry if it still belongs to this queue
            // record (defensive timestamp match — a mismatch means the
            // record is stale and the map holds a newer insertion).
            if self
                .entries
                .get(&key)
                .map(|ts| *ts == inserted_at)
                .unwrap_or(false)
            {
                self.entries.remove(&key);
            }
        }

        // Phase 2: capacity. Pop oldest records until within bounds; stale
        // records are discarded without touching the map.
        while self.entries.len() > self.capacity {
            if let Some((key, inserted_at)) = self.order.pop_front() {
                if self
                    .entries
                    .get(&key)
                    .map(|ts| *ts == inserted_at)
                    .unwrap_or(false)
                {
                    self.entries.remove(&key);
                }
            } else {
                // Queue empty but map over capacity — structures out of
                // sync; bail rather than loop forever.
                break;
            }
        }
    }

    /// Records `key` as seen. Returns `true` if it was new, `false` if it
    /// was already present (and still live).
    pub fn insert_if_new(&mut self, key: String) -> bool {
        let now = Instant::now();
        self.prune(now);
        if self.entries.contains_key(&key) {
            return false;
        }
        self.entries.insert(key.clone(), now);
        self.order.push_back((key, now));
        // Second prune enforces the capacity bound this insert may have
        // just exceeded.
        self.prune(now);
        true
    }

    /// Number of tracked entries (expired ones linger until the next prune).
    pub fn len(&self) -> usize {
        self.entries.len()
    }

    /// True when nothing is tracked.
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }
}
617
/// Payload of a mesh Nostr frame; currently only forwarded events.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum MeshNostrPayload {
    /// A Nostr event carried through the mesh ("EVENT" on the wire).
    #[serde(rename = "EVENT")]
    Event { event: Event },
}
624
/// Envelope for Nostr events forwarded peer-to-peer over the mesh.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MeshNostrFrame {
    /// Protocol tag; must equal [`MESH_PROTOCOL`] (checked by
    /// `validate_mesh_frame`).
    pub protocol: String,
    /// Frame format version; must equal [`MESH_PROTOCOL_VERSION`].
    pub version: u8,
    /// Unique frame id (a UUID for locally created frames); must be
    /// non-empty.
    pub frame_id: String,
    /// Remaining hops-to-live; inbound frames must have 1..=MESH_MAX_HTL.
    pub htl: u8,
    /// Peer id of the sender; must be non-empty and colon-free.
    pub sender_peer_id: String,
    /// The wrapped payload.
    pub payload: MeshNostrPayload,
}
634
635impl MeshNostrFrame {
636 pub fn new_event(event: Event, sender_peer_id: &str, htl: u8) -> Self {
637 Self::new_event_with_id(
638 event,
639 sender_peer_id,
640 &uuid::Uuid::new_v4().to_string(),
641 htl,
642 )
643 }
644
645 pub fn new_event_with_id(event: Event, sender_peer_id: &str, frame_id: &str, htl: u8) -> Self {
646 Self {
647 protocol: MESH_PROTOCOL.to_string(),
648 version: MESH_PROTOCOL_VERSION,
649 frame_id: frame_id.to_string(),
650 htl,
651 sender_peer_id: sender_peer_id.to_string(),
652 payload: MeshNostrPayload::Event { event },
653 }
654 }
655
656 pub fn event(&self) -> &Event {
657 match &self.payload {
658 MeshNostrPayload::Event { event } => event,
659 }
660 }
661}
662
663pub fn validate_mesh_frame(frame: &MeshNostrFrame) -> Result<(), &'static str> {
664 if frame.protocol != MESH_PROTOCOL {
665 return Err("invalid protocol");
666 }
667 if frame.version != MESH_PROTOCOL_VERSION {
668 return Err("invalid version");
669 }
670 if frame.frame_id.is_empty() {
671 return Err("missing frame id");
672 }
673 if frame.sender_peer_id.is_empty() {
674 return Err("missing sender peer id");
675 }
676 if frame.sender_peer_id.contains(':') {
677 return Err("invalid sender peer id");
678 }
679 if frame.htl == 0 || frame.htl > MESH_MAX_HTL {
680 return Err("invalid htl");
681 }
682
683 let event = frame.event();
684 if event.kind != Kind::Custom(NOSTR_KIND_HASHTREE)
685 && event.kind != Kind::Ephemeral(NOSTR_KIND_HASHTREE)
686 {
687 return Err("unsupported event kind");
688 }
689
690 Ok(())
691}
692
/// Label used for the peer-to-peer data channel.
pub const DATA_CHANNEL_LABEL: &str = "hashtree";