1use std::sync::Arc;
2
3use ombrac::prelude::*;
4use ombrac_transport::{Acceptor, Reliable};
5
6#[cfg(feature = "datagram")]
7use ombrac_transport::Unreliable;
8
9use ombrac_macros::{error, info};
10
11use super::{Error, Result};
12
13pub struct Server<T> {
14 secret: Secret,
15 transport: T,
16}
17
18impl<T> Server<T> {
19 pub fn new(secret: Secret, transport: T) -> Self {
20 Self { secret, transport }
21 }
22}
23
24impl<T: Acceptor> Server<T> {
25 #[inline]
26 async fn handle_reliable(stream: impl Reliable, secret: Secret) -> Result<()> {
27 Self::handle_connect(stream, secret).await
28 }
29
/// Serves one accepted datagram channel.
///
/// This is the entry point the datagram accept loop calls; all the work is
/// delegated to `handle_associate`, whose result is returned unchanged.
#[cfg(feature = "datagram")]
#[inline]
async fn handle_unreliable(datagram: impl Unreliable, secret: Secret) -> Result<()> {
    Self::handle_associate(datagram, secret).await
}
35
36 #[inline]
37 async fn handle_connect(mut stream: impl Reliable, secret: Secret) -> Result<()> {
38 use tokio::net::TcpStream;
39
40 let request = Connect::from_async_read(&mut stream).await?;
41
42 if request.secret != secret {
43 return Err(Error::PermissionDenied);
44 }
45
46 let addr = request.address.to_socket_addr().await?;
47
48 info!("Connect {}", addr);
49
50 let mut target = TcpStream::connect(addr).await?;
51
52 ombrac::io::util::copy_bidirectional(&mut stream, &mut target).await?;
53
54 Ok(())
55 }
56
/// Relays UDP payloads between the client's `datagram` channel and remote
/// targets.
///
/// One UDP socket is bound to an ephemeral port on the IPv6 unspecified
/// address, then two tasks run concurrently:
///
/// * **uplink** - receives `Associate` packets from `datagram`, rejects any
///   packet whose secret differs from `secret`, and sends the payload to the
///   packet's address;
/// * **downlink** - receives datagrams on the socket and sends each one back
///   over `datagram`, wrapped in an `Associate` carrying the sender's address.
///
/// The first task to finish ends the association and the other one is
/// aborted. A task finishes cleanly once its own direction has been idle for
/// `IDLE_TIMEOUT`.
///
/// # Errors
///
/// Returns `Error::PermissionDenied` for a packet with a wrong secret and
/// `Error::Io` when receiving on the socket fails. Failures while binding,
/// decoding/encoding packets, converting addresses, or sending are propagated
/// with `?`. A join error other than cancellation is converted with `into()`.
#[cfg(feature = "datagram")]
#[inline]
async fn handle_associate(datagram: impl Unreliable, secret: Secret) -> Result<()> {
    use std::net::SocketAddr;

    use bytes::Bytes;
    use tokio::net::UdpSocket;
    use tokio::time::{Duration, timeout};

    // NOTE(review): a datagram larger than this buffer cannot be received in
    // full by `recv_from`; confirm 2 KiB covers the transport's maximum payload.
    const DEFAULT_BUFFER_SIZE: usize = 2 * 1024;
    // NOTE(review): the timeout is tracked per direction, so a flow that is
    // active one way only still ends after 30 s; confirm this is intended.
    const IDLE_TIMEOUT: Duration = Duration::from_secs(30);

    // `[::]:0`: any local address, port chosen by the OS.
    let local = SocketAddr::from(([0, 0, 0, 0, 0, 0, 0, 0], 0));
    let socket = UdpSocket::bind(local).await?;

    // Both directions share the same socket and the same datagram channel.
    let uplink_socket = Arc::new(socket);
    let downlink_socket = Arc::clone(&uplink_socket);
    let uplink_channel = Arc::new(datagram);
    let downlink_channel = Arc::clone(&uplink_channel);

    // Client -> target.
    let mut uplink = tokio::spawn(async move {
        loop {
            let mut bytes = match timeout(IDLE_TIMEOUT, uplink_channel.recv()).await {
                Ok(received) => received?,
                // Idle timeout elapsed: end this direction without an error.
                Err(_) => return Ok(()),
            };

            let packet = Associate::from_bytes(&mut bytes)?;
            // Every packet is authenticated, not just the first one.
            if packet.secret != secret {
                return Err(Error::PermissionDenied);
            }

            let target = packet.address.to_socket_addr().await?;
            uplink_socket.send_to(&packet.data, target).await?;
        }
    });

    // Target -> client.
    let mut downlink = tokio::spawn(async move {
        let mut buf = [0u8; DEFAULT_BUFFER_SIZE];
        loop {
            match timeout(IDLE_TIMEOUT, downlink_socket.recv_from(&mut buf)).await {
                Ok(Ok((n, addr))) => {
                    // The socket is bound to an IPv6 address, so on a
                    // dual-stack host an IPv4 peer is reported as an
                    // IPv4-mapped IPv6 address (`::ffff:a.b.c.d`). Report the
                    // canonical form so the reply address matches the IPv4
                    // address the client originally sent to.
                    let source = SocketAddr::new(addr.ip().to_canonical(), addr.port());
                    let data = Bytes::copy_from_slice(&buf[..n]);
                    let packet = Associate::with(secret, source, data);

                    downlink_channel.send(packet.to_bytes()?).await?;
                }
                Ok(Err(e)) => return Err(Error::Io(e)),
                // Idle timeout elapsed: end this direction without an error.
                Err(_) => break,
            }
        }

        Ok(())
    });

    // Whichever direction finishes first decides the outcome; the other
    // direction is cancelled so it does not outlive the association.
    let result = tokio::select! {
        result = &mut uplink => {
            downlink.abort();
            result
        },
        result = &mut downlink => {
            uplink.abort();
            result
        },
    };

    match result {
        Ok(inner_result) => Ok(inner_result?),
        Err(e) if e.is_cancelled() => Ok(()),
        Err(e) => Err(e.into()),
    }
}
131
132 pub async fn listen(self) -> Result<()> {
133 let secret = self.secret;
134
135 let transport = Arc::new(self.transport);
136
137 #[cfg(feature = "datagram")]
138 let mut datagram_handle = {
139 let transport = Arc::clone(&transport);
140 tokio::spawn(async move {
141 loop {
142 match transport.accept_datagram().await {
143 Ok(datagram) => tokio::spawn(async move {
144 if let Err(_error) = Self::handle_unreliable(datagram, secret).await {
145 error!("{_error}");
146 }
147 }),
148
149 Err(error) => return error,
150 };
151 }
152 })
153 };
154
155 let mut stream_handle = {
156 let transport = Arc::clone(&transport);
157 tokio::spawn(async move {
158 loop {
159 match transport.accept_bidirectional().await {
160 Ok(stream) => tokio::spawn(async move {
161 if let Err(_error) = Self::handle_reliable(stream, secret).await {
162 error!("{_error}");
163 }
164 }),
165 Err(error) => return error,
166 };
167 }
168 })
169 };
170
171 let error = {
172 #[cfg(feature = "datagram")]
173 {
174 tokio::select! {
175 result = &mut stream_handle => {
176 datagram_handle.abort();
177 result?
178 },
179 result = &mut datagram_handle => {
180 stream_handle.abort();
181 result?
182 },
183 }
184 }
185
186 #[cfg(not(feature = "datagram"))]
187 {
188 tokio::select! {
189 result = &mut stream_handle => {
190 result?
191 },
192 }
193 }
194 };
195
196 Err(Error::Io(error))
197 }
198}