ombrac_server/
server.rs

1use std::sync::Arc;
2
3use ombrac::prelude::*;
4use ombrac_transport::{Acceptor, Reliable};
5
6#[cfg(feature = "datagram")]
7use ombrac_transport::Unreliable;
8
9use ombrac_macros::{error, info};
10
11use super::{Error, Result};
12
13pub struct Server<T> {
14    secret: Secret,
15    transport: T,
16}
17
18impl<T> Server<T> {
19    pub fn new(secret: Secret, transport: T) -> Self {
20        Self { secret, transport }
21    }
22}
23
24impl<T: Acceptor> Server<T> {
25    #[inline]
26    async fn handle_reliable(stream: impl Reliable, secret: Secret) -> Result<()> {
27        Self::handle_connect(stream, secret).await
28    }
29
30    #[cfg(feature = "datagram")]
31    #[inline]
32    async fn handle_unreliable(datagram: impl Unreliable, secret: Secret) -> Result<()> {
33        Self::handle_associate(datagram, secret).await
34    }
35
36    #[inline]
37    async fn handle_connect(mut stream: impl Reliable, secret: Secret) -> Result<()> {
38        use tokio::net::TcpStream;
39
40        let request = Connect::from_async_read(&mut stream).await?;
41
42        if request.secret != secret {
43            return Err(Error::PermissionDenied);
44        }
45
46        let addr = request.address.to_socket_addr().await?;
47
48        info!("Connect {}", addr);
49
50        let mut target = TcpStream::connect(addr).await?;
51
52        ombrac::io::util::copy_bidirectional(&mut stream, &mut target).await?;
53
54        Ok(())
55    }
56
57    #[cfg(feature = "datagram")]
58    #[inline]
59    async fn handle_associate(datagram: impl Unreliable, secret: Secret) -> Result<()> {
60        use std::net::SocketAddr;
61
62        use bytes::Bytes;
63        use tokio::net::UdpSocket;
64        use tokio::time::{Duration, timeout};
65
66        const DEFAULT_BUFFER_SIZE: usize = 2 * 1024;
67        const IDLE_TIMEOUT: Duration = Duration::from_secs(30);
68
69        let local = SocketAddr::from(([0, 0, 0, 0, 0, 0, 0, 0], 0));
70        let socket = UdpSocket::bind(local).await?;
71
72        let socket_1 = Arc::new(socket);
73        let socket_2 = Arc::clone(&socket_1);
74        let datagram_1 = Arc::new(datagram);
75        let datagram_2 = Arc::clone(&datagram_1);
76
77        let mut handle_1 = tokio::spawn(async move {
78            loop {
79                let packet_result = timeout(IDLE_TIMEOUT, datagram_1.recv()).await;
80
81                let mut bytes = match packet_result {
82                    Ok(value) => value?,
83                    Err(_) => return Ok(()), // Timeout
84                };
85
86                let packet = Associate::from_bytes(&mut bytes)?;
87                if packet.secret != secret {
88                    return Err(Error::PermissionDenied);
89                };
90
91                let target = packet.address.to_socket_addr().await?;
92                socket_1.send_to(&packet.data, target).await?;
93            }
94        });
95
96        let mut handle_2 = tokio::spawn(async move {
97            let mut buf = [0u8; DEFAULT_BUFFER_SIZE];
98            loop {
99                match timeout(IDLE_TIMEOUT, socket_2.recv_from(&mut buf)).await {
100                    Ok(Ok((n, addr))) => {
101                        let data = Bytes::copy_from_slice(&buf[..n]);
102                        let packet = Associate::with(secret, addr, data);
103
104                        datagram_2.send(packet.to_bytes()?).await?;
105                    }
106                    Ok(Err(e)) => return Err(Error::Io(e)),
107                    Err(_) => break, // Timeout
108                }
109            }
110
111            Ok(())
112        });
113
114        let result = tokio::select! {
115            result = &mut handle_1 => {
116                handle_2.abort();
117                result
118            },
119            result = &mut handle_2 => {
120                handle_1.abort();
121                result
122            },
123        };
124
125        match result {
126            Ok(inner_result) => Ok(inner_result?),
127            Err(e) if e.is_cancelled() => Ok(()),
128            Err(e) => Err(e.into()),
129        }
130    }
131
132    pub async fn listen(self) -> Result<()> {
133        let secret = self.secret;
134
135        let transport = Arc::new(self.transport);
136
137        #[cfg(feature = "datagram")]
138        let mut datagram_handle = {
139            let transport = Arc::clone(&transport);
140            tokio::spawn(async move {
141                loop {
142                    match transport.accept_datagram().await {
143                        Ok(datagram) => tokio::spawn(async move {
144                            if let Err(_error) = Self::handle_unreliable(datagram, secret).await {
145                                error!("{_error}");
146                            }
147                        }),
148
149                        Err(error) => return error,
150                    };
151                }
152            })
153        };
154
155        let mut stream_handle = {
156            let transport = Arc::clone(&transport);
157            tokio::spawn(async move {
158                loop {
159                    match transport.accept_bidirectional().await {
160                        Ok(stream) => tokio::spawn(async move {
161                            if let Err(_error) = Self::handle_reliable(stream, secret).await {
162                                error!("{_error}");
163                            }
164                        }),
165                        Err(error) => return error,
166                    };
167                }
168            })
169        };
170
171        let error = {
172            #[cfg(feature = "datagram")]
173            {
174                tokio::select! {
175                    result = &mut stream_handle => {
176                        datagram_handle.abort();
177                        result?
178                    },
179                    result = &mut datagram_handle => {
180                        stream_handle.abort();
181                        result?
182                    },
183                }
184            }
185
186            #[cfg(not(feature = "datagram"))]
187            {
188                tokio::select! {
189                    result = &mut stream_handle => {
190                        result?
191                    },
192                }
193            }
194        };
195
196        Err(Error::Io(error))
197    }
198}