1use crate::{message::*, setting::SettingWrapper, Reader, Subscriber, Writer};
2use actix::prelude::*;
3use nostr_db::{CheckEventResult, Db};
4use std::{collections::HashMap, sync::Arc};
5use tracing::info;
6
7#[derive(Debug)]
9pub struct Server {
10 id: usize,
11 writer: Addr<Writer>,
12 reader: Addr<Reader>,
13 subscriber: Addr<Subscriber>,
14 sessions: HashMap<usize, Recipient<OutgoingMessage>>,
15}
16
17impl Server {
18 pub fn create_with(db: Arc<Db>, setting: SettingWrapper) -> Addr<Server> {
19 let r = setting.read();
20 let num = if r.thread.reader == 0 {
21 num_cpus::get()
22 } else {
23 r.thread.reader
24 };
25 drop(r);
26
27 Server::create(|ctx| {
28 let writer = Writer::new(Arc::clone(&db), ctx.address().recipient()).start();
29 let subscriber = Subscriber::new(ctx.address().recipient(), setting.clone()).start();
30 let addr = ctx.address().recipient();
31 info!("starting {} reader workers", num);
32 let reader = SyncArbiter::start(num, move || {
33 Reader::new(Arc::clone(&db), addr.clone(), setting.clone())
34 });
35
36 Server {
37 id: 0,
38 writer,
39 reader,
40 subscriber,
41 sessions: HashMap::new(),
42 }
43 })
44 }
45
46 fn send_to_client(&self, id: usize, msg: OutgoingMessage) {
47 if let Some(addr) = self.sessions.get(&id) {
48 addr.do_send(msg);
49 }
50 }
51}
52
53impl Actor for Server {
55 type Context = Context<Self>;
58 fn started(&mut self, ctx: &mut Self::Context) {
59 ctx.set_mailbox_capacity(10000);
60 info!("Actor server started");
61 }
62}
63
64impl Handler<Connect> for Server {
68 type Result = usize;
69 fn handle(&mut self, msg: Connect, _ctx: &mut Self::Context) -> Self::Result {
70 if self.id == usize::MAX {
71 self.id = 0;
72 }
73 self.id += 1;
74 self.sessions.insert(self.id, msg.addr);
75 self.id
77 }
78}
79
80impl Handler<Disconnect> for Server {
82 type Result = ();
83
84 fn handle(&mut self, msg: Disconnect, _: &mut Self::Context) {
85 self.sessions.remove(&msg.id);
87
88 self.subscriber.do_send(Unsubscribe {
90 id: msg.id,
91 sub_id: None,
92 });
93 }
94}
95
96impl Handler<ClientMessage> for Server {
98 type Result = ();
99 fn handle(&mut self, msg: ClientMessage, ctx: &mut Self::Context) {
100 match msg.msg {
101 IncomingMessage::Event(event) => {
102 self.writer.do_send(WriteEvent { id: msg.id, event })
105 }
106 IncomingMessage::Close(id) => self.subscriber.do_send(Unsubscribe {
107 id: msg.id,
108 sub_id: Some(id),
109 }),
110 IncomingMessage::Req(subscription) => {
111 let session_id = msg.id;
112 let read_event = ReadEvent {
113 id: msg.id,
114 subscription: subscription.clone(),
115 };
116 let sub_id = subscription.id.clone();
117 self.subscriber
118 .send(Subscribe {
119 id: msg.id,
120 subscription,
121 })
122 .into_actor(self)
123 .then(move |res, act, _ctx| {
124 match res {
125 Ok(res) => match res {
126 Subscribed::Ok => {
127 act.reader.do_send(read_event);
128 }
129 Subscribed::Overlimit => {
130 act.send_to_client(
131 session_id,
132 OutgoingMessage::closed(
133 &sub_id,
134 "Number of subscriptions exceeds limit",
135 ),
136 );
137 }
138 Subscribed::InvalidIdLength => {
139 act.send_to_client(
140 session_id,
141 OutgoingMessage::closed(&sub_id, "Subscription id should be non-empty string of max length 64 chars"),
142 );
143 }
144 },
145 Err(_err) => {
146 act.send_to_client(
147 session_id,
148 OutgoingMessage::closed(&sub_id, "Something is wrong"),
149 );
150 }
151 }
152 fut::ready(())
153 })
154 .wait(ctx);
155 }
156 _ => {
157 self.send_to_client(msg.id, OutgoingMessage::notice("Unsupported message"));
158 }
159 }
160 }
161}
162
163impl Handler<WriteEventResult> for Server {
164 type Result = ();
165 fn handle(&mut self, msg: WriteEventResult, _: &mut Self::Context) {
166 match msg {
167 WriteEventResult::Write { id, event, result } => {
168 let event_id = event.id_str();
169 let out_msg = match &result {
170 CheckEventResult::Ok(_num) => OutgoingMessage::ok(&event_id, true, ""),
171 CheckEventResult::Duplicate => {
172 OutgoingMessage::ok(&event_id, true, "duplicate: event exists")
173 }
174 CheckEventResult::Invald(msg) => {
175 OutgoingMessage::ok(&event_id, false, &format!("invalid: {}", msg))
176 }
177 CheckEventResult::Deleted => {
178 OutgoingMessage::ok(&event_id, false, "deleted: user requested deletion")
179 }
180 CheckEventResult::ReplaceIgnored => {
181 OutgoingMessage::ok(&event_id, false, "replaced: have newer event")
182 }
183 };
184 self.send_to_client(id, out_msg);
185 if let CheckEventResult::Ok(_num) = result {
187 self.subscriber.do_send(Dispatch { id, event });
188 }
189 }
190 WriteEventResult::Message { id, event: _, msg } => {
191 self.send_to_client(id, msg);
192 }
193 }
194 }
195}
196
197impl Handler<ReadEventResult> for Server {
198 type Result = ();
199 fn handle(&mut self, msg: ReadEventResult, _: &mut Self::Context) {
200 self.send_to_client(msg.id, msg.msg);
201 }
202}
203
204impl Handler<SubscribeResult> for Server {
205 type Result = ();
206 fn handle(&mut self, msg: SubscribeResult, _: &mut Self::Context) {
207 self.send_to_client(msg.id, msg.msg);
208 }
209}
210
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{temp_data_path, Setting};
    use actix_rt::time::sleep;
    use anyhow::Result;
    use parking_lot::RwLock;
    use std::time::Duration;

    /// Buffer shared between the fake client actor and the test body.
    type Inbox = Arc<RwLock<Vec<OutgoingMessage>>>;

    /// Fake client session: records every outgoing message it receives.
    #[derive(Default)]
    struct Receiver(Inbox);

    impl Actor for Receiver {
        type Context = Context<Self>;
    }

    impl Handler<OutgoingMessage> for Receiver {
        type Result = ();

        fn handle(&mut self, msg: OutgoingMessage, _ctx: &mut Self::Context) {
            self.0.write().push(msg);
        }
    }

    /// Parses `text` as an incoming message and wraps it for session `id`.
    fn client_message(id: usize, text: String) -> Result<ClientMessage> {
        let parsed = serde_json::from_str::<IncomingMessage>(&text)?;
        Ok(ClientMessage::new(id, text, parsed))
    }

    /// Sends `msg` to the server, then waits `wait_ms` for the replies to arrive.
    async fn deliver(server: &Addr<Server>, msg: ClientMessage, wait_ms: u64) -> Result<()> {
        server.send(msg).await?;
        sleep(Duration::from_millis(wait_ms)).await;
        Ok(())
    }

    /// Empties the inbox and checks that it held exactly one message per
    /// marker, each containing its marker, in order.
    fn expect(inbox: &Inbox, markers: &[&str]) {
        let received = std::mem::take(&mut *inbox.write());
        assert_eq!(received.len(), markers.len());
        for (message, marker) in received.iter().zip(markers) {
            assert!(message.0.contains(*marker));
        }
    }

    #[actix_rt::test]
    async fn message() -> Result<()> {
        let db = Arc::new(Db::open(temp_data_path("server")?)?);
        let note = r#"
        {
            "content": "Good morning everyone 😃",
            "created_at": 1680690006,
            "id": "332747c0fab8a1a92def4b0937e177be6df4382ce6dd7724f86dc4710b7d4d7d",
            "kind": 1,
            "pubkey": "7abf57d516b1ff7308ca3bd5650ea6a4674d469c7c5057b1d005fb13d218bfef",
            "sig": "ef4ff4f69ac387239eb1401fb07d7a44a5d5d57127e0dc3466a0403cf7d5486b668608ebfcbe9ff1f8d3b5d710545999fe08ee767284ec0b474e4cf92537678f",
            "tags": [["t", "nostr"]]
          }
        "#;
        let ephemeral_note = r#"
        {
            "content": "Good morning everyone 😃",
            "created_at": 1680690006,
            "id": "332747c0fab8a1a92def4b0937e177be6df4382ce6dd7724f86dc4710b7d4d78",
            "kind": 20000,
            "pubkey": "7abf57d516b1ff7308ca3bd5650ea6a4674d469c7c5057b1d005fb13d218bfef",
            "sig": "ef4ff4f69ac387239eb1401fb07d7a44a5d5d57127e0dc3466a0403cf7d5486b668608ebfcbe9ff1f8d3b5d710545999fe08ee767284ec0b474e4cf92537678f",
            "tags": [["t", "nostr"]]
          }
        "#;

        let receiver = Receiver::default();
        let inbox = receiver.0.clone();
        let addr = receiver.start().recipient();

        let server = Server::create_with(db, Setting::default().into());

        let id = server.send(Connect { addr }).await?;
        assert_eq!(id, 1);

        // A message the server has no branch for is answered with a notice.
        let unknown = client_message(id, r#"["UNKNOWN"]"#.to_owned())?;
        deliver(&server, unknown, 50).await?;
        expect(&inbox, &["Unsupported"]);

        // Subscribing before anything was written yields a single EOSE message.
        let req = client_message(id, r#"["REQ", "1", {}]"#.to_owned())?;
        deliver(&server, req, 50).await?;
        expect(&inbox, &["EOSE"]);

        // First write: an OK followed by an EVENT for the open subscription.
        let write_note = client_message(id, format!(r#"["EVENT", {}]"#, note))?;
        deliver(&server, write_note.clone(), 200).await?;
        expect(&inbox, &["OK", "EVENT"]);

        // Writing the same event again yields only an OK.
        deliver(&server, write_note, 200).await?;
        expect(&inbox, &["OK"]);

        // Same two steps with the ephemeral note.
        let write_ephemeral = client_message(id, format!(r#"["EVENT", {}]"#, ephemeral_note))?;
        deliver(&server, write_ephemeral.clone(), 200).await?;
        expect(&inbox, &["OK", "EVENT"]);

        deliver(&server, write_ephemeral, 200).await?;
        expect(&inbox, &["OK"]);

        // Close the subscription; whatever arrived in response is discarded unchecked.
        let close = client_message(id, r#"["CLOSE", "1"]"#.to_owned())?;
        deliver(&server, close, 50).await?;
        inbox.write().clear();

        // A fresh subscription now yields two EVENT messages and then EOSE.
        let req_again = client_message(id, r#"["REQ", "1", {}]"#.to_owned())?;
        deliver(&server, req_again, 50).await?;
        expect(&inbox, &["EVENT", "EVENT", "EOSE"]);

        Ok(())
    }
}