1use tokio::net::UdpSocket;
3use tokio::runtime::Handle;
4use tokio::sync::Mutex;
5use tokio::time::{sleep, Duration}; use tracing::*;
8use crate::error::Error;
10use postcard::*;
11use std::sync::Arc;
13use chrono::Utc;
15
16use crate::host::GenericStore;
17use crate::prelude::*;
18use std::convert::TryInto;
19
20#[tracing::instrument(skip(db))]
22#[inline]
23pub async fn process_udp(
24    rt_handle: Handle,
25    socket: UdpSocket,
26    mut db: sled::Db,
27    max_buffer_size: usize,
28) {
29    let mut buf = vec![0u8; max_buffer_size];
30    let s = Arc::new(socket);
31
32    loop {
33        let s = s.clone();
35        match s.recv_from(&mut buf).await {
36            Ok((0, _)) => break, Ok((n, return_addr)) => {
38                let bytes = &buf[..n];
39                let msg: GenericMsg = match from_bytes(bytes) {
40                    Ok(msg) => msg,
41                    Err(e) => {
42                        error!("Had received Msg of {} bytes: {:?}, Error: {}", n, bytes, e);
43                        continue;
44                    }
45                };
46
47                match msg.msg_type {
48                    MsgType::Set => {
49                        if let Err(e) = db.insert_generic(msg) {
50                            error!("{}", e);
51                        }
52                    }
53                    MsgType::Get => {
54                        let response = match db.get_generic_nth(&msg.topic, 0) {
55                            Ok(g) => g,
56                            Err(e) => GenericMsg::result(Err(e)),
57                        };
58
59                        if let Ok(return_bytes) = response.as_bytes() {
60                            if let Ok(()) = s.writable().await {
61                                if let Err(e) = s.try_send_to(&return_bytes, return_addr) {
62                                    error!("Error sending data back on UDP/GET: {}", e)
63                                };
64                            };
65                        }
66                    }
67                    MsgType::GetNth(n) => {
68                        let response = match db.get_generic_nth(&msg.topic, n) {
69                            Ok(g) => g,
70                            Err(e) => GenericMsg::result(Err(e)),
71                        };
72
73                        if let Ok(return_bytes) = response.as_bytes() {
74                            if let Ok(()) = s.writable().await {
75                                if let Err(e) = s.try_send_to(&return_bytes, return_addr) {
76                                    error!("Error sending data back on UDP/GET: {}", e)
77                                };
78                            };
79                        }
80                    }
81                    MsgType::Topics => {
82                        let response = match db.topics() {
83                            Ok(mut topics) => {
84                                topics.sort();
85                                let msg = Msg::new(MsgType::Topics, "", topics);
86                                match msg.to_generic() {
87                                    Ok(msg) => msg,
88                                    Err(e) => GenericMsg::result(Err(e)),
89                                }
90                            }
91                            Err(e) => GenericMsg::result(Err(e)),
92                        };
93
94                        if let Ok(return_bytes) = response.as_bytes() {
95                            if let Ok(()) = s.writable().await {
96                                if let Err(e) = s.try_send_to(&return_bytes, return_addr) {
97                                    error!("Error sending data back on UDP/GET: {}", e)
98                                };
99                            };
100                        }
101                    }
102                    MsgType::Subscribe => {
103                        let specialized: Msg<Duration> = msg.clone().try_into().unwrap();
104                        let rate = specialized.data;
105
106                        let db = db.clone();
107                        rt_handle.spawn(async move {
108                            loop {
109                                let response = match db.get_generic_nth(&msg.topic, 0) {
110                                    Ok(g) => g,
111                                    Err(e) => GenericMsg::result(Err(e)),
112                                };
113
114                                if let Ok(return_bytes) = response.as_bytes() {
115                                    if let Ok(()) = s.writable().await {
116                                        if let Err(e) = s.try_send_to(&return_bytes, return_addr) {
117                                            error!("Error sending data back on UDP/GET: {}", e)
118                                        };
119                                    };
120                                }
121
122                                sleep(rate).await;
123                            }
124                        });
125                    }
126                    _ => {}
127                }
128            }
129            Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
130                continue;
132            }
133            Err(e) => {
134                error!("Error: {:?}", e);
136            }
137        }
138    }
139}