1use std::collections::HashMap;
18use std::sync::Arc;
19
20use std::sync::Mutex;
21
22use k256::schnorr::{signature::Verifier, Signature, VerifyingKey};
23use serde::{Deserialize, Serialize};
24use sha2::{Digest, Sha256};
25use tokio::sync::broadcast;
26
27use crate::error::RelayError;
28
/// A signed Nostr event in its JSON wire shape.
///
/// Every field is (de)serialised by serde under its Rust field name.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Event {
    /// Hex event id; `Event::verify` compares it case-insensitively with the
    /// id recomputed by `Event::canonical_id`.
    pub id: String,
    /// Hex public key of the author; `Event::verify` requires exactly 64 hex
    /// characters (32 bytes).
    pub pubkey: String,
    /// Creation timestamp (presumably Unix seconds — not enforced in this file).
    pub created_at: u64,
    /// Event kind; the `is_replaceable` / `is_ephemeral` /
    /// `is_parameterised_replaceable` classifiers decide how it is stored.
    pub kind: u64,
    /// Tags as lists of strings; element 0 is treated as the tag name and
    /// element 1 as its value by `Event::d_tag` and `Filter::matches`.
    pub tags: Vec<Vec<String>>,
    /// Free-form payload; part of the hashed canonical form.
    pub content: String,
    /// Hex Schnorr signature over the id bytes; `Event::verify` requires
    /// exactly 128 hex characters (64 bytes).
    pub sig: String,
}
44
45impl Event {
46 pub fn canonical_id(&self) -> String {
49 let canonical = serde_json::json!([
50 0,
51 self.pubkey,
52 self.created_at,
53 self.kind,
54 self.tags,
55 self.content,
56 ]);
57 let s = serde_json::to_string(&canonical).unwrap_or_default();
58 hex::encode(Sha256::digest(s.as_bytes()))
59 }
60
61 pub fn verify(&self) -> Result<(), RelayError> {
64 if self.pubkey.len() != 64 || hex::decode(&self.pubkey).is_err() {
66 return Err(RelayError::InvalidEvent("pubkey not 32-byte hex".into()));
67 }
68 if self.sig.len() != 128 || hex::decode(&self.sig).is_err() {
69 return Err(RelayError::InvalidEvent("sig not 64-byte hex".into()));
70 }
71 let computed = self.canonical_id();
72 if computed.to_lowercase() != self.id.to_lowercase() {
73 return Err(RelayError::IdMismatch);
74 }
75 let pk_bytes = hex::decode(&self.pubkey)
76 .map_err(|e| RelayError::InvalidEvent(e.to_string()))?;
77 let sig_bytes = hex::decode(&self.sig)
78 .map_err(|e| RelayError::InvalidEvent(e.to_string()))?;
79 let id_bytes = hex::decode(&computed)
80 .map_err(|e| RelayError::InvalidEvent(e.to_string()))?;
81 let vk = VerifyingKey::from_bytes(&pk_bytes)
82 .map_err(|e| RelayError::BadSignature(e.to_string()))?;
83 let sig = Signature::try_from(sig_bytes.as_slice())
84 .map_err(|e| RelayError::BadSignature(e.to_string()))?;
85 vk.verify(&id_bytes, &sig)
86 .map_err(|e| RelayError::BadSignature(e.to_string()))
87 }
88
89 pub fn d_tag(&self) -> Option<&str> {
92 self.tags
93 .iter()
94 .find(|t| t.first().map(|s| s.as_str()) == Some("d"))
95 .and_then(|t| t.get(1).map(|s| s.as_str()))
96 }
97}
98
/// A subscription/query filter.
///
/// Every named field is optional; an absent field places no constraint on
/// events (see `Filter::matches`).
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct Filter {
    /// Accepted event ids (compared ignoring ASCII case).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub ids: Option<Vec<String>>,
    /// Accepted author public keys (compared ignoring ASCII case).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub authors: Option<Vec<String>>,
    /// Accepted event kinds.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub kinds: Option<Vec<u64>>,
    /// Inclusive lower bound on `created_at`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub since: Option<u64>,
    /// Inclusive upper bound on `created_at`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub until: Option<u64>,
    /// Maximum number of stored events returned for this filter by
    /// `Relay::history`; not consulted by `Filter::matches`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub limit: Option<usize>,
    /// Tag constraints. Because of `flatten`, plain deserialisation collects
    /// every JSON key not claimed by the fields above, `#` prefix included;
    /// `Filter::from_value` rewrites the map so keys are bare single-character
    /// tag names. `Filter::matches` compares keys against event tag names
    /// verbatim, so it expects the rewritten form.
    #[serde(flatten)]
    pub tags: HashMap<String, serde_json::Value>,
}
125
126impl Filter {
127 pub fn from_value(v: serde_json::Value) -> Result<Self, RelayError> {
130 let mut filter: Filter = serde_json::from_value(v.clone())
131 .map_err(|e| RelayError::BadMessage(format!("filter decode: {e}")))?;
132 let mut normalised: HashMap<String, serde_json::Value> = HashMap::new();
135 for (k, val) in filter.tags.drain() {
136 if let Some(short) = k.strip_prefix('#') {
137 if short.len() == 1 {
138 normalised.insert(short.to_string(), val);
139 }
140 }
141 }
142 filter.tags = normalised;
143 Ok(filter)
144 }
145
146 pub fn matches(&self, event: &Event) -> bool {
148 if let Some(ids) = &self.ids {
149 if !ids.iter().any(|i| i.eq_ignore_ascii_case(&event.id)) {
150 return false;
151 }
152 }
153 if let Some(authors) = &self.authors {
154 if !authors
155 .iter()
156 .any(|a| a.eq_ignore_ascii_case(&event.pubkey))
157 {
158 return false;
159 }
160 }
161 if let Some(kinds) = &self.kinds {
162 if !kinds.contains(&event.kind) {
163 return false;
164 }
165 }
166 if let Some(since) = self.since {
167 if event.created_at < since {
168 return false;
169 }
170 }
171 if let Some(until) = self.until {
172 if event.created_at > until {
173 return false;
174 }
175 }
176 for (tag_name, values) in &self.tags {
177 let Some(values) = values.as_array() else {
178 return false;
179 };
180 let event_tag_values: Vec<&str> = event
181 .tags
182 .iter()
183 .filter(|t| t.first().map(|s| s.as_str()) == Some(tag_name.as_str()))
184 .filter_map(|t| t.get(1).map(|s| s.as_str()))
185 .collect();
186 let any = values.iter().any(|v| {
187 v.as_str()
188 .map(|s| event_tag_values.contains(&s))
189 .unwrap_or(false)
190 });
191 if !any {
192 return false;
193 }
194 }
195 true
196 }
197}
198
/// Returns `true` for replaceable kinds: `0`, `3`, and `10_000..=19_999`.
pub fn is_replaceable(kind: u64) -> bool {
    matches!(kind, 0 | 3 | 10_000..=19_999)
}
206
/// Returns `true` for ephemeral kinds, i.e. `20_000..=29_999`.
pub fn is_ephemeral(kind: u64) -> bool {
    matches!(kind, 20_000..=29_999)
}
210
/// Returns `true` for parameterised-replaceable kinds, i.e. `30_000..=39_999`.
pub fn is_parameterised_replaceable(kind: u64) -> bool {
    matches!(kind, 30_000..=39_999)
}
214
/// Storage backend the relay keeps its events in.
///
/// Implementors must be `Send + Sync`, and every method takes `&self`, so any
/// mutation has to happen through interior mutability. The per-method notes
/// describe what `InMemoryEventStore` in this file does; other implementations
/// are presumably expected to behave the same way.
pub trait EventStore: Send + Sync {
    /// Adds `event` to the store.
    fn put(&self, event: Event);
    /// Removes stored events whose id equals `id`.
    fn remove(&self, id: &str);
    /// Returns an owned copy of the stored events.
    fn snapshot(&self) -> Vec<Event>;
    /// Overwrites a stored event for which `predicate` returns `true` with
    /// `event`.
    ///
    /// Returns `true` when a stored event was replaced and `false` when no
    /// stored event matched (in which case nothing is inserted).
    fn replace_where(
        &self,
        predicate: &dyn Fn(&Event) -> bool,
        event: Event,
    ) -> bool;
    /// Number of events currently stored.
    fn len(&self) -> usize;
    /// `true` when the store holds no events; defaults to `len() == 0`.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
243
/// Bounded, mutex-protected, in-process event store.
#[derive(Debug)]
pub struct InMemoryEventStore {
    /// Stored events in insertion order (index 0 is evicted first by `put`).
    inner: Mutex<Vec<Event>>,
    /// Maximum number of events kept; always at least 1 (see `new`).
    max_events: usize,
}
250
251impl InMemoryEventStore {
252 pub fn new(max_events: usize) -> Self {
253 Self {
254 inner: Mutex::new(Vec::new()),
255 max_events: max_events.max(1),
256 }
257 }
258}
259
260impl Default for InMemoryEventStore {
261 fn default() -> Self {
262 Self::new(1000)
263 }
264}
265
266impl EventStore for InMemoryEventStore {
267 fn put(&self, event: Event) {
268 let mut guard = self.inner.lock().expect("event store lock poisoned");
269 if guard.len() >= self.max_events {
270 guard.remove(0);
271 }
272 guard.push(event);
273 }
274
275 fn remove(&self, id: &str) {
276 let mut guard = self.inner.lock().expect("event store lock poisoned");
277 guard.retain(|e| e.id != id);
278 }
279
280 fn snapshot(&self) -> Vec<Event> {
281 self.inner
282 .lock()
283 .expect("event store lock poisoned")
284 .clone()
285 }
286
287 fn replace_where(
288 &self,
289 predicate: &dyn Fn(&Event) -> bool,
290 event: Event,
291 ) -> bool {
292 let mut guard = self.inner.lock().expect("event store lock poisoned");
293 for slot in guard.iter_mut() {
294 if predicate(slot) {
295 *slot = event;
296 return true;
297 }
298 }
299 false
300 }
301
302 fn len(&self) -> usize {
303 self.inner.lock().expect("event store lock poisoned").len()
304 }
305}
306
/// Descriptive metadata a relay advertises about itself.
///
/// NOTE(review): the field set looks like a NIP-11 relay information
/// document — confirm against the HTTP handler that serves it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RelayInfo {
    /// Human-readable relay name.
    pub name: String,
    /// Longer free-text description.
    pub description: String,
    /// Operator public key; left empty by `RelayInfo::jss_compatible`.
    pub pubkey: String,
    /// Operator contact; left empty by `RelayInfo::jss_compatible`.
    pub contact: String,
    /// NIP numbers the relay claims to support.
    pub supported_nips: Vec<u64>,
    /// URL identifying the relay software.
    pub software: String,
    /// Version string of the relay software.
    pub version: String,
}
322
323impl RelayInfo {
324 pub fn jss_compatible() -> Self {
325 Self {
326 name: "solid-pod-rs Nostr Relay".into(),
327 description: "Embedded Nostr relay for solid-pod-rs pods".into(),
328 pubkey: String::new(),
329 contact: String::new(),
330 supported_nips: vec![1, 11, 16],
331 software: "https://github.com/dreamlab-ai/solid-pod-rs".into(),
332 version: env!("CARGO_PKG_VERSION").to_string(),
333 }
334 }
335}
336
/// Cheaply cloneable relay handle.
///
/// Every field is reference-counted or a channel handle, so clones share the
/// same store, broadcast channel and metadata.
#[derive(Clone)]
pub struct Relay {
    /// Backend holding the persisted events.
    store: Arc<dyn EventStore>,
    /// Sending half of the live-event broadcast channel.
    events_tx: broadcast::Sender<Event>,
    /// Metadata returned by `Relay::info`.
    info: Arc<RelayInfo>,
}
350
351impl Relay {
352 pub fn new(
355 store: Arc<dyn EventStore>,
356 info: RelayInfo,
357 broadcast_capacity: usize,
358 ) -> Self {
359 let (events_tx, _) = broadcast::channel(broadcast_capacity.max(1));
360 Self {
361 store,
362 events_tx,
363 info: Arc::new(info),
364 }
365 }
366
367 pub fn in_memory() -> Self {
370 Self::new(
371 Arc::new(InMemoryEventStore::default()),
372 RelayInfo::jss_compatible(),
373 256,
374 )
375 }
376
377 pub fn info(&self) -> &RelayInfo {
379 &self.info
380 }
381
382 pub fn subscribe(&self) -> broadcast::Receiver<Event> {
384 self.events_tx.subscribe()
385 }
386
387 pub fn snapshot(&self) -> Vec<Event> {
390 self.store.snapshot()
391 }
392
393 pub fn ingest(&self, event: Event) -> Result<(), RelayError> {
404 event.verify()?;
405
406 if is_ephemeral(event.kind) {
407 let _ = self.events_tx.send(event);
409 return Ok(());
410 }
411
412 if is_replaceable(event.kind) {
413 let target_pubkey = event.pubkey.clone();
414 let target_kind = event.kind;
415 let replaced = self.store.replace_where(
416 &move |e| e.pubkey == target_pubkey && e.kind == target_kind,
417 event.clone(),
418 );
419 if !replaced {
420 self.store.put(event.clone());
421 }
422 let _ = self.events_tx.send(event);
423 return Ok(());
424 }
425
426 if is_parameterised_replaceable(event.kind) {
427 let target_pubkey = event.pubkey.clone();
428 let target_kind = event.kind;
429 let target_d = event.d_tag().map(|s| s.to_string());
430 let replaced = self.store.replace_where(
431 &move |e| {
432 e.pubkey == target_pubkey
433 && e.kind == target_kind
434 && e.d_tag().map(|s| s.to_string()) == target_d
435 },
436 event.clone(),
437 );
438 if !replaced {
439 self.store.put(event.clone());
440 }
441 let _ = self.events_tx.send(event);
442 return Ok(());
443 }
444
445 self.store.put(event.clone());
447 let _ = self.events_tx.send(event);
448 Ok(())
449 }
450
451 pub fn history(&self, filters: &[Filter]) -> Vec<Event> {
454 let all = self.store.snapshot();
455 let mut out: Vec<Event> = Vec::new();
456 let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
457 for filter in filters {
458 let mut matched: Vec<Event> =
459 all.iter().filter(|e| filter.matches(e)).cloned().collect();
460 if let Some(lim) = filter.limit {
461 if matched.len() > lim {
462 let start = matched.len() - lim;
463 matched = matched.split_off(start);
464 }
465 }
466 for ev in matched {
467 if seen.insert(ev.id.clone()) {
468 out.push(ev);
469 }
470 }
471 }
472 out
473 }
474}
475
// Unit tests for event verification, filter matching, the in-memory store
// and relay ingestion.
#[cfg(test)]
mod tests {
    use super::*;
    use k256::schnorr::{signature::Signer, SigningKey};

    /// Fixed signing key so every test run produces the same pubkey.
    fn test_sk() -> SigningKey {
        // 32 bytes of 0x42 is accepted as a key; the `expect` documents that.
        SigningKey::from_bytes(&[0x42u8; 32]).expect("valid schnorr key")
    }

    /// Builds an event signed with `test_sk`, with a matching id and signature.
    fn make_event(kind: u64, created_at: u64, tags: Vec<Vec<String>>, content: &str) -> Event {
        let sk = test_sk();
        let pubkey_hex = hex::encode(sk.verifying_key().to_bytes());
        // The id depends only on pubkey/created_at/kind/tags/content, so a
        // skeleton with empty id and sig is enough to compute it.
        let skeleton = Event {
            id: String::new(),
            pubkey: pubkey_hex.clone(),
            created_at,
            kind,
            tags: tags.clone(),
            content: content.to_string(),
            sig: String::new(),
        };
        let id = skeleton.canonical_id();
        let id_bytes = hex::decode(&id).unwrap();
        let sig: k256::schnorr::Signature = sk.sign(&id_bytes);
        Event {
            id,
            pubkey: pubkey_hex,
            created_at,
            kind,
            tags,
            content: content.to_string(),
            sig: hex::encode(sig.to_bytes()),
        }
    }

    #[test]
    fn verify_accepts_well_formed_signed_event() {
        let ev = make_event(1, 1_700_000_000, vec![], "hello");
        ev.verify().unwrap();
    }

    #[test]
    fn verify_rejects_tampered_content() {
        let mut ev = make_event(1, 1_700_000_000, vec![], "hello");
        // Changing the content changes the canonical id, so the id check fails
        // before the signature is examined.
        ev.content = "tampered".into();
        assert!(matches!(ev.verify(), Err(RelayError::IdMismatch)));
    }

    #[test]
    fn verify_rejects_bad_signature() {
        let mut ev = make_event(1, 1_700_000_000, vec![], "hello");
        let mut bytes = hex::decode(&ev.sig).unwrap();
        // Flip one bit of the signature while leaving the id untouched.
        bytes[0] ^= 0x01;
        ev.sig = hex::encode(bytes);
        assert!(matches!(
            ev.verify(),
            Err(RelayError::BadSignature(_))
        ));
    }

    #[test]
    fn filter_matches_ids_and_authors() {
        let ev = make_event(1, 1_700_000_000, vec![], "hi");
        let filter = Filter {
            ids: Some(vec![ev.id.clone()]),
            authors: Some(vec![ev.pubkey.clone()]),
            kinds: Some(vec![1]),
            ..Default::default()
        };
        assert!(filter.matches(&ev));
    }

    #[test]
    fn filter_rejects_wrong_kind() {
        let ev = make_event(1, 1_700_000_000, vec![], "hi");
        let filter = Filter {
            kinds: Some(vec![7]),
            ..Default::default()
        };
        assert!(!filter.matches(&ev));
    }

    #[test]
    fn filter_matches_since_and_until() {
        let ev = make_event(1, 1_700_000_000, vec![], "hi");
        // Window that contains the event's timestamp.
        let ok = Filter {
            since: Some(1_699_999_000),
            until: Some(1_700_000_500),
            ..Default::default()
        };
        assert!(ok.matches(&ev));
        // `since` later than the event's timestamp.
        let late = Filter {
            since: Some(1_700_000_500),
            ..Default::default()
        };
        assert!(!late.matches(&ev));
    }

    #[test]
    fn filter_matches_tag_query_via_from_value() {
        let tags = vec![vec!["e".into(), "aaa".into()]];
        let ev = make_event(1, 1_700_000_000, tags, "hi");
        // `from_value` strips the `#` so the key lines up with the tag name.
        let v = serde_json::json!({"#e": ["aaa"]});
        let filter = Filter::from_value(v).unwrap();
        assert!(filter.matches(&ev));
    }

    #[test]
    fn filter_rejects_missing_tag() {
        let ev = make_event(1, 1_700_000_000, vec![], "hi");
        let v = serde_json::json!({"#p": ["xxx"]});
        let filter = Filter::from_value(v).unwrap();
        assert!(!filter.matches(&ev));
    }

    #[test]
    fn relay_accepts_nip01_event() {
        let relay = Relay::in_memory();
        let ev = make_event(1, 1_700_000_000, vec![], "hello");
        relay.ingest(ev.clone()).unwrap();
        let snap = relay.snapshot();
        assert_eq!(snap.len(), 1);
        assert_eq!(snap[0].id, ev.id);
    }

    #[test]
    fn relay_rejects_bad_signature() {
        let relay = Relay::in_memory();
        let mut ev = make_event(1, 1_700_000_000, vec![], "hello");
        let mut sig = hex::decode(&ev.sig).unwrap();
        sig[1] ^= 0x01;
        ev.sig = hex::encode(sig);
        assert!(relay.ingest(ev).is_err());
    }

    #[test]
    fn replaceable_event_replaces_prior_nip16() {
        let relay = Relay::in_memory();
        let a = make_event(0, 1_700_000_000, vec![], r#"{"name":"alice-v1"}"#);
        // Same author and kind, later timestamp: must take over the slot.
        let b = make_event(0, 1_700_000_100, vec![], r#"{"name":"alice-v2"}"#);
        relay.ingest(a).unwrap();
        relay.ingest(b.clone()).unwrap();
        let snap = relay.snapshot();
        assert_eq!(snap.len(), 1);
        assert_eq!(snap[0].content, r#"{"name":"alice-v2"}"#);
        assert_eq!(snap[0].id, b.id);
    }

    #[test]
    fn parameterised_replaceable_keyed_by_d_tag() {
        let relay = Relay::in_memory();
        let a = make_event(
            30_000,
            1_700_000_000,
            vec![vec!["d".into(), "slot-a".into()]],
            "v1",
        );
        let b = make_event(
            30_000,
            1_700_000_100,
            vec![vec!["d".into(), "slot-a".into()]],
            "v2",
        );
        let c = make_event(
            30_000,
            1_700_000_200,
            vec![vec!["d".into(), "slot-b".into()]],
            "other-slot",
        );
        relay.ingest(a).unwrap();
        relay.ingest(b.clone()).unwrap();
        relay.ingest(c.clone()).unwrap();
        let snap = relay.snapshot();
        assert_eq!(snap.len(), 2);
        // `b` replaced `a` in slot-a; `c` lives separately in slot-b.
        let slot_a = snap.iter().find(|e| e.d_tag() == Some("slot-a")).unwrap();
        assert_eq!(slot_a.content, "v2");
        let slot_b = snap.iter().find(|e| e.d_tag() == Some("slot-b")).unwrap();
        assert_eq!(slot_b.id, c.id);
    }

    #[test]
    fn ephemeral_event_not_stored_but_broadcast() {
        let relay = Relay::in_memory();
        // Subscribe before ingesting so the broadcast has a receiver.
        let mut rx = relay.subscribe();
        let ev = make_event(20_001, 1_700_000_000, vec![], "ephemeral");
        relay.ingest(ev.clone()).unwrap();
        assert_eq!(relay.snapshot().len(), 0);
        // The event must already be queued on the channel.
        let received = rx.try_recv().unwrap();
        assert_eq!(received.id, ev.id);
    }

    #[test]
    fn history_applies_per_filter_limit() {
        let relay = Relay::in_memory();
        for i in 0..5 {
            let ev = make_event(1, 1_700_000_000 + i, vec![], &format!("msg-{i}"));
            relay.ingest(ev).unwrap();
        }
        let filter = Filter {
            kinds: Some(vec![1]),
            limit: Some(2),
            ..Default::default()
        };
        let hist = relay.history(&[filter]);
        assert_eq!(hist.len(), 2);
        // The limit keeps the last two stored matches, in store order.
        assert_eq!(hist[0].content, "msg-3");
        assert_eq!(hist[1].content, "msg-4");
    }

    #[test]
    fn in_memory_store_evicts_oldest_when_full() {
        let store = InMemoryEventStore::new(2);
        let a = make_event(1, 1_700_000_000, vec![], "a");
        let b = make_event(1, 1_700_000_001, vec![], "b");
        let c = make_event(1, 1_700_000_002, vec![], "c");
        store.put(a.clone());
        store.put(b.clone());
        store.put(c.clone());
        let snap = store.snapshot();
        assert_eq!(snap.len(), 2);
        assert_eq!(snap[0].id, b.id);
        assert_eq!(snap[1].id, c.id);
    }

    #[test]
    fn classifiers_cover_spec_ranges() {
        // Each range is half-open: the upper bound belongs to the next class.
        assert!(is_replaceable(0));
        assert!(is_replaceable(3));
        assert!(is_replaceable(10_000));
        assert!(is_replaceable(19_999));
        assert!(!is_replaceable(20_000));

        assert!(is_ephemeral(20_000));
        assert!(is_ephemeral(29_999));
        assert!(!is_ephemeral(30_000));

        assert!(is_parameterised_replaceable(30_000));
        assert!(is_parameterised_replaceable(39_999));
        assert!(!is_parameterised_replaceable(40_000));
    }

    #[test]
    fn relay_info_is_jss_compatible() {
        let info = RelayInfo::jss_compatible();
        assert!(info.supported_nips.contains(&1));
        assert!(info.supported_nips.contains(&11));
        assert!(info.supported_nips.contains(&16));
    }
}