Skip to main content

over_there/core/event/
udp.rs

1use super::AddrEventManager;
2use crate::core::Msg;
3
4use log::error;
5use crate::core::transport::{
6    Authenticator, Bicrypter, Decrypter, Encrypter, Signer,
7    UdpSocketInboundWire, UdpSocketOutboundWire, Verifier, Wire,
8};
9use std::net::SocketAddr;
10use tokio::{net::UdpSocket, runtime::Handle, sync::mpsc};
11
12/// Implementation of AddrEventManager for UDP stream
13impl AddrEventManager {
14    pub fn for_udp_socket<A, B>(
15        handle: Handle,
16        max_outbound_queue: usize,
17        socket: UdpSocket,
18        wire: Wire<A, B>,
19        on_inbound_tx: mpsc::Sender<(
20            Msg,
21            SocketAddr,
22            mpsc::Sender<(Vec<u8>, SocketAddr)>,
23        )>,
24    ) -> AddrEventManager
25    where
26        A: Authenticator + Send + Sync + 'static,
27        B: Bicrypter + Send + Sync + 'static,
28    {
29        let (reader, writer) = wire.with_udp_socket(socket).arc_split();
30
31        let (tx, rx) =
32            mpsc::channel::<(Vec<u8>, SocketAddr)>(max_outbound_queue);
33        let outbound_handle =
34            handle.spawn(udp_socket_outbound_loop(rx, writer));
35        let inbound_handle = handle.spawn(udp_socket_inbound_loop(
36            tx.clone(),
37            reader,
38            on_inbound_tx,
39        ));
40
41        AddrEventManager {
42            outbound_handle,
43            inbound_handle,
44            tx,
45        }
46    }
47}
48
49impl AddrEventManager {
50    // NOTE: This explicit naming only exists as specialization is unstable
51    pub fn for_udp_socket_with_cloneable_wire<A, B>(
52        handle: Handle,
53        max_outbound_queue: usize,
54        socket: UdpSocket,
55        wire: Wire<A, B>,
56        on_inbound_tx: mpsc::Sender<(
57            Msg,
58            SocketAddr,
59            mpsc::Sender<(Vec<u8>, SocketAddr)>,
60        )>,
61    ) -> AddrEventManager
62    where
63        A: Authenticator + Send + Sync + Clone + 'static,
64        B: Bicrypter + Send + Sync + Clone + 'static,
65    {
66        let (reader, writer) = wire.with_udp_socket(socket).clone_split();
67
68        let (tx, rx) =
69            mpsc::channel::<(Vec<u8>, SocketAddr)>(max_outbound_queue);
70        let outbound_handle =
71            handle.spawn(udp_socket_outbound_loop(rx, writer));
72        let inbound_handle = handle.spawn(udp_socket_inbound_loop(
73            tx.clone(),
74            reader,
75            on_inbound_tx,
76        ));
77
78        AddrEventManager {
79            outbound_handle,
80            inbound_handle,
81            tx,
82        }
83    }
84}
85
86async fn udp_socket_outbound_loop<S, E>(
87    mut rx: mpsc::Receiver<(Vec<u8>, SocketAddr)>,
88    mut writer: UdpSocketOutboundWire<S, E>,
89) where
90    S: Signer,
91    E: Encrypter,
92{
93    while let Some((msg, addr)) = rx.recv().await {
94        if let Err(x) = writer.write_to(&msg, addr).await {
95            error!("Failed to send: {}", x);
96            break;
97        }
98    }
99}
100
101async fn udp_socket_inbound_loop<V, D>(
102    tx: mpsc::Sender<(Vec<u8>, SocketAddr)>,
103    mut reader: UdpSocketInboundWire<V, D>,
104    on_inbound_tx: mpsc::Sender<(
105        Msg,
106        SocketAddr,
107        mpsc::Sender<(Vec<u8>, SocketAddr)>,
108    )>,
109) where
110    V: Verifier,
111    D: Decrypter,
112{
113    loop {
114        let tx_2 = tx.clone();
115        let result = reader.read().await;
116        if !super::process_inbound(result, tx_2, on_inbound_tx.clone()).await {
117            break;
118        }
119    }
120}