over_there/core/event/
mod.rs1mod tcp;
2mod udp;
3
4use crate::core::Msg;
5
6use log::{error, trace, warn};
7use crate::core::transport::InboundWireError;
8use std::net::SocketAddr;
9use tokio::{sync::mpsc, task};
10
11pub struct EventManager {
12 inbound_handle: task::JoinHandle<()>,
13 outbound_handle: task::JoinHandle<()>,
14 tx: mpsc::Sender<Vec<u8>>,
15}
16
17impl EventManager {
18 pub async fn send(&mut self, data: Vec<u8>) -> Result<(), Vec<u8>> {
19 self.tx.send(data).await.map_err(|x| x.0)
20 }
21
22 pub async fn wait(self) -> Result<(), task::JoinError> {
23 tokio::try_join!(self.inbound_handle, self.outbound_handle).map(|_| ())
24 }
25}
26
27pub struct AddrEventManager {
28 inbound_handle: task::JoinHandle<()>,
29 outbound_handle: task::JoinHandle<()>,
30 tx: mpsc::Sender<(Vec<u8>, SocketAddr)>,
31}
32
33impl AddrEventManager {
34 pub async fn send_to(
35 &mut self,
36 data: Vec<u8>,
37 addr: SocketAddr,
38 ) -> Result<(), (Vec<u8>, SocketAddr)> {
39 self.tx.send((data, addr)).await.map_err(|x| x.0)
40 }
41
42 pub async fn wait(self) -> Result<(), task::JoinError> {
43 tokio::try_join!(self.inbound_handle, self.outbound_handle).map(|_| ())
44 }
45}
46
47async fn process_inbound<T>(
50 result: Result<(Option<Vec<u8>>, SocketAddr), InboundWireError>,
51 sender: mpsc::Sender<T>,
52 mut on_inbound_tx: mpsc::Sender<(Msg, SocketAddr, mpsc::Sender<T>)>,
53) -> bool
54where
55 T: Send + 'static,
56{
57 match result {
58 Ok((None, _)) => true,
59 Ok((Some(data), addr)) => {
60 trace!("Incoming data of size {} from {}", data.len(), addr);
61 match Msg::from_slice(&data) {
62 Ok(msg) => {
63 trace!("Valid msg {:?} from {}", msg, addr);
64
65 if let Err(x) =
66 on_inbound_tx.send((msg, addr, sender)).await
67 {
68 error!("Encountered error: {}", x);
69 }
70
71 true
72 }
73 Err(x) => {
74 warn!(
75 "Discarding data of size {} as not valid msg: {}",
76 data.len(),
77 x
78 );
79 true
80 }
81 }
82 }
83 Err(x) => match x {
84 InboundWireError::IO(x) => {
85 error!("Fatal IO on wire: {}", x);
86 false
87 }
88 InboundWireError::InputProcessor(x) => {
89 error!("Process error on wire: {}", x);
90 true
91 }
92 },
93 }
94}