ombrac_server/
server.rs

1use std::{io, sync::Arc};
2
3use ombrac::prelude::*;
4use ombrac_transport::{Acceptor, Reliable};
5
6#[cfg(feature = "datagram")]
7use ombrac_transport::Unreliable;
8
9use ombrac_macros::{error, info};
10
11pub struct Server<T> {
12    secret: Secret,
13    transport: T,
14}
15
16impl<T: Acceptor> Server<T> {
17    pub fn new(secret: Secret, transport: T) -> Self {
18        Self { secret, transport }
19    }
20
21    #[inline]
22    async fn handle_reliable(stream: impl Reliable, secret: Secret) -> io::Result<()> {
23        Self::handle_connect(stream, secret).await
24    }
25
26    #[cfg(feature = "datagram")]
27    #[inline]
28    async fn handle_unreliable(datagram: impl Unreliable, secret: Secret) -> io::Result<()> {
29        Self::handle_associate(datagram, secret).await
30    }
31
32    #[inline]
33    async fn handle_connect(mut stream: impl Reliable, secret: Secret) -> io::Result<()> {
34        use tokio::net::TcpStream;
35
36        let request = Connect::from_async_read(&mut stream).await?;
37
38        if request.secret != secret {
39            return Err(io::Error::new(
40                io::ErrorKind::PermissionDenied,
41                "Secret does not match",
42            ));
43        }
44
45        let addr = request.address.to_socket_addr().await?;
46
47        info!("Connect {}", addr);
48
49        let mut target = TcpStream::connect(addr).await?;
50
51        ombrac::io::util::copy_bidirectional(&mut stream, &mut target).await?;
52
53        Ok(())
54    }
55
56    #[cfg(feature = "datagram")]
57    #[inline]
58    async fn handle_associate(datagram: impl Unreliable, secret: Secret) -> io::Result<()> {
59        use std::net::SocketAddr;
60
61        use bytes::Bytes;
62        use tokio::net::UdpSocket;
63        use tokio::time::{Duration, timeout};
64
65        const DEFAULT_BUFFER_SIZE: usize = 2 * 1024;
66        const IDLE_TIMEOUT: Duration = Duration::from_secs(30);
67
68        let local = SocketAddr::from(([0, 0, 0, 0, 0, 0, 0, 0], 0));
69        let socket = UdpSocket::bind(local).await?;
70
71        let socket_1 = Arc::new(socket);
72        let socket_2 = Arc::clone(&socket_1);
73        let datagram_1 = Arc::new(datagram);
74        let datagram_2 = Arc::clone(&datagram_1);
75
76        let mut handle_1 = tokio::spawn(async move {
77            loop {
78                let packet_result = timeout(IDLE_TIMEOUT, datagram_1.recv()).await;
79
80                let mut bytes = match packet_result {
81                    Ok(value) => value?,
82                    Err(_) => return Ok(()), // Timeout
83                };
84
85                let packet = Associate::from_bytes(&mut bytes)?;
86                if packet.secret != secret {
87                    return Err(io::Error::new(
88                        io::ErrorKind::PermissionDenied,
89                        "Secret does not match",
90                    ));
91                };
92
93                let target = packet.address.to_socket_addr().await?;
94                socket_1.send_to(&packet.data, target).await?;
95            }
96        });
97
98        let mut handle_2 = tokio::spawn(async move {
99            let mut buf = [0u8; DEFAULT_BUFFER_SIZE];
100            loop {
101                match timeout(IDLE_TIMEOUT, socket_2.recv_from(&mut buf)).await {
102                    Ok(Ok((n, addr))) => {
103                        let data = Bytes::copy_from_slice(&buf[..n]);
104                        let packet = Associate::with(secret, addr, data);
105
106                        datagram_2.send(packet.to_bytes()?).await?;
107                    }
108                    Ok(Err(e)) => return Err(e),
109                    Err(_) => break, // Timeout
110                }
111            }
112
113            Ok(())
114        });
115
116        let result = tokio::select! {
117            result = &mut handle_1 => {
118                handle_2.abort();
119                result
120            },
121            result = &mut handle_2 => {
122                handle_1.abort();
123                result
124            },
125        };
126
127        match result {
128            Ok(inner_result) => inner_result,
129            Err(e) if e.is_cancelled() => Ok(()),
130            Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
131        }
132    }
133
134    pub async fn listen(self) -> io::Result<()> {
135        let secret = self.secret;
136
137        let transport = Arc::new(self.transport);
138
139        #[cfg(feature = "datagram")]
140        let mut datagram_handle = {
141            let transport = Arc::clone(&transport);
142            tokio::spawn(async move {
143                loop {
144                    match transport.accept_datagram().await {
145                        Ok(datagram) => tokio::spawn(async move {
146                            if let Err(_error) = Self::handle_unreliable(datagram, secret).await {
147                                error!("{_error}");
148                            }
149                        }),
150
151                        Err(error) => return error,
152                    };
153                }
154            })
155        };
156
157        let mut stream_handle = {
158            let transport = Arc::clone(&transport);
159            tokio::spawn(async move {
160                loop {
161                    match transport.accept_bidirectional().await {
162                        Ok(stream) => tokio::spawn(async move {
163                            if let Err(_error) = Self::handle_reliable(stream, secret).await {
164                                error!("{_error}");
165                            }
166                        }),
167                        Err(error) => return error,
168                    };
169                }
170            })
171        };
172
173        let error = {
174            #[cfg(feature = "datagram")]
175            {
176                tokio::select! {
177                    result = &mut stream_handle => {
178                        datagram_handle.abort();
179                        result?
180                    },
181                    result = &mut datagram_handle => {
182                        stream_handle.abort();
183                        result?
184                    },
185                }
186            }
187
188            #[cfg(not(feature = "datagram"))]
189            {
190                tokio::select! {
191                    result = &mut stream_handle => {
192                        result?
193                    },
194                }
195            }
196        };
197
198        Err(error)
199    }
200}