1use std::collections::HashMap;
2use std::collections::{HashSet, VecDeque};
3use std::path::PathBuf;
4use std::sync::{
5 atomic::{AtomicU64, Ordering},
6 Arc,
7};
8use std::time::{Duration, Instant};
9
10use tokio::sync::{mpsc, Mutex};
11
12#[cfg(feature = "p2p")]
13use hashtree_network::{MeshEventStore, MeshRelayClient};
14use nostr::{ClientMessage as NostrClientMessage, JsonUtil, RelayMessage as NostrRelayMessage};
15use nostr::{Event, EventId, Filter as NostrFilter, SubscriptionId};
16
17use crate::socialgraph;
18
19const BLUETOOTH_EVENT_LOG_CAPACITY: usize = 100;
20
21#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
22pub struct BluetoothReceivedEventRecord {
23 pub event_id: String,
24 pub pubkey: String,
25 pub kind: u32,
26 pub created_at: u64,
27 pub received_at: u64,
28 pub peer_id: Option<String>,
29 pub cid_values: Vec<String>,
30}
31
/// Tunable limits for the embedded Nostr relay.
#[derive(Clone, Debug)]
pub struct NostrRelayConfig {
    /// Size budget handed to the persistent spambox store; `0` selects the
    /// in-memory spambox instead.
    pub spambox_db_max_bytes: u64,
    /// Upper bound applied to every per-filter / per-query result limit.
    pub max_query_limit: usize,
    /// Maximum number of distinct subscriptions a single client may hold.
    pub max_subs_per_client: usize,
    /// Maximum number of filters kept per subscription (extras are dropped).
    pub max_filters_per_sub: usize,
    /// Per-client budget of untrusted (spambox) events per rate-limit window.
    pub spambox_max_events_per_min: u32,
    /// Per-client budget of REQ/COUNT requests per rate-limit window.
    pub spambox_max_reqs_per_min: u32,
}

impl Default for NostrRelayConfig {
    /// Returns the stock limits: 1 GiB spambox, 200-event queries, 64
    /// subscriptions of up to 32 filters, and 120 events / requests per minute.
    fn default() -> Self {
        const ONE_GIB: u64 = 1024 * 1024 * 1024;
        const PER_MINUTE_BUDGET: u32 = 120;

        Self {
            spambox_db_max_bytes: ONE_GIB,
            max_query_limit: 200,
            max_subs_per_client: 64,
            max_filters_per_sub: 32,
            spambox_max_events_per_min: PER_MINUTE_BUDGET,
            spambox_max_reqs_per_min: PER_MINUTE_BUDGET,
        }
    }
}
54
55mod imp {
56 use super::*;
57 use anyhow::Result;
58
59 use crate::socialgraph::{EventStorageClass, SocialGraphAccessControl, SocialGraphBackend};
60 use hashtree_core::{nhash_decode, Cid};
61 use hashtree_nostr::{is_parameterized_replaceable_kind, is_replaceable_kind};
62 use tracing::warn;
63
64 fn prefers_trusted_only(filter: &NostrFilter) -> bool {
65 let Some(kinds) = filter.kinds.as_ref() else {
66 return false;
67 };
68 if kinds.len() != 1 {
69 return false;
70 }
71
72 let kind = kinds.iter().next().expect("checked single kind").as_u16() as u32;
73 let has_authors = filter
74 .authors
75 .as_ref()
76 .is_some_and(|authors| !authors.is_empty());
77 if !has_authors {
78 return false;
79 }
80
81 if is_replaceable_kind(kind) {
82 return true;
83 }
84
85 if is_parameterized_replaceable_kind(kind) {
86 let d_tag = nostr::SingleLetterTag::lowercase(nostr::Alphabet::D);
87 return filter
88 .generic_tags
89 .get(&d_tag)
90 .is_some_and(|values| !values.is_empty());
91 }
92
93 false
94 }
95
96 struct NostrStore {
97 store: Arc<dyn SocialGraphBackend>,
98 }
99
100 impl NostrStore {
101 fn new(store: Arc<dyn SocialGraphBackend>) -> Self {
102 Self { store }
103 }
104
105 fn ingest(&self, event: &Event) -> Result<()> {
106 crate::socialgraph::ingest_parsed_event(self.store.as_ref(), event)
107 }
108
109 fn ingest_with_storage_class(
110 &self,
111 event: &Event,
112 storage_class: EventStorageClass,
113 ) -> Result<()> {
114 crate::socialgraph::ingest_parsed_event_with_storage_class(
115 self.store.as_ref(),
116 event,
117 storage_class,
118 )
119 }
120
121 fn query(&self, filter: &NostrFilter, limit: usize) -> Vec<Event> {
122 crate::socialgraph::query_events(self.store.as_ref(), filter, limit)
123 }
124 }
125
/// Fixed-window rate-limit counters for a single client.
#[derive(Clone, Debug)]
struct ClientQuota {
    /// Start of the current window.
    last_reset: Instant,
    /// Untrusted events accepted in the current window.
    spambox_events: u32,
    /// Requests accepted in the current window.
    reqs: u32,
}

impl ClientQuota {
    /// Length of one rate-limit window.
    const WINDOW: Duration = Duration::from_secs(60);

    /// Starts a fresh window with both counters at zero.
    fn new() -> Self {
        Self {
            last_reset: Instant::now(),
            spambox_events: 0,
            reqs: 0,
        }
    }

    /// Begins a new window once the current one has fully elapsed.
    fn reset_if_needed(&mut self) {
        if self.last_reset.elapsed() < Self::WINDOW {
            return;
        }
        *self = Self::new();
    }

    /// Consumes one slot from `used` if it is still below `limit`.
    fn take_slot(used: &mut u32, limit: u32) -> bool {
        let has_room = *used < limit;
        if has_room {
            *used += 1;
        }
        has_room
    }

    /// Accounts for one untrusted event; `false` means the budget is spent.
    fn allow_spambox_event(&mut self, limit: u32) -> bool {
        self.reset_if_needed();
        Self::take_slot(&mut self.spambox_events, limit)
    }

    /// Accounts for one request; `false` means the budget is spent.
    fn allow_req(&mut self, limit: u32) -> bool {
        self.reset_if_needed();
        Self::take_slot(&mut self.reqs, limit)
    }
}
168
169 struct ClientState {
170 sender: mpsc::UnboundedSender<String>,
171 pubkey: Option<String>,
172 quota: ClientQuota,
173 }
174
175 struct RecentEvents {
176 order: VecDeque<EventId>,
177 events: HashMap<EventId, Event>,
178 max_len: usize,
179 }
180
181 impl RecentEvents {
182 fn new(max_len: usize) -> Self {
183 Self {
184 order: VecDeque::new(),
185 events: HashMap::new(),
186 max_len: max_len.max(128),
187 }
188 }
189
190 fn insert(&mut self, event: Event) {
191 if self.events.contains_key(&event.id) {
192 return;
193 }
194 self.order.push_back(event.id);
195 self.events.insert(event.id, event);
196 while self.order.len() > self.max_len {
197 if let Some(oldest) = self.order.pop_front() {
198 self.events.remove(&oldest);
199 }
200 }
201 }
202
203 fn matching(&self, filter: &NostrFilter) -> Vec<Event> {
204 self.events
205 .values()
206 .filter(|event| filter.match_event(event))
207 .cloned()
208 .collect()
209 }
210 }
211
212 enum SpamboxStore {
213 Persistent(NostrStore),
214 Memory(MemorySpambox),
215 }
216
217 struct MemorySpambox {
218 events: Mutex<VecDeque<Event>>,
219 max_len: usize,
220 }
221
222 impl MemorySpambox {
223 fn new(max_len: usize) -> Self {
224 Self {
225 events: Mutex::new(VecDeque::new()),
226 max_len: max_len.max(128),
227 }
228 }
229
230 async fn ingest(&self, event: &Event) -> bool {
231 let mut events = self.events.lock().await;
232 events.push_back(event.clone());
233 while events.len() > self.max_len {
234 events.pop_front();
235 }
236 true
237 }
238 }
239
240 impl SpamboxStore {
241 async fn ingest(&self, event: &Event) -> bool {
242 match self {
243 SpamboxStore::Persistent(store) => store.ingest(event).is_ok(),
244 SpamboxStore::Memory(store) => store.ingest(event).await,
245 }
246 }
247 }
248
249 struct BluetoothEventLog {
250 path: PathBuf,
251 state: Mutex<BluetoothEventLogState>,
252 }
253
254 struct BluetoothEventLogState {
255 records: VecDeque<BluetoothReceivedEventRecord>,
256 event_ids: HashSet<String>,
257 }
258
259 impl BluetoothEventLog {
260 fn load(path: PathBuf) -> Self {
261 let records = std::fs::read_to_string(&path)
262 .ok()
263 .map(|serialized| {
264 serialized
265 .lines()
266 .filter_map(|line| {
267 serde_json::from_str::<BluetoothReceivedEventRecord>(line).ok()
268 })
269 .collect::<Vec<_>>()
270 })
271 .unwrap_or_default();
272 let mut trimmed = VecDeque::with_capacity(BLUETOOTH_EVENT_LOG_CAPACITY);
273 let start = records.len().saturating_sub(BLUETOOTH_EVENT_LOG_CAPACITY);
274 for record in records.into_iter().skip(start) {
275 trimmed.push_back(record);
276 }
277 let event_ids = trimmed
278 .iter()
279 .map(|record| record.event_id.clone())
280 .collect::<HashSet<_>>();
281
282 Self {
283 path,
284 state: Mutex::new(BluetoothEventLogState {
285 records: trimmed,
286 event_ids,
287 }),
288 }
289 }
290
291 async fn recent(&self, limit: usize) -> Vec<BluetoothReceivedEventRecord> {
292 let state = self.state.lock().await;
293 state
294 .records
295 .iter()
296 .rev()
297 .take(limit.max(1))
298 .cloned()
299 .collect()
300 }
301
302 async fn record(&self, event: &Event, peer_id: Option<String>) {
303 let record = BluetoothReceivedEventRecord {
304 event_id: event.id.to_hex(),
305 pubkey: event.pubkey.to_hex(),
306 kind: event.kind.as_u16() as u32,
307 created_at: event.created_at.as_u64(),
308 received_at: std::time::SystemTime::now()
309 .duration_since(std::time::UNIX_EPOCH)
310 .map(|value| value.as_secs())
311 .unwrap_or(0),
312 peer_id,
313 cid_values: cid_values_from_event(event),
314 };
315
316 let serialized = {
317 let mut state = self.state.lock().await;
318 if state.event_ids.contains(&record.event_id) {
319 return;
320 }
321
322 state.event_ids.insert(record.event_id.clone());
323 state.records.push_back(record);
324 while state.records.len() > BLUETOOTH_EVENT_LOG_CAPACITY {
325 if let Some(removed) = state.records.pop_front() {
326 state.event_ids.remove(&removed.event_id);
327 }
328 }
329
330 state
331 .records
332 .iter()
333 .filter_map(|entry| serde_json::to_string(entry).ok())
334 .collect::<Vec<_>>()
335 .join("\n")
336 };
337
338 if let Some(parent) = self.path.parent() {
339 let _ = std::fs::create_dir_all(parent);
340 }
341 let _ = std::fs::write(&self.path, serialized);
342 }
343 }
344
345 fn looks_like_cid_reference(value: &str) -> bool {
346 Cid::parse(value).is_ok() || nhash_decode(value).is_ok()
347 }
348
349 fn cid_values_from_event(event: &Event) -> Vec<String> {
350 let mut values = Vec::new();
351 let mut seen = HashSet::new();
352
353 for tag in &event.tags {
354 let fields = tag.clone().to_vec();
355 if fields.first().is_some_and(|name| name == "cid") {
356 if let Some(value) = fields.get(1).filter(|value| !value.is_empty()) {
357 if seen.insert(value.clone()) {
358 values.push(value.clone());
359 }
360 }
361 continue;
362 }
363
364 for value in fields.into_iter().skip(1) {
365 if looks_like_cid_reference(&value) && seen.insert(value.clone()) {
366 values.push(value);
367 }
368 }
369 }
370
371 values
372 }
373
374 pub struct NostrRelay {
375 config: NostrRelayConfig,
376 trusted: NostrStore,
377 public_pubkeys: HashSet<String>,
378 spambox: Option<SpamboxStore>,
379 social_graph: Option<Arc<SocialGraphAccessControl>>,
380 clients: Mutex<HashMap<u64, ClientState>>,
381 subscriptions: Mutex<HashMap<u64, HashMap<SubscriptionId, Vec<NostrFilter>>>>,
382 recent_events: Mutex<RecentEvents>,
383 next_client_id: AtomicU64,
384 bluetooth_event_log: Arc<BluetoothEventLog>,
385 }
386
387 impl NostrRelay {
388 async fn collect_filter_events(
389 &self,
390 filter: &NostrFilter,
391 limit: usize,
392 seen: &mut HashSet<EventId>,
393 events: &mut Vec<Event>,
394 ) {
395 if limit == 0 {
396 return;
397 }
398
399 let mut added = 0usize;
400
401 if !prefers_trusted_only(filter) {
402 let recent = {
403 let cache = self.recent_events.lock().await;
404 cache.matching(filter)
405 };
406 for event in recent {
407 if seen.insert(event.id) {
408 events.push(event);
409 added += 1;
410 if added >= limit {
411 return;
412 }
413 }
414 }
415 }
416
417 for event in self.trusted.query(filter, limit) {
418 if seen.insert(event.id) {
419 events.push(event);
420 added += 1;
421 if added >= limit {
422 return;
423 }
424 }
425 }
426 }
427
428 async fn collect_filter_count(
429 &self,
430 filter: &NostrFilter,
431 limit: usize,
432 seen: &mut HashSet<EventId>,
433 ) {
434 if limit == 0 {
435 return;
436 }
437
438 let mut added = 0usize;
439
440 if !prefers_trusted_only(filter) {
441 let recent = {
442 let cache = self.recent_events.lock().await;
443 cache.matching(filter)
444 };
445 for event in recent {
446 if seen.insert(event.id) {
447 added += 1;
448 if added >= limit {
449 return;
450 }
451 }
452 }
453 }
454
455 for event in self.trusted.query(filter, limit) {
456 if seen.insert(event.id) {
457 added += 1;
458 if added >= limit {
459 return;
460 }
461 }
462 }
463 }
464
465 pub fn new(
466 trusted_store: Arc<dyn SocialGraphBackend>,
467 data_dir: PathBuf,
468 public_pubkeys: HashSet<String>,
469 social_graph: Option<Arc<SocialGraphAccessControl>>,
470 config: NostrRelayConfig,
471 ) -> Result<Self> {
472 let spambox = if config.spambox_db_max_bytes == 0 {
473 Some(SpamboxStore::Memory(MemorySpambox::new(
474 config.max_query_limit * 2,
475 )))
476 } else {
477 let spam_dir = data_dir.join("socialgraph_spambox");
478 match socialgraph::open_social_graph_store_at_path(
479 &spam_dir,
480 Some(config.spambox_db_max_bytes),
481 ) {
482 Ok(store) => Some(SpamboxStore::Persistent(NostrStore::new(store))),
483 Err(err) => {
484 warn!(
485 "Failed to open social graph spambox (falling back to memory): {}",
486 err
487 );
488 Some(SpamboxStore::Memory(MemorySpambox::new(
489 config.max_query_limit * 2,
490 )))
491 }
492 }
493 };
494
495 let recent_size = config.max_query_limit.saturating_mul(2);
496 let bluetooth_event_log = Arc::new(BluetoothEventLog::load(
497 data_dir.join("bluetooth-events.jsonl"),
498 ));
499
500 Ok(Self {
501 config,
502 trusted: NostrStore::new(trusted_store),
503 public_pubkeys,
504 spambox,
505 social_graph,
506 clients: Mutex::new(HashMap::new()),
507 subscriptions: Mutex::new(HashMap::new()),
508 recent_events: Mutex::new(RecentEvents::new(recent_size)),
509 next_client_id: AtomicU64::new(1),
510 bluetooth_event_log,
511 })
512 }
513
514 pub fn next_client_id(&self) -> u64 {
515 self.next_client_id.fetch_add(1, Ordering::SeqCst)
516 }
517
518 pub async fn ingest_trusted_event(&self, event: Event) -> Result<()> {
519 self.ingest_trusted_event_inner(event, true).await
520 }
521
522 pub async fn ingest_trusted_event_from_bluetooth(
523 &self,
524 event: Event,
525 peer_id: Option<String>,
526 ) -> Result<()> {
527 self.ingest_trusted_event_inner(event.clone(), true).await?;
528 self.bluetooth_event_log.record(&event, peer_id).await;
529 Ok(())
530 }
531
532 pub async fn ingest_trusted_event_silent(&self, event: Event) -> Result<()> {
533 self.ingest_trusted_event_inner(event, false).await
534 }
535
536 pub async fn bluetooth_received_events(
537 &self,
538 limit: usize,
539 ) -> Vec<BluetoothReceivedEventRecord> {
540 self.bluetooth_event_log.recent(limit).await
541 }
542
543 async fn ingest_trusted_event_inner(&self, event: Event, broadcast: bool) -> Result<()> {
544 event
545 .verify()
546 .map_err(|e| anyhow::anyhow!("invalid signature: {}", e))?;
547
548 let is_ephemeral = event.kind.is_ephemeral();
549 {
550 let mut recent = self.recent_events.lock().await;
551 recent.insert(event.clone());
552 }
553
554 if !is_ephemeral {
555 let storage_class = self.event_storage_class(&event);
556 self.trusted
557 .ingest_with_storage_class(&event, storage_class)?;
558 }
559
560 if broadcast {
561 self.broadcast_event(&event).await;
562 }
563 Ok(())
564 }
565
566 pub async fn query_events(&self, filter: &NostrFilter, limit: usize) -> Vec<Event> {
567 let limit = limit.min(self.config.max_query_limit);
568 if limit == 0 {
569 return Vec::new();
570 }
571
572 let mut seen: HashSet<EventId> = HashSet::new();
573 let mut events = Vec::new();
574
575 if !prefers_trusted_only(filter) {
576 let recent = {
577 let cache = self.recent_events.lock().await;
578 cache.matching(filter)
579 };
580 for event in recent {
581 if seen.insert(event.id) {
582 events.push(event);
583 if events.len() >= limit {
584 return events;
585 }
586 }
587 }
588 }
589
590 for event in self.trusted.query(filter, limit) {
591 if seen.insert(event.id) {
592 events.push(event);
593 if events.len() >= limit {
594 break;
595 }
596 }
597 }
598
599 events
600 }
601
602 pub async fn register_client(
603 &self,
604 client_id: u64,
605 sender: mpsc::UnboundedSender<String>,
606 pubkey: Option<String>,
607 ) {
608 let mut clients = self.clients.lock().await;
609 clients.insert(
610 client_id,
611 ClientState {
612 sender,
613 pubkey,
614 quota: ClientQuota::new(),
615 },
616 );
617 }
618
619 pub async fn unregister_client(&self, client_id: u64) {
620 let mut clients = self.clients.lock().await;
621 clients.remove(&client_id);
622 drop(clients);
623 let mut subs = self.subscriptions.lock().await;
624 subs.remove(&client_id);
625 }
626
627 pub async fn handle_client_message(&self, client_id: u64, msg: NostrClientMessage) {
628 match msg {
629 NostrClientMessage::Event(event) => {
630 self.handle_event(client_id, *event).await;
631 }
632 NostrClientMessage::Req {
633 subscription_id,
634 filters,
635 } => {
636 self.handle_req(client_id, subscription_id, filters).await;
637 }
638 NostrClientMessage::Count {
639 subscription_id,
640 filters,
641 } => {
642 self.handle_count(client_id, subscription_id, filters).await;
643 }
644 NostrClientMessage::Close(subscription_id) => {
645 self.handle_close(client_id, subscription_id).await;
646 }
647 NostrClientMessage::Auth(event) => {
648 self.handle_auth(client_id, *event).await;
649 }
650 NostrClientMessage::NegOpen { .. }
651 | NostrClientMessage::NegMsg { .. }
652 | NostrClientMessage::NegClose { .. } => {
653 self.send_to_client(
654 client_id,
655 NostrRelayMessage::notice("negentropy not supported"),
656 )
657 .await;
658 }
659 }
660 }
661
662 pub async fn register_subscription_query(
663 &self,
664 client_id: u64,
665 subscription_id: SubscriptionId,
666 mut filters: Vec<NostrFilter>,
667 ) -> std::result::Result<Vec<Event>, &'static str> {
668 if !self.allow_req(client_id).await {
669 return Err("rate limited");
670 }
671
672 if filters.len() > self.config.max_filters_per_sub {
673 filters.truncate(self.config.max_filters_per_sub);
674 }
675
676 {
677 let mut subs = self.subscriptions.lock().await;
678 let entry = subs.entry(client_id).or_default();
679 if !entry.contains_key(&subscription_id)
680 && entry.len() >= self.config.max_subs_per_client
681 {
682 return Err("too many subscriptions");
683 }
684 entry.insert(subscription_id, filters.clone());
685 }
686
687 let mut seen: HashSet<EventId> = HashSet::new();
688 let mut events = Vec::new();
689 for filter in &filters {
690 let limit = filter
691 .limit
692 .unwrap_or(self.config.max_query_limit)
693 .min(self.config.max_query_limit);
694 self.collect_filter_events(filter, limit, &mut seen, &mut events)
695 .await;
696 }
697
698 Ok(events)
699 }
700
701 async fn handle_auth(&self, client_id: u64, event: Event) {
702 let ok = event.verify().is_ok();
703 let message = if ok { "" } else { "invalid auth" };
704 self.send_to_client(client_id, NostrRelayMessage::ok(event.id, ok, message))
705 .await;
706 }
707
708 async fn handle_close(&self, client_id: u64, subscription_id: SubscriptionId) {
709 let mut subs = self.subscriptions.lock().await;
710 if let Some(map) = subs.get_mut(&client_id) {
711 map.remove(&subscription_id);
712 }
713 }
714
715 async fn handle_event(&self, client_id: u64, event: Event) {
716 let ok = event.verify().is_ok();
717 if !ok {
718 self.send_to_client(
719 client_id,
720 NostrRelayMessage::ok(event.id, false, "invalid: signature"),
721 )
722 .await;
723 return;
724 }
725
726 let trusted = self.is_trusted_event(client_id, &event).await;
727 if !trusted && !self.allow_spambox_event(client_id).await {
728 self.send_to_client(
729 client_id,
730 NostrRelayMessage::ok(event.id, false, "rate limited"),
731 )
732 .await;
733 return;
734 }
735
736 let is_ephemeral = event.kind.is_ephemeral();
737 if trusted {
738 let mut recent = self.recent_events.lock().await;
739 recent.insert(event.clone());
740 }
741 if !is_ephemeral {
742 let stored = if trusted {
743 let storage_class = self.event_storage_class(&event);
744 self.trusted
745 .ingest_with_storage_class(&event, storage_class)
746 .is_ok()
747 } else {
748 match self.spambox.as_ref() {
749 Some(spambox) => spambox.ingest(&event).await,
750 None => false,
751 }
752 };
753
754 if !stored {
755 let message = if trusted {
756 "store failed"
757 } else {
758 "spambox full"
759 };
760 self.send_to_client(client_id, NostrRelayMessage::ok(event.id, false, message))
761 .await;
762 return;
763 }
764 }
765
766 let message = if trusted { "" } else { "spambox" };
767 self.send_to_client(client_id, NostrRelayMessage::ok(event.id, true, message))
768 .await;
769
770 if trusted {
771 self.broadcast_event(&event).await;
772 }
773 }
774
775 async fn handle_req(
776 &self,
777 client_id: u64,
778 subscription_id: SubscriptionId,
779 filters: Vec<NostrFilter>,
780 ) {
781 match self
782 .register_subscription_query(client_id, subscription_id.clone(), filters)
783 .await
784 {
785 Ok(events) => {
786 for event in events {
787 self.send_to_client(
788 client_id,
789 NostrRelayMessage::event(subscription_id.clone(), event),
790 )
791 .await;
792 }
793
794 self.send_to_client(client_id, NostrRelayMessage::eose(subscription_id))
795 .await;
796 }
797 Err(message) => {
798 self.send_to_client(
799 client_id,
800 NostrRelayMessage::closed(subscription_id, message),
801 )
802 .await;
803 }
804 }
805 }
806
807 async fn handle_count(
808 &self,
809 client_id: u64,
810 subscription_id: SubscriptionId,
811 filters: Vec<NostrFilter>,
812 ) {
813 if !self.allow_req(client_id).await {
814 self.send_to_client(
815 client_id,
816 NostrRelayMessage::closed(subscription_id, "rate limited"),
817 )
818 .await;
819 return;
820 }
821
822 let mut seen: HashSet<EventId> = HashSet::new();
823 for filter in &filters {
824 let limit = filter
825 .limit
826 .unwrap_or(self.config.max_query_limit)
827 .min(self.config.max_query_limit);
828 self.collect_filter_count(filter, limit, &mut seen).await;
829 }
830
831 self.send_to_client(
832 client_id,
833 NostrRelayMessage::count(subscription_id, seen.len()),
834 )
835 .await;
836 }
837
838 async fn is_trusted_event(&self, client_id: u64, event: &Event) -> bool {
839 let event_pubkey = event.pubkey.to_hex();
840 let client_pubkey = {
841 let clients = self.clients.lock().await;
842 clients
843 .get(&client_id)
844 .and_then(|state| state.pubkey.clone())
845 };
846 if let Some(pubkey) = client_pubkey {
847 return pubkey == event_pubkey
848 || self.social_graph.as_ref().is_some_and(|social_graph| {
849 social_graph.check_write_access(&event_pubkey)
850 });
851 }
852 if let Some(ref social_graph) = self.social_graph {
853 return social_graph.check_write_access(&event_pubkey);
854 }
855 true
856 }
857
858 fn event_storage_class(&self, event: &Event) -> EventStorageClass {
859 if self.public_pubkeys.contains(&event.pubkey.to_hex()) {
860 EventStorageClass::Public
861 } else {
862 EventStorageClass::Ambient
863 }
864 }
865
866 async fn allow_spambox_event(&self, client_id: u64) -> bool {
867 let mut clients = self.clients.lock().await;
868 let Some(state) = clients.get_mut(&client_id) else {
869 return false;
870 };
871 state
872 .quota
873 .allow_spambox_event(self.config.spambox_max_events_per_min)
874 }
875
876 async fn allow_req(&self, client_id: u64) -> bool {
877 let mut clients = self.clients.lock().await;
878 let Some(state) = clients.get_mut(&client_id) else {
879 return false;
880 };
881 state.quota.allow_req(self.config.spambox_max_reqs_per_min)
882 }
883
884 async fn broadcast_event(&self, event: &Event) {
885 let subscriptions = self.subscriptions.lock().await;
886 let mut deliveries: Vec<(u64, SubscriptionId)> = Vec::new();
887 for (client_id, subs) in subscriptions.iter() {
888 for (sub_id, filters) in subs.iter() {
889 if filters.iter().any(|f| f.match_event(event)) {
890 deliveries.push((*client_id, sub_id.clone()));
891 }
892 }
893 }
894 drop(subscriptions);
895
896 for (client_id, sub_id) in deliveries {
897 self.send_to_client(client_id, NostrRelayMessage::event(sub_id, event.clone()))
898 .await;
899 }
900 }
901
902 async fn send_to_client(&self, client_id: u64, msg: NostrRelayMessage) {
903 let sender = {
904 let clients = self.clients.lock().await;
905 clients.get(&client_id).map(|state| state.sender.clone())
906 };
907 if let Some(tx) = sender {
908 let _ = tx.send(msg.as_json());
909 }
910 }
911 }
912}
913
914pub use imp::NostrRelay;
915
/// Exposes the relay as a mesh event store by forwarding to the inherent
/// methods of the same names.
///
/// The explicit `NostrRelay::…` paths select the inherent methods, which take
/// precedence over trait methods in path resolution.
#[cfg(feature = "p2p")]
#[async_trait::async_trait]
impl MeshEventStore for NostrRelay {
    /// Ingests and broadcasts a trusted event.
    async fn ingest_trusted_event(&self, event: Event) -> anyhow::Result<()> {
        NostrRelay::ingest_trusted_event(self, event).await
    }

    /// Queries stored and cached events matching `filter`.
    async fn query_events(&self, filter: &NostrFilter, limit: usize) -> Vec<Event> {
        NostrRelay::query_events(self, filter, limit).await
    }
}
927
/// Exposes the relay's client-facing API to the mesh layer by forwarding to
/// the inherent methods.
///
/// The explicit `NostrRelay::…` paths select the inherent methods, which take
/// precedence over trait methods in path resolution.
#[cfg(feature = "p2p")]
#[async_trait::async_trait]
impl MeshRelayClient for NostrRelay {
    /// Allocates a fresh client id.
    fn next_client_id(&self) -> u64 {
        NostrRelay::next_client_id(self)
    }

    /// Registers a client with its outgoing channel and optional pubkey.
    async fn register_client(
        &self,
        client_id: u64,
        sender: mpsc::UnboundedSender<String>,
        pubkey: Option<String>,
    ) {
        NostrRelay::register_client(self, client_id, sender, pubkey).await
    }

    /// Removes a client and its subscriptions.
    async fn unregister_client(&self, client_id: u64) {
        NostrRelay::unregister_client(self, client_id).await
    }

    /// Dispatches one client message.
    async fn handle_client_message(&self, client_id: u64, msg: NostrClientMessage) {
        NostrRelay::handle_client_message(self, client_id, msg).await
    }

    /// Registers a subscription and returns the currently matching events.
    async fn register_subscription_query(
        &self,
        client_id: u64,
        subscription_id: SubscriptionId,
        filters: Vec<NostrFilter>,
    ) -> std::result::Result<Vec<Event>, &'static str> {
        NostrRelay::register_subscription_query(self, client_id, subscription_id, filters).await
    }

    /// Ingests a trusted event delivered by a peer; this is routed through the
    /// Bluetooth ingest path, so the event is also written to the Bluetooth
    /// event log.
    async fn ingest_trusted_event_from_peer(
        &self,
        event: Event,
        peer_id: Option<String>,
    ) -> anyhow::Result<()> {
        NostrRelay::ingest_trusted_event_from_bluetooth(self, event, peer_id).await
    }
}
969
970#[cfg(test)]
971#[path = "nostr_relay/tests.rs"]
972mod tests;