1use std::{io, sync::Arc};
2
3use ombrac::prelude::*;
4use ombrac_transport::{Acceptor, Reliable};
5
6#[cfg(feature = "datagram")]
7use ombrac_transport::Unreliable;
8
9use ombrac_macros::{error, info};
10
/// Proxy server that pairs a shared secret with the transport it accepts sessions from.
pub struct Server<T> {
    /// Secret that incoming requests must carry; handlers compare it with the secret
    /// found in each request or packet.
    secret: Secret,
    /// Source of incoming sessions; moved into an `Arc` and shared with the accept loops
    /// when listening starts.
    transport: T,
}
15
16impl<T: Acceptor> Server<T> {
17 pub fn new(secret: Secret, transport: T) -> Self {
18 Self { secret, transport }
19 }
20
21 #[inline]
22 async fn handle_reliable(stream: impl Reliable, secret: Secret) -> io::Result<()> {
23 Self::handle_connect(stream, secret).await
24 }
25
26 #[cfg(feature = "datagram")]
27 #[inline]
28 async fn handle_unreliable(datagram: impl Unreliable, secret: Secret) -> io::Result<()> {
29 Self::handle_associate(datagram, secret).await
30 }
31
32 #[inline]
33 async fn handle_connect(mut stream: impl Reliable, secret: Secret) -> io::Result<()> {
34 use tokio::net::TcpStream;
35
36 let request = Connect::from_async_read(&mut stream).await?;
37
38 if request.secret != secret {
39 return Err(io::Error::new(
40 io::ErrorKind::PermissionDenied,
41 "Secret does not match",
42 ));
43 }
44
45 let addr = request.address.to_socket_addr().await?;
46
47 info!("Connect {}", addr);
48
49 let mut target = TcpStream::connect(addr).await?;
50
51 ombrac::io::util::copy_bidirectional(&mut stream, &mut target).await?;
52
53 Ok(())
54 }
55
56 #[cfg(feature = "datagram")]
57 #[inline]
58 async fn handle_associate(datagram: impl Unreliable, secret: Secret) -> io::Result<()> {
59 use std::net::SocketAddr;
60
61 use bytes::Bytes;
62 use tokio::net::UdpSocket;
63 use tokio::time::{Duration, timeout};
64
65 const DEFAULT_BUFFER_SIZE: usize = 2 * 1024;
66 const IDLE_TIMEOUT: Duration = Duration::from_secs(30);
67
68 let local = SocketAddr::from(([0, 0, 0, 0, 0, 0, 0, 0], 0));
69 let socket = UdpSocket::bind(local).await?;
70
71 let socket_1 = Arc::new(socket);
72 let socket_2 = Arc::clone(&socket_1);
73 let datagram_1 = Arc::new(datagram);
74 let datagram_2 = Arc::clone(&datagram_1);
75
76 let mut handle_1 = tokio::spawn(async move {
77 loop {
78 let packet_result = timeout(IDLE_TIMEOUT, datagram_1.recv()).await;
79
80 let mut bytes = match packet_result {
81 Ok(value) => value?,
82 Err(_) => return Ok(()), };
84
85 let packet = Associate::from_bytes(&mut bytes)?;
86 if packet.secret != secret {
87 return Err(io::Error::new(
88 io::ErrorKind::PermissionDenied,
89 "Secret does not match",
90 ));
91 };
92
93 let target = packet.address.to_socket_addr().await?;
94 socket_1.send_to(&packet.data, target).await?;
95 }
96 });
97
98 let mut handle_2 = tokio::spawn(async move {
99 let mut buf = [0u8; DEFAULT_BUFFER_SIZE];
100 loop {
101 match timeout(IDLE_TIMEOUT, socket_2.recv_from(&mut buf)).await {
102 Ok(Ok((n, addr))) => {
103 let data = Bytes::copy_from_slice(&buf[..n]);
104 let packet = Associate::with(secret, addr, data);
105
106 datagram_2.send(packet.to_bytes()?).await?;
107 }
108 Ok(Err(e)) => return Err(e),
109 Err(_) => break, }
111 }
112
113 Ok(())
114 });
115
116 let result = tokio::select! {
117 result = &mut handle_1 => {
118 handle_2.abort();
119 result
120 },
121 result = &mut handle_2 => {
122 handle_1.abort();
123 result
124 },
125 };
126
127 match result {
128 Ok(inner_result) => inner_result,
129 Err(e) if e.is_cancelled() => Ok(()),
130 Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
131 }
132 }
133
134 pub async fn listen(self) -> io::Result<()> {
135 let secret = self.secret.clone();
136
137 let transport = Arc::new(self.transport);
138
139 #[cfg(feature = "datagram")]
140 let mut datagram_handle = {
141 let transport = Arc::clone(&transport);
142 tokio::spawn(async move {
143 loop {
144 match transport.accept_datagram().await {
145 Ok(datagram) => tokio::spawn(async move {
146 if let Err(_error) = Self::handle_unreliable(datagram, secret).await {
147 error!("{_error}");
148 }
149 }),
150
151 Err(error) => return error,
152 };
153 }
154 })
155 };
156
157 let mut stream_handle = {
158 let transport = Arc::clone(&transport);
159 tokio::spawn(async move {
160 loop {
161 match transport.accept_bidirectional().await {
162 Ok(stream) => tokio::spawn(async move {
163 if let Err(_error) = Self::handle_reliable(stream, secret).await {
164 error!("{_error}");
165 }
166 }),
167 Err(error) => return error,
168 };
169 }
170 })
171 };
172
173 let error = {
174 #[cfg(feature = "datagram")]
175 {
176 tokio::select! {
177 result = &mut stream_handle => {
178 datagram_handle.abort();
179 result?
180 },
181 result = &mut datagram_handle => {
182 stream_handle.abort();
183 result?
184 },
185 }
186 }
187
188 #[cfg(not(feature = "datagram"))]
189 {
190 tokio::select! {
191 result = &mut stream_handle => {
192 result?
193 },
194 }
195 }
196 };
197
198 Err(error)
199 }
200}