1use hashtree_core::Hash;
8use nostr_sdk::nostr::{Event, Kind};
9use serde::{Deserialize, Serialize};
10use std::collections::{HashMap, VecDeque};
11use std::time::Duration;
12use tokio::time::Instant;
13
14use crate::mesh_store_core::{PubsubDeliveryMode, RequestDispatchConfig};
15use crate::peer_selector::SelectionStrategy;
16
17fn default_hash_get_enabled() -> bool {
18 true
19}
20
21#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
23pub struct PeerId {
24 pub pubkey: String,
26}
27
28impl PeerId {
29 pub fn new(pubkey: String) -> Self {
30 Self { pubkey }
31 }
32
33 pub fn to_peer_string(&self) -> String {
34 self.pubkey.clone()
35 }
36
37 pub fn from_peer_string(s: &str) -> Option<Self> {
38 let pubkey = s.trim();
39 if pubkey.is_empty() || pubkey.contains(':') {
40 return None;
41 }
42 Some(Self {
43 pubkey: pubkey.to_string(),
44 })
45 }
46
47 pub fn from_string(s: &str) -> Option<Self> {
48 Self::from_peer_string(s)
49 }
50
51 pub fn short(&self) -> String {
52 self.pubkey[..8.min(self.pubkey.len())].to_string()
53 }
54}
55
56#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
58pub struct KnownPeerRecord {
59 pub peer_id: String,
60 #[serde(
61 default,
62 alias = "direct_urls",
63 alias = "addresses",
64 skip_serializing_if = "Vec::is_empty"
65 )]
66 pub signal_urls: Vec<String>,
67 #[serde(default)]
68 pub last_seen_unix_ms: u64,
69 #[serde(default, skip_serializing_if = "Option::is_none")]
70 pub last_source: Option<String>,
71}
72
73#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
75pub struct KnownPeerSnapshot {
76 pub version: u32,
77 #[serde(default)]
78 pub peers: Vec<KnownPeerRecord>,
79}
80
81impl Default for KnownPeerSnapshot {
82 fn default() -> Self {
83 Self {
84 version: 1,
85 peers: Vec::new(),
86 }
87 }
88}
89
90impl std::fmt::Display for PeerId {
91 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
92 f.write_str(&self.pubkey)
93 }
94}
95
96#[derive(Debug, Clone, Serialize, Deserialize)]
98#[serde(tag = "type")]
99pub enum SignalingMessage {
100 #[serde(rename = "hello")]
102 Hello {
103 #[serde(rename = "peerId")]
104 peer_id: String,
105 roots: Vec<String>,
106 #[serde(rename = "hashGet", default = "default_hash_get_enabled")]
107 hash_get: bool,
108 },
109
110 #[serde(rename = "offer")]
112 Offer {
113 #[serde(rename = "peerId")]
114 peer_id: String,
115 #[serde(rename = "targetPeerId")]
116 target_peer_id: String,
117 sdp: String,
118 },
119
120 #[serde(rename = "answer")]
122 Answer {
123 #[serde(rename = "peerId")]
124 peer_id: String,
125 #[serde(rename = "targetPeerId")]
126 target_peer_id: String,
127 sdp: String,
128 },
129
130 #[serde(rename = "candidate")]
132 Candidate {
133 #[serde(rename = "peerId")]
134 peer_id: String,
135 #[serde(rename = "targetPeerId")]
136 target_peer_id: String,
137 candidate: String,
138 #[serde(rename = "sdpMLineIndex")]
139 sdp_m_line_index: Option<u16>,
140 #[serde(rename = "sdpMid")]
141 sdp_mid: Option<String>,
142 },
143
144 #[serde(rename = "candidates")]
146 Candidates {
147 #[serde(rename = "peerId")]
148 peer_id: String,
149 #[serde(rename = "targetPeerId")]
150 target_peer_id: String,
151 candidates: Vec<IceCandidate>,
152 },
153}
154
155impl SignalingMessage {
156 pub fn msg_type(&self) -> &str {
158 match self {
159 Self::Hello { .. } => "hello",
160 Self::Offer { .. } => "offer",
161 Self::Answer { .. } => "answer",
162 Self::Candidate { .. } => "candidate",
163 Self::Candidates { .. } => "candidates",
164 }
165 }
166
167 pub fn peer_id(&self) -> &str {
169 match self {
170 Self::Hello { peer_id, .. } => peer_id,
171 Self::Offer { peer_id, .. } => peer_id,
172 Self::Answer { peer_id, .. } => peer_id,
173 Self::Candidate { peer_id, .. } => peer_id,
174 Self::Candidates { peer_id, .. } => peer_id,
175 }
176 }
177
178 pub fn target_peer_id(&self) -> Option<&str> {
180 match self {
181 Self::Hello { .. } => None, Self::Offer { target_peer_id, .. } => Some(target_peer_id),
183 Self::Answer { target_peer_id, .. } => Some(target_peer_id),
184 Self::Candidate { target_peer_id, .. } => Some(target_peer_id),
185 Self::Candidates { target_peer_id, .. } => Some(target_peer_id),
186 }
187 }
188
189 pub fn is_for(&self, my_peer_id: &str) -> bool {
191 match self.target_peer_id() {
192 Some(target) => target == my_peer_id,
193 None => true, }
195 }
196}
197
198#[inline]
209pub fn is_polite_peer(local_peer_id: &str, remote_peer_id: &str) -> bool {
210 local_peer_id < remote_peer_id
212}
213
214#[derive(Debug, Clone, Serialize, Deserialize)]
216pub struct IceCandidate {
217 pub candidate: String,
218 #[serde(rename = "sdpMLineIndex")]
219 pub sdp_m_line_index: Option<u16>,
220 #[serde(rename = "sdpMid")]
221 pub sdp_mid: Option<String>,
222}
223
224#[derive(Debug, Clone, Serialize, Deserialize)]
226#[serde(tag = "type")]
227pub enum DataMessage {
228 #[serde(rename = "req")]
230 Request {
231 id: u32,
232 hash: String,
233 #[serde(skip_serializing_if = "Option::is_none")]
236 htl: Option<u8>,
237 },
238
239 #[serde(rename = "res")]
241 Response {
242 id: u32,
243 hash: String,
244 found: bool,
245 #[serde(skip_serializing_if = "Option::is_none")]
246 size: Option<u64>,
247 },
248
249 #[serde(rename = "push")]
252 Push { hash: String },
253
254 #[serde(rename = "have")]
256 Have { hashes: Vec<String> },
257
258 #[serde(rename = "want")]
260 Want { hashes: Vec<String> },
261
262 #[serde(rename = "root")]
264 RootUpdate {
265 hash: String,
266 #[serde(skip_serializing_if = "Option::is_none")]
267 size: Option<u64>,
268 },
269}
270
271pub const MAX_HTL: u8 = 10;
273pub const DECREMENT_AT_MAX_PROB: f64 = 0.5;
275pub const DECREMENT_AT_MIN_PROB: f64 = 0.25;
277
278#[derive(Debug, Clone, Copy, PartialEq, Eq)]
280pub enum HtlMode {
281 Probabilistic,
282}
283
284#[derive(Debug, Clone, Copy)]
286pub struct HtlPolicy {
287 pub mode: HtlMode,
288 pub max_htl: u8,
289 pub p_at_max: f64,
290 pub p_at_min: f64,
291}
292
293pub const BLOB_REQUEST_POLICY: HtlPolicy = HtlPolicy {
295 mode: HtlMode::Probabilistic,
296 max_htl: MAX_HTL,
297 p_at_max: DECREMENT_AT_MAX_PROB,
298 p_at_min: DECREMENT_AT_MIN_PROB,
299};
300
301pub const MESH_EVENT_POLICY: HtlPolicy = HtlPolicy {
303 mode: HtlMode::Probabilistic,
304 max_htl: 4,
305 p_at_max: 0.75,
306 p_at_min: 0.5,
307};
308
309#[derive(Debug, Clone, Copy)]
312pub struct PeerHTLConfig {
313 pub at_max_sample: f64,
315 pub at_min_sample: f64,
317}
318
319impl PeerHTLConfig {
320 pub fn random() -> Self {
322 use rand::Rng;
323 let mut rng = rand::thread_rng();
324 Self::from_samples(rng.gen_range(0.0..1.0), rng.gen_range(0.0..1.0))
325 }
326
327 pub fn from_samples(at_max_sample: f64, at_min_sample: f64) -> Self {
329 Self {
330 at_max_sample: at_max_sample.clamp(0.0, 1.0),
331 at_min_sample: at_min_sample.clamp(0.0, 1.0),
332 }
333 }
334
335 pub fn from_flags(decrement_at_max: bool, decrement_at_min: bool) -> Self {
337 Self {
338 at_max_sample: if decrement_at_max { 0.0 } else { 1.0 },
339 at_min_sample: if decrement_at_min { 0.0 } else { 1.0 },
340 }
341 }
342
343 pub fn decrement(&self, htl: u8) -> u8 {
346 decrement_htl_with_policy(htl, &BLOB_REQUEST_POLICY, self)
347 }
348
349 pub fn decrement_with_policy(&self, htl: u8, policy: &HtlPolicy) -> u8 {
351 decrement_htl_with_policy(htl, policy, self)
352 }
353}
354
355impl Default for PeerHTLConfig {
356 fn default() -> Self {
357 Self::random()
358 }
359}
360
361pub fn decrement_htl_with_policy(htl: u8, policy: &HtlPolicy, config: &PeerHTLConfig) -> u8 {
363 let htl = htl.min(policy.max_htl);
364 if htl == 0 {
365 return 0;
366 }
367
368 match policy.mode {
369 HtlMode::Probabilistic => {
370 let p_at_max = policy.p_at_max.clamp(0.0, 1.0);
371 let p_at_min = policy.p_at_min.clamp(0.0, 1.0);
372
373 if htl == policy.max_htl {
374 return if config.at_max_sample < p_at_max {
375 htl.saturating_sub(1)
376 } else {
377 htl
378 };
379 }
380
381 if htl == 1 {
382 return if config.at_min_sample < p_at_min {
383 0
384 } else {
385 htl
386 };
387 }
388
389 htl.saturating_sub(1)
390 }
391 }
392}
393
394pub fn should_forward_htl(htl: u8) -> bool {
396 htl > 0
397}
398
399pub fn should_forward(htl: u8) -> bool {
401 should_forward_htl(htl)
402}
403
404use tokio::sync::{mpsc, oneshot};
405
406#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
408pub enum PeerPool {
409 Follows,
411 Other,
413}
414
415#[derive(Debug, Clone, Copy)]
417pub struct PoolConfig {
418 pub max_connections: usize,
420 pub satisfied_connections: usize,
422}
423
424impl PoolConfig {
425 #[inline]
427 pub fn can_accept(&self, current_count: usize) -> bool {
428 current_count < self.max_connections
429 }
430
431 #[inline]
433 pub fn needs_peers(&self, current_count: usize) -> bool {
434 current_count < self.satisfied_connections
435 }
436
437 #[inline]
439 pub fn is_satisfied(&self, current_count: usize) -> bool {
440 current_count >= self.satisfied_connections
441 }
442}
443
444impl Default for PoolConfig {
445 fn default() -> Self {
446 Self {
447 max_connections: 16,
448 satisfied_connections: 8,
449 }
450 }
451}
452
453#[derive(Debug, Clone)]
455pub struct PoolSettings {
456 pub follows: PoolConfig,
457 pub other: PoolConfig,
458}
459
460impl Default for PoolSettings {
461 fn default() -> Self {
462 Self {
463 follows: PoolConfig {
464 max_connections: 16,
465 satisfied_connections: 8,
466 },
467 other: PoolConfig {
468 max_connections: 16,
469 satisfied_connections: 8,
470 },
471 }
472 }
473}
474
475pub struct ClassifyRequest {
477 pub pubkey: String,
479 pub response: oneshot::Sender<PeerPool>,
481}
482
483pub type ClassifierTx = mpsc::Sender<ClassifyRequest>;
485
486pub type ClassifierRx = mpsc::Receiver<ClassifyRequest>;
488
489pub fn classifier_channel(buffer: usize) -> (ClassifierTx, ClassifierRx) {
491 mpsc::channel(buffer)
492}
493
494#[derive(Clone)]
496pub struct MeshStoreConfig {
497 pub relays: Vec<String>,
499 pub roots: Vec<Hash>,
501 pub request_timeout_ms: u64,
503 pub hello_interval_ms: u64,
505 pub debug: bool,
507 pub pools: PoolSettings,
509 pub classifier_tx: Option<ClassifierTx>,
512 pub request_selection_strategy: SelectionStrategy,
514 pub request_fairness_enabled: bool,
516 pub request_dispatch: RequestDispatchConfig,
518 pub pubsub_delivery_mode: PubsubDeliveryMode,
520 pub upstream_blossom_servers: Vec<String>,
522}
523
524impl Default for MeshStoreConfig {
525 fn default() -> Self {
526 Self {
527 relays: vec![
528 "wss://temp.iris.to".to_string(),
529 "wss://relay.damus.io".to_string(),
530 ],
531 roots: Vec::new(),
532 request_timeout_ms: 10000,
533 hello_interval_ms: 30000,
534 debug: false,
535 pools: PoolSettings::default(),
536 classifier_tx: None,
537 request_selection_strategy: SelectionStrategy::Weighted,
538 request_fairness_enabled: true,
539 request_dispatch: RequestDispatchConfig {
540 initial_fanout: 2,
541 hedge_fanout: 1,
542 max_fanout: 8,
543 hedge_interval_ms: 120,
544 },
545 pubsub_delivery_mode: PubsubDeliveryMode::HtlInvWant,
546 upstream_blossom_servers: Vec::new(),
547 }
548 }
549}
550
551#[derive(Debug, Clone, Copy, PartialEq, Eq)]
553pub enum PeerState {
554 New,
556 Connecting,
558 Connected,
560 Ready,
562 Disconnected,
564}
565
566#[derive(Debug, Clone, Default)]
568pub struct MeshStats {
569 pub connected_peers: usize,
570 pub pending_requests: usize,
571 pub bytes_sent: u64,
572 pub bytes_received: u64,
573 pub requests_made: u64,
574 pub requests_fulfilled: u64,
575}
576
577pub type WebRTCStats = MeshStats;
579
580pub const NOSTR_KIND_HASHTREE: u16 = 25050;
582pub const MESH_SIGNALING_EVENT_KIND: u16 = NOSTR_KIND_HASHTREE;
583
584pub const MESH_PROTOCOL: &str = "htree.nostr.mesh.v1";
586pub const MESH_PROTOCOL_VERSION: u8 = 1;
587pub const MESH_DEFAULT_HTL: u8 = MESH_EVENT_POLICY.max_htl;
588pub const MESH_MAX_HTL: u8 = 6;
589
590#[derive(Debug)]
592pub struct TimedSeenSet {
593 entries: HashMap<String, Instant>,
594 order: VecDeque<(String, Instant)>,
595 ttl: Duration,
596 capacity: usize,
597}
598
599impl TimedSeenSet {
600 pub fn new(capacity: usize, ttl: Duration) -> Self {
601 Self {
602 entries: HashMap::new(),
603 order: VecDeque::new(),
604 ttl,
605 capacity,
606 }
607 }
608
609 fn prune(&mut self, now: Instant) {
610 while let Some((key, inserted_at)) = self.order.front().cloned() {
611 if now.duration_since(inserted_at) < self.ttl {
612 break;
613 }
614 self.order.pop_front();
615 if self
616 .entries
617 .get(&key)
618 .map(|ts| *ts == inserted_at)
619 .unwrap_or(false)
620 {
621 self.entries.remove(&key);
622 }
623 }
624
625 while self.entries.len() > self.capacity {
626 if let Some((key, inserted_at)) = self.order.pop_front() {
627 if self
628 .entries
629 .get(&key)
630 .map(|ts| *ts == inserted_at)
631 .unwrap_or(false)
632 {
633 self.entries.remove(&key);
634 }
635 } else {
636 break;
637 }
638 }
639 }
640
641 pub fn insert_if_new(&mut self, key: String) -> bool {
642 let now = Instant::now();
643 self.prune(now);
644 if self.entries.contains_key(&key) {
645 return false;
646 }
647 self.entries.insert(key.clone(), now);
648 self.order.push_back((key, now));
649 self.prune(now);
650 true
651 }
652
653 pub fn contains(&mut self, key: &str) -> bool {
654 let now = Instant::now();
655 self.prune(now);
656 self.entries.contains_key(key)
657 }
658
659 pub fn len(&self) -> usize {
660 self.entries.len()
661 }
662
663 pub fn is_empty(&self) -> bool {
664 self.entries.is_empty()
665 }
666}
667
668#[derive(Debug, Clone, Serialize, Deserialize)]
669#[serde(tag = "type")]
670pub enum MeshNostrPayload {
671 #[serde(rename = "EVENT")]
672 Event { event: Event },
673}
674
675#[derive(Debug, Clone, Serialize, Deserialize)]
676pub struct MeshNostrFrame {
677 pub protocol: String,
678 pub version: u8,
679 pub frame_id: String,
680 pub htl: u8,
681 pub sender_peer_id: String,
682 pub payload: MeshNostrPayload,
683}
684
685impl MeshNostrFrame {
686 pub fn new_event(event: Event, sender_peer_id: &str, htl: u8) -> Self {
687 Self::new_event_with_id(
688 event,
689 sender_peer_id,
690 &uuid::Uuid::new_v4().to_string(),
691 htl,
692 )
693 }
694
695 pub fn new_event_with_id(event: Event, sender_peer_id: &str, frame_id: &str, htl: u8) -> Self {
696 Self {
697 protocol: MESH_PROTOCOL.to_string(),
698 version: MESH_PROTOCOL_VERSION,
699 frame_id: frame_id.to_string(),
700 htl,
701 sender_peer_id: sender_peer_id.to_string(),
702 payload: MeshNostrPayload::Event { event },
703 }
704 }
705
706 pub fn event(&self) -> &Event {
707 match &self.payload {
708 MeshNostrPayload::Event { event } => event,
709 }
710 }
711}
712
713pub fn validate_mesh_frame(frame: &MeshNostrFrame) -> Result<(), &'static str> {
714 if frame.protocol != MESH_PROTOCOL {
715 return Err("invalid protocol");
716 }
717 if frame.version != MESH_PROTOCOL_VERSION {
718 return Err("invalid version");
719 }
720 if frame.frame_id.is_empty() {
721 return Err("missing frame id");
722 }
723 if frame.sender_peer_id.is_empty() {
724 return Err("missing sender peer id");
725 }
726 if frame.sender_peer_id.contains(':') {
727 return Err("invalid sender peer id");
728 }
729 if frame.htl == 0 || frame.htl > MESH_MAX_HTL {
730 return Err("invalid htl");
731 }
732
733 let event = frame.event();
734 if event.kind != Kind::Custom(NOSTR_KIND_HASHTREE)
735 && event.kind != Kind::Ephemeral(NOSTR_KIND_HASHTREE)
736 {
737 return Err("unsupported event kind");
738 }
739
740 Ok(())
741}
742
743pub const DATA_CHANNEL_LABEL: &str = "hashtree";