Skip to main content

hashtree_cli/
nostr_relay.rs

1use std::collections::HashMap;
2use std::collections::{HashSet, VecDeque};
3use std::path::PathBuf;
4use std::sync::{
5    atomic::{AtomicU64, Ordering},
6    Arc,
7};
8use std::time::{Duration, Instant};
9
10use tokio::sync::{mpsc, Mutex};
11
12#[cfg(feature = "p2p")]
13use hashtree_network::{MeshEventStore, MeshRelayClient};
14use nostr::{ClientMessage as NostrClientMessage, JsonUtil, RelayMessage as NostrRelayMessage};
15use nostr::{Event, EventId, Filter as NostrFilter, SubscriptionId};
16
17use crate::socialgraph;
18
/// Maximum number of Bluetooth-received event records kept in memory and in
/// the on-disk log; once exceeded, the oldest records are dropped first.
const BLUETOOTH_EVENT_LOG_CAPACITY: usize = 100;
20
/// Summary of one Nostr event that was ingested through the Bluetooth path.
///
/// Records are serialized with serde; the event log stores one JSON object
/// per line.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct BluetoothReceivedEventRecord {
    /// Hex-encoded event id.
    pub event_id: String,
    /// Hex-encoded public key of the event author.
    pub pubkey: String,
    /// Numeric Nostr event kind.
    pub kind: u32,
    /// The event's own `created_at` timestamp.
    pub created_at: u64,
    /// Local wall-clock time the record was created, in seconds since the
    /// Unix epoch (0 when the system clock reads earlier than the epoch).
    pub received_at: u64,
    /// Identifier of the delivering peer, when the caller supplied one.
    pub peer_id: Option<String>,
    /// CID-like references extracted from the event's tags, de-duplicated in
    /// first-seen order.
    pub cid_values: Vec<String>,
}
31
/// Tunable limits applied by the relay.
#[derive(Debug, Clone)]
pub struct NostrRelayConfig {
    /// Size cap (bytes) handed to the persistent spambox store; `0` selects
    /// the in-memory spambox instead.
    pub spambox_db_max_bytes: u64,
    /// Upper bound on the number of events returned for a single filter.
    pub max_query_limit: usize,
    /// Maximum number of distinct subscriptions one client may hold.
    pub max_subs_per_client: usize,
    /// Maximum number of filters kept per subscription; extras are dropped.
    pub max_filters_per_sub: usize,
    /// Per-client budget of untrusted (spambox) events per one-minute window.
    pub spambox_max_events_per_min: u32,
    /// Per-client budget of REQ/COUNT requests per one-minute window.
    pub spambox_max_reqs_per_min: u32,
}

impl Default for NostrRelayConfig {
    /// Returns the built-in limits: a 1 GiB spambox, 200 events per query,
    /// 64 subscriptions per client, 32 filters per subscription, and 120
    /// events / 120 requests per minute.
    fn default() -> Self {
        const ONE_GIB: u64 = 1 << 30;
        const PER_MINUTE_BUDGET: u32 = 120;

        NostrRelayConfig {
            spambox_db_max_bytes: ONE_GIB,
            max_query_limit: 200,
            max_subs_per_client: 64,
            max_filters_per_sub: 32,
            spambox_max_events_per_min: PER_MINUTE_BUDGET,
            spambox_max_reqs_per_min: PER_MINUTE_BUDGET,
        }
    }
}
54
55mod imp {
56    use super::*;
57    use anyhow::Result;
58
59    use crate::socialgraph::{EventStorageClass, SocialGraphAccessControl, SocialGraphBackend};
60    use hashtree_core::{nhash_decode, Cid};
61    use hashtree_nostr::{is_parameterized_replaceable_kind, is_replaceable_kind};
62    use tracing::warn;
63
64    fn prefers_trusted_only(filter: &NostrFilter) -> bool {
65        let Some(kinds) = filter.kinds.as_ref() else {
66            return false;
67        };
68        if kinds.len() != 1 {
69            return false;
70        }
71
72        let kind = kinds.iter().next().expect("checked single kind").as_u16() as u32;
73        let has_authors = filter
74            .authors
75            .as_ref()
76            .is_some_and(|authors| !authors.is_empty());
77        if !has_authors {
78            return false;
79        }
80
81        if is_replaceable_kind(kind) {
82            return true;
83        }
84
85        if is_parameterized_replaceable_kind(kind) {
86            let d_tag = nostr::SingleLetterTag::lowercase(nostr::Alphabet::D);
87            return filter
88                .generic_tags
89                .get(&d_tag)
90                .is_some_and(|values| !values.is_empty());
91        }
92
93        false
94    }
95
96    struct NostrStore {
97        store: Arc<dyn SocialGraphBackend>,
98    }
99
100    impl NostrStore {
101        fn new(store: Arc<dyn SocialGraphBackend>) -> Self {
102            Self { store }
103        }
104
105        fn ingest(&self, event: &Event) -> Result<()> {
106            crate::socialgraph::ingest_parsed_event(self.store.as_ref(), event)
107        }
108
109        fn ingest_with_storage_class(
110            &self,
111            event: &Event,
112            storage_class: EventStorageClass,
113        ) -> Result<()> {
114            crate::socialgraph::ingest_parsed_event_with_storage_class(
115                self.store.as_ref(),
116                event,
117                storage_class,
118            )
119        }
120
121        fn query(&self, filter: &NostrFilter, limit: usize) -> Vec<Event> {
122            crate::socialgraph::query_events(self.store.as_ref(), filter, limit)
123        }
124    }
125
126    #[derive(Debug, Clone)]
127    struct ClientQuota {
128        last_reset: Instant,
129        spambox_events: u32,
130        reqs: u32,
131    }
132
133    impl ClientQuota {
134        fn new() -> Self {
135            Self {
136                last_reset: Instant::now(),
137                spambox_events: 0,
138                reqs: 0,
139            }
140        }
141
142        fn reset_if_needed(&mut self) {
143            if self.last_reset.elapsed() >= Duration::from_secs(60) {
144                self.last_reset = Instant::now();
145                self.spambox_events = 0;
146                self.reqs = 0;
147            }
148        }
149
150        fn allow_spambox_event(&mut self, limit: u32) -> bool {
151            self.reset_if_needed();
152            if self.spambox_events >= limit {
153                return false;
154            }
155            self.spambox_events += 1;
156            true
157        }
158
159        fn allow_req(&mut self, limit: u32) -> bool {
160            self.reset_if_needed();
161            if self.reqs >= limit {
162                return false;
163            }
164            self.reqs += 1;
165            true
166        }
167    }
168
    /// Everything the relay tracks for one connected client.
    struct ClientState {
        /// Outbound channel; relay messages are pushed here as JSON strings.
        sender: mpsc::UnboundedSender<String>,
        /// Public key supplied when the client was registered, if any. It is
        /// compared with an event's hex-encoded author key when deciding
        /// whether the event is trusted.
        pubkey: Option<String>,
        /// Rate-limit counters for this client.
        quota: ClientQuota,
    }
174
175    struct RecentEvents {
176        order: VecDeque<EventId>,
177        events: HashMap<EventId, Event>,
178        max_len: usize,
179    }
180
181    impl RecentEvents {
182        fn new(max_len: usize) -> Self {
183            Self {
184                order: VecDeque::new(),
185                events: HashMap::new(),
186                max_len: max_len.max(128),
187            }
188        }
189
190        fn insert(&mut self, event: Event) {
191            if self.events.contains_key(&event.id) {
192                return;
193            }
194            self.order.push_back(event.id);
195            self.events.insert(event.id, event);
196            while self.order.len() > self.max_len {
197                if let Some(oldest) = self.order.pop_front() {
198                    self.events.remove(&oldest);
199                }
200            }
201        }
202
203        fn matching(&self, filter: &NostrFilter) -> Vec<Event> {
204            self.events
205                .values()
206                .filter(|event| filter.match_event(event))
207                .cloned()
208                .collect()
209        }
210    }
211
    /// Where events from untrusted senders are kept.
    enum SpamboxStore {
        /// Backed by a social-graph store.
        Persistent(NostrStore),
        /// Bounded in-process queue.
        Memory(MemorySpambox),
    }
216
217    struct MemorySpambox {
218        events: Mutex<VecDeque<Event>>,
219        max_len: usize,
220    }
221
222    impl MemorySpambox {
223        fn new(max_len: usize) -> Self {
224            Self {
225                events: Mutex::new(VecDeque::new()),
226                max_len: max_len.max(128),
227            }
228        }
229
230        async fn ingest(&self, event: &Event) -> bool {
231            let mut events = self.events.lock().await;
232            events.push_back(event.clone());
233            while events.len() > self.max_len {
234                events.pop_front();
235            }
236            true
237        }
238    }
239
240    impl SpamboxStore {
241        async fn ingest(&self, event: &Event) -> bool {
242            match self {
243                SpamboxStore::Persistent(store) => store.ingest(event).is_ok(),
244                SpamboxStore::Memory(store) => store.ingest(event).await,
245            }
246        }
247    }
248
249    struct BluetoothEventLog {
250        path: PathBuf,
251        state: Mutex<BluetoothEventLogState>,
252    }
253
254    struct BluetoothEventLogState {
255        records: VecDeque<BluetoothReceivedEventRecord>,
256        event_ids: HashSet<String>,
257    }
258
259    impl BluetoothEventLog {
260        fn load(path: PathBuf) -> Self {
261            let records = std::fs::read_to_string(&path)
262                .ok()
263                .map(|serialized| {
264                    serialized
265                        .lines()
266                        .filter_map(|line| {
267                            serde_json::from_str::<BluetoothReceivedEventRecord>(line).ok()
268                        })
269                        .collect::<Vec<_>>()
270                })
271                .unwrap_or_default();
272            let mut trimmed = VecDeque::with_capacity(BLUETOOTH_EVENT_LOG_CAPACITY);
273            let start = records.len().saturating_sub(BLUETOOTH_EVENT_LOG_CAPACITY);
274            for record in records.into_iter().skip(start) {
275                trimmed.push_back(record);
276            }
277            let event_ids = trimmed
278                .iter()
279                .map(|record| record.event_id.clone())
280                .collect::<HashSet<_>>();
281
282            Self {
283                path,
284                state: Mutex::new(BluetoothEventLogState {
285                    records: trimmed,
286                    event_ids,
287                }),
288            }
289        }
290
291        async fn recent(&self, limit: usize) -> Vec<BluetoothReceivedEventRecord> {
292            let state = self.state.lock().await;
293            state
294                .records
295                .iter()
296                .rev()
297                .take(limit.max(1))
298                .cloned()
299                .collect()
300        }
301
302        async fn record(&self, event: &Event, peer_id: Option<String>) {
303            let record = BluetoothReceivedEventRecord {
304                event_id: event.id.to_hex(),
305                pubkey: event.pubkey.to_hex(),
306                kind: event.kind.as_u16() as u32,
307                created_at: event.created_at.as_u64(),
308                received_at: std::time::SystemTime::now()
309                    .duration_since(std::time::UNIX_EPOCH)
310                    .map(|value| value.as_secs())
311                    .unwrap_or(0),
312                peer_id,
313                cid_values: cid_values_from_event(event),
314            };
315
316            let serialized = {
317                let mut state = self.state.lock().await;
318                if state.event_ids.contains(&record.event_id) {
319                    return;
320                }
321
322                state.event_ids.insert(record.event_id.clone());
323                state.records.push_back(record);
324                while state.records.len() > BLUETOOTH_EVENT_LOG_CAPACITY {
325                    if let Some(removed) = state.records.pop_front() {
326                        state.event_ids.remove(&removed.event_id);
327                    }
328                }
329
330                state
331                    .records
332                    .iter()
333                    .filter_map(|entry| serde_json::to_string(entry).ok())
334                    .collect::<Vec<_>>()
335                    .join("\n")
336            };
337
338            if let Some(parent) = self.path.parent() {
339                let _ = std::fs::create_dir_all(parent);
340            }
341            let _ = std::fs::write(&self.path, serialized);
342        }
343    }
344
345    fn looks_like_cid_reference(value: &str) -> bool {
346        Cid::parse(value).is_ok() || nhash_decode(value).is_ok()
347    }
348
349    fn cid_values_from_event(event: &Event) -> Vec<String> {
350        let mut values = Vec::new();
351        let mut seen = HashSet::new();
352
353        for tag in &event.tags {
354            let fields = tag.clone().to_vec();
355            if fields.first().is_some_and(|name| name == "cid") {
356                if let Some(value) = fields.get(1).filter(|value| !value.is_empty()) {
357                    if seen.insert(value.clone()) {
358                        values.push(value.clone());
359                    }
360                }
361                continue;
362            }
363
364            for value in fields.into_iter().skip(1) {
365                if looks_like_cid_reference(&value) && seen.insert(value.clone()) {
366                    values.push(value);
367                }
368            }
369        }
370
371        values
372    }
373
    /// In-process Nostr relay: stores events, tracks connected clients and
    /// their subscriptions, and fans matching events out to subscribers.
    pub struct NostrRelay {
        /// Limits applied to queries, subscriptions and rate limiting.
        config: NostrRelayConfig,
        /// Store for events accepted as trusted.
        trusted: NostrStore,
        /// Hex-encoded author keys whose events are stored with the `Public`
        /// storage class; all other authors get `Ambient`.
        public_pubkeys: HashSet<String>,
        /// Destination for events from untrusted senders.
        spambox: Option<SpamboxStore>,
        /// Optional access control consulted for write access by author key.
        social_graph: Option<Arc<SocialGraphAccessControl>>,
        /// Connected clients keyed by client id.
        clients: Mutex<HashMap<u64, ClientState>>,
        /// Active subscriptions: client id -> subscription id -> filters.
        subscriptions: Mutex<HashMap<u64, HashMap<SubscriptionId, Vec<NostrFilter>>>>,
        /// Bounded cache of recently accepted trusted events.
        recent_events: Mutex<RecentEvents>,
        /// Source of client ids; starts at 1 and is incremented per call.
        next_client_id: AtomicU64,
        /// Log of events received over Bluetooth.
        bluetooth_event_log: Arc<BluetoothEventLog>,
    }
386
387    impl NostrRelay {
388        async fn collect_filter_events(
389            &self,
390            filter: &NostrFilter,
391            limit: usize,
392            seen: &mut HashSet<EventId>,
393            events: &mut Vec<Event>,
394        ) {
395            if limit == 0 {
396                return;
397            }
398
399            let mut added = 0usize;
400
401            if !prefers_trusted_only(filter) {
402                let recent = {
403                    let cache = self.recent_events.lock().await;
404                    cache.matching(filter)
405                };
406                for event in recent {
407                    if seen.insert(event.id) {
408                        events.push(event);
409                        added += 1;
410                        if added >= limit {
411                            return;
412                        }
413                    }
414                }
415            }
416
417            for event in self.trusted.query(filter, limit) {
418                if seen.insert(event.id) {
419                    events.push(event);
420                    added += 1;
421                    if added >= limit {
422                        return;
423                    }
424                }
425            }
426        }
427
428        async fn collect_filter_count(
429            &self,
430            filter: &NostrFilter,
431            limit: usize,
432            seen: &mut HashSet<EventId>,
433        ) {
434            if limit == 0 {
435                return;
436            }
437
438            let mut added = 0usize;
439
440            if !prefers_trusted_only(filter) {
441                let recent = {
442                    let cache = self.recent_events.lock().await;
443                    cache.matching(filter)
444                };
445                for event in recent {
446                    if seen.insert(event.id) {
447                        added += 1;
448                        if added >= limit {
449                            return;
450                        }
451                    }
452                }
453            }
454
455            for event in self.trusted.query(filter, limit) {
456                if seen.insert(event.id) {
457                    added += 1;
458                    if added >= limit {
459                        return;
460                    }
461                }
462            }
463        }
464
465        pub fn new(
466            trusted_store: Arc<dyn SocialGraphBackend>,
467            data_dir: PathBuf,
468            public_pubkeys: HashSet<String>,
469            social_graph: Option<Arc<SocialGraphAccessControl>>,
470            config: NostrRelayConfig,
471        ) -> Result<Self> {
472            let spambox = if config.spambox_db_max_bytes == 0 {
473                Some(SpamboxStore::Memory(MemorySpambox::new(
474                    config.max_query_limit * 2,
475                )))
476            } else {
477                let spam_dir = data_dir.join("socialgraph_spambox");
478                match socialgraph::open_social_graph_store_at_path(
479                    &spam_dir,
480                    Some(config.spambox_db_max_bytes),
481                ) {
482                    Ok(store) => Some(SpamboxStore::Persistent(NostrStore::new(store))),
483                    Err(err) => {
484                        warn!(
485                            "Failed to open social graph spambox (falling back to memory): {}",
486                            err
487                        );
488                        Some(SpamboxStore::Memory(MemorySpambox::new(
489                            config.max_query_limit * 2,
490                        )))
491                    }
492                }
493            };
494
495            let recent_size = config.max_query_limit.saturating_mul(2);
496            let bluetooth_event_log = Arc::new(BluetoothEventLog::load(
497                data_dir.join("bluetooth-events.jsonl"),
498            ));
499
500            Ok(Self {
501                config,
502                trusted: NostrStore::new(trusted_store),
503                public_pubkeys,
504                spambox,
505                social_graph,
506                clients: Mutex::new(HashMap::new()),
507                subscriptions: Mutex::new(HashMap::new()),
508                recent_events: Mutex::new(RecentEvents::new(recent_size)),
509                next_client_id: AtomicU64::new(1),
510                bluetooth_event_log,
511            })
512        }
513
514        pub fn next_client_id(&self) -> u64 {
515            self.next_client_id.fetch_add(1, Ordering::SeqCst)
516        }
517
518        pub async fn ingest_trusted_event(&self, event: Event) -> Result<()> {
519            self.ingest_trusted_event_inner(event, true).await
520        }
521
522        pub async fn ingest_trusted_event_from_bluetooth(
523            &self,
524            event: Event,
525            peer_id: Option<String>,
526        ) -> Result<()> {
527            self.ingest_trusted_event_inner(event.clone(), true).await?;
528            self.bluetooth_event_log.record(&event, peer_id).await;
529            Ok(())
530        }
531
532        pub async fn ingest_trusted_event_silent(&self, event: Event) -> Result<()> {
533            self.ingest_trusted_event_inner(event, false).await
534        }
535
536        pub async fn bluetooth_received_events(
537            &self,
538            limit: usize,
539        ) -> Vec<BluetoothReceivedEventRecord> {
540            self.bluetooth_event_log.recent(limit).await
541        }
542
543        async fn ingest_trusted_event_inner(&self, event: Event, broadcast: bool) -> Result<()> {
544            event
545                .verify()
546                .map_err(|e| anyhow::anyhow!("invalid signature: {}", e))?;
547
548            let is_ephemeral = event.kind.is_ephemeral();
549            {
550                let mut recent = self.recent_events.lock().await;
551                recent.insert(event.clone());
552            }
553
554            if !is_ephemeral {
555                let storage_class = self.event_storage_class(&event);
556                self.trusted
557                    .ingest_with_storage_class(&event, storage_class)?;
558            }
559
560            if broadcast {
561                self.broadcast_event(&event).await;
562            }
563            Ok(())
564        }
565
566        pub async fn query_events(&self, filter: &NostrFilter, limit: usize) -> Vec<Event> {
567            let limit = limit.min(self.config.max_query_limit);
568            if limit == 0 {
569                return Vec::new();
570            }
571
572            let mut seen: HashSet<EventId> = HashSet::new();
573            let mut events = Vec::new();
574
575            if !prefers_trusted_only(filter) {
576                let recent = {
577                    let cache = self.recent_events.lock().await;
578                    cache.matching(filter)
579                };
580                for event in recent {
581                    if seen.insert(event.id) {
582                        events.push(event);
583                        if events.len() >= limit {
584                            return events;
585                        }
586                    }
587                }
588            }
589
590            for event in self.trusted.query(filter, limit) {
591                if seen.insert(event.id) {
592                    events.push(event);
593                    if events.len() >= limit {
594                        break;
595                    }
596                }
597            }
598
599            events
600        }
601
602        pub async fn register_client(
603            &self,
604            client_id: u64,
605            sender: mpsc::UnboundedSender<String>,
606            pubkey: Option<String>,
607        ) {
608            let mut clients = self.clients.lock().await;
609            clients.insert(
610                client_id,
611                ClientState {
612                    sender,
613                    pubkey,
614                    quota: ClientQuota::new(),
615                },
616            );
617        }
618
619        pub async fn unregister_client(&self, client_id: u64) {
620            let mut clients = self.clients.lock().await;
621            clients.remove(&client_id);
622            drop(clients);
623            let mut subs = self.subscriptions.lock().await;
624            subs.remove(&client_id);
625        }
626
627        pub async fn handle_client_message(&self, client_id: u64, msg: NostrClientMessage) {
628            match msg {
629                NostrClientMessage::Event(event) => {
630                    self.handle_event(client_id, *event).await;
631                }
632                NostrClientMessage::Req {
633                    subscription_id,
634                    filters,
635                } => {
636                    self.handle_req(client_id, subscription_id, filters).await;
637                }
638                NostrClientMessage::Count {
639                    subscription_id,
640                    filters,
641                } => {
642                    self.handle_count(client_id, subscription_id, filters).await;
643                }
644                NostrClientMessage::Close(subscription_id) => {
645                    self.handle_close(client_id, subscription_id).await;
646                }
647                NostrClientMessage::Auth(event) => {
648                    self.handle_auth(client_id, *event).await;
649                }
650                NostrClientMessage::NegOpen { .. }
651                | NostrClientMessage::NegMsg { .. }
652                | NostrClientMessage::NegClose { .. } => {
653                    self.send_to_client(
654                        client_id,
655                        NostrRelayMessage::notice("negentropy not supported"),
656                    )
657                    .await;
658                }
659            }
660        }
661
662        pub async fn register_subscription_query(
663            &self,
664            client_id: u64,
665            subscription_id: SubscriptionId,
666            mut filters: Vec<NostrFilter>,
667        ) -> std::result::Result<Vec<Event>, &'static str> {
668            if !self.allow_req(client_id).await {
669                return Err("rate limited");
670            }
671
672            if filters.len() > self.config.max_filters_per_sub {
673                filters.truncate(self.config.max_filters_per_sub);
674            }
675
676            {
677                let mut subs = self.subscriptions.lock().await;
678                let entry = subs.entry(client_id).or_default();
679                if !entry.contains_key(&subscription_id)
680                    && entry.len() >= self.config.max_subs_per_client
681                {
682                    return Err("too many subscriptions");
683                }
684                entry.insert(subscription_id, filters.clone());
685            }
686
687            let mut seen: HashSet<EventId> = HashSet::new();
688            let mut events = Vec::new();
689            for filter in &filters {
690                let limit = filter
691                    .limit
692                    .unwrap_or(self.config.max_query_limit)
693                    .min(self.config.max_query_limit);
694                self.collect_filter_events(filter, limit, &mut seen, &mut events)
695                    .await;
696            }
697
698            Ok(events)
699        }
700
701        async fn handle_auth(&self, client_id: u64, event: Event) {
702            let ok = event.verify().is_ok();
703            let message = if ok { "" } else { "invalid auth" };
704            self.send_to_client(client_id, NostrRelayMessage::ok(event.id, ok, message))
705                .await;
706        }
707
708        async fn handle_close(&self, client_id: u64, subscription_id: SubscriptionId) {
709            let mut subs = self.subscriptions.lock().await;
710            if let Some(map) = subs.get_mut(&client_id) {
711                map.remove(&subscription_id);
712            }
713        }
714
715        async fn handle_event(&self, client_id: u64, event: Event) {
716            let ok = event.verify().is_ok();
717            if !ok {
718                self.send_to_client(
719                    client_id,
720                    NostrRelayMessage::ok(event.id, false, "invalid: signature"),
721                )
722                .await;
723                return;
724            }
725
726            let trusted = self.is_trusted_event(client_id, &event).await;
727            if !trusted && !self.allow_spambox_event(client_id).await {
728                self.send_to_client(
729                    client_id,
730                    NostrRelayMessage::ok(event.id, false, "rate limited"),
731                )
732                .await;
733                return;
734            }
735
736            let is_ephemeral = event.kind.is_ephemeral();
737            if trusted {
738                let mut recent = self.recent_events.lock().await;
739                recent.insert(event.clone());
740            }
741            if !is_ephemeral {
742                let stored = if trusted {
743                    let storage_class = self.event_storage_class(&event);
744                    self.trusted
745                        .ingest_with_storage_class(&event, storage_class)
746                        .is_ok()
747                } else {
748                    match self.spambox.as_ref() {
749                        Some(spambox) => spambox.ingest(&event).await,
750                        None => false,
751                    }
752                };
753
754                if !stored {
755                    let message = if trusted {
756                        "store failed"
757                    } else {
758                        "spambox full"
759                    };
760                    self.send_to_client(client_id, NostrRelayMessage::ok(event.id, false, message))
761                        .await;
762                    return;
763                }
764            }
765
766            let message = if trusted { "" } else { "spambox" };
767            self.send_to_client(client_id, NostrRelayMessage::ok(event.id, true, message))
768                .await;
769
770            if trusted {
771                self.broadcast_event(&event).await;
772            }
773        }
774
775        async fn handle_req(
776            &self,
777            client_id: u64,
778            subscription_id: SubscriptionId,
779            filters: Vec<NostrFilter>,
780        ) {
781            match self
782                .register_subscription_query(client_id, subscription_id.clone(), filters)
783                .await
784            {
785                Ok(events) => {
786                    for event in events {
787                        self.send_to_client(
788                            client_id,
789                            NostrRelayMessage::event(subscription_id.clone(), event),
790                        )
791                        .await;
792                    }
793
794                    self.send_to_client(client_id, NostrRelayMessage::eose(subscription_id))
795                        .await;
796                }
797                Err(message) => {
798                    self.send_to_client(
799                        client_id,
800                        NostrRelayMessage::closed(subscription_id, message),
801                    )
802                    .await;
803                }
804            }
805        }
806
807        async fn handle_count(
808            &self,
809            client_id: u64,
810            subscription_id: SubscriptionId,
811            filters: Vec<NostrFilter>,
812        ) {
813            if !self.allow_req(client_id).await {
814                self.send_to_client(
815                    client_id,
816                    NostrRelayMessage::closed(subscription_id, "rate limited"),
817                )
818                .await;
819                return;
820            }
821
822            let mut seen: HashSet<EventId> = HashSet::new();
823            for filter in &filters {
824                let limit = filter
825                    .limit
826                    .unwrap_or(self.config.max_query_limit)
827                    .min(self.config.max_query_limit);
828                self.collect_filter_count(filter, limit, &mut seen).await;
829            }
830
831            self.send_to_client(
832                client_id,
833                NostrRelayMessage::count(subscription_id, seen.len()),
834            )
835            .await;
836        }
837
838        async fn is_trusted_event(&self, client_id: u64, event: &Event) -> bool {
839            let event_pubkey = event.pubkey.to_hex();
840            let client_pubkey = {
841                let clients = self.clients.lock().await;
842                clients
843                    .get(&client_id)
844                    .and_then(|state| state.pubkey.clone())
845            };
846            if let Some(pubkey) = client_pubkey {
847                return pubkey == event_pubkey
848                    || self.social_graph.as_ref().is_some_and(|social_graph| {
849                        social_graph.check_write_access(&event_pubkey)
850                    });
851            }
852            if let Some(ref social_graph) = self.social_graph {
853                return social_graph.check_write_access(&event_pubkey);
854            }
855            true
856        }
857
858        fn event_storage_class(&self, event: &Event) -> EventStorageClass {
859            if self.public_pubkeys.contains(&event.pubkey.to_hex()) {
860                EventStorageClass::Public
861            } else {
862                EventStorageClass::Ambient
863            }
864        }
865
866        async fn allow_spambox_event(&self, client_id: u64) -> bool {
867            let mut clients = self.clients.lock().await;
868            let Some(state) = clients.get_mut(&client_id) else {
869                return false;
870            };
871            state
872                .quota
873                .allow_spambox_event(self.config.spambox_max_events_per_min)
874        }
875
876        async fn allow_req(&self, client_id: u64) -> bool {
877            let mut clients = self.clients.lock().await;
878            let Some(state) = clients.get_mut(&client_id) else {
879                return false;
880            };
881            state.quota.allow_req(self.config.spambox_max_reqs_per_min)
882        }
883
884        async fn broadcast_event(&self, event: &Event) {
885            let subscriptions = self.subscriptions.lock().await;
886            let mut deliveries: Vec<(u64, SubscriptionId)> = Vec::new();
887            for (client_id, subs) in subscriptions.iter() {
888                for (sub_id, filters) in subs.iter() {
889                    if filters.iter().any(|f| f.match_event(event)) {
890                        deliveries.push((*client_id, sub_id.clone()));
891                    }
892                }
893            }
894            drop(subscriptions);
895
896            for (client_id, sub_id) in deliveries {
897                self.send_to_client(client_id, NostrRelayMessage::event(sub_id, event.clone()))
898                    .await;
899            }
900        }
901
902        async fn send_to_client(&self, client_id: u64, msg: NostrRelayMessage) {
903            let sender = {
904                let clients = self.clients.lock().await;
905                clients.get(&client_id).map(|state| state.sender.clone())
906            };
907            if let Some(tx) = sender {
908                let _ = tx.send(msg.as_json());
909            }
910        }
911    }
912}
913
914pub use imp::NostrRelay;
915
// Exposes the relay as a mesh event store when the `p2p` feature is enabled.
// Each method forwards to the inherent `NostrRelay` method of the same name,
// which is named by path so the call visibly targets the inherent method
// rather than this trait method.
#[cfg(feature = "p2p")]
#[async_trait::async_trait]
impl MeshEventStore for NostrRelay {
    /// Forwards to the inherent `NostrRelay::ingest_trusted_event`.
    async fn ingest_trusted_event(&self, event: Event) -> anyhow::Result<()> {
        NostrRelay::ingest_trusted_event(self, event).await
    }

    /// Forwards to the inherent `NostrRelay::query_events`.
    async fn query_events(&self, filter: &NostrFilter, limit: usize) -> Vec<Event> {
        NostrRelay::query_events(self, filter, limit).await
    }
}
927
// Exposes the relay's client-facing operations to the mesh layer when the
// `p2p` feature is enabled. Every method forwards to an inherent `NostrRelay`
// method, named by path so the call visibly targets the inherent method
// rather than this trait method.
#[cfg(feature = "p2p")]
#[async_trait::async_trait]
impl MeshRelayClient for NostrRelay {
    /// Forwards to the inherent `NostrRelay::next_client_id`.
    fn next_client_id(&self) -> u64 {
        NostrRelay::next_client_id(self)
    }

    /// Forwards to the inherent `NostrRelay::register_client`.
    async fn register_client(
        &self,
        client_id: u64,
        sender: mpsc::UnboundedSender<String>,
        pubkey: Option<String>,
    ) {
        NostrRelay::register_client(self, client_id, sender, pubkey).await
    }

    /// Forwards to the inherent `NostrRelay::unregister_client`.
    async fn unregister_client(&self, client_id: u64) {
        NostrRelay::unregister_client(self, client_id).await
    }

    /// Forwards to the inherent `NostrRelay::handle_client_message`.
    async fn handle_client_message(&self, client_id: u64, msg: NostrClientMessage) {
        NostrRelay::handle_client_message(self, client_id, msg).await
    }

    /// Forwards to the inherent `NostrRelay::register_subscription_query`.
    async fn register_subscription_query(
        &self,
        client_id: u64,
        subscription_id: SubscriptionId,
        filters: Vec<NostrFilter>,
    ) -> std::result::Result<Vec<Event>, &'static str> {
        NostrRelay::register_subscription_query(self, client_id, subscription_id, filters).await
    }

    /// Forwards to `NostrRelay::ingest_trusted_event_from_bluetooth`, so an
    /// event ingested through this method is also recorded in the Bluetooth
    /// event log.
    // NOTE(review): the trait method is named for a generic peer while the
    // target logs to the Bluetooth log — confirm every caller is a Bluetooth
    // transport.
    async fn ingest_trusted_event_from_peer(
        &self,
        event: Event,
        peer_id: Option<String>,
    ) -> anyhow::Result<()> {
        NostrRelay::ingest_trusted_event_from_bluetooth(self, event, peer_id).await
    }
}
969
970#[cfg(test)]
971#[path = "nostr_relay/tests.rs"]
972mod tests;