gnostr_relay/
writer.rs

1use crate::{message::*, Result};
2use actix::prelude::*;
3use metrics::{counter, histogram};
4use nostr_db::{now, CheckEventResult, Db};
5use std::{
6    sync::Arc,
7    time::{Duration, Instant},
8};
9use tracing::{debug, error, info};
10
11/// Single-threaded write events, delete expired events
12/// Batch write can improve tps
13
14const WRITE_INTERVAL_MS: u64 = 100;
15const DEL_INTERVAL_SECONDS: u64 = 60;
16const EPHEMERAL_EXPIRED_SECONDS: u64 = 60 * 5;
17
18pub struct Writer {
19    pub db: Arc<Db>,
20    pub addr: Recipient<WriteEventResult>,
21    pub events: Vec<WriteEvent>,
22    pub write_interval_ms: u64,
23    pub del_interval_seconds: u64,
24}
25
26impl Writer {
27    pub fn new(db: Arc<Db>, addr: Recipient<WriteEventResult>) -> Self {
28        Self {
29            db,
30            addr,
31            events: Vec::new(),
32            write_interval_ms: WRITE_INTERVAL_MS,
33            del_interval_seconds: DEL_INTERVAL_SECONDS,
34        }
35    }
36
37    pub fn write(&mut self) -> Result<()> {
38        if !self.events.is_empty() {
39            let start = Instant::now();
40            let mut writer = self.db.writer()?;
41            while let Some(event) = self.events.pop() {
42                let res = self.db.put(&mut writer, &event.event);
43                debug!(
44                    "write event: {} {} {:?}",
45                    event.id,
46                    event.event.id_str(),
47                    res,
48                );
49
50                match res {
51                    Ok(result) => {
52                        if let CheckEventResult::Ok(_num) = result {
53                            counter!("nostr_relay_new_event").increment(1);
54                        }
55                        self.addr.do_send(WriteEventResult::Write {
56                            id: event.id,
57                            event: event.event,
58                            result,
59                        });
60                    }
61                    Err(err) => {
62                        error!(error = err.to_string(), "write event error");
63                        let eid = event.event.id_str();
64                        self.addr.do_send(WriteEventResult::Message {
65                            id: event.id,
66                            event: event.event,
67                            msg: OutgoingMessage::ok(&eid, false, "write event error"),
68                        });
69                    }
70                }
71            }
72            self.db.commit(writer)?;
73            histogram!("nostr_relay_db_write").record(start.elapsed());
74        }
75        Ok(())
76    }
77
78    pub fn do_write(&mut self) {
79        if let Err(err) = self.write() {
80            error!(error = err.to_string(), "write events error");
81        }
82    }
83
84    pub fn del_expired(&self) -> Result<()> {
85        let reader = self.db.reader()?;
86        let iter = self
87            .db
88            .iter_expiration::<Vec<u8>, _>(&reader, Some(now()))?;
89        let mut ids = vec![];
90        for id in iter {
91            let id = id?;
92            ids.push(id);
93        }
94        self.db.batch_del(ids)?;
95        Ok(())
96    }
97
98    pub fn del_ephemeral(&self) -> Result<()> {
99        let reader = self.db.reader()?;
100        let iter = self
101            .db
102            .iter_ephemeral::<Vec<u8>, _>(&reader, Some(now() - EPHEMERAL_EXPIRED_SECONDS))?;
103        let mut ids = vec![];
104        for id in iter {
105            let id = id?;
106            ids.push(id);
107        }
108        self.db.batch_del(ids)?;
109        Ok(())
110    }
111
112    pub fn do_del(&self) {
113        if let Err(err) = self.del_expired() {
114            error!(error = err.to_string(), "delete expired events error");
115        }
116        if let Err(err) = self.del_ephemeral() {
117            error!(error = err.to_string(), "delete ephemeral events error");
118        }
119    }
120}
121
122impl Actor for Writer {
123    type Context = Context<Self>;
124    fn started(&mut self, ctx: &mut Self::Context) {
125        info!("Actor writer started");
126        // save event every 100ms
127        ctx.run_interval(
128            Duration::from_millis(self.write_interval_ms),
129            |act, _ctx| {
130                act.do_write();
131            },
132        );
133        // delete expired and ephemeral events
134        ctx.run_interval(
135            Duration::from_secs(self.del_interval_seconds),
136            |act, _ctx| {
137                act.do_del();
138            },
139        );
140    }
141
142    fn stopped(&mut self, _ctx: &mut Self::Context) {
143        info!("Actor writer stopped");
144        // save event when stopped
145        self.do_write();
146    }
147}
148
149impl Handler<WriteEvent> for Writer {
150    type Result = ();
151    fn handle(&mut self, msg: WriteEvent, _: &mut Self::Context) {
152        self.events.push(msg);
153    }
154}
155
156#[cfg(test)]
157mod tests {
158    use std::{str::FromStr, time::Duration};
159
160    use super::*;
161    use crate::temp_data_path;
162    use actix_rt::time::sleep;
163    use anyhow::Result;
164    use nostr_db::{Event, Filter};
165    use parking_lot::RwLock;
166
167    #[derive(Default)]
168    struct Receiver(Arc<RwLock<Vec<WriteEventResult>>>);
169    impl Actor for Receiver {
170        type Context = Context<Self>;
171    }
172
173    impl Handler<WriteEventResult> for Receiver {
174        type Result = ();
175        fn handle(&mut self, msg: WriteEventResult, _ctx: &mut Self::Context) {
176            self.0.write().push(msg);
177        }
178    }
179
180    #[actix_rt::test]
181    async fn write() -> Result<()> {
182        let db = Arc::new(Db::open(temp_data_path("writer")?)?);
183        let note = r#"
184        {
185            "content": "Good morning everyone 😃",
186            "created_at": 1680690006,
187            "id": "332747c0fab8a1a92def4b0937e177be6df4382ce6dd7724f86dc4710b7d4d7d",
188            "kind": 1,
189            "pubkey": "7abf57d516b1ff7308ca3bd5650ea6a4674d469c7c5057b1d005fb13d218bfef",
190            "sig": "ef4ff4f69ac387239eb1401fb07d7a44a5d5d57127e0dc3466a0403cf7d5486b668608ebfcbe9ff1f8d3b5d710545999fe08ee767284ec0b474e4cf92537678f",
191            "tags": [["t", "nostr"]]
192          }
193        "#;
194        let event = Event::from_str(note)?;
195
196        let receiver = Receiver::default();
197        let messages = receiver.0.clone();
198        let receiver = receiver.start();
199        let addr = receiver.recipient();
200
201        let mut writer = Writer::new(Arc::clone(&db), addr.clone());
202        writer.del_interval_seconds = 1;
203        writer.write_interval_ms = 100;
204        let writer = writer.start();
205
206        for i in 0..4 {
207            writer
208                .send(WriteEvent {
209                    id: i,
210                    event: event.clone(),
211                })
212                .await?;
213        }
214
215        // ephemeral
216        writer
217          .send(WriteEvent {
218              id: 100,
219              event: Event::from_str(r#"
220              {
221                  "content": "Good morning everyone 😃",
222                  "created_at": 10,
223                  "id": "332747c0fab8a1a92def4b0937e177be6df4382ce6dd7724f86dc4710b7d4d71",
224                  "kind": 20001,
225                  "pubkey": "7abf57d516b1ff7308ca3bd5650ea6a4674d469c7c5057b1d005fb13d218bfef",
226                  "sig": "ef4ff4f69ac387239eb1401fb07d7a44a5d5d57127e0dc3466a0403cf7d5486b668608ebfcbe9ff1f8d3b5d710545999fe08ee767284ec0b474e4cf92537678f",
227                  "tags": [["t", "nostr"]]
228                }
229              "#)?,
230          })
231          .await?;
232        // ephemeral
233        writer
234          .send(WriteEvent {
235              id: 100,
236              event: Event::from_str(&format!(r#"
237              {{
238                  "content": "Good morning everyone 😃",
239                  "created_at": {},
240                  "id": "332747c0fab8a1a92def4b0937e177be6df4382ce6dd7724f86dc4710b7d4d72",
241                  "kind": 20001,
242                  "pubkey": "7abf57d516b1ff7308ca3bd5650ea6a4674d469c7c5057b1d005fb13d218bfef",
243                  "sig": "ef4ff4f69ac387239eb1401fb07d7a44a5d5d57127e0dc3466a0403cf7d5486b668608ebfcbe9ff1f8d3b5d710545999fe08ee767284ec0b474e4cf92537678f",
244                  "tags": [["t", "nostr"]]
245                }}
246              "#, now()))?,
247          })
248          .await?;
249
250        // expiration
251        writer
252          .send(WriteEvent {
253              id: 100,
254              event: Event::from_str(r#"
255              {
256                  "content": "Good morning everyone 😃",
257                  "created_at": 10,
258                  "id": "332747c0fab8a1a92def4b0937e177be6df4382ce6dd7724f86dc4710b7d4d73",
259                  "kind": 1,
260                  "pubkey": "7abf57d516b1ff7308ca3bd5650ea6a4674d469c7c5057b1d005fb13d218bfef",
261                  "sig": "ef4ff4f69ac387239eb1401fb07d7a44a5d5d57127e0dc3466a0403cf7d5486b668608ebfcbe9ff1f8d3b5d710545999fe08ee767284ec0b474e4cf92537678f",
262                  "tags": [["t", "nostr"], ["expiration", "10"]]
263                }
264              "#)?,
265          })
266          .await?;
267
268        sleep(Duration::from_millis(200)).await;
269        let r = messages.read();
270        assert_eq!(r.len(), 7);
271        {
272            let txn = db.reader()?;
273            let iter = db.iter::<Event, _>(
274                &txn,
275                &Filter {
276                    ..Default::default()
277                },
278            )?;
279            assert_eq!(iter.count(), 4);
280        }
281
282        sleep(Duration::from_millis(1100)).await;
283        {
284            let txn = db.reader()?;
285            let iter = db.iter::<Event, _>(
286                &txn,
287                &Filter {
288                    ..Default::default()
289                },
290            )?;
291            assert_eq!(iter.count(), 2);
292        }
293
294        Ok(())
295    }
296}