1use hashtree_core::Hash;
8use nostr_sdk::nostr::{Event, Kind};
9use serde::{Deserialize, Serialize};
10use std::collections::{HashMap, VecDeque};
11use std::time::Duration;
12use tokio::time::Instant;
13
14use crate::mesh_store_core::{PubsubDeliveryMode, RequestDispatchConfig};
15use crate::peer_selector::SelectionStrategy;
16
/// Serde default for the `hashGet` field of [`SignalingMessage::Hello`].
fn default_hash_get_enabled() -> bool {
    // A hello that omits the field deserializes with hash-get enabled.
    const ENABLED_BY_DEFAULT: bool = true;
    ENABLED_BY_DEFAULT
}
20
21#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
23pub struct PeerId {
24 pub pubkey: String,
26}
27
28impl PeerId {
29 pub fn new(pubkey: String) -> Self {
30 Self { pubkey }
31 }
32
33 pub fn to_peer_string(&self) -> String {
34 self.pubkey.clone()
35 }
36
37 pub fn from_peer_string(s: &str) -> Option<Self> {
38 let pubkey = s.trim();
39 if pubkey.is_empty() || pubkey.contains(':') {
40 return None;
41 }
42 Some(Self {
43 pubkey: pubkey.to_string(),
44 })
45 }
46
47 pub fn from_string(s: &str) -> Option<Self> {
48 Self::from_peer_string(s)
49 }
50
51 pub fn short(&self) -> String {
52 self.pubkey[..8.min(self.pubkey.len())].to_string()
53 }
54}
55
56#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
58pub struct KnownPeerRecord {
59 pub peer_id: String,
60 #[serde(
61 default,
62 alias = "direct_urls",
63 alias = "addresses",
64 skip_serializing_if = "Vec::is_empty"
65 )]
66 pub signal_urls: Vec<String>,
67 #[serde(default)]
68 pub last_seen_unix_ms: u64,
69 #[serde(default, skip_serializing_if = "Option::is_none")]
70 pub last_source: Option<String>,
71}
72
73#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
75pub struct KnownPeerSnapshot {
76 pub version: u32,
77 #[serde(default)]
78 pub peers: Vec<KnownPeerRecord>,
79}
80
81impl Default for KnownPeerSnapshot {
82 fn default() -> Self {
83 Self {
84 version: 1,
85 peers: Vec::new(),
86 }
87 }
88}
89
90impl std::fmt::Display for PeerId {
91 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
92 f.write_str(&self.pubkey)
93 }
94}
95
96#[derive(Debug, Clone, Serialize, Deserialize)]
98#[serde(tag = "type")]
99pub enum SignalingMessage {
100 #[serde(rename = "hello")]
102 Hello {
103 #[serde(rename = "peerId")]
104 peer_id: String,
105 roots: Vec<String>,
106 #[serde(rename = "hashGet", default = "default_hash_get_enabled")]
107 hash_get: bool,
108 },
109
110 #[serde(rename = "offer")]
114 Offer {
115 #[serde(rename = "peerId")]
116 peer_id: String,
117 #[serde(rename = "targetPeerId")]
118 target_peer_id: String,
119 sdp: String,
120 },
121
122 #[serde(rename = "answer")]
126 Answer {
127 #[serde(rename = "peerId")]
128 peer_id: String,
129 #[serde(rename = "targetPeerId")]
130 target_peer_id: String,
131 sdp: String,
132 },
133
134 #[serde(rename = "candidate")]
136 Candidate {
137 #[serde(rename = "peerId")]
138 peer_id: String,
139 #[serde(rename = "targetPeerId")]
140 target_peer_id: String,
141 candidate: String,
142 #[serde(rename = "sdpMLineIndex")]
143 sdp_m_line_index: Option<u16>,
144 #[serde(rename = "sdpMid")]
145 sdp_mid: Option<String>,
146 },
147
148 #[serde(rename = "candidates")]
150 Candidates {
151 #[serde(rename = "peerId")]
152 peer_id: String,
153 #[serde(rename = "targetPeerId")]
154 target_peer_id: String,
155 candidates: Vec<IceCandidate>,
156 },
157}
158
159impl SignalingMessage {
160 pub fn msg_type(&self) -> &str {
162 match self {
163 Self::Hello { .. } => "hello",
164 Self::Offer { .. } => "offer",
165 Self::Answer { .. } => "answer",
166 Self::Candidate { .. } => "candidate",
167 Self::Candidates { .. } => "candidates",
168 }
169 }
170
171 pub fn peer_id(&self) -> &str {
173 match self {
174 Self::Hello { peer_id, .. } => peer_id,
175 Self::Offer { peer_id, .. } => peer_id,
176 Self::Answer { peer_id, .. } => peer_id,
177 Self::Candidate { peer_id, .. } => peer_id,
178 Self::Candidates { peer_id, .. } => peer_id,
179 }
180 }
181
182 pub fn target_peer_id(&self) -> Option<&str> {
184 match self {
185 Self::Hello { .. } => None, Self::Offer { target_peer_id, .. } => Some(target_peer_id),
187 Self::Answer { target_peer_id, .. } => Some(target_peer_id),
188 Self::Candidate { target_peer_id, .. } => Some(target_peer_id),
189 Self::Candidates { target_peer_id, .. } => Some(target_peer_id),
190 }
191 }
192
193 pub fn is_for(&self, my_peer_id: &str) -> bool {
195 match self.target_peer_id() {
196 Some(target) => target == my_peer_id,
197 None => true, }
199 }
200}
201
/// Returns `true` when `local_peer_id` sorts strictly before
/// `remote_peer_id` in byte-wise string order.
///
/// For two distinct ids exactly one side gets `true`; for equal ids neither
/// does. NOTE(review): presumably used as a deterministic tie-breaker between
/// two peers — confirm against callers.
#[inline]
pub fn is_polite_peer(local_peer_id: &str, remote_peer_id: &str) -> bool {
    local_peer_id.cmp(remote_peer_id) == std::cmp::Ordering::Less
}
217
218#[derive(Debug, Clone, Serialize, Deserialize)]
220pub struct IceCandidate {
221 pub candidate: String,
222 #[serde(rename = "sdpMLineIndex")]
223 pub sdp_m_line_index: Option<u16>,
224 #[serde(rename = "sdpMid")]
225 pub sdp_mid: Option<String>,
226}
227
228#[derive(Debug, Clone, Serialize, Deserialize)]
230#[serde(tag = "type")]
231pub enum DataMessage {
232 #[serde(rename = "req")]
234 Request {
235 id: u32,
236 hash: String,
237 #[serde(skip_serializing_if = "Option::is_none")]
240 htl: Option<u8>,
241 },
242
243 #[serde(rename = "res")]
245 Response {
246 id: u32,
247 hash: String,
248 found: bool,
249 #[serde(skip_serializing_if = "Option::is_none")]
250 size: Option<u64>,
251 },
252
253 #[serde(rename = "push")]
256 Push { hash: String },
257
258 #[serde(rename = "have")]
260 Have { hashes: Vec<String> },
261
262 #[serde(rename = "want")]
264 Want { hashes: Vec<String> },
265
266 #[serde(rename = "root")]
268 RootUpdate {
269 hash: String,
270 #[serde(skip_serializing_if = "Option::is_none")]
271 size: Option<u64>,
272 },
273}
274
/// Maximum hops-to-live used by [`BLOB_REQUEST_POLICY`].
pub const MAX_HTL: u8 = 10;
/// Decrement probability at the maximum HTL used by [`BLOB_REQUEST_POLICY`].
pub const DECREMENT_AT_MAX_PROB: f64 = 0.5;
/// Decrement probability at HTL 1 used by [`BLOB_REQUEST_POLICY`].
pub const DECREMENT_AT_MIN_PROB: f64 = 0.25;

/// Strategy used when decrementing an HTL value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HtlMode {
    /// The decrement at the two ends of the range (maximum and 1) depends on
    /// a probability threshold.
    Probabilistic,
}

/// Parameters that govern how an HTL value is decremented.
#[derive(Debug, Clone, Copy)]
pub struct HtlPolicy {
    /// Decrement strategy.
    pub mode: HtlMode,
    /// Largest HTL value the policy works with.
    pub max_htl: u8,
    /// Probability threshold applied when the HTL equals `max_htl`.
    pub p_at_max: f64,
    /// Probability threshold applied when the HTL equals 1.
    pub p_at_min: f64,
}

/// Policy built from [`MAX_HTL`] and the two `DECREMENT_AT_*` constants.
pub const BLOB_REQUEST_POLICY: HtlPolicy = HtlPolicy {
    mode: HtlMode::Probabilistic,
    max_htl: MAX_HTL,
    p_at_max: DECREMENT_AT_MAX_PROB,
    p_at_min: DECREMENT_AT_MIN_PROB,
};

/// Policy with a maximum HTL of 4 and higher decrement probabilities than
/// [`BLOB_REQUEST_POLICY`].
pub const MESH_EVENT_POLICY: HtlPolicy = HtlPolicy {
    mode: HtlMode::Probabilistic,
    max_htl: 4,
    p_at_max: 0.75,
    p_at_min: 0.5,
};
312
313#[derive(Debug, Clone, Copy)]
316pub struct PeerHTLConfig {
317 pub at_max_sample: f64,
319 pub at_min_sample: f64,
321}
322
323impl PeerHTLConfig {
324 pub fn random() -> Self {
326 use rand::Rng;
327 let mut rng = rand::thread_rng();
328 Self::from_samples(rng.gen_range(0.0..1.0), rng.gen_range(0.0..1.0))
329 }
330
331 pub fn from_samples(at_max_sample: f64, at_min_sample: f64) -> Self {
333 Self {
334 at_max_sample: at_max_sample.clamp(0.0, 1.0),
335 at_min_sample: at_min_sample.clamp(0.0, 1.0),
336 }
337 }
338
339 pub fn from_flags(decrement_at_max: bool, decrement_at_min: bool) -> Self {
341 Self {
342 at_max_sample: if decrement_at_max { 0.0 } else { 1.0 },
343 at_min_sample: if decrement_at_min { 0.0 } else { 1.0 },
344 }
345 }
346
347 pub fn decrement(&self, htl: u8) -> u8 {
350 decrement_htl_with_policy(htl, &BLOB_REQUEST_POLICY, self)
351 }
352
353 pub fn decrement_with_policy(&self, htl: u8, policy: &HtlPolicy) -> u8 {
355 decrement_htl_with_policy(htl, policy, self)
356 }
357}
358
359impl Default for PeerHTLConfig {
360 fn default() -> Self {
361 Self::random()
362 }
363}
364
365pub fn decrement_htl_with_policy(htl: u8, policy: &HtlPolicy, config: &PeerHTLConfig) -> u8 {
367 let htl = htl.min(policy.max_htl);
368 if htl == 0 {
369 return 0;
370 }
371
372 match policy.mode {
373 HtlMode::Probabilistic => {
374 let p_at_max = policy.p_at_max.clamp(0.0, 1.0);
375 let p_at_min = policy.p_at_min.clamp(0.0, 1.0);
376
377 if htl == policy.max_htl {
378 return if config.at_max_sample < p_at_max {
379 htl.saturating_sub(1)
380 } else {
381 htl
382 };
383 }
384
385 if htl == 1 {
386 return if config.at_min_sample < p_at_min {
387 0
388 } else {
389 htl
390 };
391 }
392
393 htl.saturating_sub(1)
394 }
395 }
396}
397
/// Returns `true` while `htl` is non-zero, i.e. there are hops left.
pub fn should_forward_htl(htl: u8) -> bool {
    htl != 0
}

/// Alias for [`should_forward_htl`].
pub fn should_forward(htl: u8) -> bool {
    should_forward_htl(htl)
}
407
408use tokio::sync::{mpsc, oneshot};
409
/// Pool a peer is classified into.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PeerPool {
    /// The "follows" pool.
    Follows,
    /// Every peer that is not in the follows pool.
    Other,
}
418
/// Connection limits for one peer pool.
#[derive(Debug, Clone, Copy)]
pub struct PoolConfig {
    /// Hard cap: new connections are accepted only while below this count.
    pub max_connections: usize,
    /// Count at which the pool is considered satisfied.
    pub satisfied_connections: usize,
}

impl PoolConfig {
    /// `true` while `current_count` is strictly below `max_connections`.
    #[inline]
    pub fn can_accept(&self, current_count: usize) -> bool {
        self.max_connections > current_count
    }

    /// `true` while `current_count` is strictly below
    /// `satisfied_connections`; the exact opposite of
    /// [`PoolConfig::is_satisfied`].
    #[inline]
    pub fn needs_peers(&self, current_count: usize) -> bool {
        !self.is_satisfied(current_count)
    }

    /// `true` once `current_count` has reached `satisfied_connections`.
    #[inline]
    pub fn is_satisfied(&self, current_count: usize) -> bool {
        self.satisfied_connections <= current_count
    }
}

impl Default for PoolConfig {
    /// Up to 16 connections, satisfied at 8.
    fn default() -> Self {
        let (max_connections, satisfied_connections) = (16, 8);
        Self {
            max_connections,
            satisfied_connections,
        }
    }
}
456
457#[derive(Debug, Clone)]
459pub struct PoolSettings {
460 pub follows: PoolConfig,
461 pub other: PoolConfig,
462}
463
464impl Default for PoolSettings {
465 fn default() -> Self {
466 Self {
467 follows: PoolConfig {
468 max_connections: 16,
469 satisfied_connections: 8,
470 },
471 other: PoolConfig {
472 max_connections: 16,
473 satisfied_connections: 8,
474 },
475 }
476 }
477}
478
479pub struct ClassifyRequest {
481 pub pubkey: String,
483 pub response: oneshot::Sender<PeerPool>,
485}
486
487pub type ClassifierTx = mpsc::Sender<ClassifyRequest>;
489
490pub type ClassifierRx = mpsc::Receiver<ClassifyRequest>;
492
493pub fn classifier_channel(buffer: usize) -> (ClassifierTx, ClassifierRx) {
495 mpsc::channel(buffer)
496}
497
498#[derive(Clone)]
500pub struct MeshStoreConfig {
501 pub relays: Vec<String>,
503 pub roots: Vec<Hash>,
505 pub request_timeout_ms: u64,
507 pub hello_interval_ms: u64,
509 pub debug: bool,
511 pub pools: PoolSettings,
513 pub classifier_tx: Option<ClassifierTx>,
516 pub request_selection_strategy: SelectionStrategy,
518 pub request_fairness_enabled: bool,
520 pub request_dispatch: RequestDispatchConfig,
522 pub pubsub_delivery_mode: PubsubDeliveryMode,
524 pub upstream_blossom_servers: Vec<String>,
526}
527
528impl Default for MeshStoreConfig {
529 fn default() -> Self {
530 Self {
531 relays: vec![
532 "wss://temp.iris.to".to_string(),
533 "wss://relay.damus.io".to_string(),
534 ],
535 roots: Vec::new(),
536 request_timeout_ms: 10000,
537 hello_interval_ms: 30000,
538 debug: false,
539 pools: PoolSettings::default(),
540 classifier_tx: None,
541 request_selection_strategy: SelectionStrategy::Weighted,
542 request_fairness_enabled: true,
543 request_dispatch: RequestDispatchConfig {
544 initial_fanout: 2,
545 hedge_fanout: 1,
546 max_fanout: 8,
547 hedge_interval_ms: 120,
548 },
549 pubsub_delivery_mode: PubsubDeliveryMode::HtlInvWant,
550 upstream_blossom_servers: Vec::new(),
551 }
552 }
553}
554
/// Connection state of a peer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PeerState {
    /// No connection attempt has been recorded yet.
    New,
    /// A connection attempt is in progress.
    Connecting,
    /// The connection is established.
    Connected,
    /// The peer is ready for use.
    Ready,
    /// The connection has been lost or closed.
    Disconnected,
}
569
/// Counters describing mesh activity; every field starts at zero.
#[derive(Debug, Clone, Default)]
pub struct MeshStats {
    /// Number of connected peers.
    pub connected_peers: usize,
    /// Number of pending requests.
    pub pending_requests: usize,
    /// Bytes sent.
    pub bytes_sent: u64,
    /// Bytes received.
    pub bytes_received: u64,
    /// Requests made.
    pub requests_made: u64,
    /// Requests fulfilled.
    pub requests_fulfilled: u64,
}
580
581pub const NOSTR_KIND_HASHTREE: u16 = 25050;
583pub const MESH_SIGNALING_EVENT_KIND: u16 = NOSTR_KIND_HASHTREE;
584
585pub const MESH_PROTOCOL: &str = "htree.nostr.mesh.v1";
587pub const MESH_PROTOCOL_VERSION: u8 = 1;
588pub const MESH_DEFAULT_HTL: u8 = MESH_EVENT_POLICY.max_htl;
589pub const MESH_MAX_HTL: u8 = 6;
590
/// Bounded set of recently seen string keys with a time-to-live.
///
/// A key is remembered from the moment it is inserted until `ttl` has
/// elapsed or the set grows past `capacity`; in the latter case the oldest
/// keys are evicted first. Expiry is evaluated lazily, on calls to
/// [`TimedSeenSet::insert_if_new`] and [`TimedSeenSet::contains`].
#[derive(Debug)]
pub struct TimedSeenSet {
    /// Live keys mapped to the instant they were inserted.
    entries: HashMap<String, Instant>,
    /// Insertion-ordered queue (oldest at the front) of `(key, inserted_at)`.
    order: VecDeque<(String, Instant)>,
    /// How long a key stays in the set.
    ttl: Duration,
    /// Maximum number of live keys kept after pruning.
    capacity: usize,
}

impl TimedSeenSet {
    /// Creates an empty set holding at most `capacity` keys for `ttl` each.
    pub fn new(capacity: usize, ttl: Duration) -> Self {
        Self {
            entries: HashMap::new(),
            order: VecDeque::new(),
            ttl,
            capacity,
        }
    }

    /// Pops the oldest queue slot and drops the live key it refers to.
    ///
    /// Returns `false` when the queue is already empty.
    fn evict_oldest(&mut self) -> bool {
        match self.order.pop_front() {
            Some((key, inserted_at)) => {
                // Only drop the live entry if it is the very insertion this
                // queue slot recorded.
                if self.entries.get(&key) == Some(&inserted_at) {
                    self.entries.remove(&key);
                }
                true
            }
            None => false,
        }
    }

    /// Drops expired keys, then evicts oldest keys until the set fits
    /// `capacity`.
    fn prune(&mut self, now: Instant) {
        // Peek at the timestamp only: the key is moved out by `pop_front`
        // when it really expires, so no `String` is cloned on the common
        // "nothing to expire" path.
        while let Some(inserted_at) = self.order.front().map(|(_, ts)| *ts) {
            if now.duration_since(inserted_at) < self.ttl {
                break;
            }
            self.evict_oldest();
        }

        while self.entries.len() > self.capacity {
            if !self.evict_oldest() {
                break;
            }
        }
    }

    /// Records `key` and returns `true` if it was not already present.
    ///
    /// Returns `false`, leaving the original insertion time untouched, when
    /// the key is still live. Pruning runs both before the lookup and after
    /// the insertion, so with a zero `ttl` or zero `capacity` the key is
    /// reported as new but not retained.
    pub fn insert_if_new(&mut self, key: String) -> bool {
        let now = Instant::now();
        self.prune(now);
        if self.entries.contains_key(&key) {
            return false;
        }
        self.entries.insert(key.clone(), now);
        self.order.push_back((key, now));
        self.prune(now);
        true
    }

    /// Returns whether `key` is currently live. Takes `&mut self` because
    /// expired keys are pruned first.
    pub fn contains(&mut self, key: &str) -> bool {
        let now = Instant::now();
        self.prune(now);
        self.entries.contains_key(key)
    }

    /// Number of keys currently stored; keys that have expired since the
    /// last pruning call are still counted.
    pub fn len(&self) -> usize {
        self.entries.len()
    }

    /// `true` when no keys are stored.
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }
}
668
669#[derive(Debug, Clone, Serialize, Deserialize)]
670#[serde(tag = "type")]
671pub enum MeshNostrPayload {
672 #[serde(rename = "EVENT")]
673 Event { event: Event },
674}
675
676#[derive(Debug, Clone, Serialize, Deserialize)]
677pub struct MeshNostrFrame {
678 pub protocol: String,
679 pub version: u8,
680 pub frame_id: String,
681 pub htl: u8,
682 pub sender_peer_id: String,
683 pub payload: MeshNostrPayload,
684}
685
686impl MeshNostrFrame {
687 pub fn new_event(event: Event, sender_peer_id: &str, htl: u8) -> Self {
688 Self::new_event_with_id(
689 event,
690 sender_peer_id,
691 &uuid::Uuid::new_v4().to_string(),
692 htl,
693 )
694 }
695
696 pub fn new_event_with_id(event: Event, sender_peer_id: &str, frame_id: &str, htl: u8) -> Self {
697 Self {
698 protocol: MESH_PROTOCOL.to_string(),
699 version: MESH_PROTOCOL_VERSION,
700 frame_id: frame_id.to_string(),
701 htl,
702 sender_peer_id: sender_peer_id.to_string(),
703 payload: MeshNostrPayload::Event { event },
704 }
705 }
706
707 pub fn event(&self) -> &Event {
708 match &self.payload {
709 MeshNostrPayload::Event { event } => event,
710 }
711 }
712}
713
714pub fn validate_mesh_frame(frame: &MeshNostrFrame) -> Result<(), &'static str> {
715 if frame.protocol != MESH_PROTOCOL {
716 return Err("invalid protocol");
717 }
718 if frame.version != MESH_PROTOCOL_VERSION {
719 return Err("invalid version");
720 }
721 if frame.frame_id.is_empty() {
722 return Err("missing frame id");
723 }
724 if frame.sender_peer_id.is_empty() {
725 return Err("missing sender peer id");
726 }
727 if frame.sender_peer_id.contains(':') {
728 return Err("invalid sender peer id");
729 }
730 if frame.htl == 0 || frame.htl > MESH_MAX_HTL {
731 return Err("invalid htl");
732 }
733
734 let event = frame.event();
735 if event.kind != Kind::Custom(NOSTR_KIND_HASHTREE)
736 && event.kind != Kind::from_u16(NOSTR_KIND_HASHTREE)
737 {
738 return Err("unsupported event kind");
739 }
740
741 Ok(())
742}