1use hashtree_core::Hash;
7use nostr_sdk::nostr::{Event, Kind};
8use serde::{Deserialize, Serialize};
9use std::collections::{HashMap, VecDeque};
10use std::time::{Duration, Instant};
11
12use crate::generic_store::RequestDispatchConfig;
13use crate::peer_selector::SelectionStrategy;
14
/// Identity of a remote peer: its public key plus a per-session UUID,
/// so multiple concurrent sessions from the same key stay distinguishable.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct PeerId {
    // Public key of the peer as a string (presumably hex-encoded Nostr pubkey — confirm).
    pub pubkey: String,
    // Session-unique identifier; combined with the pubkey in the `pubkey:uuid` wire form.
    pub uuid: String,
}
23
24impl PeerId {
25 pub fn new(pubkey: String, uuid: String) -> Self {
26 Self { pubkey, uuid }
27 }
28
29 pub fn to_peer_string(&self) -> String {
30 format!("{}:{}", self.pubkey, self.uuid)
31 }
32
33 pub fn from_peer_string(s: &str) -> Option<Self> {
34 let parts: Vec<&str> = s.split(':').collect();
35 if parts.len() == 2 {
36 Some(Self {
37 pubkey: parts[0].to_string(),
38 uuid: parts[1].to_string(),
39 })
40 } else {
41 None
42 }
43 }
44}
45
/// Messages exchanged over Nostr relays to negotiate WebRTC connections.
///
/// JSON-serialized with a `"type"` discriminator; field names are renamed to
/// camelCase on the wire.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum SignalingMessage {
    /// Broadcast presence announcement (no target peer).
    #[serde(rename = "hello")]
    Hello {
        #[serde(rename = "peerId")]
        peer_id: String,
        // Root hashes (as strings) this peer advertises.
        roots: Vec<String>,
    },

    /// SDP offer addressed to a specific peer.
    #[serde(rename = "offer")]
    Offer {
        #[serde(rename = "peerId")]
        peer_id: String,
        #[serde(rename = "targetPeerId")]
        target_peer_id: String,
        sdp: String,
    },

    /// SDP answer addressed to the offering peer.
    #[serde(rename = "answer")]
    Answer {
        #[serde(rename = "peerId")]
        peer_id: String,
        #[serde(rename = "targetPeerId")]
        target_peer_id: String,
        sdp: String,
    },

    /// A single ICE candidate for an in-progress negotiation.
    #[serde(rename = "candidate")]
    Candidate {
        #[serde(rename = "peerId")]
        peer_id: String,
        #[serde(rename = "targetPeerId")]
        target_peer_id: String,
        candidate: String,
        #[serde(rename = "sdpMLineIndex")]
        sdp_m_line_index: Option<u16>,
        #[serde(rename = "sdpMid")]
        sdp_mid: Option<String>,
    },

    /// A batch of ICE candidates (see [`IceCandidate`]).
    #[serde(rename = "candidates")]
    Candidates {
        #[serde(rename = "peerId")]
        peer_id: String,
        #[serde(rename = "targetPeerId")]
        target_peer_id: String,
        candidates: Vec<IceCandidate>,
    },
}
102
103impl SignalingMessage {
104 pub fn peer_id(&self) -> &str {
106 match self {
107 Self::Hello { peer_id, .. } => peer_id,
108 Self::Offer { peer_id, .. } => peer_id,
109 Self::Answer { peer_id, .. } => peer_id,
110 Self::Candidate { peer_id, .. } => peer_id,
111 Self::Candidates { peer_id, .. } => peer_id,
112 }
113 }
114
115 pub fn target_peer_id(&self) -> Option<&str> {
117 match self {
118 Self::Hello { .. } => None, Self::Offer { target_peer_id, .. } => Some(target_peer_id),
120 Self::Answer { target_peer_id, .. } => Some(target_peer_id),
121 Self::Candidate { target_peer_id, .. } => Some(target_peer_id),
122 Self::Candidates { target_peer_id, .. } => Some(target_peer_id),
123 }
124 }
125
126 pub fn is_for(&self, my_peer_id: &str) -> bool {
128 match self.target_peer_id() {
129 Some(target) => target == my_peer_id,
130 None => true, }
132 }
133}
134
/// Deterministic role assignment for WebRTC "perfect negotiation":
/// exactly one side of any pair is polite. The peer whose id compares
/// lexicographically lower takes the polite role; both ends compute the
/// same answer independently, so no coordination message is needed.
#[inline]
pub fn is_polite_peer(local_peer_id: &str, remote_peer_id: &str) -> bool {
    matches!(
        local_peer_id.cmp(remote_peer_id),
        std::cmp::Ordering::Less
    )
}
150
/// One ICE candidate, mirroring the browser `RTCIceCandidateInit` shape
/// (camelCase field names on the wire).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IceCandidate {
    // The candidate SDP string itself.
    pub candidate: String,
    #[serde(rename = "sdpMLineIndex")]
    pub sdp_m_line_index: Option<u16>,
    #[serde(rename = "sdpMid")]
    pub sdp_mid: Option<String>,
}
160
/// Messages exchanged over the WebRTC data channel, JSON-tagged by `"type"`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum DataMessage {
    /// Ask a peer for a blob by hash; `id` correlates the matching `Response`.
    #[serde(rename = "req")]
    Request {
        id: u32,
        hash: String,
        // Hops-to-live for request forwarding; omitted from the wire when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        htl: Option<u8>,
    },

    /// Reply to a `Request` with the same `id`; `found` signals availability.
    #[serde(rename = "res")]
    Response {
        id: u32,
        hash: String,
        found: bool,
        // Blob size in bytes when known; omitted from the wire when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        size: Option<u64>,
    },

    /// Unsolicited notice that a blob with this hash is being sent/offered.
    #[serde(rename = "push")]
    Push { hash: String },

    /// Advertise hashes this peer holds.
    #[serde(rename = "have")]
    Have { hashes: Vec<String> },

    /// Request hashes this peer wants.
    #[serde(rename = "want")]
    Want { hashes: Vec<String> },

    /// Announce a (new) root hash, optionally with its size.
    #[serde(rename = "root")]
    RootUpdate {
        hash: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        size: Option<u64>,
    },
}
207
/// Maximum hops-to-live for blob requests (ceiling enforced by `decrement_htl_with_policy`).
pub const MAX_HTL: u8 = 10;
/// Probability of decrementing while HTL sits at the maximum
/// (presumably a Freenet-style measure so HTL == max doesn't prove local origin — confirm).
pub const DECREMENT_AT_MAX_PROB: f64 = 0.5;
/// Probability of dropping HTL from 1 to 0, which ends forwarding.
pub const DECREMENT_AT_MIN_PROB: f64 = 0.25;
214
/// How HTL decrement decisions are made. Only a probabilistic mode exists
/// today; the enum leaves room for future modes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HtlMode {
    // Boundary decrements (at max and at 1) are decided by per-peer fixed
    // samples — see `PeerHTLConfig` and `decrement_htl_with_policy`.
    Probabilistic,
}

/// HTL parameters for one traffic class (blob requests vs. mesh events).
#[derive(Debug, Clone, Copy)]
pub struct HtlPolicy {
    // Decrement decision mode.
    pub mode: HtlMode,
    // Ceiling clamped onto any incoming HTL value.
    pub max_htl: u8,
    // Probability of decrementing when HTL equals `max_htl`.
    pub p_at_max: f64,
    // Probability of dropping from 1 to 0.
    pub p_at_min: f64,
}
229
/// HTL policy for blob requests: long reach (max 10), conservative
/// boundary decrement probabilities.
pub const BLOB_REQUEST_POLICY: HtlPolicy = HtlPolicy {
    mode: HtlMode::Probabilistic,
    max_htl: MAX_HTL,
    p_at_max: DECREMENT_AT_MAX_PROB,
    p_at_min: DECREMENT_AT_MIN_PROB,
};

/// HTL policy for mesh event gossip: shorter reach (max 4), faster decay.
pub const MESH_EVENT_POLICY: HtlPolicy = HtlPolicy {
    mode: HtlMode::Probabilistic,
    max_htl: 4,
    p_at_max: 0.75,
    p_at_min: 0.5,
};
245
/// Per-peer fixed samples driving probabilistic HTL decrements: the same
/// sample is reused for every message from that peer, so a given peer's
/// boundary decision is consistent rather than a fresh coin flip each time.
#[derive(Debug, Clone, Copy)]
pub struct PeerHTLConfig {
    // Sample in [0, 1]; compared against `p_at_max` when HTL is at the policy maximum.
    pub at_max_sample: f64,
    // Sample in [0, 1]; compared against `p_at_min` when HTL is 1.
    pub at_min_sample: f64,
}
255
256impl PeerHTLConfig {
257 pub fn random() -> Self {
259 use rand::Rng;
260 let mut rng = rand::thread_rng();
261 Self::from_samples(rng.gen_range(0.0..1.0), rng.gen_range(0.0..1.0))
262 }
263
264 pub fn from_samples(at_max_sample: f64, at_min_sample: f64) -> Self {
266 Self {
267 at_max_sample: at_max_sample.clamp(0.0, 1.0),
268 at_min_sample: at_min_sample.clamp(0.0, 1.0),
269 }
270 }
271
272 pub fn from_flags(decrement_at_max: bool, decrement_at_min: bool) -> Self {
274 Self {
275 at_max_sample: if decrement_at_max { 0.0 } else { 1.0 },
276 at_min_sample: if decrement_at_min { 0.0 } else { 1.0 },
277 }
278 }
279
280 pub fn decrement(&self, htl: u8) -> u8 {
283 decrement_htl_with_policy(htl, &BLOB_REQUEST_POLICY, self)
284 }
285
286 pub fn decrement_with_policy(&self, htl: u8, policy: &HtlPolicy) -> u8 {
288 decrement_htl_with_policy(htl, policy, self)
289 }
290}
291
impl Default for PeerHTLConfig {
    /// Defaults to freshly drawn random samples — note this is
    /// non-deterministic, unlike most `Default` impls.
    fn default() -> Self {
        Self::random()
    }
}
297
298pub fn decrement_htl_with_policy(htl: u8, policy: &HtlPolicy, config: &PeerHTLConfig) -> u8 {
300 let htl = htl.min(policy.max_htl);
301 if htl == 0 {
302 return 0;
303 }
304
305 match policy.mode {
306 HtlMode::Probabilistic => {
307 let p_at_max = policy.p_at_max.clamp(0.0, 1.0);
308 let p_at_min = policy.p_at_min.clamp(0.0, 1.0);
309
310 if htl == policy.max_htl {
311 return if config.at_max_sample < p_at_max {
312 htl.saturating_sub(1)
313 } else {
314 htl
315 };
316 }
317
318 if htl == 1 {
319 return if config.at_min_sample < p_at_min {
320 0
321 } else {
322 htl
323 };
324 }
325
326 htl.saturating_sub(1)
327 }
328 }
329}
330
/// A request may be forwarded only while it still has hops to live.
pub fn should_forward_htl(htl: u8) -> bool {
    htl != 0
}
335
/// Backward-compatible alias for [`should_forward_htl`]; prefer the
/// explicitly named function in new code.
pub fn should_forward(htl: u8) -> bool {
    should_forward_htl(htl)
}
340
341use tokio::sync::{mpsc, oneshot};
342
/// A blob lookup to be forwarded to other peers, sent over a [`ForwardTx`] channel.
pub struct ForwardRequest {
    // Hash of the blob being sought.
    pub hash: Hash,
    // Peer id to exclude when forwarding (presumably the peer the request
    // came from, to avoid sending it straight back — confirm at call site).
    pub exclude_peer_id: String,
    // Remaining hops-to-live for this request.
    pub htl: u8,
    // One-shot result channel: `Some(bytes)` when the blob was found, `None` otherwise.
    pub response: oneshot::Sender<Option<Vec<u8>>>,
}
354
/// Sending half of the forward-request channel.
pub type ForwardTx = mpsc::Sender<ForwardRequest>;
/// Receiving half of the forward-request channel.
pub type ForwardRx = mpsc::Receiver<ForwardRequest>;
359
/// Which connection pool a peer is assigned to (see [`ClassifyRequest`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PeerPool {
    // Peers the local user follows.
    Follows,
    // Everyone else.
    Other,
}
368
/// Connection limits for one peer pool.
#[derive(Debug, Clone, Copy)]
pub struct PoolConfig {
    /// Hard cap on simultaneous connections in this pool.
    pub max_connections: usize,
    /// Connection count at which the pool stops actively seeking new peers.
    pub satisfied_connections: usize,
}

impl PoolConfig {
    /// True while the pool may still accept one more connection.
    #[inline]
    pub fn can_accept(&self, current_count: usize) -> bool {
        current_count < self.max_connections
    }

    /// True while the pool should keep looking for more peers.
    #[inline]
    pub fn needs_peers(&self, current_count: usize) -> bool {
        current_count < self.satisfied_connections
    }

    /// True once the pool has reached its satisfaction threshold.
    #[inline]
    pub fn is_satisfied(&self, current_count: usize) -> bool {
        current_count >= self.satisfied_connections
    }
}

impl Default for PoolConfig {
    fn default() -> Self {
        Self {
            max_connections: 16,
            satisfied_connections: 8,
        }
    }
}

/// Limits for both peer pools.
#[derive(Debug, Clone)]
pub struct PoolSettings {
    /// Pool of peers the local user follows.
    pub follows: PoolConfig,
    /// Pool for all other peers.
    pub other: PoolConfig,
}

impl Default for PoolSettings {
    fn default() -> Self {
        // Both pools use the shared defaults. Delegating to `PoolConfig::default()`
        // removes the 16/8 literals that were previously duplicated in three
        // places and could silently drift apart.
        Self {
            follows: PoolConfig::default(),
            other: PoolConfig::default(),
        }
    }
}
428
/// A request to classify a peer's pubkey into a [`PeerPool`], answered
/// asynchronously over the one-shot `response` channel.
pub struct ClassifyRequest {
    // Pubkey of the peer to classify.
    pub pubkey: String,
    // One-shot channel carrying the resulting pool assignment.
    pub response: oneshot::Sender<PeerPool>,
}
436
/// Sending half of the classifier channel.
pub type ClassifierTx = mpsc::Sender<ClassifyRequest>;

/// Receiving half of the classifier channel.
pub type ClassifierRx = mpsc::Receiver<ClassifyRequest>;
442
/// Creates a bounded channel for classification requests with the given buffer size.
pub fn classifier_channel(buffer: usize) -> (ClassifierTx, ClassifierRx) {
    mpsc::channel(buffer)
}
447
/// Configuration for the WebRTC-backed store.
#[derive(Clone)]
pub struct WebRTCStoreConfig {
    // Nostr relay URLs used for signaling.
    pub relays: Vec<String>,
    // Root hashes this node serves (presumably advertised in `hello` messages — confirm).
    pub roots: Vec<Hash>,
    // Timeout for a single blob request, in milliseconds.
    pub request_timeout_ms: u64,
    // Interval between presence announcements, in milliseconds.
    pub hello_interval_ms: u64,
    // Enables verbose debug logging.
    pub debug: bool,
    // Connection limits per peer pool.
    pub pools: PoolSettings,
    // Optional channel to an external classifier assigning peers to pools;
    // NOTE(review): behavior when `None` is decided at the call site — confirm.
    pub classifier_tx: Option<ClassifierTx>,
    // Strategy for choosing which peers receive blob requests.
    pub request_selection_strategy: SelectionStrategy,
    // Enables fairness accounting in peer selection.
    pub request_fairness_enabled: bool,
    // Fanout/hedging parameters for dispatching requests to multiple peers.
    pub request_dispatch: RequestDispatchConfig,
}
473
impl Default for WebRTCStoreConfig {
    fn default() -> Self {
        Self {
            // Public relays used for signaling by default.
            relays: vec![
                "wss://temp.iris.to".to_string(),
                "wss://relay.damus.io".to_string(),
            ],
            roots: Vec::new(),
            request_timeout_ms: 10000, // 10 s per blob request
            hello_interval_ms: 30000,  // announce presence every 30 s
            debug: false,
            pools: PoolSettings::default(),
            classifier_tx: None,
            request_selection_strategy: SelectionStrategy::TitForTat,
            request_fairness_enabled: true,
            // Start with 2 parallel requests; hedging adds 1 more peer per
            // 120 ms interval, up to 8 total (field names — confirm semantics
            // in the dispatcher).
            request_dispatch: RequestDispatchConfig {
                initial_fanout: 2,
                hedge_fanout: 1,
                max_fanout: 8,
                hedge_interval_ms: 120,
            },
        }
    }
}
498
/// Lifecycle states of a WebRTC peer connection.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PeerState {
    // Initial state; no connection attempt yet.
    New,
    // Negotiation/connection attempt in progress.
    Connecting,
    // Connection established (precise distinction from `Ready` is decided
    // by the state machine that drives these transitions — confirm there).
    Connected,
    // Fully usable for data exchange.
    Ready,
    // Connection closed or failed.
    Disconnected,
}
513
/// Aggregate runtime counters for the WebRTC store; all zero by default.
#[derive(Debug, Clone, Default)]
pub struct WebRTCStats {
    // Peers currently connected.
    pub connected_peers: usize,
    // Requests awaiting a response.
    pub pending_requests: usize,
    // Total bytes sent over data channels.
    pub bytes_sent: u64,
    // Total bytes received over data channels.
    pub bytes_received: u64,
    // Total requests issued by this node.
    pub requests_made: u64,
    // Requests that completed successfully.
    pub requests_fulfilled: u64,
}
524
/// Nostr event kind carrying hashtree traffic; accepted as either
/// `Kind::Custom` or `Kind::Ephemeral` in `validate_mesh_frame`.
pub const NOSTR_KIND_HASHTREE: u16 = 25050;

/// Protocol identifier embedded in every mesh frame.
pub const MESH_PROTOCOL: &str = "htree.nostr.mesh.v1";
/// Current protocol version; frames with any other version are rejected.
pub const MESH_PROTOCOL_VERSION: u8 = 1;
/// HTL assigned to locally-originated mesh events (the mesh policy's maximum, 4).
pub const MESH_DEFAULT_HTL: u8 = MESH_EVENT_POLICY.max_htl;
/// Upper bound accepted on received frames (`validate_mesh_frame` rejects above this).
pub const MESH_MAX_HTL: u8 = 6;
533
/// A bounded set of recently-seen string keys with per-entry TTL, used to
/// de-duplicate items (e.g. mesh frame ids). Entries expire after `ttl`,
/// and the set never holds more than `capacity` live entries (oldest are
/// evicted first).
#[derive(Debug)]
pub struct TimedSeenSet {
    /// Key -> the `Instant` it was inserted.
    entries: HashMap<String, Instant>,
    /// Insertion order, oldest at the front. An order pair is only honored
    /// when its timestamp still matches `entries` (defensive staleness check).
    order: VecDeque<(String, Instant)>,
    /// How long an entry stays live.
    ttl: Duration,
    /// Maximum number of live entries.
    capacity: usize,
}

impl TimedSeenSet {
    /// Creates an empty set holding at most `capacity` keys, each for at most `ttl`.
    pub fn new(capacity: usize, ttl: Duration) -> Self {
        Self {
            entries: HashMap::new(),
            order: VecDeque::new(),
            ttl,
            capacity,
        }
    }

    /// Removes `key` from `entries` only when its stored timestamp matches
    /// `inserted_at`; a mismatch means this order pair was stale. (Shared by
    /// both eviction paths in `prune`, which previously duplicated this logic.)
    fn remove_if_current(&mut self, key: &str, inserted_at: Instant) {
        if self.entries.get(key).copied() == Some(inserted_at) {
            self.entries.remove(key);
        }
    }

    /// Drops expired entries, then evicts oldest entries until within capacity.
    fn prune(&mut self, now: Instant) {
        // Phase 1: TTL expiry. `order` is oldest-first, so stop at the first
        // still-fresh entry. Peek before popping (no per-iteration clone of
        // the key, unlike the previous `front().cloned()` approach).
        loop {
            let expired = match self.order.front() {
                Some((_, inserted_at)) => now.duration_since(*inserted_at) >= self.ttl,
                None => false,
            };
            if !expired {
                break;
            }
            if let Some((key, inserted_at)) = self.order.pop_front() {
                self.remove_if_current(&key, inserted_at);
            }
        }

        // Phase 2: capacity bound — evict oldest-first until we fit.
        while self.entries.len() > self.capacity {
            match self.order.pop_front() {
                Some((key, inserted_at)) => self.remove_if_current(&key, inserted_at),
                // Inconsistent state (entries without order records); bail out
                // rather than loop forever.
                None => break,
            }
        }
    }

    /// Inserts `key` if it is not already present (expired entries are pruned
    /// first, so a re-appearance after expiry counts as new).
    /// Returns `true` when the key was newly inserted.
    pub fn insert_if_new(&mut self, key: String) -> bool {
        let now = Instant::now();
        self.prune(now);
        if self.entries.contains_key(&key) {
            return false;
        }
        self.entries.insert(key.clone(), now);
        self.order.push_back((key, now));
        // Re-prune so exceeding capacity immediately evicts the oldest entry.
        self.prune(now);
        true
    }

    /// Number of live (non-expired, non-evicted) keys.
    pub fn len(&self) -> usize {
        self.entries.len()
    }

    /// True when no live keys remain.
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }
}
605
/// Payload of a mesh frame; currently only Nostr `EVENT`s are carried.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum MeshNostrPayload {
    // A gossiped Nostr event.
    #[serde(rename = "EVENT")]
    Event { event: Event },
}
612
/// Envelope for gossiping Nostr events across the mesh; validated on
/// receipt by [`validate_mesh_frame`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MeshNostrFrame {
    // Must equal `MESH_PROTOCOL`.
    pub protocol: String,
    // Must equal `MESH_PROTOCOL_VERSION`.
    pub version: u8,
    // Unique id per frame (a UUID when built via `new_event`); suitable for
    // de-duplicating re-broadcasts, e.g. with `TimedSeenSet`.
    pub frame_id: String,
    // Remaining hops-to-live; must be in 1..=MESH_MAX_HTL on receipt.
    pub htl: u8,
    // Peer id of the immediate sender (not necessarily the event's author).
    pub sender_peer_id: String,
    // The wrapped payload.
    pub payload: MeshNostrPayload,
}
622
623impl MeshNostrFrame {
624 pub fn new_event(event: Event, sender_peer_id: &str, htl: u8) -> Self {
625 Self::new_event_with_id(
626 event,
627 sender_peer_id,
628 &uuid::Uuid::new_v4().to_string(),
629 htl,
630 )
631 }
632
633 pub fn new_event_with_id(event: Event, sender_peer_id: &str, frame_id: &str, htl: u8) -> Self {
634 Self {
635 protocol: MESH_PROTOCOL.to_string(),
636 version: MESH_PROTOCOL_VERSION,
637 frame_id: frame_id.to_string(),
638 htl,
639 sender_peer_id: sender_peer_id.to_string(),
640 payload: MeshNostrPayload::Event { event },
641 }
642 }
643
644 pub fn event(&self) -> &Event {
645 match &self.payload {
646 MeshNostrPayload::Event { event } => event,
647 }
648 }
649}
650
651pub fn validate_mesh_frame(frame: &MeshNostrFrame) -> Result<(), &'static str> {
652 if frame.protocol != MESH_PROTOCOL {
653 return Err("invalid protocol");
654 }
655 if frame.version != MESH_PROTOCOL_VERSION {
656 return Err("invalid version");
657 }
658 if frame.frame_id.is_empty() {
659 return Err("missing frame id");
660 }
661 if frame.sender_peer_id.is_empty() {
662 return Err("missing sender peer id");
663 }
664 if frame.htl == 0 || frame.htl > MESH_MAX_HTL {
665 return Err("invalid htl");
666 }
667
668 let event = frame.event();
669 if event.kind != Kind::Custom(NOSTR_KIND_HASHTREE)
670 && event.kind != Kind::Ephemeral(NOSTR_KIND_HASHTREE)
671 {
672 return Err("unsupported event kind");
673 }
674
675 Ok(())
676}
677
/// Label assigned to the WebRTC data channel carrying hashtree traffic.
pub const DATA_CHANNEL_LABEL: &str = "hashtree";