1use hashtree_core::Hash;
8use nostr_sdk::nostr::{Event, Kind};
9use serde::{Deserialize, Serialize};
10use std::collections::{HashMap, VecDeque};
11use std::time::Duration;
12use tokio::time::Instant;
13
14use crate::mesh_store_core::RequestDispatchConfig;
15use crate::peer_selector::SelectionStrategy;
16
17fn default_hash_get_enabled() -> bool {
18 true
19}
20
21#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
23pub struct PeerId {
24 pub pubkey: String,
26}
27
28impl PeerId {
29 pub fn new(pubkey: String) -> Self {
30 Self { pubkey }
31 }
32
33 pub fn to_peer_string(&self) -> String {
34 self.pubkey.clone()
35 }
36
37 pub fn from_peer_string(s: &str) -> Option<Self> {
38 let pubkey = s.trim();
39 if pubkey.is_empty() || pubkey.contains(':') {
40 return None;
41 }
42 Some(Self {
43 pubkey: pubkey.to_string(),
44 })
45 }
46
47 pub fn from_string(s: &str) -> Option<Self> {
48 Self::from_peer_string(s)
49 }
50
51 pub fn short(&self) -> String {
52 self.pubkey[..8.min(self.pubkey.len())].to_string()
53 }
54}
55
56impl std::fmt::Display for PeerId {
57 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58 f.write_str(&self.pubkey)
59 }
60}
61
62#[derive(Debug, Clone, Serialize, Deserialize)]
64#[serde(tag = "type")]
65pub enum SignalingMessage {
66 #[serde(rename = "hello")]
68 Hello {
69 #[serde(rename = "peerId")]
70 peer_id: String,
71 roots: Vec<String>,
72 #[serde(rename = "hashGet", default = "default_hash_get_enabled")]
73 hash_get: bool,
74 },
75
76 #[serde(rename = "offer")]
78 Offer {
79 #[serde(rename = "peerId")]
80 peer_id: String,
81 #[serde(rename = "targetPeerId")]
82 target_peer_id: String,
83 sdp: String,
84 },
85
86 #[serde(rename = "answer")]
88 Answer {
89 #[serde(rename = "peerId")]
90 peer_id: String,
91 #[serde(rename = "targetPeerId")]
92 target_peer_id: String,
93 sdp: String,
94 },
95
96 #[serde(rename = "candidate")]
98 Candidate {
99 #[serde(rename = "peerId")]
100 peer_id: String,
101 #[serde(rename = "targetPeerId")]
102 target_peer_id: String,
103 candidate: String,
104 #[serde(rename = "sdpMLineIndex")]
105 sdp_m_line_index: Option<u16>,
106 #[serde(rename = "sdpMid")]
107 sdp_mid: Option<String>,
108 },
109
110 #[serde(rename = "candidates")]
112 Candidates {
113 #[serde(rename = "peerId")]
114 peer_id: String,
115 #[serde(rename = "targetPeerId")]
116 target_peer_id: String,
117 candidates: Vec<IceCandidate>,
118 },
119}
120
121impl SignalingMessage {
122 pub fn msg_type(&self) -> &str {
124 match self {
125 Self::Hello { .. } => "hello",
126 Self::Offer { .. } => "offer",
127 Self::Answer { .. } => "answer",
128 Self::Candidate { .. } => "candidate",
129 Self::Candidates { .. } => "candidates",
130 }
131 }
132
133 pub fn peer_id(&self) -> &str {
135 match self {
136 Self::Hello { peer_id, .. } => peer_id,
137 Self::Offer { peer_id, .. } => peer_id,
138 Self::Answer { peer_id, .. } => peer_id,
139 Self::Candidate { peer_id, .. } => peer_id,
140 Self::Candidates { peer_id, .. } => peer_id,
141 }
142 }
143
144 pub fn target_peer_id(&self) -> Option<&str> {
146 match self {
147 Self::Hello { .. } => None, Self::Offer { target_peer_id, .. } => Some(target_peer_id),
149 Self::Answer { target_peer_id, .. } => Some(target_peer_id),
150 Self::Candidate { target_peer_id, .. } => Some(target_peer_id),
151 Self::Candidates { target_peer_id, .. } => Some(target_peer_id),
152 }
153 }
154
155 pub fn is_for(&self, my_peer_id: &str) -> bool {
157 match self.target_peer_id() {
158 Some(target) => target == my_peer_id,
159 None => true, }
161 }
162}
163
164#[inline]
175pub fn is_polite_peer(local_peer_id: &str, remote_peer_id: &str) -> bool {
176 local_peer_id < remote_peer_id
178}
179
180#[derive(Debug, Clone, Serialize, Deserialize)]
182pub struct IceCandidate {
183 pub candidate: String,
184 #[serde(rename = "sdpMLineIndex")]
185 pub sdp_m_line_index: Option<u16>,
186 #[serde(rename = "sdpMid")]
187 pub sdp_mid: Option<String>,
188}
189
190#[derive(Debug, Clone, Serialize, Deserialize)]
192#[serde(tag = "type")]
193pub enum DataMessage {
194 #[serde(rename = "req")]
196 Request {
197 id: u32,
198 hash: String,
199 #[serde(skip_serializing_if = "Option::is_none")]
202 htl: Option<u8>,
203 },
204
205 #[serde(rename = "res")]
207 Response {
208 id: u32,
209 hash: String,
210 found: bool,
211 #[serde(skip_serializing_if = "Option::is_none")]
212 size: Option<u64>,
213 },
214
215 #[serde(rename = "push")]
218 Push { hash: String },
219
220 #[serde(rename = "have")]
222 Have { hashes: Vec<String> },
223
224 #[serde(rename = "want")]
226 Want { hashes: Vec<String> },
227
228 #[serde(rename = "root")]
230 RootUpdate {
231 hash: String,
232 #[serde(skip_serializing_if = "Option::is_none")]
233 size: Option<u64>,
234 },
235}
236
237pub const MAX_HTL: u8 = 10;
239pub const DECREMENT_AT_MAX_PROB: f64 = 0.5;
241pub const DECREMENT_AT_MIN_PROB: f64 = 0.25;
243
244#[derive(Debug, Clone, Copy, PartialEq, Eq)]
246pub enum HtlMode {
247 Probabilistic,
248}
249
250#[derive(Debug, Clone, Copy)]
252pub struct HtlPolicy {
253 pub mode: HtlMode,
254 pub max_htl: u8,
255 pub p_at_max: f64,
256 pub p_at_min: f64,
257}
258
259pub const BLOB_REQUEST_POLICY: HtlPolicy = HtlPolicy {
261 mode: HtlMode::Probabilistic,
262 max_htl: MAX_HTL,
263 p_at_max: DECREMENT_AT_MAX_PROB,
264 p_at_min: DECREMENT_AT_MIN_PROB,
265};
266
267pub const MESH_EVENT_POLICY: HtlPolicy = HtlPolicy {
269 mode: HtlMode::Probabilistic,
270 max_htl: 4,
271 p_at_max: 0.75,
272 p_at_min: 0.5,
273};
274
275#[derive(Debug, Clone, Copy)]
278pub struct PeerHTLConfig {
279 pub at_max_sample: f64,
281 pub at_min_sample: f64,
283}
284
285impl PeerHTLConfig {
286 pub fn random() -> Self {
288 use rand::Rng;
289 let mut rng = rand::thread_rng();
290 Self::from_samples(rng.gen_range(0.0..1.0), rng.gen_range(0.0..1.0))
291 }
292
293 pub fn from_samples(at_max_sample: f64, at_min_sample: f64) -> Self {
295 Self {
296 at_max_sample: at_max_sample.clamp(0.0, 1.0),
297 at_min_sample: at_min_sample.clamp(0.0, 1.0),
298 }
299 }
300
301 pub fn from_flags(decrement_at_max: bool, decrement_at_min: bool) -> Self {
303 Self {
304 at_max_sample: if decrement_at_max { 0.0 } else { 1.0 },
305 at_min_sample: if decrement_at_min { 0.0 } else { 1.0 },
306 }
307 }
308
309 pub fn decrement(&self, htl: u8) -> u8 {
312 decrement_htl_with_policy(htl, &BLOB_REQUEST_POLICY, self)
313 }
314
315 pub fn decrement_with_policy(&self, htl: u8, policy: &HtlPolicy) -> u8 {
317 decrement_htl_with_policy(htl, policy, self)
318 }
319}
320
321impl Default for PeerHTLConfig {
322 fn default() -> Self {
323 Self::random()
324 }
325}
326
327pub fn decrement_htl_with_policy(htl: u8, policy: &HtlPolicy, config: &PeerHTLConfig) -> u8 {
329 let htl = htl.min(policy.max_htl);
330 if htl == 0 {
331 return 0;
332 }
333
334 match policy.mode {
335 HtlMode::Probabilistic => {
336 let p_at_max = policy.p_at_max.clamp(0.0, 1.0);
337 let p_at_min = policy.p_at_min.clamp(0.0, 1.0);
338
339 if htl == policy.max_htl {
340 return if config.at_max_sample < p_at_max {
341 htl.saturating_sub(1)
342 } else {
343 htl
344 };
345 }
346
347 if htl == 1 {
348 return if config.at_min_sample < p_at_min {
349 0
350 } else {
351 htl
352 };
353 }
354
355 htl.saturating_sub(1)
356 }
357 }
358}
359
360pub fn should_forward_htl(htl: u8) -> bool {
362 htl > 0
363}
364
365pub fn should_forward(htl: u8) -> bool {
367 should_forward_htl(htl)
368}
369
370use tokio::sync::{mpsc, oneshot};
371
372#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
374pub enum PeerPool {
375 Follows,
377 Other,
379}
380
381#[derive(Debug, Clone, Copy)]
383pub struct PoolConfig {
384 pub max_connections: usize,
386 pub satisfied_connections: usize,
388}
389
390impl PoolConfig {
391 #[inline]
393 pub fn can_accept(&self, current_count: usize) -> bool {
394 current_count < self.max_connections
395 }
396
397 #[inline]
399 pub fn needs_peers(&self, current_count: usize) -> bool {
400 current_count < self.satisfied_connections
401 }
402
403 #[inline]
405 pub fn is_satisfied(&self, current_count: usize) -> bool {
406 current_count >= self.satisfied_connections
407 }
408}
409
410impl Default for PoolConfig {
411 fn default() -> Self {
412 Self {
413 max_connections: 16,
414 satisfied_connections: 8,
415 }
416 }
417}
418
419#[derive(Debug, Clone)]
421pub struct PoolSettings {
422 pub follows: PoolConfig,
423 pub other: PoolConfig,
424}
425
426impl Default for PoolSettings {
427 fn default() -> Self {
428 Self {
429 follows: PoolConfig {
430 max_connections: 16,
431 satisfied_connections: 8,
432 },
433 other: PoolConfig {
434 max_connections: 16,
435 satisfied_connections: 8,
436 },
437 }
438 }
439}
440
441pub struct ClassifyRequest {
443 pub pubkey: String,
445 pub response: oneshot::Sender<PeerPool>,
447}
448
449pub type ClassifierTx = mpsc::Sender<ClassifyRequest>;
451
452pub type ClassifierRx = mpsc::Receiver<ClassifyRequest>;
454
455pub fn classifier_channel(buffer: usize) -> (ClassifierTx, ClassifierRx) {
457 mpsc::channel(buffer)
458}
459
460#[derive(Clone)]
462pub struct MeshStoreConfig {
463 pub relays: Vec<String>,
465 pub roots: Vec<Hash>,
467 pub request_timeout_ms: u64,
469 pub hello_interval_ms: u64,
471 pub debug: bool,
473 pub pools: PoolSettings,
475 pub classifier_tx: Option<ClassifierTx>,
478 pub request_selection_strategy: SelectionStrategy,
480 pub request_fairness_enabled: bool,
482 pub request_dispatch: RequestDispatchConfig,
484 pub upstream_blossom_servers: Vec<String>,
486}
487
488impl Default for MeshStoreConfig {
489 fn default() -> Self {
490 Self {
491 relays: vec![
492 "wss://temp.iris.to".to_string(),
493 "wss://relay.damus.io".to_string(),
494 ],
495 roots: Vec::new(),
496 request_timeout_ms: 10000,
497 hello_interval_ms: 30000,
498 debug: false,
499 pools: PoolSettings::default(),
500 classifier_tx: None,
501 request_selection_strategy: SelectionStrategy::Weighted,
502 request_fairness_enabled: true,
503 request_dispatch: RequestDispatchConfig {
504 initial_fanout: 2,
505 hedge_fanout: 1,
506 max_fanout: 8,
507 hedge_interval_ms: 120,
508 },
509 upstream_blossom_servers: Vec::new(),
510 }
511 }
512}
513
514#[derive(Debug, Clone, Copy, PartialEq, Eq)]
516pub enum PeerState {
517 New,
519 Connecting,
521 Connected,
523 Ready,
525 Disconnected,
527}
528
529#[derive(Debug, Clone, Default)]
531pub struct MeshStats {
532 pub connected_peers: usize,
533 pub pending_requests: usize,
534 pub bytes_sent: u64,
535 pub bytes_received: u64,
536 pub requests_made: u64,
537 pub requests_fulfilled: u64,
538}
539
540pub type WebRTCStats = MeshStats;
542
543pub const NOSTR_KIND_HASHTREE: u16 = 25050;
545pub const MESH_SIGNALING_EVENT_KIND: u16 = NOSTR_KIND_HASHTREE;
546
547pub const MESH_PROTOCOL: &str = "htree.nostr.mesh.v1";
549pub const MESH_PROTOCOL_VERSION: u8 = 1;
550pub const MESH_DEFAULT_HTL: u8 = MESH_EVENT_POLICY.max_htl;
551pub const MESH_MAX_HTL: u8 = 6;
552
553#[derive(Debug)]
555pub struct TimedSeenSet {
556 entries: HashMap<String, Instant>,
557 order: VecDeque<(String, Instant)>,
558 ttl: Duration,
559 capacity: usize,
560}
561
562impl TimedSeenSet {
563 pub fn new(capacity: usize, ttl: Duration) -> Self {
564 Self {
565 entries: HashMap::new(),
566 order: VecDeque::new(),
567 ttl,
568 capacity,
569 }
570 }
571
572 fn prune(&mut self, now: Instant) {
573 while let Some((key, inserted_at)) = self.order.front().cloned() {
574 if now.duration_since(inserted_at) < self.ttl {
575 break;
576 }
577 self.order.pop_front();
578 if self
579 .entries
580 .get(&key)
581 .map(|ts| *ts == inserted_at)
582 .unwrap_or(false)
583 {
584 self.entries.remove(&key);
585 }
586 }
587
588 while self.entries.len() > self.capacity {
589 if let Some((key, inserted_at)) = self.order.pop_front() {
590 if self
591 .entries
592 .get(&key)
593 .map(|ts| *ts == inserted_at)
594 .unwrap_or(false)
595 {
596 self.entries.remove(&key);
597 }
598 } else {
599 break;
600 }
601 }
602 }
603
604 pub fn insert_if_new(&mut self, key: String) -> bool {
605 let now = Instant::now();
606 self.prune(now);
607 if self.entries.contains_key(&key) {
608 return false;
609 }
610 self.entries.insert(key.clone(), now);
611 self.order.push_back((key, now));
612 self.prune(now);
613 true
614 }
615
616 pub fn contains(&mut self, key: &str) -> bool {
617 let now = Instant::now();
618 self.prune(now);
619 self.entries.contains_key(key)
620 }
621
622 pub fn len(&self) -> usize {
623 self.entries.len()
624 }
625
626 pub fn is_empty(&self) -> bool {
627 self.entries.is_empty()
628 }
629}
630
631#[derive(Debug, Clone, Serialize, Deserialize)]
632#[serde(tag = "type")]
633pub enum MeshNostrPayload {
634 #[serde(rename = "EVENT")]
635 Event { event: Event },
636}
637
638#[derive(Debug, Clone, Serialize, Deserialize)]
639pub struct MeshNostrFrame {
640 pub protocol: String,
641 pub version: u8,
642 pub frame_id: String,
643 pub htl: u8,
644 pub sender_peer_id: String,
645 pub payload: MeshNostrPayload,
646}
647
648impl MeshNostrFrame {
649 pub fn new_event(event: Event, sender_peer_id: &str, htl: u8) -> Self {
650 Self::new_event_with_id(
651 event,
652 sender_peer_id,
653 &uuid::Uuid::new_v4().to_string(),
654 htl,
655 )
656 }
657
658 pub fn new_event_with_id(event: Event, sender_peer_id: &str, frame_id: &str, htl: u8) -> Self {
659 Self {
660 protocol: MESH_PROTOCOL.to_string(),
661 version: MESH_PROTOCOL_VERSION,
662 frame_id: frame_id.to_string(),
663 htl,
664 sender_peer_id: sender_peer_id.to_string(),
665 payload: MeshNostrPayload::Event { event },
666 }
667 }
668
669 pub fn event(&self) -> &Event {
670 match &self.payload {
671 MeshNostrPayload::Event { event } => event,
672 }
673 }
674}
675
676pub fn validate_mesh_frame(frame: &MeshNostrFrame) -> Result<(), &'static str> {
677 if frame.protocol != MESH_PROTOCOL {
678 return Err("invalid protocol");
679 }
680 if frame.version != MESH_PROTOCOL_VERSION {
681 return Err("invalid version");
682 }
683 if frame.frame_id.is_empty() {
684 return Err("missing frame id");
685 }
686 if frame.sender_peer_id.is_empty() {
687 return Err("missing sender peer id");
688 }
689 if frame.sender_peer_id.contains(':') {
690 return Err("invalid sender peer id");
691 }
692 if frame.htl == 0 || frame.htl > MESH_MAX_HTL {
693 return Err("invalid htl");
694 }
695
696 let event = frame.event();
697 if event.kind != Kind::Custom(NOSTR_KIND_HASHTREE)
698 && event.kind != Kind::Ephemeral(NOSTR_KIND_HASHTREE)
699 {
700 return Err("unsupported event kind");
701 }
702
703 Ok(())
704}
705
706pub const DATA_CHANNEL_LABEL: &str = "hashtree";