1use hashtree_core::Hash;
8use nostr_sdk::nostr::{Event, Kind};
9use serde::{Deserialize, Serialize};
10use std::collections::{HashMap, VecDeque};
11use std::time::Duration;
12use tokio::time::Instant;
13
14use crate::mesh_store_core::RequestDispatchConfig;
15use crate::peer_selector::SelectionStrategy;
16
17fn default_hash_get_enabled() -> bool {
18 true
19}
20
21#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
23pub struct PeerId {
24 pub pubkey: String,
26}
27
28impl PeerId {
29 pub fn new(pubkey: String) -> Self {
30 Self { pubkey }
31 }
32
33 pub fn to_peer_string(&self) -> String {
34 self.pubkey.clone()
35 }
36
37 pub fn from_peer_string(s: &str) -> Option<Self> {
38 let pubkey = s.trim();
39 if pubkey.is_empty() || pubkey.contains(':') {
40 return None;
41 }
42 Some(Self {
43 pubkey: pubkey.to_string(),
44 })
45 }
46
47 pub fn from_string(s: &str) -> Option<Self> {
48 Self::from_peer_string(s)
49 }
50
51 pub fn short(&self) -> String {
52 self.pubkey[..8.min(self.pubkey.len())].to_string()
53 }
54}
55
56#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
58pub struct KnownPeerRecord {
59 pub peer_id: String,
60 #[serde(
61 default,
62 alias = "direct_urls",
63 alias = "addresses",
64 skip_serializing_if = "Vec::is_empty"
65 )]
66 pub signal_urls: Vec<String>,
67 #[serde(default)]
68 pub last_seen_unix_ms: u64,
69 #[serde(default, skip_serializing_if = "Option::is_none")]
70 pub last_source: Option<String>,
71}
72
73#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
75pub struct KnownPeerSnapshot {
76 pub version: u32,
77 #[serde(default)]
78 pub peers: Vec<KnownPeerRecord>,
79}
80
81impl Default for KnownPeerSnapshot {
82 fn default() -> Self {
83 Self {
84 version: 1,
85 peers: Vec::new(),
86 }
87 }
88}
89
90impl std::fmt::Display for PeerId {
91 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
92 f.write_str(&self.pubkey)
93 }
94}
95
96#[derive(Debug, Clone, Serialize, Deserialize)]
98#[serde(tag = "type")]
99pub enum SignalingMessage {
100 #[serde(rename = "hello")]
102 Hello {
103 #[serde(rename = "peerId")]
104 peer_id: String,
105 roots: Vec<String>,
106 #[serde(rename = "hashGet", default = "default_hash_get_enabled")]
107 hash_get: bool,
108 },
109
110 #[serde(rename = "offer")]
112 Offer {
113 #[serde(rename = "peerId")]
114 peer_id: String,
115 #[serde(rename = "targetPeerId")]
116 target_peer_id: String,
117 sdp: String,
118 },
119
120 #[serde(rename = "answer")]
122 Answer {
123 #[serde(rename = "peerId")]
124 peer_id: String,
125 #[serde(rename = "targetPeerId")]
126 target_peer_id: String,
127 sdp: String,
128 },
129
130 #[serde(rename = "candidate")]
132 Candidate {
133 #[serde(rename = "peerId")]
134 peer_id: String,
135 #[serde(rename = "targetPeerId")]
136 target_peer_id: String,
137 candidate: String,
138 #[serde(rename = "sdpMLineIndex")]
139 sdp_m_line_index: Option<u16>,
140 #[serde(rename = "sdpMid")]
141 sdp_mid: Option<String>,
142 },
143
144 #[serde(rename = "candidates")]
146 Candidates {
147 #[serde(rename = "peerId")]
148 peer_id: String,
149 #[serde(rename = "targetPeerId")]
150 target_peer_id: String,
151 candidates: Vec<IceCandidate>,
152 },
153}
154
155impl SignalingMessage {
156 pub fn msg_type(&self) -> &str {
158 match self {
159 Self::Hello { .. } => "hello",
160 Self::Offer { .. } => "offer",
161 Self::Answer { .. } => "answer",
162 Self::Candidate { .. } => "candidate",
163 Self::Candidates { .. } => "candidates",
164 }
165 }
166
167 pub fn peer_id(&self) -> &str {
169 match self {
170 Self::Hello { peer_id, .. } => peer_id,
171 Self::Offer { peer_id, .. } => peer_id,
172 Self::Answer { peer_id, .. } => peer_id,
173 Self::Candidate { peer_id, .. } => peer_id,
174 Self::Candidates { peer_id, .. } => peer_id,
175 }
176 }
177
178 pub fn target_peer_id(&self) -> Option<&str> {
180 match self {
181 Self::Hello { .. } => None, Self::Offer { target_peer_id, .. } => Some(target_peer_id),
183 Self::Answer { target_peer_id, .. } => Some(target_peer_id),
184 Self::Candidate { target_peer_id, .. } => Some(target_peer_id),
185 Self::Candidates { target_peer_id, .. } => Some(target_peer_id),
186 }
187 }
188
189 pub fn is_for(&self, my_peer_id: &str) -> bool {
191 match self.target_peer_id() {
192 Some(target) => target == my_peer_id,
193 None => true, }
195 }
196}
197
198#[inline]
209pub fn is_polite_peer(local_peer_id: &str, remote_peer_id: &str) -> bool {
210 local_peer_id < remote_peer_id
212}
213
214#[derive(Debug, Clone, Serialize, Deserialize)]
216pub struct IceCandidate {
217 pub candidate: String,
218 #[serde(rename = "sdpMLineIndex")]
219 pub sdp_m_line_index: Option<u16>,
220 #[serde(rename = "sdpMid")]
221 pub sdp_mid: Option<String>,
222}
223
224#[derive(Debug, Clone, Serialize, Deserialize)]
226#[serde(tag = "type")]
227pub enum DataMessage {
228 #[serde(rename = "req")]
230 Request {
231 id: u32,
232 hash: String,
233 #[serde(skip_serializing_if = "Option::is_none")]
236 htl: Option<u8>,
237 },
238
239 #[serde(rename = "res")]
241 Response {
242 id: u32,
243 hash: String,
244 found: bool,
245 #[serde(skip_serializing_if = "Option::is_none")]
246 size: Option<u64>,
247 },
248
249 #[serde(rename = "push")]
252 Push { hash: String },
253
254 #[serde(rename = "have")]
256 Have { hashes: Vec<String> },
257
258 #[serde(rename = "want")]
260 Want { hashes: Vec<String> },
261
262 #[serde(rename = "root")]
264 RootUpdate {
265 hash: String,
266 #[serde(skip_serializing_if = "Option::is_none")]
267 size: Option<u64>,
268 },
269}
270
271pub const MAX_HTL: u8 = 10;
273pub const DECREMENT_AT_MAX_PROB: f64 = 0.5;
275pub const DECREMENT_AT_MIN_PROB: f64 = 0.25;
277
278#[derive(Debug, Clone, Copy, PartialEq, Eq)]
280pub enum HtlMode {
281 Probabilistic,
282}
283
284#[derive(Debug, Clone, Copy)]
286pub struct HtlPolicy {
287 pub mode: HtlMode,
288 pub max_htl: u8,
289 pub p_at_max: f64,
290 pub p_at_min: f64,
291}
292
293pub const BLOB_REQUEST_POLICY: HtlPolicy = HtlPolicy {
295 mode: HtlMode::Probabilistic,
296 max_htl: MAX_HTL,
297 p_at_max: DECREMENT_AT_MAX_PROB,
298 p_at_min: DECREMENT_AT_MIN_PROB,
299};
300
301pub const MESH_EVENT_POLICY: HtlPolicy = HtlPolicy {
303 mode: HtlMode::Probabilistic,
304 max_htl: 4,
305 p_at_max: 0.75,
306 p_at_min: 0.5,
307};
308
309#[derive(Debug, Clone, Copy)]
312pub struct PeerHTLConfig {
313 pub at_max_sample: f64,
315 pub at_min_sample: f64,
317}
318
319impl PeerHTLConfig {
320 pub fn random() -> Self {
322 use rand::Rng;
323 let mut rng = rand::thread_rng();
324 Self::from_samples(rng.gen_range(0.0..1.0), rng.gen_range(0.0..1.0))
325 }
326
327 pub fn from_samples(at_max_sample: f64, at_min_sample: f64) -> Self {
329 Self {
330 at_max_sample: at_max_sample.clamp(0.0, 1.0),
331 at_min_sample: at_min_sample.clamp(0.0, 1.0),
332 }
333 }
334
335 pub fn from_flags(decrement_at_max: bool, decrement_at_min: bool) -> Self {
337 Self {
338 at_max_sample: if decrement_at_max { 0.0 } else { 1.0 },
339 at_min_sample: if decrement_at_min { 0.0 } else { 1.0 },
340 }
341 }
342
343 pub fn decrement(&self, htl: u8) -> u8 {
346 decrement_htl_with_policy(htl, &BLOB_REQUEST_POLICY, self)
347 }
348
349 pub fn decrement_with_policy(&self, htl: u8, policy: &HtlPolicy) -> u8 {
351 decrement_htl_with_policy(htl, policy, self)
352 }
353}
354
355impl Default for PeerHTLConfig {
356 fn default() -> Self {
357 Self::random()
358 }
359}
360
361pub fn decrement_htl_with_policy(htl: u8, policy: &HtlPolicy, config: &PeerHTLConfig) -> u8 {
363 let htl = htl.min(policy.max_htl);
364 if htl == 0 {
365 return 0;
366 }
367
368 match policy.mode {
369 HtlMode::Probabilistic => {
370 let p_at_max = policy.p_at_max.clamp(0.0, 1.0);
371 let p_at_min = policy.p_at_min.clamp(0.0, 1.0);
372
373 if htl == policy.max_htl {
374 return if config.at_max_sample < p_at_max {
375 htl.saturating_sub(1)
376 } else {
377 htl
378 };
379 }
380
381 if htl == 1 {
382 return if config.at_min_sample < p_at_min {
383 0
384 } else {
385 htl
386 };
387 }
388
389 htl.saturating_sub(1)
390 }
391 }
392}
393
394pub fn should_forward_htl(htl: u8) -> bool {
396 htl > 0
397}
398
399pub fn should_forward(htl: u8) -> bool {
401 should_forward_htl(htl)
402}
403
404use tokio::sync::{mpsc, oneshot};
405
406#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
408pub enum PeerPool {
409 Follows,
411 Other,
413}
414
415#[derive(Debug, Clone, Copy)]
417pub struct PoolConfig {
418 pub max_connections: usize,
420 pub satisfied_connections: usize,
422}
423
424impl PoolConfig {
425 #[inline]
427 pub fn can_accept(&self, current_count: usize) -> bool {
428 current_count < self.max_connections
429 }
430
431 #[inline]
433 pub fn needs_peers(&self, current_count: usize) -> bool {
434 current_count < self.satisfied_connections
435 }
436
437 #[inline]
439 pub fn is_satisfied(&self, current_count: usize) -> bool {
440 current_count >= self.satisfied_connections
441 }
442}
443
444impl Default for PoolConfig {
445 fn default() -> Self {
446 Self {
447 max_connections: 16,
448 satisfied_connections: 8,
449 }
450 }
451}
452
453#[derive(Debug, Clone)]
455pub struct PoolSettings {
456 pub follows: PoolConfig,
457 pub other: PoolConfig,
458}
459
460impl Default for PoolSettings {
461 fn default() -> Self {
462 Self {
463 follows: PoolConfig {
464 max_connections: 16,
465 satisfied_connections: 8,
466 },
467 other: PoolConfig {
468 max_connections: 16,
469 satisfied_connections: 8,
470 },
471 }
472 }
473}
474
475pub struct ClassifyRequest {
477 pub pubkey: String,
479 pub response: oneshot::Sender<PeerPool>,
481}
482
483pub type ClassifierTx = mpsc::Sender<ClassifyRequest>;
485
486pub type ClassifierRx = mpsc::Receiver<ClassifyRequest>;
488
489pub fn classifier_channel(buffer: usize) -> (ClassifierTx, ClassifierRx) {
491 mpsc::channel(buffer)
492}
493
494#[derive(Clone)]
496pub struct MeshStoreConfig {
497 pub relays: Vec<String>,
499 pub roots: Vec<Hash>,
501 pub request_timeout_ms: u64,
503 pub hello_interval_ms: u64,
505 pub debug: bool,
507 pub pools: PoolSettings,
509 pub classifier_tx: Option<ClassifierTx>,
512 pub request_selection_strategy: SelectionStrategy,
514 pub request_fairness_enabled: bool,
516 pub request_dispatch: RequestDispatchConfig,
518 pub upstream_blossom_servers: Vec<String>,
520}
521
522impl Default for MeshStoreConfig {
523 fn default() -> Self {
524 Self {
525 relays: vec![
526 "wss://temp.iris.to".to_string(),
527 "wss://relay.damus.io".to_string(),
528 ],
529 roots: Vec::new(),
530 request_timeout_ms: 10000,
531 hello_interval_ms: 30000,
532 debug: false,
533 pools: PoolSettings::default(),
534 classifier_tx: None,
535 request_selection_strategy: SelectionStrategy::Weighted,
536 request_fairness_enabled: true,
537 request_dispatch: RequestDispatchConfig {
538 initial_fanout: 2,
539 hedge_fanout: 1,
540 max_fanout: 8,
541 hedge_interval_ms: 120,
542 },
543 upstream_blossom_servers: Vec::new(),
544 }
545 }
546}
547
548#[derive(Debug, Clone, Copy, PartialEq, Eq)]
550pub enum PeerState {
551 New,
553 Connecting,
555 Connected,
557 Ready,
559 Disconnected,
561}
562
563#[derive(Debug, Clone, Default)]
565pub struct MeshStats {
566 pub connected_peers: usize,
567 pub pending_requests: usize,
568 pub bytes_sent: u64,
569 pub bytes_received: u64,
570 pub requests_made: u64,
571 pub requests_fulfilled: u64,
572}
573
574pub type WebRTCStats = MeshStats;
576
577pub const NOSTR_KIND_HASHTREE: u16 = 25050;
579pub const MESH_SIGNALING_EVENT_KIND: u16 = NOSTR_KIND_HASHTREE;
580
581pub const MESH_PROTOCOL: &str = "htree.nostr.mesh.v1";
583pub const MESH_PROTOCOL_VERSION: u8 = 1;
584pub const MESH_DEFAULT_HTL: u8 = MESH_EVENT_POLICY.max_htl;
585pub const MESH_MAX_HTL: u8 = 6;
586
587#[derive(Debug)]
589pub struct TimedSeenSet {
590 entries: HashMap<String, Instant>,
591 order: VecDeque<(String, Instant)>,
592 ttl: Duration,
593 capacity: usize,
594}
595
596impl TimedSeenSet {
597 pub fn new(capacity: usize, ttl: Duration) -> Self {
598 Self {
599 entries: HashMap::new(),
600 order: VecDeque::new(),
601 ttl,
602 capacity,
603 }
604 }
605
606 fn prune(&mut self, now: Instant) {
607 while let Some((key, inserted_at)) = self.order.front().cloned() {
608 if now.duration_since(inserted_at) < self.ttl {
609 break;
610 }
611 self.order.pop_front();
612 if self
613 .entries
614 .get(&key)
615 .map(|ts| *ts == inserted_at)
616 .unwrap_or(false)
617 {
618 self.entries.remove(&key);
619 }
620 }
621
622 while self.entries.len() > self.capacity {
623 if let Some((key, inserted_at)) = self.order.pop_front() {
624 if self
625 .entries
626 .get(&key)
627 .map(|ts| *ts == inserted_at)
628 .unwrap_or(false)
629 {
630 self.entries.remove(&key);
631 }
632 } else {
633 break;
634 }
635 }
636 }
637
638 pub fn insert_if_new(&mut self, key: String) -> bool {
639 let now = Instant::now();
640 self.prune(now);
641 if self.entries.contains_key(&key) {
642 return false;
643 }
644 self.entries.insert(key.clone(), now);
645 self.order.push_back((key, now));
646 self.prune(now);
647 true
648 }
649
650 pub fn contains(&mut self, key: &str) -> bool {
651 let now = Instant::now();
652 self.prune(now);
653 self.entries.contains_key(key)
654 }
655
656 pub fn len(&self) -> usize {
657 self.entries.len()
658 }
659
660 pub fn is_empty(&self) -> bool {
661 self.entries.is_empty()
662 }
663}
664
665#[derive(Debug, Clone, Serialize, Deserialize)]
666#[serde(tag = "type")]
667pub enum MeshNostrPayload {
668 #[serde(rename = "EVENT")]
669 Event { event: Event },
670}
671
672#[derive(Debug, Clone, Serialize, Deserialize)]
673pub struct MeshNostrFrame {
674 pub protocol: String,
675 pub version: u8,
676 pub frame_id: String,
677 pub htl: u8,
678 pub sender_peer_id: String,
679 pub payload: MeshNostrPayload,
680}
681
682impl MeshNostrFrame {
683 pub fn new_event(event: Event, sender_peer_id: &str, htl: u8) -> Self {
684 Self::new_event_with_id(
685 event,
686 sender_peer_id,
687 &uuid::Uuid::new_v4().to_string(),
688 htl,
689 )
690 }
691
692 pub fn new_event_with_id(event: Event, sender_peer_id: &str, frame_id: &str, htl: u8) -> Self {
693 Self {
694 protocol: MESH_PROTOCOL.to_string(),
695 version: MESH_PROTOCOL_VERSION,
696 frame_id: frame_id.to_string(),
697 htl,
698 sender_peer_id: sender_peer_id.to_string(),
699 payload: MeshNostrPayload::Event { event },
700 }
701 }
702
703 pub fn event(&self) -> &Event {
704 match &self.payload {
705 MeshNostrPayload::Event { event } => event,
706 }
707 }
708}
709
710pub fn validate_mesh_frame(frame: &MeshNostrFrame) -> Result<(), &'static str> {
711 if frame.protocol != MESH_PROTOCOL {
712 return Err("invalid protocol");
713 }
714 if frame.version != MESH_PROTOCOL_VERSION {
715 return Err("invalid version");
716 }
717 if frame.frame_id.is_empty() {
718 return Err("missing frame id");
719 }
720 if frame.sender_peer_id.is_empty() {
721 return Err("missing sender peer id");
722 }
723 if frame.sender_peer_id.contains(':') {
724 return Err("invalid sender peer id");
725 }
726 if frame.htl == 0 || frame.htl > MESH_MAX_HTL {
727 return Err("invalid htl");
728 }
729
730 let event = frame.event();
731 if event.kind != Kind::Custom(NOSTR_KIND_HASHTREE)
732 && event.kind != Kind::Ephemeral(NOSTR_KIND_HASHTREE)
733 {
734 return Err("unsupported event kind");
735 }
736
737 Ok(())
738}
739
740pub const DATA_CHANNEL_LABEL: &str = "hashtree";