1use std::collections::HashMap;
2use std::collections::{HashSet, VecDeque};
3use std::path::PathBuf;
4use std::sync::{
5 atomic::{AtomicU64, Ordering},
6 Arc,
7};
8use std::time::{Duration, Instant};
9
10use tokio::sync::{mpsc, Mutex, Semaphore};
11
12#[cfg(feature = "p2p")]
13use hashtree_network::{MeshEventStore, MeshRelayClient};
14use nostr::{ClientMessage as NostrClientMessage, JsonUtil, RelayMessage as NostrRelayMessage};
15use nostr::{Event, EventId, Filter as NostrFilter, SubscriptionId};
16
17use crate::socialgraph;
18
/// Maximum number of Bluetooth-received event records kept in memory and in
/// the on-disk log; the oldest records are dropped first.
const BLUETOOTH_EVENT_LOG_CAPACITY: usize = 100;
/// Number of permits in each `NostrStore` semaphore, i.e. how many blocking
/// store operations one store may have in flight at once.
const MAX_CONCURRENT_NOSTR_STORE_BLOCKING_TASKS: usize = 4;
21
/// One entry of the Bluetooth receive log: a summary of a Nostr event that was
/// ingested through the Bluetooth path. Serialized with serde, one JSON object
/// per line, in the on-disk log.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct BluetoothReceivedEventRecord {
    /// Hex-encoded event id.
    pub event_id: String,
    /// Hex-encoded public key of the event author.
    pub pubkey: String,
    /// Nostr event kind number.
    pub kind: u32,
    /// The event's own `created_at` timestamp, in seconds.
    pub created_at: u64,
    /// Seconds since the Unix epoch at which the record was created locally
    /// (0 when the system clock reads earlier than the epoch).
    pub received_at: u64,
    /// Identifier of the delivering peer, when the caller supplied one.
    pub peer_id: Option<String>,
    /// CID-like values extracted from the event's tags, without duplicates.
    pub cid_values: Vec<String>,
}
32
/// Tunable limits and quotas for the local Nostr relay.
#[derive(Debug, Clone)]
pub struct NostrRelayConfig {
    /// Size cap handed to the persistent spambox store; `0` selects the
    /// in-memory spambox instead.
    pub spambox_db_max_bytes: u64,
    /// Upper bound on the number of events a single query may return.
    pub max_query_limit: usize,
    /// Maximum number of simultaneously open subscriptions per client.
    pub max_subs_per_client: usize,
    /// Maximum number of filters kept per subscription; extras are dropped.
    pub max_filters_per_sub: usize,
    /// Per-client budget of untrusted events per 60-second window.
    pub spambox_max_events_per_min: u32,
    /// Per-client budget of REQ/COUNT requests per 60-second window.
    pub spambox_max_reqs_per_min: u32,
}

impl Default for NostrRelayConfig {
    /// Builds the stock configuration: a 1 GiB spambox, 200-event queries,
    /// 64 subscriptions of up to 32 filters each, and 120 events / 120
    /// requests per minute per client.
    fn default() -> Self {
        const ONE_GIB: u64 = 1024 * 1024 * 1024;
        const PER_MINUTE_BUDGET: u32 = 120;

        NostrRelayConfig {
            max_query_limit: 200,
            max_subs_per_client: 64,
            max_filters_per_sub: 32,
            spambox_db_max_bytes: ONE_GIB,
            spambox_max_events_per_min: PER_MINUTE_BUDGET,
            spambox_max_reqs_per_min: PER_MINUTE_BUDGET,
        }
    }
}
55
56mod imp {
57 use super::*;
58 use anyhow::Result;
59
60 use crate::diagnostics::{
61 nostr_filter_summary, nostr_filters_summary, process_memory_snapshot,
62 trim_process_allocations,
63 };
64 use crate::socialgraph::{EventStorageClass, SocialGraphAccessControl, SocialGraphBackend};
65 use hashtree_core::{nhash_decode, Cid};
66 use hashtree_nostr::{is_parameterized_replaceable_kind, is_replaceable_kind};
67 use tracing::{info, warn};
68
69 fn prefers_trusted_only(filter: &NostrFilter) -> bool {
70 let Some(kinds) = filter.kinds.as_ref() else {
71 return false;
72 };
73 if kinds.len() != 1 {
74 return false;
75 }
76
77 let kind = kinds.iter().next().expect("checked single kind").as_u16() as u32;
78 let has_authors = filter
79 .authors
80 .as_ref()
81 .is_some_and(|authors| !authors.is_empty());
82 if !has_authors {
83 return false;
84 }
85
86 if is_replaceable_kind(kind) {
87 return true;
88 }
89
90 if is_parameterized_replaceable_kind(kind) {
91 let d_tag = nostr::SingleLetterTag::lowercase(nostr::Alphabet::D);
92 return filter
93 .generic_tags
94 .get(&d_tag)
95 .is_some_and(|values| !values.is_empty());
96 }
97
98 false
99 }
100
101 struct NostrStore {
102 store: Arc<dyn SocialGraphBackend>,
103 blocking_permits: Arc<Semaphore>,
104 }
105
106 impl NostrStore {
107 fn new(store: Arc<dyn SocialGraphBackend>) -> Self {
108 Self {
109 store,
110 blocking_permits: Arc::new(Semaphore::new(
111 MAX_CONCURRENT_NOSTR_STORE_BLOCKING_TASKS,
112 )),
113 }
114 }
115
116 async fn ingest(&self, event: Event) -> Result<()> {
117 let store = Arc::clone(&self.store);
118 let _permit = self
119 .blocking_permits
120 .clone()
121 .acquire_owned()
122 .await
123 .map_err(|err| anyhow::anyhow!("trusted nostr store closed: {err}"))?;
124 tokio::task::spawn_blocking(move || {
125 crate::socialgraph::ingest_parsed_event(store.as_ref(), &event)
126 })
127 .await
128 .map_err(|err| anyhow::anyhow!("trusted nostr store ingest task failed: {err}"))?
129 }
130
131 async fn ingest_with_storage_class(
132 &self,
133 event: Event,
134 storage_class: EventStorageClass,
135 ) -> Result<()> {
136 let store = Arc::clone(&self.store);
137 let _permit = self
138 .blocking_permits
139 .clone()
140 .acquire_owned()
141 .await
142 .map_err(|err| anyhow::anyhow!("trusted nostr store closed: {err}"))?;
143 tokio::task::spawn_blocking(move || {
144 crate::socialgraph::ingest_parsed_event_with_storage_class(
145 store.as_ref(),
146 &event,
147 storage_class,
148 )
149 })
150 .await
151 .map_err(|err| anyhow::anyhow!("trusted nostr store ingest task failed: {err}"))?
152 }
153
154 async fn query(&self, filter: NostrFilter, limit: usize) -> Vec<Event> {
155 let store = Arc::clone(&self.store);
156 let filter_summary = nostr_filter_summary(&filter);
157 let memory_before = process_memory_snapshot();
158 let started = Instant::now();
159 let Ok(_permit) = self.blocking_permits.clone().acquire_owned().await else {
160 warn!("trusted nostr store query skipped: blocking semaphore closed");
161 return Vec::new();
162 };
163 let result = tokio::task::spawn_blocking(move || {
164 crate::socialgraph::query_events(store.as_ref(), &filter, limit)
165 })
166 .await;
167 match result {
168 Ok(events) => {
169 info!(
170 target: "hashtree_cli::nostr_relay::query",
171 limit,
172 events = events.len(),
173 elapsed_ms = started.elapsed().as_millis() as u64,
174 filter = %filter_summary,
175 memory_before = ?memory_before,
176 memory_after = ?process_memory_snapshot(),
177 "trusted nostr store query completed",
178 );
179 events
180 }
181 Err(err) => {
182 warn!("trusted nostr store query task failed: {}", err);
183 Vec::new()
184 }
185 }
186 }
187 }
188
/// Per-client rate-limit counters covering one 60-second window.
#[derive(Debug, Clone)]
struct ClientQuota {
    /// Start of the current counting window.
    last_reset: Instant,
    /// Untrusted events admitted in the current window.
    spambox_events: u32,
    /// Requests admitted in the current window.
    reqs: u32,
}

impl ClientQuota {
    /// Starts a fresh window with both counters at zero.
    fn new() -> Self {
        ClientQuota {
            reqs: 0,
            spambox_events: 0,
            last_reset: Instant::now(),
        }
    }

    /// Opens a new window once the current one is at least 60 seconds old.
    fn reset_if_needed(&mut self) {
        let window = Duration::from_secs(60);
        if self.last_reset.elapsed() < window {
            return;
        }
        *self = Self::new();
    }

    /// Admits one untrusted event if fewer than `limit` were admitted in the
    /// current window; returns whether it was admitted.
    fn allow_spambox_event(&mut self, limit: u32) -> bool {
        self.reset_if_needed();
        let within_budget = self.spambox_events < limit;
        if within_budget {
            self.spambox_events += 1;
        }
        within_budget
    }

    /// Admits one request if fewer than `limit` were admitted in the current
    /// window; returns whether it was admitted.
    fn allow_req(&mut self, limit: u32) -> bool {
        self.reset_if_needed();
        let within_budget = self.reqs < limit;
        if within_budget {
            self.reqs += 1;
        }
        within_budget
    }
}
231
/// Everything the relay tracks about one connected client.
struct ClientState {
    /// Channel on which JSON-encoded relay messages are sent to the client.
    sender: mpsc::UnboundedSender<String>,
    /// Public key supplied when the client was registered, if any.
    pubkey: Option<String>,
    /// Rate-limit counters for this client.
    quota: ClientQuota,
}
237
238 struct RecentEvents {
239 order: VecDeque<EventId>,
240 events: HashMap<EventId, Event>,
241 max_len: usize,
242 }
243
244 impl RecentEvents {
245 fn new(max_len: usize) -> Self {
246 Self {
247 order: VecDeque::new(),
248 events: HashMap::new(),
249 max_len: max_len.max(128),
250 }
251 }
252
253 fn insert(&mut self, event: Event) {
254 if self.events.contains_key(&event.id) {
255 return;
256 }
257 self.order.push_back(event.id);
258 self.events.insert(event.id, event);
259 while self.order.len() > self.max_len {
260 if let Some(oldest) = self.order.pop_front() {
261 self.events.remove(&oldest);
262 }
263 }
264 }
265
266 fn matching(&self, filter: &NostrFilter) -> Vec<Event> {
267 self.events
268 .values()
269 .filter(|event| filter.match_event(event, Default::default()))
270 .cloned()
271 .collect()
272 }
273 }
274
/// Where events from untrusted senders are kept.
enum SpamboxStore {
    /// Store opened from a path on disk.
    Persistent(NostrStore),
    /// Bounded in-memory buffer.
    Memory(MemorySpambox),
}
279
280 struct MemorySpambox {
281 events: Mutex<VecDeque<Event>>,
282 max_len: usize,
283 }
284
285 impl MemorySpambox {
286 fn new(max_len: usize) -> Self {
287 Self {
288 events: Mutex::new(VecDeque::new()),
289 max_len: max_len.max(128),
290 }
291 }
292
293 async fn ingest(&self, event: &Event) -> bool {
294 let mut events = self.events.lock().await;
295 events.push_back(event.clone());
296 while events.len() > self.max_len {
297 events.pop_front();
298 }
299 true
300 }
301 }
302
303 impl SpamboxStore {
304 async fn ingest(&self, event: &Event) -> bool {
305 match self {
306 SpamboxStore::Persistent(store) => store.ingest(event.clone()).await.is_ok(),
307 SpamboxStore::Memory(store) => store.ingest(event).await,
308 }
309 }
310 }
311
/// Bounded log of events received over Bluetooth, mirrored to a file.
struct BluetoothEventLog {
    /// Location of the on-disk log file.
    path: PathBuf,
    /// In-memory records plus their id index, guarded by an async mutex.
    state: Mutex<BluetoothEventLogState>,
}
316
/// Mutable contents of a `BluetoothEventLog`.
struct BluetoothEventLogState {
    /// Records in arrival order (front = oldest).
    records: VecDeque<BluetoothReceivedEventRecord>,
    /// Event ids of the records currently held, used to skip duplicates.
    event_ids: HashSet<String>,
}
321
322 impl BluetoothEventLog {
323 fn load(path: PathBuf) -> Self {
324 let records = std::fs::read_to_string(&path)
325 .ok()
326 .map(|serialized| {
327 serialized
328 .lines()
329 .filter_map(|line| {
330 serde_json::from_str::<BluetoothReceivedEventRecord>(line).ok()
331 })
332 .collect::<Vec<_>>()
333 })
334 .unwrap_or_default();
335 let mut trimmed = VecDeque::with_capacity(BLUETOOTH_EVENT_LOG_CAPACITY);
336 let start = records.len().saturating_sub(BLUETOOTH_EVENT_LOG_CAPACITY);
337 for record in records.into_iter().skip(start) {
338 trimmed.push_back(record);
339 }
340 let event_ids = trimmed
341 .iter()
342 .map(|record| record.event_id.clone())
343 .collect::<HashSet<_>>();
344
345 Self {
346 path,
347 state: Mutex::new(BluetoothEventLogState {
348 records: trimmed,
349 event_ids,
350 }),
351 }
352 }
353
354 async fn recent(&self, limit: usize) -> Vec<BluetoothReceivedEventRecord> {
355 let state = self.state.lock().await;
356 state
357 .records
358 .iter()
359 .rev()
360 .take(limit.max(1))
361 .cloned()
362 .collect()
363 }
364
365 async fn record(&self, event: &Event, peer_id: Option<String>) {
366 let record = BluetoothReceivedEventRecord {
367 event_id: event.id.to_hex(),
368 pubkey: event.pubkey.to_hex(),
369 kind: event.kind.as_u16() as u32,
370 created_at: event.created_at.as_secs(),
371 received_at: std::time::SystemTime::now()
372 .duration_since(std::time::UNIX_EPOCH)
373 .map(|value| value.as_secs())
374 .unwrap_or(0),
375 peer_id,
376 cid_values: cid_values_from_event(event),
377 };
378
379 let serialized = {
380 let mut state = self.state.lock().await;
381 if state.event_ids.contains(&record.event_id) {
382 return;
383 }
384
385 state.event_ids.insert(record.event_id.clone());
386 state.records.push_back(record);
387 while state.records.len() > BLUETOOTH_EVENT_LOG_CAPACITY {
388 if let Some(removed) = state.records.pop_front() {
389 state.event_ids.remove(&removed.event_id);
390 }
391 }
392
393 state
394 .records
395 .iter()
396 .filter_map(|entry| serde_json::to_string(entry).ok())
397 .collect::<Vec<_>>()
398 .join("\n")
399 };
400
401 if let Some(parent) = self.path.parent() {
402 let _ = std::fs::create_dir_all(parent);
403 }
404 let _ = std::fs::write(&self.path, serialized);
405 }
406 }
407
408 fn looks_like_cid_reference(value: &str) -> bool {
409 Cid::parse(value).is_ok() || nhash_decode(value).is_ok()
410 }
411
412 fn cid_values_from_event(event: &Event) -> Vec<String> {
413 let mut values = Vec::new();
414 let mut seen = HashSet::new();
415
416 for tag in event.tags.iter() {
417 let fields = tag.clone().to_vec();
418 if fields.first().is_some_and(|name| name == "cid") {
419 if let Some(value) = fields.get(1).filter(|value| !value.is_empty()) {
420 if seen.insert(value.clone()) {
421 values.push(value.clone());
422 }
423 }
424 continue;
425 }
426
427 for value in fields.into_iter().skip(1) {
428 if looks_like_cid_reference(&value) && seen.insert(value.clone()) {
429 values.push(value);
430 }
431 }
432 }
433
434 values
435 }
436
/// Local Nostr relay: owns the trusted event store, the spambox for untrusted
/// events, connected-client state, subscriptions, a recent-events cache and
/// the Bluetooth receive log.
pub struct NostrRelay {
    /// Limits and quotas applied to clients and queries.
    config: NostrRelayConfig,
    /// Store for events accepted as trusted.
    trusted: NostrStore,
    /// Hex pubkeys whose events are stored as `EventStorageClass::Public`.
    public_pubkeys: HashSet<String>,
    /// Destination for non-ephemeral events from untrusted senders.
    spambox: Option<SpamboxStore>,
    /// Optional access control consulted for write access by author pubkey.
    social_graph: Option<Arc<SocialGraphAccessControl>>,
    /// Connected clients keyed by client id.
    clients: Mutex<HashMap<u64, ClientState>>,
    /// Active subscription filters, keyed by client id then subscription id.
    subscriptions: Mutex<HashMap<u64, HashMap<SubscriptionId, Vec<NostrFilter>>>>,
    /// Bounded cache of recently ingested trusted events.
    recent_events: Mutex<RecentEvents>,
    /// Source of fresh client ids; starts at 1.
    next_client_id: AtomicU64,
    /// Log of events received over Bluetooth.
    bluetooth_event_log: Arc<BluetoothEventLog>,
}
449
450 impl NostrRelay {
451 async fn collect_filter_events(
452 &self,
453 filter: &NostrFilter,
454 limit: usize,
455 seen: &mut HashSet<EventId>,
456 events: &mut Vec<Event>,
457 ) {
458 if limit == 0 {
459 return;
460 }
461
462 let mut added = 0usize;
463
464 if !prefers_trusted_only(filter) {
465 let recent = {
466 let cache = self.recent_events.lock().await;
467 cache.matching(filter)
468 };
469 for event in recent {
470 if seen.insert(event.id) {
471 events.push(event);
472 added += 1;
473 if added >= limit {
474 return;
475 }
476 }
477 }
478 }
479
480 for event in self.trusted.query(filter.clone(), limit).await {
481 if seen.insert(event.id) {
482 events.push(event);
483 added += 1;
484 if added >= limit {
485 return;
486 }
487 }
488 }
489 }
490
491 async fn collect_filter_count(
492 &self,
493 filter: &NostrFilter,
494 limit: usize,
495 seen: &mut HashSet<EventId>,
496 ) {
497 if limit == 0 {
498 return;
499 }
500
501 let mut added = 0usize;
502
503 if !prefers_trusted_only(filter) {
504 let recent = {
505 let cache = self.recent_events.lock().await;
506 cache.matching(filter)
507 };
508 for event in recent {
509 if seen.insert(event.id) {
510 added += 1;
511 if added >= limit {
512 return;
513 }
514 }
515 }
516 }
517
518 for event in self.trusted.query(filter.clone(), limit).await {
519 if seen.insert(event.id) {
520 added += 1;
521 if added >= limit {
522 return;
523 }
524 }
525 }
526 }
527
528 pub fn new(
529 trusted_store: Arc<dyn SocialGraphBackend>,
530 data_dir: PathBuf,
531 public_pubkeys: HashSet<String>,
532 social_graph: Option<Arc<SocialGraphAccessControl>>,
533 config: NostrRelayConfig,
534 ) -> Result<Self> {
535 let spambox = if config.spambox_db_max_bytes == 0 {
536 Some(SpamboxStore::Memory(MemorySpambox::new(
537 config.max_query_limit * 2,
538 )))
539 } else {
540 let spam_dir = data_dir.join("socialgraph_spambox");
541 match socialgraph::open_social_graph_store_at_path(
542 &spam_dir,
543 Some(config.spambox_db_max_bytes),
544 ) {
545 Ok(store) => Some(SpamboxStore::Persistent(NostrStore::new(store))),
546 Err(err) => {
547 warn!(
548 "Failed to open social graph spambox (falling back to memory): {}",
549 err
550 );
551 Some(SpamboxStore::Memory(MemorySpambox::new(
552 config.max_query_limit * 2,
553 )))
554 }
555 }
556 };
557
558 let recent_size = config.max_query_limit.saturating_mul(2);
559 let bluetooth_event_log = Arc::new(BluetoothEventLog::load(
560 data_dir.join("bluetooth-events.jsonl"),
561 ));
562
563 Ok(Self {
564 config,
565 trusted: NostrStore::new(trusted_store),
566 public_pubkeys,
567 spambox,
568 social_graph,
569 clients: Mutex::new(HashMap::new()),
570 subscriptions: Mutex::new(HashMap::new()),
571 recent_events: Mutex::new(RecentEvents::new(recent_size)),
572 next_client_id: AtomicU64::new(1),
573 bluetooth_event_log,
574 })
575 }
576
/// Hands out a fresh client id by atomically incrementing the counter and
/// returning the value it held before the increment.
pub fn next_client_id(&self) -> u64 {
    self.next_client_id.fetch_add(1, Ordering::SeqCst)
}
580
/// Ingests `event` as trusted and broadcasts it to matching subscriptions.
///
/// # Errors
/// Propagates the signature-verification or store error from the shared
/// ingest path.
pub async fn ingest_trusted_event(&self, event: Event) -> Result<()> {
    self.ingest_trusted_event_inner(event, true).await
}
584
/// Ingests `event` as trusted (with broadcast) and, once that succeeded,
/// records it in the Bluetooth receive log together with `peer_id`.
///
/// # Errors
/// Propagates the ingest error; nothing is logged in that case.
pub async fn ingest_trusted_event_from_bluetooth(
    &self,
    event: Event,
    peer_id: Option<String>,
) -> Result<()> {
    // The clone is needed because the event is logged after being ingested.
    self.ingest_trusted_event_inner(event.clone(), true).await?;
    self.bluetooth_event_log.record(&event, peer_id).await;
    Ok(())
}
594
/// Ingests `event` as trusted without broadcasting it to subscriptions.
///
/// # Errors
/// Propagates the signature-verification or store error from the shared
/// ingest path.
pub async fn ingest_trusted_event_silent(&self, event: Event) -> Result<()> {
    self.ingest_trusted_event_inner(event, false).await
}
598
/// Returns the most recent Bluetooth receive-log records, as produced by the
/// log's `recent(limit)`.
pub async fn bluetooth_received_events(
    &self,
    limit: usize,
) -> Vec<BluetoothReceivedEventRecord> {
    self.bluetooth_event_log.recent(limit).await
}
605
606 async fn ingest_trusted_event_inner(&self, event: Event, broadcast: bool) -> Result<()> {
607 event
608 .verify()
609 .map_err(|e| anyhow::anyhow!("invalid signature: {}", e))?;
610
611 let is_ephemeral = event.kind.is_ephemeral();
612 {
613 let mut recent = self.recent_events.lock().await;
614 recent.insert(event.clone());
615 }
616
617 if !is_ephemeral {
618 let storage_class = self.event_storage_class(&event);
619 self.trusted
620 .ingest_with_storage_class(event.clone(), storage_class)
621 .await?;
622 }
623
624 if broadcast {
625 self.broadcast_event(&event).await;
626 }
627 Ok(())
628 }
629
630 pub async fn query_events(&self, filter: &NostrFilter, limit: usize) -> Vec<Event> {
631 let limit = limit.min(self.config.max_query_limit);
632 if limit == 0 {
633 return Vec::new();
634 }
635
636 let mut seen: HashSet<EventId> = HashSet::new();
637 let mut events = Vec::new();
638
639 if !prefers_trusted_only(filter) {
640 let recent = {
641 let cache = self.recent_events.lock().await;
642 cache.matching(filter)
643 };
644 for event in recent {
645 if seen.insert(event.id) {
646 events.push(event);
647 if events.len() >= limit {
648 return events;
649 }
650 }
651 }
652 }
653
654 for event in self.trusted.query(filter.clone(), limit).await {
655 if seen.insert(event.id) {
656 events.push(event);
657 if events.len() >= limit {
658 break;
659 }
660 }
661 }
662
663 events
664 }
665
666 pub async fn register_client(
667 &self,
668 client_id: u64,
669 sender: mpsc::UnboundedSender<String>,
670 pubkey: Option<String>,
671 ) {
672 let mut clients = self.clients.lock().await;
673 clients.insert(
674 client_id,
675 ClientState {
676 sender,
677 pubkey,
678 quota: ClientQuota::new(),
679 },
680 );
681 }
682
683 pub async fn unregister_client(&self, client_id: u64) {
684 let mut clients = self.clients.lock().await;
685 clients.remove(&client_id);
686 drop(clients);
687 let mut subs = self.subscriptions.lock().await;
688 subs.remove(&client_id);
689 }
690
/// Dispatches one parsed client message to the matching handler.
///
/// Borrowed message parts are converted to owned values before being handed
/// on. Negentropy messages are answered with a notice, as they are not
/// supported.
pub async fn handle_client_message(&self, client_id: u64, msg: NostrClientMessage<'_>) {
    match msg {
        NostrClientMessage::Event(event) => {
            self.handle_event(client_id, event.into_owned()).await;
        }
        NostrClientMessage::Req {
            subscription_id,
            filters,
        } => {
            self.handle_req(
                client_id,
                subscription_id.into_owned(),
                filters
                    .into_iter()
                    .map(|filter| filter.into_owned())
                    .collect(),
            )
            .await;
        }
        NostrClientMessage::Count {
            subscription_id,
            filter,
        } => {
            // COUNT carries a single filter; the handler takes a list.
            self.handle_count(
                client_id,
                subscription_id.into_owned(),
                vec![filter.into_owned()],
            )
            .await;
        }
        NostrClientMessage::Close(subscription_id) => {
            self.handle_close(client_id, subscription_id.into_owned())
                .await;
        }
        NostrClientMessage::Auth(event) => {
            self.handle_auth(client_id, event.into_owned()).await;
        }
        NostrClientMessage::NegOpen { .. }
        | NostrClientMessage::NegMsg { .. }
        | NostrClientMessage::NegClose { .. } => {
            self.send_to_client(
                client_id,
                NostrRelayMessage::notice("negentropy not supported"),
            )
            .await;
        }
    }
}
739
740 pub async fn register_subscription_query(
741 &self,
742 client_id: u64,
743 subscription_id: SubscriptionId,
744 mut filters: Vec<NostrFilter>,
745 ) -> std::result::Result<Vec<Event>, &'static str> {
746 if !self.allow_req(client_id).await {
747 return Err("rate limited");
748 }
749
750 if filters.len() > self.config.max_filters_per_sub {
751 filters.truncate(self.config.max_filters_per_sub);
752 }
753
754 {
755 let mut subs = self.subscriptions.lock().await;
756 let entry = subs.entry(client_id).or_default();
757 if !entry.contains_key(&subscription_id)
758 && entry.len() >= self.config.max_subs_per_client
759 {
760 return Err("too many subscriptions");
761 }
762 entry.insert(subscription_id.clone(), filters.clone());
763 }
764
765 let mut seen: HashSet<EventId> = HashSet::new();
766 let mut events = Vec::new();
767 let memory_before = process_memory_snapshot();
768 let started = Instant::now();
769 let filter_summary = nostr_filters_summary(&filters);
770 for filter in &filters {
771 let remaining = self.config.max_query_limit.saturating_sub(events.len());
772 if remaining == 0 {
773 break;
774 }
775 let limit = filter
776 .limit
777 .unwrap_or(self.config.max_query_limit)
778 .min(self.config.max_query_limit)
779 .min(remaining);
780 self.collect_filter_events(filter, limit, &mut seen, &mut events)
781 .await;
782 }
783
784 info!(
785 target: "hashtree_cli::nostr_relay::query",
786 client_id,
787 subscription_id = %subscription_id,
788 filters = filters.len(),
789 events = events.len(),
790 elapsed_ms = started.elapsed().as_millis() as u64,
791 filter = %filter_summary,
792 memory_before = ?memory_before,
793 memory_after = ?process_memory_snapshot(),
794 "nostr relay local subscription query completed",
795 );
796 Ok(events)
797 }
798
799 async fn handle_auth(&self, client_id: u64, event: Event) {
800 let ok = event.verify().is_ok();
801 let message = if ok { "" } else { "invalid auth" };
802 self.send_to_client(client_id, NostrRelayMessage::ok(event.id, ok, message))
803 .await;
804 }
805
806 async fn handle_close(&self, client_id: u64, subscription_id: SubscriptionId) {
807 let mut subs = self.subscriptions.lock().await;
808 if let Some(map) = subs.get_mut(&client_id) {
809 map.remove(&subscription_id);
810 }
811 }
812
/// Handles an EVENT message from a client.
///
/// Flow: verify the event, classify the sender as trusted or untrusted,
/// rate-limit untrusted senders, cache trusted events, persist non-ephemeral
/// events (trusted store or spambox), reply with OK, and finally broadcast
/// trusted events to matching subscriptions.
async fn handle_event(&self, client_id: u64, event: Event) {
    let ok = event.verify().is_ok();
    if !ok {
        self.send_to_client(
            client_id,
            NostrRelayMessage::ok(event.id, false, "invalid: signature"),
        )
        .await;
        return;
    }

    // Only untrusted events consume the spambox quota.
    let trusted = self.is_trusted_event(client_id, &event).await;
    if !trusted && !self.allow_spambox_event(client_id).await {
        self.send_to_client(
            client_id,
            NostrRelayMessage::ok(event.id, false, "rate limited"),
        )
        .await;
        return;
    }

    let is_ephemeral = event.kind.is_ephemeral();
    // Trusted events (ephemeral ones included) enter the recent cache before
    // persistence is attempted.
    if trusted {
        let mut recent = self.recent_events.lock().await;
        recent.insert(event.clone());
    }
    // Ephemeral events are never persisted.
    if !is_ephemeral {
        let stored = if trusted {
            let storage_class = self.event_storage_class(&event);
            self.trusted
                .ingest_with_storage_class(event.clone(), storage_class)
                .await
                .is_ok()
        } else {
            match self.spambox.as_ref() {
                Some(spambox) => spambox.ingest(&event).await,
                None => false,
            }
        };

        if !stored {
            let message = if trusted {
                "store failed"
            } else {
                "spambox full"
            };
            self.send_to_client(client_id, NostrRelayMessage::ok(event.id, false, message))
                .await;
            return;
        }
    }

    // The acceptance reply tells the client when the event went to the spambox.
    let message = if trusted { "" } else { "spambox" };
    self.send_to_client(client_id, NostrRelayMessage::ok(event.id, true, message))
        .await;

    // Untrusted events are never fanned out to subscribers.
    if trusted {
        self.broadcast_event(&event).await;
    }
}
873
874 async fn handle_req(
875 &self,
876 client_id: u64,
877 subscription_id: SubscriptionId,
878 filters: Vec<NostrFilter>,
879 ) {
880 match self
881 .register_subscription_query(client_id, subscription_id.clone(), filters)
882 .await
883 {
884 Ok(events) => {
885 for event in events {
886 self.send_to_client(
887 client_id,
888 NostrRelayMessage::event(subscription_id.clone(), event),
889 )
890 .await;
891 }
892 trim_process_allocations();
893
894 self.send_to_client(client_id, NostrRelayMessage::eose(subscription_id))
895 .await;
896 }
897 Err(message) => {
898 self.send_to_client(
899 client_id,
900 NostrRelayMessage::closed(subscription_id, message),
901 )
902 .await;
903 }
904 }
905 }
906
907 async fn handle_count(
908 &self,
909 client_id: u64,
910 subscription_id: SubscriptionId,
911 filters: Vec<NostrFilter>,
912 ) {
913 if !self.allow_req(client_id).await {
914 self.send_to_client(
915 client_id,
916 NostrRelayMessage::closed(subscription_id, "rate limited"),
917 )
918 .await;
919 return;
920 }
921
922 let mut seen: HashSet<EventId> = HashSet::new();
923 for filter in &filters {
924 let limit = filter
925 .limit
926 .unwrap_or(self.config.max_query_limit)
927 .min(self.config.max_query_limit);
928 self.collect_filter_count(filter, limit, &mut seen).await;
929 }
930
931 self.send_to_client(
932 client_id,
933 NostrRelayMessage::count(subscription_id, seen.len()),
934 )
935 .await;
936 }
937
938 async fn is_trusted_event(&self, client_id: u64, event: &Event) -> bool {
939 let event_pubkey = event.pubkey.to_hex();
940 let client_pubkey = {
941 let clients = self.clients.lock().await;
942 clients
943 .get(&client_id)
944 .and_then(|state| state.pubkey.clone())
945 };
946 if let Some(pubkey) = client_pubkey {
947 return pubkey == event_pubkey
948 || self.social_graph.as_ref().is_some_and(|social_graph| {
949 social_graph.check_write_access(&event_pubkey)
950 });
951 }
952 if let Some(ref social_graph) = self.social_graph {
953 return social_graph.check_write_access(&event_pubkey);
954 }
955 true
956 }
957
958 fn event_storage_class(&self, event: &Event) -> EventStorageClass {
959 if self.public_pubkeys.contains(&event.pubkey.to_hex()) {
960 EventStorageClass::Public
961 } else {
962 EventStorageClass::Ambient
963 }
964 }
965
966 async fn allow_spambox_event(&self, client_id: u64) -> bool {
967 let mut clients = self.clients.lock().await;
968 let Some(state) = clients.get_mut(&client_id) else {
969 return false;
970 };
971 state
972 .quota
973 .allow_spambox_event(self.config.spambox_max_events_per_min)
974 }
975
976 async fn allow_req(&self, client_id: u64) -> bool {
977 let mut clients = self.clients.lock().await;
978 let Some(state) = clients.get_mut(&client_id) else {
979 return false;
980 };
981 state.quota.allow_req(self.config.spambox_max_reqs_per_min)
982 }
983
984 async fn broadcast_event(&self, event: &Event) {
985 let subscriptions = self.subscriptions.lock().await;
986 let mut deliveries: Vec<(u64, SubscriptionId)> = Vec::new();
987 for (client_id, subs) in subscriptions.iter() {
988 for (sub_id, filters) in subs.iter() {
989 if filters
990 .iter()
991 .any(|f| f.match_event(event, Default::default()))
992 {
993 deliveries.push((*client_id, sub_id.clone()));
994 }
995 }
996 }
997 drop(subscriptions);
998
999 for (client_id, sub_id) in deliveries {
1000 self.send_to_client(client_id, NostrRelayMessage::event(sub_id, event.clone()))
1001 .await;
1002 }
1003 }
1004
1005 async fn send_to_client(&self, client_id: u64, msg: NostrRelayMessage<'_>) {
1006 let sender = {
1007 let clients = self.clients.lock().await;
1008 clients.get(&client_id).map(|state| state.sender.clone())
1009 };
1010 if let Some(tx) = sender {
1011 let _ = tx.send(msg.as_json());
1012 }
1013 }
1014 }
1015}
1016
1017pub use imp::NostrRelay;
1018
/// Exposes the relay as a mesh event store (only with the `p2p` feature).
/// Each method delegates to the inherent `NostrRelay` method of the same name.
#[cfg(feature = "p2p")]
#[async_trait::async_trait]
impl MeshEventStore for NostrRelay {
    async fn ingest_trusted_event(&self, event: Event) -> anyhow::Result<()> {
        // Fully qualified so the inherent method is called.
        NostrRelay::ingest_trusted_event(self, event).await
    }

    async fn query_events(&self, filter: &NostrFilter, limit: usize) -> Vec<Event> {
        NostrRelay::query_events(self, filter, limit).await
    }
}
1030
/// Exposes the relay as a mesh relay client (only with the `p2p` feature).
/// Every method delegates to an inherent `NostrRelay` method.
#[cfg(feature = "p2p")]
#[async_trait::async_trait]
impl MeshRelayClient for NostrRelay {
    fn next_client_id(&self) -> u64 {
        // Fully qualified so the inherent method is called.
        NostrRelay::next_client_id(self)
    }

    async fn register_client(
        &self,
        client_id: u64,
        sender: mpsc::UnboundedSender<String>,
        pubkey: Option<String>,
    ) {
        NostrRelay::register_client(self, client_id, sender, pubkey).await
    }

    async fn unregister_client(&self, client_id: u64) {
        NostrRelay::unregister_client(self, client_id).await
    }

    async fn handle_client_message(&self, client_id: u64, msg: NostrClientMessage) {
        NostrRelay::handle_client_message(self, client_id, msg).await
    }

    async fn register_subscription_query(
        &self,
        client_id: u64,
        subscription_id: SubscriptionId,
        filters: Vec<NostrFilter>,
    ) -> std::result::Result<Vec<Event>, &'static str> {
        NostrRelay::register_subscription_query(self, client_id, subscription_id, filters).await
    }

    async fn ingest_trusted_event_from_peer(
        &self,
        event: Event,
        peer_id: Option<String>,
    ) -> anyhow::Result<()> {
        // Peer-delivered events go through the Bluetooth ingest path, which
        // also records them in the Bluetooth receive log.
        NostrRelay::ingest_trusted_event_from_bluetooth(self, event, peer_id).await
    }
}
1072
// Unit tests are kept in a separate file and compiled only for test builds.
#[cfg(test)]
#[path = "nostr_relay/tests.rs"]
mod tests;