gnostr_relay/
server.rs

1use crate::{message::*, setting::SettingWrapper, Reader, Subscriber, Writer};
2use actix::prelude::*;
3use nostr_db::{CheckEventResult, Db};
4use std::{collections::HashMap, sync::Arc};
5use tracing::info;
6
7/// Server
8#[derive(Debug)]
9pub struct Server {
10    id: usize,
11    writer: Addr<Writer>,
12    reader: Addr<Reader>,
13    subscriber: Addr<Subscriber>,
14    sessions: HashMap<usize, Recipient<OutgoingMessage>>,
15}
16
17impl Server {
18    pub fn create_with(db: Arc<Db>, setting: SettingWrapper) -> Addr<Server> {
19        let r = setting.read();
20        let num = if r.thread.reader == 0 {
21            num_cpus::get()
22        } else {
23            r.thread.reader
24        };
25        drop(r);
26
27        Server::create(|ctx| {
28            let writer = Writer::new(Arc::clone(&db), ctx.address().recipient()).start();
29            let subscriber = Subscriber::new(ctx.address().recipient(), setting.clone()).start();
30            let addr = ctx.address().recipient();
31            info!("starting {} reader workers", num);
32            let reader = SyncArbiter::start(num, move || {
33                Reader::new(Arc::clone(&db), addr.clone(), setting.clone())
34            });
35
36            Server {
37                id: 0,
38                writer,
39                reader,
40                subscriber,
41                sessions: HashMap::new(),
42            }
43        })
44    }
45
46    fn send_to_client(&self, id: usize, msg: OutgoingMessage) {
47        if let Some(addr) = self.sessions.get(&id) {
48            addr.do_send(msg);
49        }
50    }
51}
52
53/// Make actor from `Server`
54impl Actor for Server {
55    /// We are going to use simple Context, we just need ability to communicate
56    /// with other actors.
57    type Context = Context<Self>;
58    fn started(&mut self, ctx: &mut Self::Context) {
59        ctx.set_mailbox_capacity(10000);
60        info!("Actor server started");
61    }
62}
63
64/// Handler for Connect message.
65///
66/// Register new session and assign unique id to this session
67impl Handler<Connect> for Server {
68    type Result = usize;
69    fn handle(&mut self, msg: Connect, _ctx: &mut Self::Context) -> Self::Result {
70        if self.id == usize::MAX {
71            self.id = 0;
72        }
73        self.id += 1;
74        self.sessions.insert(self.id, msg.addr);
75        // send id back
76        self.id
77    }
78}
79
80/// Handler for Disconnect message.
81impl Handler<Disconnect> for Server {
82    type Result = ();
83
84    fn handle(&mut self, msg: Disconnect, _: &mut Self::Context) {
85        // remove address
86        self.sessions.remove(&msg.id);
87
88        // clear subscriptions
89        self.subscriber.do_send(Unsubscribe {
90            id: msg.id,
91            sub_id: None,
92        });
93    }
94}
95
96/// Handler for Message message.
97impl Handler<ClientMessage> for Server {
98    type Result = ();
99    fn handle(&mut self, msg: ClientMessage, ctx: &mut Self::Context) {
100        match msg.msg {
101            IncomingMessage::Event(event) => {
102                // save all event
103                // save ephemeral for check duplicate, disconnection recovery, will be deleted
104                self.writer.do_send(WriteEvent { id: msg.id, event })
105            }
106            IncomingMessage::Close(id) => self.subscriber.do_send(Unsubscribe {
107                id: msg.id,
108                sub_id: Some(id),
109            }),
110            IncomingMessage::Req(subscription) => {
111                let session_id = msg.id;
112                let read_event = ReadEvent {
113                    id: msg.id,
114                    subscription: subscription.clone(),
115                };
116                let sub_id = subscription.id.clone();
117                self.subscriber
118                    .send(Subscribe {
119                        id: msg.id,
120                        subscription,
121                    })
122                    .into_actor(self)
123                    .then(move |res, act, _ctx| {
124                        match res {
125                            Ok(res) => match res {
126                                Subscribed::Ok => {
127                                    act.reader.do_send(read_event);
128                                }
129                                Subscribed::Overlimit => {
130                                    act.send_to_client(
131                                        session_id,
132                                        OutgoingMessage::closed(
133                                            &sub_id,
134                                            "Number of subscriptions exceeds limit",
135                                        ),
136                                    );
137                                }
138                                Subscribed::InvalidIdLength => {
139                                    act.send_to_client(
140                                        session_id,
141                                        OutgoingMessage::closed(&sub_id, "Subscription id should be non-empty string of max length 64 chars"),
142                                    );
143                                }
144                            },
145                            Err(_err) => {
146                                act.send_to_client(
147                                    session_id,
148                                    OutgoingMessage::closed(&sub_id, "Something is wrong"),
149                                );
150                            }
151                        }
152                        fut::ready(())
153                    })
154                    .wait(ctx);
155            }
156            _ => {
157                self.send_to_client(msg.id, OutgoingMessage::notice("Unsupported message"));
158            }
159        }
160    }
161}
162
163impl Handler<WriteEventResult> for Server {
164    type Result = ();
165    fn handle(&mut self, msg: WriteEventResult, _: &mut Self::Context) {
166        match msg {
167            WriteEventResult::Write { id, event, result } => {
168                let event_id = event.id_str();
169                let out_msg = match &result {
170                    CheckEventResult::Ok(_num) => OutgoingMessage::ok(&event_id, true, ""),
171                    CheckEventResult::Duplicate => {
172                        OutgoingMessage::ok(&event_id, true, "duplicate: event exists")
173                    }
174                    CheckEventResult::Invald(msg) => {
175                        OutgoingMessage::ok(&event_id, false, &format!("invalid: {}", msg))
176                    }
177                    CheckEventResult::Deleted => {
178                        OutgoingMessage::ok(&event_id, false, "deleted: user requested deletion")
179                    }
180                    CheckEventResult::ReplaceIgnored => {
181                        OutgoingMessage::ok(&event_id, false, "replaced: have newer event")
182                    }
183                };
184                self.send_to_client(id, out_msg);
185                // dispatch event to subscriber
186                if let CheckEventResult::Ok(_num) = result {
187                    self.subscriber.do_send(Dispatch { id, event });
188                }
189            }
190            WriteEventResult::Message { id, event: _, msg } => {
191                self.send_to_client(id, msg);
192            }
193        }
194    }
195}
196
197impl Handler<ReadEventResult> for Server {
198    type Result = ();
199    fn handle(&mut self, msg: ReadEventResult, _: &mut Self::Context) {
200        self.send_to_client(msg.id, msg.msg);
201    }
202}
203
204impl Handler<SubscribeResult> for Server {
205    type Result = ();
206    fn handle(&mut self, msg: SubscribeResult, _: &mut Self::Context) {
207        self.send_to_client(msg.id, msg.msg);
208    }
209}
210
211#[cfg(test)]
212mod tests {
213    use super::*;
214    use crate::{temp_data_path, Setting};
215    use actix_rt::time::sleep;
216    use anyhow::Result;
217    use parking_lot::RwLock;
218    use std::time::Duration;
219
220    #[derive(Default)]
221    struct Receiver(Arc<RwLock<Vec<OutgoingMessage>>>);
222    impl Actor for Receiver {
223        type Context = Context<Self>;
224    }
225
226    impl Handler<OutgoingMessage> for Receiver {
227        type Result = ();
228        fn handle(&mut self, msg: OutgoingMessage, _ctx: &mut Self::Context) {
229            self.0.write().push(msg);
230        }
231    }
232
233    #[actix_rt::test]
234    async fn message() -> Result<()> {
235        let db = Arc::new(Db::open(temp_data_path("server")?)?);
236        let note = r#"
237        {
238            "content": "Good morning everyone 😃",
239            "created_at": 1680690006,
240            "id": "332747c0fab8a1a92def4b0937e177be6df4382ce6dd7724f86dc4710b7d4d7d",
241            "kind": 1,
242            "pubkey": "7abf57d516b1ff7308ca3bd5650ea6a4674d469c7c5057b1d005fb13d218bfef",
243            "sig": "ef4ff4f69ac387239eb1401fb07d7a44a5d5d57127e0dc3466a0403cf7d5486b668608ebfcbe9ff1f8d3b5d710545999fe08ee767284ec0b474e4cf92537678f",
244            "tags": [["t", "nostr"]]
245          }
246        "#;
247        let ephemeral_note = r#"
248        {
249            "content": "Good morning everyone 😃",
250            "created_at": 1680690006,
251            "id": "332747c0fab8a1a92def4b0937e177be6df4382ce6dd7724f86dc4710b7d4d78",
252            "kind": 20000,
253            "pubkey": "7abf57d516b1ff7308ca3bd5650ea6a4674d469c7c5057b1d005fb13d218bfef",
254            "sig": "ef4ff4f69ac387239eb1401fb07d7a44a5d5d57127e0dc3466a0403cf7d5486b668608ebfcbe9ff1f8d3b5d710545999fe08ee767284ec0b474e4cf92537678f",
255            "tags": [["t", "nostr"]]
256          }
257        "#;
258
259        let receiver = Receiver::default();
260        let messages = receiver.0.clone();
261        let receiver = receiver.start();
262        let addr = receiver.recipient();
263
264        let server = Server::create_with(db, Setting::default().into());
265
266        let id = server.send(Connect { addr }).await?;
267        assert_eq!(id, 1);
268
269        // Unsupported
270        {
271            let text = r#"["UNKNOWN"]"#.to_owned();
272            let msg = serde_json::from_str::<IncomingMessage>(&text)?;
273            let client_msg = ClientMessage::new(id, text, msg);
274            server.send(client_msg).await?;
275            sleep(Duration::from_millis(50)).await;
276            {
277                let mut w = messages.write();
278                assert_eq!(w.len(), 1);
279                assert!(w.get(0).unwrap().0.contains("Unsupported"));
280                w.clear();
281            }
282        }
283
284        // Subscribe
285        {
286            let text = r#"["REQ", "1", {}]"#.to_owned();
287            let msg = serde_json::from_str::<IncomingMessage>(&text)?;
288            let client_msg = ClientMessage::new(id, text, msg);
289            server.send(client_msg).await?;
290            sleep(Duration::from_millis(50)).await;
291            {
292                let mut w = messages.write();
293                assert_eq!(w.len(), 1);
294                assert!(w.get(0).unwrap().0.contains("EOSE"));
295                w.clear();
296            }
297
298            // write
299            let text = format!(r#"["EVENT", {}]"#, note);
300            let msg = serde_json::from_str::<IncomingMessage>(&text)?;
301            let client_msg = ClientMessage::new(id, text, msg);
302            server.send(client_msg.clone()).await?;
303            sleep(Duration::from_millis(200)).await;
304            {
305                let mut w = messages.write();
306                assert_eq!(w.len(), 2);
307                assert!(w.get(0).unwrap().0.contains("OK"));
308                // subscription message
309                assert!(w.get(1).unwrap().0.contains("EVENT"));
310                w.clear();
311            }
312            // repeat write
313            server.send(client_msg.clone()).await?;
314            sleep(Duration::from_millis(200)).await;
315            {
316                let mut w = messages.write();
317                assert_eq!(w.len(), 1);
318                assert!(w.get(0).unwrap().0.contains("OK"));
319                // No subscription message because the message is duplicated
320                w.clear();
321            }
322
323            // ephemeral event
324            {
325                let text = format!(r#"["EVENT", {}]"#, ephemeral_note);
326                let msg = serde_json::from_str::<IncomingMessage>(&text)?;
327                let client_msg = ClientMessage::new(id, text, msg);
328                server.send(client_msg.clone()).await?;
329                sleep(Duration::from_millis(200)).await;
330                {
331                    let mut w = messages.write();
332                    assert_eq!(w.len(), 2);
333                    assert!(w.get(0).unwrap().0.contains("OK"));
334                    // subscription message
335                    assert!(w.get(1).unwrap().0.contains("EVENT"));
336                    w.clear();
337                }
338                // repeat
339                server.send(client_msg.clone()).await?;
340                sleep(Duration::from_millis(200)).await;
341                {
342                    let mut w = messages.write();
343                    assert_eq!(w.len(), 1);
344                    assert!(w.get(0).unwrap().0.contains("OK"));
345                    // No subscription message because the message is duplicated
346                    w.clear();
347                }
348            }
349
350            // unsubscribe
351
352            let text = r#"["CLOSE", "1"]"#.to_owned();
353            let msg = serde_json::from_str::<IncomingMessage>(&text)?;
354            let client_msg = ClientMessage::new(id, text, msg);
355            server.send(client_msg).await?;
356            sleep(Duration::from_millis(50)).await;
357            {
358                let mut w = messages.write();
359                // assert_eq!(w.len(), 1);
360                // assert!(w.get(0).unwrap().0.contains("EOSE"));
361                w.clear();
362            }
363        }
364
365        // get
366        {
367            let text = r#"["REQ", "1", {}]"#.to_owned();
368            let msg = serde_json::from_str::<IncomingMessage>(&text)?;
369            let client_msg = ClientMessage::new(id, text, msg);
370            server.send(client_msg).await?;
371            sleep(Duration::from_millis(50)).await;
372            {
373                let mut w = messages.write();
374                assert_eq!(w.len(), 3);
375                assert!(w.get(0).unwrap().0.contains("EVENT"));
376                assert!(w.get(1).unwrap().0.contains("EVENT"));
377                assert!(w.get(2).unwrap().0.contains("EOSE"));
378                w.clear();
379            }
380        }
381
382        Ok(())
383    }
384}