1use std::io::{self, Read, Write};
8use std::net::{TcpListener, TcpStream};
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::Arc;
11use std::thread;
12
13use rns_core::constants;
14use rns_core::transport::types::{InterfaceId, InterfaceInfo};
15
16use crate::event::{Event, EventSender};
17use crate::hdlc;
18use crate::interface::Writer;
19
20#[derive(Debug, Clone)]
22pub struct TcpServerConfig {
23 pub name: String,
24 pub listen_ip: String,
25 pub listen_port: u16,
26 pub interface_id: InterfaceId,
27}
28
29impl Default for TcpServerConfig {
30 fn default() -> Self {
31 TcpServerConfig {
32 name: String::new(),
33 listen_ip: "0.0.0.0".into(),
34 listen_port: 4242,
35 interface_id: InterfaceId(0),
36 }
37 }
38}
39
40struct TcpServerWriter {
42 stream: TcpStream,
43}
44
45impl Writer for TcpServerWriter {
46 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
47 self.stream.write_all(&hdlc::frame(data))
48 }
49}
50
51pub fn start(config: TcpServerConfig, tx: EventSender, next_id: Arc<AtomicU64>) -> io::Result<()> {
57 let addr = format!("{}:{}", config.listen_ip, config.listen_port);
58 let listener = TcpListener::bind(&addr)?;
59
60 log::info!("[{}] TCP server listening on {}", config.name, addr);
61
62 let name = config.name.clone();
63 thread::Builder::new()
64 .name(format!("tcp-server-{}", config.interface_id.0))
65 .spawn(move || {
66 listener_loop(listener, name, tx, next_id);
67 })?;
68
69 Ok(())
70}
71
72fn listener_loop(listener: TcpListener, name: String, tx: EventSender, next_id: Arc<AtomicU64>) {
74 for stream_result in listener.incoming() {
75 let stream = match stream_result {
76 Ok(s) => s,
77 Err(e) => {
78 log::warn!("[{}] accept failed: {}", name, e);
79 continue;
80 }
81 };
82
83 let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
84 let peer_addr = stream.peer_addr().ok();
85
86 log::info!(
87 "[{}] client connected: {:?} → id {}",
88 name,
89 peer_addr,
90 client_id.0
91 );
92
93 if let Err(e) = stream.set_nodelay(true) {
95 log::warn!("[{}] set_nodelay failed: {}", name, e);
96 }
97
98 let writer_stream = match stream.try_clone() {
100 Ok(s) => s,
101 Err(e) => {
102 log::warn!("[{}] failed to clone stream: {}", name, e);
103 continue;
104 }
105 };
106
107 let writer: Box<dyn Writer> = Box::new(TcpServerWriter {
108 stream: writer_stream,
109 });
110
111 let info = InterfaceInfo {
112 id: client_id,
113 name: format!("TCPServerInterface/Client-{}", client_id.0),
114 mode: constants::MODE_FULL,
115 out_capable: true,
116 in_capable: true,
117 bitrate: None,
118 announce_rate_target: None,
119 announce_rate_grace: 0,
120 announce_rate_penalty: 0.0,
121 announce_cap: constants::ANNOUNCE_CAP,
122 is_local_client: false,
123 wants_tunnel: false,
124 tunnel_id: None,
125 mtu: 65535,
126 ia_freq: 0.0,
127 started: 0.0,
128 ingress_control: true,
129 };
130
131 if tx
133 .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
134 .is_err()
135 {
136 return;
138 }
139
140 let client_tx = tx.clone();
142 let client_name = name.clone();
143 thread::Builder::new()
144 .name(format!("tcp-server-reader-{}", client_id.0))
145 .spawn(move || {
146 client_reader_loop(stream, client_id, client_name, client_tx);
147 })
148 .ok();
149 }
150}
151
152fn client_reader_loop(mut stream: TcpStream, id: InterfaceId, name: String, tx: EventSender) {
154 let mut decoder = hdlc::Decoder::new();
155 let mut buf = [0u8; 4096];
156
157 loop {
158 match stream.read(&mut buf) {
159 Ok(0) => {
160 log::info!("[{}] client {} disconnected", name, id.0);
161 let _ = tx.send(Event::InterfaceDown(id));
162 return;
163 }
164 Ok(n) => {
165 for frame in decoder.feed(&buf[..n]) {
166 if tx
167 .send(Event::Frame {
168 interface_id: id,
169 data: frame,
170 })
171 .is_err()
172 {
173 return;
175 }
176 }
177 }
178 Err(e) => {
179 log::warn!("[{}] client {} read error: {}", name, id.0, e);
180 let _ = tx.send(Event::InterfaceDown(id));
181 return;
182 }
183 }
184 }
185}
186
187use super::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult};
190use std::collections::HashMap;
191
192pub struct TcpServerFactory;
194
195impl InterfaceFactory for TcpServerFactory {
196 fn type_name(&self) -> &str {
197 "TCPServerInterface"
198 }
199
200 fn parse_config(
201 &self,
202 name: &str,
203 id: InterfaceId,
204 params: &HashMap<String, String>,
205 ) -> Result<Box<dyn InterfaceConfigData>, String> {
206 let listen_ip = params
207 .get("listen_ip")
208 .cloned()
209 .unwrap_or_else(|| "0.0.0.0".into());
210 let listen_port = params
211 .get("listen_port")
212 .and_then(|v| v.parse().ok())
213 .unwrap_or(4242);
214
215 Ok(Box::new(TcpServerConfig {
216 name: name.to_string(),
217 listen_ip,
218 listen_port,
219 interface_id: id,
220 }))
221 }
222
223 fn start(
224 &self,
225 config: Box<dyn InterfaceConfigData>,
226 ctx: StartContext,
227 ) -> io::Result<StartResult> {
228 let cfg = *config
229 .into_any()
230 .downcast::<TcpServerConfig>()
231 .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "wrong config type"))?;
232 start(cfg, ctx.tx, ctx.next_dynamic_id)?;
233 Ok(StartResult::Listener)
234 }
235}
236
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::TcpStream;
    use std::sync::mpsc;
    use std::time::Duration;

    /// Asks the OS for an unused loopback port.
    ///
    /// The probe listener is dropped before returning, so another process
    /// could in principle claim the port before the server binds it.
    fn find_free_port() -> u16 {
        TcpListener::bind("127.0.0.1:0")
            .unwrap()
            .local_addr()
            .unwrap()
            .port()
    }

    /// Starts a server on a free loopback port.
    ///
    /// Returns the port and the receiving end of the event channel. Client
    /// ids are handed out starting at `first_client_id`.
    ///
    /// `start` binds the socket before it returns, so callers may connect
    /// immediately; pending connections wait in the listen queue until the
    /// accept thread picks them up.
    fn start_server(interface_id: u64, first_client_id: u64) -> (u16, mpsc::Receiver<Event>) {
        let port = find_free_port();
        let (tx, rx) = mpsc::channel();
        let config = TcpServerConfig {
            name: "test-server".into(),
            listen_ip: "127.0.0.1".into(),
            listen_port: port,
            interface_id: InterfaceId(interface_id),
        };
        start(config, tx, Arc::new(AtomicU64::new(first_client_id))).unwrap();
        (port, rx)
    }

    /// Opens a client connection to the server on `port`.
    fn connect(port: u16) -> TcpStream {
        TcpStream::connect(("127.0.0.1", port)).unwrap()
    }

    #[test]
    fn accept_connection() {
        let (port, rx) = start_server(1, 1000);
        let _client = connect(port);

        match rx.recv_timeout(Duration::from_secs(2)).unwrap() {
            Event::InterfaceUp(id, writer, info) => {
                assert_eq!(id, InterfaceId(1000));
                assert!(writer.is_some());
                assert!(info.is_some());
            }
            other => panic!("expected InterfaceUp, got {:?}", other),
        }
    }

    #[test]
    fn receive_frame_from_client() {
        let (port, rx) = start_server(2, 2000);
        let mut client = connect(port);

        // Discard the InterfaceUp announcement.
        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();

        let payload: Vec<u8> = (0..32).collect();
        client.write_all(&hdlc::frame(&payload)).unwrap();

        match rx.recv_timeout(Duration::from_secs(2)).unwrap() {
            Event::Frame { interface_id, data } => {
                assert_eq!(interface_id, InterfaceId(2000));
                assert_eq!(data, payload);
            }
            other => panic!("expected Frame, got {:?}", other),
        }
    }

    #[test]
    fn send_frame_to_client() {
        let (port, rx) = start_server(3, 3000);
        let mut client = connect(port);
        client
            .set_read_timeout(Some(Duration::from_secs(2)))
            .unwrap();

        let mut writer = match rx.recv_timeout(Duration::from_secs(1)).unwrap() {
            Event::InterfaceUp(_, Some(w), _) => w,
            other => panic!("expected InterfaceUp with writer, got {:?}", other),
        };

        let payload: Vec<u8> = (0..24).collect();
        writer.send_frame(&payload).unwrap();

        // A single `read` may legally return only part of the frame, so
        // read exactly as many bytes as the framed payload occupies.
        let expected = hdlc::frame(&payload);
        let mut received = vec![0u8; expected.len()];
        client.read_exact(&mut received).unwrap();
        assert_eq!(&received[..], &expected[..]);
    }

    #[test]
    fn multiple_clients() {
        let (port, rx) = start_server(4, 4000);
        let _client1 = connect(port);
        let _client2 = connect(port);

        let mut ids = Vec::new();
        for _ in 0..2 {
            match rx.recv_timeout(Duration::from_secs(2)).unwrap() {
                Event::InterfaceUp(id, _, _) => ids.push(id),
                other => panic!("expected InterfaceUp, got {:?}", other),
            }
        }

        assert_eq!(ids.len(), 2);
        assert_ne!(ids[0], ids[1]);
    }

    #[test]
    fn client_disconnect() {
        let (port, rx) = start_server(5, 5000);
        let client = connect(port);

        // Discard the InterfaceUp announcement.
        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();

        drop(client);

        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        assert!(
            matches!(event, Event::InterfaceDown(InterfaceId(5000))),
            "expected InterfaceDown(5000), got {:?}",
            event
        );
    }

    #[test]
    fn server_bind_port() {
        let (port, _rx) = start_server(6, 6000);
        // A successful connect shows the port is actually being listened on.
        let _client = connect(port);
    }
}