udp_relay/
relay.rs

1use net_relay::builder::Parts;
2use net_relay::{Builder, Error};
3use std::net::SocketAddr;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::task::{Context, Poll};
7use std::time::Duration;
8use udp_pool::bytes::BytesMut;
9use udp_pool::net_pool::{Pool, debug, info, instrument_debug_span, tokio_spawn, warn2};
10
11/// udp relay
12pub struct Relay<F, S, P = udp_pool::Pool>
13where
14    F: Fn(udp_pool::Sender, BytesMut) -> S,
15    S: Future<Output = ()>,
16    P: Pool + udp_pool::UdpPool,
17{
18    parts: Parts<P, F>,
19    pending: Option<Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'static>>>,
20}
21
22/// 常规方法
23impl<F, S, P> Relay<F, S, P>
24where
25    F: Fn(udp_pool::Sender, BytesMut) -> S,
26    S: Future<Output = ()>,
27    P: Pool + udp_pool::UdpPool,
28{
29    pub fn build<B: FnOnce(Builder<P, F>) -> Builder<P, F>>(b: B) -> Result<Self, Error> {
30        let builder = Builder::new();
31        let parts = b(builder).build()?;
32        Ok(Relay {
33            parts,
34            pending: None,
35        })
36    }
37
38    pub fn bind_addrs(&self) -> &Vec<SocketAddr> {
39        &self.parts.bind_addrs
40    }
41
42    pub fn relay_fn(&self) -> Arc<F> {
43        self.parts.relay_fn.as_ref().unwrap().clone()
44    }
45
46    pub fn pool(&self) -> Arc<P> {
47        self.parts.pools[0].clone()
48    }
49
50    /// 设置最大连接数
51    pub fn set_max_conn(&self, max: Option<usize>) {
52        self.pool().set_max_conn(max)
53    }
54
55    /// 设置空闲连接保留时长
56    pub fn set_keepalive(&self, duration: Option<Duration>) {
57        self.pool().set_keepalive(duration)
58    }
59}
60
61/// 实现relay接口
62impl<F, S, P> net_relay::Relay for Relay<F, S, P>
63where
64    F: Fn(udp_pool::Sender, BytesMut) -> S + Send + Sync + 'static,
65    S: Future<Output = ()> + Send + 'static,
66    P: Pool + udp_pool::UdpPool + Send + 'static,
67{
68    fn poll_run(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
69        if self.pending.is_none() {
70            let addrs = self.bind_addrs().clone();
71            let pool = self.pool();
72            let relay_fn = self.relay_fn();
73
74            self.pending = Some(Box::pin(async move {
75                let udp = tokio::net::UdpSocket::bind(addrs.as_slice())
76                    .await
77                    .map(|u| Arc::new(u))?;
78
79                info!("[Udp Relay] listen on: {:?}", udp.local_addr().unwrap());
80
81                loop {
82                    let mut buf = BytesMut::with_capacity(1500);
83                    if let Ok((_, addr)) = udp.recv_buf_from(&mut buf).await {
84                        let tuple = (pool.clone(), udp.clone(), relay_fn.clone());
85                        tokio_spawn! {
86                            instrument_debug_span! {
87                                async move {
88                                    match tuple.0.get(addr, Some(tuple.1)).await {
89                                        Ok(s) => {
90                                            debug!("[Udp Relay] recv udp packet: {}", buf.len());
91                                            tuple.2(s, buf).await;
92                                        },
93                                        Err(_e) => {
94                                            warn2!("[Udp Relay] get udp socket from pool, error occurred: {:?}", _e);
95                                        }
96                                    }
97                                },
98                                "udp_socket",
99                                address=addr.to_string()
100                            }
101                        };
102                    }
103                }
104            }));
105        }
106
107        self.pending.as_mut().unwrap().as_mut().poll(cx)
108    }
109}
110
111/// 默认的relay_fn实现
112pub async fn default_relay_fn(sender: udp_pool::Sender, data: BytesMut) {
113    match sender.send(data).await {
114        Err(_e) => {
115            debug!("[Udp Relay] transfer packet, error occurred: {:?}", _e);
116        }
117        _ => {}
118    }
119}