1use std::collections::HashMap;
2use std::collections::{HashSet, VecDeque};
3use std::path::PathBuf;
4use std::sync::{
5 atomic::{AtomicU64, Ordering},
6 Arc,
7};
8use std::time::{Duration, Instant};
9
10use tokio::sync::{mpsc, Mutex};
11
12use nostr::{ClientMessage as NostrClientMessage, JsonUtil, RelayMessage as NostrRelayMessage};
13use nostr::{Event, EventId, Filter as NostrFilter, SubscriptionId};
14
15use crate::socialgraph;
16
/// Upper bound on the number of Bluetooth receipt records kept in the log;
/// older records are dropped once this many are held.
const BLUETOOTH_EVENT_LOG_CAPACITY: usize = 100;
18
19#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
20pub struct BluetoothReceivedEventRecord {
21 pub event_id: String,
22 pub pubkey: String,
23 pub kind: u32,
24 pub created_at: u64,
25 pub received_at: u64,
26 pub peer_id: Option<String>,
27 pub cid_values: Vec<String>,
28}
29
/// Tunable limits for the relay.
#[derive(Debug, Clone)]
pub struct NostrRelayConfig {
    /// Size budget handed to the persistent spambox store; `0` selects the
    /// in-memory spambox instead.
    pub spambox_db_max_bytes: u64,
    /// Upper bound applied to every per-filter query limit.
    pub max_query_limit: usize,
    /// Maximum number of concurrent subscriptions per client.
    pub max_subs_per_client: usize,
    /// Maximum number of filters kept per subscription.
    pub max_filters_per_sub: usize,
    /// Per-client budget of untrusted (spambox) events per quota window.
    pub spambox_max_events_per_min: u32,
    /// Per-client budget of REQ/COUNT requests per quota window.
    pub spambox_max_reqs_per_min: u32,
}

impl Default for NostrRelayConfig {
    /// Builds the stock configuration: a 1 GiB spambox budget, 200 events per
    /// query, 64 subscriptions per client, 32 filters per subscription, and
    /// 120 spambox events / 120 requests per quota window.
    fn default() -> Self {
        const ONE_GIB: u64 = 1024 * 1024 * 1024;
        const PER_MINUTE_BUDGET: u32 = 120;

        NostrRelayConfig {
            spambox_db_max_bytes: ONE_GIB,
            max_query_limit: 200,
            max_subs_per_client: 64,
            max_filters_per_sub: 32,
            spambox_max_events_per_min: PER_MINUTE_BUDGET,
            spambox_max_reqs_per_min: PER_MINUTE_BUDGET,
        }
    }
}
52
53mod imp {
54 use super::*;
55 use anyhow::Result;
56
57 use crate::socialgraph::{EventStorageClass, SocialGraphAccessControl, SocialGraphBackend};
58 use hashtree_core::{nhash_decode, Cid};
59 use hashtree_nostr::{is_parameterized_replaceable_kind, is_replaceable_kind};
60 use tracing::warn;
61
62 fn prefers_trusted_only(filter: &NostrFilter) -> bool {
63 let Some(kinds) = filter.kinds.as_ref() else {
64 return false;
65 };
66 if kinds.len() != 1 {
67 return false;
68 }
69
70 let kind = kinds.iter().next().expect("checked single kind").as_u16() as u32;
71 let has_authors = filter
72 .authors
73 .as_ref()
74 .is_some_and(|authors| !authors.is_empty());
75 if !has_authors {
76 return false;
77 }
78
79 if is_replaceable_kind(kind) {
80 return true;
81 }
82
83 if is_parameterized_replaceable_kind(kind) {
84 let d_tag = nostr::SingleLetterTag::lowercase(nostr::Alphabet::D);
85 return filter
86 .generic_tags
87 .get(&d_tag)
88 .is_some_and(|values| !values.is_empty());
89 }
90
91 false
92 }
93
94 struct NostrStore {
95 store: Arc<dyn SocialGraphBackend>,
96 }
97
98 impl NostrStore {
99 fn new(store: Arc<dyn SocialGraphBackend>) -> Self {
100 Self { store }
101 }
102
103 fn ingest(&self, event: &Event) -> Result<()> {
104 crate::socialgraph::ingest_parsed_event(self.store.as_ref(), event)
105 }
106
107 fn ingest_with_storage_class(
108 &self,
109 event: &Event,
110 storage_class: EventStorageClass,
111 ) -> Result<()> {
112 crate::socialgraph::ingest_parsed_event_with_storage_class(
113 self.store.as_ref(),
114 event,
115 storage_class,
116 )
117 }
118
119 fn query(&self, filter: &NostrFilter, limit: usize) -> Vec<Event> {
120 crate::socialgraph::query_events(self.store.as_ref(), filter, limit)
121 }
122 }
123
/// Per-client request counters that start over once the quota window has
/// elapsed.
#[derive(Debug, Clone)]
struct ClientQuota {
    /// Start of the current quota window.
    last_reset: Instant,
    /// Untrusted events accepted in the current window.
    spambox_events: u32,
    /// Requests accepted in the current window.
    reqs: u32,
}

impl ClientQuota {
    /// Length of one quota window.
    const WINDOW: Duration = Duration::from_secs(60);

    /// Starts a fresh window with both counters at zero.
    fn new() -> Self {
        ClientQuota {
            last_reset: Instant::now(),
            spambox_events: 0,
            reqs: 0,
        }
    }

    /// Begins a new window when the current one has run its full length.
    fn reset_if_needed(&mut self) {
        if self.last_reset.elapsed() < Self::WINDOW {
            return;
        }
        *self = Self::new();
    }

    /// Increments `counter` and returns `true` while it is below `limit`;
    /// otherwise leaves it untouched and returns `false`.
    fn consume(counter: &mut u32, limit: u32) -> bool {
        let within_budget = *counter < limit;
        if within_budget {
            *counter += 1;
        }
        within_budget
    }

    /// Accounts for one untrusted event; `false` means the budget is spent.
    fn allow_spambox_event(&mut self, limit: u32) -> bool {
        self.reset_if_needed();
        Self::consume(&mut self.spambox_events, limit)
    }

    /// Accounts for one request; `false` means the budget is spent.
    fn allow_req(&mut self, limit: u32) -> bool {
        self.reset_if_needed();
        Self::consume(&mut self.reqs, limit)
    }
}
166
167 struct ClientState {
168 sender: mpsc::UnboundedSender<String>,
169 pubkey: Option<String>,
170 quota: ClientQuota,
171 }
172
173 struct RecentEvents {
174 order: VecDeque<EventId>,
175 events: HashMap<EventId, Event>,
176 max_len: usize,
177 }
178
179 impl RecentEvents {
180 fn new(max_len: usize) -> Self {
181 Self {
182 order: VecDeque::new(),
183 events: HashMap::new(),
184 max_len: max_len.max(128),
185 }
186 }
187
188 fn insert(&mut self, event: Event) {
189 if self.events.contains_key(&event.id) {
190 return;
191 }
192 self.order.push_back(event.id);
193 self.events.insert(event.id, event);
194 while self.order.len() > self.max_len {
195 if let Some(oldest) = self.order.pop_front() {
196 self.events.remove(&oldest);
197 }
198 }
199 }
200
201 fn matching(&self, filter: &NostrFilter) -> Vec<Event> {
202 self.events
203 .values()
204 .filter(|event| filter.match_event(event))
205 .cloned()
206 .collect()
207 }
208 }
209
210 enum SpamboxStore {
211 Persistent(NostrStore),
212 Memory(MemorySpambox),
213 }
214
215 struct MemorySpambox {
216 events: Mutex<VecDeque<Event>>,
217 max_len: usize,
218 }
219
220 impl MemorySpambox {
221 fn new(max_len: usize) -> Self {
222 Self {
223 events: Mutex::new(VecDeque::new()),
224 max_len: max_len.max(128),
225 }
226 }
227
228 async fn ingest(&self, event: &Event) -> bool {
229 let mut events = self.events.lock().await;
230 events.push_back(event.clone());
231 while events.len() > self.max_len {
232 events.pop_front();
233 }
234 true
235 }
236 }
237
238 impl SpamboxStore {
239 async fn ingest(&self, event: &Event) -> bool {
240 match self {
241 SpamboxStore::Persistent(store) => store.ingest(event).is_ok(),
242 SpamboxStore::Memory(store) => store.ingest(event).await,
243 }
244 }
245 }
246
247 struct BluetoothEventLog {
248 path: PathBuf,
249 state: Mutex<BluetoothEventLogState>,
250 }
251
252 struct BluetoothEventLogState {
253 records: VecDeque<BluetoothReceivedEventRecord>,
254 event_ids: HashSet<String>,
255 }
256
257 impl BluetoothEventLog {
258 fn load(path: PathBuf) -> Self {
259 let records = std::fs::read_to_string(&path)
260 .ok()
261 .map(|serialized| {
262 serialized
263 .lines()
264 .filter_map(|line| {
265 serde_json::from_str::<BluetoothReceivedEventRecord>(line).ok()
266 })
267 .collect::<Vec<_>>()
268 })
269 .unwrap_or_default();
270 let mut trimmed = VecDeque::with_capacity(BLUETOOTH_EVENT_LOG_CAPACITY);
271 let start = records.len().saturating_sub(BLUETOOTH_EVENT_LOG_CAPACITY);
272 for record in records.into_iter().skip(start) {
273 trimmed.push_back(record);
274 }
275 let event_ids = trimmed
276 .iter()
277 .map(|record| record.event_id.clone())
278 .collect::<HashSet<_>>();
279
280 Self {
281 path,
282 state: Mutex::new(BluetoothEventLogState {
283 records: trimmed,
284 event_ids,
285 }),
286 }
287 }
288
289 async fn recent(&self, limit: usize) -> Vec<BluetoothReceivedEventRecord> {
290 let state = self.state.lock().await;
291 state
292 .records
293 .iter()
294 .rev()
295 .take(limit.max(1))
296 .cloned()
297 .collect()
298 }
299
300 async fn record(&self, event: &Event, peer_id: Option<String>) {
301 let record = BluetoothReceivedEventRecord {
302 event_id: event.id.to_hex(),
303 pubkey: event.pubkey.to_hex(),
304 kind: event.kind.as_u16() as u32,
305 created_at: event.created_at.as_u64(),
306 received_at: std::time::SystemTime::now()
307 .duration_since(std::time::UNIX_EPOCH)
308 .map(|value| value.as_secs())
309 .unwrap_or(0),
310 peer_id,
311 cid_values: cid_values_from_event(event),
312 };
313
314 let serialized = {
315 let mut state = self.state.lock().await;
316 if state.event_ids.contains(&record.event_id) {
317 return;
318 }
319
320 state.event_ids.insert(record.event_id.clone());
321 state.records.push_back(record);
322 while state.records.len() > BLUETOOTH_EVENT_LOG_CAPACITY {
323 if let Some(removed) = state.records.pop_front() {
324 state.event_ids.remove(&removed.event_id);
325 }
326 }
327
328 state
329 .records
330 .iter()
331 .filter_map(|entry| serde_json::to_string(entry).ok())
332 .collect::<Vec<_>>()
333 .join("\n")
334 };
335
336 if let Some(parent) = self.path.parent() {
337 let _ = std::fs::create_dir_all(parent);
338 }
339 let _ = std::fs::write(&self.path, serialized);
340 }
341 }
342
343 fn looks_like_cid_reference(value: &str) -> bool {
344 Cid::parse(value).is_ok() || nhash_decode(value).is_ok()
345 }
346
347 fn cid_values_from_event(event: &Event) -> Vec<String> {
348 let mut values = Vec::new();
349 let mut seen = HashSet::new();
350
351 for tag in &event.tags {
352 let fields = tag.clone().to_vec();
353 if fields.first().is_some_and(|name| name == "cid") {
354 if let Some(value) = fields.get(1).filter(|value| !value.is_empty()) {
355 if seen.insert(value.clone()) {
356 values.push(value.clone());
357 }
358 }
359 continue;
360 }
361
362 for value in fields.into_iter().skip(1) {
363 if looks_like_cid_reference(&value) && seen.insert(value.clone()) {
364 values.push(value);
365 }
366 }
367 }
368
369 values
370 }
371
372 pub struct NostrRelay {
373 config: NostrRelayConfig,
374 trusted: NostrStore,
375 public_pubkeys: HashSet<String>,
376 spambox: Option<SpamboxStore>,
377 social_graph: Option<Arc<SocialGraphAccessControl>>,
378 clients: Mutex<HashMap<u64, ClientState>>,
379 subscriptions: Mutex<HashMap<u64, HashMap<SubscriptionId, Vec<NostrFilter>>>>,
380 recent_events: Mutex<RecentEvents>,
381 next_client_id: AtomicU64,
382 bluetooth_event_log: Arc<BluetoothEventLog>,
383 }
384
385 impl NostrRelay {
386 async fn collect_filter_events(
387 &self,
388 filter: &NostrFilter,
389 limit: usize,
390 seen: &mut HashSet<EventId>,
391 events: &mut Vec<Event>,
392 ) {
393 if limit == 0 {
394 return;
395 }
396
397 let mut added = 0usize;
398
399 if !prefers_trusted_only(filter) {
400 let recent = {
401 let cache = self.recent_events.lock().await;
402 cache.matching(filter)
403 };
404 for event in recent {
405 if seen.insert(event.id) {
406 events.push(event);
407 added += 1;
408 if added >= limit {
409 return;
410 }
411 }
412 }
413 }
414
415 for event in self.trusted.query(filter, limit) {
416 if seen.insert(event.id) {
417 events.push(event);
418 added += 1;
419 if added >= limit {
420 return;
421 }
422 }
423 }
424 }
425
426 async fn collect_filter_count(
427 &self,
428 filter: &NostrFilter,
429 limit: usize,
430 seen: &mut HashSet<EventId>,
431 ) {
432 if limit == 0 {
433 return;
434 }
435
436 let mut added = 0usize;
437
438 if !prefers_trusted_only(filter) {
439 let recent = {
440 let cache = self.recent_events.lock().await;
441 cache.matching(filter)
442 };
443 for event in recent {
444 if seen.insert(event.id) {
445 added += 1;
446 if added >= limit {
447 return;
448 }
449 }
450 }
451 }
452
453 for event in self.trusted.query(filter, limit) {
454 if seen.insert(event.id) {
455 added += 1;
456 if added >= limit {
457 return;
458 }
459 }
460 }
461 }
462
463 pub fn new(
464 trusted_store: Arc<dyn SocialGraphBackend>,
465 data_dir: PathBuf,
466 public_pubkeys: HashSet<String>,
467 social_graph: Option<Arc<SocialGraphAccessControl>>,
468 config: NostrRelayConfig,
469 ) -> Result<Self> {
470 let spambox = if config.spambox_db_max_bytes == 0 {
471 Some(SpamboxStore::Memory(MemorySpambox::new(
472 config.max_query_limit * 2,
473 )))
474 } else {
475 let spam_dir = data_dir.join("socialgraph_spambox");
476 match socialgraph::open_social_graph_store_at_path(
477 &spam_dir,
478 Some(config.spambox_db_max_bytes),
479 ) {
480 Ok(store) => Some(SpamboxStore::Persistent(NostrStore::new(store))),
481 Err(err) => {
482 warn!(
483 "Failed to open social graph spambox (falling back to memory): {}",
484 err
485 );
486 Some(SpamboxStore::Memory(MemorySpambox::new(
487 config.max_query_limit * 2,
488 )))
489 }
490 }
491 };
492
493 let recent_size = config.max_query_limit.saturating_mul(2);
494 let bluetooth_event_log = Arc::new(BluetoothEventLog::load(
495 data_dir.join("bluetooth-events.jsonl"),
496 ));
497
498 Ok(Self {
499 config,
500 trusted: NostrStore::new(trusted_store),
501 public_pubkeys,
502 spambox,
503 social_graph,
504 clients: Mutex::new(HashMap::new()),
505 subscriptions: Mutex::new(HashMap::new()),
506 recent_events: Mutex::new(RecentEvents::new(recent_size)),
507 next_client_id: AtomicU64::new(1),
508 bluetooth_event_log,
509 })
510 }
511
512 pub fn next_client_id(&self) -> u64 {
513 self.next_client_id.fetch_add(1, Ordering::SeqCst)
514 }
515
516 pub async fn ingest_trusted_event(&self, event: Event) -> Result<()> {
517 self.ingest_trusted_event_inner(event, true).await
518 }
519
520 pub async fn ingest_trusted_event_from_bluetooth(
521 &self,
522 event: Event,
523 peer_id: Option<String>,
524 ) -> Result<()> {
525 self.ingest_trusted_event_inner(event.clone(), true).await?;
526 self.bluetooth_event_log.record(&event, peer_id).await;
527 Ok(())
528 }
529
530 pub async fn ingest_trusted_event_silent(&self, event: Event) -> Result<()> {
531 self.ingest_trusted_event_inner(event, false).await
532 }
533
534 pub async fn bluetooth_received_events(
535 &self,
536 limit: usize,
537 ) -> Vec<BluetoothReceivedEventRecord> {
538 self.bluetooth_event_log.recent(limit).await
539 }
540
541 async fn ingest_trusted_event_inner(&self, event: Event, broadcast: bool) -> Result<()> {
542 event
543 .verify()
544 .map_err(|e| anyhow::anyhow!("invalid signature: {}", e))?;
545
546 let is_ephemeral = event.kind.is_ephemeral();
547 {
548 let mut recent = self.recent_events.lock().await;
549 recent.insert(event.clone());
550 }
551
552 if !is_ephemeral {
553 let storage_class = self.event_storage_class(&event);
554 self.trusted
555 .ingest_with_storage_class(&event, storage_class)?;
556 }
557
558 if broadcast {
559 self.broadcast_event(&event).await;
560 }
561 Ok(())
562 }
563
564 pub async fn query_events(&self, filter: &NostrFilter, limit: usize) -> Vec<Event> {
565 let limit = limit.min(self.config.max_query_limit);
566 if limit == 0 {
567 return Vec::new();
568 }
569
570 let mut seen: HashSet<EventId> = HashSet::new();
571 let mut events = Vec::new();
572
573 if !prefers_trusted_only(filter) {
574 let recent = {
575 let cache = self.recent_events.lock().await;
576 cache.matching(filter)
577 };
578 for event in recent {
579 if seen.insert(event.id) {
580 events.push(event);
581 if events.len() >= limit {
582 return events;
583 }
584 }
585 }
586 }
587
588 for event in self.trusted.query(filter, limit) {
589 if seen.insert(event.id) {
590 events.push(event);
591 if events.len() >= limit {
592 break;
593 }
594 }
595 }
596
597 events
598 }
599
600 pub async fn register_client(
601 &self,
602 client_id: u64,
603 sender: mpsc::UnboundedSender<String>,
604 pubkey: Option<String>,
605 ) {
606 let mut clients = self.clients.lock().await;
607 clients.insert(
608 client_id,
609 ClientState {
610 sender,
611 pubkey,
612 quota: ClientQuota::new(),
613 },
614 );
615 }
616
617 pub async fn unregister_client(&self, client_id: u64) {
618 let mut clients = self.clients.lock().await;
619 clients.remove(&client_id);
620 drop(clients);
621 let mut subs = self.subscriptions.lock().await;
622 subs.remove(&client_id);
623 }
624
625 pub async fn handle_client_message(&self, client_id: u64, msg: NostrClientMessage) {
626 match msg {
627 NostrClientMessage::Event(event) => {
628 self.handle_event(client_id, *event).await;
629 }
630 NostrClientMessage::Req {
631 subscription_id,
632 filters,
633 } => {
634 self.handle_req(client_id, subscription_id, filters).await;
635 }
636 NostrClientMessage::Count {
637 subscription_id,
638 filters,
639 } => {
640 self.handle_count(client_id, subscription_id, filters).await;
641 }
642 NostrClientMessage::Close(subscription_id) => {
643 self.handle_close(client_id, subscription_id).await;
644 }
645 NostrClientMessage::Auth(event) => {
646 self.handle_auth(client_id, *event).await;
647 }
648 NostrClientMessage::NegOpen { .. }
649 | NostrClientMessage::NegMsg { .. }
650 | NostrClientMessage::NegClose { .. } => {
651 self.send_to_client(
652 client_id,
653 NostrRelayMessage::notice("negentropy not supported"),
654 )
655 .await;
656 }
657 }
658 }
659
660 pub async fn register_subscription_query(
661 &self,
662 client_id: u64,
663 subscription_id: SubscriptionId,
664 mut filters: Vec<NostrFilter>,
665 ) -> std::result::Result<Vec<Event>, &'static str> {
666 if !self.allow_req(client_id).await {
667 return Err("rate limited");
668 }
669
670 if filters.len() > self.config.max_filters_per_sub {
671 filters.truncate(self.config.max_filters_per_sub);
672 }
673
674 {
675 let mut subs = self.subscriptions.lock().await;
676 let entry = subs.entry(client_id).or_default();
677 if !entry.contains_key(&subscription_id)
678 && entry.len() >= self.config.max_subs_per_client
679 {
680 return Err("too many subscriptions");
681 }
682 entry.insert(subscription_id, filters.clone());
683 }
684
685 let mut seen: HashSet<EventId> = HashSet::new();
686 let mut events = Vec::new();
687 for filter in &filters {
688 let limit = filter
689 .limit
690 .unwrap_or(self.config.max_query_limit)
691 .min(self.config.max_query_limit);
692 self.collect_filter_events(filter, limit, &mut seen, &mut events)
693 .await;
694 }
695
696 Ok(events)
697 }
698
699 async fn handle_auth(&self, client_id: u64, event: Event) {
700 let ok = event.verify().is_ok();
701 let message = if ok { "" } else { "invalid auth" };
702 self.send_to_client(client_id, NostrRelayMessage::ok(event.id, ok, message))
703 .await;
704 }
705
706 async fn handle_close(&self, client_id: u64, subscription_id: SubscriptionId) {
707 let mut subs = self.subscriptions.lock().await;
708 if let Some(map) = subs.get_mut(&client_id) {
709 map.remove(&subscription_id);
710 }
711 }
712
713 async fn handle_event(&self, client_id: u64, event: Event) {
714 let ok = event.verify().is_ok();
715 if !ok {
716 self.send_to_client(
717 client_id,
718 NostrRelayMessage::ok(event.id, false, "invalid: signature"),
719 )
720 .await;
721 return;
722 }
723
724 let trusted = self.is_trusted_event(client_id, &event).await;
725 if !trusted && !self.allow_spambox_event(client_id).await {
726 self.send_to_client(
727 client_id,
728 NostrRelayMessage::ok(event.id, false, "rate limited"),
729 )
730 .await;
731 return;
732 }
733
734 let is_ephemeral = event.kind.is_ephemeral();
735 if trusted {
736 let mut recent = self.recent_events.lock().await;
737 recent.insert(event.clone());
738 }
739 if !is_ephemeral {
740 let stored = if trusted {
741 let storage_class = self.event_storage_class(&event);
742 self.trusted
743 .ingest_with_storage_class(&event, storage_class)
744 .is_ok()
745 } else {
746 match self.spambox.as_ref() {
747 Some(spambox) => spambox.ingest(&event).await,
748 None => false,
749 }
750 };
751
752 if !stored {
753 let message = if trusted {
754 "store failed"
755 } else {
756 "spambox full"
757 };
758 self.send_to_client(client_id, NostrRelayMessage::ok(event.id, false, message))
759 .await;
760 return;
761 }
762 }
763
764 let message = if trusted { "" } else { "spambox" };
765 self.send_to_client(client_id, NostrRelayMessage::ok(event.id, true, message))
766 .await;
767
768 if trusted {
769 self.broadcast_event(&event).await;
770 }
771 }
772
773 async fn handle_req(
774 &self,
775 client_id: u64,
776 subscription_id: SubscriptionId,
777 filters: Vec<NostrFilter>,
778 ) {
779 match self
780 .register_subscription_query(client_id, subscription_id.clone(), filters)
781 .await
782 {
783 Ok(events) => {
784 for event in events {
785 self.send_to_client(
786 client_id,
787 NostrRelayMessage::event(subscription_id.clone(), event),
788 )
789 .await;
790 }
791
792 self.send_to_client(client_id, NostrRelayMessage::eose(subscription_id))
793 .await;
794 }
795 Err(message) => {
796 self.send_to_client(
797 client_id,
798 NostrRelayMessage::closed(subscription_id, message),
799 )
800 .await;
801 }
802 }
803 }
804
805 async fn handle_count(
806 &self,
807 client_id: u64,
808 subscription_id: SubscriptionId,
809 filters: Vec<NostrFilter>,
810 ) {
811 if !self.allow_req(client_id).await {
812 self.send_to_client(
813 client_id,
814 NostrRelayMessage::closed(subscription_id, "rate limited"),
815 )
816 .await;
817 return;
818 }
819
820 let mut seen: HashSet<EventId> = HashSet::new();
821 for filter in &filters {
822 let limit = filter
823 .limit
824 .unwrap_or(self.config.max_query_limit)
825 .min(self.config.max_query_limit);
826 self.collect_filter_count(filter, limit, &mut seen).await;
827 }
828
829 self.send_to_client(
830 client_id,
831 NostrRelayMessage::count(subscription_id, seen.len()),
832 )
833 .await;
834 }
835
836 async fn is_trusted_event(&self, client_id: u64, event: &Event) -> bool {
837 let event_pubkey = event.pubkey.to_hex();
838 let client_pubkey = {
839 let clients = self.clients.lock().await;
840 clients
841 .get(&client_id)
842 .and_then(|state| state.pubkey.clone())
843 };
844 if let Some(pubkey) = client_pubkey {
845 return pubkey == event_pubkey
846 || self.social_graph.as_ref().is_some_and(|social_graph| {
847 social_graph.check_write_access(&event_pubkey)
848 });
849 }
850 if let Some(ref social_graph) = self.social_graph {
851 return social_graph.check_write_access(&event_pubkey);
852 }
853 true
854 }
855
856 fn event_storage_class(&self, event: &Event) -> EventStorageClass {
857 if self.public_pubkeys.contains(&event.pubkey.to_hex()) {
858 EventStorageClass::Public
859 } else {
860 EventStorageClass::Ambient
861 }
862 }
863
864 async fn allow_spambox_event(&self, client_id: u64) -> bool {
865 let mut clients = self.clients.lock().await;
866 let Some(state) = clients.get_mut(&client_id) else {
867 return false;
868 };
869 state
870 .quota
871 .allow_spambox_event(self.config.spambox_max_events_per_min)
872 }
873
874 async fn allow_req(&self, client_id: u64) -> bool {
875 let mut clients = self.clients.lock().await;
876 let Some(state) = clients.get_mut(&client_id) else {
877 return false;
878 };
879 state.quota.allow_req(self.config.spambox_max_reqs_per_min)
880 }
881
882 async fn broadcast_event(&self, event: &Event) {
883 let subscriptions = self.subscriptions.lock().await;
884 let mut deliveries: Vec<(u64, SubscriptionId)> = Vec::new();
885 for (client_id, subs) in subscriptions.iter() {
886 for (sub_id, filters) in subs.iter() {
887 if filters.iter().any(|f| f.match_event(event)) {
888 deliveries.push((*client_id, sub_id.clone()));
889 }
890 }
891 }
892 drop(subscriptions);
893
894 for (client_id, sub_id) in deliveries {
895 self.send_to_client(client_id, NostrRelayMessage::event(sub_id, event.clone()))
896 .await;
897 }
898 }
899
900 async fn send_to_client(&self, client_id: u64, msg: NostrRelayMessage) {
901 let sender = {
902 let clients = self.clients.lock().await;
903 clients.get(&client_id).map(|state| state.sender.clone())
904 };
905 if let Some(tx) = sender {
906 let _ = tx.send(msg.as_json());
907 }
908 }
909 }
910}
911
912pub use imp::NostrRelay;
913
914#[cfg(test)]
915mod tests {
916 use super::*;
917 use anyhow::Result;
918 use nostr::{EventBuilder, Filter, JsonUtil, Keys, Kind, RelayMessage, SubscriptionId};
919 use std::collections::HashSet;
920 use tempfile::TempDir;
921 use tokio::time::{timeout, Duration};
922
923 async fn recv_relay_message(rx: &mut mpsc::UnboundedReceiver<String>) -> Result<RelayMessage> {
924 let msg = timeout(Duration::from_secs(1), rx.recv())
925 .await?
926 .ok_or_else(|| anyhow::anyhow!("channel closed"))?;
927 Ok(RelayMessage::from_json(msg)?)
928 }
929
930 #[tokio::test]
931 async fn relay_stores_and_serves_events() -> Result<()> {
932 let tmp = TempDir::new()?;
933 let graph_store = {
934 let _guard = crate::socialgraph::test_lock();
935 crate::socialgraph::open_social_graph_store_with_mapsize(
936 tmp.path(),
937 Some(128 * 1024 * 1024),
938 )?
939 };
940 let keys = Keys::generate();
941 let mut allowed = HashSet::new();
942 allowed.insert(keys.public_key().to_hex());
943 let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
944
945 let access = Arc::new(crate::socialgraph::SocialGraphAccessControl::new(
946 Arc::clone(&backend),
947 0,
948 allowed,
949 ));
950
951 let relay_config = NostrRelayConfig {
952 spambox_db_max_bytes: 0,
953 ..Default::default()
954 };
955 let relay = NostrRelay::new(
956 Arc::clone(&backend),
957 tmp.path().to_path_buf(),
958 HashSet::from([keys.public_key().to_hex()]),
959 Some(access),
960 relay_config,
961 )?;
962
963 let (tx, mut rx) = mpsc::unbounded_channel();
964 relay.register_client(1, tx, None).await;
965
966 let event = EventBuilder::new(Kind::TextNote, "hello", []).to_event(&keys)?;
967 relay
968 .handle_client_message(1, NostrClientMessage::event(event.clone()))
969 .await;
970
971 match recv_relay_message(&mut rx).await? {
972 RelayMessage::Ok { status, .. } => assert!(status),
973 other => anyhow::bail!("expected OK, got {:?}", other),
974 }
975
976 tokio::time::sleep(Duration::from_millis(50)).await;
977
978 let sub_id = SubscriptionId::new("sub-1");
979 let filter = Filter::new()
980 .authors(vec![event.pubkey])
981 .kinds(vec![event.kind]);
982 let mut got_event = false;
983 for _ in 0..3 {
984 relay
985 .handle_client_message(
986 1,
987 NostrClientMessage::req(sub_id.clone(), vec![filter.clone()]),
988 )
989 .await;
990
991 match recv_relay_message(&mut rx).await? {
992 RelayMessage::Event {
993 subscription_id,
994 event: ev,
995 } => {
996 assert_eq!(subscription_id, sub_id);
997 assert_eq!(ev.id, event.id);
998 got_event = true;
999 break;
1000 }
1001 RelayMessage::EndOfStoredEvents(id) => {
1002 assert_eq!(id, sub_id);
1003 tokio::time::sleep(Duration::from_millis(100)).await;
1004 }
1005 other => anyhow::bail!("expected EVENT/EOSE, got {:?}", other),
1006 }
1007 }
1008
1009 if !got_event {
1010 anyhow::bail!("event not available in time");
1011 }
1012
1013 match recv_relay_message(&mut rx).await? {
1014 RelayMessage::EndOfStoredEvents(id) => assert_eq!(id, sub_id),
1015 other => anyhow::bail!("expected EOSE, got {:?}", other),
1016 }
1017
1018 Ok(())
1019 }
1020
1021 #[tokio::test]
1022 async fn relay_persists_bluetooth_received_event_records() -> Result<()> {
1023 let tmp = TempDir::new()?;
1024 let graph_store = {
1025 let _guard = crate::socialgraph::test_lock();
1026 crate::socialgraph::open_social_graph_store_with_mapsize(
1027 tmp.path(),
1028 Some(128 * 1024 * 1024),
1029 )?
1030 };
1031 let keys = Keys::generate();
1032 let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
1033
1034 let access = Arc::new(crate::socialgraph::SocialGraphAccessControl::new(
1035 Arc::clone(&backend),
1036 0,
1037 HashSet::from([keys.public_key().to_hex()]),
1038 ));
1039
1040 let relay = NostrRelay::new(
1041 Arc::clone(&backend),
1042 tmp.path().to_path_buf(),
1043 HashSet::from([keys.public_key().to_hex()]),
1044 Some(access.clone()),
1045 NostrRelayConfig {
1046 spambox_db_max_bytes: 0,
1047 ..Default::default()
1048 },
1049 )?;
1050
1051 let cid = "cd".repeat(32);
1052 let event = EventBuilder::new(
1053 Kind::TextNote,
1054 "bluetooth receipt",
1055 [nostr::Tag::parse(&["cid", &cid]).unwrap()],
1056 )
1057 .to_event(&keys)?;
1058 relay
1059 .ingest_trusted_event_from_bluetooth(
1060 event.clone(),
1061 Some("peer-a".to_string()),
1062 )
1063 .await?;
1064
1065 let receipts = relay.bluetooth_received_events(10).await;
1066 assert_eq!(receipts.len(), 1);
1067 assert_eq!(receipts[0].event_id, event.id.to_hex());
1068 assert_eq!(receipts[0].cid_values, vec![cid.clone()]);
1069
1070 let reloaded = NostrRelay::new(
1071 Arc::clone(&backend),
1072 tmp.path().to_path_buf(),
1073 HashSet::from([keys.public_key().to_hex()]),
1074 Some(access),
1075 NostrRelayConfig {
1076 spambox_db_max_bytes: 0,
1077 ..Default::default()
1078 },
1079 )?;
1080 let reloaded_receipts = reloaded.bluetooth_received_events(10).await;
1081 assert_eq!(reloaded_receipts.len(), 1);
1082 assert_eq!(reloaded_receipts[0].event_id, event.id.to_hex());
1083 assert_eq!(reloaded_receipts[0].cid_values, vec![cid]);
1084
1085 Ok(())
1086 }
1087
1088 #[tokio::test]
1089 async fn relay_persists_nhash_bluetooth_received_event_records() -> Result<()> {
1090 let tmp = TempDir::new()?;
1091 let graph_store = {
1092 let _guard = crate::socialgraph::test_lock();
1093 crate::socialgraph::open_social_graph_store_with_mapsize(
1094 tmp.path(),
1095 Some(128 * 1024 * 1024),
1096 )?
1097 };
1098 let keys = Keys::generate();
1099 let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
1100
1101 let access = Arc::new(crate::socialgraph::SocialGraphAccessControl::new(
1102 Arc::clone(&backend),
1103 0,
1104 HashSet::from([keys.public_key().to_hex()]),
1105 ));
1106
1107 let relay = NostrRelay::new(
1108 Arc::clone(&backend),
1109 tmp.path().to_path_buf(),
1110 HashSet::from([keys.public_key().to_hex()]),
1111 Some(access.clone()),
1112 NostrRelayConfig {
1113 spambox_db_max_bytes: 0,
1114 ..Default::default()
1115 },
1116 )?;
1117
1118 let nhash = hashtree_core::nhash_encode(&[0xef; 32])?;
1119 let event = EventBuilder::new(
1120 Kind::TextNote,
1121 "bluetooth nhash receipt",
1122 [nostr::Tag::parse(&["cid", &nhash]).unwrap()],
1123 )
1124 .to_event(&keys)?;
1125 relay
1126 .ingest_trusted_event_from_bluetooth(
1127 event.clone(),
1128 Some("peer-a".to_string()),
1129 )
1130 .await?;
1131
1132 let receipts = relay.bluetooth_received_events(10).await;
1133 assert_eq!(receipts.len(), 1);
1134 assert_eq!(receipts[0].event_id, event.id.to_hex());
1135 assert_eq!(receipts[0].cid_values, vec![nhash.clone()]);
1136
1137 let reloaded = NostrRelay::new(
1138 Arc::clone(&backend),
1139 tmp.path().to_path_buf(),
1140 HashSet::from([keys.public_key().to_hex()]),
1141 Some(access),
1142 NostrRelayConfig {
1143 spambox_db_max_bytes: 0,
1144 ..Default::default()
1145 },
1146 )?;
1147 let reloaded_receipts = reloaded.bluetooth_received_events(10).await;
1148 assert_eq!(reloaded_receipts.len(), 1);
1149 assert_eq!(reloaded_receipts[0].event_id, event.id.to_hex());
1150 assert_eq!(reloaded_receipts[0].cid_values, vec![nhash]);
1151
1152 Ok(())
1153 }
1154
1155 #[tokio::test]
1156 async fn relay_caps_bluetooth_received_event_records_to_last_100() -> Result<()> {
1157 let tmp = TempDir::new()?;
1158 let graph_store = {
1159 let _guard = crate::socialgraph::test_lock();
1160 crate::socialgraph::open_social_graph_store_with_mapsize(
1161 tmp.path(),
1162 Some(128 * 1024 * 1024),
1163 )?
1164 };
1165 let keys = Keys::generate();
1166 let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
1167
1168 let access = Arc::new(crate::socialgraph::SocialGraphAccessControl::new(
1169 Arc::clone(&backend),
1170 0,
1171 HashSet::from([keys.public_key().to_hex()]),
1172 ));
1173
1174 let relay = NostrRelay::new(
1175 Arc::clone(&backend),
1176 tmp.path().to_path_buf(),
1177 HashSet::from([keys.public_key().to_hex()]),
1178 Some(access),
1179 NostrRelayConfig {
1180 spambox_db_max_bytes: 0,
1181 ..Default::default()
1182 },
1183 )?;
1184
1185 let mut event_ids = Vec::new();
1186 for index in 0..(BLUETOOTH_EVENT_LOG_CAPACITY + 5) {
1187 let event = EventBuilder::new(
1188 Kind::TextNote,
1189 format!("bluetooth receipt {index}"),
1190 [nostr::Tag::parse(&["cid", &format!("{index:064x}")]).unwrap()],
1191 )
1192 .to_event(&keys)?;
1193 event_ids.push(event.id.to_hex());
1194 relay
1195 .ingest_trusted_event_from_bluetooth(event, Some("peer-a".to_string()))
1196 .await?;
1197 }
1198
1199 let receipts = relay
1200 .bluetooth_received_events(BLUETOOTH_EVENT_LOG_CAPACITY + 10)
1201 .await;
1202 assert_eq!(receipts.len(), BLUETOOTH_EVENT_LOG_CAPACITY);
1203 assert_eq!(receipts[0].event_id, event_ids.last().cloned().unwrap());
1204 assert!(
1205 receipts
1206 .iter()
1207 .all(|receipt| !event_ids[..5].contains(&receipt.event_id)),
1208 "oldest receipts should be trimmed from the capped log"
1209 );
1210 assert_eq!(
1211 receipts.last().map(|receipt| receipt.event_id.clone()),
1212 Some(event_ids[5].clone())
1213 );
1214
1215 Ok(())
1216 }
1217
1218 #[tokio::test]
1219 async fn relay_serves_all_events_for_since_zero_catch_all_filter() -> Result<()> {
1220 let tmp = TempDir::new()?;
1221 let graph_store = {
1222 let _guard = crate::socialgraph::test_lock();
1223 crate::socialgraph::open_social_graph_store_with_mapsize(
1224 tmp.path(),
1225 Some(128 * 1024 * 1024),
1226 )?
1227 };
1228 let keys = Keys::generate();
1229 let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
1230
1231 let access = Arc::new(crate::socialgraph::SocialGraphAccessControl::new(
1232 Arc::clone(&backend),
1233 0,
1234 HashSet::from([keys.public_key().to_hex()]),
1235 ));
1236
1237 let relay = NostrRelay::new(
1238 Arc::clone(&backend),
1239 tmp.path().to_path_buf(),
1240 HashSet::from([keys.public_key().to_hex()]),
1241 Some(access),
1242 NostrRelayConfig {
1243 spambox_db_max_bytes: 0,
1244 max_query_limit: 10,
1245 ..Default::default()
1246 },
1247 )?;
1248
1249 let (tx, mut rx) = mpsc::unbounded_channel();
1250 relay.register_client(8, tx, None).await;
1251
1252 let first = EventBuilder::new(Kind::TextNote, "first", [])
1253 .custom_created_at(nostr::Timestamp::from_secs(5))
1254 .to_event(&keys)?;
1255 let second = EventBuilder::new(Kind::TextNote, "second", [])
1256 .custom_created_at(nostr::Timestamp::from_secs(6))
1257 .to_event(&keys)?;
1258 let third = EventBuilder::new(Kind::TextNote, "third", [])
1259 .custom_created_at(nostr::Timestamp::from_secs(7))
1260 .to_event(&keys)?;
1261
1262 for event in [&first, &second, &third] {
1263 relay
1264 .handle_client_message(8, NostrClientMessage::event(event.clone()))
1265 .await;
1266 match recv_relay_message(&mut rx).await? {
1267 RelayMessage::Ok { status, .. } => assert!(status),
1268 other => anyhow::bail!("expected OK, got {:?}", other),
1269 }
1270 }
1271
1272 tokio::time::sleep(Duration::from_millis(50)).await;
1273
1274 relay
1275 .handle_client_message(
1276 8,
1277 NostrClientMessage::from_json(r#"["REQ","sub-all",{"since":0}]"#)?,
1278 )
1279 .await;
1280
1281 let mut received = HashSet::new();
1282 loop {
1283 match recv_relay_message(&mut rx).await? {
1284 RelayMessage::Event {
1285 subscription_id,
1286 event,
1287 } => {
1288 assert_eq!(subscription_id, SubscriptionId::new("sub-all"));
1289 received.insert(event.id);
1290 }
1291 RelayMessage::EndOfStoredEvents(id) => {
1292 assert_eq!(id, SubscriptionId::new("sub-all"));
1293 break;
1294 }
1295 other => anyhow::bail!("expected EVENT/EOSE, got {:?}", other),
1296 }
1297 }
1298
1299 assert_eq!(received.len(), 3);
1300 assert_eq!(received, HashSet::from([first.id, second.id, third.id]));
1301
1302 Ok(())
1303 }
1304
1305 #[tokio::test]
1306 async fn relay_spambox_does_not_serve_untrusted_events() -> Result<()> {
1307 let tmp = TempDir::new()?;
1308 let graph_store = {
1309 let _guard = crate::socialgraph::test_lock();
1310 crate::socialgraph::open_social_graph_store_with_mapsize(
1311 tmp.path(),
1312 Some(128 * 1024 * 1024),
1313 )?
1314 };
1315
1316 crate::socialgraph::set_social_graph_root(&graph_store, &[1u8; 32]);
1317 std::thread::sleep(std::time::Duration::from_millis(100));
1318 let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
1319
1320 let access = Arc::new(crate::socialgraph::SocialGraphAccessControl::new(
1321 Arc::clone(&backend),
1322 0,
1323 HashSet::new(),
1324 ));
1325
1326 let relay_config = NostrRelayConfig {
1327 spambox_db_max_bytes: 0,
1328 ..Default::default()
1329 };
1330 let relay = NostrRelay::new(
1331 Arc::clone(&backend),
1332 tmp.path().to_path_buf(),
1333 HashSet::new(),
1334 Some(access),
1335 relay_config,
1336 )?;
1337
1338 let (tx, mut rx) = mpsc::unbounded_channel();
1339 relay.register_client(2, tx, None).await;
1340
1341 let keys = Keys::generate();
1342 let event = EventBuilder::new(Kind::TextNote, "spam", []).to_event(&keys)?;
1343 relay
1344 .handle_client_message(2, NostrClientMessage::event(event.clone()))
1345 .await;
1346
1347 match recv_relay_message(&mut rx).await? {
1348 RelayMessage::Ok { status, .. } => assert!(status),
1349 other => anyhow::bail!("expected OK, got {:?}", other),
1350 }
1351
1352 tokio::time::sleep(Duration::from_millis(50)).await;
1353
1354 let sub_id = SubscriptionId::new("sub-2");
1355 let filter = Filter::new()
1356 .authors(vec![event.pubkey])
1357 .kinds(vec![event.kind]);
1358 relay
1359 .handle_client_message(2, NostrClientMessage::req(sub_id.clone(), vec![filter]))
1360 .await;
1361
1362 match recv_relay_message(&mut rx).await? {
1363 RelayMessage::EndOfStoredEvents(id) => assert_eq!(id, sub_id),
1364 other => anyhow::bail!("expected EOSE only, got {:?}", other),
1365 }
1366
1367 Ok(())
1368 }
1369
1370 #[tokio::test]
1371 async fn relay_trusts_authenticated_client_for_its_own_events() -> Result<()> {
1372 let tmp = TempDir::new()?;
1373 let graph_store = {
1374 let _guard = crate::socialgraph::test_lock();
1375 crate::socialgraph::open_social_graph_store_with_mapsize(
1376 tmp.path(),
1377 Some(128 * 1024 * 1024),
1378 )?
1379 };
1380
1381 crate::socialgraph::set_social_graph_root(&graph_store, &[1u8; 32]);
1382 std::thread::sleep(std::time::Duration::from_millis(100));
1383 let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
1384
1385 let access = Arc::new(crate::socialgraph::SocialGraphAccessControl::new(
1386 Arc::clone(&backend),
1387 0,
1388 HashSet::new(),
1389 ));
1390
1391 let relay = NostrRelay::new(
1392 Arc::clone(&backend),
1393 tmp.path().to_path_buf(),
1394 HashSet::new(),
1395 Some(access),
1396 NostrRelayConfig {
1397 spambox_db_max_bytes: 0,
1398 ..Default::default()
1399 },
1400 )?;
1401
1402 let (tx, mut rx) = mpsc::unbounded_channel();
1403 let keys = Keys::generate();
1404 relay
1405 .register_client(9, tx, Some(keys.public_key().to_hex()))
1406 .await;
1407
1408 let event = EventBuilder::new(Kind::TextNote, "self-authored", []).to_event(&keys)?;
1409 relay
1410 .handle_client_message(9, NostrClientMessage::event(event.clone()))
1411 .await;
1412
1413 match recv_relay_message(&mut rx).await? {
1414 RelayMessage::Ok {
1415 status, message, ..
1416 } => {
1417 assert!(status);
1418 assert_eq!(message, "");
1419 }
1420 other => anyhow::bail!("expected OK, got {:?}", other),
1421 }
1422
1423 tokio::time::sleep(Duration::from_millis(50)).await;
1424
1425 let sub_id = SubscriptionId::new("sub-auth");
1426 let filter = Filter::new()
1427 .authors(vec![event.pubkey])
1428 .kinds(vec![event.kind]);
1429 relay
1430 .handle_client_message(9, NostrClientMessage::req(sub_id.clone(), vec![filter]))
1431 .await;
1432
1433 match recv_relay_message(&mut rx).await? {
1434 RelayMessage::Event {
1435 subscription_id,
1436 event: stored,
1437 } => {
1438 assert_eq!(subscription_id, sub_id);
1439 assert_eq!(stored.id, event.id);
1440 }
1441 other => anyhow::bail!("expected EVENT, got {:?}", other),
1442 }
1443
1444 match recv_relay_message(&mut rx).await? {
1445 RelayMessage::EndOfStoredEvents(id) => assert_eq!(id, sub_id),
1446 other => anyhow::bail!("expected EOSE, got {:?}", other),
1447 }
1448
1449 Ok(())
1450 }
1451
1452 #[tokio::test]
1453 async fn relay_routes_non_authored_trusted_events_to_ambient_index() -> Result<()> {
1454 let tmp = TempDir::new()?;
1455 let graph_store = {
1456 let _guard = crate::socialgraph::test_lock();
1457 crate::socialgraph::open_social_graph_store_with_mapsize(
1458 tmp.path(),
1459 Some(128 * 1024 * 1024),
1460 )?
1461 };
1462 let authored_keys = Keys::generate();
1463 let remote_keys = Keys::generate();
1464 let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
1465
1466 let access = Arc::new(crate::socialgraph::SocialGraphAccessControl::new(
1467 Arc::clone(&backend),
1468 0,
1469 HashSet::from([authored_keys.public_key().to_hex()]),
1470 ));
1471
1472 let relay = NostrRelay::new(
1473 Arc::clone(&backend),
1474 tmp.path().to_path_buf(),
1475 HashSet::from([authored_keys.public_key().to_hex()]),
1476 Some(access),
1477 NostrRelayConfig {
1478 spambox_db_max_bytes: 0,
1479 ..Default::default()
1480 },
1481 )?;
1482
1483 let ambient_event = EventBuilder::new(Kind::TextNote, "ambient", [])
1484 .custom_created_at(nostr::Timestamp::from_secs(5))
1485 .to_event(&remote_keys)?;
1486 relay.ingest_trusted_event(ambient_event.clone()).await?;
1487
1488 let filter = Filter::new()
1489 .author(remote_keys.public_key())
1490 .kind(Kind::TextNote);
1491 let ambient_only = graph_store
1492 .query_events_in_scope(
1493 &filter,
1494 10,
1495 crate::socialgraph::EventQueryScope::AmbientOnly,
1496 )
1497 .unwrap();
1498 assert_eq!(ambient_only.len(), 1);
1499 assert_eq!(ambient_only[0].id, ambient_event.id);
1500
1501 let public_only = graph_store
1502 .query_events_in_scope(&filter, 10, crate::socialgraph::EventQueryScope::PublicOnly)
1503 .unwrap();
1504 assert!(public_only.is_empty());
1505
1506 Ok(())
1507 }
1508
1509 #[tokio::test]
1510 async fn relay_serves_parameterized_replaceable_queries() -> Result<()> {
1511 let tmp = TempDir::new()?;
1512 let graph_store = {
1513 let _guard = crate::socialgraph::test_lock();
1514 crate::socialgraph::open_social_graph_store_with_mapsize(
1515 tmp.path(),
1516 Some(128 * 1024 * 1024),
1517 )?
1518 };
1519 let keys = Keys::generate();
1520 let mut allowed = HashSet::new();
1521 allowed.insert(keys.public_key().to_hex());
1522 let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
1523
1524 let access = Arc::new(crate::socialgraph::SocialGraphAccessControl::new(
1525 Arc::clone(&backend),
1526 0,
1527 allowed,
1528 ));
1529
1530 let relay = NostrRelay::new(
1531 Arc::clone(&backend),
1532 tmp.path().to_path_buf(),
1533 HashSet::from([keys.public_key().to_hex()]),
1534 Some(access),
1535 NostrRelayConfig {
1536 spambox_db_max_bytes: 0,
1537 ..Default::default()
1538 },
1539 )?;
1540
1541 let (tx, mut rx) = mpsc::unbounded_channel();
1542 relay.register_client(3, tx, None).await;
1543
1544 let older = EventBuilder::new(
1545 Kind::Custom(30078),
1546 "",
1547 vec![
1548 nostr::Tag::identifier("video"),
1549 nostr::Tag::parse(&["l", "hashtree"])?,
1550 nostr::Tag::parse(&["hash", &"11".repeat(32)])?,
1551 ],
1552 )
1553 .custom_created_at(nostr::Timestamp::from_secs(5))
1554 .to_event(&keys)?;
1555 let newer = EventBuilder::new(
1556 Kind::Custom(30078),
1557 "",
1558 vec![
1559 nostr::Tag::identifier("video"),
1560 nostr::Tag::parse(&["l", "hashtree"])?,
1561 nostr::Tag::parse(&["hash", &"22".repeat(32)])?,
1562 ],
1563 )
1564 .custom_created_at(nostr::Timestamp::from_secs(6))
1565 .to_event(&keys)?;
1566
1567 relay
1568 .handle_client_message(3, NostrClientMessage::event(older.clone()))
1569 .await;
1570 let _ = recv_relay_message(&mut rx).await?;
1571 relay
1572 .handle_client_message(3, NostrClientMessage::event(newer.clone()))
1573 .await;
1574 let _ = recv_relay_message(&mut rx).await?;
1575
1576 let sub_id = SubscriptionId::new("sub-d");
1577 let filter = Filter::new()
1578 .author(keys.public_key())
1579 .kind(Kind::Custom(30078))
1580 .identifier("video");
1581 relay
1582 .handle_client_message(3, NostrClientMessage::req(sub_id.clone(), vec![filter]))
1583 .await;
1584
1585 match recv_relay_message(&mut rx).await? {
1586 RelayMessage::Event {
1587 subscription_id,
1588 event,
1589 } => {
1590 assert_eq!(subscription_id, sub_id);
1591 assert_eq!(event.id, newer.id);
1592 }
1593 other => anyhow::bail!("expected EVENT, got {:?}", other),
1594 }
1595
1596 match recv_relay_message(&mut rx).await? {
1597 RelayMessage::EndOfStoredEvents(id) => assert_eq!(id, sub_id),
1598 other => anyhow::bail!("expected EOSE, got {:?}", other),
1599 }
1600
1601 Ok(())
1602 }
1603
1604 #[tokio::test]
1605 async fn relay_serves_replaceable_queries() -> Result<()> {
1606 let tmp = TempDir::new()?;
1607 let graph_store = {
1608 let _guard = crate::socialgraph::test_lock();
1609 crate::socialgraph::open_social_graph_store_with_mapsize(
1610 tmp.path(),
1611 Some(128 * 1024 * 1024),
1612 )?
1613 };
1614 let keys = Keys::generate();
1615 let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
1616
1617 let access = Arc::new(crate::socialgraph::SocialGraphAccessControl::new(
1618 Arc::clone(&backend),
1619 0,
1620 HashSet::from([keys.public_key().to_hex()]),
1621 ));
1622
1623 let relay = NostrRelay::new(
1624 Arc::clone(&backend),
1625 tmp.path().to_path_buf(),
1626 HashSet::from([keys.public_key().to_hex()]),
1627 Some(access),
1628 NostrRelayConfig {
1629 spambox_db_max_bytes: 0,
1630 ..Default::default()
1631 },
1632 )?;
1633
1634 let (tx, mut rx) = mpsc::unbounded_channel();
1635 relay.register_client(4, tx, None).await;
1636
1637 let older = EventBuilder::new(Kind::Metadata, r#"{"name":"older"}"#, [])
1638 .custom_created_at(nostr::Timestamp::from_secs(5))
1639 .to_event(&keys)?;
1640 let newer = EventBuilder::new(Kind::Metadata, r#"{"name":"newer"}"#, [])
1641 .custom_created_at(nostr::Timestamp::from_secs(6))
1642 .to_event(&keys)?;
1643
1644 relay
1645 .handle_client_message(4, NostrClientMessage::event(older.clone()))
1646 .await;
1647 let _ = recv_relay_message(&mut rx).await?;
1648 relay
1649 .handle_client_message(4, NostrClientMessage::event(newer.clone()))
1650 .await;
1651 let _ = recv_relay_message(&mut rx).await?;
1652
1653 let sub_id = SubscriptionId::new("sub-profile");
1654 let filter = Filter::new().author(keys.public_key()).kind(Kind::Metadata);
1655 relay
1656 .handle_client_message(4, NostrClientMessage::req(sub_id.clone(), vec![filter]))
1657 .await;
1658
1659 match recv_relay_message(&mut rx).await? {
1660 RelayMessage::Event {
1661 subscription_id,
1662 event,
1663 } => {
1664 assert_eq!(subscription_id, sub_id);
1665 assert_eq!(event.id, newer.id);
1666 }
1667 other => anyhow::bail!("expected EVENT, got {:?}", other),
1668 }
1669
1670 match recv_relay_message(&mut rx).await? {
1671 RelayMessage::EndOfStoredEvents(id) => assert_eq!(id, sub_id),
1672 other => anyhow::bail!("expected EOSE, got {:?}", other),
1673 }
1674
1675 Ok(())
1676 }
1677
1678 #[tokio::test]
1679 async fn relay_count_dedupes_across_filters_and_honors_filter_limits() -> Result<()> {
1680 let tmp = TempDir::new()?;
1681 let graph_store = {
1682 let _guard = crate::socialgraph::test_lock();
1683 crate::socialgraph::open_social_graph_store_with_mapsize(
1684 tmp.path(),
1685 Some(128 * 1024 * 1024),
1686 )?
1687 };
1688 let keys = Keys::generate();
1689 let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
1690
1691 let access = Arc::new(crate::socialgraph::SocialGraphAccessControl::new(
1692 Arc::clone(&backend),
1693 0,
1694 HashSet::from([keys.public_key().to_hex()]),
1695 ));
1696
1697 let relay = NostrRelay::new(
1698 Arc::clone(&backend),
1699 tmp.path().to_path_buf(),
1700 HashSet::from([keys.public_key().to_hex()]),
1701 Some(access),
1702 NostrRelayConfig {
1703 spambox_db_max_bytes: 0,
1704 ..Default::default()
1705 },
1706 )?;
1707
1708 let (tx, mut rx) = mpsc::unbounded_channel();
1709 relay.register_client(5, tx, None).await;
1710
1711 let older = EventBuilder::new(Kind::Metadata, r#"{"name":"older"}"#, [])
1712 .custom_created_at(nostr::Timestamp::from_secs(5))
1713 .to_event(&keys)?;
1714 let newer = EventBuilder::new(Kind::Metadata, r#"{"name":"newer"}"#, [])
1715 .custom_created_at(nostr::Timestamp::from_secs(6))
1716 .to_event(&keys)?;
1717
1718 relay
1719 .handle_client_message(5, NostrClientMessage::event(older.clone()))
1720 .await;
1721 let _ = recv_relay_message(&mut rx).await?;
1722 relay
1723 .handle_client_message(5, NostrClientMessage::event(newer.clone()))
1724 .await;
1725 let _ = recv_relay_message(&mut rx).await?;
1726
1727 let sub_id = SubscriptionId::new("sub-count");
1728 let filters = vec![
1729 Filter::new()
1730 .author(keys.public_key())
1731 .kind(Kind::Metadata)
1732 .limit(1),
1733 Filter::new().id(older.id),
1734 ];
1735 relay
1736 .handle_client_message(5, NostrClientMessage::count(sub_id.clone(), filters))
1737 .await;
1738
1739 match recv_relay_message(&mut rx).await? {
1740 RelayMessage::Count {
1741 subscription_id,
1742 count,
1743 } => {
1744 assert_eq!(subscription_id, sub_id);
1745 assert_eq!(count, 2);
1746 }
1747 other => anyhow::bail!("expected COUNT, got {:?}", other),
1748 }
1749
1750 Ok(())
1751 }
1752
1753 #[tokio::test]
1754 async fn relay_count_caps_filter_limit_to_config_max() -> Result<()> {
1755 let tmp = TempDir::new()?;
1756 let graph_store = {
1757 let _guard = crate::socialgraph::test_lock();
1758 crate::socialgraph::open_social_graph_store_with_mapsize(
1759 tmp.path(),
1760 Some(128 * 1024 * 1024),
1761 )?
1762 };
1763 let keys = Keys::generate();
1764 let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
1765
1766 let access = Arc::new(crate::socialgraph::SocialGraphAccessControl::new(
1767 Arc::clone(&backend),
1768 0,
1769 HashSet::from([keys.public_key().to_hex()]),
1770 ));
1771
1772 let relay = NostrRelay::new(
1773 Arc::clone(&backend),
1774 tmp.path().to_path_buf(),
1775 HashSet::from([keys.public_key().to_hex()]),
1776 Some(access),
1777 NostrRelayConfig {
1778 spambox_db_max_bytes: 0,
1779 max_query_limit: 1,
1780 ..Default::default()
1781 },
1782 )?;
1783
1784 let (tx, mut rx) = mpsc::unbounded_channel();
1785 relay.register_client(7, tx, None).await;
1786
1787 let older = EventBuilder::new(Kind::TextNote, "older", [])
1788 .custom_created_at(nostr::Timestamp::from_secs(5))
1789 .to_event(&keys)?;
1790 let newer = EventBuilder::new(Kind::TextNote, "newer", [])
1791 .custom_created_at(nostr::Timestamp::from_secs(6))
1792 .to_event(&keys)?;
1793
1794 relay
1795 .handle_client_message(7, NostrClientMessage::event(older))
1796 .await;
1797 let _ = recv_relay_message(&mut rx).await?;
1798 relay
1799 .handle_client_message(7, NostrClientMessage::event(newer))
1800 .await;
1801 let _ = recv_relay_message(&mut rx).await?;
1802
1803 relay
1804 .handle_client_message(
1805 7,
1806 NostrClientMessage::count(
1807 SubscriptionId::new("sub-count-cap"),
1808 vec![Filter::new()
1809 .author(keys.public_key())
1810 .kind(Kind::TextNote)
1811 .limit(10)],
1812 ),
1813 )
1814 .await;
1815
1816 match recv_relay_message(&mut rx).await? {
1817 RelayMessage::Count { count, .. } => assert_eq!(count, 1),
1818 other => anyhow::bail!("expected COUNT, got {:?}", other),
1819 }
1820
1821 Ok(())
1822 }
1823
1824 #[tokio::test]
1825 async fn relay_register_subscription_query_caps_filter_limit_to_config_max() -> Result<()> {
1826 let tmp = TempDir::new()?;
1827 let graph_store = {
1828 let _guard = crate::socialgraph::test_lock();
1829 crate::socialgraph::open_social_graph_store_with_mapsize(
1830 tmp.path(),
1831 Some(128 * 1024 * 1024),
1832 )?
1833 };
1834 let keys = Keys::generate();
1835 let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
1836
1837 let access = Arc::new(crate::socialgraph::SocialGraphAccessControl::new(
1838 Arc::clone(&backend),
1839 0,
1840 HashSet::from([keys.public_key().to_hex()]),
1841 ));
1842
1843 let relay = NostrRelay::new(
1844 Arc::clone(&backend),
1845 tmp.path().to_path_buf(),
1846 HashSet::from([keys.public_key().to_hex()]),
1847 Some(access),
1848 NostrRelayConfig {
1849 spambox_db_max_bytes: 0,
1850 max_query_limit: 1,
1851 ..Default::default()
1852 },
1853 )?;
1854
1855 let (tx, mut rx) = mpsc::unbounded_channel();
1856 relay.register_client(6, tx, None).await;
1857
1858 let older = EventBuilder::new(Kind::TextNote, "older", [])
1859 .custom_created_at(nostr::Timestamp::from_secs(5))
1860 .to_event(&keys)?;
1861 let newer = EventBuilder::new(Kind::TextNote, "newer", [])
1862 .custom_created_at(nostr::Timestamp::from_secs(6))
1863 .to_event(&keys)?;
1864
1865 relay
1866 .handle_client_message(6, NostrClientMessage::event(older))
1867 .await;
1868 let _ = recv_relay_message(&mut rx).await?;
1869 relay
1870 .handle_client_message(6, NostrClientMessage::event(newer))
1871 .await;
1872 let _ = recv_relay_message(&mut rx).await?;
1873
1874 let events = relay
1875 .register_subscription_query(
1876 6,
1877 SubscriptionId::new("sub-limit"),
1878 vec![Filter::new()
1879 .author(keys.public_key())
1880 .kind(Kind::TextNote)
1881 .limit(10)],
1882 )
1883 .await
1884 .map_err(anyhow::Error::msg)?;
1885
1886 assert_eq!(events.len(), 1);
1887
1888 Ok(())
1889 }
1890}