1use hashtree_core::Hash;
8use nostr_sdk::nostr::{Event, Kind};
9use serde::{Deserialize, Serialize};
10use std::collections::{HashMap, VecDeque};
11use std::time::{Duration, Instant};
12
13use crate::generic_store::RequestDispatchConfig;
14use crate::peer_selector::SelectionStrategy;
15
/// Identity of a mesh peer: a nostr public key plus a per-session `uuid`, so
/// the same key can show up as several distinct peers (e.g. multiple
/// devices/tabs — NOTE(review): rationale inferred from the pubkey+uuid split;
/// confirm against connection management).
///
/// Serialized as `"<pubkey>:<uuid>"` by [`PeerId::to_peer_string`]; neither
/// component may itself contain `':'` or the round-trip fails.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct PeerId {
    // Nostr public key of the peer, as a string.
    pub pubkey: String,
    // Session-scoped identifier distinguishing connections from the same key.
    pub uuid: String,
}
24
25impl PeerId {
26 pub fn new(pubkey: String, uuid: String) -> Self {
27 Self { pubkey, uuid }
28 }
29
30 pub fn to_peer_string(&self) -> String {
31 format!("{}:{}", self.pubkey, self.uuid)
32 }
33
34 pub fn from_peer_string(s: &str) -> Option<Self> {
35 let parts: Vec<&str> = s.split(':').collect();
36 if parts.len() == 2 {
37 Some(Self {
38 pubkey: parts[0].to_string(),
39 uuid: parts[1].to_string(),
40 })
41 } else {
42 None
43 }
44 }
45}
46
/// WebRTC signaling messages exchanged between peers (over nostr relays,
/// given this file's transport). On the wire the variant is carried in a
/// `"type"` tag and field names are camelCase.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum SignalingMessage {
    /// Broadcast presence announcement; `roots` advertises content roots.
    /// This is the only variant without a target peer.
    #[serde(rename = "hello")]
    Hello {
        #[serde(rename = "peerId")]
        peer_id: String,
        roots: Vec<String>,
    },

    /// SDP offer addressed to a specific peer.
    #[serde(rename = "offer")]
    Offer {
        #[serde(rename = "peerId")]
        peer_id: String,
        #[serde(rename = "targetPeerId")]
        target_peer_id: String,
        sdp: String,
    },

    /// SDP answer addressed back to the offering peer.
    #[serde(rename = "answer")]
    Answer {
        #[serde(rename = "peerId")]
        peer_id: String,
        #[serde(rename = "targetPeerId")]
        target_peer_id: String,
        sdp: String,
    },

    /// A single ICE candidate for an in-progress connection.
    #[serde(rename = "candidate")]
    Candidate {
        #[serde(rename = "peerId")]
        peer_id: String,
        #[serde(rename = "targetPeerId")]
        target_peer_id: String,
        candidate: String,
        #[serde(rename = "sdpMLineIndex")]
        sdp_m_line_index: Option<u16>,
        #[serde(rename = "sdpMid")]
        sdp_mid: Option<String>,
    },

    /// A batch of ICE candidates (see [`IceCandidate`]).
    #[serde(rename = "candidates")]
    Candidates {
        #[serde(rename = "peerId")]
        peer_id: String,
        #[serde(rename = "targetPeerId")]
        target_peer_id: String,
        candidates: Vec<IceCandidate>,
    },
}
103
104impl SignalingMessage {
105 pub fn peer_id(&self) -> &str {
107 match self {
108 Self::Hello { peer_id, .. } => peer_id,
109 Self::Offer { peer_id, .. } => peer_id,
110 Self::Answer { peer_id, .. } => peer_id,
111 Self::Candidate { peer_id, .. } => peer_id,
112 Self::Candidates { peer_id, .. } => peer_id,
113 }
114 }
115
116 pub fn target_peer_id(&self) -> Option<&str> {
118 match self {
119 Self::Hello { .. } => None, Self::Offer { target_peer_id, .. } => Some(target_peer_id),
121 Self::Answer { target_peer_id, .. } => Some(target_peer_id),
122 Self::Candidate { target_peer_id, .. } => Some(target_peer_id),
123 Self::Candidates { target_peer_id, .. } => Some(target_peer_id),
124 }
125 }
126
127 pub fn is_for(&self, my_peer_id: &str) -> bool {
129 match self.target_peer_id() {
130 Some(target) => target == my_peer_id,
131 None => true, }
133 }
134}
135
/// Decides signaling roles for a peer pair: the peer whose id sorts
/// lexicographically lower is the "polite" one. Both sides evaluate this with
/// swapped arguments and reach opposite answers, so exactly one peer is
/// polite (ids are expected to be unique; equal ids yield `false`).
/// NOTE(review): naming matches the WebRTC "perfect negotiation" pattern —
/// confirm usage at the call sites.
#[inline]
pub fn is_polite_peer(local_peer_id: &str, remote_peer_id: &str) -> bool {
    local_peer_id.cmp(remote_peer_id) == std::cmp::Ordering::Less
}
151
/// One ICE candidate, mirroring the fields of the browser's
/// `RTCIceCandidateInit` (camelCase on the wire). Used by
/// `SignalingMessage::Candidates` for batching.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IceCandidate {
    pub candidate: String,
    #[serde(rename = "sdpMLineIndex")]
    pub sdp_m_line_index: Option<u16>,
    #[serde(rename = "sdpMid")]
    pub sdp_mid: Option<String>,
}
161
/// Messages exchanged on the peer-to-peer data channel. The variant is
/// carried in a `"type"` tag with short wire names.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum DataMessage {
    /// Ask a peer for the blob with `hash`; `id` correlates the response.
    #[serde(rename = "req")]
    Request {
        id: u32,
        hash: String,
        // Remaining hops-to-live for forwarded requests; omitted when absent.
        #[serde(skip_serializing_if = "Option::is_none")]
        htl: Option<u8>,
    },

    /// Reply to a `Request` with the same `id`; `found` says whether the
    /// blob is available, `size` its length when known.
    #[serde(rename = "res")]
    Response {
        id: u32,
        hash: String,
        found: bool,
        #[serde(skip_serializing_if = "Option::is_none")]
        size: Option<u64>,
    },

    /// Unsolicited notification that the sender holds `hash`.
    #[serde(rename = "push")]
    Push { hash: String },

    /// Advertise a set of hashes the sender has.
    #[serde(rename = "have")]
    Have { hashes: Vec<String> },

    /// Request a set of hashes the sender wants.
    #[serde(rename = "want")]
    Want { hashes: Vec<String> },

    /// Announce a new root hash (with optional size).
    #[serde(rename = "root")]
    RootUpdate {
        hash: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        size: Option<u64>,
    },
}
208
/// Default maximum hops-to-live for blob requests.
pub const MAX_HTL: u8 = 10;
/// Probability of decrementing the HTL while it sits at the maximum value.
pub const DECREMENT_AT_MAX_PROB: f64 = 0.5;
/// Probability of dropping the HTL from 1 to 0 (ending forwarding).
pub const DECREMENT_AT_MIN_PROB: f64 = 0.25;
215
/// Strategy for decrementing a hops-to-live counter.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HtlMode {
    // Decrement probabilistically at the HTL boundaries (see
    // `decrement_htl_with_policy`), so an observed HTL value does not
    // deterministically reveal the request's distance from its origin.
    Probabilistic,
}
221
/// Parameters governing HTL decrements for one traffic class.
#[derive(Debug, Clone, Copy)]
pub struct HtlPolicy {
    pub mode: HtlMode,
    // Ceiling that incoming HTL values are clamped to.
    pub max_htl: u8,
    // Probability of decrementing while the HTL equals `max_htl`.
    pub p_at_max: f64,
    // Probability of dropping from 1 to 0.
    pub p_at_min: f64,
}
230
/// HTL policy applied to blob requests on the data channel.
pub const BLOB_REQUEST_POLICY: HtlPolicy = HtlPolicy {
    mode: HtlMode::Probabilistic,
    max_htl: MAX_HTL,
    p_at_max: DECREMENT_AT_MAX_PROB,
    p_at_min: DECREMENT_AT_MIN_PROB,
};

/// HTL policy applied to mesh nostr event propagation — shorter reach and
/// faster decay than blob requests.
pub const MESH_EVENT_POLICY: HtlPolicy = HtlPolicy {
    mode: HtlMode::Probabilistic,
    max_htl: 4,
    p_at_max: 0.75,
    p_at_min: 0.5,
};
246
/// Pre-drawn random samples, in `[0.0, 1.0]`, that a policy's probabilities
/// are compared against (sample < probability => decrement fires). Drawing
/// them once per config keeps the decision stable for a given peer instead of
/// re-rolling on every message.
#[derive(Debug, Clone, Copy)]
pub struct PeerHTLConfig {
    // Sample used when the HTL equals the policy maximum.
    pub at_max_sample: f64,
    // Sample used when the HTL equals 1.
    pub at_min_sample: f64,
}
256
257impl PeerHTLConfig {
258 pub fn random() -> Self {
260 use rand::Rng;
261 let mut rng = rand::thread_rng();
262 Self::from_samples(rng.gen_range(0.0..1.0), rng.gen_range(0.0..1.0))
263 }
264
265 pub fn from_samples(at_max_sample: f64, at_min_sample: f64) -> Self {
267 Self {
268 at_max_sample: at_max_sample.clamp(0.0, 1.0),
269 at_min_sample: at_min_sample.clamp(0.0, 1.0),
270 }
271 }
272
273 pub fn from_flags(decrement_at_max: bool, decrement_at_min: bool) -> Self {
275 Self {
276 at_max_sample: if decrement_at_max { 0.0 } else { 1.0 },
277 at_min_sample: if decrement_at_min { 0.0 } else { 1.0 },
278 }
279 }
280
281 pub fn decrement(&self, htl: u8) -> u8 {
284 decrement_htl_with_policy(htl, &BLOB_REQUEST_POLICY, self)
285 }
286
287 pub fn decrement_with_policy(&self, htl: u8, policy: &HtlPolicy) -> u8 {
289 decrement_htl_with_policy(htl, policy, self)
290 }
291}
292
impl Default for PeerHTLConfig {
    /// The default is intentionally random: each new config draws its own
    /// samples rather than using fixed values.
    fn default() -> Self {
        Self::random()
    }
}
298
299pub fn decrement_htl_with_policy(htl: u8, policy: &HtlPolicy, config: &PeerHTLConfig) -> u8 {
301 let htl = htl.min(policy.max_htl);
302 if htl == 0 {
303 return 0;
304 }
305
306 match policy.mode {
307 HtlMode::Probabilistic => {
308 let p_at_max = policy.p_at_max.clamp(0.0, 1.0);
309 let p_at_min = policy.p_at_min.clamp(0.0, 1.0);
310
311 if htl == policy.max_htl {
312 return if config.at_max_sample < p_at_max {
313 htl.saturating_sub(1)
314 } else {
315 htl
316 };
317 }
318
319 if htl == 1 {
320 return if config.at_min_sample < p_at_min {
321 0
322 } else {
323 htl
324 };
325 }
326
327 htl.saturating_sub(1)
328 }
329 }
330}
331
/// A message may keep travelling only while its hops-to-live is non-zero.
pub fn should_forward_htl(htl: u8) -> bool {
    htl != 0
}
336
/// Alias of [`should_forward_htl`] — presumably retained for
/// backward-compatible callers; prefer `should_forward_htl` in new code.
pub fn should_forward(htl: u8) -> bool {
    should_forward_htl(htl)
}
341
342use tokio::sync::{mpsc, oneshot};
343
/// A request to forward a blob lookup onward to other peers.
pub struct ForwardRequest {
    // Hash of the blob being sought.
    pub hash: Hash,
    // Peer that sent us the request — excluded from forwarding so the
    // request is not bounced straight back.
    pub exclude_peer_id: String,
    // Remaining hops-to-live for the forwarded request.
    pub htl: u8,
    // One-shot reply: the blob bytes if some peer had them, else `None`.
    pub response: oneshot::Sender<Option<Vec<u8>>>,
}
355
/// Sending half of the forward-request channel.
pub type ForwardTx = mpsc::Sender<ForwardRequest>;
/// Receiving half of the forward-request channel.
pub type ForwardRx = mpsc::Receiver<ForwardRequest>;
360
/// Which connection pool a peer belongs to; assignment is made by the
/// classifier task (see [`ClassifyRequest`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PeerPool {
    // Peers from the local follow list — presumably; confirm against the
    // classifier implementation.
    Follows,
    // Everyone else.
    Other,
}
369
/// Connection-count policy for a single peer pool.
#[derive(Debug, Clone, Copy)]
pub struct PoolConfig {
    // Hard upper bound on simultaneous connections in this pool.
    pub max_connections: usize,
    // Target count at which the pool stops actively seeking peers.
    pub satisfied_connections: usize,
}

impl PoolConfig {
    /// True while the pool is below its hard connection cap.
    #[inline]
    pub fn can_accept(&self, current_count: usize) -> bool {
        current_count < self.max_connections
    }

    /// True while the pool is below its satisfaction target and should keep
    /// dialing new peers.
    #[inline]
    pub fn needs_peers(&self, current_count: usize) -> bool {
        current_count < self.satisfied_connections
    }

    /// True once the satisfaction target is met — exactly the inverse of
    /// [`Self::needs_peers`].
    #[inline]
    pub fn is_satisfied(&self, current_count: usize) -> bool {
        !self.needs_peers(current_count)
    }
}

impl Default for PoolConfig {
    fn default() -> Self {
        PoolConfig {
            max_connections: 16,
            satisfied_connections: 8,
        }
    }
}
407
/// Per-pool connection policies for the two peer classes.
#[derive(Debug, Clone)]
pub struct PoolSettings {
    pub follows: PoolConfig,
    pub other: PoolConfig,
}
414
415impl Default for PoolSettings {
416 fn default() -> Self {
417 Self {
418 follows: PoolConfig {
419 max_connections: 16,
420 satisfied_connections: 8,
421 },
422 other: PoolConfig {
423 max_connections: 16,
424 satisfied_connections: 8,
425 },
426 }
427 }
428}
429
/// Request asking the classifier task which [`PeerPool`] a pubkey belongs to.
pub struct ClassifyRequest {
    // Public key of the peer to classify.
    pub pubkey: String,
    // One-shot reply carrying the assigned pool.
    pub response: oneshot::Sender<PeerPool>,
}
437
/// Sending half of the classifier channel.
pub type ClassifierTx = mpsc::Sender<ClassifyRequest>;

/// Receiving half of the classifier channel.
pub type ClassifierRx = mpsc::Receiver<ClassifyRequest>;
443
/// Creates the bounded channel a classifier task listens on; `buffer` is the
/// channel capacity passed straight to `mpsc::channel`.
pub fn classifier_channel(buffer: usize) -> (ClassifierTx, ClassifierRx) {
    mpsc::channel(buffer)
}
448
/// Configuration for the WebRTC-backed store / mesh.
#[derive(Clone)]
pub struct WebRTCStoreConfig {
    // Nostr relay URLs used for signaling.
    pub relays: Vec<String>,
    // Root hashes this node advertises — presumably surfaced via
    // `SignalingMessage::Hello::roots`; confirm at the call site.
    pub roots: Vec<Hash>,
    // How long to wait for a blob request before giving up, in milliseconds.
    pub request_timeout_ms: u64,
    // Interval between `hello` broadcasts, in milliseconds.
    pub hello_interval_ms: u64,
    // Enables verbose/debug behaviour.
    pub debug: bool,
    // Connection limits per peer pool.
    pub pools: PoolSettings,
    // Optional channel to an external peer classifier; `None` disables
    // classification.
    pub classifier_tx: Option<ClassifierTx>,
    // How peers are picked when dispatching requests.
    pub request_selection_strategy: SelectionStrategy,
    // Whether fairness accounting applies to request dispatch.
    pub request_fairness_enabled: bool,
    // Fan-out / hedging parameters for request dispatch.
    pub request_dispatch: RequestDispatchConfig,
}
474
impl Default for WebRTCStoreConfig {
    fn default() -> Self {
        Self {
            // Public relays used for signaling out of the box.
            relays: vec![
                "wss://temp.iris.to".to_string(),
                "wss://relay.damus.io".to_string(),
            ],
            // No roots advertised until the caller supplies them.
            roots: Vec::new(),
            request_timeout_ms: 10000, // 10 s
            hello_interval_ms: 30000, // 30 s
            debug: false,
            pools: PoolSettings::default(),
            classifier_tx: None,
            // NOTE(review): assumed tit-for-tat favours reciprocating peers —
            // confirm in the peer_selector module.
            request_selection_strategy: SelectionStrategy::TitForTat,
            request_fairness_enabled: true,
            request_dispatch: RequestDispatchConfig {
                // Ask 2 peers up front, hedge with 1 more every 120 ms,
                // never exceeding 8 in flight.
                initial_fanout: 2,
                hedge_fanout: 1,
                max_fanout: 8,
                hedge_interval_ms: 120,
            },
        }
    }
}
499
/// Alias of [`WebRTCStoreConfig`] under its mesh-oriented name.
pub type MeshStoreConfig = WebRTCStoreConfig;
502
/// Lifecycle state of a peer connection.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PeerState {
    // Peer discovered but no connection attempt yet.
    New,
    // Signaling/ICE in progress.
    Connecting,
    // Transport established. NOTE(review): distinction from `Ready` is
    // presumably "connection up" vs "data channel open" — confirm where
    // these states are set.
    Connected,
    // Fully usable for data exchange.
    Ready,
    // Connection closed or failed.
    Disconnected,
}
517
/// Aggregate counters for the WebRTC store.
#[derive(Debug, Clone, Default)]
pub struct WebRTCStats {
    // Peers currently connected.
    pub connected_peers: usize,
    // Requests awaiting a response.
    pub pending_requests: usize,
    pub bytes_sent: u64,
    pub bytes_received: u64,
    // Total requests issued.
    pub requests_made: u64,
    // Requests that received a successful response.
    pub requests_fulfilled: u64,
}
528
/// Alias of [`WebRTCStats`] under its mesh-oriented name.
pub type MeshStats = WebRTCStats;

/// Nostr event kind used for hashtree traffic.
pub const NOSTR_KIND_HASHTREE: u16 = 25050;
/// Mesh signaling events reuse the hashtree kind.
pub const MESH_SIGNALING_EVENT_KIND: u16 = NOSTR_KIND_HASHTREE;

/// Protocol identifier carried in every [`MeshNostrFrame`].
pub const MESH_PROTOCOL: &str = "htree.nostr.mesh.v1";
/// Current mesh frame format version.
pub const MESH_PROTOCOL_VERSION: u8 = 1;
/// Default HTL for newly created mesh frames (the mesh event policy maximum).
pub const MESH_DEFAULT_HTL: u8 = MESH_EVENT_POLICY.max_htl;
/// Hard upper bound accepted by [`validate_mesh_frame`]. NOTE(review): this
/// is larger than `MESH_EVENT_POLICY.max_htl` (6 vs 4) — confirm the slack
/// is intentional (e.g. to tolerate peers running a wider policy).
pub const MESH_MAX_HTL: u8 = 6;
541
/// A "have I seen this key recently?" set with both a TTL and a capacity
/// bound. Used for deduplicating frames/messages.
///
/// `order` is an insertion-ordered queue used for eviction; when a key is
/// re-inserted after eviction, stale queue entries are detected by comparing
/// the queued timestamp against the one currently stored in `entries`.
#[derive(Debug)]
pub struct TimedSeenSet {
    // key -> timestamp of its most recent insertion
    entries: HashMap<String, Instant>,
    // FIFO of (key, insertion time), oldest at the front
    order: VecDeque<(String, Instant)>,
    ttl: Duration,
    capacity: usize,
}

impl TimedSeenSet {
    /// Creates a set holding at most `capacity` keys, each remembered for
    /// at most `ttl`.
    pub fn new(capacity: usize, ttl: Duration) -> Self {
        Self {
            entries: HashMap::new(),
            order: VecDeque::new(),
            ttl,
            capacity,
        }
    }

    /// Removes `key` from `entries` only if its stored timestamp matches
    /// `inserted_at`. A mismatch means the key was re-inserted after this
    /// queue entry was created, so the queue entry is stale and the live
    /// entry must be kept. (Previously this check was duplicated verbatim in
    /// both eviction loops of `prune`.)
    fn remove_if_current(&mut self, key: &str, inserted_at: Instant) {
        if self.entries.get(key).copied() == Some(inserted_at) {
            self.entries.remove(key);
        }
    }

    /// Evicts expired entries (TTL), then the oldest entries until the
    /// capacity bound holds.
    fn prune(&mut self, now: Instant) {
        // TTL eviction: the queue is insertion-ordered, so we can stop at
        // the first entry that is still fresh.
        while let Some((key, inserted_at)) = self.order.front().cloned() {
            if now.duration_since(inserted_at) < self.ttl {
                break;
            }
            self.order.pop_front();
            self.remove_if_current(&key, inserted_at);
        }

        // Capacity eviction: drop oldest first. Stale queue entries do not
        // shrink `entries`, so keep popping until the bound holds (or the
        // queue is exhausted, which cannot leave live entries behind since
        // every live entry has a queue record).
        while self.entries.len() > self.capacity {
            match self.order.pop_front() {
                Some((key, inserted_at)) => self.remove_if_current(&key, inserted_at),
                None => break,
            }
        }
    }

    /// Records `key`; returns `true` iff it was not already present
    /// (i.e. the caller is seeing it for the first time within TTL/capacity).
    pub fn insert_if_new(&mut self, key: String) -> bool {
        let now = Instant::now();
        self.prune(now);
        if self.entries.contains_key(&key) {
            return false;
        }
        self.entries.insert(key.clone(), now);
        self.order.push_back((key, now));
        // Re-prune so the insertion itself cannot leave us over capacity.
        self.prune(now);
        true
    }

    /// Number of live (non-evicted) keys. Note: does not prune first, so
    /// entries past their TTL may still be counted until the next insert.
    pub fn len(&self) -> usize {
        self.entries.len()
    }

    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }
}
613
/// Payload of a mesh frame; currently only nostr events are carried, but the
/// tagged-enum encoding leaves room for more payload types.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum MeshNostrPayload {
    #[serde(rename = "EVENT")]
    Event { event: Event },
}
620
/// Envelope for propagating nostr events across the mesh; validated by
/// [`validate_mesh_frame`] before processing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MeshNostrFrame {
    // Must equal `MESH_PROTOCOL`.
    pub protocol: String,
    // Must equal `MESH_PROTOCOL_VERSION`.
    pub version: u8,
    // Unique id used for dedup when forwarding.
    pub frame_id: String,
    // Remaining hops-to-live; must be in `1..=MESH_MAX_HTL`.
    pub htl: u8,
    // Peer id of the node that sent us this frame.
    pub sender_peer_id: String,
    pub payload: MeshNostrPayload,
}
630
631impl MeshNostrFrame {
632 pub fn new_event(event: Event, sender_peer_id: &str, htl: u8) -> Self {
633 Self::new_event_with_id(
634 event,
635 sender_peer_id,
636 &uuid::Uuid::new_v4().to_string(),
637 htl,
638 )
639 }
640
641 pub fn new_event_with_id(event: Event, sender_peer_id: &str, frame_id: &str, htl: u8) -> Self {
642 Self {
643 protocol: MESH_PROTOCOL.to_string(),
644 version: MESH_PROTOCOL_VERSION,
645 frame_id: frame_id.to_string(),
646 htl,
647 sender_peer_id: sender_peer_id.to_string(),
648 payload: MeshNostrPayload::Event { event },
649 }
650 }
651
652 pub fn event(&self) -> &Event {
653 match &self.payload {
654 MeshNostrPayload::Event { event } => event,
655 }
656 }
657}
658
659pub fn validate_mesh_frame(frame: &MeshNostrFrame) -> Result<(), &'static str> {
660 if frame.protocol != MESH_PROTOCOL {
661 return Err("invalid protocol");
662 }
663 if frame.version != MESH_PROTOCOL_VERSION {
664 return Err("invalid version");
665 }
666 if frame.frame_id.is_empty() {
667 return Err("missing frame id");
668 }
669 if frame.sender_peer_id.is_empty() {
670 return Err("missing sender peer id");
671 }
672 if frame.htl == 0 || frame.htl > MESH_MAX_HTL {
673 return Err("invalid htl");
674 }
675
676 let event = frame.event();
677 if event.kind != Kind::Custom(NOSTR_KIND_HASHTREE)
678 && event.kind != Kind::Ephemeral(NOSTR_KIND_HASHTREE)
679 {
680 return Err("unsupported event kind");
681 }
682
683 Ok(())
684}
685
/// Label assigned to the WebRTC data channel carrying hashtree traffic.
pub const DATA_CHANNEL_LABEL: &str = "hashtree";