1use std::io::{self, Read, Write};
8use std::net::{TcpListener, TcpStream};
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::Arc;
11use std::thread;
12
13use rns_core::constants;
14use rns_core::transport::types::{InterfaceId, InterfaceInfo};
15
16use crate::event::{Event, EventSender};
17use crate::hdlc;
18use crate::interface::Writer;
19
20#[derive(Debug, Clone)]
22pub struct TcpServerConfig {
23 pub name: String,
24 pub listen_ip: String,
25 pub listen_port: u16,
26 pub interface_id: InterfaceId,
27}
28
29impl Default for TcpServerConfig {
30 fn default() -> Self {
31 TcpServerConfig {
32 name: String::new(),
33 listen_ip: "0.0.0.0".into(),
34 listen_port: 4242,
35 interface_id: InterfaceId(0),
36 }
37 }
38}
39
40struct TcpServerWriter {
42 stream: TcpStream,
43}
44
45impl Writer for TcpServerWriter {
46 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
47 self.stream.write_all(&hdlc::frame(data))
48 }
49}
50
51pub fn start(
57 config: TcpServerConfig,
58 tx: EventSender,
59 next_id: Arc<AtomicU64>,
60) -> io::Result<()> {
61 let addr = format!("{}:{}", config.listen_ip, config.listen_port);
62 let listener = TcpListener::bind(&addr)?;
63
64 log::info!("[{}] TCP server listening on {}", config.name, addr);
65
66 let name = config.name.clone();
67 thread::Builder::new()
68 .name(format!("tcp-server-{}", config.interface_id.0))
69 .spawn(move || {
70 listener_loop(listener, name, tx, next_id);
71 })?;
72
73 Ok(())
74}
75
76fn listener_loop(
78 listener: TcpListener,
79 name: String,
80 tx: EventSender,
81 next_id: Arc<AtomicU64>,
82) {
83 for stream_result in listener.incoming() {
84 let stream = match stream_result {
85 Ok(s) => s,
86 Err(e) => {
87 log::warn!("[{}] accept failed: {}", name, e);
88 continue;
89 }
90 };
91
92 let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
93 let peer_addr = stream.peer_addr().ok();
94
95 log::info!(
96 "[{}] client connected: {:?} → id {}",
97 name,
98 peer_addr,
99 client_id.0
100 );
101
102 if let Err(e) = stream.set_nodelay(true) {
104 log::warn!("[{}] set_nodelay failed: {}", name, e);
105 }
106
107 let writer_stream = match stream.try_clone() {
109 Ok(s) => s,
110 Err(e) => {
111 log::warn!("[{}] failed to clone stream: {}", name, e);
112 continue;
113 }
114 };
115
116 let writer: Box<dyn Writer> = Box::new(TcpServerWriter { stream: writer_stream });
117
118 let info = InterfaceInfo {
119 id: client_id,
120 name: format!("TCPServerInterface/Client-{}", client_id.0),
121 mode: constants::MODE_FULL,
122 out_capable: true,
123 in_capable: true,
124 bitrate: None,
125 announce_rate_target: None,
126 announce_rate_grace: 0,
127 announce_rate_penalty: 0.0,
128 announce_cap: constants::ANNOUNCE_CAP,
129 is_local_client: false,
130 wants_tunnel: false,
131 tunnel_id: None,
132 mtu: 65535,
133 ia_freq: 0.0,
134 started: 0.0,
135 ingress_control: true,
136 };
137
138 if tx
140 .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
141 .is_err()
142 {
143 return;
145 }
146
147 let client_tx = tx.clone();
149 let client_name = name.clone();
150 thread::Builder::new()
151 .name(format!("tcp-server-reader-{}", client_id.0))
152 .spawn(move || {
153 client_reader_loop(stream, client_id, client_name, client_tx);
154 })
155 .ok();
156 }
157}
158
159fn client_reader_loop(
161 mut stream: TcpStream,
162 id: InterfaceId,
163 name: String,
164 tx: EventSender,
165) {
166 let mut decoder = hdlc::Decoder::new();
167 let mut buf = [0u8; 4096];
168
169 loop {
170 match stream.read(&mut buf) {
171 Ok(0) => {
172 log::info!("[{}] client {} disconnected", name, id.0);
173 let _ = tx.send(Event::InterfaceDown(id));
174 return;
175 }
176 Ok(n) => {
177 for frame in decoder.feed(&buf[..n]) {
178 if tx
179 .send(Event::Frame {
180 interface_id: id,
181 data: frame,
182 })
183 .is_err()
184 {
185 return;
187 }
188 }
189 }
190 Err(e) => {
191 log::warn!("[{}] client {} read error: {}", name, id.0, e);
192 let _ = tx.send(Event::InterfaceDown(id));
193 return;
194 }
195 }
196 }
197}
198
199use std::collections::HashMap;
202use super::{InterfaceFactory, InterfaceConfigData, StartContext, StartResult};
203
204pub struct TcpServerFactory;
206
207impl InterfaceFactory for TcpServerFactory {
208 fn type_name(&self) -> &str { "TCPServerInterface" }
209
210 fn parse_config(
211 &self,
212 name: &str,
213 id: InterfaceId,
214 params: &HashMap<String, String>,
215 ) -> Result<Box<dyn InterfaceConfigData>, String> {
216 let listen_ip = params.get("listen_ip")
217 .cloned()
218 .unwrap_or_else(|| "0.0.0.0".into());
219 let listen_port = params.get("listen_port")
220 .and_then(|v| v.parse().ok())
221 .unwrap_or(4242);
222
223 Ok(Box::new(TcpServerConfig {
224 name: name.to_string(),
225 listen_ip,
226 listen_port,
227 interface_id: id,
228 }))
229 }
230
231 fn start(
232 &self,
233 config: Box<dyn InterfaceConfigData>,
234 ctx: StartContext,
235 ) -> io::Result<StartResult> {
236 let cfg = *config.into_any().downcast::<TcpServerConfig>()
237 .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "wrong config type"))?;
238 start(cfg, ctx.tx, ctx.next_dynamic_id)?;
239 Ok(StartResult::Listener)
240 }
241}
242
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::TcpStream;
    use std::sync::mpsc;
    use std::time::Duration;

    /// Asks the OS for a currently unused loopback port.
    ///
    /// The probing socket is closed again before the port is returned, so
    /// another process could in principle claim it before the server binds.
    fn find_free_port() -> u16 {
        TcpListener::bind("127.0.0.1:0")
            .unwrap()
            .local_addr()
            .unwrap()
            .port()
    }

    /// Starts a server on a free loopback port.
    ///
    /// `first_client_id` seeds the counter that client interface ids are
    /// drawn from; `interface_id` is the id of the server interface itself.
    /// Returns the port and the receiving end of the event channel.
    ///
    /// The listening socket is bound before `start` returns, so callers may
    /// connect immediately without sleeping.
    fn start_test_server(first_client_id: u64, interface_id: u64) -> (u16, mpsc::Receiver<Event>) {
        let port = find_free_port();
        let (tx, rx) = mpsc::channel();
        let config = TcpServerConfig {
            name: "test-server".into(),
            listen_ip: "127.0.0.1".into(),
            listen_port: port,
            interface_id: InterfaceId(interface_id),
        };

        start(config, tx, Arc::new(AtomicU64::new(first_client_id))).unwrap();
        (port, rx)
    }

    /// Opens a client connection to the test server on `port`.
    fn connect(port: u16) -> TcpStream {
        TcpStream::connect(("127.0.0.1", port)).unwrap()
    }

    #[test]
    fn accept_connection() {
        let (port, rx) = start_test_server(1000, 1);
        let _client = connect(port);

        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        match event {
            Event::InterfaceUp(id, writer, info) => {
                assert_eq!(id, InterfaceId(1000));
                assert!(writer.is_some());
                assert!(info.is_some());
            }
            other => panic!("expected InterfaceUp, got {:?}", other),
        }
    }

    #[test]
    fn receive_frame_from_client() {
        let (port, rx) = start_test_server(2000, 2);
        let mut client = connect(port);

        // Discard the InterfaceUp announcement.
        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();

        let payload: Vec<u8> = (0..32).collect();
        let framed = hdlc::frame(&payload);
        client.write_all(&framed).unwrap();

        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        match event {
            Event::Frame { interface_id, data } => {
                assert_eq!(interface_id, InterfaceId(2000));
                assert_eq!(data, payload);
            }
            other => panic!("expected Frame, got {:?}", other),
        }
    }

    #[test]
    fn send_frame_to_client() {
        let (port, rx) = start_test_server(3000, 3);
        let mut client = connect(port);
        client
            .set_read_timeout(Some(Duration::from_secs(2)))
            .unwrap();

        let event = rx.recv_timeout(Duration::from_secs(1)).unwrap();
        let mut writer = match event {
            Event::InterfaceUp(_, Some(w), _) => w,
            other => panic!("expected InterfaceUp with writer, got {:?}", other),
        };

        let payload: Vec<u8> = (0..24).collect();
        writer.send_frame(&payload).unwrap();

        // A single `read` may return only part of the frame, so read exactly
        // as many bytes as the frame is long.
        let expected = hdlc::frame(&payload);
        let mut received = vec![0u8; expected.len()];
        client.read_exact(&mut received).unwrap();
        assert_eq!(&received[..], &expected[..]);
    }

    #[test]
    fn multiple_clients() {
        let (port, rx) = start_test_server(4000, 4);
        let _client1 = connect(port);
        let _client2 = connect(port);

        let mut ids = Vec::new();
        for _ in 0..2 {
            let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
            match event {
                Event::InterfaceUp(id, _, _) => ids.push(id),
                other => panic!("expected InterfaceUp, got {:?}", other),
            }
        }

        assert_eq!(ids.len(), 2);
        assert_ne!(ids[0], ids[1]);
    }

    #[test]
    fn client_disconnect() {
        let (port, rx) = start_test_server(5000, 5);
        let client = connect(port);

        // Discard the InterfaceUp announcement.
        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();

        drop(client);

        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        assert!(
            matches!(event, Event::InterfaceDown(InterfaceId(5000))),
            "expected InterfaceDown(5000), got {:?}",
            event
        );
    }

    #[test]
    fn server_bind_port() {
        // Passes if binding and starting the listener succeeds; the receiver
        // is kept alive until the end of the test.
        let (_port, _rx) = start_test_server(6000, 6);
    }
}