over_there/core/event/
udp.rs1use super::AddrEventManager;
2use crate::core::Msg;
3
4use log::error;
5use crate::core::transport::{
6 Authenticator, Bicrypter, Decrypter, Encrypter, Signer,
7 UdpSocketInboundWire, UdpSocketOutboundWire, Verifier, Wire,
8};
9use std::net::SocketAddr;
10use tokio::{net::UdpSocket, runtime::Handle, sync::mpsc};
11
12impl AddrEventManager {
14 pub fn for_udp_socket<A, B>(
15 handle: Handle,
16 max_outbound_queue: usize,
17 socket: UdpSocket,
18 wire: Wire<A, B>,
19 on_inbound_tx: mpsc::Sender<(
20 Msg,
21 SocketAddr,
22 mpsc::Sender<(Vec<u8>, SocketAddr)>,
23 )>,
24 ) -> AddrEventManager
25 where
26 A: Authenticator + Send + Sync + 'static,
27 B: Bicrypter + Send + Sync + 'static,
28 {
29 let (reader, writer) = wire.with_udp_socket(socket).arc_split();
30
31 let (tx, rx) =
32 mpsc::channel::<(Vec<u8>, SocketAddr)>(max_outbound_queue);
33 let outbound_handle =
34 handle.spawn(udp_socket_outbound_loop(rx, writer));
35 let inbound_handle = handle.spawn(udp_socket_inbound_loop(
36 tx.clone(),
37 reader,
38 on_inbound_tx,
39 ));
40
41 AddrEventManager {
42 outbound_handle,
43 inbound_handle,
44 tx,
45 }
46 }
47}
48
49impl AddrEventManager {
50 pub fn for_udp_socket_with_cloneable_wire<A, B>(
52 handle: Handle,
53 max_outbound_queue: usize,
54 socket: UdpSocket,
55 wire: Wire<A, B>,
56 on_inbound_tx: mpsc::Sender<(
57 Msg,
58 SocketAddr,
59 mpsc::Sender<(Vec<u8>, SocketAddr)>,
60 )>,
61 ) -> AddrEventManager
62 where
63 A: Authenticator + Send + Sync + Clone + 'static,
64 B: Bicrypter + Send + Sync + Clone + 'static,
65 {
66 let (reader, writer) = wire.with_udp_socket(socket).clone_split();
67
68 let (tx, rx) =
69 mpsc::channel::<(Vec<u8>, SocketAddr)>(max_outbound_queue);
70 let outbound_handle =
71 handle.spawn(udp_socket_outbound_loop(rx, writer));
72 let inbound_handle = handle.spawn(udp_socket_inbound_loop(
73 tx.clone(),
74 reader,
75 on_inbound_tx,
76 ));
77
78 AddrEventManager {
79 outbound_handle,
80 inbound_handle,
81 tx,
82 }
83 }
84}
85
86async fn udp_socket_outbound_loop<S, E>(
87 mut rx: mpsc::Receiver<(Vec<u8>, SocketAddr)>,
88 mut writer: UdpSocketOutboundWire<S, E>,
89) where
90 S: Signer,
91 E: Encrypter,
92{
93 while let Some((msg, addr)) = rx.recv().await {
94 if let Err(x) = writer.write_to(&msg, addr).await {
95 error!("Failed to send: {}", x);
96 break;
97 }
98 }
99}
100
101async fn udp_socket_inbound_loop<V, D>(
102 tx: mpsc::Sender<(Vec<u8>, SocketAddr)>,
103 mut reader: UdpSocketInboundWire<V, D>,
104 on_inbound_tx: mpsc::Sender<(
105 Msg,
106 SocketAddr,
107 mpsc::Sender<(Vec<u8>, SocketAddr)>,
108 )>,
109) where
110 V: Verifier,
111 D: Decrypter,
112{
113 loop {
114 let tx_2 = tx.clone();
115 let result = reader.read().await;
116 if !super::process_inbound(result, tx_2, on_inbound_tx.clone()).await {
117 break;
118 }
119 }
120}