1use std::collections::HashMap;
2use std::collections::{HashSet, VecDeque};
3use std::path::PathBuf;
4use std::sync::{
5 atomic::{AtomicU64, Ordering},
6 Arc,
7};
8use std::time::{Duration, Instant};
9
10use tokio::sync::{mpsc, Mutex, Semaphore};
11
12#[cfg(feature = "p2p")]
13use hashtree_network::{MeshEventStore, MeshRelayClient};
14use nostr::{ClientMessage as NostrClientMessage, JsonUtil, RelayMessage as NostrRelayMessage};
15use nostr::{Event, EventId, Filter as NostrFilter, SubscriptionId};
16
17use crate::socialgraph;
18
/// Maximum number of Bluetooth-received event records kept in the in-memory
/// log; the log file is rewritten from that in-memory list on every new record.
const BLUETOOTH_EVENT_LOG_CAPACITY: usize = 100;
/// Number of semaphore permits each `NostrStore` is created with. Every store
/// call acquires one permit before it is moved onto the blocking thread pool.
const MAX_CONCURRENT_NOSTR_STORE_BLOCKING_TASKS: usize = 4;
21
22#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
23pub struct BluetoothReceivedEventRecord {
24 pub event_id: String,
25 pub pubkey: String,
26 pub kind: u32,
27 pub created_at: u64,
28 pub received_at: u64,
29 pub peer_id: Option<String>,
30 pub cid_values: Vec<String>,
31}
32
/// Tunable limits for the relay.
#[derive(Debug, Clone)]
pub struct NostrRelayConfig {
    /// Size cap (bytes) handed to the on-disk spambox store; `0` selects the
    /// in-memory spambox instead.
    pub spambox_db_max_bytes: u64,
    /// Upper bound on the number of events returned by a single query.
    pub max_query_limit: usize,
    /// Maximum number of distinct subscriptions a client may hold.
    pub max_subs_per_client: usize,
    /// Maximum number of filters kept per subscription; extra filters are dropped.
    pub max_filters_per_sub: usize,
    /// Untrusted events a client may submit per 60-second window.
    pub spambox_max_events_per_min: u32,
    /// REQ/COUNT requests a client may issue per 60-second window.
    pub spambox_max_reqs_per_min: u32,
}

impl Default for NostrRelayConfig {
    /// Default limits: 1 GiB spambox, 200 events per query, 64 subscriptions per
    /// client, 32 filters per subscription, 120 events and 120 requests per minute.
    fn default() -> Self {
        const ONE_GIB: u64 = 1024 * 1024 * 1024;
        const PER_MINUTE_BUDGET: u32 = 120;

        NostrRelayConfig {
            spambox_db_max_bytes: ONE_GIB,
            max_query_limit: 200,
            max_subs_per_client: 64,
            max_filters_per_sub: 32,
            spambox_max_events_per_min: PER_MINUTE_BUDGET,
            spambox_max_reqs_per_min: PER_MINUTE_BUDGET,
        }
    }
}
55
56mod imp {
57 use super::*;
58 use anyhow::Result;
59
60 use crate::diagnostics::{
61 nostr_filter_summary, nostr_filters_summary, process_memory_snapshot,
62 trim_process_allocations,
63 };
64 use crate::socialgraph::{EventStorageClass, SocialGraphAccessControl, SocialGraphBackend};
65 use hashtree_core::{nhash_decode, Cid};
66 use hashtree_nostr::{is_parameterized_replaceable_kind, is_replaceable_kind};
67 use tracing::{info, warn};
68
69 fn prefers_trusted_only(filter: &NostrFilter) -> bool {
70 let Some(kinds) = filter.kinds.as_ref() else {
71 return false;
72 };
73 if kinds.len() != 1 {
74 return false;
75 }
76
77 let kind = kinds.iter().next().expect("checked single kind").as_u16() as u32;
78 let has_authors = filter
79 .authors
80 .as_ref()
81 .is_some_and(|authors| !authors.is_empty());
82 if !has_authors {
83 return false;
84 }
85
86 if is_replaceable_kind(kind) {
87 return true;
88 }
89
90 if is_parameterized_replaceable_kind(kind) {
91 let d_tag = nostr::SingleLetterTag::lowercase(nostr::Alphabet::D);
92 return filter
93 .generic_tags
94 .get(&d_tag)
95 .is_some_and(|values| !values.is_empty());
96 }
97
98 false
99 }
100
101 struct NostrStore {
102 store: Arc<dyn SocialGraphBackend>,
103 blocking_permits: Arc<Semaphore>,
104 }
105
106 impl NostrStore {
107 fn new(store: Arc<dyn SocialGraphBackend>) -> Self {
108 Self {
109 store,
110 blocking_permits: Arc::new(Semaphore::new(
111 MAX_CONCURRENT_NOSTR_STORE_BLOCKING_TASKS,
112 )),
113 }
114 }
115
116 async fn ingest(&self, event: Event) -> Result<()> {
117 let store = Arc::clone(&self.store);
118 let _permit = self
119 .blocking_permits
120 .clone()
121 .acquire_owned()
122 .await
123 .map_err(|err| anyhow::anyhow!("trusted nostr store closed: {err}"))?;
124 tokio::task::spawn_blocking(move || {
125 crate::socialgraph::ingest_parsed_event(store.as_ref(), &event)
126 })
127 .await
128 .map_err(|err| anyhow::anyhow!("trusted nostr store ingest task failed: {err}"))?
129 }
130
131 async fn ingest_with_storage_class(
132 &self,
133 event: Event,
134 storage_class: EventStorageClass,
135 ) -> Result<()> {
136 let store = Arc::clone(&self.store);
137 let _permit = self
138 .blocking_permits
139 .clone()
140 .acquire_owned()
141 .await
142 .map_err(|err| anyhow::anyhow!("trusted nostr store closed: {err}"))?;
143 tokio::task::spawn_blocking(move || {
144 crate::socialgraph::ingest_parsed_event_with_storage_class(
145 store.as_ref(),
146 &event,
147 storage_class,
148 )
149 })
150 .await
151 .map_err(|err| anyhow::anyhow!("trusted nostr store ingest task failed: {err}"))?
152 }
153
154 async fn query(&self, filter: NostrFilter, limit: usize) -> Vec<Event> {
155 let store = Arc::clone(&self.store);
156 let filter_summary = nostr_filter_summary(&filter);
157 let memory_before = process_memory_snapshot();
158 let started = Instant::now();
159 let Ok(_permit) = self.blocking_permits.clone().acquire_owned().await else {
160 warn!("trusted nostr store query skipped: blocking semaphore closed");
161 return Vec::new();
162 };
163 let result = tokio::task::spawn_blocking(move || {
164 crate::socialgraph::query_events(store.as_ref(), &filter, limit)
165 })
166 .await;
167 match result {
168 Ok(events) => {
169 info!(
170 target: "hashtree_cli::nostr_relay::query",
171 limit,
172 events = events.len(),
173 elapsed_ms = started.elapsed().as_millis() as u64,
174 filter = %filter_summary,
175 memory_before = ?memory_before,
176 memory_after = ?process_memory_snapshot(),
177 "trusted nostr store query completed",
178 );
179 events
180 }
181 Err(err) => {
182 warn!("trusted nostr store query task failed: {}", err);
183 Vec::new()
184 }
185 }
186 }
187 }
188
/// Per-client counters for a fixed 60-second rate-limit window.
#[derive(Debug, Clone)]
struct ClientQuota {
    /// Start of the current window.
    last_reset: Instant,
    /// Untrusted events accepted in the current window.
    spambox_events: u32,
    /// Requests accepted in the current window.
    reqs: u32,
}

impl ClientQuota {
    /// Starts a fresh window with both counters at zero.
    fn new() -> Self {
        ClientQuota {
            last_reset: Instant::now(),
            spambox_events: 0,
            reqs: 0,
        }
    }

    /// Opens a new window once at least 60 seconds have passed since the last reset.
    fn reset_if_needed(&mut self) {
        let window = Duration::from_secs(60);
        if self.last_reset.elapsed() < window {
            return;
        }
        *self = Self::new();
    }

    /// Increments `counter` and returns `true` while it is below `limit`;
    /// otherwise leaves it untouched and returns `false`.
    fn try_consume(counter: &mut u32, limit: u32) -> bool {
        if *counter < limit {
            *counter += 1;
            true
        } else {
            false
        }
    }

    /// Accounts for one untrusted event; `false` means the window budget is spent.
    fn allow_spambox_event(&mut self, limit: u32) -> bool {
        self.reset_if_needed();
        Self::try_consume(&mut self.spambox_events, limit)
    }

    /// Accounts for one request; `false` means the window budget is spent.
    fn allow_req(&mut self, limit: u32) -> bool {
        self.reset_if_needed();
        Self::try_consume(&mut self.reqs, limit)
    }
}
231
232 struct ClientState {
233 sender: mpsc::UnboundedSender<String>,
234 pubkey: Option<String>,
235 quota: ClientQuota,
236 }
237
238 struct RecentEvents {
239 order: VecDeque<EventId>,
240 events: HashMap<EventId, Event>,
241 max_len: usize,
242 }
243
244 impl RecentEvents {
245 fn new(max_len: usize) -> Self {
246 Self {
247 order: VecDeque::new(),
248 events: HashMap::new(),
249 max_len: max_len.max(128),
250 }
251 }
252
253 fn insert(&mut self, event: Event) {
254 if self.events.contains_key(&event.id) {
255 return;
256 }
257 self.order.push_back(event.id);
258 self.events.insert(event.id, event);
259 while self.order.len() > self.max_len {
260 if let Some(oldest) = self.order.pop_front() {
261 self.events.remove(&oldest);
262 }
263 }
264 }
265
266 fn matching(&self, filter: &NostrFilter) -> Vec<Event> {
267 self.events
268 .values()
269 .filter(|event| filter.match_event(event))
270 .cloned()
271 .collect()
272 }
273 }
274
275 enum SpamboxStore {
276 Persistent(NostrStore),
277 Memory(MemorySpambox),
278 }
279
280 struct MemorySpambox {
281 events: Mutex<VecDeque<Event>>,
282 max_len: usize,
283 }
284
285 impl MemorySpambox {
286 fn new(max_len: usize) -> Self {
287 Self {
288 events: Mutex::new(VecDeque::new()),
289 max_len: max_len.max(128),
290 }
291 }
292
293 async fn ingest(&self, event: &Event) -> bool {
294 let mut events = self.events.lock().await;
295 events.push_back(event.clone());
296 while events.len() > self.max_len {
297 events.pop_front();
298 }
299 true
300 }
301 }
302
303 impl SpamboxStore {
304 async fn ingest(&self, event: &Event) -> bool {
305 match self {
306 SpamboxStore::Persistent(store) => store.ingest(event.clone()).await.is_ok(),
307 SpamboxStore::Memory(store) => store.ingest(event).await,
308 }
309 }
310 }
311
312 struct BluetoothEventLog {
313 path: PathBuf,
314 state: Mutex<BluetoothEventLogState>,
315 }
316
317 struct BluetoothEventLogState {
318 records: VecDeque<BluetoothReceivedEventRecord>,
319 event_ids: HashSet<String>,
320 }
321
322 impl BluetoothEventLog {
323 fn load(path: PathBuf) -> Self {
324 let records = std::fs::read_to_string(&path)
325 .ok()
326 .map(|serialized| {
327 serialized
328 .lines()
329 .filter_map(|line| {
330 serde_json::from_str::<BluetoothReceivedEventRecord>(line).ok()
331 })
332 .collect::<Vec<_>>()
333 })
334 .unwrap_or_default();
335 let mut trimmed = VecDeque::with_capacity(BLUETOOTH_EVENT_LOG_CAPACITY);
336 let start = records.len().saturating_sub(BLUETOOTH_EVENT_LOG_CAPACITY);
337 for record in records.into_iter().skip(start) {
338 trimmed.push_back(record);
339 }
340 let event_ids = trimmed
341 .iter()
342 .map(|record| record.event_id.clone())
343 .collect::<HashSet<_>>();
344
345 Self {
346 path,
347 state: Mutex::new(BluetoothEventLogState {
348 records: trimmed,
349 event_ids,
350 }),
351 }
352 }
353
354 async fn recent(&self, limit: usize) -> Vec<BluetoothReceivedEventRecord> {
355 let state = self.state.lock().await;
356 state
357 .records
358 .iter()
359 .rev()
360 .take(limit.max(1))
361 .cloned()
362 .collect()
363 }
364
365 async fn record(&self, event: &Event, peer_id: Option<String>) {
366 let record = BluetoothReceivedEventRecord {
367 event_id: event.id.to_hex(),
368 pubkey: event.pubkey.to_hex(),
369 kind: event.kind.as_u16() as u32,
370 created_at: event.created_at.as_u64(),
371 received_at: std::time::SystemTime::now()
372 .duration_since(std::time::UNIX_EPOCH)
373 .map(|value| value.as_secs())
374 .unwrap_or(0),
375 peer_id,
376 cid_values: cid_values_from_event(event),
377 };
378
379 let serialized = {
380 let mut state = self.state.lock().await;
381 if state.event_ids.contains(&record.event_id) {
382 return;
383 }
384
385 state.event_ids.insert(record.event_id.clone());
386 state.records.push_back(record);
387 while state.records.len() > BLUETOOTH_EVENT_LOG_CAPACITY {
388 if let Some(removed) = state.records.pop_front() {
389 state.event_ids.remove(&removed.event_id);
390 }
391 }
392
393 state
394 .records
395 .iter()
396 .filter_map(|entry| serde_json::to_string(entry).ok())
397 .collect::<Vec<_>>()
398 .join("\n")
399 };
400
401 if let Some(parent) = self.path.parent() {
402 let _ = std::fs::create_dir_all(parent);
403 }
404 let _ = std::fs::write(&self.path, serialized);
405 }
406 }
407
408 fn looks_like_cid_reference(value: &str) -> bool {
409 Cid::parse(value).is_ok() || nhash_decode(value).is_ok()
410 }
411
412 fn cid_values_from_event(event: &Event) -> Vec<String> {
413 let mut values = Vec::new();
414 let mut seen = HashSet::new();
415
416 for tag in &event.tags {
417 let fields = tag.clone().to_vec();
418 if fields.first().is_some_and(|name| name == "cid") {
419 if let Some(value) = fields.get(1).filter(|value| !value.is_empty()) {
420 if seen.insert(value.clone()) {
421 values.push(value.clone());
422 }
423 }
424 continue;
425 }
426
427 for value in fields.into_iter().skip(1) {
428 if looks_like_cid_reference(&value) && seen.insert(value.clone()) {
429 values.push(value);
430 }
431 }
432 }
433
434 values
435 }
436
437 pub struct NostrRelay {
438 config: NostrRelayConfig,
439 trusted: NostrStore,
440 public_pubkeys: HashSet<String>,
441 spambox: Option<SpamboxStore>,
442 social_graph: Option<Arc<SocialGraphAccessControl>>,
443 clients: Mutex<HashMap<u64, ClientState>>,
444 subscriptions: Mutex<HashMap<u64, HashMap<SubscriptionId, Vec<NostrFilter>>>>,
445 recent_events: Mutex<RecentEvents>,
446 next_client_id: AtomicU64,
447 bluetooth_event_log: Arc<BluetoothEventLog>,
448 }
449
450 impl NostrRelay {
451 async fn collect_filter_events(
452 &self,
453 filter: &NostrFilter,
454 limit: usize,
455 seen: &mut HashSet<EventId>,
456 events: &mut Vec<Event>,
457 ) {
458 if limit == 0 {
459 return;
460 }
461
462 let mut added = 0usize;
463
464 if !prefers_trusted_only(filter) {
465 let recent = {
466 let cache = self.recent_events.lock().await;
467 cache.matching(filter)
468 };
469 for event in recent {
470 if seen.insert(event.id) {
471 events.push(event);
472 added += 1;
473 if added >= limit {
474 return;
475 }
476 }
477 }
478 }
479
480 for event in self.trusted.query(filter.clone(), limit).await {
481 if seen.insert(event.id) {
482 events.push(event);
483 added += 1;
484 if added >= limit {
485 return;
486 }
487 }
488 }
489 }
490
491 async fn collect_filter_count(
492 &self,
493 filter: &NostrFilter,
494 limit: usize,
495 seen: &mut HashSet<EventId>,
496 ) {
497 if limit == 0 {
498 return;
499 }
500
501 let mut added = 0usize;
502
503 if !prefers_trusted_only(filter) {
504 let recent = {
505 let cache = self.recent_events.lock().await;
506 cache.matching(filter)
507 };
508 for event in recent {
509 if seen.insert(event.id) {
510 added += 1;
511 if added >= limit {
512 return;
513 }
514 }
515 }
516 }
517
518 for event in self.trusted.query(filter.clone(), limit).await {
519 if seen.insert(event.id) {
520 added += 1;
521 if added >= limit {
522 return;
523 }
524 }
525 }
526 }
527
528 pub fn new(
529 trusted_store: Arc<dyn SocialGraphBackend>,
530 data_dir: PathBuf,
531 public_pubkeys: HashSet<String>,
532 social_graph: Option<Arc<SocialGraphAccessControl>>,
533 config: NostrRelayConfig,
534 ) -> Result<Self> {
535 let spambox = if config.spambox_db_max_bytes == 0 {
536 Some(SpamboxStore::Memory(MemorySpambox::new(
537 config.max_query_limit * 2,
538 )))
539 } else {
540 let spam_dir = data_dir.join("socialgraph_spambox");
541 match socialgraph::open_social_graph_store_at_path(
542 &spam_dir,
543 Some(config.spambox_db_max_bytes),
544 ) {
545 Ok(store) => Some(SpamboxStore::Persistent(NostrStore::new(store))),
546 Err(err) => {
547 warn!(
548 "Failed to open social graph spambox (falling back to memory): {}",
549 err
550 );
551 Some(SpamboxStore::Memory(MemorySpambox::new(
552 config.max_query_limit * 2,
553 )))
554 }
555 }
556 };
557
558 let recent_size = config.max_query_limit.saturating_mul(2);
559 let bluetooth_event_log = Arc::new(BluetoothEventLog::load(
560 data_dir.join("bluetooth-events.jsonl"),
561 ));
562
563 Ok(Self {
564 config,
565 trusted: NostrStore::new(trusted_store),
566 public_pubkeys,
567 spambox,
568 social_graph,
569 clients: Mutex::new(HashMap::new()),
570 subscriptions: Mutex::new(HashMap::new()),
571 recent_events: Mutex::new(RecentEvents::new(recent_size)),
572 next_client_id: AtomicU64::new(1),
573 bluetooth_event_log,
574 })
575 }
576
577 pub fn next_client_id(&self) -> u64 {
578 self.next_client_id.fetch_add(1, Ordering::SeqCst)
579 }
580
581 pub async fn ingest_trusted_event(&self, event: Event) -> Result<()> {
582 self.ingest_trusted_event_inner(event, true).await
583 }
584
585 pub async fn ingest_trusted_event_from_bluetooth(
586 &self,
587 event: Event,
588 peer_id: Option<String>,
589 ) -> Result<()> {
590 self.ingest_trusted_event_inner(event.clone(), true).await?;
591 self.bluetooth_event_log.record(&event, peer_id).await;
592 Ok(())
593 }
594
595 pub async fn ingest_trusted_event_silent(&self, event: Event) -> Result<()> {
596 self.ingest_trusted_event_inner(event, false).await
597 }
598
599 pub async fn bluetooth_received_events(
600 &self,
601 limit: usize,
602 ) -> Vec<BluetoothReceivedEventRecord> {
603 self.bluetooth_event_log.recent(limit).await
604 }
605
606 async fn ingest_trusted_event_inner(&self, event: Event, broadcast: bool) -> Result<()> {
607 event
608 .verify()
609 .map_err(|e| anyhow::anyhow!("invalid signature: {}", e))?;
610
611 let is_ephemeral = event.kind.is_ephemeral();
612 {
613 let mut recent = self.recent_events.lock().await;
614 recent.insert(event.clone());
615 }
616
617 if !is_ephemeral {
618 let storage_class = self.event_storage_class(&event);
619 self.trusted
620 .ingest_with_storage_class(event.clone(), storage_class)
621 .await?;
622 }
623
624 if broadcast {
625 self.broadcast_event(&event).await;
626 }
627 Ok(())
628 }
629
630 pub async fn query_events(&self, filter: &NostrFilter, limit: usize) -> Vec<Event> {
631 let limit = limit.min(self.config.max_query_limit);
632 if limit == 0 {
633 return Vec::new();
634 }
635
636 let mut seen: HashSet<EventId> = HashSet::new();
637 let mut events = Vec::new();
638
639 if !prefers_trusted_only(filter) {
640 let recent = {
641 let cache = self.recent_events.lock().await;
642 cache.matching(filter)
643 };
644 for event in recent {
645 if seen.insert(event.id) {
646 events.push(event);
647 if events.len() >= limit {
648 return events;
649 }
650 }
651 }
652 }
653
654 for event in self.trusted.query(filter.clone(), limit).await {
655 if seen.insert(event.id) {
656 events.push(event);
657 if events.len() >= limit {
658 break;
659 }
660 }
661 }
662
663 events
664 }
665
666 pub async fn register_client(
667 &self,
668 client_id: u64,
669 sender: mpsc::UnboundedSender<String>,
670 pubkey: Option<String>,
671 ) {
672 let mut clients = self.clients.lock().await;
673 clients.insert(
674 client_id,
675 ClientState {
676 sender,
677 pubkey,
678 quota: ClientQuota::new(),
679 },
680 );
681 }
682
683 pub async fn unregister_client(&self, client_id: u64) {
684 let mut clients = self.clients.lock().await;
685 clients.remove(&client_id);
686 drop(clients);
687 let mut subs = self.subscriptions.lock().await;
688 subs.remove(&client_id);
689 }
690
691 pub async fn handle_client_message(&self, client_id: u64, msg: NostrClientMessage) {
692 match msg {
693 NostrClientMessage::Event(event) => {
694 self.handle_event(client_id, *event).await;
695 }
696 NostrClientMessage::Req {
697 subscription_id,
698 filters,
699 } => {
700 self.handle_req(client_id, subscription_id, filters).await;
701 }
702 NostrClientMessage::Count {
703 subscription_id,
704 filters,
705 } => {
706 self.handle_count(client_id, subscription_id, filters).await;
707 }
708 NostrClientMessage::Close(subscription_id) => {
709 self.handle_close(client_id, subscription_id).await;
710 }
711 NostrClientMessage::Auth(event) => {
712 self.handle_auth(client_id, *event).await;
713 }
714 NostrClientMessage::NegOpen { .. }
715 | NostrClientMessage::NegMsg { .. }
716 | NostrClientMessage::NegClose { .. } => {
717 self.send_to_client(
718 client_id,
719 NostrRelayMessage::notice("negentropy not supported"),
720 )
721 .await;
722 }
723 }
724 }
725
726 pub async fn register_subscription_query(
727 &self,
728 client_id: u64,
729 subscription_id: SubscriptionId,
730 mut filters: Vec<NostrFilter>,
731 ) -> std::result::Result<Vec<Event>, &'static str> {
732 if !self.allow_req(client_id).await {
733 return Err("rate limited");
734 }
735
736 if filters.len() > self.config.max_filters_per_sub {
737 filters.truncate(self.config.max_filters_per_sub);
738 }
739
740 {
741 let mut subs = self.subscriptions.lock().await;
742 let entry = subs.entry(client_id).or_default();
743 if !entry.contains_key(&subscription_id)
744 && entry.len() >= self.config.max_subs_per_client
745 {
746 return Err("too many subscriptions");
747 }
748 entry.insert(subscription_id.clone(), filters.clone());
749 }
750
751 let mut seen: HashSet<EventId> = HashSet::new();
752 let mut events = Vec::new();
753 let memory_before = process_memory_snapshot();
754 let started = Instant::now();
755 let filter_summary = nostr_filters_summary(&filters);
756 for filter in &filters {
757 let remaining = self.config.max_query_limit.saturating_sub(events.len());
758 if remaining == 0 {
759 break;
760 }
761 let limit = filter
762 .limit
763 .unwrap_or(self.config.max_query_limit)
764 .min(self.config.max_query_limit)
765 .min(remaining);
766 self.collect_filter_events(filter, limit, &mut seen, &mut events)
767 .await;
768 }
769
770 info!(
771 target: "hashtree_cli::nostr_relay::query",
772 client_id,
773 subscription_id = %subscription_id,
774 filters = filters.len(),
775 events = events.len(),
776 elapsed_ms = started.elapsed().as_millis() as u64,
777 filter = %filter_summary,
778 memory_before = ?memory_before,
779 memory_after = ?process_memory_snapshot(),
780 "nostr relay local subscription query completed",
781 );
782 Ok(events)
783 }
784
785 async fn handle_auth(&self, client_id: u64, event: Event) {
786 let ok = event.verify().is_ok();
787 let message = if ok { "" } else { "invalid auth" };
788 self.send_to_client(client_id, NostrRelayMessage::ok(event.id, ok, message))
789 .await;
790 }
791
792 async fn handle_close(&self, client_id: u64, subscription_id: SubscriptionId) {
793 let mut subs = self.subscriptions.lock().await;
794 if let Some(map) = subs.get_mut(&client_id) {
795 map.remove(&subscription_id);
796 }
797 }
798
799 async fn handle_event(&self, client_id: u64, event: Event) {
800 let ok = event.verify().is_ok();
801 if !ok {
802 self.send_to_client(
803 client_id,
804 NostrRelayMessage::ok(event.id, false, "invalid: signature"),
805 )
806 .await;
807 return;
808 }
809
810 let trusted = self.is_trusted_event(client_id, &event).await;
811 if !trusted && !self.allow_spambox_event(client_id).await {
812 self.send_to_client(
813 client_id,
814 NostrRelayMessage::ok(event.id, false, "rate limited"),
815 )
816 .await;
817 return;
818 }
819
820 let is_ephemeral = event.kind.is_ephemeral();
821 if trusted {
822 let mut recent = self.recent_events.lock().await;
823 recent.insert(event.clone());
824 }
825 if !is_ephemeral {
826 let stored = if trusted {
827 let storage_class = self.event_storage_class(&event);
828 self.trusted
829 .ingest_with_storage_class(event.clone(), storage_class)
830 .await
831 .is_ok()
832 } else {
833 match self.spambox.as_ref() {
834 Some(spambox) => spambox.ingest(&event).await,
835 None => false,
836 }
837 };
838
839 if !stored {
840 let message = if trusted {
841 "store failed"
842 } else {
843 "spambox full"
844 };
845 self.send_to_client(client_id, NostrRelayMessage::ok(event.id, false, message))
846 .await;
847 return;
848 }
849 }
850
851 let message = if trusted { "" } else { "spambox" };
852 self.send_to_client(client_id, NostrRelayMessage::ok(event.id, true, message))
853 .await;
854
855 if trusted {
856 self.broadcast_event(&event).await;
857 }
858 }
859
860 async fn handle_req(
861 &self,
862 client_id: u64,
863 subscription_id: SubscriptionId,
864 filters: Vec<NostrFilter>,
865 ) {
866 match self
867 .register_subscription_query(client_id, subscription_id.clone(), filters)
868 .await
869 {
870 Ok(events) => {
871 for event in events {
872 self.send_to_client(
873 client_id,
874 NostrRelayMessage::event(subscription_id.clone(), event),
875 )
876 .await;
877 }
878 trim_process_allocations();
879
880 self.send_to_client(client_id, NostrRelayMessage::eose(subscription_id))
881 .await;
882 }
883 Err(message) => {
884 self.send_to_client(
885 client_id,
886 NostrRelayMessage::closed(subscription_id, message),
887 )
888 .await;
889 }
890 }
891 }
892
893 async fn handle_count(
894 &self,
895 client_id: u64,
896 subscription_id: SubscriptionId,
897 filters: Vec<NostrFilter>,
898 ) {
899 if !self.allow_req(client_id).await {
900 self.send_to_client(
901 client_id,
902 NostrRelayMessage::closed(subscription_id, "rate limited"),
903 )
904 .await;
905 return;
906 }
907
908 let mut seen: HashSet<EventId> = HashSet::new();
909 for filter in &filters {
910 let limit = filter
911 .limit
912 .unwrap_or(self.config.max_query_limit)
913 .min(self.config.max_query_limit);
914 self.collect_filter_count(filter, limit, &mut seen).await;
915 }
916
917 self.send_to_client(
918 client_id,
919 NostrRelayMessage::count(subscription_id, seen.len()),
920 )
921 .await;
922 }
923
924 async fn is_trusted_event(&self, client_id: u64, event: &Event) -> bool {
925 let event_pubkey = event.pubkey.to_hex();
926 let client_pubkey = {
927 let clients = self.clients.lock().await;
928 clients
929 .get(&client_id)
930 .and_then(|state| state.pubkey.clone())
931 };
932 if let Some(pubkey) = client_pubkey {
933 return pubkey == event_pubkey
934 || self.social_graph.as_ref().is_some_and(|social_graph| {
935 social_graph.check_write_access(&event_pubkey)
936 });
937 }
938 if let Some(ref social_graph) = self.social_graph {
939 return social_graph.check_write_access(&event_pubkey);
940 }
941 true
942 }
943
944 fn event_storage_class(&self, event: &Event) -> EventStorageClass {
945 if self.public_pubkeys.contains(&event.pubkey.to_hex()) {
946 EventStorageClass::Public
947 } else {
948 EventStorageClass::Ambient
949 }
950 }
951
952 async fn allow_spambox_event(&self, client_id: u64) -> bool {
953 let mut clients = self.clients.lock().await;
954 let Some(state) = clients.get_mut(&client_id) else {
955 return false;
956 };
957 state
958 .quota
959 .allow_spambox_event(self.config.spambox_max_events_per_min)
960 }
961
962 async fn allow_req(&self, client_id: u64) -> bool {
963 let mut clients = self.clients.lock().await;
964 let Some(state) = clients.get_mut(&client_id) else {
965 return false;
966 };
967 state.quota.allow_req(self.config.spambox_max_reqs_per_min)
968 }
969
970 async fn broadcast_event(&self, event: &Event) {
971 let subscriptions = self.subscriptions.lock().await;
972 let mut deliveries: Vec<(u64, SubscriptionId)> = Vec::new();
973 for (client_id, subs) in subscriptions.iter() {
974 for (sub_id, filters) in subs.iter() {
975 if filters.iter().any(|f| f.match_event(event)) {
976 deliveries.push((*client_id, sub_id.clone()));
977 }
978 }
979 }
980 drop(subscriptions);
981
982 for (client_id, sub_id) in deliveries {
983 self.send_to_client(client_id, NostrRelayMessage::event(sub_id, event.clone()))
984 .await;
985 }
986 }
987
988 async fn send_to_client(&self, client_id: u64, msg: NostrRelayMessage) {
989 let sender = {
990 let clients = self.clients.lock().await;
991 clients.get(&client_id).map(|state| state.sender.clone())
992 };
993 if let Some(tx) = sender {
994 let _ = tx.send(msg.as_json());
995 }
996 }
997 }
998}
999
1000pub use imp::NostrRelay;
1001
/// Exposes the relay as a mesh event store by forwarding to the inherent
/// `NostrRelay` methods of the same name.
#[cfg(feature = "p2p")]
#[async_trait::async_trait]
impl MeshEventStore for NostrRelay {
    /// Forwards to the inherent `NostrRelay::ingest_trusted_event`.
    async fn ingest_trusted_event(&self, event: Event) -> anyhow::Result<()> {
        let outcome = NostrRelay::ingest_trusted_event(self, event).await;
        outcome
    }

    /// Forwards to the inherent `NostrRelay::query_events`.
    async fn query_events(&self, filter: &NostrFilter, limit: usize) -> Vec<Event> {
        let events = NostrRelay::query_events(self, filter, limit).await;
        events
    }
}
1013
/// Exposes the relay through the mesh relay-client interface by forwarding
/// each call to the corresponding inherent `NostrRelay` method.
#[cfg(feature = "p2p")]
#[async_trait::async_trait]
impl MeshRelayClient for NostrRelay {
    /// Forwards to the inherent `NostrRelay::next_client_id`.
    fn next_client_id(&self) -> u64 {
        let id = NostrRelay::next_client_id(self);
        id
    }

    /// Forwards to the inherent `NostrRelay::register_client`.
    async fn register_client(
        &self,
        client_id: u64,
        sender: mpsc::UnboundedSender<String>,
        pubkey: Option<String>,
    ) {
        NostrRelay::register_client(self, client_id, sender, pubkey).await;
    }

    /// Forwards to the inherent `NostrRelay::unregister_client`.
    async fn unregister_client(&self, client_id: u64) {
        NostrRelay::unregister_client(self, client_id).await;
    }

    /// Forwards to the inherent `NostrRelay::handle_client_message`.
    async fn handle_client_message(&self, client_id: u64, msg: NostrClientMessage) {
        NostrRelay::handle_client_message(self, client_id, msg).await;
    }

    /// Forwards to the inherent `NostrRelay::register_subscription_query`.
    async fn register_subscription_query(
        &self,
        client_id: u64,
        subscription_id: SubscriptionId,
        filters: Vec<NostrFilter>,
    ) -> std::result::Result<Vec<Event>, &'static str> {
        let outcome =
            NostrRelay::register_subscription_query(self, client_id, subscription_id, filters)
                .await;
        outcome
    }

    /// Forwards to `NostrRelay::ingest_trusted_event_from_bluetooth`, so events
    /// arriving through this method are also recorded in the Bluetooth event log.
    async fn ingest_trusted_event_from_peer(
        &self,
        event: Event,
        peer_id: Option<String>,
    ) -> anyhow::Result<()> {
        let outcome = NostrRelay::ingest_trusted_event_from_bluetooth(self, event, peer_id).await;
        outcome
    }
}
1055
1056#[cfg(test)]
1057#[path = "nostr_relay/tests.rs"]
1058mod tests;