Skip to main content

hashtree_cli/
nostr_relay.rs

1use std::collections::HashMap;
2use std::collections::{HashSet, VecDeque};
3use std::path::PathBuf;
4use std::sync::{
5    atomic::{AtomicU64, Ordering},
6    Arc,
7};
8use std::time::{Duration, Instant};
9
10use tokio::sync::{mpsc, Mutex, Semaphore};
11
12#[cfg(feature = "p2p")]
13use hashtree_network::{MeshEventStore, MeshRelayClient};
14use nostr::{ClientMessage as NostrClientMessage, JsonUtil, RelayMessage as NostrRelayMessage};
15use nostr::{Event, EventId, Filter as NostrFilter, SubscriptionId};
16
17use crate::socialgraph;
18
/// Maximum number of Bluetooth-received event records kept in memory and written to the on-disk log.
19const BLUETOOTH_EVENT_LOG_CAPACITY: usize = 100;
/// Number of semaphore permits each `NostrStore` is created with, i.e. how many of its
/// blocking store tasks may run at the same time.
20const MAX_CONCURRENT_NOSTR_STORE_BLOCKING_TASKS: usize = 4;
21
/// Serializable summary of one event received over Bluetooth.
///
/// One record is stored per JSON line in the Bluetooth event log file.
22#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
23pub struct BluetoothReceivedEventRecord {
    /// Hex encoding of the event id.
24    pub event_id: String,
    /// Hex encoding of the event author's public key.
25    pub pubkey: String,
    /// Numeric event kind.
26    pub kind: u32,
    /// The event's own `created_at` timestamp.
27    pub created_at: u64,
    /// Unix time in seconds when the record was created; `0` if the system clock reads before the epoch.
28    pub received_at: u64,
    /// Identifier of the peer the event came from, when the caller supplied one.
29    pub peer_id: Option<String>,
    /// De-duplicated CID-like values extracted from the event's tags.
30    pub cid_values: Vec<String>,
31}
32
/// Tunable limits for the embedded Nostr relay.
#[derive(Debug, Clone)]
pub struct NostrRelayConfig {
    /// Size cap in bytes handed to the persistent spambox store; `0` selects the in-memory spambox.
    pub spambox_db_max_bytes: u64,
    /// Upper bound on the number of events a single query or subscription replay returns.
    pub max_query_limit: usize,
    /// Maximum number of distinct subscriptions kept per client.
    pub max_subs_per_client: usize,
    /// Filters beyond this count are dropped from a subscription request.
    pub max_filters_per_sub: usize,
    /// NOTE(review): presumably the per-minute quota for untrusted events; the consumer is outside this view.
    pub spambox_max_events_per_min: u32,
    /// NOTE(review): presumably the per-minute quota for REQ/COUNT messages; the consumer is outside this view.
    pub spambox_max_reqs_per_min: u32,
}

impl Default for NostrRelayConfig {
    /// Defaults: 1 GiB spambox, 200 events per query, 64 subscriptions per client,
    /// 32 filters per subscription and 120 events / 120 requests per minute.
    fn default() -> Self {
        const ONE_GIB: u64 = 1 << 30;
        const PER_MINUTE_QUOTA: u32 = 120;

        NostrRelayConfig {
            spambox_db_max_bytes: ONE_GIB,
            max_query_limit: 200,
            max_subs_per_client: 64,
            max_filters_per_sub: 32,
            spambox_max_events_per_min: PER_MINUTE_QUOTA,
            spambox_max_reqs_per_min: PER_MINUTE_QUOTA,
        }
    }
}
55
56mod imp {
57    use super::*;
58    use anyhow::Result;
59
60    use crate::diagnostics::{
61        nostr_filter_summary, nostr_filters_summary, process_memory_snapshot,
62        trim_process_allocations,
63    };
64    use crate::socialgraph::{EventStorageClass, SocialGraphAccessControl, SocialGraphBackend};
65    use hashtree_core::{nhash_decode, Cid};
66    use hashtree_nostr::{is_parameterized_replaceable_kind, is_replaceable_kind};
67    use tracing::{info, warn};
68
69    fn prefers_trusted_only(filter: &NostrFilter) -> bool {
70        let Some(kinds) = filter.kinds.as_ref() else {
71            return false;
72        };
73        if kinds.len() != 1 {
74            return false;
75        }
76
77        let kind = kinds.iter().next().expect("checked single kind").as_u16() as u32;
78        let has_authors = filter
79            .authors
80            .as_ref()
81            .is_some_and(|authors| !authors.is_empty());
82        if !has_authors {
83            return false;
84        }
85
86        if is_replaceable_kind(kind) {
87            return true;
88        }
89
90        if is_parameterized_replaceable_kind(kind) {
91            let d_tag = nostr::SingleLetterTag::lowercase(nostr::Alphabet::D);
92            return filter
93                .generic_tags
94                .get(&d_tag)
95                .is_some_and(|values| !values.is_empty());
96        }
97
98        false
99    }
100
101    struct NostrStore {
102        store: Arc<dyn SocialGraphBackend>,
103        blocking_permits: Arc<Semaphore>,
104    }
105
106    impl NostrStore {
107        fn new(store: Arc<dyn SocialGraphBackend>) -> Self {
108            Self {
109                store,
110                blocking_permits: Arc::new(Semaphore::new(
111                    MAX_CONCURRENT_NOSTR_STORE_BLOCKING_TASKS,
112                )),
113            }
114        }
115
116        async fn ingest(&self, event: Event) -> Result<()> {
117            let store = Arc::clone(&self.store);
118            let _permit = self
119                .blocking_permits
120                .clone()
121                .acquire_owned()
122                .await
123                .map_err(|err| anyhow::anyhow!("trusted nostr store closed: {err}"))?;
124            tokio::task::spawn_blocking(move || {
125                crate::socialgraph::ingest_parsed_event(store.as_ref(), &event)
126            })
127            .await
128            .map_err(|err| anyhow::anyhow!("trusted nostr store ingest task failed: {err}"))?
129        }
130
131        async fn ingest_with_storage_class(
132            &self,
133            event: Event,
134            storage_class: EventStorageClass,
135        ) -> Result<()> {
136            let store = Arc::clone(&self.store);
137            let _permit = self
138                .blocking_permits
139                .clone()
140                .acquire_owned()
141                .await
142                .map_err(|err| anyhow::anyhow!("trusted nostr store closed: {err}"))?;
143            tokio::task::spawn_blocking(move || {
144                crate::socialgraph::ingest_parsed_event_with_storage_class(
145                    store.as_ref(),
146                    &event,
147                    storage_class,
148                )
149            })
150            .await
151            .map_err(|err| anyhow::anyhow!("trusted nostr store ingest task failed: {err}"))?
152        }
153
154        async fn query(&self, filter: NostrFilter, limit: usize) -> Vec<Event> {
155            let store = Arc::clone(&self.store);
156            let filter_summary = nostr_filter_summary(&filter);
157            let memory_before = process_memory_snapshot();
158            let started = Instant::now();
159            let Ok(_permit) = self.blocking_permits.clone().acquire_owned().await else {
160                warn!("trusted nostr store query skipped: blocking semaphore closed");
161                return Vec::new();
162            };
163            let result = tokio::task::spawn_blocking(move || {
164                crate::socialgraph::query_events(store.as_ref(), &filter, limit)
165            })
166            .await;
167            match result {
168                Ok(events) => {
169                    info!(
170                        target: "hashtree_cli::nostr_relay::query",
171                        limit,
172                        events = events.len(),
173                        elapsed_ms = started.elapsed().as_millis() as u64,
174                        filter = %filter_summary,
175                        memory_before = ?memory_before,
176                        memory_after = ?process_memory_snapshot(),
177                        "trusted nostr store query completed",
178                    );
179                    events
180                }
181                Err(err) => {
182                    warn!("trusted nostr store query task failed: {}", err);
183                    Vec::new()
184                }
185            }
186        }
187    }
188
/// Fixed-window usage counters for one client.
///
/// Both counters are cleared together once at least 60 seconds have elapsed
/// since the window started; the check happens lazily on each `allow_*` call.
#[derive(Debug, Clone)]
struct ClientQuota {
    last_reset: Instant,
    spambox_events: u32,
    reqs: u32,
}

impl ClientQuota {
    /// Starts a new window at the current instant with both counters at zero.
    fn new() -> Self {
        ClientQuota {
            reqs: 0,
            spambox_events: 0,
            last_reset: Instant::now(),
        }
    }

    /// Begins a fresh window when the current one is at least 60 seconds old.
    fn reset_if_needed(&mut self) {
        let window = Duration::from_secs(60);
        if self.last_reset.elapsed() < window {
            return;
        }
        // A fresh quota is exactly "now, with zeroed counters".
        *self = Self::new();
    }

    /// Consumes one spambox-event slot; returns `false` once `limit` slots are used in this window.
    fn allow_spambox_event(&mut self, limit: u32) -> bool {
        self.reset_if_needed();
        let within_limit = self.spambox_events < limit;
        if within_limit {
            self.spambox_events += 1;
        }
        within_limit
    }

    /// Consumes one request slot; returns `false` once `limit` slots are used in this window.
    fn allow_req(&mut self, limit: u32) -> bool {
        self.reset_if_needed();
        let within_limit = self.reqs < limit;
        if within_limit {
            self.reqs += 1;
        }
        within_limit
    }
}
231
/// State the relay keeps for one registered client, keyed by client id in `NostrRelay::clients`.
232    struct ClientState {
    /// Unbounded channel of `String` messages for this client.
    /// NOTE(review): presumably carries serialized relay messages; the send path is outside this view.
233        sender: mpsc::UnboundedSender<String>,
    /// Public key supplied when the client was registered, if any.
234        pubkey: Option<String>,
    /// Rate-limit counters, created fresh at registration.
235        quota: ClientQuota,
236    }
237
238    struct RecentEvents {
239        order: VecDeque<EventId>,
240        events: HashMap<EventId, Event>,
241        max_len: usize,
242    }
243
244    impl RecentEvents {
245        fn new(max_len: usize) -> Self {
246            Self {
247                order: VecDeque::new(),
248                events: HashMap::new(),
249                max_len: max_len.max(128),
250            }
251        }
252
253        fn insert(&mut self, event: Event) {
254            if self.events.contains_key(&event.id) {
255                return;
256            }
257            self.order.push_back(event.id);
258            self.events.insert(event.id, event);
259            while self.order.len() > self.max_len {
260                if let Some(oldest) = self.order.pop_front() {
261                    self.events.remove(&oldest);
262                }
263            }
264        }
265
266        fn matching(&self, filter: &NostrFilter) -> Vec<Event> {
267            self.events
268                .values()
269                .filter(|event| filter.match_event(event))
270                .cloned()
271                .collect()
272        }
273    }
274
275    enum SpamboxStore {
276        Persistent(NostrStore),
277        Memory(MemorySpambox),
278    }
279
280    struct MemorySpambox {
281        events: Mutex<VecDeque<Event>>,
282        max_len: usize,
283    }
284
285    impl MemorySpambox {
286        fn new(max_len: usize) -> Self {
287            Self {
288                events: Mutex::new(VecDeque::new()),
289                max_len: max_len.max(128),
290            }
291        }
292
293        async fn ingest(&self, event: &Event) -> bool {
294            let mut events = self.events.lock().await;
295            events.push_back(event.clone());
296            while events.len() > self.max_len {
297                events.pop_front();
298            }
299            true
300        }
301    }
302
303    impl SpamboxStore {
304        async fn ingest(&self, event: &Event) -> bool {
305            match self {
306                SpamboxStore::Persistent(store) => store.ingest(event.clone()).await.is_ok(),
307                SpamboxStore::Memory(store) => store.ingest(event).await,
308            }
309        }
310    }
311
312    struct BluetoothEventLog {
313        path: PathBuf,
314        state: Mutex<BluetoothEventLogState>,
315    }
316
317    struct BluetoothEventLogState {
318        records: VecDeque<BluetoothReceivedEventRecord>,
319        event_ids: HashSet<String>,
320    }
321
322    impl BluetoothEventLog {
323        fn load(path: PathBuf) -> Self {
324            let records = std::fs::read_to_string(&path)
325                .ok()
326                .map(|serialized| {
327                    serialized
328                        .lines()
329                        .filter_map(|line| {
330                            serde_json::from_str::<BluetoothReceivedEventRecord>(line).ok()
331                        })
332                        .collect::<Vec<_>>()
333                })
334                .unwrap_or_default();
335            let mut trimmed = VecDeque::with_capacity(BLUETOOTH_EVENT_LOG_CAPACITY);
336            let start = records.len().saturating_sub(BLUETOOTH_EVENT_LOG_CAPACITY);
337            for record in records.into_iter().skip(start) {
338                trimmed.push_back(record);
339            }
340            let event_ids = trimmed
341                .iter()
342                .map(|record| record.event_id.clone())
343                .collect::<HashSet<_>>();
344
345            Self {
346                path,
347                state: Mutex::new(BluetoothEventLogState {
348                    records: trimmed,
349                    event_ids,
350                }),
351            }
352        }
353
354        async fn recent(&self, limit: usize) -> Vec<BluetoothReceivedEventRecord> {
355            let state = self.state.lock().await;
356            state
357                .records
358                .iter()
359                .rev()
360                .take(limit.max(1))
361                .cloned()
362                .collect()
363        }
364
365        async fn record(&self, event: &Event, peer_id: Option<String>) {
366            let record = BluetoothReceivedEventRecord {
367                event_id: event.id.to_hex(),
368                pubkey: event.pubkey.to_hex(),
369                kind: event.kind.as_u16() as u32,
370                created_at: event.created_at.as_u64(),
371                received_at: std::time::SystemTime::now()
372                    .duration_since(std::time::UNIX_EPOCH)
373                    .map(|value| value.as_secs())
374                    .unwrap_or(0),
375                peer_id,
376                cid_values: cid_values_from_event(event),
377            };
378
379            let serialized = {
380                let mut state = self.state.lock().await;
381                if state.event_ids.contains(&record.event_id) {
382                    return;
383                }
384
385                state.event_ids.insert(record.event_id.clone());
386                state.records.push_back(record);
387                while state.records.len() > BLUETOOTH_EVENT_LOG_CAPACITY {
388                    if let Some(removed) = state.records.pop_front() {
389                        state.event_ids.remove(&removed.event_id);
390                    }
391                }
392
393                state
394                    .records
395                    .iter()
396                    .filter_map(|entry| serde_json::to_string(entry).ok())
397                    .collect::<Vec<_>>()
398                    .join("\n")
399            };
400
401            if let Some(parent) = self.path.parent() {
402                let _ = std::fs::create_dir_all(parent);
403            }
404            let _ = std::fs::write(&self.path, serialized);
405        }
406    }
407
408    fn looks_like_cid_reference(value: &str) -> bool {
409        Cid::parse(value).is_ok() || nhash_decode(value).is_ok()
410    }
411
412    fn cid_values_from_event(event: &Event) -> Vec<String> {
413        let mut values = Vec::new();
414        let mut seen = HashSet::new();
415
416        for tag in &event.tags {
417            let fields = tag.clone().to_vec();
418            if fields.first().is_some_and(|name| name == "cid") {
419                if let Some(value) = fields.get(1).filter(|value| !value.is_empty()) {
420                    if seen.insert(value.clone()) {
421                        values.push(value.clone());
422                    }
423                }
424                continue;
425            }
426
427            for value in fields.into_iter().skip(1) {
428                if looks_like_cid_reference(&value) && seen.insert(value.clone()) {
429                    values.push(value);
430                }
431            }
432        }
433
434        values
435    }
436
/// In-process Nostr relay: stores trusted events, diverts untrusted ones to a spambox,
/// tracks clients and their subscriptions, and answers queries.
437    pub struct NostrRelay {
    /// Limits applied to queries, subscriptions and rate limiting.
438        config: NostrRelayConfig,
    /// Store that receives events accepted as trusted and serves queries.
439        trusted: NostrStore,
    /// NOTE(review): not read in the visible part of the file; presumably pubkeys that are always trusted.
440        public_pubkeys: HashSet<String>,
    /// Store for events from untrusted senders; `new` always sets this to `Some`.
441        spambox: Option<SpamboxStore>,
    /// NOTE(review): not read in the visible part of the file; presumably consulted for trust decisions.
442        social_graph: Option<Arc<SocialGraphAccessControl>>,
    /// Registered clients keyed by client id.
443        clients: Mutex<HashMap<u64, ClientState>>,
    /// Active subscriptions: client id -> subscription id -> filters.
444        subscriptions: Mutex<HashMap<u64, HashMap<SubscriptionId, Vec<NostrFilter>>>>,
    /// Bounded cache of recently ingested trusted events (including ephemeral ones).
445        recent_events: Mutex<RecentEvents>,
    /// Counter backing `next_client_id`; starts at 1.
446        next_client_id: AtomicU64,
    /// Log of events received over Bluetooth, loaded from `bluetooth-events.jsonl` in the data directory.
447        bluetooth_event_log: Arc<BluetoothEventLog>,
448    }
449
450    impl NostrRelay {
451        async fn collect_filter_events(
452            &self,
453            filter: &NostrFilter,
454            limit: usize,
455            seen: &mut HashSet<EventId>,
456            events: &mut Vec<Event>,
457        ) {
458            if limit == 0 {
459                return;
460            }
461
462            let mut added = 0usize;
463
464            if !prefers_trusted_only(filter) {
465                let recent = {
466                    let cache = self.recent_events.lock().await;
467                    cache.matching(filter)
468                };
469                for event in recent {
470                    if seen.insert(event.id) {
471                        events.push(event);
472                        added += 1;
473                        if added >= limit {
474                            return;
475                        }
476                    }
477                }
478            }
479
480            for event in self.trusted.query(filter.clone(), limit).await {
481                if seen.insert(event.id) {
482                    events.push(event);
483                    added += 1;
484                    if added >= limit {
485                        return;
486                    }
487                }
488            }
489        }
490
491        async fn collect_filter_count(
492            &self,
493            filter: &NostrFilter,
494            limit: usize,
495            seen: &mut HashSet<EventId>,
496        ) {
497            if limit == 0 {
498                return;
499            }
500
501            let mut added = 0usize;
502
503            if !prefers_trusted_only(filter) {
504                let recent = {
505                    let cache = self.recent_events.lock().await;
506                    cache.matching(filter)
507                };
508                for event in recent {
509                    if seen.insert(event.id) {
510                        added += 1;
511                        if added >= limit {
512                            return;
513                        }
514                    }
515                }
516            }
517
518            for event in self.trusted.query(filter.clone(), limit).await {
519                if seen.insert(event.id) {
520                    added += 1;
521                    if added >= limit {
522                        return;
523                    }
524                }
525            }
526        }
527
528        pub fn new(
529            trusted_store: Arc<dyn SocialGraphBackend>,
530            data_dir: PathBuf,
531            public_pubkeys: HashSet<String>,
532            social_graph: Option<Arc<SocialGraphAccessControl>>,
533            config: NostrRelayConfig,
534        ) -> Result<Self> {
535            let spambox = if config.spambox_db_max_bytes == 0 {
536                Some(SpamboxStore::Memory(MemorySpambox::new(
537                    config.max_query_limit * 2,
538                )))
539            } else {
540                let spam_dir = data_dir.join("socialgraph_spambox");
541                match socialgraph::open_social_graph_store_at_path(
542                    &spam_dir,
543                    Some(config.spambox_db_max_bytes),
544                ) {
545                    Ok(store) => Some(SpamboxStore::Persistent(NostrStore::new(store))),
546                    Err(err) => {
547                        warn!(
548                            "Failed to open social graph spambox (falling back to memory): {}",
549                            err
550                        );
551                        Some(SpamboxStore::Memory(MemorySpambox::new(
552                            config.max_query_limit * 2,
553                        )))
554                    }
555                }
556            };
557
558            let recent_size = config.max_query_limit.saturating_mul(2);
559            let bluetooth_event_log = Arc::new(BluetoothEventLog::load(
560                data_dir.join("bluetooth-events.jsonl"),
561            ));
562
563            Ok(Self {
564                config,
565                trusted: NostrStore::new(trusted_store),
566                public_pubkeys,
567                spambox,
568                social_graph,
569                clients: Mutex::new(HashMap::new()),
570                subscriptions: Mutex::new(HashMap::new()),
571                recent_events: Mutex::new(RecentEvents::new(recent_size)),
572                next_client_id: AtomicU64::new(1),
573                bluetooth_event_log,
574            })
575        }
576
577        pub fn next_client_id(&self) -> u64 {
578            self.next_client_id.fetch_add(1, Ordering::SeqCst)
579        }
580
581        pub async fn ingest_trusted_event(&self, event: Event) -> Result<()> {
582            self.ingest_trusted_event_inner(event, true).await
583        }
584
585        pub async fn ingest_trusted_event_from_bluetooth(
586            &self,
587            event: Event,
588            peer_id: Option<String>,
589        ) -> Result<()> {
590            self.ingest_trusted_event_inner(event.clone(), true).await?;
591            self.bluetooth_event_log.record(&event, peer_id).await;
592            Ok(())
593        }
594
595        pub async fn ingest_trusted_event_silent(&self, event: Event) -> Result<()> {
596            self.ingest_trusted_event_inner(event, false).await
597        }
598
599        pub async fn bluetooth_received_events(
600            &self,
601            limit: usize,
602        ) -> Vec<BluetoothReceivedEventRecord> {
603            self.bluetooth_event_log.recent(limit).await
604        }
605
606        async fn ingest_trusted_event_inner(&self, event: Event, broadcast: bool) -> Result<()> {
607            event
608                .verify()
609                .map_err(|e| anyhow::anyhow!("invalid signature: {}", e))?;
610
611            let is_ephemeral = event.kind.is_ephemeral();
612            {
613                let mut recent = self.recent_events.lock().await;
614                recent.insert(event.clone());
615            }
616
617            if !is_ephemeral {
618                let storage_class = self.event_storage_class(&event);
619                self.trusted
620                    .ingest_with_storage_class(event.clone(), storage_class)
621                    .await?;
622            }
623
624            if broadcast {
625                self.broadcast_event(&event).await;
626            }
627            Ok(())
628        }
629
630        pub async fn query_events(&self, filter: &NostrFilter, limit: usize) -> Vec<Event> {
631            let limit = limit.min(self.config.max_query_limit);
632            if limit == 0 {
633                return Vec::new();
634            }
635
636            let mut seen: HashSet<EventId> = HashSet::new();
637            let mut events = Vec::new();
638
639            if !prefers_trusted_only(filter) {
640                let recent = {
641                    let cache = self.recent_events.lock().await;
642                    cache.matching(filter)
643                };
644                for event in recent {
645                    if seen.insert(event.id) {
646                        events.push(event);
647                        if events.len() >= limit {
648                            return events;
649                        }
650                    }
651                }
652            }
653
654            for event in self.trusted.query(filter.clone(), limit).await {
655                if seen.insert(event.id) {
656                    events.push(event);
657                    if events.len() >= limit {
658                        break;
659                    }
660                }
661            }
662
663            events
664        }
665
666        pub async fn register_client(
667            &self,
668            client_id: u64,
669            sender: mpsc::UnboundedSender<String>,
670            pubkey: Option<String>,
671        ) {
672            let mut clients = self.clients.lock().await;
673            clients.insert(
674                client_id,
675                ClientState {
676                    sender,
677                    pubkey,
678                    quota: ClientQuota::new(),
679                },
680            );
681        }
682
683        pub async fn unregister_client(&self, client_id: u64) {
684            let mut clients = self.clients.lock().await;
685            clients.remove(&client_id);
686            drop(clients);
687            let mut subs = self.subscriptions.lock().await;
688            subs.remove(&client_id);
689        }
690
691        pub async fn handle_client_message(&self, client_id: u64, msg: NostrClientMessage) {
692            match msg {
693                NostrClientMessage::Event(event) => {
694                    self.handle_event(client_id, *event).await;
695                }
696                NostrClientMessage::Req {
697                    subscription_id,
698                    filters,
699                } => {
700                    self.handle_req(client_id, subscription_id, filters).await;
701                }
702                NostrClientMessage::Count {
703                    subscription_id,
704                    filters,
705                } => {
706                    self.handle_count(client_id, subscription_id, filters).await;
707                }
708                NostrClientMessage::Close(subscription_id) => {
709                    self.handle_close(client_id, subscription_id).await;
710                }
711                NostrClientMessage::Auth(event) => {
712                    self.handle_auth(client_id, *event).await;
713                }
714                NostrClientMessage::NegOpen { .. }
715                | NostrClientMessage::NegMsg { .. }
716                | NostrClientMessage::NegClose { .. } => {
717                    self.send_to_client(
718                        client_id,
719                        NostrRelayMessage::notice("negentropy not supported"),
720                    )
721                    .await;
722                }
723            }
724        }
725
726        pub async fn register_subscription_query(
727            &self,
728            client_id: u64,
729            subscription_id: SubscriptionId,
730            mut filters: Vec<NostrFilter>,
731        ) -> std::result::Result<Vec<Event>, &'static str> {
732            if !self.allow_req(client_id).await {
733                return Err("rate limited");
734            }
735
736            if filters.len() > self.config.max_filters_per_sub {
737                filters.truncate(self.config.max_filters_per_sub);
738            }
739
740            {
741                let mut subs = self.subscriptions.lock().await;
742                let entry = subs.entry(client_id).or_default();
743                if !entry.contains_key(&subscription_id)
744                    && entry.len() >= self.config.max_subs_per_client
745                {
746                    return Err("too many subscriptions");
747                }
748                entry.insert(subscription_id.clone(), filters.clone());
749            }
750
751            let mut seen: HashSet<EventId> = HashSet::new();
752            let mut events = Vec::new();
753            let memory_before = process_memory_snapshot();
754            let started = Instant::now();
755            let filter_summary = nostr_filters_summary(&filters);
756            for filter in &filters {
757                let remaining = self.config.max_query_limit.saturating_sub(events.len());
758                if remaining == 0 {
759                    break;
760                }
761                let limit = filter
762                    .limit
763                    .unwrap_or(self.config.max_query_limit)
764                    .min(self.config.max_query_limit)
765                    .min(remaining);
766                self.collect_filter_events(filter, limit, &mut seen, &mut events)
767                    .await;
768            }
769
770            info!(
771                target: "hashtree_cli::nostr_relay::query",
772                client_id,
773                subscription_id = %subscription_id,
774                filters = filters.len(),
775                events = events.len(),
776                elapsed_ms = started.elapsed().as_millis() as u64,
777                filter = %filter_summary,
778                memory_before = ?memory_before,
779                memory_after = ?process_memory_snapshot(),
780                "nostr relay local subscription query completed",
781            );
782            Ok(events)
783        }
784
785        async fn handle_auth(&self, client_id: u64, event: Event) {
786            let ok = event.verify().is_ok();
787            let message = if ok { "" } else { "invalid auth" };
788            self.send_to_client(client_id, NostrRelayMessage::ok(event.id, ok, message))
789                .await;
790        }
791
792        async fn handle_close(&self, client_id: u64, subscription_id: SubscriptionId) {
793            let mut subs = self.subscriptions.lock().await;
794            if let Some(map) = subs.get_mut(&client_id) {
795                map.remove(&subscription_id);
796            }
797        }
798
799        async fn handle_event(&self, client_id: u64, event: Event) {
800            let ok = event.verify().is_ok();
801            if !ok {
802                self.send_to_client(
803                    client_id,
804                    NostrRelayMessage::ok(event.id, false, "invalid: signature"),
805                )
806                .await;
807                return;
808            }
809
810            let trusted = self.is_trusted_event(client_id, &event).await;
811            if !trusted && !self.allow_spambox_event(client_id).await {
812                self.send_to_client(
813                    client_id,
814                    NostrRelayMessage::ok(event.id, false, "rate limited"),
815                )
816                .await;
817                return;
818            }
819
820            let is_ephemeral = event.kind.is_ephemeral();
821            if trusted {
822                let mut recent = self.recent_events.lock().await;
823                recent.insert(event.clone());
824            }
825            if !is_ephemeral {
826                let stored = if trusted {
827                    let storage_class = self.event_storage_class(&event);
828                    self.trusted
829                        .ingest_with_storage_class(event.clone(), storage_class)
830                        .await
831                        .is_ok()
832                } else {
833                    match self.spambox.as_ref() {
834                        Some(spambox) => spambox.ingest(&event).await,
835                        None => false,
836                    }
837                };
838
839                if !stored {
840                    let message = if trusted {
841                        "store failed"
842                    } else {
843                        "spambox full"
844                    };
845                    self.send_to_client(client_id, NostrRelayMessage::ok(event.id, false, message))
846                        .await;
847                    return;
848                }
849            }
850
851            let message = if trusted { "" } else { "spambox" };
852            self.send_to_client(client_id, NostrRelayMessage::ok(event.id, true, message))
853                .await;
854
855            if trusted {
856                self.broadcast_event(&event).await;
857            }
858        }
859
860        async fn handle_req(
861            &self,
862            client_id: u64,
863            subscription_id: SubscriptionId,
864            filters: Vec<NostrFilter>,
865        ) {
866            match self
867                .register_subscription_query(client_id, subscription_id.clone(), filters)
868                .await
869            {
870                Ok(events) => {
871                    for event in events {
872                        self.send_to_client(
873                            client_id,
874                            NostrRelayMessage::event(subscription_id.clone(), event),
875                        )
876                        .await;
877                    }
878                    trim_process_allocations();
879
880                    self.send_to_client(client_id, NostrRelayMessage::eose(subscription_id))
881                        .await;
882                }
883                Err(message) => {
884                    self.send_to_client(
885                        client_id,
886                        NostrRelayMessage::closed(subscription_id, message),
887                    )
888                    .await;
889                }
890            }
891        }
892
893        async fn handle_count(
894            &self,
895            client_id: u64,
896            subscription_id: SubscriptionId,
897            filters: Vec<NostrFilter>,
898        ) {
899            if !self.allow_req(client_id).await {
900                self.send_to_client(
901                    client_id,
902                    NostrRelayMessage::closed(subscription_id, "rate limited"),
903                )
904                .await;
905                return;
906            }
907
908            let mut seen: HashSet<EventId> = HashSet::new();
909            for filter in &filters {
910                let limit = filter
911                    .limit
912                    .unwrap_or(self.config.max_query_limit)
913                    .min(self.config.max_query_limit);
914                self.collect_filter_count(filter, limit, &mut seen).await;
915            }
916
917            self.send_to_client(
918                client_id,
919                NostrRelayMessage::count(subscription_id, seen.len()),
920            )
921            .await;
922        }
923
924        async fn is_trusted_event(&self, client_id: u64, event: &Event) -> bool {
925            let event_pubkey = event.pubkey.to_hex();
926            let client_pubkey = {
927                let clients = self.clients.lock().await;
928                clients
929                    .get(&client_id)
930                    .and_then(|state| state.pubkey.clone())
931            };
932            if let Some(pubkey) = client_pubkey {
933                return pubkey == event_pubkey
934                    || self.social_graph.as_ref().is_some_and(|social_graph| {
935                        social_graph.check_write_access(&event_pubkey)
936                    });
937            }
938            if let Some(ref social_graph) = self.social_graph {
939                return social_graph.check_write_access(&event_pubkey);
940            }
941            true
942        }
943
944        fn event_storage_class(&self, event: &Event) -> EventStorageClass {
945            if self.public_pubkeys.contains(&event.pubkey.to_hex()) {
946                EventStorageClass::Public
947            } else {
948                EventStorageClass::Ambient
949            }
950        }
951
952        async fn allow_spambox_event(&self, client_id: u64) -> bool {
953            let mut clients = self.clients.lock().await;
954            let Some(state) = clients.get_mut(&client_id) else {
955                return false;
956            };
957            state
958                .quota
959                .allow_spambox_event(self.config.spambox_max_events_per_min)
960        }
961
962        async fn allow_req(&self, client_id: u64) -> bool {
963            let mut clients = self.clients.lock().await;
964            let Some(state) = clients.get_mut(&client_id) else {
965                return false;
966            };
967            state.quota.allow_req(self.config.spambox_max_reqs_per_min)
968        }
969
970        async fn broadcast_event(&self, event: &Event) {
971            let subscriptions = self.subscriptions.lock().await;
972            let mut deliveries: Vec<(u64, SubscriptionId)> = Vec::new();
973            for (client_id, subs) in subscriptions.iter() {
974                for (sub_id, filters) in subs.iter() {
975                    if filters.iter().any(|f| f.match_event(event)) {
976                        deliveries.push((*client_id, sub_id.clone()));
977                    }
978                }
979            }
980            drop(subscriptions);
981
982            for (client_id, sub_id) in deliveries {
983                self.send_to_client(client_id, NostrRelayMessage::event(sub_id, event.clone()))
984                    .await;
985            }
986        }
987
988        async fn send_to_client(&self, client_id: u64, msg: NostrRelayMessage) {
989            let sender = {
990                let clients = self.clients.lock().await;
991                clients.get(&client_id).map(|state| state.sender.clone())
992            };
993            if let Some(tx) = sender {
994                let _ = tx.send(msg.as_json());
995            }
996        }
997    }
998}
999
1000pub use imp::NostrRelay;
1001
// `MeshEventStore` adapter, compiled only with the `p2p` feature.
//
// Each trait method forwards to the `NostrRelay` function of the same name
// through a fully-qualified path.
// NOTE(review): the targets are presumably inherent methods defined in `imp`;
// if one were missing, the path would resolve back to the trait method and
// recurse — confirm.
#[cfg(feature = "p2p")]
#[async_trait::async_trait]
impl MeshEventStore for NostrRelay {
    /// Forwards `event` to `NostrRelay::ingest_trusted_event` and returns its result.
    async fn ingest_trusted_event(&self, event: Event) -> anyhow::Result<()> {
        NostrRelay::ingest_trusted_event(self, event).await
    }

    /// Forwards `filter` and `limit` to `NostrRelay::query_events` and returns
    /// the events it yields.
    async fn query_events(&self, filter: &NostrFilter, limit: usize) -> Vec<Event> {
        NostrRelay::query_events(self, filter, limit).await
    }
}
1013
// `MeshRelayClient` adapter, compiled only with the `p2p` feature.
//
// Every method is a one-line forward to a `NostrRelay` function called through
// a fully-qualified path.
// NOTE(review): the targets are presumably inherent methods defined in `imp` —
// confirm, since a missing one would make the path resolve to the trait method.
#[cfg(feature = "p2p")]
#[async_trait::async_trait]
impl MeshRelayClient for NostrRelay {
    /// Forwards to `NostrRelay::next_client_id`.
    fn next_client_id(&self) -> u64 {
        NostrRelay::next_client_id(self)
    }

    /// Forwards the client id, outgoing channel and optional pubkey to
    /// `NostrRelay::register_client`.
    async fn register_client(
        &self,
        client_id: u64,
        sender: mpsc::UnboundedSender<String>,
        pubkey: Option<String>,
    ) {
        NostrRelay::register_client(self, client_id, sender, pubkey).await
    }

    /// Forwards to `NostrRelay::unregister_client`.
    async fn unregister_client(&self, client_id: u64) {
        NostrRelay::unregister_client(self, client_id).await
    }

    /// Forwards a parsed client message to `NostrRelay::handle_client_message`.
    async fn handle_client_message(&self, client_id: u64, msg: NostrClientMessage) {
        NostrRelay::handle_client_message(self, client_id, msg).await
    }

    /// Forwards to `NostrRelay::register_subscription_query`, returning either
    /// the events it produced or its static rejection message.
    async fn register_subscription_query(
        &self,
        client_id: u64,
        subscription_id: SubscriptionId,
        filters: Vec<NostrFilter>,
    ) -> std::result::Result<Vec<Event>, &'static str> {
        NostrRelay::register_subscription_query(self, client_id, subscription_id, filters).await
    }

    /// Forwards to `NostrRelay::ingest_trusted_event_from_bluetooth`.
    ///
    /// Note the name difference: the trait speaks of a generic "peer", while
    /// the relay function it calls is the Bluetooth-named one.
    async fn ingest_trusted_event_from_peer(
        &self,
        event: Event,
        peer_id: Option<String>,
    ) -> anyhow::Result<()> {
        NostrRelay::ingest_trusted_event_from_bluetooth(self, event, peer_id).await
    }
}
1055
// Test module, compiled only under `cfg(test)` and loaded from the explicit
// path given in the `#[path]` attribute below.
#[cfg(test)]
#[path = "nostr_relay/tests.rs"]
mod tests;