1use std::io::{self, Read, Write};
8use std::net::{TcpListener, TcpStream};
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::Arc;
11use std::thread;
12
13use rns_core::constants;
14use rns_core::transport::types::{InterfaceId, InterfaceInfo};
15
16use crate::event::{Event, EventSender};
17use crate::hdlc;
18use crate::interface::Writer;
19
20#[derive(Debug, Clone)]
22pub struct TcpServerConfig {
23 pub name: String,
24 pub listen_ip: String,
25 pub listen_port: u16,
26 pub interface_id: InterfaceId,
27}
28
29impl Default for TcpServerConfig {
30 fn default() -> Self {
31 TcpServerConfig {
32 name: String::new(),
33 listen_ip: "0.0.0.0".into(),
34 listen_port: 4242,
35 interface_id: InterfaceId(0),
36 }
37 }
38}
39
/// Write half of a client connection, handed out as a boxed [`Writer`].
struct TcpServerWriter {
    /// Socket handle the outgoing frames are written to (the listener loop
    /// creates it with `try_clone` from the accepted stream).
    stream: TcpStream,
}
44
45impl Writer for TcpServerWriter {
46 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
47 self.stream.write_all(&hdlc::frame(data))
48 }
49}
50
51pub fn start(
57 config: TcpServerConfig,
58 tx: EventSender,
59 next_id: Arc<AtomicU64>,
60) -> io::Result<()> {
61 let addr = format!("{}:{}", config.listen_ip, config.listen_port);
62 let listener = TcpListener::bind(&addr)?;
63
64 log::info!("[{}] TCP server listening on {}", config.name, addr);
65
66 let name = config.name.clone();
67 thread::Builder::new()
68 .name(format!("tcp-server-{}", config.interface_id.0))
69 .spawn(move || {
70 listener_loop(listener, name, tx, next_id);
71 })?;
72
73 Ok(())
74}
75
76fn listener_loop(
78 listener: TcpListener,
79 name: String,
80 tx: EventSender,
81 next_id: Arc<AtomicU64>,
82) {
83 for stream_result in listener.incoming() {
84 let stream = match stream_result {
85 Ok(s) => s,
86 Err(e) => {
87 log::warn!("[{}] accept failed: {}", name, e);
88 continue;
89 }
90 };
91
92 let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
93 let peer_addr = stream.peer_addr().ok();
94
95 log::info!(
96 "[{}] client connected: {:?} → id {}",
97 name,
98 peer_addr,
99 client_id.0
100 );
101
102 if let Err(e) = stream.set_nodelay(true) {
104 log::warn!("[{}] set_nodelay failed: {}", name, e);
105 }
106
107 let writer_stream = match stream.try_clone() {
109 Ok(s) => s,
110 Err(e) => {
111 log::warn!("[{}] failed to clone stream: {}", name, e);
112 continue;
113 }
114 };
115
116 let writer: Box<dyn Writer> = Box::new(TcpServerWriter { stream: writer_stream });
117
118 let info = InterfaceInfo {
119 id: client_id,
120 name: format!("TCPServerInterface/Client-{}", client_id.0),
121 mode: constants::MODE_FULL,
122 out_capable: true,
123 in_capable: true,
124 bitrate: None,
125 announce_rate_target: None,
126 announce_rate_grace: 0,
127 announce_rate_penalty: 0.0,
128 announce_cap: constants::ANNOUNCE_CAP,
129 is_local_client: false,
130 wants_tunnel: false,
131 tunnel_id: None,
132 };
133
134 if tx
136 .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
137 .is_err()
138 {
139 return;
141 }
142
143 let client_tx = tx.clone();
145 let client_name = name.clone();
146 thread::Builder::new()
147 .name(format!("tcp-server-reader-{}", client_id.0))
148 .spawn(move || {
149 client_reader_loop(stream, client_id, client_name, client_tx);
150 })
151 .ok();
152 }
153}
154
155fn client_reader_loop(
157 mut stream: TcpStream,
158 id: InterfaceId,
159 name: String,
160 tx: EventSender,
161) {
162 let mut decoder = hdlc::Decoder::new();
163 let mut buf = [0u8; 4096];
164
165 loop {
166 match stream.read(&mut buf) {
167 Ok(0) => {
168 log::info!("[{}] client {} disconnected", name, id.0);
169 let _ = tx.send(Event::InterfaceDown(id));
170 return;
171 }
172 Ok(n) => {
173 for frame in decoder.feed(&buf[..n]) {
174 if tx
175 .send(Event::Frame {
176 interface_id: id,
177 data: frame,
178 })
179 .is_err()
180 {
181 return;
183 }
184 }
185 }
186 Err(e) => {
187 log::warn!("[{}] client {} read error: {}", name, id.0, e);
188 let _ = tx.send(Event::InterfaceDown(id));
189 return;
190 }
191 }
192 }
193}
194
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::TcpStream;
    use std::sync::mpsc;
    use std::time::Duration;

    /// Asks the OS for an unused loopback port.
    ///
    /// The probe listener is dropped before the port is returned, so another
    /// process could in principle grab the port before the server binds it.
    fn find_free_port() -> u16 {
        TcpListener::bind("127.0.0.1:0")
            .unwrap()
            .local_addr()
            .unwrap()
            .port()
    }

    /// Starts a server on a free loopback port and returns the port together
    /// with the receiving end of its event channel.
    ///
    /// `start` binds the listening socket before it returns, so clients may
    /// connect immediately afterwards without sleeping: the connection waits
    /// in the accept queue until the listener thread picks it up.
    fn start_test_server(first_client_id: u64, interface_id: u64) -> (u16, mpsc::Receiver<Event>) {
        let port = find_free_port();
        let (tx, rx) = mpsc::channel();
        let next_id = Arc::new(AtomicU64::new(first_client_id));

        let config = TcpServerConfig {
            name: "test-server".into(),
            listen_ip: "127.0.0.1".into(),
            listen_port: port,
            interface_id: InterfaceId(interface_id),
        };

        start(config, tx, next_id).unwrap();
        (port, rx)
    }

    /// Opens a client connection to the test server on `port`.
    fn connect(port: u16) -> TcpStream {
        TcpStream::connect(("127.0.0.1", port)).unwrap()
    }

    #[test]
    fn accept_connection() {
        let (port, rx) = start_test_server(1000, 1);

        let _client = connect(port);

        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        match event {
            Event::InterfaceUp(id, writer, info) => {
                assert_eq!(id, InterfaceId(1000));
                assert!(writer.is_some());
                assert!(info.is_some());
            }
            other => panic!("expected InterfaceUp, got {:?}", other),
        }
    }

    #[test]
    fn receive_frame_from_client() {
        let (port, rx) = start_test_server(2000, 2);

        let mut client = connect(port);

        // Discard the InterfaceUp event announcing this client.
        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();

        let payload: Vec<u8> = (0..32).collect();
        let framed = hdlc::frame(&payload);
        client.write_all(&framed).unwrap();

        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        match event {
            Event::Frame { interface_id, data } => {
                assert_eq!(interface_id, InterfaceId(2000));
                assert_eq!(data, payload);
            }
            other => panic!("expected Frame, got {:?}", other),
        }
    }

    #[test]
    fn send_frame_to_client() {
        let (port, rx) = start_test_server(3000, 3);

        let mut client = connect(port);
        client.set_read_timeout(Some(Duration::from_secs(2))).unwrap();

        let event = rx.recv_timeout(Duration::from_secs(1)).unwrap();
        let mut writer = match event {
            Event::InterfaceUp(_, Some(w), _) => w,
            other => panic!("expected InterfaceUp with writer, got {:?}", other),
        };

        let payload: Vec<u8> = (0..24).collect();
        writer.send_frame(&payload).unwrap();

        // TCP is a byte stream: a single `read` may return only part of the
        // frame, so read exactly as many bytes as the frame is long.
        let expected = hdlc::frame(&payload);
        let mut received = vec![0u8; expected.len()];
        client.read_exact(&mut received).unwrap();
        assert_eq!(&received[..], &expected[..]);
    }

    #[test]
    fn multiple_clients() {
        let (port, rx) = start_test_server(4000, 4);

        let _client1 = connect(port);
        let _client2 = connect(port);

        let mut ids = Vec::new();
        for _ in 0..2 {
            let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
            match event {
                Event::InterfaceUp(id, _, _) => ids.push(id),
                other => panic!("expected InterfaceUp, got {:?}", other),
            }
        }

        assert_eq!(ids.len(), 2);
        assert_ne!(ids[0], ids[1]);
    }

    #[test]
    fn client_disconnect() {
        let (port, rx) = start_test_server(5000, 5);

        let client = connect(port);

        // Discard the InterfaceUp event announcing this client.
        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();

        drop(client);

        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        assert!(
            matches!(event, Event::InterfaceDown(InterfaceId(5000))),
            "expected InterfaceDown(5000), got {:?}",
            event
        );
    }

    #[test]
    fn server_bind_port() {
        // Succeeds if `start` binds and spawns without error (the helper
        // unwraps its result). The receiver is kept alive until the end of
        // the test.
        let (_port, _rx) = start_test_server(6000, 6);
    }
}