1use crate::{message::*, setting::SettingWrapper, Result};
2use actix::prelude::*;
3use metrics::histogram;
4use nostr_db::Db;
5use std::{sync::Arc, time::Instant};
6
7pub struct Reader {
10 pub db: Arc<Db>,
11 pub addr: Recipient<ReadEventResult>,
12 pub setting: SettingWrapper,
13}
14
15impl Reader {
16 pub fn new(db: Arc<Db>, addr: Recipient<ReadEventResult>, setting: SettingWrapper) -> Self {
17 Self { db, addr, setting }
18 }
19
20 pub fn read(&self, msg: &ReadEvent) -> Result<()> {
21 let reader = self.db.reader()?;
22 let timeout = self.setting.read().data.db_query_timeout;
23 for filter in &msg.subscription.filters {
24 let start = Instant::now();
25 let mut iter = self.db.iter::<String, _>(&reader, filter)?;
26 if let Some(time) = timeout {
27 iter.scan_time(time.into(), 2000);
28 }
29 for event in iter {
30 let event = event?;
31 self.addr.do_send(ReadEventResult {
32 id: msg.id,
33 sub_id: msg.subscription.id.clone(),
34 msg: OutgoingMessage::event(&msg.subscription.id, &event),
35 });
36 }
37 histogram!("nostr_relay_db_get").record(start.elapsed());
38 }
39 self.addr.do_send(ReadEventResult {
40 id: msg.id,
41 sub_id: msg.subscription.id.clone(),
42 msg: OutgoingMessage::eose(&msg.subscription.id),
43 });
44
45 Ok(())
46 }
47}
48
49impl Actor for Reader {
50 type Context = SyncContext<Self>;
51 fn started(&mut self, _ctx: &mut Self::Context) {}
52}
53
54impl Handler<ReadEvent> for Reader {
55 type Result = ();
56 fn handle(&mut self, msg: ReadEvent, _: &mut Self::Context) {
57 if let Err(err) = self.read(&msg) {
58 let m = OutgoingMessage::closed(
59 msg.subscription.id.as_str(),
60 &format!("get event error: {}", err),
61 );
62 self.addr.do_send(ReadEventResult {
63 id: msg.id,
64 sub_id: msg.subscription.id,
65 msg: m,
66 });
67 }
68 }
69}
70
71#[cfg(test)]
72mod tests {
73 use super::*;
74 use crate::{temp_data_path, Setting};
75 use actix_rt::time::sleep;
76 use anyhow::Result;
77 use nostr_db::{Event, Filter};
78 use parking_lot::RwLock;
79 use std::{str::FromStr, time::Duration};
80
81 #[derive(Default)]
82 struct Receiver(Arc<RwLock<Vec<ReadEventResult>>>);
83 impl Actor for Receiver {
84 type Context = Context<Self>;
85 }
86
87 impl Handler<ReadEventResult> for Receiver {
88 type Result = ();
89 fn handle(&mut self, msg: ReadEventResult, _ctx: &mut Self::Context) {
90 self.0.write().push(msg);
91 }
92 }
93
94 #[actix_rt::test]
95 async fn read() -> Result<()> {
96 let db = Arc::new(Db::open(temp_data_path("reader")?)?);
97 let note = r#"
98 {
99 "content": "Good morning everyone 😃",
100 "created_at": 1680690006,
101 "id": "332747c0fab8a1a92def4b0937e177be6df4382ce6dd7724f86dc4710b7d4d7d",
102 "kind": 1,
103 "pubkey": "7abf57d516b1ff7308ca3bd5650ea6a4674d469c7c5057b1d005fb13d218bfef",
104 "sig": "ef4ff4f69ac387239eb1401fb07d7a44a5d5d57127e0dc3466a0403cf7d5486b668608ebfcbe9ff1f8d3b5d710545999fe08ee767284ec0b474e4cf92537678f",
105 "tags": [["t", "nostr"]]
106 }
107 "#;
108 let event = Event::from_str(note)?;
109 db.batch_put(vec![event])?;
110
111 let receiver = Receiver::default();
112 let messages = receiver.0.clone();
113 let receiver = receiver.start();
114 let addr = receiver.recipient();
115
116 let reader = SyncArbiter::start(3, move || {
117 Reader::new(Arc::clone(&db), addr.clone(), Setting::default().into())
118 });
119
120 for i in 0..4 {
121 reader
122 .send(ReadEvent {
123 id: i,
124 subscription: Subscription {
125 id: i.to_string(),
126 filters: vec![Filter {
127 ..Default::default()
128 }],
129 },
130 })
131 .await?;
132 }
133
134 sleep(Duration::from_millis(100)).await;
135 let r = messages.read();
136 assert_eq!(r.len(), 8);
137 Ok(())
138 }
139}