Skip to main content

hashtree_cli/
nostr_relay.rs

1use std::collections::HashMap;
2use std::collections::{HashSet, VecDeque};
3use std::path::PathBuf;
4use std::sync::{
5    atomic::{AtomicU64, Ordering},
6    Arc,
7};
8use std::time::{Duration, Instant};
9
10use tokio::sync::{mpsc, Mutex};
11
12use hashtree_network::{MeshEventStore, MeshRelayClient};
13use nostr::{ClientMessage as NostrClientMessage, JsonUtil, RelayMessage as NostrRelayMessage};
14use nostr::{Event, EventId, Filter as NostrFilter, SubscriptionId};
15
16use crate::socialgraph;
17
18const BLUETOOTH_EVENT_LOG_CAPACITY: usize = 100;
19
20#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
21pub struct BluetoothReceivedEventRecord {
22    pub event_id: String,
23    pub pubkey: String,
24    pub kind: u32,
25    pub created_at: u64,
26    pub received_at: u64,
27    pub peer_id: Option<String>,
28    pub cid_values: Vec<String>,
29}
30
31#[derive(Debug, Clone)]
32pub struct NostrRelayConfig {
33    pub spambox_db_max_bytes: u64,
34    pub max_query_limit: usize,
35    pub max_subs_per_client: usize,
36    pub max_filters_per_sub: usize,
37    pub spambox_max_events_per_min: u32,
38    pub spambox_max_reqs_per_min: u32,
39}
40
41impl Default for NostrRelayConfig {
42    fn default() -> Self {
43        Self {
44            spambox_db_max_bytes: 1024 * 1024 * 1024,
45            max_query_limit: 200,
46            max_subs_per_client: 64,
47            max_filters_per_sub: 32,
48            spambox_max_events_per_min: 120,
49            spambox_max_reqs_per_min: 120,
50        }
51    }
52}
53
54mod imp {
55    use super::*;
56    use anyhow::Result;
57
58    use crate::socialgraph::{EventStorageClass, SocialGraphAccessControl, SocialGraphBackend};
59    use hashtree_core::{nhash_decode, Cid};
60    use hashtree_nostr::{is_parameterized_replaceable_kind, is_replaceable_kind};
61    use tracing::warn;
62
63    fn prefers_trusted_only(filter: &NostrFilter) -> bool {
64        let Some(kinds) = filter.kinds.as_ref() else {
65            return false;
66        };
67        if kinds.len() != 1 {
68            return false;
69        }
70
71        let kind = kinds.iter().next().expect("checked single kind").as_u16() as u32;
72        let has_authors = filter
73            .authors
74            .as_ref()
75            .is_some_and(|authors| !authors.is_empty());
76        if !has_authors {
77            return false;
78        }
79
80        if is_replaceable_kind(kind) {
81            return true;
82        }
83
84        if is_parameterized_replaceable_kind(kind) {
85            let d_tag = nostr::SingleLetterTag::lowercase(nostr::Alphabet::D);
86            return filter
87                .generic_tags
88                .get(&d_tag)
89                .is_some_and(|values| !values.is_empty());
90        }
91
92        false
93    }
94
95    struct NostrStore {
96        store: Arc<dyn SocialGraphBackend>,
97    }
98
99    impl NostrStore {
100        fn new(store: Arc<dyn SocialGraphBackend>) -> Self {
101            Self { store }
102        }
103
104        fn ingest(&self, event: &Event) -> Result<()> {
105            crate::socialgraph::ingest_parsed_event(self.store.as_ref(), event)
106        }
107
108        fn ingest_with_storage_class(
109            &self,
110            event: &Event,
111            storage_class: EventStorageClass,
112        ) -> Result<()> {
113            crate::socialgraph::ingest_parsed_event_with_storage_class(
114                self.store.as_ref(),
115                event,
116                storage_class,
117            )
118        }
119
120        fn query(&self, filter: &NostrFilter, limit: usize) -> Vec<Event> {
121            crate::socialgraph::query_events(self.store.as_ref(), filter, limit)
122        }
123    }
124
125    #[derive(Debug, Clone)]
126    struct ClientQuota {
127        last_reset: Instant,
128        spambox_events: u32,
129        reqs: u32,
130    }
131
132    impl ClientQuota {
133        fn new() -> Self {
134            Self {
135                last_reset: Instant::now(),
136                spambox_events: 0,
137                reqs: 0,
138            }
139        }
140
141        fn reset_if_needed(&mut self) {
142            if self.last_reset.elapsed() >= Duration::from_secs(60) {
143                self.last_reset = Instant::now();
144                self.spambox_events = 0;
145                self.reqs = 0;
146            }
147        }
148
149        fn allow_spambox_event(&mut self, limit: u32) -> bool {
150            self.reset_if_needed();
151            if self.spambox_events >= limit {
152                return false;
153            }
154            self.spambox_events += 1;
155            true
156        }
157
158        fn allow_req(&mut self, limit: u32) -> bool {
159            self.reset_if_needed();
160            if self.reqs >= limit {
161                return false;
162            }
163            self.reqs += 1;
164            true
165        }
166    }
167
168    struct ClientState {
169        sender: mpsc::UnboundedSender<String>,
170        pubkey: Option<String>,
171        quota: ClientQuota,
172    }
173
174    struct RecentEvents {
175        order: VecDeque<EventId>,
176        events: HashMap<EventId, Event>,
177        max_len: usize,
178    }
179
180    impl RecentEvents {
181        fn new(max_len: usize) -> Self {
182            Self {
183                order: VecDeque::new(),
184                events: HashMap::new(),
185                max_len: max_len.max(128),
186            }
187        }
188
189        fn insert(&mut self, event: Event) {
190            if self.events.contains_key(&event.id) {
191                return;
192            }
193            self.order.push_back(event.id);
194            self.events.insert(event.id, event);
195            while self.order.len() > self.max_len {
196                if let Some(oldest) = self.order.pop_front() {
197                    self.events.remove(&oldest);
198                }
199            }
200        }
201
202        fn matching(&self, filter: &NostrFilter) -> Vec<Event> {
203            self.events
204                .values()
205                .filter(|event| filter.match_event(event))
206                .cloned()
207                .collect()
208        }
209    }
210
211    enum SpamboxStore {
212        Persistent(NostrStore),
213        Memory(MemorySpambox),
214    }
215
216    struct MemorySpambox {
217        events: Mutex<VecDeque<Event>>,
218        max_len: usize,
219    }
220
221    impl MemorySpambox {
222        fn new(max_len: usize) -> Self {
223            Self {
224                events: Mutex::new(VecDeque::new()),
225                max_len: max_len.max(128),
226            }
227        }
228
229        async fn ingest(&self, event: &Event) -> bool {
230            let mut events = self.events.lock().await;
231            events.push_back(event.clone());
232            while events.len() > self.max_len {
233                events.pop_front();
234            }
235            true
236        }
237    }
238
239    impl SpamboxStore {
240        async fn ingest(&self, event: &Event) -> bool {
241            match self {
242                SpamboxStore::Persistent(store) => store.ingest(event).is_ok(),
243                SpamboxStore::Memory(store) => store.ingest(event).await,
244            }
245        }
246    }
247
248    struct BluetoothEventLog {
249        path: PathBuf,
250        state: Mutex<BluetoothEventLogState>,
251    }
252
253    struct BluetoothEventLogState {
254        records: VecDeque<BluetoothReceivedEventRecord>,
255        event_ids: HashSet<String>,
256    }
257
258    impl BluetoothEventLog {
259        fn load(path: PathBuf) -> Self {
260            let records = std::fs::read_to_string(&path)
261                .ok()
262                .map(|serialized| {
263                    serialized
264                        .lines()
265                        .filter_map(|line| {
266                            serde_json::from_str::<BluetoothReceivedEventRecord>(line).ok()
267                        })
268                        .collect::<Vec<_>>()
269                })
270                .unwrap_or_default();
271            let mut trimmed = VecDeque::with_capacity(BLUETOOTH_EVENT_LOG_CAPACITY);
272            let start = records.len().saturating_sub(BLUETOOTH_EVENT_LOG_CAPACITY);
273            for record in records.into_iter().skip(start) {
274                trimmed.push_back(record);
275            }
276            let event_ids = trimmed
277                .iter()
278                .map(|record| record.event_id.clone())
279                .collect::<HashSet<_>>();
280
281            Self {
282                path,
283                state: Mutex::new(BluetoothEventLogState {
284                    records: trimmed,
285                    event_ids,
286                }),
287            }
288        }
289
290        async fn recent(&self, limit: usize) -> Vec<BluetoothReceivedEventRecord> {
291            let state = self.state.lock().await;
292            state
293                .records
294                .iter()
295                .rev()
296                .take(limit.max(1))
297                .cloned()
298                .collect()
299        }
300
301        async fn record(&self, event: &Event, peer_id: Option<String>) {
302            let record = BluetoothReceivedEventRecord {
303                event_id: event.id.to_hex(),
304                pubkey: event.pubkey.to_hex(),
305                kind: event.kind.as_u16() as u32,
306                created_at: event.created_at.as_u64(),
307                received_at: std::time::SystemTime::now()
308                    .duration_since(std::time::UNIX_EPOCH)
309                    .map(|value| value.as_secs())
310                    .unwrap_or(0),
311                peer_id,
312                cid_values: cid_values_from_event(event),
313            };
314
315            let serialized = {
316                let mut state = self.state.lock().await;
317                if state.event_ids.contains(&record.event_id) {
318                    return;
319                }
320
321                state.event_ids.insert(record.event_id.clone());
322                state.records.push_back(record);
323                while state.records.len() > BLUETOOTH_EVENT_LOG_CAPACITY {
324                    if let Some(removed) = state.records.pop_front() {
325                        state.event_ids.remove(&removed.event_id);
326                    }
327                }
328
329                state
330                    .records
331                    .iter()
332                    .filter_map(|entry| serde_json::to_string(entry).ok())
333                    .collect::<Vec<_>>()
334                    .join("\n")
335            };
336
337            if let Some(parent) = self.path.parent() {
338                let _ = std::fs::create_dir_all(parent);
339            }
340            let _ = std::fs::write(&self.path, serialized);
341        }
342    }
343
344    fn looks_like_cid_reference(value: &str) -> bool {
345        Cid::parse(value).is_ok() || nhash_decode(value).is_ok()
346    }
347
348    fn cid_values_from_event(event: &Event) -> Vec<String> {
349        let mut values = Vec::new();
350        let mut seen = HashSet::new();
351
352        for tag in &event.tags {
353            let fields = tag.clone().to_vec();
354            if fields.first().is_some_and(|name| name == "cid") {
355                if let Some(value) = fields.get(1).filter(|value| !value.is_empty()) {
356                    if seen.insert(value.clone()) {
357                        values.push(value.clone());
358                    }
359                }
360                continue;
361            }
362
363            for value in fields.into_iter().skip(1) {
364                if looks_like_cid_reference(&value) && seen.insert(value.clone()) {
365                    values.push(value);
366                }
367            }
368        }
369
370        values
371    }
372
373    pub struct NostrRelay {
374        config: NostrRelayConfig,
375        trusted: NostrStore,
376        public_pubkeys: HashSet<String>,
377        spambox: Option<SpamboxStore>,
378        social_graph: Option<Arc<SocialGraphAccessControl>>,
379        clients: Mutex<HashMap<u64, ClientState>>,
380        subscriptions: Mutex<HashMap<u64, HashMap<SubscriptionId, Vec<NostrFilter>>>>,
381        recent_events: Mutex<RecentEvents>,
382        next_client_id: AtomicU64,
383        bluetooth_event_log: Arc<BluetoothEventLog>,
384    }
385
386    impl NostrRelay {
387        async fn collect_filter_events(
388            &self,
389            filter: &NostrFilter,
390            limit: usize,
391            seen: &mut HashSet<EventId>,
392            events: &mut Vec<Event>,
393        ) {
394            if limit == 0 {
395                return;
396            }
397
398            let mut added = 0usize;
399
400            if !prefers_trusted_only(filter) {
401                let recent = {
402                    let cache = self.recent_events.lock().await;
403                    cache.matching(filter)
404                };
405                for event in recent {
406                    if seen.insert(event.id) {
407                        events.push(event);
408                        added += 1;
409                        if added >= limit {
410                            return;
411                        }
412                    }
413                }
414            }
415
416            for event in self.trusted.query(filter, limit) {
417                if seen.insert(event.id) {
418                    events.push(event);
419                    added += 1;
420                    if added >= limit {
421                        return;
422                    }
423                }
424            }
425        }
426
427        async fn collect_filter_count(
428            &self,
429            filter: &NostrFilter,
430            limit: usize,
431            seen: &mut HashSet<EventId>,
432        ) {
433            if limit == 0 {
434                return;
435            }
436
437            let mut added = 0usize;
438
439            if !prefers_trusted_only(filter) {
440                let recent = {
441                    let cache = self.recent_events.lock().await;
442                    cache.matching(filter)
443                };
444                for event in recent {
445                    if seen.insert(event.id) {
446                        added += 1;
447                        if added >= limit {
448                            return;
449                        }
450                    }
451                }
452            }
453
454            for event in self.trusted.query(filter, limit) {
455                if seen.insert(event.id) {
456                    added += 1;
457                    if added >= limit {
458                        return;
459                    }
460                }
461            }
462        }
463
464        pub fn new(
465            trusted_store: Arc<dyn SocialGraphBackend>,
466            data_dir: PathBuf,
467            public_pubkeys: HashSet<String>,
468            social_graph: Option<Arc<SocialGraphAccessControl>>,
469            config: NostrRelayConfig,
470        ) -> Result<Self> {
471            let spambox = if config.spambox_db_max_bytes == 0 {
472                Some(SpamboxStore::Memory(MemorySpambox::new(
473                    config.max_query_limit * 2,
474                )))
475            } else {
476                let spam_dir = data_dir.join("socialgraph_spambox");
477                match socialgraph::open_social_graph_store_at_path(
478                    &spam_dir,
479                    Some(config.spambox_db_max_bytes),
480                ) {
481                    Ok(store) => Some(SpamboxStore::Persistent(NostrStore::new(store))),
482                    Err(err) => {
483                        warn!(
484                            "Failed to open social graph spambox (falling back to memory): {}",
485                            err
486                        );
487                        Some(SpamboxStore::Memory(MemorySpambox::new(
488                            config.max_query_limit * 2,
489                        )))
490                    }
491                }
492            };
493
494            let recent_size = config.max_query_limit.saturating_mul(2);
495            let bluetooth_event_log = Arc::new(BluetoothEventLog::load(
496                data_dir.join("bluetooth-events.jsonl"),
497            ));
498
499            Ok(Self {
500                config,
501                trusted: NostrStore::new(trusted_store),
502                public_pubkeys,
503                spambox,
504                social_graph,
505                clients: Mutex::new(HashMap::new()),
506                subscriptions: Mutex::new(HashMap::new()),
507                recent_events: Mutex::new(RecentEvents::new(recent_size)),
508                next_client_id: AtomicU64::new(1),
509                bluetooth_event_log,
510            })
511        }
512
513        pub fn next_client_id(&self) -> u64 {
514            self.next_client_id.fetch_add(1, Ordering::SeqCst)
515        }
516
517        pub async fn ingest_trusted_event(&self, event: Event) -> Result<()> {
518            self.ingest_trusted_event_inner(event, true).await
519        }
520
521        pub async fn ingest_trusted_event_from_bluetooth(
522            &self,
523            event: Event,
524            peer_id: Option<String>,
525        ) -> Result<()> {
526            self.ingest_trusted_event_inner(event.clone(), true).await?;
527            self.bluetooth_event_log.record(&event, peer_id).await;
528            Ok(())
529        }
530
531        pub async fn ingest_trusted_event_silent(&self, event: Event) -> Result<()> {
532            self.ingest_trusted_event_inner(event, false).await
533        }
534
535        pub async fn bluetooth_received_events(
536            &self,
537            limit: usize,
538        ) -> Vec<BluetoothReceivedEventRecord> {
539            self.bluetooth_event_log.recent(limit).await
540        }
541
542        async fn ingest_trusted_event_inner(&self, event: Event, broadcast: bool) -> Result<()> {
543            event
544                .verify()
545                .map_err(|e| anyhow::anyhow!("invalid signature: {}", e))?;
546
547            let is_ephemeral = event.kind.is_ephemeral();
548            {
549                let mut recent = self.recent_events.lock().await;
550                recent.insert(event.clone());
551            }
552
553            if !is_ephemeral {
554                let storage_class = self.event_storage_class(&event);
555                self.trusted
556                    .ingest_with_storage_class(&event, storage_class)?;
557            }
558
559            if broadcast {
560                self.broadcast_event(&event).await;
561            }
562            Ok(())
563        }
564
565        pub async fn query_events(&self, filter: &NostrFilter, limit: usize) -> Vec<Event> {
566            let limit = limit.min(self.config.max_query_limit);
567            if limit == 0 {
568                return Vec::new();
569            }
570
571            let mut seen: HashSet<EventId> = HashSet::new();
572            let mut events = Vec::new();
573
574            if !prefers_trusted_only(filter) {
575                let recent = {
576                    let cache = self.recent_events.lock().await;
577                    cache.matching(filter)
578                };
579                for event in recent {
580                    if seen.insert(event.id) {
581                        events.push(event);
582                        if events.len() >= limit {
583                            return events;
584                        }
585                    }
586                }
587            }
588
589            for event in self.trusted.query(filter, limit) {
590                if seen.insert(event.id) {
591                    events.push(event);
592                    if events.len() >= limit {
593                        break;
594                    }
595                }
596            }
597
598            events
599        }
600
601        pub async fn register_client(
602            &self,
603            client_id: u64,
604            sender: mpsc::UnboundedSender<String>,
605            pubkey: Option<String>,
606        ) {
607            let mut clients = self.clients.lock().await;
608            clients.insert(
609                client_id,
610                ClientState {
611                    sender,
612                    pubkey,
613                    quota: ClientQuota::new(),
614                },
615            );
616        }
617
618        pub async fn unregister_client(&self, client_id: u64) {
619            let mut clients = self.clients.lock().await;
620            clients.remove(&client_id);
621            drop(clients);
622            let mut subs = self.subscriptions.lock().await;
623            subs.remove(&client_id);
624        }
625
626        pub async fn handle_client_message(&self, client_id: u64, msg: NostrClientMessage) {
627            match msg {
628                NostrClientMessage::Event(event) => {
629                    self.handle_event(client_id, *event).await;
630                }
631                NostrClientMessage::Req {
632                    subscription_id,
633                    filters,
634                } => {
635                    self.handle_req(client_id, subscription_id, filters).await;
636                }
637                NostrClientMessage::Count {
638                    subscription_id,
639                    filters,
640                } => {
641                    self.handle_count(client_id, subscription_id, filters).await;
642                }
643                NostrClientMessage::Close(subscription_id) => {
644                    self.handle_close(client_id, subscription_id).await;
645                }
646                NostrClientMessage::Auth(event) => {
647                    self.handle_auth(client_id, *event).await;
648                }
649                NostrClientMessage::NegOpen { .. }
650                | NostrClientMessage::NegMsg { .. }
651                | NostrClientMessage::NegClose { .. } => {
652                    self.send_to_client(
653                        client_id,
654                        NostrRelayMessage::notice("negentropy not supported"),
655                    )
656                    .await;
657                }
658            }
659        }
660
661        pub async fn register_subscription_query(
662            &self,
663            client_id: u64,
664            subscription_id: SubscriptionId,
665            mut filters: Vec<NostrFilter>,
666        ) -> std::result::Result<Vec<Event>, &'static str> {
667            if !self.allow_req(client_id).await {
668                return Err("rate limited");
669            }
670
671            if filters.len() > self.config.max_filters_per_sub {
672                filters.truncate(self.config.max_filters_per_sub);
673            }
674
675            {
676                let mut subs = self.subscriptions.lock().await;
677                let entry = subs.entry(client_id).or_default();
678                if !entry.contains_key(&subscription_id)
679                    && entry.len() >= self.config.max_subs_per_client
680                {
681                    return Err("too many subscriptions");
682                }
683                entry.insert(subscription_id, filters.clone());
684            }
685
686            let mut seen: HashSet<EventId> = HashSet::new();
687            let mut events = Vec::new();
688            for filter in &filters {
689                let limit = filter
690                    .limit
691                    .unwrap_or(self.config.max_query_limit)
692                    .min(self.config.max_query_limit);
693                self.collect_filter_events(filter, limit, &mut seen, &mut events)
694                    .await;
695            }
696
697            Ok(events)
698        }
699
700        async fn handle_auth(&self, client_id: u64, event: Event) {
701            let ok = event.verify().is_ok();
702            let message = if ok { "" } else { "invalid auth" };
703            self.send_to_client(client_id, NostrRelayMessage::ok(event.id, ok, message))
704                .await;
705        }
706
707        async fn handle_close(&self, client_id: u64, subscription_id: SubscriptionId) {
708            let mut subs = self.subscriptions.lock().await;
709            if let Some(map) = subs.get_mut(&client_id) {
710                map.remove(&subscription_id);
711            }
712        }
713
714        async fn handle_event(&self, client_id: u64, event: Event) {
715            let ok = event.verify().is_ok();
716            if !ok {
717                self.send_to_client(
718                    client_id,
719                    NostrRelayMessage::ok(event.id, false, "invalid: signature"),
720                )
721                .await;
722                return;
723            }
724
725            let trusted = self.is_trusted_event(client_id, &event).await;
726            if !trusted && !self.allow_spambox_event(client_id).await {
727                self.send_to_client(
728                    client_id,
729                    NostrRelayMessage::ok(event.id, false, "rate limited"),
730                )
731                .await;
732                return;
733            }
734
735            let is_ephemeral = event.kind.is_ephemeral();
736            if trusted {
737                let mut recent = self.recent_events.lock().await;
738                recent.insert(event.clone());
739            }
740            if !is_ephemeral {
741                let stored = if trusted {
742                    let storage_class = self.event_storage_class(&event);
743                    self.trusted
744                        .ingest_with_storage_class(&event, storage_class)
745                        .is_ok()
746                } else {
747                    match self.spambox.as_ref() {
748                        Some(spambox) => spambox.ingest(&event).await,
749                        None => false,
750                    }
751                };
752
753                if !stored {
754                    let message = if trusted {
755                        "store failed"
756                    } else {
757                        "spambox full"
758                    };
759                    self.send_to_client(client_id, NostrRelayMessage::ok(event.id, false, message))
760                        .await;
761                    return;
762                }
763            }
764
765            let message = if trusted { "" } else { "spambox" };
766            self.send_to_client(client_id, NostrRelayMessage::ok(event.id, true, message))
767                .await;
768
769            if trusted {
770                self.broadcast_event(&event).await;
771            }
772        }
773
774        async fn handle_req(
775            &self,
776            client_id: u64,
777            subscription_id: SubscriptionId,
778            filters: Vec<NostrFilter>,
779        ) {
780            match self
781                .register_subscription_query(client_id, subscription_id.clone(), filters)
782                .await
783            {
784                Ok(events) => {
785                    for event in events {
786                        self.send_to_client(
787                            client_id,
788                            NostrRelayMessage::event(subscription_id.clone(), event),
789                        )
790                        .await;
791                    }
792
793                    self.send_to_client(client_id, NostrRelayMessage::eose(subscription_id))
794                        .await;
795                }
796                Err(message) => {
797                    self.send_to_client(
798                        client_id,
799                        NostrRelayMessage::closed(subscription_id, message),
800                    )
801                    .await;
802                }
803            }
804        }
805
806        async fn handle_count(
807            &self,
808            client_id: u64,
809            subscription_id: SubscriptionId,
810            filters: Vec<NostrFilter>,
811        ) {
812            if !self.allow_req(client_id).await {
813                self.send_to_client(
814                    client_id,
815                    NostrRelayMessage::closed(subscription_id, "rate limited"),
816                )
817                .await;
818                return;
819            }
820
821            let mut seen: HashSet<EventId> = HashSet::new();
822            for filter in &filters {
823                let limit = filter
824                    .limit
825                    .unwrap_or(self.config.max_query_limit)
826                    .min(self.config.max_query_limit);
827                self.collect_filter_count(filter, limit, &mut seen).await;
828            }
829
830            self.send_to_client(
831                client_id,
832                NostrRelayMessage::count(subscription_id, seen.len()),
833            )
834            .await;
835        }
836
837        async fn is_trusted_event(&self, client_id: u64, event: &Event) -> bool {
838            let event_pubkey = event.pubkey.to_hex();
839            let client_pubkey = {
840                let clients = self.clients.lock().await;
841                clients
842                    .get(&client_id)
843                    .and_then(|state| state.pubkey.clone())
844            };
845            if let Some(pubkey) = client_pubkey {
846                return pubkey == event_pubkey
847                    || self.social_graph.as_ref().is_some_and(|social_graph| {
848                        social_graph.check_write_access(&event_pubkey)
849                    });
850            }
851            if let Some(ref social_graph) = self.social_graph {
852                return social_graph.check_write_access(&event_pubkey);
853            }
854            true
855        }
856
857        fn event_storage_class(&self, event: &Event) -> EventStorageClass {
858            if self.public_pubkeys.contains(&event.pubkey.to_hex()) {
859                EventStorageClass::Public
860            } else {
861                EventStorageClass::Ambient
862            }
863        }
864
865        async fn allow_spambox_event(&self, client_id: u64) -> bool {
866            let mut clients = self.clients.lock().await;
867            let Some(state) = clients.get_mut(&client_id) else {
868                return false;
869            };
870            state
871                .quota
872                .allow_spambox_event(self.config.spambox_max_events_per_min)
873        }
874
875        async fn allow_req(&self, client_id: u64) -> bool {
876            let mut clients = self.clients.lock().await;
877            let Some(state) = clients.get_mut(&client_id) else {
878                return false;
879            };
880            state.quota.allow_req(self.config.spambox_max_reqs_per_min)
881        }
882
883        async fn broadcast_event(&self, event: &Event) {
884            let subscriptions = self.subscriptions.lock().await;
885            let mut deliveries: Vec<(u64, SubscriptionId)> = Vec::new();
886            for (client_id, subs) in subscriptions.iter() {
887                for (sub_id, filters) in subs.iter() {
888                    if filters.iter().any(|f| f.match_event(event)) {
889                        deliveries.push((*client_id, sub_id.clone()));
890                    }
891                }
892            }
893            drop(subscriptions);
894
895            for (client_id, sub_id) in deliveries {
896                self.send_to_client(client_id, NostrRelayMessage::event(sub_id, event.clone()))
897                    .await;
898            }
899        }
900
901        async fn send_to_client(&self, client_id: u64, msg: NostrRelayMessage) {
902            let sender = {
903                let clients = self.clients.lock().await;
904                clients.get(&client_id).map(|state| state.sender.clone())
905            };
906            if let Some(tx) = sender {
907                let _ = tx.send(msg.as_json());
908            }
909        }
910    }
911}
912
913pub use imp::NostrRelay;
914
915#[async_trait::async_trait]
916impl MeshEventStore for NostrRelay {
917    async fn ingest_trusted_event(&self, event: Event) -> anyhow::Result<()> {
918        NostrRelay::ingest_trusted_event(self, event).await
919    }
920
921    async fn query_events(&self, filter: &NostrFilter, limit: usize) -> Vec<Event> {
922        NostrRelay::query_events(self, filter, limit).await
923    }
924}
925
926#[async_trait::async_trait]
927impl MeshRelayClient for NostrRelay {
928    fn next_client_id(&self) -> u64 {
929        NostrRelay::next_client_id(self)
930    }
931
932    async fn register_client(
933        &self,
934        client_id: u64,
935        sender: mpsc::UnboundedSender<String>,
936        pubkey: Option<String>,
937    ) {
938        NostrRelay::register_client(self, client_id, sender, pubkey).await
939    }
940
941    async fn unregister_client(&self, client_id: u64) {
942        NostrRelay::unregister_client(self, client_id).await
943    }
944
945    async fn handle_client_message(&self, client_id: u64, msg: NostrClientMessage) {
946        NostrRelay::handle_client_message(self, client_id, msg).await
947    }
948
949    async fn register_subscription_query(
950        &self,
951        client_id: u64,
952        subscription_id: SubscriptionId,
953        filters: Vec<NostrFilter>,
954    ) -> std::result::Result<Vec<Event>, &'static str> {
955        NostrRelay::register_subscription_query(self, client_id, subscription_id, filters).await
956    }
957
958    async fn ingest_trusted_event_from_peer(
959        &self,
960        event: Event,
961        peer_id: Option<String>,
962    ) -> anyhow::Result<()> {
963        NostrRelay::ingest_trusted_event_from_bluetooth(self, event, peer_id).await
964    }
965}
966
967#[cfg(test)]
968#[path = "nostr_relay/tests.rs"]
969mod tests;