1use std::io::{self, Read, Write};
12use std::net::{TcpListener, TcpStream};
13use std::sync::atomic::{AtomicU64, Ordering};
14use std::sync::Arc;
15use std::thread;
16use std::time::Duration;
17
18use rns_core::constants;
19use rns_core::transport::types::{InterfaceId, InterfaceInfo};
20
21use crate::event::{Event, EventSender};
22use crate::hdlc;
23use crate::interface::Writer;
24
25#[derive(Debug, Clone)]
27pub struct LocalServerConfig {
28 pub instance_name: String,
29 pub port: u16,
30 pub interface_id: InterfaceId,
31}
32
33impl Default for LocalServerConfig {
34 fn default() -> Self {
35 LocalServerConfig {
36 instance_name: "default".into(),
37 port: 37428,
38 interface_id: InterfaceId(0),
39 }
40 }
41}
42
43#[derive(Debug, Clone)]
45pub struct LocalClientConfig {
46 pub name: String,
47 pub instance_name: String,
48 pub port: u16,
49 pub interface_id: InterfaceId,
50 pub reconnect_wait: Duration,
51}
52
53impl Default for LocalClientConfig {
54 fn default() -> Self {
55 LocalClientConfig {
56 name: "Local shared instance".into(),
57 instance_name: "default".into(),
58 port: 37428,
59 interface_id: InterfaceId(0),
60 reconnect_wait: Duration::from_secs(8),
61 }
62 }
63}
64
65struct LocalWriter {
67 stream: TcpStream,
68}
69
70impl Writer for LocalWriter {
71 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
72 self.stream.write_all(&hdlc::frame(data))
73 }
74}
75
76#[cfg(target_os = "linux")]
77mod unix_socket {
78 use std::io;
79 use std::os::unix::net::{UnixListener, UnixStream};
80
81 pub fn try_bind_unix(instance_name: &str) -> io::Result<UnixListener> {
83 let path = format!("\0rns/{}", instance_name);
84 UnixListener::bind(path)
85 }
86
87 pub fn try_connect_unix(instance_name: &str) -> io::Result<UnixStream> {
89 let path = format!("\0rns/{}", instance_name);
90 UnixStream::connect(path)
91 }
92}
93
94pub fn start_server(
100 config: LocalServerConfig,
101 tx: EventSender,
102 next_id: Arc<AtomicU64>,
103) -> io::Result<()> {
104 #[cfg(target_os = "linux")]
106 {
107 match unix_socket::try_bind_unix(&config.instance_name) {
108 Ok(listener) => {
109 log::info!(
110 "Local server using Unix socket: rns/{}",
111 config.instance_name
112 );
113 let name = format!("rns/{}", config.instance_name);
114 thread::Builder::new()
115 .name("local-server".into())
116 .spawn(move || {
117 unix_server_loop(listener, name, tx, next_id);
118 })?;
119 return Ok(());
120 }
121 Err(e) => {
122 log::info!(
123 "Unix socket bind failed ({}), falling back to TCP",
124 e
125 );
126 }
127 }
128 }
129
130 let addr = format!("127.0.0.1:{}", config.port);
132 let listener = TcpListener::bind(&addr)?;
133
134 log::info!("Local server listening on TCP {}", addr);
135
136 thread::Builder::new()
137 .name("local-server".into())
138 .spawn(move || {
139 tcp_server_loop(listener, tx, next_id);
140 })?;
141
142 Ok(())
143}
144
145fn tcp_server_loop(listener: TcpListener, tx: EventSender, next_id: Arc<AtomicU64>) {
147 for stream_result in listener.incoming() {
148 let stream = match stream_result {
149 Ok(s) => s,
150 Err(e) => {
151 log::warn!("Local server accept failed: {}", e);
152 continue;
153 }
154 };
155
156 if let Err(e) = stream.set_nodelay(true) {
157 log::warn!("Local server set_nodelay failed: {}", e);
158 }
159
160 let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
161 spawn_local_client_handler(stream, client_id, tx.clone());
162 }
163}
164
165#[cfg(target_os = "linux")]
167fn unix_server_loop(
168 listener: std::os::unix::net::UnixListener,
169 name: String,
170 tx: EventSender,
171 next_id: Arc<AtomicU64>,
172) {
173 for stream_result in listener.incoming() {
174 let stream = match stream_result {
175 Ok(s) => s,
176 Err(e) => {
177 log::warn!("[{}] Local server accept failed: {}", name, e);
178 continue;
179 }
180 };
181
182 let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
183
184 let writer_stream = match stream.try_clone() {
186 Ok(s) => s,
187 Err(e) => {
188 log::warn!("Local server clone failed: {}", e);
189 continue;
190 }
191 };
192
193 let info = make_local_interface_info(client_id);
194 let writer: Box<dyn Writer> = Box::new(UnixLocalWriter { stream: writer_stream });
195
196 if tx
197 .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
198 .is_err()
199 {
200 return;
201 }
202
203 let client_tx = tx.clone();
204 thread::Builder::new()
205 .name(format!("local-unix-reader-{}", client_id.0))
206 .spawn(move || {
207 unix_reader_loop(stream, client_id, client_tx);
208 })
209 .ok();
210 }
211}
212
213#[cfg(target_os = "linux")]
214struct UnixLocalWriter {
215 stream: std::os::unix::net::UnixStream,
216}
217
218#[cfg(target_os = "linux")]
219impl Writer for UnixLocalWriter {
220 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
221 use std::io::Write;
222 self.stream.write_all(&hdlc::frame(data))
223 }
224}
225
226#[cfg(target_os = "linux")]
227fn unix_reader_loop(
228 mut stream: std::os::unix::net::UnixStream,
229 id: InterfaceId,
230 tx: EventSender,
231) {
232 use std::io::Read;
233 let mut decoder = hdlc::Decoder::new();
234 let mut buf = [0u8; 4096];
235
236 loop {
237 match stream.read(&mut buf) {
238 Ok(0) => {
239 let _ = tx.send(Event::InterfaceDown(id));
240 return;
241 }
242 Ok(n) => {
243 for frame in decoder.feed(&buf[..n]) {
244 if tx
245 .send(Event::Frame {
246 interface_id: id,
247 data: frame,
248 })
249 .is_err()
250 {
251 return;
252 }
253 }
254 }
255 Err(_) => {
256 let _ = tx.send(Event::InterfaceDown(id));
257 return;
258 }
259 }
260 }
261}
262
263fn spawn_local_client_handler(stream: TcpStream, client_id: InterfaceId, tx: EventSender) {
265 let writer_stream = match stream.try_clone() {
266 Ok(s) => s,
267 Err(e) => {
268 log::warn!("Local server clone failed: {}", e);
269 return;
270 }
271 };
272
273 let info = make_local_interface_info(client_id);
274 let writer: Box<dyn Writer> = Box::new(LocalWriter { stream: writer_stream });
275
276 if tx
277 .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
278 .is_err()
279 {
280 return;
281 }
282
283 thread::Builder::new()
284 .name(format!("local-reader-{}", client_id.0))
285 .spawn(move || {
286 tcp_reader_loop(stream, client_id, tx);
287 })
288 .ok();
289}
290
291fn tcp_reader_loop(mut stream: TcpStream, id: InterfaceId, tx: EventSender) {
292 let mut decoder = hdlc::Decoder::new();
293 let mut buf = [0u8; 4096];
294
295 loop {
296 match stream.read(&mut buf) {
297 Ok(0) => {
298 log::info!("Local client {} disconnected", id.0);
299 let _ = tx.send(Event::InterfaceDown(id));
300 return;
301 }
302 Ok(n) => {
303 for frame in decoder.feed(&buf[..n]) {
304 if tx
305 .send(Event::Frame {
306 interface_id: id,
307 data: frame,
308 })
309 .is_err()
310 {
311 return;
312 }
313 }
314 }
315 Err(e) => {
316 log::warn!("Local client {} read error: {}", id.0, e);
317 let _ = tx.send(Event::InterfaceDown(id));
318 return;
319 }
320 }
321 }
322}
323
324fn make_local_interface_info(id: InterfaceId) -> InterfaceInfo {
325 InterfaceInfo {
326 id,
327 name: String::from("LocalInterface"),
328 mode: constants::MODE_FULL,
329 out_capable: true,
330 in_capable: true,
331 bitrate: Some(1_000_000_000), announce_rate_target: None,
333 announce_rate_grace: 0,
334 announce_rate_penalty: 0.0,
335 announce_cap: constants::ANNOUNCE_CAP,
336 is_local_client: false,
337 wants_tunnel: false,
338 tunnel_id: None,
339 }
340}
341
342pub fn start_client(config: LocalClientConfig, tx: EventSender) -> io::Result<Box<dyn Writer>> {
348 let id = config.interface_id;
349
350 #[cfg(target_os = "linux")]
352 {
353 match unix_socket::try_connect_unix(&config.instance_name) {
354 Ok(stream) => {
355 log::info!(
356 "[{}] Connected to shared instance via Unix socket: rns/{}",
357 config.name,
358 config.instance_name
359 );
360
361 let writer_stream = stream.try_clone()?;
362 let _ = tx.send(Event::InterfaceUp(id, None, None));
363
364 let client_tx = tx;
365 thread::Builder::new()
366 .name(format!("local-client-reader-{}", id.0))
367 .spawn(move || {
368 unix_reader_loop(stream, id, client_tx);
369 })?;
370
371 return Ok(Box::new(UnixLocalWriter { stream: writer_stream }));
372 }
373 Err(e) => {
374 log::info!(
375 "[{}] Unix socket connect failed ({}), trying TCP",
376 config.name,
377 e
378 );
379 }
380 }
381 }
382
383 let addr = format!("127.0.0.1:{}", config.port);
385 let stream = TcpStream::connect(&addr)?;
386 stream.set_nodelay(true)?;
387
388 log::info!("[{}] Connected to shared instance via TCP {}", config.name, addr);
389
390 let reader_stream = stream.try_clone()?;
391 let writer_stream = stream.try_clone()?;
392
393 let _ = tx.send(Event::InterfaceUp(id, None, None));
394
395 thread::Builder::new()
396 .name(format!("local-client-reader-{}", id.0))
397 .spawn(move || {
398 tcp_reader_loop(reader_stream, id, tx);
399 })?;
400
401 Ok(Box::new(LocalWriter { stream: writer_stream }))
402}
403
404#[cfg(test)]
405mod tests {
406 use super::*;
407 use std::sync::mpsc;
408
409 fn find_free_port() -> u16 {
410 TcpListener::bind("127.0.0.1:0")
411 .unwrap()
412 .local_addr()
413 .unwrap()
414 .port()
415 }
416
417 #[test]
418 fn server_bind_tcp() {
419 let port = find_free_port();
420 let (tx, _rx) = mpsc::channel();
421 let next_id = Arc::new(AtomicU64::new(7000));
422
423 let config = LocalServerConfig {
424 instance_name: "test-bind".into(),
425 port,
426 interface_id: InterfaceId(70),
427 };
428
429 start_server(config, tx, next_id).unwrap();
432 thread::sleep(Duration::from_millis(50));
433
434 let _client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
436 }
437
438 #[test]
439 fn server_accept_client() {
440 let port = find_free_port();
441 let (tx, rx) = mpsc::channel();
442 let next_id = Arc::new(AtomicU64::new(7100));
443
444 let config = LocalServerConfig {
445 instance_name: "test-accept".into(),
446 port,
447 interface_id: InterfaceId(71),
448 };
449
450 start_server(config, tx, next_id).unwrap();
451 thread::sleep(Duration::from_millis(50));
452
453 let _client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
454
455 let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
456 match event {
457 Event::InterfaceUp(id, writer, info) => {
458 assert_eq!(id, InterfaceId(7100));
459 assert!(writer.is_some());
460 assert!(info.is_some());
461 }
462 other => panic!("expected InterfaceUp, got {:?}", other),
463 }
464 }
465
466 #[test]
467 fn client_send_receive() {
468 let port = find_free_port();
469 let (server_tx, server_rx) = mpsc::channel();
470 let next_id = Arc::new(AtomicU64::new(7200));
471
472 let server_config = LocalServerConfig {
473 instance_name: "test-sr".into(),
474 port,
475 interface_id: InterfaceId(72),
476 };
477
478 start_server(server_config, server_tx, next_id).unwrap();
479 thread::sleep(Duration::from_millis(50));
480
481 let (client_tx, client_rx) = mpsc::channel();
483 let client_config = LocalClientConfig {
484 name: "test-client".into(),
485 instance_name: "test-sr".into(),
486 port,
487 interface_id: InterfaceId(73),
488 reconnect_wait: Duration::from_secs(1),
489 };
490
491 let mut client_writer = start_client(client_config, client_tx).unwrap();
492
493 let event = server_rx.recv_timeout(Duration::from_secs(2)).unwrap();
495 let mut server_writer = match event {
496 Event::InterfaceUp(_, Some(w), _) => w,
497 other => panic!("expected InterfaceUp with writer, got {:?}", other),
498 };
499
500 let event = client_rx.recv_timeout(Duration::from_secs(2)).unwrap();
502 match event {
503 Event::InterfaceUp(id, _, _) => assert_eq!(id, InterfaceId(73)),
504 other => panic!("expected InterfaceUp, got {:?}", other),
505 }
506
507 let payload: Vec<u8> = (0..32).collect();
509 client_writer.send_frame(&payload).unwrap();
510
511 let event = server_rx.recv_timeout(Duration::from_secs(2)).unwrap();
512 match event {
513 Event::Frame { data, .. } => assert_eq!(data, payload),
514 other => panic!("expected Frame, got {:?}", other),
515 }
516
517 let payload2: Vec<u8> = (100..132).collect();
519 server_writer.send_frame(&payload2).unwrap();
520
521 let event = client_rx.recv_timeout(Duration::from_secs(2)).unwrap();
522 match event {
523 Event::Frame { data, .. } => assert_eq!(data, payload2),
524 other => panic!("expected Frame, got {:?}", other),
525 }
526 }
527
528 #[test]
529 fn multiple_local_clients() {
530 let port = find_free_port();
531 let (tx, rx) = mpsc::channel();
532 let next_id = Arc::new(AtomicU64::new(7300));
533
534 let config = LocalServerConfig {
535 instance_name: "test-multi".into(),
536 port,
537 interface_id: InterfaceId(74),
538 };
539
540 start_server(config, tx, next_id).unwrap();
541 thread::sleep(Duration::from_millis(50));
542
543 let _c1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
544 let _c2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
545
546 let mut ids = Vec::new();
547 for _ in 0..2 {
548 let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
549 match event {
550 Event::InterfaceUp(id, _, _) => ids.push(id),
551 other => panic!("expected InterfaceUp, got {:?}", other),
552 }
553 }
554
555 assert_eq!(ids.len(), 2);
556 assert_ne!(ids[0], ids[1]);
557 }
558
559 #[test]
560 fn client_disconnect_detected() {
561 let port = find_free_port();
562 let (tx, rx) = mpsc::channel();
563 let next_id = Arc::new(AtomicU64::new(7400));
564
565 let config = LocalServerConfig {
566 instance_name: "test-dc".into(),
567 port,
568 interface_id: InterfaceId(75),
569 };
570
571 start_server(config, tx, next_id).unwrap();
572 thread::sleep(Duration::from_millis(50));
573
574 let client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
575
576 let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
578
579 drop(client);
581
582 let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
583 assert!(
584 matches!(event, Event::InterfaceDown(_)),
585 "expected InterfaceDown, got {:?}",
586 event
587 );
588 }
589}