1use net_relay::builder::Parts;
2use net_relay::{Builder, Error};
3use std::net::SocketAddr;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::task::{Context, Poll};
7use std::time::Duration;
8use udp_pool::bytes::BytesMut;
9use udp_pool::net_pool::{Pool, debug, info, instrument_debug_span, tokio_spawn, warn2};
10
11pub struct Relay<F, S, P = udp_pool::Pool>
13where
14 F: Fn(udp_pool::Sender, BytesMut) -> S,
15 S: Future<Output = ()>,
16 P: Pool + udp_pool::UdpPool,
17{
18 parts: Parts<P, F>,
19 pending: Option<Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'static>>>,
20}
21
22impl<F, S, P> Relay<F, S, P>
24where
25 F: Fn(udp_pool::Sender, BytesMut) -> S,
26 S: Future<Output = ()>,
27 P: Pool + udp_pool::UdpPool,
28{
29 pub fn build<B: FnOnce(Builder<P, F>) -> Builder<P, F>>(b: B) -> Result<Self, Error> {
30 let builder = Builder::new();
31 let parts = b(builder).build()?;
32 Ok(Relay {
33 parts,
34 pending: None,
35 })
36 }
37
38 pub fn bind_addrs(&self) -> &Vec<SocketAddr> {
39 &self.parts.bind_addrs
40 }
41
42 pub fn relay_fn(&self) -> Arc<F> {
43 self.parts.relay_fn.as_ref().unwrap().clone()
44 }
45
46 pub fn pool(&self) -> Arc<P> {
47 self.parts.pools[0].clone()
48 }
49
50 pub fn set_max_conn(&self, max: Option<usize>) {
52 self.pool().set_max_conn(max)
53 }
54
55 pub fn set_keepalive(&self, duration: Option<Duration>) {
57 self.pool().set_keepalive(duration)
58 }
59}
60
61impl<F, S, P> net_relay::Relay for Relay<F, S, P>
63where
64 F: Fn(udp_pool::Sender, BytesMut) -> S + Send + Sync + 'static,
65 S: Future<Output = ()> + Send + 'static,
66 P: Pool + udp_pool::UdpPool + Send + 'static,
67{
68 fn poll_run(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
69 if self.pending.is_none() {
70 let addrs = self.bind_addrs().clone();
71 let pool = self.pool();
72 let relay_fn = self.relay_fn();
73
74 self.pending = Some(Box::pin(async move {
75 let udp = tokio::net::UdpSocket::bind(addrs.as_slice())
76 .await
77 .map(|u| Arc::new(u))?;
78
79 info!("[Udp Relay] listen on: {:?}", udp.local_addr().unwrap());
80
81 loop {
82 let mut buf = BytesMut::with_capacity(1500);
83 if let Ok((_, addr)) = udp.recv_buf_from(&mut buf).await {
84 let tuple = (pool.clone(), udp.clone(), relay_fn.clone());
85 tokio_spawn! {
86 instrument_debug_span! {
87 async move {
88 match tuple.0.get(addr, Some(tuple.1)).await {
89 Ok(s) => {
90 debug!("[Udp Relay] recv udp packet: {}", buf.len());
91 tuple.2(s, buf).await;
92 },
93 Err(_e) => {
94 warn2!("[Udp Relay] get udp socket from pool, error occurred: {:?}", _e);
95 }
96 }
97 },
98 "udp_socket",
99 address=addr.to_string()
100 }
101 };
102 }
103 }
104 }));
105 }
106
107 self.pending.as_mut().unwrap().as_mut().poll(cx)
108 }
109}
110
111pub async fn default_relay_fn(sender: udp_pool::Sender, data: BytesMut) {
113 match sender.send(data).await {
114 Err(_e) => {
115 debug!("[Udp Relay] transfer packet, error occurred: {:?}", _e);
116 }
117 _ => {}
118 }
119}