Skip to main content

hashtree_cli/
nostr_relay.rs

1use std::collections::HashMap;
2use std::collections::{HashSet, VecDeque};
3use std::path::PathBuf;
4use std::sync::{
5    atomic::{AtomicU64, Ordering},
6    Arc,
7};
8use std::time::{Duration, Instant};
9
10use tokio::sync::{mpsc, Mutex};
11
12use nostr::{ClientMessage as NostrClientMessage, JsonUtil, RelayMessage as NostrRelayMessage};
13use nostr::{Event, EventId, Filter as NostrFilter, SubscriptionId};
14
15use crate::socialgraph;
16
17const BLUETOOTH_EVENT_LOG_CAPACITY: usize = 100;
18
19#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
20pub struct BluetoothReceivedEventRecord {
21    pub event_id: String,
22    pub pubkey: String,
23    pub kind: u32,
24    pub created_at: u64,
25    pub received_at: u64,
26    pub peer_id: Option<String>,
27    pub cid_values: Vec<String>,
28}
29
30#[derive(Debug, Clone)]
31pub struct NostrRelayConfig {
32    pub spambox_db_max_bytes: u64,
33    pub max_query_limit: usize,
34    pub max_subs_per_client: usize,
35    pub max_filters_per_sub: usize,
36    pub spambox_max_events_per_min: u32,
37    pub spambox_max_reqs_per_min: u32,
38}
39
40impl Default for NostrRelayConfig {
41    fn default() -> Self {
42        Self {
43            spambox_db_max_bytes: 1024 * 1024 * 1024,
44            max_query_limit: 200,
45            max_subs_per_client: 64,
46            max_filters_per_sub: 32,
47            spambox_max_events_per_min: 120,
48            spambox_max_reqs_per_min: 120,
49        }
50    }
51}
52
53mod imp {
54    use super::*;
55    use anyhow::Result;
56
57    use crate::socialgraph::{EventStorageClass, SocialGraphAccessControl, SocialGraphBackend};
58    use hashtree_core::{nhash_decode, Cid};
59    use hashtree_nostr::{is_parameterized_replaceable_kind, is_replaceable_kind};
60    use tracing::warn;
61
62    fn prefers_trusted_only(filter: &NostrFilter) -> bool {
63        let Some(kinds) = filter.kinds.as_ref() else {
64            return false;
65        };
66        if kinds.len() != 1 {
67            return false;
68        }
69
70        let kind = kinds.iter().next().expect("checked single kind").as_u16() as u32;
71        let has_authors = filter
72            .authors
73            .as_ref()
74            .is_some_and(|authors| !authors.is_empty());
75        if !has_authors {
76            return false;
77        }
78
79        if is_replaceable_kind(kind) {
80            return true;
81        }
82
83        if is_parameterized_replaceable_kind(kind) {
84            let d_tag = nostr::SingleLetterTag::lowercase(nostr::Alphabet::D);
85            return filter
86                .generic_tags
87                .get(&d_tag)
88                .is_some_and(|values| !values.is_empty());
89        }
90
91        false
92    }
93
94    struct NostrStore {
95        store: Arc<dyn SocialGraphBackend>,
96    }
97
98    impl NostrStore {
99        fn new(store: Arc<dyn SocialGraphBackend>) -> Self {
100            Self { store }
101        }
102
103        fn ingest(&self, event: &Event) -> Result<()> {
104            crate::socialgraph::ingest_parsed_event(self.store.as_ref(), event)
105        }
106
107        fn ingest_with_storage_class(
108            &self,
109            event: &Event,
110            storage_class: EventStorageClass,
111        ) -> Result<()> {
112            crate::socialgraph::ingest_parsed_event_with_storage_class(
113                self.store.as_ref(),
114                event,
115                storage_class,
116            )
117        }
118
119        fn query(&self, filter: &NostrFilter, limit: usize) -> Vec<Event> {
120            crate::socialgraph::query_events(self.store.as_ref(), filter, limit)
121        }
122    }
123
124    #[derive(Debug, Clone)]
125    struct ClientQuota {
126        last_reset: Instant,
127        spambox_events: u32,
128        reqs: u32,
129    }
130
131    impl ClientQuota {
132        fn new() -> Self {
133            Self {
134                last_reset: Instant::now(),
135                spambox_events: 0,
136                reqs: 0,
137            }
138        }
139
140        fn reset_if_needed(&mut self) {
141            if self.last_reset.elapsed() >= Duration::from_secs(60) {
142                self.last_reset = Instant::now();
143                self.spambox_events = 0;
144                self.reqs = 0;
145            }
146        }
147
148        fn allow_spambox_event(&mut self, limit: u32) -> bool {
149            self.reset_if_needed();
150            if self.spambox_events >= limit {
151                return false;
152            }
153            self.spambox_events += 1;
154            true
155        }
156
157        fn allow_req(&mut self, limit: u32) -> bool {
158            self.reset_if_needed();
159            if self.reqs >= limit {
160                return false;
161            }
162            self.reqs += 1;
163            true
164        }
165    }
166
167    struct ClientState {
168        sender: mpsc::UnboundedSender<String>,
169        pubkey: Option<String>,
170        quota: ClientQuota,
171    }
172
173    struct RecentEvents {
174        order: VecDeque<EventId>,
175        events: HashMap<EventId, Event>,
176        max_len: usize,
177    }
178
179    impl RecentEvents {
180        fn new(max_len: usize) -> Self {
181            Self {
182                order: VecDeque::new(),
183                events: HashMap::new(),
184                max_len: max_len.max(128),
185            }
186        }
187
188        fn insert(&mut self, event: Event) {
189            if self.events.contains_key(&event.id) {
190                return;
191            }
192            self.order.push_back(event.id);
193            self.events.insert(event.id, event);
194            while self.order.len() > self.max_len {
195                if let Some(oldest) = self.order.pop_front() {
196                    self.events.remove(&oldest);
197                }
198            }
199        }
200
201        fn matching(&self, filter: &NostrFilter) -> Vec<Event> {
202            self.events
203                .values()
204                .filter(|event| filter.match_event(event))
205                .cloned()
206                .collect()
207        }
208    }
209
210    enum SpamboxStore {
211        Persistent(NostrStore),
212        Memory(MemorySpambox),
213    }
214
215    struct MemorySpambox {
216        events: Mutex<VecDeque<Event>>,
217        max_len: usize,
218    }
219
220    impl MemorySpambox {
221        fn new(max_len: usize) -> Self {
222            Self {
223                events: Mutex::new(VecDeque::new()),
224                max_len: max_len.max(128),
225            }
226        }
227
228        async fn ingest(&self, event: &Event) -> bool {
229            let mut events = self.events.lock().await;
230            events.push_back(event.clone());
231            while events.len() > self.max_len {
232                events.pop_front();
233            }
234            true
235        }
236    }
237
238    impl SpamboxStore {
239        async fn ingest(&self, event: &Event) -> bool {
240            match self {
241                SpamboxStore::Persistent(store) => store.ingest(event).is_ok(),
242                SpamboxStore::Memory(store) => store.ingest(event).await,
243            }
244        }
245    }
246
247    struct BluetoothEventLog {
248        path: PathBuf,
249        state: Mutex<BluetoothEventLogState>,
250    }
251
252    struct BluetoothEventLogState {
253        records: VecDeque<BluetoothReceivedEventRecord>,
254        event_ids: HashSet<String>,
255    }
256
257    impl BluetoothEventLog {
258        fn load(path: PathBuf) -> Self {
259            let records = std::fs::read_to_string(&path)
260                .ok()
261                .map(|serialized| {
262                    serialized
263                        .lines()
264                        .filter_map(|line| {
265                            serde_json::from_str::<BluetoothReceivedEventRecord>(line).ok()
266                        })
267                        .collect::<Vec<_>>()
268                })
269                .unwrap_or_default();
270            let mut trimmed = VecDeque::with_capacity(BLUETOOTH_EVENT_LOG_CAPACITY);
271            let start = records.len().saturating_sub(BLUETOOTH_EVENT_LOG_CAPACITY);
272            for record in records.into_iter().skip(start) {
273                trimmed.push_back(record);
274            }
275            let event_ids = trimmed
276                .iter()
277                .map(|record| record.event_id.clone())
278                .collect::<HashSet<_>>();
279
280            Self {
281                path,
282                state: Mutex::new(BluetoothEventLogState {
283                    records: trimmed,
284                    event_ids,
285                }),
286            }
287        }
288
289        async fn recent(&self, limit: usize) -> Vec<BluetoothReceivedEventRecord> {
290            let state = self.state.lock().await;
291            state
292                .records
293                .iter()
294                .rev()
295                .take(limit.max(1))
296                .cloned()
297                .collect()
298        }
299
300        async fn record(&self, event: &Event, peer_id: Option<String>) {
301            let record = BluetoothReceivedEventRecord {
302                event_id: event.id.to_hex(),
303                pubkey: event.pubkey.to_hex(),
304                kind: event.kind.as_u16() as u32,
305                created_at: event.created_at.as_u64(),
306                received_at: std::time::SystemTime::now()
307                    .duration_since(std::time::UNIX_EPOCH)
308                    .map(|value| value.as_secs())
309                    .unwrap_or(0),
310                peer_id,
311                cid_values: cid_values_from_event(event),
312            };
313
314            let serialized = {
315                let mut state = self.state.lock().await;
316                if state.event_ids.contains(&record.event_id) {
317                    return;
318                }
319
320                state.event_ids.insert(record.event_id.clone());
321                state.records.push_back(record);
322                while state.records.len() > BLUETOOTH_EVENT_LOG_CAPACITY {
323                    if let Some(removed) = state.records.pop_front() {
324                        state.event_ids.remove(&removed.event_id);
325                    }
326                }
327
328                state
329                    .records
330                    .iter()
331                    .filter_map(|entry| serde_json::to_string(entry).ok())
332                    .collect::<Vec<_>>()
333                    .join("\n")
334            };
335
336            if let Some(parent) = self.path.parent() {
337                let _ = std::fs::create_dir_all(parent);
338            }
339            let _ = std::fs::write(&self.path, serialized);
340        }
341    }
342
343    fn looks_like_cid_reference(value: &str) -> bool {
344        Cid::parse(value).is_ok() || nhash_decode(value).is_ok()
345    }
346
347    fn cid_values_from_event(event: &Event) -> Vec<String> {
348        let mut values = Vec::new();
349        let mut seen = HashSet::new();
350
351        for tag in &event.tags {
352            let fields = tag.clone().to_vec();
353            if fields.first().is_some_and(|name| name == "cid") {
354                if let Some(value) = fields.get(1).filter(|value| !value.is_empty()) {
355                    if seen.insert(value.clone()) {
356                        values.push(value.clone());
357                    }
358                }
359                continue;
360            }
361
362            for value in fields.into_iter().skip(1) {
363                if looks_like_cid_reference(&value) && seen.insert(value.clone()) {
364                    values.push(value);
365                }
366            }
367        }
368
369        values
370    }
371
372    pub struct NostrRelay {
373        config: NostrRelayConfig,
374        trusted: NostrStore,
375        public_pubkeys: HashSet<String>,
376        spambox: Option<SpamboxStore>,
377        social_graph: Option<Arc<SocialGraphAccessControl>>,
378        clients: Mutex<HashMap<u64, ClientState>>,
379        subscriptions: Mutex<HashMap<u64, HashMap<SubscriptionId, Vec<NostrFilter>>>>,
380        recent_events: Mutex<RecentEvents>,
381        next_client_id: AtomicU64,
382        bluetooth_event_log: Arc<BluetoothEventLog>,
383    }
384
385    impl NostrRelay {
386        async fn collect_filter_events(
387            &self,
388            filter: &NostrFilter,
389            limit: usize,
390            seen: &mut HashSet<EventId>,
391            events: &mut Vec<Event>,
392        ) {
393            if limit == 0 {
394                return;
395            }
396
397            let mut added = 0usize;
398
399            if !prefers_trusted_only(filter) {
400                let recent = {
401                    let cache = self.recent_events.lock().await;
402                    cache.matching(filter)
403                };
404                for event in recent {
405                    if seen.insert(event.id) {
406                        events.push(event);
407                        added += 1;
408                        if added >= limit {
409                            return;
410                        }
411                    }
412                }
413            }
414
415            for event in self.trusted.query(filter, limit) {
416                if seen.insert(event.id) {
417                    events.push(event);
418                    added += 1;
419                    if added >= limit {
420                        return;
421                    }
422                }
423            }
424        }
425
426        async fn collect_filter_count(
427            &self,
428            filter: &NostrFilter,
429            limit: usize,
430            seen: &mut HashSet<EventId>,
431        ) {
432            if limit == 0 {
433                return;
434            }
435
436            let mut added = 0usize;
437
438            if !prefers_trusted_only(filter) {
439                let recent = {
440                    let cache = self.recent_events.lock().await;
441                    cache.matching(filter)
442                };
443                for event in recent {
444                    if seen.insert(event.id) {
445                        added += 1;
446                        if added >= limit {
447                            return;
448                        }
449                    }
450                }
451            }
452
453            for event in self.trusted.query(filter, limit) {
454                if seen.insert(event.id) {
455                    added += 1;
456                    if added >= limit {
457                        return;
458                    }
459                }
460            }
461        }
462
463        pub fn new(
464            trusted_store: Arc<dyn SocialGraphBackend>,
465            data_dir: PathBuf,
466            public_pubkeys: HashSet<String>,
467            social_graph: Option<Arc<SocialGraphAccessControl>>,
468            config: NostrRelayConfig,
469        ) -> Result<Self> {
470            let spambox = if config.spambox_db_max_bytes == 0 {
471                Some(SpamboxStore::Memory(MemorySpambox::new(
472                    config.max_query_limit * 2,
473                )))
474            } else {
475                let spam_dir = data_dir.join("socialgraph_spambox");
476                match socialgraph::open_social_graph_store_at_path(
477                    &spam_dir,
478                    Some(config.spambox_db_max_bytes),
479                ) {
480                    Ok(store) => Some(SpamboxStore::Persistent(NostrStore::new(store))),
481                    Err(err) => {
482                        warn!(
483                            "Failed to open social graph spambox (falling back to memory): {}",
484                            err
485                        );
486                        Some(SpamboxStore::Memory(MemorySpambox::new(
487                            config.max_query_limit * 2,
488                        )))
489                    }
490                }
491            };
492
493            let recent_size = config.max_query_limit.saturating_mul(2);
494            let bluetooth_event_log = Arc::new(BluetoothEventLog::load(
495                data_dir.join("bluetooth-events.jsonl"),
496            ));
497
498            Ok(Self {
499                config,
500                trusted: NostrStore::new(trusted_store),
501                public_pubkeys,
502                spambox,
503                social_graph,
504                clients: Mutex::new(HashMap::new()),
505                subscriptions: Mutex::new(HashMap::new()),
506                recent_events: Mutex::new(RecentEvents::new(recent_size)),
507                next_client_id: AtomicU64::new(1),
508                bluetooth_event_log,
509            })
510        }
511
512        pub fn next_client_id(&self) -> u64 {
513            self.next_client_id.fetch_add(1, Ordering::SeqCst)
514        }
515
516        pub async fn ingest_trusted_event(&self, event: Event) -> Result<()> {
517            self.ingest_trusted_event_inner(event, true).await
518        }
519
520        pub async fn ingest_trusted_event_from_bluetooth(
521            &self,
522            event: Event,
523            peer_id: Option<String>,
524        ) -> Result<()> {
525            self.ingest_trusted_event_inner(event.clone(), true).await?;
526            self.bluetooth_event_log.record(&event, peer_id).await;
527            Ok(())
528        }
529
530        pub async fn ingest_trusted_event_silent(&self, event: Event) -> Result<()> {
531            self.ingest_trusted_event_inner(event, false).await
532        }
533
534        pub async fn bluetooth_received_events(
535            &self,
536            limit: usize,
537        ) -> Vec<BluetoothReceivedEventRecord> {
538            self.bluetooth_event_log.recent(limit).await
539        }
540
541        async fn ingest_trusted_event_inner(&self, event: Event, broadcast: bool) -> Result<()> {
542            event
543                .verify()
544                .map_err(|e| anyhow::anyhow!("invalid signature: {}", e))?;
545
546            let is_ephemeral = event.kind.is_ephemeral();
547            {
548                let mut recent = self.recent_events.lock().await;
549                recent.insert(event.clone());
550            }
551
552            if !is_ephemeral {
553                let storage_class = self.event_storage_class(&event);
554                self.trusted
555                    .ingest_with_storage_class(&event, storage_class)?;
556            }
557
558            if broadcast {
559                self.broadcast_event(&event).await;
560            }
561            Ok(())
562        }
563
564        pub async fn query_events(&self, filter: &NostrFilter, limit: usize) -> Vec<Event> {
565            let limit = limit.min(self.config.max_query_limit);
566            if limit == 0 {
567                return Vec::new();
568            }
569
570            let mut seen: HashSet<EventId> = HashSet::new();
571            let mut events = Vec::new();
572
573            if !prefers_trusted_only(filter) {
574                let recent = {
575                    let cache = self.recent_events.lock().await;
576                    cache.matching(filter)
577                };
578                for event in recent {
579                    if seen.insert(event.id) {
580                        events.push(event);
581                        if events.len() >= limit {
582                            return events;
583                        }
584                    }
585                }
586            }
587
588            for event in self.trusted.query(filter, limit) {
589                if seen.insert(event.id) {
590                    events.push(event);
591                    if events.len() >= limit {
592                        break;
593                    }
594                }
595            }
596
597            events
598        }
599
600        pub async fn register_client(
601            &self,
602            client_id: u64,
603            sender: mpsc::UnboundedSender<String>,
604            pubkey: Option<String>,
605        ) {
606            let mut clients = self.clients.lock().await;
607            clients.insert(
608                client_id,
609                ClientState {
610                    sender,
611                    pubkey,
612                    quota: ClientQuota::new(),
613                },
614            );
615        }
616
617        pub async fn unregister_client(&self, client_id: u64) {
618            let mut clients = self.clients.lock().await;
619            clients.remove(&client_id);
620            drop(clients);
621            let mut subs = self.subscriptions.lock().await;
622            subs.remove(&client_id);
623        }
624
625        pub async fn handle_client_message(&self, client_id: u64, msg: NostrClientMessage) {
626            match msg {
627                NostrClientMessage::Event(event) => {
628                    self.handle_event(client_id, *event).await;
629                }
630                NostrClientMessage::Req {
631                    subscription_id,
632                    filters,
633                } => {
634                    self.handle_req(client_id, subscription_id, filters).await;
635                }
636                NostrClientMessage::Count {
637                    subscription_id,
638                    filters,
639                } => {
640                    self.handle_count(client_id, subscription_id, filters).await;
641                }
642                NostrClientMessage::Close(subscription_id) => {
643                    self.handle_close(client_id, subscription_id).await;
644                }
645                NostrClientMessage::Auth(event) => {
646                    self.handle_auth(client_id, *event).await;
647                }
648                NostrClientMessage::NegOpen { .. }
649                | NostrClientMessage::NegMsg { .. }
650                | NostrClientMessage::NegClose { .. } => {
651                    self.send_to_client(
652                        client_id,
653                        NostrRelayMessage::notice("negentropy not supported"),
654                    )
655                    .await;
656                }
657            }
658        }
659
660        pub async fn register_subscription_query(
661            &self,
662            client_id: u64,
663            subscription_id: SubscriptionId,
664            mut filters: Vec<NostrFilter>,
665        ) -> std::result::Result<Vec<Event>, &'static str> {
666            if !self.allow_req(client_id).await {
667                return Err("rate limited");
668            }
669
670            if filters.len() > self.config.max_filters_per_sub {
671                filters.truncate(self.config.max_filters_per_sub);
672            }
673
674            {
675                let mut subs = self.subscriptions.lock().await;
676                let entry = subs.entry(client_id).or_default();
677                if !entry.contains_key(&subscription_id)
678                    && entry.len() >= self.config.max_subs_per_client
679                {
680                    return Err("too many subscriptions");
681                }
682                entry.insert(subscription_id, filters.clone());
683            }
684
685            let mut seen: HashSet<EventId> = HashSet::new();
686            let mut events = Vec::new();
687            for filter in &filters {
688                let limit = filter
689                    .limit
690                    .unwrap_or(self.config.max_query_limit)
691                    .min(self.config.max_query_limit);
692                self.collect_filter_events(filter, limit, &mut seen, &mut events)
693                    .await;
694            }
695
696            Ok(events)
697        }
698
699        async fn handle_auth(&self, client_id: u64, event: Event) {
700            let ok = event.verify().is_ok();
701            let message = if ok { "" } else { "invalid auth" };
702            self.send_to_client(client_id, NostrRelayMessage::ok(event.id, ok, message))
703                .await;
704        }
705
706        async fn handle_close(&self, client_id: u64, subscription_id: SubscriptionId) {
707            let mut subs = self.subscriptions.lock().await;
708            if let Some(map) = subs.get_mut(&client_id) {
709                map.remove(&subscription_id);
710            }
711        }
712
713        async fn handle_event(&self, client_id: u64, event: Event) {
714            let ok = event.verify().is_ok();
715            if !ok {
716                self.send_to_client(
717                    client_id,
718                    NostrRelayMessage::ok(event.id, false, "invalid: signature"),
719                )
720                .await;
721                return;
722            }
723
724            let trusted = self.is_trusted_event(client_id, &event).await;
725            if !trusted && !self.allow_spambox_event(client_id).await {
726                self.send_to_client(
727                    client_id,
728                    NostrRelayMessage::ok(event.id, false, "rate limited"),
729                )
730                .await;
731                return;
732            }
733
734            let is_ephemeral = event.kind.is_ephemeral();
735            if trusted {
736                let mut recent = self.recent_events.lock().await;
737                recent.insert(event.clone());
738            }
739            if !is_ephemeral {
740                let stored = if trusted {
741                    let storage_class = self.event_storage_class(&event);
742                    self.trusted
743                        .ingest_with_storage_class(&event, storage_class)
744                        .is_ok()
745                } else {
746                    match self.spambox.as_ref() {
747                        Some(spambox) => spambox.ingest(&event).await,
748                        None => false,
749                    }
750                };
751
752                if !stored {
753                    let message = if trusted {
754                        "store failed"
755                    } else {
756                        "spambox full"
757                    };
758                    self.send_to_client(client_id, NostrRelayMessage::ok(event.id, false, message))
759                        .await;
760                    return;
761                }
762            }
763
764            let message = if trusted { "" } else { "spambox" };
765            self.send_to_client(client_id, NostrRelayMessage::ok(event.id, true, message))
766                .await;
767
768            if trusted {
769                self.broadcast_event(&event).await;
770            }
771        }
772
773        async fn handle_req(
774            &self,
775            client_id: u64,
776            subscription_id: SubscriptionId,
777            filters: Vec<NostrFilter>,
778        ) {
779            match self
780                .register_subscription_query(client_id, subscription_id.clone(), filters)
781                .await
782            {
783                Ok(events) => {
784                    for event in events {
785                        self.send_to_client(
786                            client_id,
787                            NostrRelayMessage::event(subscription_id.clone(), event),
788                        )
789                        .await;
790                    }
791
792                    self.send_to_client(client_id, NostrRelayMessage::eose(subscription_id))
793                        .await;
794                }
795                Err(message) => {
796                    self.send_to_client(
797                        client_id,
798                        NostrRelayMessage::closed(subscription_id, message),
799                    )
800                    .await;
801                }
802            }
803        }
804
805        async fn handle_count(
806            &self,
807            client_id: u64,
808            subscription_id: SubscriptionId,
809            filters: Vec<NostrFilter>,
810        ) {
811            if !self.allow_req(client_id).await {
812                self.send_to_client(
813                    client_id,
814                    NostrRelayMessage::closed(subscription_id, "rate limited"),
815                )
816                .await;
817                return;
818            }
819
820            let mut seen: HashSet<EventId> = HashSet::new();
821            for filter in &filters {
822                let limit = filter
823                    .limit
824                    .unwrap_or(self.config.max_query_limit)
825                    .min(self.config.max_query_limit);
826                self.collect_filter_count(filter, limit, &mut seen).await;
827            }
828
829            self.send_to_client(
830                client_id,
831                NostrRelayMessage::count(subscription_id, seen.len()),
832            )
833            .await;
834        }
835
836        async fn is_trusted_event(&self, client_id: u64, event: &Event) -> bool {
837            let event_pubkey = event.pubkey.to_hex();
838            let client_pubkey = {
839                let clients = self.clients.lock().await;
840                clients
841                    .get(&client_id)
842                    .and_then(|state| state.pubkey.clone())
843            };
844            if let Some(pubkey) = client_pubkey {
845                return pubkey == event_pubkey
846                    || self.social_graph.as_ref().is_some_and(|social_graph| {
847                        social_graph.check_write_access(&event_pubkey)
848                    });
849            }
850            if let Some(ref social_graph) = self.social_graph {
851                return social_graph.check_write_access(&event_pubkey);
852            }
853            true
854        }
855
856        fn event_storage_class(&self, event: &Event) -> EventStorageClass {
857            if self.public_pubkeys.contains(&event.pubkey.to_hex()) {
858                EventStorageClass::Public
859            } else {
860                EventStorageClass::Ambient
861            }
862        }
863
864        async fn allow_spambox_event(&self, client_id: u64) -> bool {
865            let mut clients = self.clients.lock().await;
866            let Some(state) = clients.get_mut(&client_id) else {
867                return false;
868            };
869            state
870                .quota
871                .allow_spambox_event(self.config.spambox_max_events_per_min)
872        }
873
874        async fn allow_req(&self, client_id: u64) -> bool {
875            let mut clients = self.clients.lock().await;
876            let Some(state) = clients.get_mut(&client_id) else {
877                return false;
878            };
879            state.quota.allow_req(self.config.spambox_max_reqs_per_min)
880        }
881
882        async fn broadcast_event(&self, event: &Event) {
883            let subscriptions = self.subscriptions.lock().await;
884            let mut deliveries: Vec<(u64, SubscriptionId)> = Vec::new();
885            for (client_id, subs) in subscriptions.iter() {
886                for (sub_id, filters) in subs.iter() {
887                    if filters.iter().any(|f| f.match_event(event)) {
888                        deliveries.push((*client_id, sub_id.clone()));
889                    }
890                }
891            }
892            drop(subscriptions);
893
894            for (client_id, sub_id) in deliveries {
895                self.send_to_client(client_id, NostrRelayMessage::event(sub_id, event.clone()))
896                    .await;
897            }
898        }
899
900        async fn send_to_client(&self, client_id: u64, msg: NostrRelayMessage) {
901            let sender = {
902                let clients = self.clients.lock().await;
903                clients.get(&client_id).map(|state| state.sender.clone())
904            };
905            if let Some(tx) = sender {
906                let _ = tx.send(msg.as_json());
907            }
908        }
909    }
910}
911
912pub use imp::NostrRelay;
913
914#[cfg(test)]
915mod tests {
916    use super::*;
917    use anyhow::Result;
918    use nostr::{EventBuilder, Filter, JsonUtil, Keys, Kind, RelayMessage, SubscriptionId};
919    use std::collections::HashSet;
920    use tempfile::TempDir;
921    use tokio::time::{timeout, Duration};
922
923    async fn recv_relay_message(rx: &mut mpsc::UnboundedReceiver<String>) -> Result<RelayMessage> {
924        let msg = timeout(Duration::from_secs(1), rx.recv())
925            .await?
926            .ok_or_else(|| anyhow::anyhow!("channel closed"))?;
927        Ok(RelayMessage::from_json(msg)?)
928    }
929
930    #[tokio::test]
931    async fn relay_stores_and_serves_events() -> Result<()> {
932        let tmp = TempDir::new()?;
933        let graph_store = {
934            let _guard = crate::socialgraph::test_lock();
935            crate::socialgraph::open_social_graph_store_with_mapsize(
936                tmp.path(),
937                Some(128 * 1024 * 1024),
938            )?
939        };
940        let keys = Keys::generate();
941        let mut allowed = HashSet::new();
942        allowed.insert(keys.public_key().to_hex());
943        let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
944
945        let access = Arc::new(crate::socialgraph::SocialGraphAccessControl::new(
946            Arc::clone(&backend),
947            0,
948            allowed,
949        ));
950
951        let relay_config = NostrRelayConfig {
952            spambox_db_max_bytes: 0,
953            ..Default::default()
954        };
955        let relay = NostrRelay::new(
956            Arc::clone(&backend),
957            tmp.path().to_path_buf(),
958            HashSet::from([keys.public_key().to_hex()]),
959            Some(access),
960            relay_config,
961        )?;
962
963        let (tx, mut rx) = mpsc::unbounded_channel();
964        relay.register_client(1, tx, None).await;
965
966        let event = EventBuilder::new(Kind::TextNote, "hello", []).to_event(&keys)?;
967        relay
968            .handle_client_message(1, NostrClientMessage::event(event.clone()))
969            .await;
970
971        match recv_relay_message(&mut rx).await? {
972            RelayMessage::Ok { status, .. } => assert!(status),
973            other => anyhow::bail!("expected OK, got {:?}", other),
974        }
975
976        tokio::time::sleep(Duration::from_millis(50)).await;
977
978        let sub_id = SubscriptionId::new("sub-1");
979        let filter = Filter::new()
980            .authors(vec![event.pubkey])
981            .kinds(vec![event.kind]);
982        let mut got_event = false;
983        for _ in 0..3 {
984            relay
985                .handle_client_message(
986                    1,
987                    NostrClientMessage::req(sub_id.clone(), vec![filter.clone()]),
988                )
989                .await;
990
991            match recv_relay_message(&mut rx).await? {
992                RelayMessage::Event {
993                    subscription_id,
994                    event: ev,
995                } => {
996                    assert_eq!(subscription_id, sub_id);
997                    assert_eq!(ev.id, event.id);
998                    got_event = true;
999                    break;
1000                }
1001                RelayMessage::EndOfStoredEvents(id) => {
1002                    assert_eq!(id, sub_id);
1003                    tokio::time::sleep(Duration::from_millis(100)).await;
1004                }
1005                other => anyhow::bail!("expected EVENT/EOSE, got {:?}", other),
1006            }
1007        }
1008
1009        if !got_event {
1010            anyhow::bail!("event not available in time");
1011        }
1012
1013        match recv_relay_message(&mut rx).await? {
1014            RelayMessage::EndOfStoredEvents(id) => assert_eq!(id, sub_id),
1015            other => anyhow::bail!("expected EOSE, got {:?}", other),
1016        }
1017
1018        Ok(())
1019    }
1020
1021    #[tokio::test]
1022    async fn relay_persists_bluetooth_received_event_records() -> Result<()> {
1023        let tmp = TempDir::new()?;
1024        let graph_store = {
1025            let _guard = crate::socialgraph::test_lock();
1026            crate::socialgraph::open_social_graph_store_with_mapsize(
1027                tmp.path(),
1028                Some(128 * 1024 * 1024),
1029            )?
1030        };
1031        let keys = Keys::generate();
1032        let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
1033
1034        let access = Arc::new(crate::socialgraph::SocialGraphAccessControl::new(
1035            Arc::clone(&backend),
1036            0,
1037            HashSet::from([keys.public_key().to_hex()]),
1038        ));
1039
1040        let relay = NostrRelay::new(
1041            Arc::clone(&backend),
1042            tmp.path().to_path_buf(),
1043            HashSet::from([keys.public_key().to_hex()]),
1044            Some(access.clone()),
1045            NostrRelayConfig {
1046                spambox_db_max_bytes: 0,
1047                ..Default::default()
1048            },
1049        )?;
1050
1051        let cid = "cd".repeat(32);
1052        let event = EventBuilder::new(
1053            Kind::TextNote,
1054            "bluetooth receipt",
1055            [nostr::Tag::parse(&["cid", &cid]).unwrap()],
1056        )
1057        .to_event(&keys)?;
1058        relay
1059            .ingest_trusted_event_from_bluetooth(
1060                event.clone(),
1061                Some("peer-a".to_string()),
1062            )
1063            .await?;
1064
1065        let receipts = relay.bluetooth_received_events(10).await;
1066        assert_eq!(receipts.len(), 1);
1067        assert_eq!(receipts[0].event_id, event.id.to_hex());
1068        assert_eq!(receipts[0].cid_values, vec![cid.clone()]);
1069
1070        let reloaded = NostrRelay::new(
1071            Arc::clone(&backend),
1072            tmp.path().to_path_buf(),
1073            HashSet::from([keys.public_key().to_hex()]),
1074            Some(access),
1075            NostrRelayConfig {
1076                spambox_db_max_bytes: 0,
1077                ..Default::default()
1078            },
1079        )?;
1080        let reloaded_receipts = reloaded.bluetooth_received_events(10).await;
1081        assert_eq!(reloaded_receipts.len(), 1);
1082        assert_eq!(reloaded_receipts[0].event_id, event.id.to_hex());
1083        assert_eq!(reloaded_receipts[0].cid_values, vec![cid]);
1084
1085        Ok(())
1086    }
1087
1088    #[tokio::test]
1089    async fn relay_persists_nhash_bluetooth_received_event_records() -> Result<()> {
1090        let tmp = TempDir::new()?;
1091        let graph_store = {
1092            let _guard = crate::socialgraph::test_lock();
1093            crate::socialgraph::open_social_graph_store_with_mapsize(
1094                tmp.path(),
1095                Some(128 * 1024 * 1024),
1096            )?
1097        };
1098        let keys = Keys::generate();
1099        let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
1100
1101        let access = Arc::new(crate::socialgraph::SocialGraphAccessControl::new(
1102            Arc::clone(&backend),
1103            0,
1104            HashSet::from([keys.public_key().to_hex()]),
1105        ));
1106
1107        let relay = NostrRelay::new(
1108            Arc::clone(&backend),
1109            tmp.path().to_path_buf(),
1110            HashSet::from([keys.public_key().to_hex()]),
1111            Some(access.clone()),
1112            NostrRelayConfig {
1113                spambox_db_max_bytes: 0,
1114                ..Default::default()
1115            },
1116        )?;
1117
1118        let nhash = hashtree_core::nhash_encode(&[0xef; 32])?;
1119        let event = EventBuilder::new(
1120            Kind::TextNote,
1121            "bluetooth nhash receipt",
1122            [nostr::Tag::parse(&["cid", &nhash]).unwrap()],
1123        )
1124        .to_event(&keys)?;
1125        relay
1126            .ingest_trusted_event_from_bluetooth(
1127                event.clone(),
1128                Some("peer-a".to_string()),
1129            )
1130            .await?;
1131
1132        let receipts = relay.bluetooth_received_events(10).await;
1133        assert_eq!(receipts.len(), 1);
1134        assert_eq!(receipts[0].event_id, event.id.to_hex());
1135        assert_eq!(receipts[0].cid_values, vec![nhash.clone()]);
1136
1137        let reloaded = NostrRelay::new(
1138            Arc::clone(&backend),
1139            tmp.path().to_path_buf(),
1140            HashSet::from([keys.public_key().to_hex()]),
1141            Some(access),
1142            NostrRelayConfig {
1143                spambox_db_max_bytes: 0,
1144                ..Default::default()
1145            },
1146        )?;
1147        let reloaded_receipts = reloaded.bluetooth_received_events(10).await;
1148        assert_eq!(reloaded_receipts.len(), 1);
1149        assert_eq!(reloaded_receipts[0].event_id, event.id.to_hex());
1150        assert_eq!(reloaded_receipts[0].cid_values, vec![nhash]);
1151
1152        Ok(())
1153    }
1154
1155    #[tokio::test]
1156    async fn relay_caps_bluetooth_received_event_records_to_last_100() -> Result<()> {
1157        let tmp = TempDir::new()?;
1158        let graph_store = {
1159            let _guard = crate::socialgraph::test_lock();
1160            crate::socialgraph::open_social_graph_store_with_mapsize(
1161                tmp.path(),
1162                Some(128 * 1024 * 1024),
1163            )?
1164        };
1165        let keys = Keys::generate();
1166        let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
1167
1168        let access = Arc::new(crate::socialgraph::SocialGraphAccessControl::new(
1169            Arc::clone(&backend),
1170            0,
1171            HashSet::from([keys.public_key().to_hex()]),
1172        ));
1173
1174        let relay = NostrRelay::new(
1175            Arc::clone(&backend),
1176            tmp.path().to_path_buf(),
1177            HashSet::from([keys.public_key().to_hex()]),
1178            Some(access),
1179            NostrRelayConfig {
1180                spambox_db_max_bytes: 0,
1181                ..Default::default()
1182            },
1183        )?;
1184
1185        let mut event_ids = Vec::new();
1186        for index in 0..(BLUETOOTH_EVENT_LOG_CAPACITY + 5) {
1187            let event = EventBuilder::new(
1188                Kind::TextNote,
1189                format!("bluetooth receipt {index}"),
1190                [nostr::Tag::parse(&["cid", &format!("{index:064x}")]).unwrap()],
1191            )
1192            .to_event(&keys)?;
1193            event_ids.push(event.id.to_hex());
1194            relay
1195                .ingest_trusted_event_from_bluetooth(event, Some("peer-a".to_string()))
1196                .await?;
1197        }
1198
1199        let receipts = relay
1200            .bluetooth_received_events(BLUETOOTH_EVENT_LOG_CAPACITY + 10)
1201            .await;
1202        assert_eq!(receipts.len(), BLUETOOTH_EVENT_LOG_CAPACITY);
1203        assert_eq!(receipts[0].event_id, event_ids.last().cloned().unwrap());
1204        assert!(
1205            receipts
1206                .iter()
1207                .all(|receipt| !event_ids[..5].contains(&receipt.event_id)),
1208            "oldest receipts should be trimmed from the capped log"
1209        );
1210        assert_eq!(
1211            receipts.last().map(|receipt| receipt.event_id.clone()),
1212            Some(event_ids[5].clone())
1213        );
1214
1215        Ok(())
1216    }
1217
1218    #[tokio::test]
1219    async fn relay_serves_all_events_for_since_zero_catch_all_filter() -> Result<()> {
1220        let tmp = TempDir::new()?;
1221        let graph_store = {
1222            let _guard = crate::socialgraph::test_lock();
1223            crate::socialgraph::open_social_graph_store_with_mapsize(
1224                tmp.path(),
1225                Some(128 * 1024 * 1024),
1226            )?
1227        };
1228        let keys = Keys::generate();
1229        let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
1230
1231        let access = Arc::new(crate::socialgraph::SocialGraphAccessControl::new(
1232            Arc::clone(&backend),
1233            0,
1234            HashSet::from([keys.public_key().to_hex()]),
1235        ));
1236
1237        let relay = NostrRelay::new(
1238            Arc::clone(&backend),
1239            tmp.path().to_path_buf(),
1240            HashSet::from([keys.public_key().to_hex()]),
1241            Some(access),
1242            NostrRelayConfig {
1243                spambox_db_max_bytes: 0,
1244                max_query_limit: 10,
1245                ..Default::default()
1246            },
1247        )?;
1248
1249        let (tx, mut rx) = mpsc::unbounded_channel();
1250        relay.register_client(8, tx, None).await;
1251
1252        let first = EventBuilder::new(Kind::TextNote, "first", [])
1253            .custom_created_at(nostr::Timestamp::from_secs(5))
1254            .to_event(&keys)?;
1255        let second = EventBuilder::new(Kind::TextNote, "second", [])
1256            .custom_created_at(nostr::Timestamp::from_secs(6))
1257            .to_event(&keys)?;
1258        let third = EventBuilder::new(Kind::TextNote, "third", [])
1259            .custom_created_at(nostr::Timestamp::from_secs(7))
1260            .to_event(&keys)?;
1261
1262        for event in [&first, &second, &third] {
1263            relay
1264                .handle_client_message(8, NostrClientMessage::event(event.clone()))
1265                .await;
1266            match recv_relay_message(&mut rx).await? {
1267                RelayMessage::Ok { status, .. } => assert!(status),
1268                other => anyhow::bail!("expected OK, got {:?}", other),
1269            }
1270        }
1271
1272        tokio::time::sleep(Duration::from_millis(50)).await;
1273
1274        relay
1275            .handle_client_message(
1276                8,
1277                NostrClientMessage::from_json(r#"["REQ","sub-all",{"since":0}]"#)?,
1278            )
1279            .await;
1280
1281        let mut received = HashSet::new();
1282        loop {
1283            match recv_relay_message(&mut rx).await? {
1284                RelayMessage::Event {
1285                    subscription_id,
1286                    event,
1287                } => {
1288                    assert_eq!(subscription_id, SubscriptionId::new("sub-all"));
1289                    received.insert(event.id);
1290                }
1291                RelayMessage::EndOfStoredEvents(id) => {
1292                    assert_eq!(id, SubscriptionId::new("sub-all"));
1293                    break;
1294                }
1295                other => anyhow::bail!("expected EVENT/EOSE, got {:?}", other),
1296            }
1297        }
1298
1299        assert_eq!(received.len(), 3);
1300        assert_eq!(received, HashSet::from([first.id, second.id, third.id]));
1301
1302        Ok(())
1303    }
1304
1305    #[tokio::test]
1306    async fn relay_spambox_does_not_serve_untrusted_events() -> Result<()> {
1307        let tmp = TempDir::new()?;
1308        let graph_store = {
1309            let _guard = crate::socialgraph::test_lock();
1310            crate::socialgraph::open_social_graph_store_with_mapsize(
1311                tmp.path(),
1312                Some(128 * 1024 * 1024),
1313            )?
1314        };
1315
1316        crate::socialgraph::set_social_graph_root(&graph_store, &[1u8; 32]);
1317        std::thread::sleep(std::time::Duration::from_millis(100));
1318        let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
1319
1320        let access = Arc::new(crate::socialgraph::SocialGraphAccessControl::new(
1321            Arc::clone(&backend),
1322            0,
1323            HashSet::new(),
1324        ));
1325
1326        let relay_config = NostrRelayConfig {
1327            spambox_db_max_bytes: 0,
1328            ..Default::default()
1329        };
1330        let relay = NostrRelay::new(
1331            Arc::clone(&backend),
1332            tmp.path().to_path_buf(),
1333            HashSet::new(),
1334            Some(access),
1335            relay_config,
1336        )?;
1337
1338        let (tx, mut rx) = mpsc::unbounded_channel();
1339        relay.register_client(2, tx, None).await;
1340
1341        let keys = Keys::generate();
1342        let event = EventBuilder::new(Kind::TextNote, "spam", []).to_event(&keys)?;
1343        relay
1344            .handle_client_message(2, NostrClientMessage::event(event.clone()))
1345            .await;
1346
1347        match recv_relay_message(&mut rx).await? {
1348            RelayMessage::Ok { status, .. } => assert!(status),
1349            other => anyhow::bail!("expected OK, got {:?}", other),
1350        }
1351
1352        tokio::time::sleep(Duration::from_millis(50)).await;
1353
1354        let sub_id = SubscriptionId::new("sub-2");
1355        let filter = Filter::new()
1356            .authors(vec![event.pubkey])
1357            .kinds(vec![event.kind]);
1358        relay
1359            .handle_client_message(2, NostrClientMessage::req(sub_id.clone(), vec![filter]))
1360            .await;
1361
1362        match recv_relay_message(&mut rx).await? {
1363            RelayMessage::EndOfStoredEvents(id) => assert_eq!(id, sub_id),
1364            other => anyhow::bail!("expected EOSE only, got {:?}", other),
1365        }
1366
1367        Ok(())
1368    }
1369
1370    #[tokio::test]
1371    async fn relay_trusts_authenticated_client_for_its_own_events() -> Result<()> {
1372        let tmp = TempDir::new()?;
1373        let graph_store = {
1374            let _guard = crate::socialgraph::test_lock();
1375            crate::socialgraph::open_social_graph_store_with_mapsize(
1376                tmp.path(),
1377                Some(128 * 1024 * 1024),
1378            )?
1379        };
1380
1381        crate::socialgraph::set_social_graph_root(&graph_store, &[1u8; 32]);
1382        std::thread::sleep(std::time::Duration::from_millis(100));
1383        let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
1384
1385        let access = Arc::new(crate::socialgraph::SocialGraphAccessControl::new(
1386            Arc::clone(&backend),
1387            0,
1388            HashSet::new(),
1389        ));
1390
1391        let relay = NostrRelay::new(
1392            Arc::clone(&backend),
1393            tmp.path().to_path_buf(),
1394            HashSet::new(),
1395            Some(access),
1396            NostrRelayConfig {
1397                spambox_db_max_bytes: 0,
1398                ..Default::default()
1399            },
1400        )?;
1401
1402        let (tx, mut rx) = mpsc::unbounded_channel();
1403        let keys = Keys::generate();
1404        relay
1405            .register_client(9, tx, Some(keys.public_key().to_hex()))
1406            .await;
1407
1408        let event = EventBuilder::new(Kind::TextNote, "self-authored", []).to_event(&keys)?;
1409        relay
1410            .handle_client_message(9, NostrClientMessage::event(event.clone()))
1411            .await;
1412
1413        match recv_relay_message(&mut rx).await? {
1414            RelayMessage::Ok {
1415                status, message, ..
1416            } => {
1417                assert!(status);
1418                assert_eq!(message, "");
1419            }
1420            other => anyhow::bail!("expected OK, got {:?}", other),
1421        }
1422
1423        tokio::time::sleep(Duration::from_millis(50)).await;
1424
1425        let sub_id = SubscriptionId::new("sub-auth");
1426        let filter = Filter::new()
1427            .authors(vec![event.pubkey])
1428            .kinds(vec![event.kind]);
1429        relay
1430            .handle_client_message(9, NostrClientMessage::req(sub_id.clone(), vec![filter]))
1431            .await;
1432
1433        match recv_relay_message(&mut rx).await? {
1434            RelayMessage::Event {
1435                subscription_id,
1436                event: stored,
1437            } => {
1438                assert_eq!(subscription_id, sub_id);
1439                assert_eq!(stored.id, event.id);
1440            }
1441            other => anyhow::bail!("expected EVENT, got {:?}", other),
1442        }
1443
1444        match recv_relay_message(&mut rx).await? {
1445            RelayMessage::EndOfStoredEvents(id) => assert_eq!(id, sub_id),
1446            other => anyhow::bail!("expected EOSE, got {:?}", other),
1447        }
1448
1449        Ok(())
1450    }
1451
1452    #[tokio::test]
1453    async fn relay_routes_non_authored_trusted_events_to_ambient_index() -> Result<()> {
1454        let tmp = TempDir::new()?;
1455        let graph_store = {
1456            let _guard = crate::socialgraph::test_lock();
1457            crate::socialgraph::open_social_graph_store_with_mapsize(
1458                tmp.path(),
1459                Some(128 * 1024 * 1024),
1460            )?
1461        };
1462        let authored_keys = Keys::generate();
1463        let remote_keys = Keys::generate();
1464        let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
1465
1466        let access = Arc::new(crate::socialgraph::SocialGraphAccessControl::new(
1467            Arc::clone(&backend),
1468            0,
1469            HashSet::from([authored_keys.public_key().to_hex()]),
1470        ));
1471
1472        let relay = NostrRelay::new(
1473            Arc::clone(&backend),
1474            tmp.path().to_path_buf(),
1475            HashSet::from([authored_keys.public_key().to_hex()]),
1476            Some(access),
1477            NostrRelayConfig {
1478                spambox_db_max_bytes: 0,
1479                ..Default::default()
1480            },
1481        )?;
1482
1483        let ambient_event = EventBuilder::new(Kind::TextNote, "ambient", [])
1484            .custom_created_at(nostr::Timestamp::from_secs(5))
1485            .to_event(&remote_keys)?;
1486        relay.ingest_trusted_event(ambient_event.clone()).await?;
1487
1488        let filter = Filter::new()
1489            .author(remote_keys.public_key())
1490            .kind(Kind::TextNote);
1491        let ambient_only = graph_store
1492            .query_events_in_scope(
1493                &filter,
1494                10,
1495                crate::socialgraph::EventQueryScope::AmbientOnly,
1496            )
1497            .unwrap();
1498        assert_eq!(ambient_only.len(), 1);
1499        assert_eq!(ambient_only[0].id, ambient_event.id);
1500
1501        let public_only = graph_store
1502            .query_events_in_scope(&filter, 10, crate::socialgraph::EventQueryScope::PublicOnly)
1503            .unwrap();
1504        assert!(public_only.is_empty());
1505
1506        Ok(())
1507    }
1508
1509    #[tokio::test]
1510    async fn relay_serves_parameterized_replaceable_queries() -> Result<()> {
1511        let tmp = TempDir::new()?;
1512        let graph_store = {
1513            let _guard = crate::socialgraph::test_lock();
1514            crate::socialgraph::open_social_graph_store_with_mapsize(
1515                tmp.path(),
1516                Some(128 * 1024 * 1024),
1517            )?
1518        };
1519        let keys = Keys::generate();
1520        let mut allowed = HashSet::new();
1521        allowed.insert(keys.public_key().to_hex());
1522        let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
1523
1524        let access = Arc::new(crate::socialgraph::SocialGraphAccessControl::new(
1525            Arc::clone(&backend),
1526            0,
1527            allowed,
1528        ));
1529
1530        let relay = NostrRelay::new(
1531            Arc::clone(&backend),
1532            tmp.path().to_path_buf(),
1533            HashSet::from([keys.public_key().to_hex()]),
1534            Some(access),
1535            NostrRelayConfig {
1536                spambox_db_max_bytes: 0,
1537                ..Default::default()
1538            },
1539        )?;
1540
1541        let (tx, mut rx) = mpsc::unbounded_channel();
1542        relay.register_client(3, tx, None).await;
1543
1544        let older = EventBuilder::new(
1545            Kind::Custom(30078),
1546            "",
1547            vec![
1548                nostr::Tag::identifier("video"),
1549                nostr::Tag::parse(&["l", "hashtree"])?,
1550                nostr::Tag::parse(&["hash", &"11".repeat(32)])?,
1551            ],
1552        )
1553        .custom_created_at(nostr::Timestamp::from_secs(5))
1554        .to_event(&keys)?;
1555        let newer = EventBuilder::new(
1556            Kind::Custom(30078),
1557            "",
1558            vec![
1559                nostr::Tag::identifier("video"),
1560                nostr::Tag::parse(&["l", "hashtree"])?,
1561                nostr::Tag::parse(&["hash", &"22".repeat(32)])?,
1562            ],
1563        )
1564        .custom_created_at(nostr::Timestamp::from_secs(6))
1565        .to_event(&keys)?;
1566
1567        relay
1568            .handle_client_message(3, NostrClientMessage::event(older.clone()))
1569            .await;
1570        let _ = recv_relay_message(&mut rx).await?;
1571        relay
1572            .handle_client_message(3, NostrClientMessage::event(newer.clone()))
1573            .await;
1574        let _ = recv_relay_message(&mut rx).await?;
1575
1576        let sub_id = SubscriptionId::new("sub-d");
1577        let filter = Filter::new()
1578            .author(keys.public_key())
1579            .kind(Kind::Custom(30078))
1580            .identifier("video");
1581        relay
1582            .handle_client_message(3, NostrClientMessage::req(sub_id.clone(), vec![filter]))
1583            .await;
1584
1585        match recv_relay_message(&mut rx).await? {
1586            RelayMessage::Event {
1587                subscription_id,
1588                event,
1589            } => {
1590                assert_eq!(subscription_id, sub_id);
1591                assert_eq!(event.id, newer.id);
1592            }
1593            other => anyhow::bail!("expected EVENT, got {:?}", other),
1594        }
1595
1596        match recv_relay_message(&mut rx).await? {
1597            RelayMessage::EndOfStoredEvents(id) => assert_eq!(id, sub_id),
1598            other => anyhow::bail!("expected EOSE, got {:?}", other),
1599        }
1600
1601        Ok(())
1602    }
1603
1604    #[tokio::test]
1605    async fn relay_serves_replaceable_queries() -> Result<()> {
1606        let tmp = TempDir::new()?;
1607        let graph_store = {
1608            let _guard = crate::socialgraph::test_lock();
1609            crate::socialgraph::open_social_graph_store_with_mapsize(
1610                tmp.path(),
1611                Some(128 * 1024 * 1024),
1612            )?
1613        };
1614        let keys = Keys::generate();
1615        let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
1616
1617        let access = Arc::new(crate::socialgraph::SocialGraphAccessControl::new(
1618            Arc::clone(&backend),
1619            0,
1620            HashSet::from([keys.public_key().to_hex()]),
1621        ));
1622
1623        let relay = NostrRelay::new(
1624            Arc::clone(&backend),
1625            tmp.path().to_path_buf(),
1626            HashSet::from([keys.public_key().to_hex()]),
1627            Some(access),
1628            NostrRelayConfig {
1629                spambox_db_max_bytes: 0,
1630                ..Default::default()
1631            },
1632        )?;
1633
1634        let (tx, mut rx) = mpsc::unbounded_channel();
1635        relay.register_client(4, tx, None).await;
1636
1637        let older = EventBuilder::new(Kind::Metadata, r#"{"name":"older"}"#, [])
1638            .custom_created_at(nostr::Timestamp::from_secs(5))
1639            .to_event(&keys)?;
1640        let newer = EventBuilder::new(Kind::Metadata, r#"{"name":"newer"}"#, [])
1641            .custom_created_at(nostr::Timestamp::from_secs(6))
1642            .to_event(&keys)?;
1643
1644        relay
1645            .handle_client_message(4, NostrClientMessage::event(older.clone()))
1646            .await;
1647        let _ = recv_relay_message(&mut rx).await?;
1648        relay
1649            .handle_client_message(4, NostrClientMessage::event(newer.clone()))
1650            .await;
1651        let _ = recv_relay_message(&mut rx).await?;
1652
1653        let sub_id = SubscriptionId::new("sub-profile");
1654        let filter = Filter::new().author(keys.public_key()).kind(Kind::Metadata);
1655        relay
1656            .handle_client_message(4, NostrClientMessage::req(sub_id.clone(), vec![filter]))
1657            .await;
1658
1659        match recv_relay_message(&mut rx).await? {
1660            RelayMessage::Event {
1661                subscription_id,
1662                event,
1663            } => {
1664                assert_eq!(subscription_id, sub_id);
1665                assert_eq!(event.id, newer.id);
1666            }
1667            other => anyhow::bail!("expected EVENT, got {:?}", other),
1668        }
1669
1670        match recv_relay_message(&mut rx).await? {
1671            RelayMessage::EndOfStoredEvents(id) => assert_eq!(id, sub_id),
1672            other => anyhow::bail!("expected EOSE, got {:?}", other),
1673        }
1674
1675        Ok(())
1676    }
1677
1678    #[tokio::test]
1679    async fn relay_count_dedupes_across_filters_and_honors_filter_limits() -> Result<()> {
1680        let tmp = TempDir::new()?;
1681        let graph_store = {
1682            let _guard = crate::socialgraph::test_lock();
1683            crate::socialgraph::open_social_graph_store_with_mapsize(
1684                tmp.path(),
1685                Some(128 * 1024 * 1024),
1686            )?
1687        };
1688        let keys = Keys::generate();
1689        let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
1690
1691        let access = Arc::new(crate::socialgraph::SocialGraphAccessControl::new(
1692            Arc::clone(&backend),
1693            0,
1694            HashSet::from([keys.public_key().to_hex()]),
1695        ));
1696
1697        let relay = NostrRelay::new(
1698            Arc::clone(&backend),
1699            tmp.path().to_path_buf(),
1700            HashSet::from([keys.public_key().to_hex()]),
1701            Some(access),
1702            NostrRelayConfig {
1703                spambox_db_max_bytes: 0,
1704                ..Default::default()
1705            },
1706        )?;
1707
1708        let (tx, mut rx) = mpsc::unbounded_channel();
1709        relay.register_client(5, tx, None).await;
1710
1711        let older = EventBuilder::new(Kind::Metadata, r#"{"name":"older"}"#, [])
1712            .custom_created_at(nostr::Timestamp::from_secs(5))
1713            .to_event(&keys)?;
1714        let newer = EventBuilder::new(Kind::Metadata, r#"{"name":"newer"}"#, [])
1715            .custom_created_at(nostr::Timestamp::from_secs(6))
1716            .to_event(&keys)?;
1717
1718        relay
1719            .handle_client_message(5, NostrClientMessage::event(older.clone()))
1720            .await;
1721        let _ = recv_relay_message(&mut rx).await?;
1722        relay
1723            .handle_client_message(5, NostrClientMessage::event(newer.clone()))
1724            .await;
1725        let _ = recv_relay_message(&mut rx).await?;
1726
1727        let sub_id = SubscriptionId::new("sub-count");
1728        let filters = vec![
1729            Filter::new()
1730                .author(keys.public_key())
1731                .kind(Kind::Metadata)
1732                .limit(1),
1733            Filter::new().id(older.id),
1734        ];
1735        relay
1736            .handle_client_message(5, NostrClientMessage::count(sub_id.clone(), filters))
1737            .await;
1738
1739        match recv_relay_message(&mut rx).await? {
1740            RelayMessage::Count {
1741                subscription_id,
1742                count,
1743            } => {
1744                assert_eq!(subscription_id, sub_id);
1745                assert_eq!(count, 2);
1746            }
1747            other => anyhow::bail!("expected COUNT, got {:?}", other),
1748        }
1749
1750        Ok(())
1751    }
1752
1753    #[tokio::test]
1754    async fn relay_count_caps_filter_limit_to_config_max() -> Result<()> {
1755        let tmp = TempDir::new()?;
1756        let graph_store = {
1757            let _guard = crate::socialgraph::test_lock();
1758            crate::socialgraph::open_social_graph_store_with_mapsize(
1759                tmp.path(),
1760                Some(128 * 1024 * 1024),
1761            )?
1762        };
1763        let keys = Keys::generate();
1764        let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
1765
1766        let access = Arc::new(crate::socialgraph::SocialGraphAccessControl::new(
1767            Arc::clone(&backend),
1768            0,
1769            HashSet::from([keys.public_key().to_hex()]),
1770        ));
1771
1772        let relay = NostrRelay::new(
1773            Arc::clone(&backend),
1774            tmp.path().to_path_buf(),
1775            HashSet::from([keys.public_key().to_hex()]),
1776            Some(access),
1777            NostrRelayConfig {
1778                spambox_db_max_bytes: 0,
1779                max_query_limit: 1,
1780                ..Default::default()
1781            },
1782        )?;
1783
1784        let (tx, mut rx) = mpsc::unbounded_channel();
1785        relay.register_client(7, tx, None).await;
1786
1787        let older = EventBuilder::new(Kind::TextNote, "older", [])
1788            .custom_created_at(nostr::Timestamp::from_secs(5))
1789            .to_event(&keys)?;
1790        let newer = EventBuilder::new(Kind::TextNote, "newer", [])
1791            .custom_created_at(nostr::Timestamp::from_secs(6))
1792            .to_event(&keys)?;
1793
1794        relay
1795            .handle_client_message(7, NostrClientMessage::event(older))
1796            .await;
1797        let _ = recv_relay_message(&mut rx).await?;
1798        relay
1799            .handle_client_message(7, NostrClientMessage::event(newer))
1800            .await;
1801        let _ = recv_relay_message(&mut rx).await?;
1802
1803        relay
1804            .handle_client_message(
1805                7,
1806                NostrClientMessage::count(
1807                    SubscriptionId::new("sub-count-cap"),
1808                    vec![Filter::new()
1809                        .author(keys.public_key())
1810                        .kind(Kind::TextNote)
1811                        .limit(10)],
1812                ),
1813            )
1814            .await;
1815
1816        match recv_relay_message(&mut rx).await? {
1817            RelayMessage::Count { count, .. } => assert_eq!(count, 1),
1818            other => anyhow::bail!("expected COUNT, got {:?}", other),
1819        }
1820
1821        Ok(())
1822    }
1823
1824    #[tokio::test]
1825    async fn relay_register_subscription_query_caps_filter_limit_to_config_max() -> Result<()> {
1826        let tmp = TempDir::new()?;
1827        let graph_store = {
1828            let _guard = crate::socialgraph::test_lock();
1829            crate::socialgraph::open_social_graph_store_with_mapsize(
1830                tmp.path(),
1831                Some(128 * 1024 * 1024),
1832            )?
1833        };
1834        let keys = Keys::generate();
1835        let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
1836
1837        let access = Arc::new(crate::socialgraph::SocialGraphAccessControl::new(
1838            Arc::clone(&backend),
1839            0,
1840            HashSet::from([keys.public_key().to_hex()]),
1841        ));
1842
1843        let relay = NostrRelay::new(
1844            Arc::clone(&backend),
1845            tmp.path().to_path_buf(),
1846            HashSet::from([keys.public_key().to_hex()]),
1847            Some(access),
1848            NostrRelayConfig {
1849                spambox_db_max_bytes: 0,
1850                max_query_limit: 1,
1851                ..Default::default()
1852            },
1853        )?;
1854
1855        let (tx, mut rx) = mpsc::unbounded_channel();
1856        relay.register_client(6, tx, None).await;
1857
1858        let older = EventBuilder::new(Kind::TextNote, "older", [])
1859            .custom_created_at(nostr::Timestamp::from_secs(5))
1860            .to_event(&keys)?;
1861        let newer = EventBuilder::new(Kind::TextNote, "newer", [])
1862            .custom_created_at(nostr::Timestamp::from_secs(6))
1863            .to_event(&keys)?;
1864
1865        relay
1866            .handle_client_message(6, NostrClientMessage::event(older))
1867            .await;
1868        let _ = recv_relay_message(&mut rx).await?;
1869        relay
1870            .handle_client_message(6, NostrClientMessage::event(newer))
1871            .await;
1872        let _ = recv_relay_message(&mut rx).await?;
1873
1874        let events = relay
1875            .register_subscription_query(
1876                6,
1877                SubscriptionId::new("sub-limit"),
1878                vec![Filter::new()
1879                    .author(keys.public_key())
1880                    .kind(Kind::TextNote)
1881                    .limit(10)],
1882            )
1883            .await
1884            .map_err(anyhow::Error::msg)?;
1885
1886        assert_eq!(events.len(), 1);
1887
1888        Ok(())
1889    }
1890}