1use std::io::{self, Read, Write};
12use std::net::{TcpListener, TcpStream};
13use std::sync::atomic::{AtomicU64, Ordering};
14use std::sync::Arc;
15use std::thread;
16use std::time::Duration;
17
18use rns_core::constants;
19use rns_core::transport::types::{InterfaceId, InterfaceInfo};
20
21use crate::event::{Event, EventSender};
22use crate::hdlc;
23use crate::interface::Writer;
24
25#[derive(Debug, Clone)]
27pub struct LocalServerConfig {
28 pub instance_name: String,
29 pub port: u16,
30 pub interface_id: InterfaceId,
31}
32
33impl Default for LocalServerConfig {
34 fn default() -> Self {
35 LocalServerConfig {
36 instance_name: "default".into(),
37 port: 37428,
38 interface_id: InterfaceId(0),
39 }
40 }
41}
42
43#[derive(Debug, Clone)]
45pub struct LocalClientConfig {
46 pub name: String,
47 pub instance_name: String,
48 pub port: u16,
49 pub interface_id: InterfaceId,
50 pub reconnect_wait: Duration,
51}
52
53impl Default for LocalClientConfig {
54 fn default() -> Self {
55 LocalClientConfig {
56 name: "Local shared instance".into(),
57 instance_name: "default".into(),
58 port: 37428,
59 interface_id: InterfaceId(0),
60 reconnect_wait: Duration::from_secs(8),
61 }
62 }
63}
64
65struct LocalWriter {
67 stream: TcpStream,
68}
69
70impl Writer for LocalWriter {
71 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
72 self.stream.write_all(&hdlc::frame(data))
73 }
74}
75
/// Linux-only helpers that address the local instance as `\0rns/<instance_name>`
/// (a leading NUL byte is the Linux convention for an abstract socket name).
#[cfg(target_os = "linux")]
mod unix_socket {
    use std::io;
    use std::os::unix::net::{UnixListener, UnixStream};

    // NOTE(review): std's path-based `bind`/`connect` appear to reject any path
    // that contains a NUL byte (`InvalidInput`), which would make both helpers
    // below fail and leave callers on their TCP fallback. Abstract names are
    // normally built with `SocketAddr::from_abstract_name` and passed to
    // `bind_addr`/`connect_addr` (Rust 1.70+). Confirm before relying on the
    // Unix socket path.

    /// Builds the socket path shared by the bind and connect helpers.
    fn socket_path(instance_name: &str) -> String {
        format!("\0rns/{}", instance_name)
    }

    /// Attempts to bind a listener on the socket path for `instance_name`.
    pub fn try_bind_unix(instance_name: &str) -> io::Result<UnixListener> {
        UnixListener::bind(socket_path(instance_name))
    }

    /// Attempts to connect to the socket path for `instance_name`.
    pub fn try_connect_unix(instance_name: &str) -> io::Result<UnixStream> {
        UnixStream::connect(socket_path(instance_name))
    }
}
93
94pub fn start_server(
100 config: LocalServerConfig,
101 tx: EventSender,
102 next_id: Arc<AtomicU64>,
103) -> io::Result<()> {
104 #[cfg(target_os = "linux")]
106 {
107 match unix_socket::try_bind_unix(&config.instance_name) {
108 Ok(listener) => {
109 log::info!(
110 "Local server using Unix socket: rns/{}",
111 config.instance_name
112 );
113 let name = format!("rns/{}", config.instance_name);
114 thread::Builder::new()
115 .name("local-server".into())
116 .spawn(move || {
117 unix_server_loop(listener, name, tx, next_id);
118 })?;
119 return Ok(());
120 }
121 Err(e) => {
122 log::info!(
123 "Unix socket bind failed ({}), falling back to TCP",
124 e
125 );
126 }
127 }
128 }
129
130 let addr = format!("127.0.0.1:{}", config.port);
132 let listener = TcpListener::bind(&addr)?;
133
134 log::info!("Local server listening on TCP {}", addr);
135
136 thread::Builder::new()
137 .name("local-server".into())
138 .spawn(move || {
139 tcp_server_loop(listener, tx, next_id);
140 })?;
141
142 Ok(())
143}
144
145fn tcp_server_loop(listener: TcpListener, tx: EventSender, next_id: Arc<AtomicU64>) {
147 for stream_result in listener.incoming() {
148 let stream = match stream_result {
149 Ok(s) => s,
150 Err(e) => {
151 log::warn!("Local server accept failed: {}", e);
152 continue;
153 }
154 };
155
156 if let Err(e) = stream.set_nodelay(true) {
157 log::warn!("Local server set_nodelay failed: {}", e);
158 }
159
160 let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
161 spawn_local_client_handler(stream, client_id, tx.clone());
162 }
163}
164
165#[cfg(target_os = "linux")]
167fn unix_server_loop(
168 listener: std::os::unix::net::UnixListener,
169 name: String,
170 tx: EventSender,
171 next_id: Arc<AtomicU64>,
172) {
173 for stream_result in listener.incoming() {
174 let stream = match stream_result {
175 Ok(s) => s,
176 Err(e) => {
177 log::warn!("[{}] Local server accept failed: {}", name, e);
178 continue;
179 }
180 };
181
182 let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
183
184 let writer_stream = match stream.try_clone() {
186 Ok(s) => s,
187 Err(e) => {
188 log::warn!("Local server clone failed: {}", e);
189 continue;
190 }
191 };
192
193 let info = make_local_interface_info(client_id);
194 let writer: Box<dyn Writer> = Box::new(UnixLocalWriter { stream: writer_stream });
195
196 if tx
197 .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
198 .is_err()
199 {
200 return;
201 }
202
203 let client_tx = tx.clone();
204 thread::Builder::new()
205 .name(format!("local-unix-reader-{}", client_id.0))
206 .spawn(move || {
207 unix_reader_loop(stream, client_id, client_tx);
208 })
209 .ok();
210 }
211}
212
213#[cfg(target_os = "linux")]
214struct UnixLocalWriter {
215 stream: std::os::unix::net::UnixStream,
216}
217
218#[cfg(target_os = "linux")]
219impl Writer for UnixLocalWriter {
220 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
221 use std::io::Write;
222 self.stream.write_all(&hdlc::frame(data))
223 }
224}
225
226#[cfg(target_os = "linux")]
227fn unix_reader_loop(
228 mut stream: std::os::unix::net::UnixStream,
229 id: InterfaceId,
230 tx: EventSender,
231) {
232 use std::io::Read;
233 let mut decoder = hdlc::Decoder::new();
234 let mut buf = [0u8; 4096];
235
236 loop {
237 match stream.read(&mut buf) {
238 Ok(0) => {
239 let _ = tx.send(Event::InterfaceDown(id));
240 return;
241 }
242 Ok(n) => {
243 for frame in decoder.feed(&buf[..n]) {
244 if tx
245 .send(Event::Frame {
246 interface_id: id,
247 data: frame,
248 })
249 .is_err()
250 {
251 return;
252 }
253 }
254 }
255 Err(_) => {
256 let _ = tx.send(Event::InterfaceDown(id));
257 return;
258 }
259 }
260 }
261}
262
263fn spawn_local_client_handler(stream: TcpStream, client_id: InterfaceId, tx: EventSender) {
265 let writer_stream = match stream.try_clone() {
266 Ok(s) => s,
267 Err(e) => {
268 log::warn!("Local server clone failed: {}", e);
269 return;
270 }
271 };
272
273 let info = make_local_interface_info(client_id);
274 let writer: Box<dyn Writer> = Box::new(LocalWriter { stream: writer_stream });
275
276 if tx
277 .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
278 .is_err()
279 {
280 return;
281 }
282
283 thread::Builder::new()
284 .name(format!("local-reader-{}", client_id.0))
285 .spawn(move || {
286 tcp_reader_loop(stream, client_id, tx);
287 })
288 .ok();
289}
290
291fn tcp_reader_loop(mut stream: TcpStream, id: InterfaceId, tx: EventSender) {
292 let mut decoder = hdlc::Decoder::new();
293 let mut buf = [0u8; 4096];
294
295 loop {
296 match stream.read(&mut buf) {
297 Ok(0) => {
298 log::info!("Local client {} disconnected", id.0);
299 let _ = tx.send(Event::InterfaceDown(id));
300 return;
301 }
302 Ok(n) => {
303 for frame in decoder.feed(&buf[..n]) {
304 if tx
305 .send(Event::Frame {
306 interface_id: id,
307 data: frame,
308 })
309 .is_err()
310 {
311 return;
312 }
313 }
314 }
315 Err(e) => {
316 log::warn!("Local client {} read error: {}", id.0, e);
317 let _ = tx.send(Event::InterfaceDown(id));
318 return;
319 }
320 }
321 }
322}
323
324fn make_local_interface_info(id: InterfaceId) -> InterfaceInfo {
325 InterfaceInfo {
326 id,
327 name: String::from("LocalInterface"),
328 mode: constants::MODE_FULL,
329 out_capable: true,
330 in_capable: true,
331 bitrate: Some(1_000_000_000), announce_rate_target: None,
333 announce_rate_grace: 0,
334 announce_rate_penalty: 0.0,
335 announce_cap: constants::ANNOUNCE_CAP,
336 is_local_client: false,
337 wants_tunnel: false,
338 tunnel_id: None,
339 mtu: 65535,
340 ia_freq: 0.0,
341 started: 0.0,
342 ingress_control: false,
343 }
344}
345
346pub fn start_client(config: LocalClientConfig, tx: EventSender) -> io::Result<Box<dyn Writer>> {
352 let id = config.interface_id;
353
354 #[cfg(target_os = "linux")]
356 {
357 match unix_socket::try_connect_unix(&config.instance_name) {
358 Ok(stream) => {
359 log::info!(
360 "[{}] Connected to shared instance via Unix socket: rns/{}",
361 config.name,
362 config.instance_name
363 );
364
365 let writer_stream = stream.try_clone()?;
366 let _ = tx.send(Event::InterfaceUp(id, None, None));
367
368 let client_tx = tx;
369 thread::Builder::new()
370 .name(format!("local-client-reader-{}", id.0))
371 .spawn(move || {
372 unix_reader_loop(stream, id, client_tx);
373 })?;
374
375 return Ok(Box::new(UnixLocalWriter { stream: writer_stream }));
376 }
377 Err(e) => {
378 log::info!(
379 "[{}] Unix socket connect failed ({}), trying TCP",
380 config.name,
381 e
382 );
383 }
384 }
385 }
386
387 let addr = format!("127.0.0.1:{}", config.port);
389 let stream = TcpStream::connect(&addr)?;
390 stream.set_nodelay(true)?;
391
392 log::info!("[{}] Connected to shared instance via TCP {}", config.name, addr);
393
394 let reader_stream = stream.try_clone()?;
395 let writer_stream = stream.try_clone()?;
396
397 let _ = tx.send(Event::InterfaceUp(id, None, None));
398
399 thread::Builder::new()
400 .name(format!("local-client-reader-{}", id.0))
401 .spawn(move || {
402 tcp_reader_loop(reader_stream, id, tx);
403 })?;
404
405 Ok(Box::new(LocalWriter { stream: writer_stream }))
406}
407
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::mpsc;

    /// Longest time a test waits for an event it expects to arrive.
    const EVENT_WAIT: Duration = Duration::from_secs(2);

    /// Asks the OS for an unused loopback port by binding port 0 and reading it back.
    fn find_free_port() -> u16 {
        let probe = TcpListener::bind("127.0.0.1:0").unwrap();
        probe.local_addr().unwrap().port()
    }

    /// Starts a server on a free port and returns that port with the event receiver.
    ///
    /// `first_client_id` seeds the id counter and `interface_id` fills the
    /// config field of the same name.
    fn launch_server(
        instance_name: &str,
        first_client_id: u64,
        interface_id: u64,
    ) -> (u16, mpsc::Receiver<Event>) {
        let port = find_free_port();
        let (tx, rx) = mpsc::channel();

        let config = LocalServerConfig {
            instance_name: instance_name.into(),
            port,
            interface_id: InterfaceId(interface_id),
        };

        start_server(config, tx, Arc::new(AtomicU64::new(first_client_id))).unwrap();
        // Short pause after the accept thread has been spawned.
        thread::sleep(Duration::from_millis(50));

        (port, rx)
    }

    /// Opens a plain TCP connection to the server on `port`.
    fn connect_tcp(port: u16) -> TcpStream {
        TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap()
    }

    /// Waits up to `EVENT_WAIT` for the next event.
    fn next_event(rx: &mpsc::Receiver<Event>) -> Event {
        rx.recv_timeout(EVENT_WAIT).unwrap()
    }

    #[test]
    fn server_bind_tcp() {
        // `_rx` stays alive so the server side can still deliver events.
        let (port, _rx) = launch_server("test-bind", 7000, 70);

        let _client = connect_tcp(port);
    }

    #[test]
    fn server_accept_client() {
        let (port, rx) = launch_server("test-accept", 7100, 71);

        let _client = connect_tcp(port);

        match next_event(&rx) {
            Event::InterfaceUp(id, writer, info) => {
                assert_eq!(id, InterfaceId(7100));
                assert!(writer.is_some());
                assert!(info.is_some());
            }
            other => panic!("expected InterfaceUp, got {:?}", other),
        }
    }

    #[test]
    fn client_send_receive() {
        let (port, server_rx) = launch_server("test-sr", 7200, 72);

        let (client_tx, client_rx) = mpsc::channel();
        let client_config = LocalClientConfig {
            name: "test-client".into(),
            instance_name: "test-sr".into(),
            port,
            interface_id: InterfaceId(73),
            reconnect_wait: Duration::from_secs(1),
        };

        let mut client_writer = start_client(client_config, client_tx).unwrap();

        // The server announces the new client together with a writer towards it.
        let mut server_writer = match next_event(&server_rx) {
            Event::InterfaceUp(_, Some(w), _) => w,
            other => panic!("expected InterfaceUp with writer, got {:?}", other),
        };

        // The client announces its own interface under the configured id.
        match next_event(&client_rx) {
            Event::InterfaceUp(id, _, _) => assert_eq!(id, InterfaceId(73)),
            other => panic!("expected InterfaceUp, got {:?}", other),
        }

        // Client -> server.
        let payload = (0..32).collect::<Vec<u8>>();
        client_writer.send_frame(&payload).unwrap();

        match next_event(&server_rx) {
            Event::Frame { data, .. } => assert_eq!(data, payload),
            other => panic!("expected Frame, got {:?}", other),
        }

        // Server -> client.
        let payload2 = (100..132).collect::<Vec<u8>>();
        server_writer.send_frame(&payload2).unwrap();

        match next_event(&client_rx) {
            Event::Frame { data, .. } => assert_eq!(data, payload2),
            other => panic!("expected Frame, got {:?}", other),
        }
    }

    #[test]
    fn multiple_local_clients() {
        let (port, rx) = launch_server("test-multi", 7300, 74);

        let _c1 = connect_tcp(port);
        let _c2 = connect_tcp(port);

        let ids: Vec<InterfaceId> = (0..2)
            .map(|_| match next_event(&rx) {
                Event::InterfaceUp(id, _, _) => id,
                other => panic!("expected InterfaceUp, got {:?}", other),
            })
            .collect();

        assert_eq!(ids.len(), 2);
        assert_ne!(ids[0], ids[1]);
    }

    #[test]
    fn client_disconnect_detected() {
        let (port, rx) = launch_server("test-dc", 7400, 75);

        let client = connect_tcp(port);

        // Consume the event produced by the connection itself.
        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();

        // Closing the socket should surface as InterfaceDown.
        drop(client);

        let event = next_event(&rx);
        assert!(
            matches!(event, Event::InterfaceDown(_)),
            "expected InterfaceDown, got {:?}",
            event
        );
    }
}