1use std::collections::HashMap;
2use std::collections::{HashSet, VecDeque};
3use std::path::PathBuf;
4use std::sync::{
5 atomic::{AtomicU64, Ordering},
6 Arc,
7};
8use std::time::{Duration, Instant};
9
10use tokio::sync::{mpsc, Mutex};
11
12#[cfg(feature = "p2p")]
13use hashtree_network::{MeshEventStore, MeshRelayClient};
14use nostr::{ClientMessage as NostrClientMessage, JsonUtil, RelayMessage as NostrRelayMessage};
15use nostr::{Event, EventId, Filter as NostrFilter, SubscriptionId};
16
17use crate::socialgraph;
18
19const BLUETOOTH_EVENT_LOG_CAPACITY: usize = 100;
20
21#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
22pub struct BluetoothReceivedEventRecord {
23 pub event_id: String,
24 pub pubkey: String,
25 pub kind: u32,
26 pub created_at: u64,
27 pub received_at: u64,
28 pub peer_id: Option<String>,
29 pub cid_values: Vec<String>,
30}
31
32#[derive(Debug, Clone)]
33pub struct NostrRelayConfig {
34 pub spambox_db_max_bytes: u64,
35 pub max_query_limit: usize,
36 pub max_subs_per_client: usize,
37 pub max_filters_per_sub: usize,
38 pub spambox_max_events_per_min: u32,
39 pub spambox_max_reqs_per_min: u32,
40}
41
42impl Default for NostrRelayConfig {
43 fn default() -> Self {
44 Self {
45 spambox_db_max_bytes: 1024 * 1024 * 1024,
46 max_query_limit: 200,
47 max_subs_per_client: 64,
48 max_filters_per_sub: 32,
49 spambox_max_events_per_min: 120,
50 spambox_max_reqs_per_min: 120,
51 }
52 }
53}
54
55mod imp {
56 use super::*;
57 use anyhow::Result;
58
59 use crate::socialgraph::{EventStorageClass, SocialGraphAccessControl, SocialGraphBackend};
60 use hashtree_core::{nhash_decode, Cid};
61 use hashtree_nostr::{is_parameterized_replaceable_kind, is_replaceable_kind};
62 use tracing::warn;
63
64 fn prefers_trusted_only(filter: &NostrFilter) -> bool {
65 let Some(kinds) = filter.kinds.as_ref() else {
66 return false;
67 };
68 if kinds.len() != 1 {
69 return false;
70 }
71
72 let kind = kinds.iter().next().expect("checked single kind").as_u16() as u32;
73 let has_authors = filter
74 .authors
75 .as_ref()
76 .is_some_and(|authors| !authors.is_empty());
77 if !has_authors {
78 return false;
79 }
80
81 if is_replaceable_kind(kind) {
82 return true;
83 }
84
85 if is_parameterized_replaceable_kind(kind) {
86 let d_tag = nostr::SingleLetterTag::lowercase(nostr::Alphabet::D);
87 return filter
88 .generic_tags
89 .get(&d_tag)
90 .is_some_and(|values| !values.is_empty());
91 }
92
93 false
94 }
95
96 struct NostrStore {
97 store: Arc<dyn SocialGraphBackend>,
98 }
99
100 impl NostrStore {
101 fn new(store: Arc<dyn SocialGraphBackend>) -> Self {
102 Self { store }
103 }
104
105 async fn ingest(&self, event: Event) -> Result<()> {
106 let store = Arc::clone(&self.store);
107 tokio::task::spawn_blocking(move || {
108 crate::socialgraph::ingest_parsed_event(store.as_ref(), &event)
109 })
110 .await
111 .map_err(|err| anyhow::anyhow!("trusted nostr store ingest task failed: {err}"))?
112 }
113
114 async fn ingest_with_storage_class(
115 &self,
116 event: Event,
117 storage_class: EventStorageClass,
118 ) -> Result<()> {
119 let store = Arc::clone(&self.store);
120 tokio::task::spawn_blocking(move || {
121 crate::socialgraph::ingest_parsed_event_with_storage_class(
122 store.as_ref(),
123 &event,
124 storage_class,
125 )
126 })
127 .await
128 .map_err(|err| anyhow::anyhow!("trusted nostr store ingest task failed: {err}"))?
129 }
130
131 async fn query(&self, filter: NostrFilter, limit: usize) -> Vec<Event> {
132 let store = Arc::clone(&self.store);
133 match std::thread::Builder::new()
134 .name("nostr-relay-query".to_string())
135 .spawn(move || crate::socialgraph::query_events(store.as_ref(), &filter, limit))
136 {
137 Ok(join) => match join.join() {
138 Ok(events) => events,
139 Err(_) => {
140 warn!("trusted nostr store query thread panicked");
141 Vec::new()
142 }
143 },
144 Err(err) => {
145 warn!("failed to spawn trusted nostr store query thread: {}", err);
146 Vec::new()
147 }
148 }
149 }
150 }
151
152 #[derive(Debug, Clone)]
153 struct ClientQuota {
154 last_reset: Instant,
155 spambox_events: u32,
156 reqs: u32,
157 }
158
159 impl ClientQuota {
160 fn new() -> Self {
161 Self {
162 last_reset: Instant::now(),
163 spambox_events: 0,
164 reqs: 0,
165 }
166 }
167
168 fn reset_if_needed(&mut self) {
169 if self.last_reset.elapsed() >= Duration::from_secs(60) {
170 self.last_reset = Instant::now();
171 self.spambox_events = 0;
172 self.reqs = 0;
173 }
174 }
175
176 fn allow_spambox_event(&mut self, limit: u32) -> bool {
177 self.reset_if_needed();
178 if self.spambox_events >= limit {
179 return false;
180 }
181 self.spambox_events += 1;
182 true
183 }
184
185 fn allow_req(&mut self, limit: u32) -> bool {
186 self.reset_if_needed();
187 if self.reqs >= limit {
188 return false;
189 }
190 self.reqs += 1;
191 true
192 }
193 }
194
195 struct ClientState {
196 sender: mpsc::UnboundedSender<String>,
197 pubkey: Option<String>,
198 quota: ClientQuota,
199 }
200
201 struct RecentEvents {
202 order: VecDeque<EventId>,
203 events: HashMap<EventId, Event>,
204 max_len: usize,
205 }
206
207 impl RecentEvents {
208 fn new(max_len: usize) -> Self {
209 Self {
210 order: VecDeque::new(),
211 events: HashMap::new(),
212 max_len: max_len.max(128),
213 }
214 }
215
216 fn insert(&mut self, event: Event) {
217 if self.events.contains_key(&event.id) {
218 return;
219 }
220 self.order.push_back(event.id);
221 self.events.insert(event.id, event);
222 while self.order.len() > self.max_len {
223 if let Some(oldest) = self.order.pop_front() {
224 self.events.remove(&oldest);
225 }
226 }
227 }
228
229 fn matching(&self, filter: &NostrFilter) -> Vec<Event> {
230 self.events
231 .values()
232 .filter(|event| filter.match_event(event))
233 .cloned()
234 .collect()
235 }
236 }
237
238 enum SpamboxStore {
239 Persistent(NostrStore),
240 Memory(MemorySpambox),
241 }
242
243 struct MemorySpambox {
244 events: Mutex<VecDeque<Event>>,
245 max_len: usize,
246 }
247
248 impl MemorySpambox {
249 fn new(max_len: usize) -> Self {
250 Self {
251 events: Mutex::new(VecDeque::new()),
252 max_len: max_len.max(128),
253 }
254 }
255
256 async fn ingest(&self, event: &Event) -> bool {
257 let mut events = self.events.lock().await;
258 events.push_back(event.clone());
259 while events.len() > self.max_len {
260 events.pop_front();
261 }
262 true
263 }
264 }
265
266 impl SpamboxStore {
267 async fn ingest(&self, event: &Event) -> bool {
268 match self {
269 SpamboxStore::Persistent(store) => store.ingest(event.clone()).await.is_ok(),
270 SpamboxStore::Memory(store) => store.ingest(event).await,
271 }
272 }
273 }
274
275 struct BluetoothEventLog {
276 path: PathBuf,
277 state: Mutex<BluetoothEventLogState>,
278 }
279
280 struct BluetoothEventLogState {
281 records: VecDeque<BluetoothReceivedEventRecord>,
282 event_ids: HashSet<String>,
283 }
284
285 impl BluetoothEventLog {
286 fn load(path: PathBuf) -> Self {
287 let records = std::fs::read_to_string(&path)
288 .ok()
289 .map(|serialized| {
290 serialized
291 .lines()
292 .filter_map(|line| {
293 serde_json::from_str::<BluetoothReceivedEventRecord>(line).ok()
294 })
295 .collect::<Vec<_>>()
296 })
297 .unwrap_or_default();
298 let mut trimmed = VecDeque::with_capacity(BLUETOOTH_EVENT_LOG_CAPACITY);
299 let start = records.len().saturating_sub(BLUETOOTH_EVENT_LOG_CAPACITY);
300 for record in records.into_iter().skip(start) {
301 trimmed.push_back(record);
302 }
303 let event_ids = trimmed
304 .iter()
305 .map(|record| record.event_id.clone())
306 .collect::<HashSet<_>>();
307
308 Self {
309 path,
310 state: Mutex::new(BluetoothEventLogState {
311 records: trimmed,
312 event_ids,
313 }),
314 }
315 }
316
317 async fn recent(&self, limit: usize) -> Vec<BluetoothReceivedEventRecord> {
318 let state = self.state.lock().await;
319 state
320 .records
321 .iter()
322 .rev()
323 .take(limit.max(1))
324 .cloned()
325 .collect()
326 }
327
328 async fn record(&self, event: &Event, peer_id: Option<String>) {
329 let record = BluetoothReceivedEventRecord {
330 event_id: event.id.to_hex(),
331 pubkey: event.pubkey.to_hex(),
332 kind: event.kind.as_u16() as u32,
333 created_at: event.created_at.as_u64(),
334 received_at: std::time::SystemTime::now()
335 .duration_since(std::time::UNIX_EPOCH)
336 .map(|value| value.as_secs())
337 .unwrap_or(0),
338 peer_id,
339 cid_values: cid_values_from_event(event),
340 };
341
342 let serialized = {
343 let mut state = self.state.lock().await;
344 if state.event_ids.contains(&record.event_id) {
345 return;
346 }
347
348 state.event_ids.insert(record.event_id.clone());
349 state.records.push_back(record);
350 while state.records.len() > BLUETOOTH_EVENT_LOG_CAPACITY {
351 if let Some(removed) = state.records.pop_front() {
352 state.event_ids.remove(&removed.event_id);
353 }
354 }
355
356 state
357 .records
358 .iter()
359 .filter_map(|entry| serde_json::to_string(entry).ok())
360 .collect::<Vec<_>>()
361 .join("\n")
362 };
363
364 if let Some(parent) = self.path.parent() {
365 let _ = std::fs::create_dir_all(parent);
366 }
367 let _ = std::fs::write(&self.path, serialized);
368 }
369 }
370
371 fn looks_like_cid_reference(value: &str) -> bool {
372 Cid::parse(value).is_ok() || nhash_decode(value).is_ok()
373 }
374
375 fn cid_values_from_event(event: &Event) -> Vec<String> {
376 let mut values = Vec::new();
377 let mut seen = HashSet::new();
378
379 for tag in &event.tags {
380 let fields = tag.clone().to_vec();
381 if fields.first().is_some_and(|name| name == "cid") {
382 if let Some(value) = fields.get(1).filter(|value| !value.is_empty()) {
383 if seen.insert(value.clone()) {
384 values.push(value.clone());
385 }
386 }
387 continue;
388 }
389
390 for value in fields.into_iter().skip(1) {
391 if looks_like_cid_reference(&value) && seen.insert(value.clone()) {
392 values.push(value);
393 }
394 }
395 }
396
397 values
398 }
399
400 pub struct NostrRelay {
401 config: NostrRelayConfig,
402 trusted: NostrStore,
403 public_pubkeys: HashSet<String>,
404 spambox: Option<SpamboxStore>,
405 social_graph: Option<Arc<SocialGraphAccessControl>>,
406 clients: Mutex<HashMap<u64, ClientState>>,
407 subscriptions: Mutex<HashMap<u64, HashMap<SubscriptionId, Vec<NostrFilter>>>>,
408 recent_events: Mutex<RecentEvents>,
409 next_client_id: AtomicU64,
410 bluetooth_event_log: Arc<BluetoothEventLog>,
411 }
412
413 impl NostrRelay {
414 async fn collect_filter_events(
415 &self,
416 filter: &NostrFilter,
417 limit: usize,
418 seen: &mut HashSet<EventId>,
419 events: &mut Vec<Event>,
420 ) {
421 if limit == 0 {
422 return;
423 }
424
425 let mut added = 0usize;
426
427 if !prefers_trusted_only(filter) {
428 let recent = {
429 let cache = self.recent_events.lock().await;
430 cache.matching(filter)
431 };
432 for event in recent {
433 if seen.insert(event.id) {
434 events.push(event);
435 added += 1;
436 if added >= limit {
437 return;
438 }
439 }
440 }
441 }
442
443 for event in self.trusted.query(filter.clone(), limit).await {
444 if seen.insert(event.id) {
445 events.push(event);
446 added += 1;
447 if added >= limit {
448 return;
449 }
450 }
451 }
452 }
453
454 async fn collect_filter_count(
455 &self,
456 filter: &NostrFilter,
457 limit: usize,
458 seen: &mut HashSet<EventId>,
459 ) {
460 if limit == 0 {
461 return;
462 }
463
464 let mut added = 0usize;
465
466 if !prefers_trusted_only(filter) {
467 let recent = {
468 let cache = self.recent_events.lock().await;
469 cache.matching(filter)
470 };
471 for event in recent {
472 if seen.insert(event.id) {
473 added += 1;
474 if added >= limit {
475 return;
476 }
477 }
478 }
479 }
480
481 for event in self.trusted.query(filter.clone(), limit).await {
482 if seen.insert(event.id) {
483 added += 1;
484 if added >= limit {
485 return;
486 }
487 }
488 }
489 }
490
491 pub fn new(
492 trusted_store: Arc<dyn SocialGraphBackend>,
493 data_dir: PathBuf,
494 public_pubkeys: HashSet<String>,
495 social_graph: Option<Arc<SocialGraphAccessControl>>,
496 config: NostrRelayConfig,
497 ) -> Result<Self> {
498 let spambox = if config.spambox_db_max_bytes == 0 {
499 Some(SpamboxStore::Memory(MemorySpambox::new(
500 config.max_query_limit * 2,
501 )))
502 } else {
503 let spam_dir = data_dir.join("socialgraph_spambox");
504 match socialgraph::open_social_graph_store_at_path(
505 &spam_dir,
506 Some(config.spambox_db_max_bytes),
507 ) {
508 Ok(store) => Some(SpamboxStore::Persistent(NostrStore::new(store))),
509 Err(err) => {
510 warn!(
511 "Failed to open social graph spambox (falling back to memory): {}",
512 err
513 );
514 Some(SpamboxStore::Memory(MemorySpambox::new(
515 config.max_query_limit * 2,
516 )))
517 }
518 }
519 };
520
521 let recent_size = config.max_query_limit.saturating_mul(2);
522 let bluetooth_event_log = Arc::new(BluetoothEventLog::load(
523 data_dir.join("bluetooth-events.jsonl"),
524 ));
525
526 Ok(Self {
527 config,
528 trusted: NostrStore::new(trusted_store),
529 public_pubkeys,
530 spambox,
531 social_graph,
532 clients: Mutex::new(HashMap::new()),
533 subscriptions: Mutex::new(HashMap::new()),
534 recent_events: Mutex::new(RecentEvents::new(recent_size)),
535 next_client_id: AtomicU64::new(1),
536 bluetooth_event_log,
537 })
538 }
539
540 pub fn next_client_id(&self) -> u64 {
541 self.next_client_id.fetch_add(1, Ordering::SeqCst)
542 }
543
544 pub async fn ingest_trusted_event(&self, event: Event) -> Result<()> {
545 self.ingest_trusted_event_inner(event, true).await
546 }
547
548 pub async fn ingest_trusted_event_from_bluetooth(
549 &self,
550 event: Event,
551 peer_id: Option<String>,
552 ) -> Result<()> {
553 self.ingest_trusted_event_inner(event.clone(), true).await?;
554 self.bluetooth_event_log.record(&event, peer_id).await;
555 Ok(())
556 }
557
558 pub async fn ingest_trusted_event_silent(&self, event: Event) -> Result<()> {
559 self.ingest_trusted_event_inner(event, false).await
560 }
561
562 pub async fn bluetooth_received_events(
563 &self,
564 limit: usize,
565 ) -> Vec<BluetoothReceivedEventRecord> {
566 self.bluetooth_event_log.recent(limit).await
567 }
568
569 async fn ingest_trusted_event_inner(&self, event: Event, broadcast: bool) -> Result<()> {
570 event
571 .verify()
572 .map_err(|e| anyhow::anyhow!("invalid signature: {}", e))?;
573
574 let is_ephemeral = event.kind.is_ephemeral();
575 {
576 let mut recent = self.recent_events.lock().await;
577 recent.insert(event.clone());
578 }
579
580 if !is_ephemeral {
581 let storage_class = self.event_storage_class(&event);
582 self.trusted
583 .ingest_with_storage_class(event.clone(), storage_class)
584 .await?;
585 }
586
587 if broadcast {
588 self.broadcast_event(&event).await;
589 }
590 Ok(())
591 }
592
593 pub async fn query_events(&self, filter: &NostrFilter, limit: usize) -> Vec<Event> {
594 let limit = limit.min(self.config.max_query_limit);
595 if limit == 0 {
596 return Vec::new();
597 }
598
599 let mut seen: HashSet<EventId> = HashSet::new();
600 let mut events = Vec::new();
601
602 if !prefers_trusted_only(filter) {
603 let recent = {
604 let cache = self.recent_events.lock().await;
605 cache.matching(filter)
606 };
607 for event in recent {
608 if seen.insert(event.id) {
609 events.push(event);
610 if events.len() >= limit {
611 return events;
612 }
613 }
614 }
615 }
616
617 for event in self.trusted.query(filter.clone(), limit).await {
618 if seen.insert(event.id) {
619 events.push(event);
620 if events.len() >= limit {
621 break;
622 }
623 }
624 }
625
626 events
627 }
628
629 pub async fn register_client(
630 &self,
631 client_id: u64,
632 sender: mpsc::UnboundedSender<String>,
633 pubkey: Option<String>,
634 ) {
635 let mut clients = self.clients.lock().await;
636 clients.insert(
637 client_id,
638 ClientState {
639 sender,
640 pubkey,
641 quota: ClientQuota::new(),
642 },
643 );
644 }
645
646 pub async fn unregister_client(&self, client_id: u64) {
647 let mut clients = self.clients.lock().await;
648 clients.remove(&client_id);
649 drop(clients);
650 let mut subs = self.subscriptions.lock().await;
651 subs.remove(&client_id);
652 }
653
654 pub async fn handle_client_message(&self, client_id: u64, msg: NostrClientMessage) {
655 match msg {
656 NostrClientMessage::Event(event) => {
657 self.handle_event(client_id, *event).await;
658 }
659 NostrClientMessage::Req {
660 subscription_id,
661 filters,
662 } => {
663 self.handle_req(client_id, subscription_id, filters).await;
664 }
665 NostrClientMessage::Count {
666 subscription_id,
667 filters,
668 } => {
669 self.handle_count(client_id, subscription_id, filters).await;
670 }
671 NostrClientMessage::Close(subscription_id) => {
672 self.handle_close(client_id, subscription_id).await;
673 }
674 NostrClientMessage::Auth(event) => {
675 self.handle_auth(client_id, *event).await;
676 }
677 NostrClientMessage::NegOpen { .. }
678 | NostrClientMessage::NegMsg { .. }
679 | NostrClientMessage::NegClose { .. } => {
680 self.send_to_client(
681 client_id,
682 NostrRelayMessage::notice("negentropy not supported"),
683 )
684 .await;
685 }
686 }
687 }
688
689 pub async fn register_subscription_query(
690 &self,
691 client_id: u64,
692 subscription_id: SubscriptionId,
693 mut filters: Vec<NostrFilter>,
694 ) -> std::result::Result<Vec<Event>, &'static str> {
695 if !self.allow_req(client_id).await {
696 return Err("rate limited");
697 }
698
699 if filters.len() > self.config.max_filters_per_sub {
700 filters.truncate(self.config.max_filters_per_sub);
701 }
702
703 {
704 let mut subs = self.subscriptions.lock().await;
705 let entry = subs.entry(client_id).or_default();
706 if !entry.contains_key(&subscription_id)
707 && entry.len() >= self.config.max_subs_per_client
708 {
709 return Err("too many subscriptions");
710 }
711 entry.insert(subscription_id, filters.clone());
712 }
713
714 let mut seen: HashSet<EventId> = HashSet::new();
715 let mut events = Vec::new();
716 for filter in &filters {
717 let limit = filter
718 .limit
719 .unwrap_or(self.config.max_query_limit)
720 .min(self.config.max_query_limit);
721 self.collect_filter_events(filter, limit, &mut seen, &mut events)
722 .await;
723 }
724
725 Ok(events)
726 }
727
728 async fn handle_auth(&self, client_id: u64, event: Event) {
729 let ok = event.verify().is_ok();
730 let message = if ok { "" } else { "invalid auth" };
731 self.send_to_client(client_id, NostrRelayMessage::ok(event.id, ok, message))
732 .await;
733 }
734
735 async fn handle_close(&self, client_id: u64, subscription_id: SubscriptionId) {
736 let mut subs = self.subscriptions.lock().await;
737 if let Some(map) = subs.get_mut(&client_id) {
738 map.remove(&subscription_id);
739 }
740 }
741
742 async fn handle_event(&self, client_id: u64, event: Event) {
743 let ok = event.verify().is_ok();
744 if !ok {
745 self.send_to_client(
746 client_id,
747 NostrRelayMessage::ok(event.id, false, "invalid: signature"),
748 )
749 .await;
750 return;
751 }
752
753 let trusted = self.is_trusted_event(client_id, &event).await;
754 if !trusted && !self.allow_spambox_event(client_id).await {
755 self.send_to_client(
756 client_id,
757 NostrRelayMessage::ok(event.id, false, "rate limited"),
758 )
759 .await;
760 return;
761 }
762
763 let is_ephemeral = event.kind.is_ephemeral();
764 if trusted {
765 let mut recent = self.recent_events.lock().await;
766 recent.insert(event.clone());
767 }
768 if !is_ephemeral {
769 let stored = if trusted {
770 let storage_class = self.event_storage_class(&event);
771 self.trusted
772 .ingest_with_storage_class(event.clone(), storage_class)
773 .await
774 .is_ok()
775 } else {
776 match self.spambox.as_ref() {
777 Some(spambox) => spambox.ingest(&event).await,
778 None => false,
779 }
780 };
781
782 if !stored {
783 let message = if trusted {
784 "store failed"
785 } else {
786 "spambox full"
787 };
788 self.send_to_client(client_id, NostrRelayMessage::ok(event.id, false, message))
789 .await;
790 return;
791 }
792 }
793
794 let message = if trusted { "" } else { "spambox" };
795 self.send_to_client(client_id, NostrRelayMessage::ok(event.id, true, message))
796 .await;
797
798 if trusted {
799 self.broadcast_event(&event).await;
800 }
801 }
802
803 async fn handle_req(
804 &self,
805 client_id: u64,
806 subscription_id: SubscriptionId,
807 filters: Vec<NostrFilter>,
808 ) {
809 match self
810 .register_subscription_query(client_id, subscription_id.clone(), filters)
811 .await
812 {
813 Ok(events) => {
814 for event in events {
815 self.send_to_client(
816 client_id,
817 NostrRelayMessage::event(subscription_id.clone(), event),
818 )
819 .await;
820 }
821
822 self.send_to_client(client_id, NostrRelayMessage::eose(subscription_id))
823 .await;
824 }
825 Err(message) => {
826 self.send_to_client(
827 client_id,
828 NostrRelayMessage::closed(subscription_id, message),
829 )
830 .await;
831 }
832 }
833 }
834
835 async fn handle_count(
836 &self,
837 client_id: u64,
838 subscription_id: SubscriptionId,
839 filters: Vec<NostrFilter>,
840 ) {
841 if !self.allow_req(client_id).await {
842 self.send_to_client(
843 client_id,
844 NostrRelayMessage::closed(subscription_id, "rate limited"),
845 )
846 .await;
847 return;
848 }
849
850 let mut seen: HashSet<EventId> = HashSet::new();
851 for filter in &filters {
852 let limit = filter
853 .limit
854 .unwrap_or(self.config.max_query_limit)
855 .min(self.config.max_query_limit);
856 self.collect_filter_count(filter, limit, &mut seen).await;
857 }
858
859 self.send_to_client(
860 client_id,
861 NostrRelayMessage::count(subscription_id, seen.len()),
862 )
863 .await;
864 }
865
866 async fn is_trusted_event(&self, client_id: u64, event: &Event) -> bool {
867 let event_pubkey = event.pubkey.to_hex();
868 let client_pubkey = {
869 let clients = self.clients.lock().await;
870 clients
871 .get(&client_id)
872 .and_then(|state| state.pubkey.clone())
873 };
874 if let Some(pubkey) = client_pubkey {
875 return pubkey == event_pubkey
876 || self.social_graph.as_ref().is_some_and(|social_graph| {
877 social_graph.check_write_access(&event_pubkey)
878 });
879 }
880 if let Some(ref social_graph) = self.social_graph {
881 return social_graph.check_write_access(&event_pubkey);
882 }
883 true
884 }
885
886 fn event_storage_class(&self, event: &Event) -> EventStorageClass {
887 if self.public_pubkeys.contains(&event.pubkey.to_hex()) {
888 EventStorageClass::Public
889 } else {
890 EventStorageClass::Ambient
891 }
892 }
893
894 async fn allow_spambox_event(&self, client_id: u64) -> bool {
895 let mut clients = self.clients.lock().await;
896 let Some(state) = clients.get_mut(&client_id) else {
897 return false;
898 };
899 state
900 .quota
901 .allow_spambox_event(self.config.spambox_max_events_per_min)
902 }
903
904 async fn allow_req(&self, client_id: u64) -> bool {
905 let mut clients = self.clients.lock().await;
906 let Some(state) = clients.get_mut(&client_id) else {
907 return false;
908 };
909 state.quota.allow_req(self.config.spambox_max_reqs_per_min)
910 }
911
912 async fn broadcast_event(&self, event: &Event) {
913 let subscriptions = self.subscriptions.lock().await;
914 let mut deliveries: Vec<(u64, SubscriptionId)> = Vec::new();
915 for (client_id, subs) in subscriptions.iter() {
916 for (sub_id, filters) in subs.iter() {
917 if filters.iter().any(|f| f.match_event(event)) {
918 deliveries.push((*client_id, sub_id.clone()));
919 }
920 }
921 }
922 drop(subscriptions);
923
924 for (client_id, sub_id) in deliveries {
925 self.send_to_client(client_id, NostrRelayMessage::event(sub_id, event.clone()))
926 .await;
927 }
928 }
929
930 async fn send_to_client(&self, client_id: u64, msg: NostrRelayMessage) {
931 let sender = {
932 let clients = self.clients.lock().await;
933 clients.get(&client_id).map(|state| state.sender.clone())
934 };
935 if let Some(tx) = sender {
936 let _ = tx.send(msg.as_json());
937 }
938 }
939 }
940}
941
942pub use imp::NostrRelay;
943
944#[cfg(feature = "p2p")]
945#[async_trait::async_trait]
946impl MeshEventStore for NostrRelay {
947 async fn ingest_trusted_event(&self, event: Event) -> anyhow::Result<()> {
948 NostrRelay::ingest_trusted_event(self, event).await
949 }
950
951 async fn query_events(&self, filter: &NostrFilter, limit: usize) -> Vec<Event> {
952 NostrRelay::query_events(self, filter, limit).await
953 }
954}
955
956#[cfg(feature = "p2p")]
957#[async_trait::async_trait]
958impl MeshRelayClient for NostrRelay {
959 fn next_client_id(&self) -> u64 {
960 NostrRelay::next_client_id(self)
961 }
962
963 async fn register_client(
964 &self,
965 client_id: u64,
966 sender: mpsc::UnboundedSender<String>,
967 pubkey: Option<String>,
968 ) {
969 NostrRelay::register_client(self, client_id, sender, pubkey).await
970 }
971
972 async fn unregister_client(&self, client_id: u64) {
973 NostrRelay::unregister_client(self, client_id).await
974 }
975
976 async fn handle_client_message(&self, client_id: u64, msg: NostrClientMessage) {
977 NostrRelay::handle_client_message(self, client_id, msg).await
978 }
979
980 async fn register_subscription_query(
981 &self,
982 client_id: u64,
983 subscription_id: SubscriptionId,
984 filters: Vec<NostrFilter>,
985 ) -> std::result::Result<Vec<Event>, &'static str> {
986 NostrRelay::register_subscription_query(self, client_id, subscription_id, filters).await
987 }
988
989 async fn ingest_trusted_event_from_peer(
990 &self,
991 event: Event,
992 peer_id: Option<String>,
993 ) -> anyhow::Result<()> {
994 NostrRelay::ingest_trusted_event_from_bluetooth(self, event, peer_id).await
995 }
996}
997
998#[cfg(test)]
999#[path = "nostr_relay/tests.rs"]
1000mod tests;