gnostr_relay/
reader.rs

1use crate::{message::*, setting::SettingWrapper, Result};
2use actix::prelude::*;
3use metrics::histogram;
4use nostr_db::Db;
5use std::{sync::Arc, time::Instant};
6
7/// Requst by filter
8/// Concurrent read events from db
9pub struct Reader {
10    pub db: Arc<Db>,
11    pub addr: Recipient<ReadEventResult>,
12    pub setting: SettingWrapper,
13}
14
15impl Reader {
16    pub fn new(db: Arc<Db>, addr: Recipient<ReadEventResult>, setting: SettingWrapper) -> Self {
17        Self { db, addr, setting }
18    }
19
20    pub fn read(&self, msg: &ReadEvent) -> Result<()> {
21        let reader = self.db.reader()?;
22        let timeout = self.setting.read().data.db_query_timeout;
23        for filter in &msg.subscription.filters {
24            let start = Instant::now();
25            let mut iter = self.db.iter::<String, _>(&reader, filter)?;
26            if let Some(time) = timeout {
27                iter.scan_time(time.into(), 2000);
28            }
29            for event in iter {
30                let event = event?;
31                self.addr.do_send(ReadEventResult {
32                    id: msg.id,
33                    sub_id: msg.subscription.id.clone(),
34                    msg: OutgoingMessage::event(&msg.subscription.id, &event),
35                });
36            }
37            histogram!("nostr_relay_db_get").record(start.elapsed());
38        }
39        self.addr.do_send(ReadEventResult {
40            id: msg.id,
41            sub_id: msg.subscription.id.clone(),
42            msg: OutgoingMessage::eose(&msg.subscription.id),
43        });
44
45        Ok(())
46    }
47}
48
49impl Actor for Reader {
50    type Context = SyncContext<Self>;
51    fn started(&mut self, _ctx: &mut Self::Context) {}
52}
53
54impl Handler<ReadEvent> for Reader {
55    type Result = ();
56    fn handle(&mut self, msg: ReadEvent, _: &mut Self::Context) {
57        if let Err(err) = self.read(&msg) {
58            let m = OutgoingMessage::closed(
59                msg.subscription.id.as_str(),
60                &format!("get event error: {}", err),
61            );
62            self.addr.do_send(ReadEventResult {
63                id: msg.id,
64                sub_id: msg.subscription.id,
65                msg: m,
66            });
67        }
68    }
69}
70
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{temp_data_path, Setting};
    use actix_rt::time::sleep;
    use anyhow::Result;
    use nostr_db::{Event, Filter};
    use parking_lot::RwLock;
    use std::{str::FromStr, time::Duration};

    /// Test actor that stores every `ReadEventResult` it receives in a shared
    /// vector so the test can inspect them afterwards.
    #[derive(Default)]
    struct Receiver(Arc<RwLock<Vec<ReadEventResult>>>);

    impl Actor for Receiver {
        type Context = Context<Self>;
    }

    impl Handler<ReadEventResult> for Receiver {
        type Result = ();

        fn handle(&mut self, msg: ReadEventResult, _ctx: &mut Self::Context) {
            self.0.write().push(msg);
        }
    }

    /// Stores one event, issues four read requests with a default filter
    /// against a pool of three readers, and checks the number of results.
    #[actix_rt::test]
    async fn read() -> Result<()> {
        let db = Arc::new(Db::open(temp_data_path("reader")?)?);
        let note = r#"
        {
            "content": "Good morning everyone 😃",
            "created_at": 1680690006,
            "id": "332747c0fab8a1a92def4b0937e177be6df4382ce6dd7724f86dc4710b7d4d7d",
            "kind": 1,
            "pubkey": "7abf57d516b1ff7308ca3bd5650ea6a4674d469c7c5057b1d005fb13d218bfef",
            "sig": "ef4ff4f69ac387239eb1401fb07d7a44a5d5d57127e0dc3466a0403cf7d5486b668608ebfcbe9ff1f8d3b5d710545999fe08ee767284ec0b474e4cf92537678f",
            "tags": [["t", "nostr"]]
          }
        "#;
        db.batch_put(vec![Event::from_str(note)?])?;

        // Keep a handle on the collected results before the actor is started.
        let sink = Receiver::default();
        let collected = Arc::clone(&sink.0);
        let recipient: Recipient<ReadEventResult> = sink.start().recipient();

        let reader = SyncArbiter::start(3, move || {
            Reader::new(Arc::clone(&db), recipient.clone(), Setting::default().into())
        });

        for request_id in 0..4 {
            let request = ReadEvent {
                id: request_id,
                subscription: Subscription {
                    id: request_id.to_string(),
                    filters: vec![Filter::default()],
                },
            };
            reader.send(request).await?;
        }

        // Presumably gives the receiver time to process the queued results.
        sleep(Duration::from_millis(100)).await;
        // Four requests, each expected to yield one event message and one
        // `eose` message.
        assert_eq!(collected.read().len(), 8);
        Ok(())
    }
}