1use crate::{message::*, Result};
2use actix::prelude::*;
3use metrics::{counter, histogram};
4use nostr_db::{now, CheckEventResult, Db};
5use std::{
6 sync::Arc,
7 time::{Duration, Instant},
8};
9use tracing::{debug, error, info};
10
11const WRITE_INTERVAL_MS: u64 = 100;
15const DEL_INTERVAL_SECONDS: u64 = 60;
16const EPHEMERAL_EXPIRED_SECONDS: u64 = 60 * 5;
17
18pub struct Writer {
19 pub db: Arc<Db>,
20 pub addr: Recipient<WriteEventResult>,
21 pub events: Vec<WriteEvent>,
22 pub write_interval_ms: u64,
23 pub del_interval_seconds: u64,
24}
25
26impl Writer {
27 pub fn new(db: Arc<Db>, addr: Recipient<WriteEventResult>) -> Self {
28 Self {
29 db,
30 addr,
31 events: Vec::new(),
32 write_interval_ms: WRITE_INTERVAL_MS,
33 del_interval_seconds: DEL_INTERVAL_SECONDS,
34 }
35 }
36
37 pub fn write(&mut self) -> Result<()> {
38 if !self.events.is_empty() {
39 let start = Instant::now();
40 let mut writer = self.db.writer()?;
41 while let Some(event) = self.events.pop() {
42 let res = self.db.put(&mut writer, &event.event);
43 debug!(
44 "write event: {} {} {:?}",
45 event.id,
46 event.event.id_str(),
47 res,
48 );
49
50 match res {
51 Ok(result) => {
52 if let CheckEventResult::Ok(_num) = result {
53 counter!("nostr_relay_new_event").increment(1);
54 }
55 self.addr.do_send(WriteEventResult::Write {
56 id: event.id,
57 event: event.event,
58 result,
59 });
60 }
61 Err(err) => {
62 error!(error = err.to_string(), "write event error");
63 let eid = event.event.id_str();
64 self.addr.do_send(WriteEventResult::Message {
65 id: event.id,
66 event: event.event,
67 msg: OutgoingMessage::ok(&eid, false, "write event error"),
68 });
69 }
70 }
71 }
72 self.db.commit(writer)?;
73 histogram!("nostr_relay_db_write").record(start.elapsed());
74 }
75 Ok(())
76 }
77
78 pub fn do_write(&mut self) {
79 if let Err(err) = self.write() {
80 error!(error = err.to_string(), "write events error");
81 }
82 }
83
84 pub fn del_expired(&self) -> Result<()> {
85 let reader = self.db.reader()?;
86 let iter = self
87 .db
88 .iter_expiration::<Vec<u8>, _>(&reader, Some(now()))?;
89 let mut ids = vec![];
90 for id in iter {
91 let id = id?;
92 ids.push(id);
93 }
94 self.db.batch_del(ids)?;
95 Ok(())
96 }
97
98 pub fn del_ephemeral(&self) -> Result<()> {
99 let reader = self.db.reader()?;
100 let iter = self
101 .db
102 .iter_ephemeral::<Vec<u8>, _>(&reader, Some(now() - EPHEMERAL_EXPIRED_SECONDS))?;
103 let mut ids = vec![];
104 for id in iter {
105 let id = id?;
106 ids.push(id);
107 }
108 self.db.batch_del(ids)?;
109 Ok(())
110 }
111
112 pub fn do_del(&self) {
113 if let Err(err) = self.del_expired() {
114 error!(error = err.to_string(), "delete expired events error");
115 }
116 if let Err(err) = self.del_ephemeral() {
117 error!(error = err.to_string(), "delete ephemeral events error");
118 }
119 }
120}
121
122impl Actor for Writer {
123 type Context = Context<Self>;
124 fn started(&mut self, ctx: &mut Self::Context) {
125 info!("Actor writer started");
126 ctx.run_interval(
128 Duration::from_millis(self.write_interval_ms),
129 |act, _ctx| {
130 act.do_write();
131 },
132 );
133 ctx.run_interval(
135 Duration::from_secs(self.del_interval_seconds),
136 |act, _ctx| {
137 act.do_del();
138 },
139 );
140 }
141
142 fn stopped(&mut self, _ctx: &mut Self::Context) {
143 info!("Actor writer stopped");
144 self.do_write();
146 }
147}
148
149impl Handler<WriteEvent> for Writer {
150 type Result = ();
151 fn handle(&mut self, msg: WriteEvent, _: &mut Self::Context) {
152 self.events.push(msg);
153 }
154}
155
156#[cfg(test)]
157mod tests {
158 use std::{str::FromStr, time::Duration};
159
160 use super::*;
161 use crate::temp_data_path;
162 use actix_rt::time::sleep;
163 use anyhow::Result;
164 use nostr_db::{Event, Filter};
165 use parking_lot::RwLock;
166
167 #[derive(Default)]
168 struct Receiver(Arc<RwLock<Vec<WriteEventResult>>>);
169 impl Actor for Receiver {
170 type Context = Context<Self>;
171 }
172
173 impl Handler<WriteEventResult> for Receiver {
174 type Result = ();
175 fn handle(&mut self, msg: WriteEventResult, _ctx: &mut Self::Context) {
176 self.0.write().push(msg);
177 }
178 }
179
180 #[actix_rt::test]
181 async fn write() -> Result<()> {
182 let db = Arc::new(Db::open(temp_data_path("writer")?)?);
183 let note = r#"
184 {
185 "content": "Good morning everyone 😃",
186 "created_at": 1680690006,
187 "id": "332747c0fab8a1a92def4b0937e177be6df4382ce6dd7724f86dc4710b7d4d7d",
188 "kind": 1,
189 "pubkey": "7abf57d516b1ff7308ca3bd5650ea6a4674d469c7c5057b1d005fb13d218bfef",
190 "sig": "ef4ff4f69ac387239eb1401fb07d7a44a5d5d57127e0dc3466a0403cf7d5486b668608ebfcbe9ff1f8d3b5d710545999fe08ee767284ec0b474e4cf92537678f",
191 "tags": [["t", "nostr"]]
192 }
193 "#;
194 let event = Event::from_str(note)?;
195
196 let receiver = Receiver::default();
197 let messages = receiver.0.clone();
198 let receiver = receiver.start();
199 let addr = receiver.recipient();
200
201 let mut writer = Writer::new(Arc::clone(&db), addr.clone());
202 writer.del_interval_seconds = 1;
203 writer.write_interval_ms = 100;
204 let writer = writer.start();
205
206 for i in 0..4 {
207 writer
208 .send(WriteEvent {
209 id: i,
210 event: event.clone(),
211 })
212 .await?;
213 }
214
215 writer
217 .send(WriteEvent {
218 id: 100,
219 event: Event::from_str(r#"
220 {
221 "content": "Good morning everyone 😃",
222 "created_at": 10,
223 "id": "332747c0fab8a1a92def4b0937e177be6df4382ce6dd7724f86dc4710b7d4d71",
224 "kind": 20001,
225 "pubkey": "7abf57d516b1ff7308ca3bd5650ea6a4674d469c7c5057b1d005fb13d218bfef",
226 "sig": "ef4ff4f69ac387239eb1401fb07d7a44a5d5d57127e0dc3466a0403cf7d5486b668608ebfcbe9ff1f8d3b5d710545999fe08ee767284ec0b474e4cf92537678f",
227 "tags": [["t", "nostr"]]
228 }
229 "#)?,
230 })
231 .await?;
232 writer
234 .send(WriteEvent {
235 id: 100,
236 event: Event::from_str(&format!(r#"
237 {{
238 "content": "Good morning everyone 😃",
239 "created_at": {},
240 "id": "332747c0fab8a1a92def4b0937e177be6df4382ce6dd7724f86dc4710b7d4d72",
241 "kind": 20001,
242 "pubkey": "7abf57d516b1ff7308ca3bd5650ea6a4674d469c7c5057b1d005fb13d218bfef",
243 "sig": "ef4ff4f69ac387239eb1401fb07d7a44a5d5d57127e0dc3466a0403cf7d5486b668608ebfcbe9ff1f8d3b5d710545999fe08ee767284ec0b474e4cf92537678f",
244 "tags": [["t", "nostr"]]
245 }}
246 "#, now()))?,
247 })
248 .await?;
249
250 writer
252 .send(WriteEvent {
253 id: 100,
254 event: Event::from_str(r#"
255 {
256 "content": "Good morning everyone 😃",
257 "created_at": 10,
258 "id": "332747c0fab8a1a92def4b0937e177be6df4382ce6dd7724f86dc4710b7d4d73",
259 "kind": 1,
260 "pubkey": "7abf57d516b1ff7308ca3bd5650ea6a4674d469c7c5057b1d005fb13d218bfef",
261 "sig": "ef4ff4f69ac387239eb1401fb07d7a44a5d5d57127e0dc3466a0403cf7d5486b668608ebfcbe9ff1f8d3b5d710545999fe08ee767284ec0b474e4cf92537678f",
262 "tags": [["t", "nostr"], ["expiration", "10"]]
263 }
264 "#)?,
265 })
266 .await?;
267
268 sleep(Duration::from_millis(200)).await;
269 let r = messages.read();
270 assert_eq!(r.len(), 7);
271 {
272 let txn = db.reader()?;
273 let iter = db.iter::<Event, _>(
274 &txn,
275 &Filter {
276 ..Default::default()
277 },
278 )?;
279 assert_eq!(iter.count(), 4);
280 }
281
282 sleep(Duration::from_millis(1100)).await;
283 {
284 let txn = db.reader()?;
285 let iter = db.iter::<Event, _>(
286 &txn,
287 &Filter {
288 ..Default::default()
289 },
290 )?;
291 assert_eq!(iter.count(), 2);
292 }
293
294 Ok(())
295 }
296}