Skip to main content

over_there/core/event/
mod.rs

1mod tcp;
2mod udp;
3
4use crate::core::Msg;
5
6use log::{error, trace, warn};
7use crate::core::transport::InboundWireError;
8use std::net::SocketAddr;
9use tokio::{sync::mpsc, task};
10
11pub struct EventManager {
12    inbound_handle: task::JoinHandle<()>,
13    outbound_handle: task::JoinHandle<()>,
14    tx: mpsc::Sender<Vec<u8>>,
15}
16
17impl EventManager {
18    pub async fn send(&mut self, data: Vec<u8>) -> Result<(), Vec<u8>> {
19        self.tx.send(data).await.map_err(|x| x.0)
20    }
21
22    pub async fn wait(self) -> Result<(), task::JoinError> {
23        tokio::try_join!(self.inbound_handle, self.outbound_handle).map(|_| ())
24    }
25}
26
27pub struct AddrEventManager {
28    inbound_handle: task::JoinHandle<()>,
29    outbound_handle: task::JoinHandle<()>,
30    tx: mpsc::Sender<(Vec<u8>, SocketAddr)>,
31}
32
33impl AddrEventManager {
34    pub async fn send_to(
35        &mut self,
36        data: Vec<u8>,
37        addr: SocketAddr,
38    ) -> Result<(), (Vec<u8>, SocketAddr)> {
39        self.tx.send((data, addr)).await.map_err(|x| x.0)
40    }
41
42    pub async fn wait(self) -> Result<(), task::JoinError> {
43        tokio::try_join!(self.inbound_handle, self.outbound_handle).map(|_| ())
44    }
45}
46
47/// Process result of receiving data, indicating whether should continue
48/// processing additional data
49async fn process_inbound<T>(
50    result: Result<(Option<Vec<u8>>, SocketAddr), InboundWireError>,
51    sender: mpsc::Sender<T>,
52    mut on_inbound_tx: mpsc::Sender<(Msg, SocketAddr, mpsc::Sender<T>)>,
53) -> bool
54where
55    T: Send + 'static,
56{
57    match result {
58        Ok((None, _)) => true,
59        Ok((Some(data), addr)) => {
60            trace!("Incoming data of size {} from {}", data.len(), addr);
61            match Msg::from_slice(&data) {
62                Ok(msg) => {
63                    trace!("Valid msg {:?} from {}", msg, addr);
64
65                    if let Err(x) =
66                        on_inbound_tx.send((msg, addr, sender)).await
67                    {
68                        error!("Encountered error: {}", x);
69                    }
70
71                    true
72                }
73                Err(x) => {
74                    warn!(
75                        "Discarding data of size {} as not valid msg: {}",
76                        data.len(),
77                        x
78                    );
79                    true
80                }
81            }
82        }
83        Err(x) => match x {
84            InboundWireError::IO(x) => {
85                error!("Fatal IO on wire: {}", x);
86                false
87            }
88            InboundWireError::InputProcessor(x) => {
89                error!("Process error on wire: {}", x);
90                true
91            }
92        },
93    }
94}