Skip to main content

hashtree_cli/
nostr_relay.rs

1use std::collections::HashMap;
2use std::collections::{HashSet, VecDeque};
3use std::path::PathBuf;
4use std::sync::{
5    atomic::{AtomicU64, Ordering},
6    Arc,
7};
8use std::time::{Duration, Instant};
9
10use tokio::sync::{mpsc, Mutex};
11
12#[cfg(feature = "p2p")]
13use hashtree_network::{MeshEventStore, MeshRelayClient};
14use nostr::{ClientMessage as NostrClientMessage, JsonUtil, RelayMessage as NostrRelayMessage};
15use nostr::{Event, EventId, Filter as NostrFilter, SubscriptionId};
16
17use crate::socialgraph;
18
/// Maximum number of Bluetooth-received event records kept in memory and rewritten to the
/// on-disk log; older records are dropped first.
const BLUETOOTH_EVENT_LOG_CAPACITY: usize = 100;
20
/// One line of the Bluetooth event log: a summary of an event that was ingested via
/// `NostrRelay::ingest_trusted_event_from_bluetooth`.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct BluetoothReceivedEventRecord {
    /// Hex-encoded event id; also used to de-duplicate log entries.
    pub event_id: String,
    /// Hex-encoded public key of the event author.
    pub pubkey: String,
    /// Numeric event kind.
    pub kind: u32,
    /// The event's own `created_at` timestamp.
    pub created_at: u64,
    /// Seconds since the Unix epoch when the record was created (0 if the clock reads earlier).
    pub received_at: u64,
    /// Identifier of the peer that delivered the event, when the caller supplied one.
    pub peer_id: Option<String>,
    /// De-duplicated CID-like values extracted from the event's tags.
    pub cid_values: Vec<String>,
}
31
/// Tunable limits applied by `NostrRelay`.
#[derive(Debug, Clone)]
pub struct NostrRelayConfig {
    /// Size cap handed to the persistent spambox store; `0` selects the in-memory spambox.
    pub spambox_db_max_bytes: u64,
    /// Upper bound on events returned per filter; also sizes the in-memory caches.
    pub max_query_limit: usize,
    /// Maximum number of distinct subscriptions a single client may hold.
    pub max_subs_per_client: usize,
    /// Maximum number of filters honoured per subscription; extra filters are dropped.
    pub max_filters_per_sub: usize,
    /// Per-client budget of untrusted events accepted in each 60-second window.
    pub spambox_max_events_per_min: u32,
    /// Per-client budget of REQ/COUNT requests accepted in each 60-second window.
    pub spambox_max_reqs_per_min: u32,
}

impl Default for NostrRelayConfig {
    /// Defaults: 1 GiB spambox, 200 events per query, 64 subscriptions, 32 filters,
    /// and 120 events / 120 requests per minute.
    fn default() -> Self {
        Self {
            spambox_db_max_bytes: 1 << 30,
            max_query_limit: 200,
            max_subs_per_client: 64,
            max_filters_per_sub: 32,
            spambox_max_events_per_min: 120,
            spambox_max_reqs_per_min: 120,
        }
    }
}
54
55mod imp {
56    use super::*;
57    use anyhow::Result;
58
59    use crate::socialgraph::{EventStorageClass, SocialGraphAccessControl, SocialGraphBackend};
60    use hashtree_core::{nhash_decode, Cid};
61    use hashtree_nostr::{is_parameterized_replaceable_kind, is_replaceable_kind};
62    use tracing::warn;
63
64    fn prefers_trusted_only(filter: &NostrFilter) -> bool {
65        let Some(kinds) = filter.kinds.as_ref() else {
66            return false;
67        };
68        if kinds.len() != 1 {
69            return false;
70        }
71
72        let kind = kinds.iter().next().expect("checked single kind").as_u16() as u32;
73        let has_authors = filter
74            .authors
75            .as_ref()
76            .is_some_and(|authors| !authors.is_empty());
77        if !has_authors {
78            return false;
79        }
80
81        if is_replaceable_kind(kind) {
82            return true;
83        }
84
85        if is_parameterized_replaceable_kind(kind) {
86            let d_tag = nostr::SingleLetterTag::lowercase(nostr::Alphabet::D);
87            return filter
88                .generic_tags
89                .get(&d_tag)
90                .is_some_and(|values| !values.is_empty());
91        }
92
93        false
94    }
95
96    struct NostrStore {
97        store: Arc<dyn SocialGraphBackend>,
98    }
99
100    impl NostrStore {
101        fn new(store: Arc<dyn SocialGraphBackend>) -> Self {
102            Self { store }
103        }
104
105        async fn ingest(&self, event: Event) -> Result<()> {
106            let store = Arc::clone(&self.store);
107            tokio::task::spawn_blocking(move || {
108                crate::socialgraph::ingest_parsed_event(store.as_ref(), &event)
109            })
110            .await
111            .map_err(|err| anyhow::anyhow!("trusted nostr store ingest task failed: {err}"))?
112        }
113
114        async fn ingest_with_storage_class(
115            &self,
116            event: Event,
117            storage_class: EventStorageClass,
118        ) -> Result<()> {
119            let store = Arc::clone(&self.store);
120            tokio::task::spawn_blocking(move || {
121                crate::socialgraph::ingest_parsed_event_with_storage_class(
122                    store.as_ref(),
123                    &event,
124                    storage_class,
125                )
126            })
127            .await
128            .map_err(|err| anyhow::anyhow!("trusted nostr store ingest task failed: {err}"))?
129        }
130
131        async fn query(&self, filter: NostrFilter, limit: usize) -> Vec<Event> {
132            let store = Arc::clone(&self.store);
133            match std::thread::Builder::new()
134                .name("nostr-relay-query".to_string())
135                .spawn(move || crate::socialgraph::query_events(store.as_ref(), &filter, limit))
136            {
137                Ok(join) => match join.join() {
138                    Ok(events) => events,
139                    Err(_) => {
140                        warn!("trusted nostr store query thread panicked");
141                        Vec::new()
142                    }
143                },
144                Err(err) => {
145                    warn!("failed to spawn trusted nostr store query thread: {}", err);
146                    Vec::new()
147                }
148            }
149        }
150    }
151
152    #[derive(Debug, Clone)]
153    struct ClientQuota {
154        last_reset: Instant,
155        spambox_events: u32,
156        reqs: u32,
157    }
158
159    impl ClientQuota {
160        fn new() -> Self {
161            Self {
162                last_reset: Instant::now(),
163                spambox_events: 0,
164                reqs: 0,
165            }
166        }
167
168        fn reset_if_needed(&mut self) {
169            if self.last_reset.elapsed() >= Duration::from_secs(60) {
170                self.last_reset = Instant::now();
171                self.spambox_events = 0;
172                self.reqs = 0;
173            }
174        }
175
176        fn allow_spambox_event(&mut self, limit: u32) -> bool {
177            self.reset_if_needed();
178            if self.spambox_events >= limit {
179                return false;
180            }
181            self.spambox_events += 1;
182            true
183        }
184
185        fn allow_req(&mut self, limit: u32) -> bool {
186            self.reset_if_needed();
187            if self.reqs >= limit {
188                return false;
189            }
190            self.reqs += 1;
191            true
192        }
193    }
194
    /// Everything the relay tracks for one registered client connection.
    struct ClientState {
        /// Channel used to push JSON-encoded relay messages to the client.
        sender: mpsc::UnboundedSender<String>,
        /// Public key supplied at registration, if any; consulted when deciding event trust.
        pubkey: Option<String>,
        /// Rate-limit counters for this client.
        quota: ClientQuota,
    }
200
201    struct RecentEvents {
202        order: VecDeque<EventId>,
203        events: HashMap<EventId, Event>,
204        max_len: usize,
205    }
206
207    impl RecentEvents {
208        fn new(max_len: usize) -> Self {
209            Self {
210                order: VecDeque::new(),
211                events: HashMap::new(),
212                max_len: max_len.max(128),
213            }
214        }
215
216        fn insert(&mut self, event: Event) {
217            if self.events.contains_key(&event.id) {
218                return;
219            }
220            self.order.push_back(event.id);
221            self.events.insert(event.id, event);
222            while self.order.len() > self.max_len {
223                if let Some(oldest) = self.order.pop_front() {
224                    self.events.remove(&oldest);
225                }
226            }
227        }
228
229        fn matching(&self, filter: &NostrFilter) -> Vec<Event> {
230            self.events
231                .values()
232                .filter(|event| filter.match_event(event))
233                .cloned()
234                .collect()
235        }
236    }
237
    /// Storage for events from untrusted authors.
    enum SpamboxStore {
        /// Backed by a social-graph store opened on disk.
        Persistent(NostrStore),
        /// Bounded in-memory queue, used when persistence is disabled or cannot be opened.
        Memory(MemorySpambox),
    }
242
243    struct MemorySpambox {
244        events: Mutex<VecDeque<Event>>,
245        max_len: usize,
246    }
247
248    impl MemorySpambox {
249        fn new(max_len: usize) -> Self {
250            Self {
251                events: Mutex::new(VecDeque::new()),
252                max_len: max_len.max(128),
253            }
254        }
255
256        async fn ingest(&self, event: &Event) -> bool {
257            let mut events = self.events.lock().await;
258            events.push_back(event.clone());
259            while events.len() > self.max_len {
260                events.pop_front();
261            }
262            true
263        }
264    }
265
266    impl SpamboxStore {
267        async fn ingest(&self, event: &Event) -> bool {
268            match self {
269                SpamboxStore::Persistent(store) => store.ingest(event.clone()).await.is_ok(),
270                SpamboxStore::Memory(store) => store.ingest(event).await,
271            }
272        }
273    }
274
    /// Bounded log of events received over Bluetooth, mirrored to a JSON-lines file.
    struct BluetoothEventLog {
        /// Location of the JSON-lines file the log is loaded from and rewritten to.
        path: PathBuf,
        /// In-memory records plus the id index, guarded by an async mutex.
        state: Mutex<BluetoothEventLogState>,
    }
279
    /// Mutable contents of a `BluetoothEventLog`.
    struct BluetoothEventLogState {
        /// Records in arrival order, oldest at the front.
        records: VecDeque<BluetoothReceivedEventRecord>,
        /// Event ids of the retained records, used to skip duplicates.
        event_ids: HashSet<String>,
    }
284
285    impl BluetoothEventLog {
286        fn load(path: PathBuf) -> Self {
287            let records = std::fs::read_to_string(&path)
288                .ok()
289                .map(|serialized| {
290                    serialized
291                        .lines()
292                        .filter_map(|line| {
293                            serde_json::from_str::<BluetoothReceivedEventRecord>(line).ok()
294                        })
295                        .collect::<Vec<_>>()
296                })
297                .unwrap_or_default();
298            let mut trimmed = VecDeque::with_capacity(BLUETOOTH_EVENT_LOG_CAPACITY);
299            let start = records.len().saturating_sub(BLUETOOTH_EVENT_LOG_CAPACITY);
300            for record in records.into_iter().skip(start) {
301                trimmed.push_back(record);
302            }
303            let event_ids = trimmed
304                .iter()
305                .map(|record| record.event_id.clone())
306                .collect::<HashSet<_>>();
307
308            Self {
309                path,
310                state: Mutex::new(BluetoothEventLogState {
311                    records: trimmed,
312                    event_ids,
313                }),
314            }
315        }
316
317        async fn recent(&self, limit: usize) -> Vec<BluetoothReceivedEventRecord> {
318            let state = self.state.lock().await;
319            state
320                .records
321                .iter()
322                .rev()
323                .take(limit.max(1))
324                .cloned()
325                .collect()
326        }
327
328        async fn record(&self, event: &Event, peer_id: Option<String>) {
329            let record = BluetoothReceivedEventRecord {
330                event_id: event.id.to_hex(),
331                pubkey: event.pubkey.to_hex(),
332                kind: event.kind.as_u16() as u32,
333                created_at: event.created_at.as_u64(),
334                received_at: std::time::SystemTime::now()
335                    .duration_since(std::time::UNIX_EPOCH)
336                    .map(|value| value.as_secs())
337                    .unwrap_or(0),
338                peer_id,
339                cid_values: cid_values_from_event(event),
340            };
341
342            let serialized = {
343                let mut state = self.state.lock().await;
344                if state.event_ids.contains(&record.event_id) {
345                    return;
346                }
347
348                state.event_ids.insert(record.event_id.clone());
349                state.records.push_back(record);
350                while state.records.len() > BLUETOOTH_EVENT_LOG_CAPACITY {
351                    if let Some(removed) = state.records.pop_front() {
352                        state.event_ids.remove(&removed.event_id);
353                    }
354                }
355
356                state
357                    .records
358                    .iter()
359                    .filter_map(|entry| serde_json::to_string(entry).ok())
360                    .collect::<Vec<_>>()
361                    .join("\n")
362            };
363
364            if let Some(parent) = self.path.parent() {
365                let _ = std::fs::create_dir_all(parent);
366            }
367            let _ = std::fs::write(&self.path, serialized);
368        }
369    }
370
371    fn looks_like_cid_reference(value: &str) -> bool {
372        Cid::parse(value).is_ok() || nhash_decode(value).is_ok()
373    }
374
375    fn cid_values_from_event(event: &Event) -> Vec<String> {
376        let mut values = Vec::new();
377        let mut seen = HashSet::new();
378
379        for tag in &event.tags {
380            let fields = tag.clone().to_vec();
381            if fields.first().is_some_and(|name| name == "cid") {
382                if let Some(value) = fields.get(1).filter(|value| !value.is_empty()) {
383                    if seen.insert(value.clone()) {
384                        values.push(value.clone());
385                    }
386                }
387                continue;
388            }
389
390            for value in fields.into_iter().skip(1) {
391                if looks_like_cid_reference(&value) && seen.insert(value.clone()) {
392                    values.push(value);
393                }
394            }
395        }
396
397        values
398    }
399
    /// In-process Nostr relay: stores trusted events, diverts untrusted ones to a spambox, and
    /// fans matching events out to subscribed clients.
    pub struct NostrRelay {
        /// Limits applied to queries, subscriptions and rate limiting.
        config: NostrRelayConfig,
        /// Store for events accepted as trusted.
        trusted: NostrStore,
        /// Hex pubkeys whose events are stored with the `Public` storage class.
        public_pubkeys: HashSet<String>,
        /// Destination for events from untrusted authors.
        spambox: Option<SpamboxStore>,
        /// Optional access control consulted to decide whether an author may write.
        social_graph: Option<Arc<SocialGraphAccessControl>>,
        /// Registered clients keyed by client id.
        clients: Mutex<HashMap<u64, ClientState>>,
        /// Active subscriptions per client: subscription id to its filters.
        subscriptions: Mutex<HashMap<u64, HashMap<SubscriptionId, Vec<NostrFilter>>>>,
        /// Cache of recently ingested trusted events (including ephemeral ones).
        recent_events: Mutex<RecentEvents>,
        /// Source of fresh client ids.
        next_client_id: AtomicU64,
        /// Log of events received over Bluetooth.
        bluetooth_event_log: Arc<BluetoothEventLog>,
    }
412
413    impl NostrRelay {
414        async fn collect_filter_events(
415            &self,
416            filter: &NostrFilter,
417            limit: usize,
418            seen: &mut HashSet<EventId>,
419            events: &mut Vec<Event>,
420        ) {
421            if limit == 0 {
422                return;
423            }
424
425            let mut added = 0usize;
426
427            if !prefers_trusted_only(filter) {
428                let recent = {
429                    let cache = self.recent_events.lock().await;
430                    cache.matching(filter)
431                };
432                for event in recent {
433                    if seen.insert(event.id) {
434                        events.push(event);
435                        added += 1;
436                        if added >= limit {
437                            return;
438                        }
439                    }
440                }
441            }
442
443            for event in self.trusted.query(filter.clone(), limit).await {
444                if seen.insert(event.id) {
445                    events.push(event);
446                    added += 1;
447                    if added >= limit {
448                        return;
449                    }
450                }
451            }
452        }
453
454        async fn collect_filter_count(
455            &self,
456            filter: &NostrFilter,
457            limit: usize,
458            seen: &mut HashSet<EventId>,
459        ) {
460            if limit == 0 {
461                return;
462            }
463
464            let mut added = 0usize;
465
466            if !prefers_trusted_only(filter) {
467                let recent = {
468                    let cache = self.recent_events.lock().await;
469                    cache.matching(filter)
470                };
471                for event in recent {
472                    if seen.insert(event.id) {
473                        added += 1;
474                        if added >= limit {
475                            return;
476                        }
477                    }
478                }
479            }
480
481            for event in self.trusted.query(filter.clone(), limit).await {
482                if seen.insert(event.id) {
483                    added += 1;
484                    if added >= limit {
485                        return;
486                    }
487                }
488            }
489        }
490
491        pub fn new(
492            trusted_store: Arc<dyn SocialGraphBackend>,
493            data_dir: PathBuf,
494            public_pubkeys: HashSet<String>,
495            social_graph: Option<Arc<SocialGraphAccessControl>>,
496            config: NostrRelayConfig,
497        ) -> Result<Self> {
498            let spambox = if config.spambox_db_max_bytes == 0 {
499                Some(SpamboxStore::Memory(MemorySpambox::new(
500                    config.max_query_limit * 2,
501                )))
502            } else {
503                let spam_dir = data_dir.join("socialgraph_spambox");
504                match socialgraph::open_social_graph_store_at_path(
505                    &spam_dir,
506                    Some(config.spambox_db_max_bytes),
507                ) {
508                    Ok(store) => Some(SpamboxStore::Persistent(NostrStore::new(store))),
509                    Err(err) => {
510                        warn!(
511                            "Failed to open social graph spambox (falling back to memory): {}",
512                            err
513                        );
514                        Some(SpamboxStore::Memory(MemorySpambox::new(
515                            config.max_query_limit * 2,
516                        )))
517                    }
518                }
519            };
520
521            let recent_size = config.max_query_limit.saturating_mul(2);
522            let bluetooth_event_log = Arc::new(BluetoothEventLog::load(
523                data_dir.join("bluetooth-events.jsonl"),
524            ));
525
526            Ok(Self {
527                config,
528                trusted: NostrStore::new(trusted_store),
529                public_pubkeys,
530                spambox,
531                social_graph,
532                clients: Mutex::new(HashMap::new()),
533                subscriptions: Mutex::new(HashMap::new()),
534                recent_events: Mutex::new(RecentEvents::new(recent_size)),
535                next_client_id: AtomicU64::new(1),
536                bluetooth_event_log,
537            })
538        }
539
        /// Hands out a fresh client id by incrementing the shared counter (which starts at 1).
        pub fn next_client_id(&self) -> u64 {
            self.next_client_id.fetch_add(1, Ordering::SeqCst)
        }
543
        /// Verifies, caches and stores `event` as trusted, then broadcasts it to matching
        /// subscriptions.
        ///
        /// # Errors
        /// Fails if the signature is invalid or the trusted store rejects the event.
        pub async fn ingest_trusted_event(&self, event: Event) -> Result<()> {
            self.ingest_trusted_event_inner(event, true).await
        }
547
548        pub async fn ingest_trusted_event_from_bluetooth(
549            &self,
550            event: Event,
551            peer_id: Option<String>,
552        ) -> Result<()> {
553            self.ingest_trusted_event_inner(event.clone(), true).await?;
554            self.bluetooth_event_log.record(&event, peer_id).await;
555            Ok(())
556        }
557
        /// Verifies, caches and stores `event` as trusted without broadcasting it to
        /// subscribers.
        ///
        /// # Errors
        /// Fails if the signature is invalid or the trusted store rejects the event.
        pub async fn ingest_trusted_event_silent(&self, event: Event) -> Result<()> {
            self.ingest_trusted_event_inner(event, false).await
        }
561
        /// Returns the most recent Bluetooth-received event records, newest first, up to
        /// `limit` entries (the log treats a `limit` of 0 as 1).
        pub async fn bluetooth_received_events(
            &self,
            limit: usize,
        ) -> Vec<BluetoothReceivedEventRecord> {
            self.bluetooth_event_log.recent(limit).await
        }
568
569        async fn ingest_trusted_event_inner(&self, event: Event, broadcast: bool) -> Result<()> {
570            event
571                .verify()
572                .map_err(|e| anyhow::anyhow!("invalid signature: {}", e))?;
573
574            let is_ephemeral = event.kind.is_ephemeral();
575            {
576                let mut recent = self.recent_events.lock().await;
577                recent.insert(event.clone());
578            }
579
580            if !is_ephemeral {
581                let storage_class = self.event_storage_class(&event);
582                self.trusted
583                    .ingest_with_storage_class(event.clone(), storage_class)
584                    .await?;
585            }
586
587            if broadcast {
588                self.broadcast_event(&event).await;
589            }
590            Ok(())
591        }
592
593        pub async fn query_events(&self, filter: &NostrFilter, limit: usize) -> Vec<Event> {
594            let limit = limit.min(self.config.max_query_limit);
595            if limit == 0 {
596                return Vec::new();
597            }
598
599            let mut seen: HashSet<EventId> = HashSet::new();
600            let mut events = Vec::new();
601
602            if !prefers_trusted_only(filter) {
603                let recent = {
604                    let cache = self.recent_events.lock().await;
605                    cache.matching(filter)
606                };
607                for event in recent {
608                    if seen.insert(event.id) {
609                        events.push(event);
610                        if events.len() >= limit {
611                            return events;
612                        }
613                    }
614                }
615            }
616
617            for event in self.trusted.query(filter.clone(), limit).await {
618                if seen.insert(event.id) {
619                    events.push(event);
620                    if events.len() >= limit {
621                        break;
622                    }
623                }
624            }
625
626            events
627        }
628
629        pub async fn register_client(
630            &self,
631            client_id: u64,
632            sender: mpsc::UnboundedSender<String>,
633            pubkey: Option<String>,
634        ) {
635            let mut clients = self.clients.lock().await;
636            clients.insert(
637                client_id,
638                ClientState {
639                    sender,
640                    pubkey,
641                    quota: ClientQuota::new(),
642                },
643            );
644        }
645
646        pub async fn unregister_client(&self, client_id: u64) {
647            let mut clients = self.clients.lock().await;
648            clients.remove(&client_id);
649            drop(clients);
650            let mut subs = self.subscriptions.lock().await;
651            subs.remove(&client_id);
652        }
653
        /// Dispatches one parsed client message to the matching handler.
        ///
        /// Negentropy messages are not supported and are answered with a NOTICE.
        pub async fn handle_client_message(&self, client_id: u64, msg: NostrClientMessage) {
            match msg {
                NostrClientMessage::Event(event) => {
                    self.handle_event(client_id, *event).await;
                }
                NostrClientMessage::Req {
                    subscription_id,
                    filters,
                } => {
                    self.handle_req(client_id, subscription_id, filters).await;
                }
                NostrClientMessage::Count {
                    subscription_id,
                    filters,
                } => {
                    self.handle_count(client_id, subscription_id, filters).await;
                }
                NostrClientMessage::Close(subscription_id) => {
                    self.handle_close(client_id, subscription_id).await;
                }
                NostrClientMessage::Auth(event) => {
                    self.handle_auth(client_id, *event).await;
                }
                // Negentropy sync is not implemented; tell the client instead of ignoring it.
                NostrClientMessage::NegOpen { .. }
                | NostrClientMessage::NegMsg { .. }
                | NostrClientMessage::NegClose { .. } => {
                    self.send_to_client(
                        client_id,
                        NostrRelayMessage::notice("negentropy not supported"),
                    )
                    .await;
                }
            }
        }
688
689        pub async fn register_subscription_query(
690            &self,
691            client_id: u64,
692            subscription_id: SubscriptionId,
693            mut filters: Vec<NostrFilter>,
694        ) -> std::result::Result<Vec<Event>, &'static str> {
695            if !self.allow_req(client_id).await {
696                return Err("rate limited");
697            }
698
699            if filters.len() > self.config.max_filters_per_sub {
700                filters.truncate(self.config.max_filters_per_sub);
701            }
702
703            {
704                let mut subs = self.subscriptions.lock().await;
705                let entry = subs.entry(client_id).or_default();
706                if !entry.contains_key(&subscription_id)
707                    && entry.len() >= self.config.max_subs_per_client
708                {
709                    return Err("too many subscriptions");
710                }
711                entry.insert(subscription_id, filters.clone());
712            }
713
714            let mut seen: HashSet<EventId> = HashSet::new();
715            let mut events = Vec::new();
716            for filter in &filters {
717                let limit = filter
718                    .limit
719                    .unwrap_or(self.config.max_query_limit)
720                    .min(self.config.max_query_limit);
721                self.collect_filter_events(filter, limit, &mut seen, &mut events)
722                    .await;
723            }
724
725            Ok(events)
726        }
727
728        async fn handle_auth(&self, client_id: u64, event: Event) {
729            let ok = event.verify().is_ok();
730            let message = if ok { "" } else { "invalid auth" };
731            self.send_to_client(client_id, NostrRelayMessage::ok(event.id, ok, message))
732                .await;
733        }
734
735        async fn handle_close(&self, client_id: u64, subscription_id: SubscriptionId) {
736            let mut subs = self.subscriptions.lock().await;
737            if let Some(map) = subs.get_mut(&client_id) {
738                map.remove(&subscription_id);
739            }
740        }
741
742        async fn handle_event(&self, client_id: u64, event: Event) {
743            let ok = event.verify().is_ok();
744            if !ok {
745                self.send_to_client(
746                    client_id,
747                    NostrRelayMessage::ok(event.id, false, "invalid: signature"),
748                )
749                .await;
750                return;
751            }
752
753            let trusted = self.is_trusted_event(client_id, &event).await;
754            if !trusted && !self.allow_spambox_event(client_id).await {
755                self.send_to_client(
756                    client_id,
757                    NostrRelayMessage::ok(event.id, false, "rate limited"),
758                )
759                .await;
760                return;
761            }
762
763            let is_ephemeral = event.kind.is_ephemeral();
764            if trusted {
765                let mut recent = self.recent_events.lock().await;
766                recent.insert(event.clone());
767            }
768            if !is_ephemeral {
769                let stored = if trusted {
770                    let storage_class = self.event_storage_class(&event);
771                    self.trusted
772                        .ingest_with_storage_class(event.clone(), storage_class)
773                        .await
774                        .is_ok()
775                } else {
776                    match self.spambox.as_ref() {
777                        Some(spambox) => spambox.ingest(&event).await,
778                        None => false,
779                    }
780                };
781
782                if !stored {
783                    let message = if trusted {
784                        "store failed"
785                    } else {
786                        "spambox full"
787                    };
788                    self.send_to_client(client_id, NostrRelayMessage::ok(event.id, false, message))
789                        .await;
790                    return;
791                }
792            }
793
794            let message = if trusted { "" } else { "spambox" };
795            self.send_to_client(client_id, NostrRelayMessage::ok(event.id, true, message))
796                .await;
797
798            if trusted {
799                self.broadcast_event(&event).await;
800            }
801        }
802
803        async fn handle_req(
804            &self,
805            client_id: u64,
806            subscription_id: SubscriptionId,
807            filters: Vec<NostrFilter>,
808        ) {
809            match self
810                .register_subscription_query(client_id, subscription_id.clone(), filters)
811                .await
812            {
813                Ok(events) => {
814                    for event in events {
815                        self.send_to_client(
816                            client_id,
817                            NostrRelayMessage::event(subscription_id.clone(), event),
818                        )
819                        .await;
820                    }
821
822                    self.send_to_client(client_id, NostrRelayMessage::eose(subscription_id))
823                        .await;
824                }
825                Err(message) => {
826                    self.send_to_client(
827                        client_id,
828                        NostrRelayMessage::closed(subscription_id, message),
829                    )
830                    .await;
831                }
832            }
833        }
834
835        async fn handle_count(
836            &self,
837            client_id: u64,
838            subscription_id: SubscriptionId,
839            filters: Vec<NostrFilter>,
840        ) {
841            if !self.allow_req(client_id).await {
842                self.send_to_client(
843                    client_id,
844                    NostrRelayMessage::closed(subscription_id, "rate limited"),
845                )
846                .await;
847                return;
848            }
849
850            let mut seen: HashSet<EventId> = HashSet::new();
851            for filter in &filters {
852                let limit = filter
853                    .limit
854                    .unwrap_or(self.config.max_query_limit)
855                    .min(self.config.max_query_limit);
856                self.collect_filter_count(filter, limit, &mut seen).await;
857            }
858
859            self.send_to_client(
860                client_id,
861                NostrRelayMessage::count(subscription_id, seen.len()),
862            )
863            .await;
864        }
865
866        async fn is_trusted_event(&self, client_id: u64, event: &Event) -> bool {
867            let event_pubkey = event.pubkey.to_hex();
868            let client_pubkey = {
869                let clients = self.clients.lock().await;
870                clients
871                    .get(&client_id)
872                    .and_then(|state| state.pubkey.clone())
873            };
874            if let Some(pubkey) = client_pubkey {
875                return pubkey == event_pubkey
876                    || self.social_graph.as_ref().is_some_and(|social_graph| {
877                        social_graph.check_write_access(&event_pubkey)
878                    });
879            }
880            if let Some(ref social_graph) = self.social_graph {
881                return social_graph.check_write_access(&event_pubkey);
882            }
883            true
884        }
885
886        fn event_storage_class(&self, event: &Event) -> EventStorageClass {
887            if self.public_pubkeys.contains(&event.pubkey.to_hex()) {
888                EventStorageClass::Public
889            } else {
890                EventStorageClass::Ambient
891            }
892        }
893
894        async fn allow_spambox_event(&self, client_id: u64) -> bool {
895            let mut clients = self.clients.lock().await;
896            let Some(state) = clients.get_mut(&client_id) else {
897                return false;
898            };
899            state
900                .quota
901                .allow_spambox_event(self.config.spambox_max_events_per_min)
902        }
903
904        async fn allow_req(&self, client_id: u64) -> bool {
905            let mut clients = self.clients.lock().await;
906            let Some(state) = clients.get_mut(&client_id) else {
907                return false;
908            };
909            state.quota.allow_req(self.config.spambox_max_reqs_per_min)
910        }
911
912        async fn broadcast_event(&self, event: &Event) {
913            let subscriptions = self.subscriptions.lock().await;
914            let mut deliveries: Vec<(u64, SubscriptionId)> = Vec::new();
915            for (client_id, subs) in subscriptions.iter() {
916                for (sub_id, filters) in subs.iter() {
917                    if filters.iter().any(|f| f.match_event(event)) {
918                        deliveries.push((*client_id, sub_id.clone()));
919                    }
920                }
921            }
922            drop(subscriptions);
923
924            for (client_id, sub_id) in deliveries {
925                self.send_to_client(client_id, NostrRelayMessage::event(sub_id, event.clone()))
926                    .await;
927            }
928        }
929
930        async fn send_to_client(&self, client_id: u64, msg: NostrRelayMessage) {
931            let sender = {
932                let clients = self.clients.lock().await;
933                clients.get(&client_id).map(|state| state.sender.clone())
934            };
935            if let Some(tx) = sender {
936                let _ = tx.send(msg.as_json());
937            }
938        }
939    }
940}
941
942pub use imp::NostrRelay;
943
// Exposes the relay as a mesh event store. Each method forwards to the inherent `NostrRelay`
// method of the same name; the fully-qualified call selects the inherent method explicitly.
#[cfg(feature = "p2p")]
#[async_trait::async_trait]
impl MeshEventStore for NostrRelay {
    /// Ingests and broadcasts `event` as trusted.
    async fn ingest_trusted_event(&self, event: Event) -> anyhow::Result<()> {
        NostrRelay::ingest_trusted_event(self, event).await
    }

    /// Queries cached and stored events matching `filter`, up to `limit`.
    async fn query_events(&self, filter: &NostrFilter, limit: usize) -> Vec<Event> {
        NostrRelay::query_events(self, filter, limit).await
    }
}
955
// Exposes the relay's client-facing API to the mesh layer. Each method forwards to an inherent
// `NostrRelay` method; the fully-qualified call selects the inherent method explicitly.
#[cfg(feature = "p2p")]
#[async_trait::async_trait]
impl MeshRelayClient for NostrRelay {
    /// Allocates a fresh client id.
    fn next_client_id(&self) -> u64 {
        NostrRelay::next_client_id(self)
    }

    /// Registers a client with its outbound channel and optional pubkey.
    async fn register_client(
        &self,
        client_id: u64,
        sender: mpsc::UnboundedSender<String>,
        pubkey: Option<String>,
    ) {
        NostrRelay::register_client(self, client_id, sender, pubkey).await
    }

    /// Removes a client and its subscriptions.
    async fn unregister_client(&self, client_id: u64) {
        NostrRelay::unregister_client(self, client_id).await
    }

    /// Dispatches one parsed client message.
    async fn handle_client_message(&self, client_id: u64, msg: NostrClientMessage) {
        NostrRelay::handle_client_message(self, client_id, msg).await
    }

    /// Registers a subscription and returns the currently matching events.
    async fn register_subscription_query(
        &self,
        client_id: u64,
        subscription_id: SubscriptionId,
        filters: Vec<NostrFilter>,
    ) -> std::result::Result<Vec<Event>, &'static str> {
        NostrRelay::register_subscription_query(self, client_id, subscription_id, filters).await
    }

    /// Ingests an event delivered by a peer.
    ///
    /// NOTE(review): this forwards to `ingest_trusted_event_from_bluetooth`, so every
    /// peer-delivered event is written to the Bluetooth event log — confirm that all callers
    /// of this trait method are Bluetooth transports.
    async fn ingest_trusted_event_from_peer(
        &self,
        event: Event,
        peer_id: Option<String>,
    ) -> anyhow::Result<()> {
        NostrRelay::ingest_trusted_event_from_bluetooth(self, event, peer_id).await
    }
}
997
998#[cfg(test)]
999#[path = "nostr_relay/tests.rs"]
1000mod tests;