1use anyhow::Result;
5use nostr::{nips::nip19::FromBech32, Alphabet, Event, Filter, Kind, PublicKey, SingleLetterTag};
6use serde::{Deserialize, Serialize};
7use std::collections::BTreeSet;
8use std::collections::HashMap;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::Arc;
11use std::time::Duration;
12use tokio::sync::RwLock;
13
14#[derive(Debug, Clone, Copy, PartialEq, Eq)]
16pub enum ConnectionState {
17 Discovered,
18 Connecting,
19 Connected,
20 Failed,
21 Disconnected,
22}
23
24#[derive(Debug)]
26pub struct PeerEntry {
27 pub peer_id: PeerId,
28 pub direction: PeerDirection,
29 pub state: ConnectionState,
30 pub last_seen: std::time::Instant,
31 pub peer: Option<DummyPeer>,
32 pub pool: PeerPool,
33 pub transport: PeerTransport,
34 pub signal_paths: BTreeSet<PeerSignalPath>,
35 pub bytes_sent: u64,
36 pub bytes_received: u64,
37}
38
39#[derive(Debug, Clone, PartialEq, Eq, Hash)]
41pub struct PeerId {
42 pub pubkey: String,
43}
44
45impl PeerId {
46 pub fn new(pubkey: String) -> Self {
47 Self { pubkey }
48 }
49
50 pub fn short(&self) -> String {
51 self.pubkey[..8.min(self.pubkey.len())].to_string()
52 }
53}
54
55impl std::fmt::Display for PeerId {
56 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
57 f.write_str(&self.pubkey)
58 }
59}
60
61#[derive(Debug, Clone, Copy, PartialEq, Eq)]
63pub enum PeerDirection {
64 Inbound,
65 Outbound,
66}
67
68#[derive(Debug, Clone, Copy, PartialEq, Eq)]
70pub enum PeerTransport {
71 WebRtc,
72 Bluetooth,
73}
74
75impl std::fmt::Display for PeerTransport {
76 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77 match self {
78 PeerTransport::WebRtc => f.write_str("webrtc"),
79 PeerTransport::Bluetooth => f.write_str("bluetooth"),
80 }
81 }
82}
83
84#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
86pub enum PeerSignalPath {
87 Relay,
88 Multicast,
89 WifiAware,
90 Bluetooth,
91}
92
93impl std::fmt::Display for PeerSignalPath {
94 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
95 match self {
96 PeerSignalPath::Relay => f.write_str("relay"),
97 PeerSignalPath::Multicast => f.write_str("multicast"),
98 PeerSignalPath::WifiAware => f.write_str("wifi-aware"),
99 PeerSignalPath::Bluetooth => f.write_str("bluetooth"),
100 }
101 }
102}
103
104#[derive(Debug)]
106pub struct DummyPeer;
107
108impl DummyPeer {
109 pub fn is_ready(&self) -> bool {
110 false
111 }
112
113 pub fn has_data_channel(&self) -> bool {
114 false
115 }
116
117 pub fn state(&self) -> &str {
118 "Disabled"
119 }
120
121 pub fn as_webrtc(&self) -> Option<&Self> {
122 None
123 }
124
125 pub async fn request(&self, _hash: &str) -> Result<Option<Vec<u8>>> {
126 Ok(None)
127 }
128}
129
130#[derive(Debug, Clone, Copy)]
132pub enum PeerPool {
133 Follows,
134 Other,
135}
136
137#[derive(Debug, Clone)]
138pub struct PeerRootEvent {
139 pub hash: String,
140 pub key: Option<String>,
141 pub encrypted_key: Option<String>,
142 pub self_encrypted_key: Option<String>,
143 pub event_id: String,
144 pub created_at: u64,
145 pub peer_id: String,
146}
147
148#[derive(Debug)]
150pub struct WebRTCState {
151 pub peers: Arc<RwLock<HashMap<String, PeerEntry>>>,
152 bytes_sent: AtomicU64,
153 bytes_received: AtomicU64,
154}
155
156impl Default for WebRTCState {
157 fn default() -> Self {
158 Self {
159 peers: Arc::new(RwLock::new(HashMap::new())),
160 bytes_sent: AtomicU64::new(0),
161 bytes_received: AtomicU64::new(0),
162 }
163 }
164}
165
166impl WebRTCState {
167 pub fn new() -> Self {
168 Self::default()
169 }
170
171 pub async fn query_peers_for_data(&self, _hash: &str) -> Option<Vec<u8>> {
173 None
174 }
175
176 pub async fn request_from_peers(&self, _hash: &str) -> Option<Vec<u8>> {
178 None
179 }
180
181 pub async fn request_from_peers_with_source(&self, _hash: &str) -> Option<(Vec<u8>, String)> {
183 None
184 }
185
186 pub async fn record_sent(&self, peer_id: &str, bytes: u64) {
187 self.bytes_sent.fetch_add(bytes, Ordering::Relaxed);
188 if let Some(entry) = self.peers.write().await.get_mut(peer_id) {
189 entry.bytes_sent += bytes;
190 }
191 }
192
193 pub async fn record_received(&self, peer_id: &str, bytes: u64) {
194 self.bytes_received.fetch_add(bytes, Ordering::Relaxed);
195 if let Some(entry) = self.peers.write().await.get_mut(peer_id) {
196 entry.bytes_received += bytes;
197 }
198 }
199
200 pub fn get_bandwidth(&self) -> (u64, u64) {
202 (
203 self.bytes_sent.load(Ordering::Relaxed),
204 self.bytes_received.load(Ordering::Relaxed),
205 )
206 }
207
208 pub fn get_mesh_stats(&self) -> (u64, u64, u64) {
210 (0, 0, 0)
211 }
212
213 pub async fn resolve_root_from_peers(
215 &self,
216 _owner_pubkey: &str,
217 _tree_name: &str,
218 _per_peer_timeout: Duration,
219 ) -> Option<PeerRootEvent> {
220 None
221 }
222
223 pub async fn resolve_root_from_local_buses_with_source(
224 &self,
225 _owner_pubkey: &str,
226 _tree_name: &str,
227 _timeout: Duration,
228 ) -> Option<(&'static str, PeerRootEvent)> {
229 None
230 }
231}
232
233pub fn build_root_filter(owner_pubkey: &str, tree_name: &str) -> Option<Filter> {
234 let author = PublicKey::from_hex(owner_pubkey)
235 .or_else(|_| PublicKey::from_bech32(owner_pubkey))
236 .ok()?;
237 Some(
238 Filter::new()
239 .kind(Kind::Custom(30078))
240 .author(author)
241 .custom_tag(
242 SingleLetterTag::lowercase(Alphabet::D),
243 tree_name.to_string(),
244 )
245 .custom_tag(SingleLetterTag::lowercase(Alphabet::L), "hashtree")
246 .limit(50),
247 )
248}
249
250pub fn pick_latest_event<'a, I>(events: I) -> Option<&'a Event>
251where
252 I: IntoIterator<Item = &'a Event>,
253{
254 events.into_iter().max_by(|a, b| {
255 let ordering = a.created_at.cmp(&b.created_at);
256 if ordering == std::cmp::Ordering::Equal {
257 a.id.cmp(&b.id)
258 } else {
259 ordering
260 }
261 })
262}
263
264pub fn root_event_from_peer(
265 event: &Event,
266 peer_id: &str,
267 tree_name: &str,
268) -> Option<PeerRootEvent> {
269 let mut tree_match = false;
270 let mut labeled = false;
271 let mut key = None;
272 let mut encrypted_key = None;
273 let mut self_encrypted_key = None;
274 let mut hash_tag = None;
275
276 for tag in event.tags.iter() {
277 let slice = tag.as_slice();
278 if slice.len() < 2 {
279 continue;
280 }
281 match slice[0].as_str() {
282 "d" => tree_match = slice[1].as_str() == tree_name,
283 "l" => labeled = slice[1].as_str() == "hashtree",
284 "hash" => hash_tag = Some(slice[1].to_string()),
285 "key" => key = Some(slice[1].to_string()),
286 "encryptedKey" => encrypted_key = Some(slice[1].to_string()),
287 "selfEncryptedKey" => self_encrypted_key = Some(slice[1].to_string()),
288 _ => {}
289 }
290 }
291
292 if !tree_match || !labeled {
293 return None;
294 }
295
296 let hash = hash_tag.or_else(|| {
297 if event.content.is_empty() {
298 None
299 } else {
300 Some(event.content.clone())
301 }
302 })?;
303
304 Some(PeerRootEvent {
305 hash,
306 key,
307 encrypted_key,
308 self_encrypted_key,
309 event_id: event.id.to_hex(),
310 created_at: event.created_at.as_secs(),
311 peer_id: peer_id.to_string(),
312 })
313}
314
315pub trait ContentStore: Send + Sync + 'static {
317 fn get(&self, hash_hex: &str) -> Result<Option<Vec<u8>>>;
319}
320
321pub mod types {
322 use super::*;
323
324 pub const MAX_HTL: u8 = 7;
325 pub const MSG_TYPE_REQUEST: u8 = 0x00;
326 pub const MSG_TYPE_RESPONSE: u8 = 0x01;
327 pub const MSG_TYPE_QUOTE_REQUEST: u8 = 0x02;
328 pub const MSG_TYPE_QUOTE_RESPONSE: u8 = 0x03;
329 pub const MSG_TYPE_PAYMENT: u8 = 0x04;
330 pub const MSG_TYPE_PAYMENT_ACK: u8 = 0x05;
331 pub const MSG_TYPE_CHUNK: u8 = 0x06;
332 pub const MSG_TYPE_PEER_HINTS: u8 = 0x07;
333 pub const MSG_TYPE_PUBSUB_INTEREST: u8 = 0x08;
334 pub const MSG_TYPE_PUBSUB_FRAME: u8 = 0x09;
335 pub const MSG_TYPE_PUBSUB_INVENTORY: u8 = 0x0a;
336 pub const MSG_TYPE_PUBSUB_WANT: u8 = 0x0b;
337
338 #[derive(Debug, Clone, Serialize, Deserialize)]
339 pub struct DataRequest {
340 #[serde(with = "serde_bytes")]
341 pub h: Vec<u8>,
342 #[serde(default = "default_htl")]
343 pub htl: u8,
344 #[serde(skip_serializing_if = "Option::is_none")]
345 pub q: Option<u64>,
346 }
347
348 #[derive(Debug, Clone, Serialize, Deserialize)]
349 pub struct DataResponse {
350 #[serde(with = "serde_bytes")]
351 pub h: Vec<u8>,
352 #[serde(with = "serde_bytes")]
353 pub d: Vec<u8>,
354 #[serde(skip_serializing_if = "Option::is_none")]
355 pub i: Option<u32>,
356 #[serde(skip_serializing_if = "Option::is_none")]
357 pub n: Option<u32>,
358 }
359
360 #[derive(Debug, Clone, Serialize, Deserialize)]
361 pub struct DataQuoteRequest {
362 #[serde(with = "serde_bytes")]
363 pub h: Vec<u8>,
364 pub p: u64,
365 pub t: u32,
366 #[serde(skip_serializing_if = "Option::is_none")]
367 pub m: Option<String>,
368 }
369
370 #[derive(Debug, Clone, Serialize, Deserialize)]
371 pub struct DataQuoteResponse {
372 #[serde(with = "serde_bytes")]
373 pub h: Vec<u8>,
374 pub a: bool,
375 #[serde(skip_serializing_if = "Option::is_none")]
376 pub q: Option<u64>,
377 #[serde(skip_serializing_if = "Option::is_none")]
378 pub p: Option<u64>,
379 #[serde(skip_serializing_if = "Option::is_none")]
380 pub t: Option<u32>,
381 #[serde(skip_serializing_if = "Option::is_none")]
382 pub m: Option<String>,
383 }
384
385 #[derive(Debug, Clone, Serialize, Deserialize)]
386 pub struct DataPayment {
387 #[serde(with = "serde_bytes")]
388 pub h: Vec<u8>,
389 pub q: u64,
390 pub c: u32,
391 pub p: u64,
392 #[serde(skip_serializing_if = "Option::is_none")]
393 pub m: Option<String>,
394 pub tok: String,
395 }
396
397 #[derive(Debug, Clone, Serialize, Deserialize)]
398 pub struct DataPaymentAck {
399 #[serde(with = "serde_bytes")]
400 pub h: Vec<u8>,
401 pub q: u64,
402 pub c: u32,
403 pub a: bool,
404 #[serde(skip_serializing_if = "Option::is_none")]
405 pub e: Option<String>,
406 }
407
408 #[derive(Debug, Clone, Serialize, Deserialize)]
409 pub struct DataChunk {
410 #[serde(with = "serde_bytes")]
411 pub h: Vec<u8>,
412 pub q: u64,
413 pub c: u32,
414 pub n: u32,
415 pub p: u64,
416 #[serde(with = "serde_bytes")]
417 pub d: Vec<u8>,
418 }
419
420 #[derive(Debug, Clone, Serialize, Deserialize)]
421 pub struct PeerHints {
422 #[serde(default, rename = "u")]
423 pub signal_urls: Vec<String>,
424 }
425
426 #[derive(Debug, Clone, Serialize, Deserialize)]
427 pub struct PubsubInterest {
428 #[serde(rename = "s")]
429 pub stream_id: String,
430 #[serde(rename = "sub")]
431 pub subscriber_peer_id: String,
432 #[serde(rename = "q")]
433 pub seq: u64,
434 #[serde(rename = "a")]
435 pub active: bool,
436 #[serde(default = "default_htl", skip_serializing_if = "is_max_htl")]
437 pub htl: u8,
438 }
439
440 #[derive(Debug, Clone, Serialize, Deserialize)]
441 pub struct PubsubFrame {
442 #[serde(rename = "s")]
443 pub stream_id: String,
444 #[serde(rename = "q")]
445 pub seq: u64,
446 #[serde(rename = "o")]
447 pub origin_peer_id: String,
448 #[serde(default = "default_htl", skip_serializing_if = "is_max_htl")]
449 pub htl: u8,
450 #[serde(with = "serde_bytes", rename = "d")]
451 pub payload: Vec<u8>,
452 }
453
454 #[derive(Debug, Clone, Serialize, Deserialize)]
455 pub struct PubsubInventory {
456 #[serde(rename = "s")]
457 pub stream_id: String,
458 #[serde(rename = "q")]
459 pub seq: u64,
460 #[serde(rename = "o")]
461 pub origin_peer_id: String,
462 #[serde(rename = "b")]
463 pub payload_bytes: u64,
464 #[serde(default = "default_htl", skip_serializing_if = "is_max_htl")]
465 pub htl: u8,
466 }
467
468 #[derive(Debug, Clone, Serialize, Deserialize)]
469 pub struct PubsubWant {
470 #[serde(rename = "s")]
471 pub stream_id: String,
472 #[serde(rename = "q")]
473 pub seq: u64,
474 #[serde(rename = "o")]
475 pub origin_peer_id: String,
476 }
477
478 #[derive(Debug, Clone)]
479 pub enum DataMessage {
480 Request(DataRequest),
481 Response(DataResponse),
482 QuoteRequest(DataQuoteRequest),
483 QuoteResponse(DataQuoteResponse),
484 Payment(DataPayment),
485 PaymentAck(DataPaymentAck),
486 Chunk(DataChunk),
487 PeerHints(PeerHints),
488 PubsubInterest(PubsubInterest),
489 PubsubFrame(PubsubFrame),
490 PubsubInventory(PubsubInventory),
491 PubsubWant(PubsubWant),
492 }
493
494 fn default_htl() -> u8 {
495 MAX_HTL
496 }
497
498 fn is_max_htl(htl: &u8) -> bool {
499 *htl == MAX_HTL
500 }
501
502 pub fn encode_request(req: &DataRequest) -> Result<Vec<u8>, rmp_serde::encode::Error> {
503 let body = rmp_serde::to_vec_named(req)?;
504 let mut result = Vec::with_capacity(1 + body.len());
505 result.push(MSG_TYPE_REQUEST);
506 result.extend(body);
507 Ok(result)
508 }
509
510 pub fn encode_response(res: &DataResponse) -> Result<Vec<u8>, rmp_serde::encode::Error> {
511 let body = rmp_serde::to_vec_named(res)?;
512 let mut result = Vec::with_capacity(1 + body.len());
513 result.push(MSG_TYPE_RESPONSE);
514 result.extend(body);
515 Ok(result)
516 }
517
518 pub fn encode_quote_request(
519 req: &DataQuoteRequest,
520 ) -> Result<Vec<u8>, rmp_serde::encode::Error> {
521 let body = rmp_serde::to_vec_named(req)?;
522 let mut result = Vec::with_capacity(1 + body.len());
523 result.push(MSG_TYPE_QUOTE_REQUEST);
524 result.extend(body);
525 Ok(result)
526 }
527
528 pub fn encode_quote_response(
529 res: &DataQuoteResponse,
530 ) -> Result<Vec<u8>, rmp_serde::encode::Error> {
531 let body = rmp_serde::to_vec_named(res)?;
532 let mut result = Vec::with_capacity(1 + body.len());
533 result.push(MSG_TYPE_QUOTE_RESPONSE);
534 result.extend(body);
535 Ok(result)
536 }
537
538 pub fn encode_payment(req: &DataPayment) -> Result<Vec<u8>, rmp_serde::encode::Error> {
539 let body = rmp_serde::to_vec_named(req)?;
540 let mut result = Vec::with_capacity(1 + body.len());
541 result.push(MSG_TYPE_PAYMENT);
542 result.extend(body);
543 Ok(result)
544 }
545
546 pub fn encode_payment_ack(res: &DataPaymentAck) -> Result<Vec<u8>, rmp_serde::encode::Error> {
547 let body = rmp_serde::to_vec_named(res)?;
548 let mut result = Vec::with_capacity(1 + body.len());
549 result.push(MSG_TYPE_PAYMENT_ACK);
550 result.extend(body);
551 Ok(result)
552 }
553
554 pub fn encode_chunk(chunk: &DataChunk) -> Result<Vec<u8>, rmp_serde::encode::Error> {
555 let body = rmp_serde::to_vec_named(chunk)?;
556 let mut result = Vec::with_capacity(1 + body.len());
557 result.push(MSG_TYPE_CHUNK);
558 result.extend(body);
559 Ok(result)
560 }
561
562 pub fn parse_message(data: &[u8]) -> Result<DataMessage, rmp_serde::decode::Error> {
563 if data.is_empty() {
564 return Err(rmp_serde::decode::Error::LengthMismatch(0));
565 }
566
567 match data[0] {
568 MSG_TYPE_REQUEST => Ok(DataMessage::Request(rmp_serde::from_slice(&data[1..])?)),
569 MSG_TYPE_RESPONSE => Ok(DataMessage::Response(rmp_serde::from_slice(&data[1..])?)),
570 MSG_TYPE_QUOTE_REQUEST => Ok(DataMessage::QuoteRequest(rmp_serde::from_slice(
571 &data[1..],
572 )?)),
573 MSG_TYPE_QUOTE_RESPONSE => Ok(DataMessage::QuoteResponse(rmp_serde::from_slice(
574 &data[1..],
575 )?)),
576 MSG_TYPE_PAYMENT => Ok(DataMessage::Payment(rmp_serde::from_slice(&data[1..])?)),
577 MSG_TYPE_PAYMENT_ACK => Ok(DataMessage::PaymentAck(rmp_serde::from_slice(&data[1..])?)),
578 MSG_TYPE_CHUNK => Ok(DataMessage::Chunk(rmp_serde::from_slice(&data[1..])?)),
579 MSG_TYPE_PEER_HINTS => Ok(DataMessage::PeerHints(rmp_serde::from_slice(&data[1..])?)),
580 MSG_TYPE_PUBSUB_INTEREST => Ok(DataMessage::PubsubInterest(rmp_serde::from_slice(
581 &data[1..],
582 )?)),
583 MSG_TYPE_PUBSUB_FRAME => {
584 Ok(DataMessage::PubsubFrame(rmp_serde::from_slice(&data[1..])?))
585 }
586 MSG_TYPE_PUBSUB_INVENTORY => Ok(DataMessage::PubsubInventory(rmp_serde::from_slice(
587 &data[1..],
588 )?)),
589 MSG_TYPE_PUBSUB_WANT => Ok(DataMessage::PubsubWant(rmp_serde::from_slice(&data[1..])?)),
590 other => Err(rmp_serde::decode::Error::LengthMismatch(other as u32)),
591 }
592 }
593}