1use hashtree_core::Hash;
8use nostr_sdk::nostr::{Event, Kind};
9use serde::{Deserialize, Serialize};
10use std::collections::{HashMap, VecDeque};
11use std::time::{Duration, Instant};
12
13use crate::mesh_store_core::RequestDispatchConfig;
14use crate::peer_selector::SelectionStrategy;
15
/// Serde default for the `hashGet` field of `SignalingMessage::Hello`.
///
/// A hello message that omits the field deserializes with the flag enabled.
fn default_hash_get_enabled() -> bool {
    true
}
19
/// Identifies a mesh peer by its public key string.
///
/// Equality and hashing are based solely on `pubkey`.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct PeerId {
    /// The peer's public key; the peer-string form of the id is exactly this value.
    pub pubkey: String,
}
26
27impl PeerId {
28 pub fn new(pubkey: String) -> Self {
29 Self { pubkey }
30 }
31
32 pub fn to_peer_string(&self) -> String {
33 self.pubkey.clone()
34 }
35
36 pub fn from_peer_string(s: &str) -> Option<Self> {
37 let pubkey = s.trim();
38 if pubkey.is_empty() || pubkey.contains(':') {
39 return None;
40 }
41 Some(Self {
42 pubkey: pubkey.to_string(),
43 })
44 }
45
46 pub fn from_string(s: &str) -> Option<Self> {
47 Self::from_peer_string(s)
48 }
49
50 pub fn short(&self) -> String {
51 self.pubkey[..8.min(self.pubkey.len())].to_string()
52 }
53}
54
55impl std::fmt::Display for PeerId {
56 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
57 f.write_str(&self.pubkey)
58 }
59}
60
/// Signaling messages exchanged between peers.
///
/// Serde encodes these as internally tagged values: the `type` field holds the
/// variant tag (`hello`, `offer`, `answer`, `candidate` or `candidates`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum SignalingMessage {
    /// Announcement with no target peer.
    #[serde(rename = "hello")]
    Hello {
        /// Sender's peer id (wire name `peerId`).
        #[serde(rename = "peerId")]
        peer_id: String,
        /// Roots advertised by the sender, as strings.
        // NOTE(review): presumably encoded root hashes — confirm the encoding with callers.
        roots: Vec<String>,
        /// Wire name `hashGet`; deserializes as `true` when the field is absent.
        #[serde(rename = "hashGet", default = "default_hash_get_enabled")]
        hash_get: bool,
    },

    /// SDP offer addressed to `target_peer_id`.
    #[serde(rename = "offer")]
    Offer {
        /// Sender's peer id (wire name `peerId`).
        #[serde(rename = "peerId")]
        peer_id: String,
        /// Intended recipient (wire name `targetPeerId`).
        #[serde(rename = "targetPeerId")]
        target_peer_id: String,
        /// Session description text.
        sdp: String,
    },

    /// SDP answer addressed to `target_peer_id`.
    #[serde(rename = "answer")]
    Answer {
        /// Sender's peer id (wire name `peerId`).
        #[serde(rename = "peerId")]
        peer_id: String,
        /// Intended recipient (wire name `targetPeerId`).
        #[serde(rename = "targetPeerId")]
        target_peer_id: String,
        /// Session description text.
        sdp: String,
    },

    /// A single ICE candidate addressed to `target_peer_id`.
    #[serde(rename = "candidate")]
    Candidate {
        /// Sender's peer id (wire name `peerId`).
        #[serde(rename = "peerId")]
        peer_id: String,
        /// Intended recipient (wire name `targetPeerId`).
        #[serde(rename = "targetPeerId")]
        target_peer_id: String,
        /// Candidate string.
        candidate: String,
        /// Wire name `sdpMLineIndex`.
        #[serde(rename = "sdpMLineIndex")]
        sdp_m_line_index: Option<u16>,
        /// Wire name `sdpMid`.
        #[serde(rename = "sdpMid")]
        sdp_mid: Option<String>,
    },

    /// A batch of ICE candidates addressed to `target_peer_id`.
    #[serde(rename = "candidates")]
    Candidates {
        /// Sender's peer id (wire name `peerId`).
        #[serde(rename = "peerId")]
        peer_id: String,
        /// Intended recipient (wire name `targetPeerId`).
        #[serde(rename = "targetPeerId")]
        target_peer_id: String,
        /// The batched candidates.
        candidates: Vec<IceCandidate>,
    },
}
119
120impl SignalingMessage {
121 pub fn msg_type(&self) -> &str {
123 match self {
124 Self::Hello { .. } => "hello",
125 Self::Offer { .. } => "offer",
126 Self::Answer { .. } => "answer",
127 Self::Candidate { .. } => "candidate",
128 Self::Candidates { .. } => "candidates",
129 }
130 }
131
132 pub fn peer_id(&self) -> &str {
134 match self {
135 Self::Hello { peer_id, .. } => peer_id,
136 Self::Offer { peer_id, .. } => peer_id,
137 Self::Answer { peer_id, .. } => peer_id,
138 Self::Candidate { peer_id, .. } => peer_id,
139 Self::Candidates { peer_id, .. } => peer_id,
140 }
141 }
142
143 pub fn target_peer_id(&self) -> Option<&str> {
145 match self {
146 Self::Hello { .. } => None, Self::Offer { target_peer_id, .. } => Some(target_peer_id),
148 Self::Answer { target_peer_id, .. } => Some(target_peer_id),
149 Self::Candidate { target_peer_id, .. } => Some(target_peer_id),
150 Self::Candidates { target_peer_id, .. } => Some(target_peer_id),
151 }
152 }
153
154 pub fn is_for(&self, my_peer_id: &str) -> bool {
156 match self.target_peer_id() {
157 Some(target) => target == my_peer_id,
158 None => true, }
160 }
161}
162
/// Decides whether the local peer takes the "polite" role towards a remote peer.
///
/// The local peer is polite exactly when its id sorts strictly before the
/// remote id under byte-wise string ordering. For two distinct ids exactly one
/// side is polite; for equal ids neither is.
#[inline]
pub fn is_polite_peer(local_peer_id: &str, remote_peer_id: &str) -> bool {
    matches!(
        local_peer_id.cmp(remote_peer_id),
        std::cmp::Ordering::Less
    )
}
178
/// One ICE candidate as carried inside `SignalingMessage::Candidates`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IceCandidate {
    /// Candidate string.
    pub candidate: String,
    /// Wire name `sdpMLineIndex`.
    #[serde(rename = "sdpMLineIndex")]
    pub sdp_m_line_index: Option<u16>,
    /// Wire name `sdpMid`.
    #[serde(rename = "sdpMid")]
    pub sdp_mid: Option<String>,
}
188
/// Messages exchanged with a connected peer about content hashes.
///
/// Serde encodes these as internally tagged values: the `type` field holds the
/// variant tag (`req`, `res`, `push`, `have`, `want` or `root`).
// NOTE(review): the transport these travel over is not visible here — presumably the data channel; confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum DataMessage {
    /// Request for the content identified by `hash`.
    #[serde(rename = "req")]
    Request {
        /// Request id; `Response` carries an `id` field of the same type.
        id: u32,
        /// Hash being requested, as a string.
        hash: String,
        /// Optional hops-to-live value; omitted from the serialized form when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        htl: Option<u8>,
    },

    /// Reply describing whether `hash` was found.
    #[serde(rename = "res")]
    Response {
        /// Request id.
        // NOTE(review): presumably echoes the id of the matching `Request` — confirm with the handler.
        id: u32,
        /// Hash the response refers to.
        hash: String,
        /// Whether the content was found.
        found: bool,
        /// Optional size; omitted from the serialized form when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        size: Option<u64>,
    },

    /// Push notification for a single hash.
    #[serde(rename = "push")]
    Push { hash: String },

    /// List of hashes tagged `have`.
    #[serde(rename = "have")]
    Have { hashes: Vec<String> },

    /// List of hashes tagged `want`.
    #[serde(rename = "want")]
    Want { hashes: Vec<String> },

    /// Root update carrying the new root hash.
    #[serde(rename = "root")]
    RootUpdate {
        /// New root hash, as a string.
        hash: String,
        /// Optional size; omitted from the serialized form when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        size: Option<u64>,
    },
}
235
/// Maximum hops-to-live used by `BLOB_REQUEST_POLICY`.
pub const MAX_HTL: u8 = 10;
/// Decrement probability at the maximum HTL, used as `p_at_max` in `BLOB_REQUEST_POLICY`.
pub const DECREMENT_AT_MAX_PROB: f64 = 0.5;
/// Decrement probability at HTL 1, used as `p_at_min` in `BLOB_REQUEST_POLICY`.
pub const DECREMENT_AT_MIN_PROB: f64 = 0.25;
242
/// Strategy used by `decrement_htl_with_policy` to lower an HTL value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HtlMode {
    /// At the policy's maximum HTL and at HTL 1 the decrement happens only when
    /// the peer's sample is below the policy probability; in between it always happens.
    Probabilistic,
}
248
/// Parameters controlling how an HTL value is decremented.
#[derive(Debug, Clone, Copy)]
pub struct HtlPolicy {
    /// Decrement strategy.
    pub mode: HtlMode,
    /// Upper bound; `decrement_htl_with_policy` caps incoming HTL values to this.
    pub max_htl: u8,
    /// Decrement probability applied when the HTL equals `max_htl` (clamped to `[0, 1]` on use).
    pub p_at_max: f64,
    /// Decrement probability applied when the HTL equals 1 (clamped to `[0, 1]` on use).
    pub p_at_min: f64,
}
257
/// HTL policy used by `PeerHTLConfig::decrement`: probabilistic, bounded by
/// `MAX_HTL`, with the `DECREMENT_AT_*_PROB` probabilities.
pub const BLOB_REQUEST_POLICY: HtlPolicy = HtlPolicy {
    mode: HtlMode::Probabilistic,
    max_htl: MAX_HTL,
    p_at_max: DECREMENT_AT_MAX_PROB,
    p_at_min: DECREMENT_AT_MIN_PROB,
};
265
/// HTL policy for mesh events: probabilistic with a maximum of 4 hops.
/// Its `max_htl` is also the value of `MESH_DEFAULT_HTL`.
pub const MESH_EVENT_POLICY: HtlPolicy = HtlPolicy {
    mode: HtlMode::Probabilistic,
    max_htl: 4,
    p_at_max: 0.75,
    p_at_min: 0.5,
};
273
/// Per-peer samples that decide the probabilistic HTL decrements.
///
/// A decrement happens when the relevant sample is strictly below the
/// policy's probability, so a fixed config gives the same decision every time.
#[derive(Debug, Clone, Copy)]
pub struct PeerHTLConfig {
    /// Sample compared against `HtlPolicy::p_at_max` when the HTL is at the policy maximum.
    pub at_max_sample: f64,
    /// Sample compared against `HtlPolicy::p_at_min` when the HTL is 1.
    pub at_min_sample: f64,
}
283
284impl PeerHTLConfig {
285 pub fn random() -> Self {
287 use rand::Rng;
288 let mut rng = rand::thread_rng();
289 Self::from_samples(rng.gen_range(0.0..1.0), rng.gen_range(0.0..1.0))
290 }
291
292 pub fn from_samples(at_max_sample: f64, at_min_sample: f64) -> Self {
294 Self {
295 at_max_sample: at_max_sample.clamp(0.0, 1.0),
296 at_min_sample: at_min_sample.clamp(0.0, 1.0),
297 }
298 }
299
300 pub fn from_flags(decrement_at_max: bool, decrement_at_min: bool) -> Self {
302 Self {
303 at_max_sample: if decrement_at_max { 0.0 } else { 1.0 },
304 at_min_sample: if decrement_at_min { 0.0 } else { 1.0 },
305 }
306 }
307
308 pub fn decrement(&self, htl: u8) -> u8 {
311 decrement_htl_with_policy(htl, &BLOB_REQUEST_POLICY, self)
312 }
313
314 pub fn decrement_with_policy(&self, htl: u8, policy: &HtlPolicy) -> u8 {
316 decrement_htl_with_policy(htl, policy, self)
317 }
318}
319
320impl Default for PeerHTLConfig {
321 fn default() -> Self {
322 Self::random()
323 }
324}
325
326pub fn decrement_htl_with_policy(htl: u8, policy: &HtlPolicy, config: &PeerHTLConfig) -> u8 {
328 let htl = htl.min(policy.max_htl);
329 if htl == 0 {
330 return 0;
331 }
332
333 match policy.mode {
334 HtlMode::Probabilistic => {
335 let p_at_max = policy.p_at_max.clamp(0.0, 1.0);
336 let p_at_min = policy.p_at_min.clamp(0.0, 1.0);
337
338 if htl == policy.max_htl {
339 return if config.at_max_sample < p_at_max {
340 htl.saturating_sub(1)
341 } else {
342 htl
343 };
344 }
345
346 if htl == 1 {
347 return if config.at_min_sample < p_at_min {
348 0
349 } else {
350 htl
351 };
352 }
353
354 htl.saturating_sub(1)
355 }
356 }
357}
358
/// Reports whether a message with this HTL may still be forwarded, i.e. the
/// value is non-zero.
pub fn should_forward_htl(htl: u8) -> bool {
    htl != 0
}
363
364pub fn should_forward(htl: u8) -> bool {
366 should_forward_htl(htl)
367}
368
369use tokio::sync::{mpsc, oneshot};
370
/// Pool a peer is assigned to; each pool has its own `PoolConfig` in `PoolSettings`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PeerPool {
    /// Pool governed by `PoolSettings::follows`.
    // NOTE(review): presumably peers whose pubkey the local user follows — confirm with the classifier.
    Follows,
    /// Pool governed by `PoolSettings::other`; every peer not in `Follows`.
    Other,
}
379
/// Connection limits for a single peer pool.
#[derive(Debug, Clone, Copy)]
pub struct PoolConfig {
    /// Hard cap: new connections are accepted only while the count is below this.
    pub max_connections: usize,
    /// Target count: at or above this the pool is considered satisfied.
    pub satisfied_connections: usize,
}

impl PoolConfig {
    /// `true` while `current_count` is strictly below `max_connections`.
    #[inline]
    pub fn can_accept(&self, current_count: usize) -> bool {
        self.max_connections > current_count
    }

    /// `true` while `current_count` is strictly below `satisfied_connections`.
    #[inline]
    pub fn needs_peers(&self, current_count: usize) -> bool {
        !self.is_satisfied(current_count)
    }

    /// `true` once `current_count` has reached `satisfied_connections`.
    #[inline]
    pub fn is_satisfied(&self, current_count: usize) -> bool {
        self.satisfied_connections <= current_count
    }
}

impl Default for PoolConfig {
    /// Defaults to at most 16 connections, satisfied at 8.
    fn default() -> Self {
        const DEFAULT_MAX_CONNECTIONS: usize = 16;
        const DEFAULT_SATISFIED_CONNECTIONS: usize = 8;
        Self {
            max_connections: DEFAULT_MAX_CONNECTIONS,
            satisfied_connections: DEFAULT_SATISFIED_CONNECTIONS,
        }
    }
}
417
418#[derive(Debug, Clone)]
420pub struct PoolSettings {
421 pub follows: PoolConfig,
422 pub other: PoolConfig,
423}
424
425impl Default for PoolSettings {
426 fn default() -> Self {
427 Self {
428 follows: PoolConfig {
429 max_connections: 16,
430 satisfied_connections: 8,
431 },
432 other: PoolConfig {
433 max_connections: 16,
434 satisfied_connections: 8,
435 },
436 }
437 }
438}
439
/// Request sent over a `ClassifierTx` asking which pool a pubkey belongs to.
pub struct ClassifyRequest {
    /// Public key of the peer to classify.
    pub pubkey: String,
    /// One-shot channel on which the resulting `PeerPool` is sent back.
    pub response: oneshot::Sender<PeerPool>,
}
447
/// Sending half of the classifier channel; carries `ClassifyRequest`s.
pub type ClassifierTx = mpsc::Sender<ClassifyRequest>;

/// Receiving half of the classifier channel.
pub type ClassifierRx = mpsc::Receiver<ClassifyRequest>;
453
454pub fn classifier_channel(buffer: usize) -> (ClassifierTx, ClassifierRx) {
456 mpsc::channel(buffer)
457}
458
/// Configuration for the mesh store.
// NOTE(review): field semantics below are inferred from names, types and the
// `Default` impl only; the consumers are outside this chunk — confirm before relying on them.
#[derive(Clone)]
pub struct MeshStoreConfig {
    /// Relay URLs (the defaults are two `wss://` endpoints).
    pub relays: Vec<String>,
    /// Root hashes (empty by default).
    pub roots: Vec<Hash>,
    /// Request timeout in milliseconds (default 10000).
    pub request_timeout_ms: u64,
    /// Hello interval in milliseconds (default 30000).
    pub hello_interval_ms: u64,
    /// Debug flag (default `false`).
    pub debug: bool,
    /// Per-pool connection limits.
    pub pools: PoolSettings,
    /// Optional channel used to classify peers into pools (default `None`).
    pub classifier_tx: Option<ClassifierTx>,
    /// Peer selection strategy for requests (default `SelectionStrategy::Weighted`).
    pub request_selection_strategy: SelectionStrategy,
    /// Whether request fairness is enabled (default `true`).
    pub request_fairness_enabled: bool,
    /// Request dispatch (fanout/hedging) parameters.
    pub request_dispatch: RequestDispatchConfig,
    /// Upstream Blossom server URLs (empty by default).
    pub upstream_blossom_servers: Vec<String>,
}
486
487impl Default for MeshStoreConfig {
488 fn default() -> Self {
489 Self {
490 relays: vec![
491 "wss://temp.iris.to".to_string(),
492 "wss://relay.damus.io".to_string(),
493 ],
494 roots: Vec::new(),
495 request_timeout_ms: 10000,
496 hello_interval_ms: 30000,
497 debug: false,
498 pools: PoolSettings::default(),
499 classifier_tx: None,
500 request_selection_strategy: SelectionStrategy::Weighted,
501 request_fairness_enabled: true,
502 request_dispatch: RequestDispatchConfig {
503 initial_fanout: 2,
504 hedge_fanout: 1,
505 max_fanout: 8,
506 hedge_interval_ms: 120,
507 },
508 upstream_blossom_servers: Vec::new(),
509 }
510 }
511}
512
/// Connection state of a peer.
// NOTE(review): the transitions between these states are driven outside this
// chunk; the variant descriptions follow the names only — confirm against the peer code.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PeerState {
    /// Initial state.
    New,
    /// Connection is being established.
    Connecting,
    /// Connection is established.
    Connected,
    /// Peer is ready for use.
    Ready,
    /// Connection has ended.
    Disconnected,
}
527
/// Counters describing mesh activity; all fields start at zero via `Default`.
// NOTE(review): where each counter is updated is not visible in this chunk.
#[derive(Debug, Clone, Default)]
pub struct MeshStats {
    /// Number of connected peers.
    pub connected_peers: usize,
    /// Number of pending requests.
    pub pending_requests: usize,
    /// Total bytes sent.
    pub bytes_sent: u64,
    /// Total bytes received.
    pub bytes_received: u64,
    /// Number of requests made.
    pub requests_made: u64,
    /// Number of requests fulfilled.
    pub requests_fulfilled: u64,
}
538
/// Alias of `MeshStats` under its WebRTC-flavored name.
pub type WebRTCStats = MeshStats;
541
/// Nostr event kind number accepted by `validate_mesh_frame`.
pub const NOSTR_KIND_HASHTREE: u16 = 25050;
/// Event kind used for mesh signaling; identical to `NOSTR_KIND_HASHTREE`.
pub const MESH_SIGNALING_EVENT_KIND: u16 = NOSTR_KIND_HASHTREE;

/// Protocol identifier every `MeshNostrFrame` must carry.
pub const MESH_PROTOCOL: &str = "htree.nostr.mesh.v1";
/// Protocol version every `MeshNostrFrame` must carry.
pub const MESH_PROTOCOL_VERSION: u8 = 1;
/// Default HTL for mesh frames: the maximum of `MESH_EVENT_POLICY` (4).
pub const MESH_DEFAULT_HTL: u8 = MESH_EVENT_POLICY.max_htl;
/// Largest HTL that `validate_mesh_frame` accepts on a frame.
pub const MESH_MAX_HTL: u8 = 6;
551
/// Bounded, time-limited set of string keys for detecting repeats.
///
/// A key is remembered until it is at least `ttl` old or until more than
/// `capacity` keys are held, whichever happens first. The oldest keys are
/// always dropped first.
#[derive(Debug)]
pub struct TimedSeenSet {
    /// Live keys mapped to the instant they were inserted.
    entries: HashMap<String, Instant>,
    /// Insertion-ordered `(key, inserted_at)` records, oldest at the front.
    order: VecDeque<(String, Instant)>,
    /// Age at which a key expires.
    ttl: Duration,
    /// Maximum number of live keys kept after pruning.
    capacity: usize,
}

impl TimedSeenSet {
    /// Creates an empty set holding at most `capacity` keys, each for less than `ttl`.
    ///
    /// With a `capacity` of 0 or a zero `ttl` nothing is retained, so every
    /// insert is reported as new.
    pub fn new(capacity: usize, ttl: Duration) -> Self {
        Self {
            entries: HashMap::new(),
            order: VecDeque::new(),
            ttl,
            capacity,
        }
    }

    /// Pops the oldest queue record and drops its map entry when the
    /// timestamps match. Returns `false` when the queue was already empty.
    fn evict_oldest(&mut self) -> bool {
        match self.order.pop_front() {
            Some((key, inserted_at)) => {
                // Only remove the map entry this record belongs to.
                if self.entries.get(&key) == Some(&inserted_at) {
                    self.entries.remove(&key);
                }
                true
            }
            None => false,
        }
    }

    /// Drops expired keys, then the oldest keys while over capacity.
    fn prune(&mut self, now: Instant) {
        // Peek only the timestamp, so no key is cloned when nothing expires.
        while let Some(inserted_at) = self.order.front().map(|record| record.1) {
            if now.duration_since(inserted_at) < self.ttl {
                break;
            }
            self.evict_oldest();
        }

        while self.entries.len() > self.capacity {
            if !self.evict_oldest() {
                break;
            }
        }
    }

    /// Records `key` and reports whether it was new.
    ///
    /// Returns `false` when the key is still remembered (its original
    /// timestamp is kept), and `true` otherwise. Expired and over-capacity
    /// keys are pruned both before the lookup and after the insert.
    pub fn insert_if_new(&mut self, key: String) -> bool {
        use std::collections::hash_map::Entry;

        let now = Instant::now();
        self.prune(now);

        match self.entries.entry(key) {
            Entry::Occupied(_) => return false,
            Entry::Vacant(slot) => {
                self.order.push_back((slot.key().clone(), now));
                slot.insert(now);
            }
        }

        self.prune(now);
        true
    }

    /// Number of keys currently remembered (as of the last prune).
    pub fn len(&self) -> usize {
        self.entries.len()
    }

    /// `true` when no keys are currently remembered.
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }
}
623
/// Payload of a `MeshNostrFrame`, internally tagged by a `type` field.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum MeshNostrPayload {
    /// A Nostr event; serialized with tag `EVENT`.
    #[serde(rename = "EVENT")]
    Event { event: Event },
}
630
/// Envelope for a Nostr payload carried through the mesh.
///
/// Fields are serialized under their Rust names. See `validate_mesh_frame`
/// for the constraints a frame must satisfy.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MeshNostrFrame {
    /// Protocol identifier; must equal `MESH_PROTOCOL` to validate.
    pub protocol: String,
    /// Protocol version; must equal `MESH_PROTOCOL_VERSION` to validate.
    pub version: u8,
    /// Frame identifier; must be non-empty to validate.
    pub frame_id: String,
    /// Hops-to-live; must be in `1..=MESH_MAX_HTL` to validate.
    pub htl: u8,
    /// Peer id of the sender; must be non-empty and free of `':'` to validate.
    pub sender_peer_id: String,
    /// The carried payload.
    pub payload: MeshNostrPayload,
}
640
641impl MeshNostrFrame {
642 pub fn new_event(event: Event, sender_peer_id: &str, htl: u8) -> Self {
643 Self::new_event_with_id(
644 event,
645 sender_peer_id,
646 &uuid::Uuid::new_v4().to_string(),
647 htl,
648 )
649 }
650
651 pub fn new_event_with_id(event: Event, sender_peer_id: &str, frame_id: &str, htl: u8) -> Self {
652 Self {
653 protocol: MESH_PROTOCOL.to_string(),
654 version: MESH_PROTOCOL_VERSION,
655 frame_id: frame_id.to_string(),
656 htl,
657 sender_peer_id: sender_peer_id.to_string(),
658 payload: MeshNostrPayload::Event { event },
659 }
660 }
661
662 pub fn event(&self) -> &Event {
663 match &self.payload {
664 MeshNostrPayload::Event { event } => event,
665 }
666 }
667}
668
669pub fn validate_mesh_frame(frame: &MeshNostrFrame) -> Result<(), &'static str> {
670 if frame.protocol != MESH_PROTOCOL {
671 return Err("invalid protocol");
672 }
673 if frame.version != MESH_PROTOCOL_VERSION {
674 return Err("invalid version");
675 }
676 if frame.frame_id.is_empty() {
677 return Err("missing frame id");
678 }
679 if frame.sender_peer_id.is_empty() {
680 return Err("missing sender peer id");
681 }
682 if frame.sender_peer_id.contains(':') {
683 return Err("invalid sender peer id");
684 }
685 if frame.htl == 0 || frame.htl > MESH_MAX_HTL {
686 return Err("invalid htl");
687 }
688
689 let event = frame.event();
690 if event.kind != Kind::Custom(NOSTR_KIND_HASHTREE)
691 && event.kind != Kind::Ephemeral(NOSTR_KIND_HASHTREE)
692 {
693 return Err("unsupported event kind");
694 }
695
696 Ok(())
697}
698
/// Label string for the data channel.
// NOTE(review): presumably the WebRTC data-channel label used between peers — the use site is not in this chunk.
pub const DATA_CHANNEL_LABEL: &str = "hashtree";