1use hashtree_core::Hash;
8use nostr_sdk::nostr::{Event, Kind};
9use serde::{Deserialize, Serialize};
10use std::collections::{HashMap, VecDeque};
11use std::time::{Duration, Instant};
12
13use crate::generic_store::RequestDispatchConfig;
14use crate::peer_selector::SelectionStrategy;
15
/// Unique identifier for a mesh peer: a Nostr pubkey plus a connection UUID.
///
/// Hashable and comparable by value, so it can be used directly as a map key.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct PeerId {
    /// Nostr public key of the peer's identity.
    pub pubkey: String,
    /// UUID distinguishing this peer instance — presumably per session/device
    /// so one pubkey can appear as several peers; confirm against callers.
    pub uuid: String,
}
24
25impl PeerId {
26 pub fn new(pubkey: String, uuid: String) -> Self {
27 Self { pubkey, uuid }
28 }
29
30 pub fn to_peer_string(&self) -> String {
31 format!("{}:{}", self.pubkey, self.uuid)
32 }
33
34 pub fn from_peer_string(s: &str) -> Option<Self> {
35 let parts: Vec<&str> = s.split(':').collect();
36 if parts.len() == 2 {
37 Some(Self {
38 pubkey: parts[0].to_string(),
39 uuid: parts[1].to_string(),
40 })
41 } else {
42 None
43 }
44 }
45}
46
/// WebRTC signaling messages exchanged through Nostr relays.
///
/// JSON-tagged by `"type"`; field names are camelCased via `serde(rename)`
/// to interoperate with the JavaScript/browser implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum SignalingMessage {
    /// Broadcast presence announcement (no target peer).
    #[serde(rename = "hello")]
    Hello {
        #[serde(rename = "peerId")]
        peer_id: String,
        /// Hashtree root hashes this peer serves.
        roots: Vec<String>,
    },

    /// SDP offer directed at a specific peer.
    #[serde(rename = "offer")]
    Offer {
        #[serde(rename = "peerId")]
        peer_id: String,
        #[serde(rename = "targetPeerId")]
        target_peer_id: String,
        sdp: String,
    },

    /// SDP answer directed back at the offering peer.
    #[serde(rename = "answer")]
    Answer {
        #[serde(rename = "peerId")]
        peer_id: String,
        #[serde(rename = "targetPeerId")]
        target_peer_id: String,
        sdp: String,
    },

    /// A single trickled ICE candidate.
    #[serde(rename = "candidate")]
    Candidate {
        #[serde(rename = "peerId")]
        peer_id: String,
        #[serde(rename = "targetPeerId")]
        target_peer_id: String,
        candidate: String,
        #[serde(rename = "sdpMLineIndex")]
        sdp_m_line_index: Option<u16>,
        #[serde(rename = "sdpMid")]
        sdp_mid: Option<String>,
    },

    /// A batch of ICE candidates (fewer relay round-trips than `Candidate`).
    #[serde(rename = "candidates")]
    Candidates {
        #[serde(rename = "peerId")]
        peer_id: String,
        #[serde(rename = "targetPeerId")]
        target_peer_id: String,
        candidates: Vec<IceCandidate>,
    },
}
103
104impl SignalingMessage {
105 pub fn msg_type(&self) -> &str {
107 match self {
108 Self::Hello { .. } => "hello",
109 Self::Offer { .. } => "offer",
110 Self::Answer { .. } => "answer",
111 Self::Candidate { .. } => "candidate",
112 Self::Candidates { .. } => "candidates",
113 }
114 }
115
116 pub fn peer_id(&self) -> &str {
118 match self {
119 Self::Hello { peer_id, .. } => peer_id,
120 Self::Offer { peer_id, .. } => peer_id,
121 Self::Answer { peer_id, .. } => peer_id,
122 Self::Candidate { peer_id, .. } => peer_id,
123 Self::Candidates { peer_id, .. } => peer_id,
124 }
125 }
126
127 pub fn target_peer_id(&self) -> Option<&str> {
129 match self {
130 Self::Hello { .. } => None, Self::Offer { target_peer_id, .. } => Some(target_peer_id),
132 Self::Answer { target_peer_id, .. } => Some(target_peer_id),
133 Self::Candidate { target_peer_id, .. } => Some(target_peer_id),
134 Self::Candidates { target_peer_id, .. } => Some(target_peer_id),
135 }
136 }
137
138 pub fn is_for(&self, my_peer_id: &str) -> bool {
140 match self.target_peer_id() {
141 Some(target) => target == my_peer_id,
142 None => true, }
144 }
145}
146
/// Deterministic tie-breaker deciding which side plays the "polite" role
/// (WebRTC perfect-negotiation terminology): both peers evaluate this
/// locally and exactly one of any distinct pair comes out polite — the one
/// with the lexicographically smaller id. Equal ids yield `false`.
#[inline]
pub fn is_polite_peer(local_peer_id: &str, remote_peer_id: &str) -> bool {
    matches!(
        local_peer_id.cmp(remote_peer_id),
        std::cmp::Ordering::Less
    )
}
162
/// A single ICE candidate, mirroring the browser `RTCIceCandidateInit` shape
/// (camelCased field names on the wire).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IceCandidate {
    /// The candidate string (SDP `candidate:` attribute value).
    pub candidate: String,
    #[serde(rename = "sdpMLineIndex")]
    pub sdp_m_line_index: Option<u16>,
    #[serde(rename = "sdpMid")]
    pub sdp_mid: Option<String>,
}
172
/// Messages exchanged over the WebRTC data channel for blob transfer.
///
/// JSON-tagged by `"type"` with short tag names to keep frames small.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum DataMessage {
    /// Request the blob with the given hash; `id` correlates the response.
    #[serde(rename = "req")]
    Request {
        id: u32,
        hash: String,
        /// Hops-to-live for multi-hop forwarding; omitted from JSON when
        /// `None` — presumably meaning "do not forward"; confirm in handler.
        #[serde(skip_serializing_if = "Option::is_none")]
        htl: Option<u8>,
    },

    /// Reply to the `Request` with the matching `id`; `found == false`
    /// means the blob was unavailable via this peer.
    #[serde(rename = "res")]
    Response {
        id: u32,
        hash: String,
        found: bool,
        /// Blob size in bytes, when known; omitted from JSON when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        size: Option<u64>,
    },

    /// Unsolicited blob offer/announcement for a single hash.
    /// NOTE(review): exact push semantics are not visible here — confirm.
    #[serde(rename = "push")]
    Push { hash: String },

    /// Advertises hashes the sender already has.
    #[serde(rename = "have")]
    Have { hashes: Vec<String> },

    /// Requests hashes the sender is missing.
    #[serde(rename = "want")]
    Want { hashes: Vec<String> },

    /// Announces a new or updated hashtree root.
    #[serde(rename = "root")]
    RootUpdate {
        hash: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        size: Option<u64>,
    },
}
219
/// Default maximum hops-to-live for blob requests.
pub const MAX_HTL: u8 = 10;
/// Probability of actually decrementing when HTL sits at the maximum; the
/// probabilistic hold presumably obscures whether the sender originated the
/// request (Freenet-style HTL obfuscation — see `decrement_htl_with_policy`).
pub const DECREMENT_AT_MAX_PROB: f64 = 0.5;
/// Probability of dropping from HTL 1 to 0; holding at 1 sometimes
/// presumably obscures where a request chain ends.
pub const DECREMENT_AT_MIN_PROB: f64 = 0.25;
226
/// How HTL decrements are decided.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HtlMode {
    /// Decrement probabilistically at the maximum and minimum values,
    /// deterministically in between (see `decrement_htl_with_policy`).
    Probabilistic,
}
232
/// Parameters governing per-hop HTL decrements.
#[derive(Debug, Clone, Copy)]
pub struct HtlPolicy {
    /// Decrement mode (currently only probabilistic).
    pub mode: HtlMode,
    /// Upper bound; incoming HTL values are clamped to this.
    pub max_htl: u8,
    /// Probability of decrementing while HTL is at `max_htl`.
    pub p_at_max: f64,
    /// Probability of dropping from 1 to 0.
    pub p_at_min: f64,
}
241
/// HTL policy for blob requests: up to 10 hops, 50% decrement at the
/// maximum, 25% drop probability at 1.
pub const BLOB_REQUEST_POLICY: HtlPolicy = HtlPolicy {
    mode: HtlMode::Probabilistic,
    max_htl: MAX_HTL,
    p_at_max: DECREMENT_AT_MAX_PROB,
    p_at_min: DECREMENT_AT_MIN_PROB,
};

/// HTL policy for gossiped mesh events: shorter reach (4 hops) with more
/// aggressive decrementing than blob requests.
pub const MESH_EVENT_POLICY: HtlPolicy = HtlPolicy {
    mode: HtlMode::Probabilistic,
    max_htl: 4,
    p_at_max: 0.75,
    p_at_min: 0.5,
};
257
/// Per-peer fixed random samples used for probabilistic HTL decrements.
///
/// The samples are drawn once per config rather than per message, so a given
/// peer's hold/decrement decisions are consistent across requests —
/// presumably to prevent averaging over repeated requests to infer the true
/// hop count; confirm intent.
#[derive(Debug, Clone, Copy)]
pub struct PeerHTLConfig {
    /// Sample in [0, 1] compared against `p_at_max` when HTL is at maximum.
    pub at_max_sample: f64,
    /// Sample in [0, 1] compared against `p_at_min` when HTL is 1.
    pub at_min_sample: f64,
}
267
268impl PeerHTLConfig {
269 pub fn random() -> Self {
271 use rand::Rng;
272 let mut rng = rand::thread_rng();
273 Self::from_samples(rng.gen_range(0.0..1.0), rng.gen_range(0.0..1.0))
274 }
275
276 pub fn from_samples(at_max_sample: f64, at_min_sample: f64) -> Self {
278 Self {
279 at_max_sample: at_max_sample.clamp(0.0, 1.0),
280 at_min_sample: at_min_sample.clamp(0.0, 1.0),
281 }
282 }
283
284 pub fn from_flags(decrement_at_max: bool, decrement_at_min: bool) -> Self {
286 Self {
287 at_max_sample: if decrement_at_max { 0.0 } else { 1.0 },
288 at_min_sample: if decrement_at_min { 0.0 } else { 1.0 },
289 }
290 }
291
292 pub fn decrement(&self, htl: u8) -> u8 {
295 decrement_htl_with_policy(htl, &BLOB_REQUEST_POLICY, self)
296 }
297
298 pub fn decrement_with_policy(&self, htl: u8, policy: &HtlPolicy) -> u8 {
300 decrement_htl_with_policy(htl, policy, self)
301 }
302}
303
impl Default for PeerHTLConfig {
    /// Defaults to freshly drawn random samples, so each constructed config
    /// makes its probabilistic decisions independently.
    fn default() -> Self {
        Self::random()
    }
}
309
310pub fn decrement_htl_with_policy(htl: u8, policy: &HtlPolicy, config: &PeerHTLConfig) -> u8 {
312 let htl = htl.min(policy.max_htl);
313 if htl == 0 {
314 return 0;
315 }
316
317 match policy.mode {
318 HtlMode::Probabilistic => {
319 let p_at_max = policy.p_at_max.clamp(0.0, 1.0);
320 let p_at_min = policy.p_at_min.clamp(0.0, 1.0);
321
322 if htl == policy.max_htl {
323 return if config.at_max_sample < p_at_max {
324 htl.saturating_sub(1)
325 } else {
326 htl
327 };
328 }
329
330 if htl == 1 {
331 return if config.at_min_sample < p_at_min {
332 0
333 } else {
334 htl
335 };
336 }
337
338 htl.saturating_sub(1)
339 }
340 }
341}
342
/// A request keeps propagating only while its hops-to-live is nonzero.
pub fn should_forward_htl(htl: u8) -> bool {
    htl != 0
}

/// Convenience alias for [`should_forward_htl`].
pub fn should_forward(htl: u8) -> bool {
    should_forward_htl(htl)
}
352
353use tokio::sync::{mpsc, oneshot};
354
/// A request to fetch a blob from other peers on behalf of a requesting peer.
pub struct ForwardRequest {
    /// Content hash being sought.
    pub hash: Hash,
    /// Peer the request arrived from; excluded so the request is not
    /// forwarded straight back to its source.
    pub exclude_peer_id: String,
    /// Remaining hops-to-live for this request.
    pub htl: u8,
    /// One-shot channel for the result; `None` means the blob was not found.
    pub response: oneshot::Sender<Option<Vec<u8>>>,
}
366
/// Sending side of the forward-request queue.
pub type ForwardTx = mpsc::Sender<ForwardRequest>;
/// Receiving side of the forward-request queue.
pub type ForwardRx = mpsc::Receiver<ForwardRequest>;
371
/// Which connection pool a peer is assigned to.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PeerPool {
    /// Peers the local user follows.
    Follows,
    /// Everyone else.
    Other,
}
380
/// Connection limits for a single peer pool.
#[derive(Debug, Clone, Copy)]
pub struct PoolConfig {
    /// Hard cap on simultaneous connections in this pool.
    pub max_connections: usize,
    /// Target count at which the pool stops actively seeking peers.
    pub satisfied_connections: usize,
}
389
390impl PoolConfig {
391 #[inline]
393 pub fn can_accept(&self, current_count: usize) -> bool {
394 current_count < self.max_connections
395 }
396
397 #[inline]
399 pub fn needs_peers(&self, current_count: usize) -> bool {
400 current_count < self.satisfied_connections
401 }
402
403 #[inline]
405 pub fn is_satisfied(&self, current_count: usize) -> bool {
406 current_count >= self.satisfied_connections
407 }
408}
409
impl Default for PoolConfig {
    /// Default limits: hard cap of 16 connections, satisfied at 8.
    fn default() -> Self {
        Self {
            max_connections: 16,
            satisfied_connections: 8,
        }
    }
}
418
/// Connection limits for both peer pools.
#[derive(Debug, Clone)]
pub struct PoolSettings {
    /// Limits for peers the local user follows.
    pub follows: PoolConfig,
    /// Limits for all other peers.
    pub other: PoolConfig,
}
425
426impl Default for PoolSettings {
427 fn default() -> Self {
428 Self {
429 follows: PoolConfig {
430 max_connections: 16,
431 satisfied_connections: 8,
432 },
433 other: PoolConfig {
434 max_connections: 16,
435 satisfied_connections: 8,
436 },
437 }
438 }
439}
440
/// A request asking an external classifier which pool a pubkey belongs in.
pub struct ClassifyRequest {
    /// Pubkey of the peer to classify.
    pub pubkey: String,
    /// One-shot channel the classifier answers on.
    pub response: oneshot::Sender<PeerPool>,
}
448
/// Sending side of the classifier request channel.
pub type ClassifierTx = mpsc::Sender<ClassifyRequest>;

/// Receiving side of the classifier request channel.
pub type ClassifierRx = mpsc::Receiver<ClassifyRequest>;
454
455pub fn classifier_channel(buffer: usize) -> (ClassifierTx, ClassifierRx) {
457 mpsc::channel(buffer)
458}
459
/// Configuration for the WebRTC-backed store / mesh transport.
#[derive(Clone)]
pub struct WebRTCStoreConfig {
    /// Nostr relay URLs used for signaling.
    pub relays: Vec<String>,
    /// Hashtree roots to announce.
    pub roots: Vec<Hash>,
    /// How long to wait on a blob request before giving up (milliseconds).
    pub request_timeout_ms: u64,
    /// Interval between periodic hello broadcasts (milliseconds).
    pub hello_interval_ms: u64,
    /// Enables verbose debug output.
    pub debug: bool,
    /// Connection limits for the follows/other peer pools.
    pub pools: PoolSettings,
    /// Optional channel to an external pool classifier; behavior when `None`
    /// is decided by the consumer — presumably a single default pool; confirm.
    pub classifier_tx: Option<ClassifierTx>,
    /// Strategy for picking which peers receive requests.
    pub request_selection_strategy: SelectionStrategy,
    /// Whether fairness accounting influences peer selection.
    pub request_fairness_enabled: bool,
    /// Fanout/hedging parameters for dispatching a single request.
    pub request_dispatch: RequestDispatchConfig,
}
485
impl Default for WebRTCStoreConfig {
    fn default() -> Self {
        Self {
            // Public relays used for signaling out of the box.
            relays: vec![
                "wss://temp.iris.to".to_string(),
                "wss://relay.damus.io".to_string(),
            ],
            roots: Vec::new(),
            request_timeout_ms: 10000, // 10 s per request
            hello_interval_ms: 30000,  // re-announce every 30 s
            debug: false,
            pools: PoolSettings::default(),
            classifier_tx: None,
            request_selection_strategy: SelectionStrategy::TitForTat,
            request_fairness_enabled: true,
            request_dispatch: RequestDispatchConfig {
                initial_fanout: 2,      // ask two peers immediately
                hedge_fanout: 1,        // widen by one peer per hedge round
                max_fanout: 8,          // never ask more than eight peers
                hedge_interval_ms: 120, // pause between hedge rounds
            },
        }
    }
}
510
/// Alias for [`WebRTCStoreConfig`] under the mesh naming scheme.
pub type MeshStoreConfig = WebRTCStoreConfig;
513
/// Lifecycle of a WebRTC peer connection.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PeerState {
    /// Known, but no connection attempt has started yet.
    New,
    /// Signaling / ICE negotiation in progress.
    Connecting,
    /// Peer connection established.
    Connected,
    /// Fully usable — presumably the data channel is open; confirm where set.
    Ready,
    /// Connection closed or failed.
    Disconnected,
}
528
/// Aggregate transport statistics.
#[derive(Debug, Clone, Default)]
pub struct WebRTCStats {
    /// Current number of connected peers (gauge).
    pub connected_peers: usize,
    /// Requests currently awaiting a response (gauge).
    pub pending_requests: usize,
    pub bytes_sent: u64,
    pub bytes_received: u64,
    /// Total requests issued.
    pub requests_made: u64,
    /// Requests that completed successfully.
    pub requests_fulfilled: u64,
}
539
/// Alias for [`WebRTCStats`] under the mesh naming scheme.
pub type MeshStats = WebRTCStats;

/// Nostr event kind used for hashtree traffic. 25050 lies in the ephemeral
/// kind range (20000..30000), so relays forward but do not store these events.
pub const NOSTR_KIND_HASHTREE: u16 = 25050;
/// Kind used for mesh signaling events (same as the hashtree kind).
pub const MESH_SIGNALING_EVENT_KIND: u16 = NOSTR_KIND_HASHTREE;

/// Protocol identifier carried in every mesh frame.
pub const MESH_PROTOCOL: &str = "htree.nostr.mesh.v1";
/// Current mesh frame format version.
pub const MESH_PROTOCOL_VERSION: u8 = 1;
/// HTL assigned to locally originated mesh events (the event policy's max).
pub const MESH_DEFAULT_HTL: u8 = MESH_EVENT_POLICY.max_htl;
/// Upper bound accepted by `validate_mesh_frame`; larger HTLs are rejected.
pub const MESH_MAX_HTL: u8 = 6;
552
/// Bounded, TTL-based deduplication set (e.g. for already-seen frame ids).
///
/// `entries` maps key -> insertion time; `order` is an insertion-ordered log
/// used to expire oldest-first without scanning the whole map.
#[derive(Debug)]
pub struct TimedSeenSet {
    entries: HashMap<String, Instant>,
    order: VecDeque<(String, Instant)>,
    // How long a key stays remembered.
    ttl: Duration,
    // Maximum number of keys retained; oldest are evicted beyond this.
    capacity: usize,
}
561
impl TimedSeenSet {
    /// Creates an empty set remembering at most `capacity` keys, each for at
    /// most `ttl`.
    ///
    /// NOTE(review): with `capacity == 0`, `insert_if_new` still returns
    /// `true` even though the key is evicted immediately afterwards —
    /// confirm callers never pass 0.
    pub fn new(capacity: usize, ttl: Duration) -> Self {
        Self {
            entries: HashMap::new(),
            order: VecDeque::new(),
            ttl,
            capacity,
        }
    }

    /// Evicts expired keys, then enforces the capacity bound (oldest first).
    ///
    /// An `order` entry is treated as authoritative for a key only when its
    /// timestamp matches the one stored in `entries` (a mismatch would mean
    /// the key was removed and re-inserted later), hence the timestamp
    /// comparison before each removal.
    fn prune(&mut self, now: Instant) {
        // Phase 1: drop everything older than `ttl` from the front of the
        // log. The log is ordered by insertion time, so we can stop at the
        // first entry that is still fresh.
        while let Some((key, inserted_at)) = self.order.front().cloned() {
            if now.duration_since(inserted_at) < self.ttl {
                break;
            }
            self.order.pop_front();
            if self
                .entries
                .get(&key)
                .map(|ts| *ts == inserted_at)
                .unwrap_or(false)
            {
                self.entries.remove(&key);
            }
        }

        // Phase 2: evict oldest entries until the map fits within capacity.
        while self.entries.len() > self.capacity {
            if let Some((key, inserted_at)) = self.order.pop_front() {
                if self
                    .entries
                    .get(&key)
                    .map(|ts| *ts == inserted_at)
                    .unwrap_or(false)
                {
                    self.entries.remove(&key);
                }
            } else {
                // Log exhausted while entries remain; defensive stop.
                break;
            }
        }
    }

    /// Records `key` and returns `true` iff it was not already present —
    /// i.e. this is the first sighting within the TTL window.
    pub fn insert_if_new(&mut self, key: String) -> bool {
        let now = Instant::now();
        // Prune first so an expired copy of `key` doesn't block re-insertion.
        self.prune(now);
        if self.entries.contains_key(&key) {
            return false;
        }
        self.entries.insert(key.clone(), now);
        self.order.push_back((key, now));
        // Second prune enforces the capacity bound after the insert.
        self.prune(now);
        true
    }

    /// Number of currently remembered keys (may include expired keys until
    /// the next mutating call triggers a prune).
    pub fn len(&self) -> usize {
        self.entries.len()
    }

    /// True when no keys are remembered.
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }
}
624
/// Payload of a mesh frame; mirrors the Nostr relay message shape
/// (currently only `EVENT` is carried over the mesh).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum MeshNostrPayload {
    /// A Nostr event being gossiped through the mesh.
    #[serde(rename = "EVENT")]
    Event { event: Event },
}
631
/// Envelope for gossiping Nostr events peer-to-peer over the mesh.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MeshNostrFrame {
    /// Must equal [`MESH_PROTOCOL`] (checked by `validate_mesh_frame`).
    pub protocol: String,
    /// Must equal [`MESH_PROTOCOL_VERSION`].
    pub version: u8,
    /// Unique id used to de-duplicate frames arriving via multiple paths.
    pub frame_id: String,
    /// Remaining hops-to-live.
    pub htl: u8,
    /// Peer id of the immediate sender (not necessarily the event author).
    pub sender_peer_id: String,
    pub payload: MeshNostrPayload,
}
641
642impl MeshNostrFrame {
643 pub fn new_event(event: Event, sender_peer_id: &str, htl: u8) -> Self {
644 Self::new_event_with_id(
645 event,
646 sender_peer_id,
647 &uuid::Uuid::new_v4().to_string(),
648 htl,
649 )
650 }
651
652 pub fn new_event_with_id(event: Event, sender_peer_id: &str, frame_id: &str, htl: u8) -> Self {
653 Self {
654 protocol: MESH_PROTOCOL.to_string(),
655 version: MESH_PROTOCOL_VERSION,
656 frame_id: frame_id.to_string(),
657 htl,
658 sender_peer_id: sender_peer_id.to_string(),
659 payload: MeshNostrPayload::Event { event },
660 }
661 }
662
663 pub fn event(&self) -> &Event {
664 match &self.payload {
665 MeshNostrPayload::Event { event } => event,
666 }
667 }
668}
669
670pub fn validate_mesh_frame(frame: &MeshNostrFrame) -> Result<(), &'static str> {
671 if frame.protocol != MESH_PROTOCOL {
672 return Err("invalid protocol");
673 }
674 if frame.version != MESH_PROTOCOL_VERSION {
675 return Err("invalid version");
676 }
677 if frame.frame_id.is_empty() {
678 return Err("missing frame id");
679 }
680 if frame.sender_peer_id.is_empty() {
681 return Err("missing sender peer id");
682 }
683 if frame.htl == 0 || frame.htl > MESH_MAX_HTL {
684 return Err("invalid htl");
685 }
686
687 let event = frame.event();
688 if event.kind != Kind::Custom(NOSTR_KIND_HASHTREE)
689 && event.kind != Kind::Ephemeral(NOSTR_KIND_HASHTREE)
690 {
691 return Err("unsupported event kind");
692 }
693
694 Ok(())
695}
696
/// Label assigned to the WebRTC data channel used for hashtree transfer.
pub const DATA_CHANNEL_LABEL: &str = "hashtree";