1use std::io::{self, Read, Write};
12use std::net::{TcpListener, TcpStream};
13use std::sync::atomic::{AtomicU64, Ordering};
14use std::sync::Arc;
15use std::thread;
16use std::time::Duration;
17
18use rns_core::constants;
19use rns_core::transport::types::{InterfaceId, InterfaceInfo};
20
21use crate::event::{Event, EventSender};
22use crate::hdlc;
23use crate::interface::Writer;
24
25#[derive(Debug, Clone)]
27pub struct LocalServerConfig {
28 pub instance_name: String,
29 pub port: u16,
30 pub interface_id: InterfaceId,
31}
32
33impl Default for LocalServerConfig {
34 fn default() -> Self {
35 LocalServerConfig {
36 instance_name: "default".into(),
37 port: 37428,
38 interface_id: InterfaceId(0),
39 }
40 }
41}
42
43#[derive(Debug, Clone)]
45pub struct LocalClientConfig {
46 pub name: String,
47 pub instance_name: String,
48 pub port: u16,
49 pub interface_id: InterfaceId,
50 pub reconnect_wait: Duration,
51}
52
53impl Default for LocalClientConfig {
54 fn default() -> Self {
55 LocalClientConfig {
56 name: "Local shared instance".into(),
57 instance_name: "default".into(),
58 port: 37428,
59 interface_id: InterfaceId(0),
60 reconnect_wait: Duration::from_secs(8),
61 }
62 }
63}
64
65struct LocalWriter {
67 stream: TcpStream,
68}
69
70impl Writer for LocalWriter {
71 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
72 self.stream.write_all(&hdlc::frame(data))
73 }
74}
75
/// Linux-only helpers that bind or connect a Unix socket named after the
/// instance.
///
/// NOTE(review): the name starts with a NUL byte, which looks like an attempt
/// to use the Linux abstract namespace. std's path-based `bind`/`connect`
/// are documented to reject paths containing NUL bytes, so these calls
/// probably always fail and callers always take their TCP fallback — confirm.
/// `std::os::linux::net::SocketAddrExt::from_abstract_name` together with
/// `bind_addr`/`connect_addr` is the dedicated API. The tests in this file
/// connect over TCP, so switching would change which transport they reach.
#[cfg(target_os = "linux")]
mod unix_socket {
    use std::io;
    use std::os::unix::net::{UnixListener, UnixStream};

    /// Builds the NUL-prefixed `rns/<instance_name>` socket name.
    fn socket_name(instance_name: &str) -> String {
        format!("\0rns/{}", instance_name)
    }

    /// Tries to bind a listener on the socket name for `instance_name`.
    pub fn try_bind_unix(instance_name: &str) -> io::Result<UnixListener> {
        UnixListener::bind(socket_name(instance_name))
    }

    /// Tries to connect to the socket name for `instance_name`.
    pub fn try_connect_unix(instance_name: &str) -> io::Result<UnixStream> {
        UnixStream::connect(socket_name(instance_name))
    }
}
93
94pub fn start_server(
100 config: LocalServerConfig,
101 tx: EventSender,
102 next_id: Arc<AtomicU64>,
103) -> io::Result<()> {
104 #[cfg(target_os = "linux")]
106 {
107 match unix_socket::try_bind_unix(&config.instance_name) {
108 Ok(listener) => {
109 log::info!(
110 "Local server using Unix socket: rns/{}",
111 config.instance_name
112 );
113 let name = format!("rns/{}", config.instance_name);
114 thread::Builder::new()
115 .name("local-server".into())
116 .spawn(move || {
117 unix_server_loop(listener, name, tx, next_id);
118 })?;
119 return Ok(());
120 }
121 Err(e) => {
122 log::info!("Unix socket bind failed ({}), falling back to TCP", e);
123 }
124 }
125 }
126
127 let addr = format!("127.0.0.1:{}", config.port);
129 let listener = TcpListener::bind(&addr)?;
130
131 log::info!("Local server listening on TCP {}", addr);
132
133 thread::Builder::new()
134 .name("local-server".into())
135 .spawn(move || {
136 tcp_server_loop(listener, tx, next_id);
137 })?;
138
139 Ok(())
140}
141
142fn tcp_server_loop(listener: TcpListener, tx: EventSender, next_id: Arc<AtomicU64>) {
144 for stream_result in listener.incoming() {
145 let stream = match stream_result {
146 Ok(s) => s,
147 Err(e) => {
148 log::warn!("Local server accept failed: {}", e);
149 continue;
150 }
151 };
152
153 if let Err(e) = stream.set_nodelay(true) {
154 log::warn!("Local server set_nodelay failed: {}", e);
155 }
156
157 let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
158 spawn_local_client_handler(stream, client_id, tx.clone());
159 }
160}
161
162#[cfg(target_os = "linux")]
164fn unix_server_loop(
165 listener: std::os::unix::net::UnixListener,
166 name: String,
167 tx: EventSender,
168 next_id: Arc<AtomicU64>,
169) {
170 for stream_result in listener.incoming() {
171 let stream = match stream_result {
172 Ok(s) => s,
173 Err(e) => {
174 log::warn!("[{}] Local server accept failed: {}", name, e);
175 continue;
176 }
177 };
178
179 let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
180
181 let writer_stream = match stream.try_clone() {
183 Ok(s) => s,
184 Err(e) => {
185 log::warn!("Local server clone failed: {}", e);
186 continue;
187 }
188 };
189
190 let info = make_local_interface_info(client_id);
191 let writer: Box<dyn Writer> = Box::new(UnixLocalWriter {
192 stream: writer_stream,
193 });
194
195 if tx
196 .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
197 .is_err()
198 {
199 return;
200 }
201
202 let client_tx = tx.clone();
203 thread::Builder::new()
204 .name(format!("local-unix-reader-{}", client_id.0))
205 .spawn(move || {
206 unix_reader_loop(stream, client_id, client_tx);
207 })
208 .ok();
209 }
210}
211
212#[cfg(target_os = "linux")]
213struct UnixLocalWriter {
214 stream: std::os::unix::net::UnixStream,
215}
216
217#[cfg(target_os = "linux")]
218impl Writer for UnixLocalWriter {
219 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
220 use std::io::Write;
221 self.stream.write_all(&hdlc::frame(data))
222 }
223}
224
225#[cfg(target_os = "linux")]
226fn unix_reader_loop(mut stream: std::os::unix::net::UnixStream, id: InterfaceId, tx: EventSender) {
227 use std::io::Read;
228 let mut decoder = hdlc::Decoder::new();
229 let mut buf = [0u8; 4096];
230
231 loop {
232 match stream.read(&mut buf) {
233 Ok(0) => {
234 let _ = tx.send(Event::InterfaceDown(id));
235 return;
236 }
237 Ok(n) => {
238 for frame in decoder.feed(&buf[..n]) {
239 if tx
240 .send(Event::Frame {
241 interface_id: id,
242 data: frame,
243 })
244 .is_err()
245 {
246 return;
247 }
248 }
249 }
250 Err(_) => {
251 let _ = tx.send(Event::InterfaceDown(id));
252 return;
253 }
254 }
255 }
256}
257
258fn spawn_local_client_handler(stream: TcpStream, client_id: InterfaceId, tx: EventSender) {
260 let writer_stream = match stream.try_clone() {
261 Ok(s) => s,
262 Err(e) => {
263 log::warn!("Local server clone failed: {}", e);
264 return;
265 }
266 };
267
268 let info = make_local_interface_info(client_id);
269 let writer: Box<dyn Writer> = Box::new(LocalWriter {
270 stream: writer_stream,
271 });
272
273 if tx
274 .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
275 .is_err()
276 {
277 return;
278 }
279
280 thread::Builder::new()
281 .name(format!("local-reader-{}", client_id.0))
282 .spawn(move || {
283 tcp_reader_loop(stream, client_id, tx);
284 })
285 .ok();
286}
287
288fn tcp_reader_loop(mut stream: TcpStream, id: InterfaceId, tx: EventSender) {
289 let mut decoder = hdlc::Decoder::new();
290 let mut buf = [0u8; 4096];
291
292 loop {
293 match stream.read(&mut buf) {
294 Ok(0) => {
295 log::info!("Local client {} disconnected", id.0);
296 let _ = tx.send(Event::InterfaceDown(id));
297 return;
298 }
299 Ok(n) => {
300 for frame in decoder.feed(&buf[..n]) {
301 if tx
302 .send(Event::Frame {
303 interface_id: id,
304 data: frame,
305 })
306 .is_err()
307 {
308 return;
309 }
310 }
311 }
312 Err(e) => {
313 log::warn!("Local client {} read error: {}", id.0, e);
314 let _ = tx.send(Event::InterfaceDown(id));
315 return;
316 }
317 }
318 }
319}
320
321fn make_local_interface_info(id: InterfaceId) -> InterfaceInfo {
322 InterfaceInfo {
323 id,
324 name: String::from("LocalInterface"),
325 mode: constants::MODE_FULL,
326 out_capable: true,
327 in_capable: true,
328 bitrate: Some(1_000_000_000), announce_rate_target: None,
330 announce_rate_grace: 0,
331 announce_rate_penalty: 0.0,
332 announce_cap: constants::ANNOUNCE_CAP,
333 is_local_client: false,
334 wants_tunnel: false,
335 tunnel_id: None,
336 mtu: 65535,
337 ia_freq: 0.0,
338 started: 0.0,
339 ingress_control: false,
340 }
341}
342
343pub fn start_client(config: LocalClientConfig, tx: EventSender) -> io::Result<Box<dyn Writer>> {
349 let id = config.interface_id;
350
351 #[cfg(target_os = "linux")]
353 {
354 match unix_socket::try_connect_unix(&config.instance_name) {
355 Ok(stream) => {
356 log::info!(
357 "[{}] Connected to shared instance via Unix socket: rns/{}",
358 config.name,
359 config.instance_name
360 );
361
362 let writer_stream = stream.try_clone()?;
363 let _ = tx.send(Event::InterfaceUp(id, None, None));
364
365 let client_tx = tx;
366 thread::Builder::new()
367 .name(format!("local-client-reader-{}", id.0))
368 .spawn(move || {
369 unix_reader_loop(stream, id, client_tx);
370 })?;
371
372 return Ok(Box::new(UnixLocalWriter {
373 stream: writer_stream,
374 }));
375 }
376 Err(e) => {
377 log::info!(
378 "[{}] Unix socket connect failed ({}), trying TCP",
379 config.name,
380 e
381 );
382 }
383 }
384 }
385
386 let addr = format!("127.0.0.1:{}", config.port);
388 let stream = TcpStream::connect(&addr)?;
389 stream.set_nodelay(true)?;
390
391 log::info!(
392 "[{}] Connected to shared instance via TCP {}",
393 config.name,
394 addr
395 );
396
397 let reader_stream = stream.try_clone()?;
398 let writer_stream = stream.try_clone()?;
399
400 let _ = tx.send(Event::InterfaceUp(id, None, None));
401
402 thread::Builder::new()
403 .name(format!("local-client-reader-{}", id.0))
404 .spawn(move || {
405 tcp_reader_loop(reader_stream, id, tx);
406 })?;
407
408 Ok(Box::new(LocalWriter {
409 stream: writer_stream,
410 }))
411}
412
413use super::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult};
416use std::collections::HashMap;
417
418pub struct LocalServerFactory;
420
421impl InterfaceFactory for LocalServerFactory {
422 fn type_name(&self) -> &str {
423 "LocalServerInterface"
424 }
425
426 fn parse_config(
427 &self,
428 _name: &str,
429 id: InterfaceId,
430 params: &HashMap<String, String>,
431 ) -> Result<Box<dyn InterfaceConfigData>, String> {
432 let instance_name = params
433 .get("instance_name")
434 .cloned()
435 .unwrap_or_else(|| "default".into());
436 let port = params
437 .get("port")
438 .and_then(|v| v.parse().ok())
439 .unwrap_or(37428);
440
441 Ok(Box::new(LocalServerConfig {
442 instance_name,
443 port,
444 interface_id: id,
445 }))
446 }
447
448 fn start(
449 &self,
450 config: Box<dyn InterfaceConfigData>,
451 ctx: StartContext,
452 ) -> std::io::Result<StartResult> {
453 let server_config = *config
454 .into_any()
455 .downcast::<LocalServerConfig>()
456 .map_err(|_| {
457 std::io::Error::new(std::io::ErrorKind::InvalidData, "wrong config type")
458 })?;
459
460 start_server(server_config, ctx.tx, ctx.next_dynamic_id)?;
461 Ok(StartResult::Listener)
462 }
463}
464
465pub struct LocalClientFactory;
467
468impl InterfaceFactory for LocalClientFactory {
469 fn type_name(&self) -> &str {
470 "LocalClientInterface"
471 }
472
473 fn parse_config(
474 &self,
475 _name: &str,
476 id: InterfaceId,
477 params: &HashMap<String, String>,
478 ) -> Result<Box<dyn InterfaceConfigData>, String> {
479 let instance_name = params
480 .get("instance_name")
481 .cloned()
482 .unwrap_or_else(|| "default".into());
483 let port = params
484 .get("port")
485 .and_then(|v| v.parse().ok())
486 .unwrap_or(37428);
487
488 Ok(Box::new(LocalClientConfig {
489 instance_name,
490 port,
491 interface_id: id,
492 ..LocalClientConfig::default()
493 }))
494 }
495
496 fn start(
497 &self,
498 config: Box<dyn InterfaceConfigData>,
499 ctx: StartContext,
500 ) -> std::io::Result<StartResult> {
501 let client_config = *config
502 .into_any()
503 .downcast::<LocalClientConfig>()
504 .map_err(|_| {
505 std::io::Error::new(std::io::ErrorKind::InvalidData, "wrong config type")
506 })?;
507
508 let id = client_config.interface_id;
509 let name = client_config.name.clone();
510 let info = InterfaceInfo {
511 id,
512 name,
513 mode: ctx.mode,
514 out_capable: true,
515 in_capable: true,
516 bitrate: Some(1_000_000_000),
517 announce_rate_target: None,
518 announce_rate_grace: 0,
519 announce_rate_penalty: 0.0,
520 announce_cap: rns_core::constants::ANNOUNCE_CAP,
521 is_local_client: false,
522 wants_tunnel: false,
523 tunnel_id: None,
524 mtu: 65535,
525 ingress_control: false,
526 ia_freq: 0.0,
527 started: crate::time::now(),
528 };
529
530 let writer = start_client(client_config, ctx.tx)?;
531
532 Ok(StartResult::Simple {
533 id,
534 info,
535 writer,
536 interface_type_name: "LocalInterface".to_string(),
537 })
538 }
539}
540
// NOTE(review): every test reaches the server over TCP, so they assume
// `start_server` ends up on its TCP listener. Confirm this holds on Linux,
// where a Unix socket is tried first.
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::mpsc;

    /// Upper bound for waiting on an expected event.
    const EVENT_WAIT: Duration = Duration::from_secs(2);

    /// Asks the OS for a currently unused loopback port.
    fn find_free_port() -> u16 {
        let probe = TcpListener::bind("127.0.0.1:0").unwrap();
        probe.local_addr().unwrap().port()
    }

    /// Starts a server on a fresh port and returns that port together with
    /// the receiver for the server's events.
    fn spawn_server(
        instance: &str,
        interface: u64,
        first_client_id: u64,
    ) -> (u16, mpsc::Receiver<Event>) {
        let port = find_free_port();
        let (tx, rx) = mpsc::channel();
        let config = LocalServerConfig {
            instance_name: instance.into(),
            port,
            interface_id: InterfaceId(interface),
        };

        start_server(config, tx, Arc::new(AtomicU64::new(first_client_id))).unwrap();
        thread::sleep(Duration::from_millis(50));

        (port, rx)
    }

    /// Opens a plain TCP connection to the server on `port`.
    fn connect_tcp(port: u16) -> TcpStream {
        TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap()
    }

    /// Waits for the next event and checks that it is a frame with `expected`.
    fn expect_frame(rx: &mpsc::Receiver<Event>, expected: Vec<u8>) {
        match rx.recv_timeout(EVENT_WAIT).unwrap() {
            Event::Frame { data, .. } => assert_eq!(data, expected),
            other => panic!("expected Frame, got {:?}", other),
        }
    }

    #[test]
    fn server_bind_tcp() {
        // Keep the receiver alive for the duration of the test.
        let (port, _rx) = spawn_server("test-bind", 70, 7000);

        let _client = connect_tcp(port);
    }

    #[test]
    fn server_accept_client() {
        let (port, rx) = spawn_server("test-accept", 71, 7100);

        let _client = connect_tcp(port);

        match rx.recv_timeout(EVENT_WAIT).unwrap() {
            Event::InterfaceUp(id, writer, info) => {
                assert_eq!(id, InterfaceId(7100));
                assert!(writer.is_some());
                assert!(info.is_some());
            }
            other => panic!("expected InterfaceUp, got {:?}", other),
        }
    }

    #[test]
    fn client_send_receive() {
        let (port, server_rx) = spawn_server("test-sr", 72, 7200);

        let (client_tx, client_rx) = mpsc::channel();
        let client_config = LocalClientConfig {
            name: "test-client".into(),
            instance_name: "test-sr".into(),
            port,
            interface_id: InterfaceId(73),
            reconnect_wait: Duration::from_secs(1),
        };

        let mut client_writer = start_client(client_config, client_tx).unwrap();

        // The server announces the new client and hands over its writer.
        let mut server_writer = match server_rx.recv_timeout(EVENT_WAIT).unwrap() {
            Event::InterfaceUp(_, Some(w), _) => w,
            other => panic!("expected InterfaceUp with writer, got {:?}", other),
        };

        // The client announces itself under its configured id.
        match client_rx.recv_timeout(EVENT_WAIT).unwrap() {
            Event::InterfaceUp(id, _, _) => assert_eq!(id, InterfaceId(73)),
            other => panic!("expected InterfaceUp, got {:?}", other),
        }

        // Client -> server.
        let payload: Vec<u8> = (0..32).collect();
        client_writer.send_frame(&payload).unwrap();
        expect_frame(&server_rx, payload);

        // Server -> client.
        let payload2: Vec<u8> = (100..132).collect();
        server_writer.send_frame(&payload2).unwrap();
        expect_frame(&client_rx, payload2);
    }

    #[test]
    fn multiple_local_clients() {
        let (port, rx) = spawn_server("test-multi", 74, 7300);

        let _c1 = connect_tcp(port);
        let _c2 = connect_tcp(port);

        let ids: Vec<InterfaceId> = (0..2)
            .map(|_| match rx.recv_timeout(EVENT_WAIT).unwrap() {
                Event::InterfaceUp(id, _, _) => id,
                other => panic!("expected InterfaceUp, got {:?}", other),
            })
            .collect();

        assert_eq!(ids.len(), 2);
        assert_ne!(ids[0], ids[1]);
    }

    #[test]
    fn client_disconnect_detected() {
        let (port, rx) = spawn_server("test-dc", 75, 7400);

        let client = connect_tcp(port);

        // Consume the InterfaceUp for the new connection.
        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();

        drop(client);

        let event = rx.recv_timeout(EVENT_WAIT).unwrap();
        assert!(
            matches!(event, Event::InterfaceDown(_)),
            "expected InterfaceDown, got {:?}",
            event
        );
    }
}