1use std::collections::HashMap;
2use std::collections::{HashSet, VecDeque};
3use std::path::PathBuf;
4use std::sync::{
5 atomic::{AtomicU64, Ordering},
6 Arc,
7};
8use std::time::{Duration, Instant};
9
10use tokio::sync::{mpsc, Mutex};
11
12use hashtree_network::{MeshEventStore, MeshRelayClient};
13use nostr::{ClientMessage as NostrClientMessage, JsonUtil, RelayMessage as NostrRelayMessage};
14use nostr::{Event, EventId, Filter as NostrFilter, SubscriptionId};
15
16use crate::socialgraph;
17
/// Maximum number of Bluetooth-received event records kept in memory and in
/// the persisted log file; once exceeded, the oldest records are dropped first.
const BLUETOOTH_EVENT_LOG_CAPACITY: usize = 100;
19
/// One entry of the Bluetooth event log: a summary of an event that was
/// ingested through the Bluetooth path. Serialized as one JSON object per line.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct BluetoothReceivedEventRecord {
    /// Hex encoding of the event id.
    pub event_id: String,
    /// Hex encoding of the event author's public key.
    pub pubkey: String,
    /// Numeric event kind.
    pub kind: u32,
    /// The event's own `created_at` timestamp.
    pub created_at: u64,
    /// Seconds since the Unix epoch at the moment the record was created
    /// (`0` if the system clock reports a time before the epoch).
    pub received_at: u64,
    /// Identifier of the peer that delivered the event, when the caller supplied one.
    pub peer_id: Option<String>,
    /// De-duplicated CID-like values extracted from the event's tags.
    pub cid_values: Vec<String>,
}
30
/// Tunable limits for the relay.
#[derive(Debug, Clone)]
pub struct NostrRelayConfig {
    /// Byte budget handed to the persistent spambox store. A value of `0`
    /// selects the in-memory spambox instead.
    pub spambox_db_max_bytes: u64,
    /// Upper bound on the number of events returned for a single query/filter.
    /// Twice this value also sizes the recent-event cache and the in-memory
    /// spambox (each with a floor of 128 entries).
    pub max_query_limit: usize,
    /// Maximum number of distinct subscriptions a client may hold at once.
    pub max_subs_per_client: usize,
    /// Maximum number of filters kept per subscription; extra filters are dropped.
    pub max_filters_per_sub: usize,
    /// Untrusted (spambox) events accepted per client per 60-second window.
    pub spambox_max_events_per_min: u32,
    /// REQ/COUNT requests accepted per client per 60-second window.
    pub spambox_max_reqs_per_min: u32,
}

impl Default for NostrRelayConfig {
    /// Defaults: 1 GiB spambox, 200 events per query, 64 subscriptions per
    /// client, 32 filters per subscription, 120 events and 120 requests per minute.
    fn default() -> Self {
        const ONE_GIB: u64 = 1024 * 1024 * 1024;
        const PER_MINUTE_BUDGET: u32 = 120;

        NostrRelayConfig {
            spambox_db_max_bytes: ONE_GIB,
            max_query_limit: 200,
            max_subs_per_client: 64,
            max_filters_per_sub: 32,
            spambox_max_events_per_min: PER_MINUTE_BUDGET,
            spambox_max_reqs_per_min: PER_MINUTE_BUDGET,
        }
    }
}
53
54mod imp {
55 use super::*;
56 use anyhow::Result;
57
58 use crate::socialgraph::{EventStorageClass, SocialGraphAccessControl, SocialGraphBackend};
59 use hashtree_core::{nhash_decode, Cid};
60 use hashtree_nostr::{is_parameterized_replaceable_kind, is_replaceable_kind};
61 use tracing::warn;
62
63 fn prefers_trusted_only(filter: &NostrFilter) -> bool {
64 let Some(kinds) = filter.kinds.as_ref() else {
65 return false;
66 };
67 if kinds.len() != 1 {
68 return false;
69 }
70
71 let kind = kinds.iter().next().expect("checked single kind").as_u16() as u32;
72 let has_authors = filter
73 .authors
74 .as_ref()
75 .is_some_and(|authors| !authors.is_empty());
76 if !has_authors {
77 return false;
78 }
79
80 if is_replaceable_kind(kind) {
81 return true;
82 }
83
84 if is_parameterized_replaceable_kind(kind) {
85 let d_tag = nostr::SingleLetterTag::lowercase(nostr::Alphabet::D);
86 return filter
87 .generic_tags
88 .get(&d_tag)
89 .is_some_and(|values| !values.is_empty());
90 }
91
92 false
93 }
94
95 struct NostrStore {
96 store: Arc<dyn SocialGraphBackend>,
97 }
98
99 impl NostrStore {
100 fn new(store: Arc<dyn SocialGraphBackend>) -> Self {
101 Self { store }
102 }
103
104 fn ingest(&self, event: &Event) -> Result<()> {
105 crate::socialgraph::ingest_parsed_event(self.store.as_ref(), event)
106 }
107
108 fn ingest_with_storage_class(
109 &self,
110 event: &Event,
111 storage_class: EventStorageClass,
112 ) -> Result<()> {
113 crate::socialgraph::ingest_parsed_event_with_storage_class(
114 self.store.as_ref(),
115 event,
116 storage_class,
117 )
118 }
119
120 fn query(&self, filter: &NostrFilter, limit: usize) -> Vec<Event> {
121 crate::socialgraph::query_events(self.store.as_ref(), filter, limit)
122 }
123 }
124
125 #[derive(Debug, Clone)]
126 struct ClientQuota {
127 last_reset: Instant,
128 spambox_events: u32,
129 reqs: u32,
130 }
131
132 impl ClientQuota {
133 fn new() -> Self {
134 Self {
135 last_reset: Instant::now(),
136 spambox_events: 0,
137 reqs: 0,
138 }
139 }
140
141 fn reset_if_needed(&mut self) {
142 if self.last_reset.elapsed() >= Duration::from_secs(60) {
143 self.last_reset = Instant::now();
144 self.spambox_events = 0;
145 self.reqs = 0;
146 }
147 }
148
149 fn allow_spambox_event(&mut self, limit: u32) -> bool {
150 self.reset_if_needed();
151 if self.spambox_events >= limit {
152 return false;
153 }
154 self.spambox_events += 1;
155 true
156 }
157
158 fn allow_req(&mut self, limit: u32) -> bool {
159 self.reset_if_needed();
160 if self.reqs >= limit {
161 return false;
162 }
163 self.reqs += 1;
164 true
165 }
166 }
167
    /// Everything the relay tracks about one registered client.
    struct ClientState {
        /// Channel on which JSON-serialized relay messages are pushed to the client.
        sender: mpsc::UnboundedSender<String>,
        /// Public key supplied at registration, if any; compared against event
        /// authors (as hex) when deciding whether an event is trusted.
        pubkey: Option<String>,
        /// Rate-limit counters for this client.
        quota: ClientQuota,
    }
173
174 struct RecentEvents {
175 order: VecDeque<EventId>,
176 events: HashMap<EventId, Event>,
177 max_len: usize,
178 }
179
180 impl RecentEvents {
181 fn new(max_len: usize) -> Self {
182 Self {
183 order: VecDeque::new(),
184 events: HashMap::new(),
185 max_len: max_len.max(128),
186 }
187 }
188
189 fn insert(&mut self, event: Event) {
190 if self.events.contains_key(&event.id) {
191 return;
192 }
193 self.order.push_back(event.id);
194 self.events.insert(event.id, event);
195 while self.order.len() > self.max_len {
196 if let Some(oldest) = self.order.pop_front() {
197 self.events.remove(&oldest);
198 }
199 }
200 }
201
202 fn matching(&self, filter: &NostrFilter) -> Vec<Event> {
203 self.events
204 .values()
205 .filter(|event| filter.match_event(event))
206 .cloned()
207 .collect()
208 }
209 }
210
    /// Storage for events from untrusted authors.
    enum SpamboxStore {
        /// Backed by a social-graph store.
        Persistent(NostrStore),
        /// Backed by a bounded in-memory queue.
        Memory(MemorySpambox),
    }
215
216 struct MemorySpambox {
217 events: Mutex<VecDeque<Event>>,
218 max_len: usize,
219 }
220
221 impl MemorySpambox {
222 fn new(max_len: usize) -> Self {
223 Self {
224 events: Mutex::new(VecDeque::new()),
225 max_len: max_len.max(128),
226 }
227 }
228
229 async fn ingest(&self, event: &Event) -> bool {
230 let mut events = self.events.lock().await;
231 events.push_back(event.clone());
232 while events.len() > self.max_len {
233 events.pop_front();
234 }
235 true
236 }
237 }
238
239 impl SpamboxStore {
240 async fn ingest(&self, event: &Event) -> bool {
241 match self {
242 SpamboxStore::Persistent(store) => store.ingest(event).is_ok(),
243 SpamboxStore::Memory(store) => store.ingest(event).await,
244 }
245 }
246 }
247
    /// Log of events received over Bluetooth, mirrored to a JSON-lines file.
    struct BluetoothEventLog {
        /// Location of the persisted log file.
        path: PathBuf,
        /// In-memory view of the log, guarded by an async mutex.
        state: Mutex<BluetoothEventLogState>,
    }

    /// Mutable contents of a [`BluetoothEventLog`].
    struct BluetoothEventLogState {
        /// Records in arrival order (front = oldest).
        records: VecDeque<BluetoothReceivedEventRecord>,
        /// Event ids of the entries in `records`, used to skip duplicates.
        event_ids: HashSet<String>,
    }
257
258 impl BluetoothEventLog {
259 fn load(path: PathBuf) -> Self {
260 let records = std::fs::read_to_string(&path)
261 .ok()
262 .map(|serialized| {
263 serialized
264 .lines()
265 .filter_map(|line| {
266 serde_json::from_str::<BluetoothReceivedEventRecord>(line).ok()
267 })
268 .collect::<Vec<_>>()
269 })
270 .unwrap_or_default();
271 let mut trimmed = VecDeque::with_capacity(BLUETOOTH_EVENT_LOG_CAPACITY);
272 let start = records.len().saturating_sub(BLUETOOTH_EVENT_LOG_CAPACITY);
273 for record in records.into_iter().skip(start) {
274 trimmed.push_back(record);
275 }
276 let event_ids = trimmed
277 .iter()
278 .map(|record| record.event_id.clone())
279 .collect::<HashSet<_>>();
280
281 Self {
282 path,
283 state: Mutex::new(BluetoothEventLogState {
284 records: trimmed,
285 event_ids,
286 }),
287 }
288 }
289
290 async fn recent(&self, limit: usize) -> Vec<BluetoothReceivedEventRecord> {
291 let state = self.state.lock().await;
292 state
293 .records
294 .iter()
295 .rev()
296 .take(limit.max(1))
297 .cloned()
298 .collect()
299 }
300
301 async fn record(&self, event: &Event, peer_id: Option<String>) {
302 let record = BluetoothReceivedEventRecord {
303 event_id: event.id.to_hex(),
304 pubkey: event.pubkey.to_hex(),
305 kind: event.kind.as_u16() as u32,
306 created_at: event.created_at.as_u64(),
307 received_at: std::time::SystemTime::now()
308 .duration_since(std::time::UNIX_EPOCH)
309 .map(|value| value.as_secs())
310 .unwrap_or(0),
311 peer_id,
312 cid_values: cid_values_from_event(event),
313 };
314
315 let serialized = {
316 let mut state = self.state.lock().await;
317 if state.event_ids.contains(&record.event_id) {
318 return;
319 }
320
321 state.event_ids.insert(record.event_id.clone());
322 state.records.push_back(record);
323 while state.records.len() > BLUETOOTH_EVENT_LOG_CAPACITY {
324 if let Some(removed) = state.records.pop_front() {
325 state.event_ids.remove(&removed.event_id);
326 }
327 }
328
329 state
330 .records
331 .iter()
332 .filter_map(|entry| serde_json::to_string(entry).ok())
333 .collect::<Vec<_>>()
334 .join("\n")
335 };
336
337 if let Some(parent) = self.path.parent() {
338 let _ = std::fs::create_dir_all(parent);
339 }
340 let _ = std::fs::write(&self.path, serialized);
341 }
342 }
343
344 fn looks_like_cid_reference(value: &str) -> bool {
345 Cid::parse(value).is_ok() || nhash_decode(value).is_ok()
346 }
347
348 fn cid_values_from_event(event: &Event) -> Vec<String> {
349 let mut values = Vec::new();
350 let mut seen = HashSet::new();
351
352 for tag in &event.tags {
353 let fields = tag.clone().to_vec();
354 if fields.first().is_some_and(|name| name == "cid") {
355 if let Some(value) = fields.get(1).filter(|value| !value.is_empty()) {
356 if seen.insert(value.clone()) {
357 values.push(value.clone());
358 }
359 }
360 continue;
361 }
362
363 for value in fields.into_iter().skip(1) {
364 if looks_like_cid_reference(&value) && seen.insert(value.clone()) {
365 values.push(value);
366 }
367 }
368 }
369
370 values
371 }
372
    /// In-process Nostr relay: stores events, serves queries and fans events
    /// out to subscribed clients.
    pub struct NostrRelay {
        /// Limits applied to queries, subscriptions and rate limiting.
        config: NostrRelayConfig,
        /// Store for events from trusted authors.
        trusted: NostrStore,
        /// Hex public keys whose events are stored with the `Public` storage class.
        public_pubkeys: HashSet<String>,
        /// Store for events from untrusted authors; `new` always populates it.
        spambox: Option<SpamboxStore>,
        /// Optional write-access oracle used to decide whether an author is trusted.
        social_graph: Option<Arc<SocialGraphAccessControl>>,
        /// Registered clients keyed by client id.
        clients: Mutex<HashMap<u64, ClientState>>,
        /// Active subscriptions: client id -> subscription id -> filters.
        subscriptions: Mutex<HashMap<u64, HashMap<SubscriptionId, Vec<NostrFilter>>>>,
        /// Cache of recently ingested trusted events.
        recent_events: Mutex<RecentEvents>,
        /// Source of client ids; starts at 1.
        next_client_id: AtomicU64,
        /// Log of events that arrived over Bluetooth.
        bluetooth_event_log: Arc<BluetoothEventLog>,
    }
385
386 impl NostrRelay {
387 async fn collect_filter_events(
388 &self,
389 filter: &NostrFilter,
390 limit: usize,
391 seen: &mut HashSet<EventId>,
392 events: &mut Vec<Event>,
393 ) {
394 if limit == 0 {
395 return;
396 }
397
398 let mut added = 0usize;
399
400 if !prefers_trusted_only(filter) {
401 let recent = {
402 let cache = self.recent_events.lock().await;
403 cache.matching(filter)
404 };
405 for event in recent {
406 if seen.insert(event.id) {
407 events.push(event);
408 added += 1;
409 if added >= limit {
410 return;
411 }
412 }
413 }
414 }
415
416 for event in self.trusted.query(filter, limit) {
417 if seen.insert(event.id) {
418 events.push(event);
419 added += 1;
420 if added >= limit {
421 return;
422 }
423 }
424 }
425 }
426
427 async fn collect_filter_count(
428 &self,
429 filter: &NostrFilter,
430 limit: usize,
431 seen: &mut HashSet<EventId>,
432 ) {
433 if limit == 0 {
434 return;
435 }
436
437 let mut added = 0usize;
438
439 if !prefers_trusted_only(filter) {
440 let recent = {
441 let cache = self.recent_events.lock().await;
442 cache.matching(filter)
443 };
444 for event in recent {
445 if seen.insert(event.id) {
446 added += 1;
447 if added >= limit {
448 return;
449 }
450 }
451 }
452 }
453
454 for event in self.trusted.query(filter, limit) {
455 if seen.insert(event.id) {
456 added += 1;
457 if added >= limit {
458 return;
459 }
460 }
461 }
462 }
463
464 pub fn new(
465 trusted_store: Arc<dyn SocialGraphBackend>,
466 data_dir: PathBuf,
467 public_pubkeys: HashSet<String>,
468 social_graph: Option<Arc<SocialGraphAccessControl>>,
469 config: NostrRelayConfig,
470 ) -> Result<Self> {
471 let spambox = if config.spambox_db_max_bytes == 0 {
472 Some(SpamboxStore::Memory(MemorySpambox::new(
473 config.max_query_limit * 2,
474 )))
475 } else {
476 let spam_dir = data_dir.join("socialgraph_spambox");
477 match socialgraph::open_social_graph_store_at_path(
478 &spam_dir,
479 Some(config.spambox_db_max_bytes),
480 ) {
481 Ok(store) => Some(SpamboxStore::Persistent(NostrStore::new(store))),
482 Err(err) => {
483 warn!(
484 "Failed to open social graph spambox (falling back to memory): {}",
485 err
486 );
487 Some(SpamboxStore::Memory(MemorySpambox::new(
488 config.max_query_limit * 2,
489 )))
490 }
491 }
492 };
493
494 let recent_size = config.max_query_limit.saturating_mul(2);
495 let bluetooth_event_log = Arc::new(BluetoothEventLog::load(
496 data_dir.join("bluetooth-events.jsonl"),
497 ));
498
499 Ok(Self {
500 config,
501 trusted: NostrStore::new(trusted_store),
502 public_pubkeys,
503 spambox,
504 social_graph,
505 clients: Mutex::new(HashMap::new()),
506 subscriptions: Mutex::new(HashMap::new()),
507 recent_events: Mutex::new(RecentEvents::new(recent_size)),
508 next_client_id: AtomicU64::new(1),
509 bluetooth_event_log,
510 })
511 }
512
        /// Returns the current value of the client-id counter and atomically
        /// advances it by one, so each call yields a distinct id.
        pub fn next_client_id(&self) -> u64 {
            self.next_client_id.fetch_add(1, Ordering::SeqCst)
        }
516
        /// Ingests `event` as trusted and broadcasts it to matching subscriptions.
        ///
        /// # Errors
        /// Fails if the signature does not verify or the trusted store rejects the event.
        pub async fn ingest_trusted_event(&self, event: Event) -> Result<()> {
            self.ingest_trusted_event_inner(event, true).await
        }
520
        /// Ingests `event` as trusted (with broadcast) and then records it in
        /// the Bluetooth event log together with `peer_id`.
        ///
        /// # Errors
        /// Returns the ingest error unchanged; in that case nothing is logged.
        pub async fn ingest_trusted_event_from_bluetooth(
            &self,
            event: Event,
            peer_id: Option<String>,
        ) -> Result<()> {
            // The event is cloned because ingestion takes ownership and the log needs it afterwards.
            self.ingest_trusted_event_inner(event.clone(), true).await?;
            self.bluetooth_event_log.record(&event, peer_id).await;
            Ok(())
        }
530
        /// Ingests `event` as trusted without broadcasting it to subscribers.
        ///
        /// # Errors
        /// Fails if the signature does not verify or the trusted store rejects the event.
        pub async fn ingest_trusted_event_silent(&self, event: Event) -> Result<()> {
            self.ingest_trusted_event_inner(event, false).await
        }
534
        /// Returns the most recent Bluetooth-received event records, newest
        /// first. The log treats a `limit` of zero as one.
        pub async fn bluetooth_received_events(
            &self,
            limit: usize,
        ) -> Vec<BluetoothReceivedEventRecord> {
            self.bluetooth_event_log.recent(limit).await
        }
541
542 async fn ingest_trusted_event_inner(&self, event: Event, broadcast: bool) -> Result<()> {
543 event
544 .verify()
545 .map_err(|e| anyhow::anyhow!("invalid signature: {}", e))?;
546
547 let is_ephemeral = event.kind.is_ephemeral();
548 {
549 let mut recent = self.recent_events.lock().await;
550 recent.insert(event.clone());
551 }
552
553 if !is_ephemeral {
554 let storage_class = self.event_storage_class(&event);
555 self.trusted
556 .ingest_with_storage_class(&event, storage_class)?;
557 }
558
559 if broadcast {
560 self.broadcast_event(&event).await;
561 }
562 Ok(())
563 }
564
565 pub async fn query_events(&self, filter: &NostrFilter, limit: usize) -> Vec<Event> {
566 let limit = limit.min(self.config.max_query_limit);
567 if limit == 0 {
568 return Vec::new();
569 }
570
571 let mut seen: HashSet<EventId> = HashSet::new();
572 let mut events = Vec::new();
573
574 if !prefers_trusted_only(filter) {
575 let recent = {
576 let cache = self.recent_events.lock().await;
577 cache.matching(filter)
578 };
579 for event in recent {
580 if seen.insert(event.id) {
581 events.push(event);
582 if events.len() >= limit {
583 return events;
584 }
585 }
586 }
587 }
588
589 for event in self.trusted.query(filter, limit) {
590 if seen.insert(event.id) {
591 events.push(event);
592 if events.len() >= limit {
593 break;
594 }
595 }
596 }
597
598 events
599 }
600
601 pub async fn register_client(
602 &self,
603 client_id: u64,
604 sender: mpsc::UnboundedSender<String>,
605 pubkey: Option<String>,
606 ) {
607 let mut clients = self.clients.lock().await;
608 clients.insert(
609 client_id,
610 ClientState {
611 sender,
612 pubkey,
613 quota: ClientQuota::new(),
614 },
615 );
616 }
617
618 pub async fn unregister_client(&self, client_id: u64) {
619 let mut clients = self.clients.lock().await;
620 clients.remove(&client_id);
621 drop(clients);
622 let mut subs = self.subscriptions.lock().await;
623 subs.remove(&client_id);
624 }
625
626 pub async fn handle_client_message(&self, client_id: u64, msg: NostrClientMessage) {
627 match msg {
628 NostrClientMessage::Event(event) => {
629 self.handle_event(client_id, *event).await;
630 }
631 NostrClientMessage::Req {
632 subscription_id,
633 filters,
634 } => {
635 self.handle_req(client_id, subscription_id, filters).await;
636 }
637 NostrClientMessage::Count {
638 subscription_id,
639 filters,
640 } => {
641 self.handle_count(client_id, subscription_id, filters).await;
642 }
643 NostrClientMessage::Close(subscription_id) => {
644 self.handle_close(client_id, subscription_id).await;
645 }
646 NostrClientMessage::Auth(event) => {
647 self.handle_auth(client_id, *event).await;
648 }
649 NostrClientMessage::NegOpen { .. }
650 | NostrClientMessage::NegMsg { .. }
651 | NostrClientMessage::NegClose { .. } => {
652 self.send_to_client(
653 client_id,
654 NostrRelayMessage::notice("negentropy not supported"),
655 )
656 .await;
657 }
658 }
659 }
660
661 pub async fn register_subscription_query(
662 &self,
663 client_id: u64,
664 subscription_id: SubscriptionId,
665 mut filters: Vec<NostrFilter>,
666 ) -> std::result::Result<Vec<Event>, &'static str> {
667 if !self.allow_req(client_id).await {
668 return Err("rate limited");
669 }
670
671 if filters.len() > self.config.max_filters_per_sub {
672 filters.truncate(self.config.max_filters_per_sub);
673 }
674
675 {
676 let mut subs = self.subscriptions.lock().await;
677 let entry = subs.entry(client_id).or_default();
678 if !entry.contains_key(&subscription_id)
679 && entry.len() >= self.config.max_subs_per_client
680 {
681 return Err("too many subscriptions");
682 }
683 entry.insert(subscription_id, filters.clone());
684 }
685
686 let mut seen: HashSet<EventId> = HashSet::new();
687 let mut events = Vec::new();
688 for filter in &filters {
689 let limit = filter
690 .limit
691 .unwrap_or(self.config.max_query_limit)
692 .min(self.config.max_query_limit);
693 self.collect_filter_events(filter, limit, &mut seen, &mut events)
694 .await;
695 }
696
697 Ok(events)
698 }
699
700 async fn handle_auth(&self, client_id: u64, event: Event) {
701 let ok = event.verify().is_ok();
702 let message = if ok { "" } else { "invalid auth" };
703 self.send_to_client(client_id, NostrRelayMessage::ok(event.id, ok, message))
704 .await;
705 }
706
707 async fn handle_close(&self, client_id: u64, subscription_id: SubscriptionId) {
708 let mut subs = self.subscriptions.lock().await;
709 if let Some(map) = subs.get_mut(&client_id) {
710 map.remove(&subscription_id);
711 }
712 }
713
714 async fn handle_event(&self, client_id: u64, event: Event) {
715 let ok = event.verify().is_ok();
716 if !ok {
717 self.send_to_client(
718 client_id,
719 NostrRelayMessage::ok(event.id, false, "invalid: signature"),
720 )
721 .await;
722 return;
723 }
724
725 let trusted = self.is_trusted_event(client_id, &event).await;
726 if !trusted && !self.allow_spambox_event(client_id).await {
727 self.send_to_client(
728 client_id,
729 NostrRelayMessage::ok(event.id, false, "rate limited"),
730 )
731 .await;
732 return;
733 }
734
735 let is_ephemeral = event.kind.is_ephemeral();
736 if trusted {
737 let mut recent = self.recent_events.lock().await;
738 recent.insert(event.clone());
739 }
740 if !is_ephemeral {
741 let stored = if trusted {
742 let storage_class = self.event_storage_class(&event);
743 self.trusted
744 .ingest_with_storage_class(&event, storage_class)
745 .is_ok()
746 } else {
747 match self.spambox.as_ref() {
748 Some(spambox) => spambox.ingest(&event).await,
749 None => false,
750 }
751 };
752
753 if !stored {
754 let message = if trusted {
755 "store failed"
756 } else {
757 "spambox full"
758 };
759 self.send_to_client(client_id, NostrRelayMessage::ok(event.id, false, message))
760 .await;
761 return;
762 }
763 }
764
765 let message = if trusted { "" } else { "spambox" };
766 self.send_to_client(client_id, NostrRelayMessage::ok(event.id, true, message))
767 .await;
768
769 if trusted {
770 self.broadcast_event(&event).await;
771 }
772 }
773
774 async fn handle_req(
775 &self,
776 client_id: u64,
777 subscription_id: SubscriptionId,
778 filters: Vec<NostrFilter>,
779 ) {
780 match self
781 .register_subscription_query(client_id, subscription_id.clone(), filters)
782 .await
783 {
784 Ok(events) => {
785 for event in events {
786 self.send_to_client(
787 client_id,
788 NostrRelayMessage::event(subscription_id.clone(), event),
789 )
790 .await;
791 }
792
793 self.send_to_client(client_id, NostrRelayMessage::eose(subscription_id))
794 .await;
795 }
796 Err(message) => {
797 self.send_to_client(
798 client_id,
799 NostrRelayMessage::closed(subscription_id, message),
800 )
801 .await;
802 }
803 }
804 }
805
806 async fn handle_count(
807 &self,
808 client_id: u64,
809 subscription_id: SubscriptionId,
810 filters: Vec<NostrFilter>,
811 ) {
812 if !self.allow_req(client_id).await {
813 self.send_to_client(
814 client_id,
815 NostrRelayMessage::closed(subscription_id, "rate limited"),
816 )
817 .await;
818 return;
819 }
820
821 let mut seen: HashSet<EventId> = HashSet::new();
822 for filter in &filters {
823 let limit = filter
824 .limit
825 .unwrap_or(self.config.max_query_limit)
826 .min(self.config.max_query_limit);
827 self.collect_filter_count(filter, limit, &mut seen).await;
828 }
829
830 self.send_to_client(
831 client_id,
832 NostrRelayMessage::count(subscription_id, seen.len()),
833 )
834 .await;
835 }
836
837 async fn is_trusted_event(&self, client_id: u64, event: &Event) -> bool {
838 let event_pubkey = event.pubkey.to_hex();
839 let client_pubkey = {
840 let clients = self.clients.lock().await;
841 clients
842 .get(&client_id)
843 .and_then(|state| state.pubkey.clone())
844 };
845 if let Some(pubkey) = client_pubkey {
846 return pubkey == event_pubkey
847 || self.social_graph.as_ref().is_some_and(|social_graph| {
848 social_graph.check_write_access(&event_pubkey)
849 });
850 }
851 if let Some(ref social_graph) = self.social_graph {
852 return social_graph.check_write_access(&event_pubkey);
853 }
854 true
855 }
856
857 fn event_storage_class(&self, event: &Event) -> EventStorageClass {
858 if self.public_pubkeys.contains(&event.pubkey.to_hex()) {
859 EventStorageClass::Public
860 } else {
861 EventStorageClass::Ambient
862 }
863 }
864
865 async fn allow_spambox_event(&self, client_id: u64) -> bool {
866 let mut clients = self.clients.lock().await;
867 let Some(state) = clients.get_mut(&client_id) else {
868 return false;
869 };
870 state
871 .quota
872 .allow_spambox_event(self.config.spambox_max_events_per_min)
873 }
874
875 async fn allow_req(&self, client_id: u64) -> bool {
876 let mut clients = self.clients.lock().await;
877 let Some(state) = clients.get_mut(&client_id) else {
878 return false;
879 };
880 state.quota.allow_req(self.config.spambox_max_reqs_per_min)
881 }
882
883 async fn broadcast_event(&self, event: &Event) {
884 let subscriptions = self.subscriptions.lock().await;
885 let mut deliveries: Vec<(u64, SubscriptionId)> = Vec::new();
886 for (client_id, subs) in subscriptions.iter() {
887 for (sub_id, filters) in subs.iter() {
888 if filters.iter().any(|f| f.match_event(event)) {
889 deliveries.push((*client_id, sub_id.clone()));
890 }
891 }
892 }
893 drop(subscriptions);
894
895 for (client_id, sub_id) in deliveries {
896 self.send_to_client(client_id, NostrRelayMessage::event(sub_id, event.clone()))
897 .await;
898 }
899 }
900
901 async fn send_to_client(&self, client_id: u64, msg: NostrRelayMessage) {
902 let sender = {
903 let clients = self.clients.lock().await;
904 clients.get(&client_id).map(|state| state.sender.clone())
905 };
906 if let Some(tx) = sender {
907 let _ = tx.send(msg.as_json());
908 }
909 }
910 }
911}
912
913pub use imp::NostrRelay;
914
/// Exposes the relay as a mesh event store by forwarding to the inherent
/// methods of the same names. The calls are fully qualified with `NostrRelay::`.
#[async_trait::async_trait]
impl MeshEventStore for NostrRelay {
    /// Forwards to the inherent `NostrRelay::ingest_trusted_event`.
    async fn ingest_trusted_event(&self, event: Event) -> anyhow::Result<()> {
        NostrRelay::ingest_trusted_event(self, event).await
    }

    /// Forwards to the inherent `NostrRelay::query_events`.
    async fn query_events(&self, filter: &NostrFilter, limit: usize) -> Vec<Event> {
        NostrRelay::query_events(self, filter, limit).await
    }
}
925
/// Exposes the relay through the mesh relay-client interface. Every method
/// forwards to an inherent `NostrRelay` method using a fully-qualified call.
#[async_trait::async_trait]
impl MeshRelayClient for NostrRelay {
    /// Forwards to the inherent `NostrRelay::next_client_id`.
    fn next_client_id(&self) -> u64 {
        NostrRelay::next_client_id(self)
    }

    /// Forwards to the inherent `NostrRelay::register_client`.
    async fn register_client(
        &self,
        client_id: u64,
        sender: mpsc::UnboundedSender<String>,
        pubkey: Option<String>,
    ) {
        NostrRelay::register_client(self, client_id, sender, pubkey).await
    }

    /// Forwards to the inherent `NostrRelay::unregister_client`.
    async fn unregister_client(&self, client_id: u64) {
        NostrRelay::unregister_client(self, client_id).await
    }

    /// Forwards to the inherent `NostrRelay::handle_client_message`.
    async fn handle_client_message(&self, client_id: u64, msg: NostrClientMessage) {
        NostrRelay::handle_client_message(self, client_id, msg).await
    }

    /// Forwards to the inherent `NostrRelay::register_subscription_query`.
    async fn register_subscription_query(
        &self,
        client_id: u64,
        subscription_id: SubscriptionId,
        filters: Vec<NostrFilter>,
    ) -> std::result::Result<Vec<Event>, &'static str> {
        NostrRelay::register_subscription_query(self, client_id, subscription_id, filters).await
    }

    /// Peer-delivered events are ingested through the Bluetooth path, which
    /// also records them in the Bluetooth event log.
    async fn ingest_trusted_event_from_peer(
        &self,
        event: Event,
        peer_id: Option<String>,
    ) -> anyhow::Result<()> {
        NostrRelay::ingest_trusted_event_from_bluetooth(self, event, peer_id).await
    }
}
966
967#[cfg(test)]
968#[path = "nostr_relay/tests.rs"]
969mod tests;