1use anyhow::Result;
5use nostr::{nips::nip19::FromBech32, Alphabet, Event, Filter, Kind, PublicKey, SingleLetterTag};
6use serde::{Deserialize, Serialize};
7use std::collections::BTreeSet;
8use std::collections::HashMap;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::Arc;
11use std::time::Duration;
12use tokio::sync::RwLock;
13
14#[derive(Debug, Clone, Copy, PartialEq, Eq)]
16pub enum ConnectionState {
17 Discovered,
18 Connecting,
19 Connected,
20 Failed,
21 Disconnected,
22}
23
24#[derive(Debug)]
26pub struct PeerEntry {
27 pub peer_id: PeerId,
28 pub direction: PeerDirection,
29 pub state: ConnectionState,
30 pub last_seen: std::time::Instant,
31 pub peer: Option<DummyPeer>,
32 pub pool: PeerPool,
33 pub transport: PeerTransport,
34 pub signal_paths: BTreeSet<PeerSignalPath>,
35 pub bytes_sent: u64,
36 pub bytes_received: u64,
37}
38
39#[derive(Debug, Clone, PartialEq, Eq, Hash)]
41pub struct PeerId {
42 pub pubkey: String,
43}
44
45impl PeerId {
46 pub fn new(pubkey: String) -> Self {
47 Self { pubkey }
48 }
49
50 pub fn short(&self) -> String {
51 self.pubkey[..8.min(self.pubkey.len())].to_string()
52 }
53}
54
55impl std::fmt::Display for PeerId {
56 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
57 f.write_str(&self.pubkey)
58 }
59}
60
61#[derive(Debug, Clone, Copy, PartialEq, Eq)]
63pub enum PeerDirection {
64 Inbound,
65 Outbound,
66}
67
68#[derive(Debug, Clone, Copy, PartialEq, Eq)]
70pub enum PeerTransport {
71 WebRtc,
72 Bluetooth,
73}
74
75impl std::fmt::Display for PeerTransport {
76 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77 match self {
78 PeerTransport::WebRtc => f.write_str("webrtc"),
79 PeerTransport::Bluetooth => f.write_str("bluetooth"),
80 }
81 }
82}
83
84#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
86pub enum PeerSignalPath {
87 Relay,
88 Multicast,
89 WifiAware,
90 Bluetooth,
91}
92
93impl std::fmt::Display for PeerSignalPath {
94 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
95 match self {
96 PeerSignalPath::Relay => f.write_str("relay"),
97 PeerSignalPath::Multicast => f.write_str("multicast"),
98 PeerSignalPath::WifiAware => f.write_str("wifi-aware"),
99 PeerSignalPath::Bluetooth => f.write_str("bluetooth"),
100 }
101 }
102}
103
104#[derive(Debug)]
106pub struct DummyPeer;
107
108impl DummyPeer {
109 pub fn is_ready(&self) -> bool {
110 false
111 }
112
113 pub fn has_data_channel(&self) -> bool {
114 false
115 }
116
117 pub fn state(&self) -> &str {
118 "Disabled"
119 }
120
121 pub fn as_webrtc(&self) -> Option<&Self> {
122 None
123 }
124
125 pub async fn request(&self, _hash: &str) -> Result<Option<Vec<u8>>> {
126 Ok(None)
127 }
128}
129
130#[derive(Debug, Clone, Copy)]
132pub enum PeerPool {
133 Follows,
134 Other,
135}
136
137#[derive(Debug, Clone)]
138pub struct PeerRootEvent {
139 pub hash: String,
140 pub key: Option<String>,
141 pub encrypted_key: Option<String>,
142 pub self_encrypted_key: Option<String>,
143 pub event_id: String,
144 pub created_at: u64,
145 pub peer_id: String,
146}
147
148#[derive(Debug)]
150pub struct WebRTCState {
151 pub peers: Arc<RwLock<HashMap<String, PeerEntry>>>,
152 bytes_sent: AtomicU64,
153 bytes_received: AtomicU64,
154}
155
156impl Default for WebRTCState {
157 fn default() -> Self {
158 Self {
159 peers: Arc::new(RwLock::new(HashMap::new())),
160 bytes_sent: AtomicU64::new(0),
161 bytes_received: AtomicU64::new(0),
162 }
163 }
164}
165
166impl WebRTCState {
167 pub fn new() -> Self {
168 Self::default()
169 }
170
171 pub async fn query_peers_for_data(&self, _hash: &str) -> Option<Vec<u8>> {
173 None
174 }
175
176 pub async fn request_from_peers(&self, _hash: &str) -> Option<Vec<u8>> {
178 None
179 }
180
181 pub async fn request_from_peers_with_source(&self, _hash: &str) -> Option<(Vec<u8>, String)> {
183 None
184 }
185
186 pub async fn record_sent(&self, peer_id: &str, bytes: u64) {
187 self.bytes_sent.fetch_add(bytes, Ordering::Relaxed);
188 if let Some(entry) = self.peers.write().await.get_mut(peer_id) {
189 entry.bytes_sent += bytes;
190 }
191 }
192
193 pub async fn record_received(&self, peer_id: &str, bytes: u64) {
194 self.bytes_received.fetch_add(bytes, Ordering::Relaxed);
195 if let Some(entry) = self.peers.write().await.get_mut(peer_id) {
196 entry.bytes_received += bytes;
197 }
198 }
199
200 pub fn get_bandwidth(&self) -> (u64, u64) {
202 (
203 self.bytes_sent.load(Ordering::Relaxed),
204 self.bytes_received.load(Ordering::Relaxed),
205 )
206 }
207
208 pub fn get_mesh_stats(&self) -> (u64, u64, u64) {
210 (0, 0, 0)
211 }
212
213 pub async fn resolve_root_from_peers(
215 &self,
216 _owner_pubkey: &str,
217 _tree_name: &str,
218 _per_peer_timeout: Duration,
219 ) -> Option<PeerRootEvent> {
220 None
221 }
222
223 pub async fn resolve_root_from_local_buses_with_source(
224 &self,
225 _owner_pubkey: &str,
226 _tree_name: &str,
227 _timeout: Duration,
228 ) -> Option<(&'static str, PeerRootEvent)> {
229 None
230 }
231}
232
233pub fn build_root_filter(owner_pubkey: &str, tree_name: &str) -> Option<Filter> {
234 let author = PublicKey::from_hex(owner_pubkey)
235 .or_else(|_| PublicKey::from_bech32(owner_pubkey))
236 .ok()?;
237 Some(
238 Filter::new()
239 .kind(Kind::Custom(30078))
240 .author(author)
241 .custom_tag(
242 SingleLetterTag::lowercase(Alphabet::D),
243 vec![tree_name.to_string()],
244 )
245 .custom_tag(
246 SingleLetterTag::lowercase(Alphabet::L),
247 vec!["hashtree".to_string()],
248 )
249 .limit(50),
250 )
251}
252
253pub fn pick_latest_event<'a, I>(events: I) -> Option<&'a Event>
254where
255 I: IntoIterator<Item = &'a Event>,
256{
257 events.into_iter().max_by(|a, b| {
258 let ordering = a.created_at.cmp(&b.created_at);
259 if ordering == std::cmp::Ordering::Equal {
260 a.id.cmp(&b.id)
261 } else {
262 ordering
263 }
264 })
265}
266
267pub fn root_event_from_peer(
268 event: &Event,
269 peer_id: &str,
270 tree_name: &str,
271) -> Option<PeerRootEvent> {
272 let mut tree_match = false;
273 let mut labeled = false;
274 let mut key = None;
275 let mut encrypted_key = None;
276 let mut self_encrypted_key = None;
277 let mut hash_tag = None;
278
279 for tag in &event.tags {
280 let slice = tag.as_slice();
281 if slice.len() < 2 {
282 continue;
283 }
284 match slice[0].as_str() {
285 "d" => tree_match = slice[1].as_str() == tree_name,
286 "l" => labeled = slice[1].as_str() == "hashtree",
287 "hash" => hash_tag = Some(slice[1].to_string()),
288 "key" => key = Some(slice[1].to_string()),
289 "encryptedKey" => encrypted_key = Some(slice[1].to_string()),
290 "selfEncryptedKey" => self_encrypted_key = Some(slice[1].to_string()),
291 _ => {}
292 }
293 }
294
295 if !tree_match || !labeled {
296 return None;
297 }
298
299 let hash = hash_tag.or_else(|| {
300 if event.content.is_empty() {
301 None
302 } else {
303 Some(event.content.clone())
304 }
305 })?;
306
307 Some(PeerRootEvent {
308 hash,
309 key,
310 encrypted_key,
311 self_encrypted_key,
312 event_id: event.id.to_hex(),
313 created_at: event.created_at.as_u64(),
314 peer_id: peer_id.to_string(),
315 })
316}
317
318pub trait ContentStore: Send + Sync + 'static {
320 fn get(&self, hash_hex: &str) -> Result<Option<Vec<u8>>>;
322}
323
324pub mod types {
325 use super::*;
326
327 pub const MAX_HTL: u8 = 7;
328 pub const MSG_TYPE_REQUEST: u8 = 0x00;
329 pub const MSG_TYPE_RESPONSE: u8 = 0x01;
330 pub const MSG_TYPE_QUOTE_REQUEST: u8 = 0x02;
331 pub const MSG_TYPE_QUOTE_RESPONSE: u8 = 0x03;
332 pub const MSG_TYPE_PAYMENT: u8 = 0x04;
333 pub const MSG_TYPE_PAYMENT_ACK: u8 = 0x05;
334 pub const MSG_TYPE_CHUNK: u8 = 0x06;
335 pub const MSG_TYPE_PEER_HINTS: u8 = 0x07;
336 pub const MSG_TYPE_PUBSUB_INTEREST: u8 = 0x08;
337 pub const MSG_TYPE_PUBSUB_FRAME: u8 = 0x09;
338 pub const MSG_TYPE_PUBSUB_INVENTORY: u8 = 0x0a;
339 pub const MSG_TYPE_PUBSUB_WANT: u8 = 0x0b;
340
341 #[derive(Debug, Clone, Serialize, Deserialize)]
342 pub struct DataRequest {
343 #[serde(with = "serde_bytes")]
344 pub h: Vec<u8>,
345 #[serde(default = "default_htl")]
346 pub htl: u8,
347 #[serde(skip_serializing_if = "Option::is_none")]
348 pub q: Option<u64>,
349 }
350
351 #[derive(Debug, Clone, Serialize, Deserialize)]
352 pub struct DataResponse {
353 #[serde(with = "serde_bytes")]
354 pub h: Vec<u8>,
355 #[serde(with = "serde_bytes")]
356 pub d: Vec<u8>,
357 #[serde(skip_serializing_if = "Option::is_none")]
358 pub i: Option<u32>,
359 #[serde(skip_serializing_if = "Option::is_none")]
360 pub n: Option<u32>,
361 }
362
363 #[derive(Debug, Clone, Serialize, Deserialize)]
364 pub struct DataQuoteRequest {
365 #[serde(with = "serde_bytes")]
366 pub h: Vec<u8>,
367 pub p: u64,
368 pub t: u32,
369 #[serde(skip_serializing_if = "Option::is_none")]
370 pub m: Option<String>,
371 }
372
373 #[derive(Debug, Clone, Serialize, Deserialize)]
374 pub struct DataQuoteResponse {
375 #[serde(with = "serde_bytes")]
376 pub h: Vec<u8>,
377 pub a: bool,
378 #[serde(skip_serializing_if = "Option::is_none")]
379 pub q: Option<u64>,
380 #[serde(skip_serializing_if = "Option::is_none")]
381 pub p: Option<u64>,
382 #[serde(skip_serializing_if = "Option::is_none")]
383 pub t: Option<u32>,
384 #[serde(skip_serializing_if = "Option::is_none")]
385 pub m: Option<String>,
386 }
387
388 #[derive(Debug, Clone, Serialize, Deserialize)]
389 pub struct DataPayment {
390 #[serde(with = "serde_bytes")]
391 pub h: Vec<u8>,
392 pub q: u64,
393 pub c: u32,
394 pub p: u64,
395 #[serde(skip_serializing_if = "Option::is_none")]
396 pub m: Option<String>,
397 pub tok: String,
398 }
399
400 #[derive(Debug, Clone, Serialize, Deserialize)]
401 pub struct DataPaymentAck {
402 #[serde(with = "serde_bytes")]
403 pub h: Vec<u8>,
404 pub q: u64,
405 pub c: u32,
406 pub a: bool,
407 #[serde(skip_serializing_if = "Option::is_none")]
408 pub e: Option<String>,
409 }
410
411 #[derive(Debug, Clone, Serialize, Deserialize)]
412 pub struct DataChunk {
413 #[serde(with = "serde_bytes")]
414 pub h: Vec<u8>,
415 pub q: u64,
416 pub c: u32,
417 pub n: u32,
418 pub p: u64,
419 #[serde(with = "serde_bytes")]
420 pub d: Vec<u8>,
421 }
422
423 #[derive(Debug, Clone, Serialize, Deserialize)]
424 pub struct PeerHints {
425 #[serde(default, rename = "u")]
426 pub signal_urls: Vec<String>,
427 }
428
429 #[derive(Debug, Clone, Serialize, Deserialize)]
430 pub struct PubsubInterest {
431 #[serde(rename = "s")]
432 pub stream_id: String,
433 #[serde(rename = "sub")]
434 pub subscriber_peer_id: String,
435 #[serde(rename = "q")]
436 pub seq: u64,
437 #[serde(rename = "a")]
438 pub active: bool,
439 #[serde(default = "default_htl", skip_serializing_if = "is_max_htl")]
440 pub htl: u8,
441 }
442
443 #[derive(Debug, Clone, Serialize, Deserialize)]
444 pub struct PubsubFrame {
445 #[serde(rename = "s")]
446 pub stream_id: String,
447 #[serde(rename = "q")]
448 pub seq: u64,
449 #[serde(rename = "o")]
450 pub origin_peer_id: String,
451 #[serde(default = "default_htl", skip_serializing_if = "is_max_htl")]
452 pub htl: u8,
453 #[serde(with = "serde_bytes", rename = "d")]
454 pub payload: Vec<u8>,
455 }
456
457 #[derive(Debug, Clone, Serialize, Deserialize)]
458 pub struct PubsubInventory {
459 #[serde(rename = "s")]
460 pub stream_id: String,
461 #[serde(rename = "q")]
462 pub seq: u64,
463 #[serde(rename = "o")]
464 pub origin_peer_id: String,
465 #[serde(rename = "b")]
466 pub payload_bytes: u64,
467 #[serde(default = "default_htl", skip_serializing_if = "is_max_htl")]
468 pub htl: u8,
469 }
470
471 #[derive(Debug, Clone, Serialize, Deserialize)]
472 pub struct PubsubWant {
473 #[serde(rename = "s")]
474 pub stream_id: String,
475 #[serde(rename = "q")]
476 pub seq: u64,
477 #[serde(rename = "o")]
478 pub origin_peer_id: String,
479 }
480
481 #[derive(Debug, Clone)]
482 pub enum DataMessage {
483 Request(DataRequest),
484 Response(DataResponse),
485 QuoteRequest(DataQuoteRequest),
486 QuoteResponse(DataQuoteResponse),
487 Payment(DataPayment),
488 PaymentAck(DataPaymentAck),
489 Chunk(DataChunk),
490 PeerHints(PeerHints),
491 PubsubInterest(PubsubInterest),
492 PubsubFrame(PubsubFrame),
493 PubsubInventory(PubsubInventory),
494 PubsubWant(PubsubWant),
495 }
496
497 fn default_htl() -> u8 {
498 MAX_HTL
499 }
500
501 fn is_max_htl(htl: &u8) -> bool {
502 *htl == MAX_HTL
503 }
504
505 pub fn encode_request(req: &DataRequest) -> Result<Vec<u8>, rmp_serde::encode::Error> {
506 let body = rmp_serde::to_vec_named(req)?;
507 let mut result = Vec::with_capacity(1 + body.len());
508 result.push(MSG_TYPE_REQUEST);
509 result.extend(body);
510 Ok(result)
511 }
512
513 pub fn encode_response(res: &DataResponse) -> Result<Vec<u8>, rmp_serde::encode::Error> {
514 let body = rmp_serde::to_vec_named(res)?;
515 let mut result = Vec::with_capacity(1 + body.len());
516 result.push(MSG_TYPE_RESPONSE);
517 result.extend(body);
518 Ok(result)
519 }
520
521 pub fn encode_quote_request(
522 req: &DataQuoteRequest,
523 ) -> Result<Vec<u8>, rmp_serde::encode::Error> {
524 let body = rmp_serde::to_vec_named(req)?;
525 let mut result = Vec::with_capacity(1 + body.len());
526 result.push(MSG_TYPE_QUOTE_REQUEST);
527 result.extend(body);
528 Ok(result)
529 }
530
531 pub fn encode_quote_response(
532 res: &DataQuoteResponse,
533 ) -> Result<Vec<u8>, rmp_serde::encode::Error> {
534 let body = rmp_serde::to_vec_named(res)?;
535 let mut result = Vec::with_capacity(1 + body.len());
536 result.push(MSG_TYPE_QUOTE_RESPONSE);
537 result.extend(body);
538 Ok(result)
539 }
540
541 pub fn encode_payment(req: &DataPayment) -> Result<Vec<u8>, rmp_serde::encode::Error> {
542 let body = rmp_serde::to_vec_named(req)?;
543 let mut result = Vec::with_capacity(1 + body.len());
544 result.push(MSG_TYPE_PAYMENT);
545 result.extend(body);
546 Ok(result)
547 }
548
549 pub fn encode_payment_ack(res: &DataPaymentAck) -> Result<Vec<u8>, rmp_serde::encode::Error> {
550 let body = rmp_serde::to_vec_named(res)?;
551 let mut result = Vec::with_capacity(1 + body.len());
552 result.push(MSG_TYPE_PAYMENT_ACK);
553 result.extend(body);
554 Ok(result)
555 }
556
557 pub fn encode_chunk(chunk: &DataChunk) -> Result<Vec<u8>, rmp_serde::encode::Error> {
558 let body = rmp_serde::to_vec_named(chunk)?;
559 let mut result = Vec::with_capacity(1 + body.len());
560 result.push(MSG_TYPE_CHUNK);
561 result.extend(body);
562 Ok(result)
563 }
564
565 pub fn parse_message(data: &[u8]) -> Result<DataMessage, rmp_serde::decode::Error> {
566 if data.is_empty() {
567 return Err(rmp_serde::decode::Error::LengthMismatch(0));
568 }
569
570 match data[0] {
571 MSG_TYPE_REQUEST => Ok(DataMessage::Request(rmp_serde::from_slice(&data[1..])?)),
572 MSG_TYPE_RESPONSE => Ok(DataMessage::Response(rmp_serde::from_slice(&data[1..])?)),
573 MSG_TYPE_QUOTE_REQUEST => Ok(DataMessage::QuoteRequest(rmp_serde::from_slice(
574 &data[1..],
575 )?)),
576 MSG_TYPE_QUOTE_RESPONSE => Ok(DataMessage::QuoteResponse(rmp_serde::from_slice(
577 &data[1..],
578 )?)),
579 MSG_TYPE_PAYMENT => Ok(DataMessage::Payment(rmp_serde::from_slice(&data[1..])?)),
580 MSG_TYPE_PAYMENT_ACK => Ok(DataMessage::PaymentAck(rmp_serde::from_slice(&data[1..])?)),
581 MSG_TYPE_CHUNK => Ok(DataMessage::Chunk(rmp_serde::from_slice(&data[1..])?)),
582 MSG_TYPE_PEER_HINTS => Ok(DataMessage::PeerHints(rmp_serde::from_slice(&data[1..])?)),
583 MSG_TYPE_PUBSUB_INTEREST => Ok(DataMessage::PubsubInterest(rmp_serde::from_slice(
584 &data[1..],
585 )?)),
586 MSG_TYPE_PUBSUB_FRAME => {
587 Ok(DataMessage::PubsubFrame(rmp_serde::from_slice(&data[1..])?))
588 }
589 MSG_TYPE_PUBSUB_INVENTORY => Ok(DataMessage::PubsubInventory(rmp_serde::from_slice(
590 &data[1..],
591 )?)),
592 MSG_TYPE_PUBSUB_WANT => Ok(DataMessage::PubsubWant(rmp_serde::from_slice(&data[1..])?)),
593 other => Err(rmp_serde::decode::Error::LengthMismatch(other as u32)),
594 }
595 }
596}