Skip to main content

rns_net/
shared_client.rs

1//! Shared instance client mode.
2//!
3//! Allows an RnsNode to connect as a client to an already-running Reticulum
4//! daemon, proxying operations through it. The client runs a minimal transport
5//! engine with `transport_enabled: false` — it does no routing of its own, but
6//! registers local destinations and sends/receives packets via the local
7//! connection.
8//!
9//! This matches Python's behavior when `share_instance = True` and a daemon
10//! is already running: the new process connects as a client rather than
11//! starting its own interfaces.
12
13use std::io;
14use std::path::Path;
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::sync::Arc;
17use std::thread;
18use std::time::Duration;
19
20use rns_core::transport::types::TransportConfig;
21
22use crate::driver::{Callbacks, Driver};
23use crate::event;
24use crate::interface::local::LocalClientConfig;
25use crate::interface::{InterfaceEntry, InterfaceStats};
26use crate::node::RnsNode;
27use crate::storage;
28use crate::time;
29
30/// Configuration for connecting as a shared instance client.
31pub struct SharedClientConfig {
32    /// Instance name for Unix socket namespace (e.g. "default" → `\0rns/default`).
33    pub instance_name: String,
34    /// TCP port to try if Unix socket fails (default 37428).
35    pub port: u16,
36    /// RPC control port for queries (default 37429).
37    pub rpc_port: u16,
38}
39
40impl Default for SharedClientConfig {
41    fn default() -> Self {
42        SharedClientConfig {
43            instance_name: "default".into(),
44            port: 37428,
45            rpc_port: 37429,
46        }
47    }
48}
49
50impl RnsNode {
51    /// Connect to an existing shared instance as a client.
52    ///
53    /// The client runs `transport_enabled: false` — it does no routing,
54    /// but can register destinations and send/receive packets through
55    /// the daemon.
56    pub fn connect_shared(
57        config: SharedClientConfig,
58        callbacks: Box<dyn Callbacks>,
59    ) -> io::Result<Self> {
60        let transport_config = TransportConfig {
61            transport_enabled: false,
62            identity_hash: None,
63            prefer_shorter_path: false,
64            max_paths_per_destination: 1,
65            packet_hashlist_max_entries: rns_core::constants::HASHLIST_MAXSIZE,
66            max_discovery_pr_tags: rns_core::constants::MAX_PR_TAGS,
67            max_path_destinations: rns_core::transport::types::DEFAULT_MAX_PATH_DESTINATIONS,
68            max_tunnel_destinations_total: usize::MAX,
69            destination_timeout_secs: rns_core::constants::DESTINATION_TIMEOUT,
70            announce_table_ttl_secs: rns_core::constants::ANNOUNCE_TABLE_TTL,
71            announce_table_max_bytes: rns_core::constants::ANNOUNCE_TABLE_MAX_BYTES,
72            announce_sig_cache_enabled: true,
73            announce_sig_cache_max_entries: rns_core::constants::ANNOUNCE_SIG_CACHE_MAXSIZE,
74            announce_sig_cache_ttl_secs: rns_core::constants::ANNOUNCE_SIG_CACHE_TTL,
75            announce_queue_max_entries: 256,
76            announce_queue_max_interfaces: 1024,
77        };
78
79        let (tx, rx) = event::channel();
80        let tick_interval_ms = Arc::new(AtomicU64::new(1000));
81        let mut driver = Driver::new(transport_config, rx, tx.clone(), callbacks);
82        driver.set_tick_interval_handle(Arc::clone(&tick_interval_ms));
83
84        // Connect to the daemon via LocalClientInterface
85        let local_config = LocalClientConfig {
86            name: "Local shared instance".into(),
87            instance_name: config.instance_name.clone(),
88            port: config.port,
89            interface_id: rns_core::transport::types::InterfaceId(1),
90            reconnect_wait: Duration::from_secs(8),
91        };
92
93        let id = local_config.interface_id;
94        let info = rns_core::transport::types::InterfaceInfo {
95            id,
96            name: "LocalInterface".into(),
97            mode: rns_core::constants::MODE_FULL,
98            out_capable: true,
99            in_capable: true,
100            bitrate: Some(1_000_000_000),
101            announce_rate_target: None,
102            announce_rate_grace: 0,
103            announce_rate_penalty: 0.0,
104            announce_cap: rns_core::constants::ANNOUNCE_CAP,
105            is_local_client: true,
106            wants_tunnel: false,
107            tunnel_id: None,
108            mtu: 65535,
109            ia_freq: 0.0,
110            started: time::now(),
111            ingress_control: false,
112        };
113
114        let writer = crate::interface::local::start_client(local_config, tx.clone())?;
115
116        driver.engine.register_interface(info.clone());
117        driver.interfaces.insert(
118            id,
119            InterfaceEntry {
120                id,
121                info,
122                writer,
123                async_writer_metrics: None,
124                enabled: true,
125                online: false,
126                dynamic: false,
127                ifac: None,
128                stats: InterfaceStats {
129                    started: time::now(),
130                    ..Default::default()
131                },
132                interface_type: "LocalClientInterface".to_string(),
133                send_retry_at: None,
134                send_retry_backoff: Duration::ZERO,
135            },
136        );
137
138        // Spawn timer thread with configurable tick interval
139        let timer_tx = tx.clone();
140        let timer_interval = Arc::clone(&tick_interval_ms);
141        thread::Builder::new()
142            .name("rns-timer-client".into())
143            .spawn(move || loop {
144                let ms = timer_interval.load(Ordering::Relaxed);
145                thread::sleep(Duration::from_millis(ms));
146                if timer_tx.send(event::Event::Tick).is_err() {
147                    break;
148                }
149            })?;
150
151        // Spawn driver thread
152        let driver_handle = thread::Builder::new()
153            .name("rns-driver-client".into())
154            .spawn(move || {
155                driver.run();
156            })?;
157
158        Ok(RnsNode::from_parts(
159            tx,
160            driver_handle,
161            None,
162            tick_interval_ms,
163        ))
164    }
165
166    /// Connect to a shared instance, with config loaded from a config directory.
167    ///
168    /// Reads the config file to determine instance_name and ports.
169    pub fn connect_shared_from_config(
170        config_path: Option<&Path>,
171        callbacks: Box<dyn Callbacks>,
172    ) -> io::Result<Self> {
173        let config_dir = storage::resolve_config_dir(config_path);
174
175        // Parse config file for instance settings
176        let config_file = config_dir.join("config");
177        let rns_config = if config_file.exists() {
178            crate::config::parse_file(&config_file)
179                .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, format!("{}", e)))?
180        } else {
181            crate::config::parse("")
182                .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, format!("{}", e)))?
183        };
184
185        let shared_config = SharedClientConfig {
186            instance_name: rns_config.reticulum.instance_name.clone(),
187            port: rns_config.reticulum.shared_instance_port,
188            rpc_port: rns_config.reticulum.instance_control_port,
189        };
190
191        Self::connect_shared(shared_config, callbacks)
192    }
193}
194
195#[cfg(test)]
196mod tests {
197    use super::*;
198    use crate::hdlc;
199    use rns_core::packet::RawPacket;
200    use rns_core::types::IdentityHash;
201    use rns_crypto::identity::Identity;
202    use rns_crypto::OsRng;
203    use std::io::Read;
204    use std::sync::atomic::AtomicU64;
205    use std::sync::mpsc;
206    use std::sync::Arc;
207
208    use crate::interface::local::LocalServerConfig;
209
210    struct NoopCallbacks;
211    impl Callbacks for NoopCallbacks {
212        fn on_announce(&mut self, _: crate::destination::AnnouncedIdentity) {}
213        fn on_path_updated(&mut self, _: rns_core::types::DestHash, _: u8) {}
214        fn on_local_delivery(
215            &mut self,
216            _: rns_core::types::DestHash,
217            _: Vec<u8>,
218            _: rns_core::types::PacketHash,
219        ) {
220        }
221    }
222
223    fn find_free_port() -> u16 {
224        std::net::TcpListener::bind("127.0.0.1:0")
225            .unwrap()
226            .local_addr()
227            .unwrap()
228            .port()
229    }
230
231    #[test]
232    fn connect_shared_to_tcp_server() {
233        let port = find_free_port();
234        let next_id = Arc::new(AtomicU64::new(50000));
235        let (server_tx, server_rx) = crate::event::channel();
236
237        // Start a local server
238        let server_config = LocalServerConfig {
239            instance_name: "test-shared-connect".into(),
240            port,
241            interface_id: rns_core::transport::types::InterfaceId(99),
242        };
243
244        crate::interface::local::start_server(server_config, server_tx, next_id).unwrap();
245        thread::sleep(Duration::from_millis(50));
246
247        // Connect as shared client
248        let config = SharedClientConfig {
249            instance_name: "test-shared-connect".into(),
250            port,
251            rpc_port: 0,
252        };
253
254        let node = RnsNode::connect_shared(config, Box::new(NoopCallbacks)).unwrap();
255
256        // Server should see InterfaceUp for the client
257        let event = server_rx.recv_timeout(Duration::from_secs(2)).unwrap();
258        assert!(matches!(event, crate::event::Event::InterfaceUp(_, _, _)));
259
260        node.shutdown();
261    }
262
263    #[test]
264    fn shared_client_register_destination() {
265        let port = find_free_port();
266        let next_id = Arc::new(AtomicU64::new(51000));
267        let (server_tx, _server_rx) = crate::event::channel();
268
269        let server_config = LocalServerConfig {
270            instance_name: "test-shared-reg".into(),
271            port,
272            interface_id: rns_core::transport::types::InterfaceId(98),
273        };
274
275        crate::interface::local::start_server(server_config, server_tx, next_id).unwrap();
276        thread::sleep(Duration::from_millis(50));
277
278        let config = SharedClientConfig {
279            instance_name: "test-shared-reg".into(),
280            port,
281            rpc_port: 0,
282        };
283
284        let node = RnsNode::connect_shared(config, Box::new(NoopCallbacks)).unwrap();
285
286        // Register a destination
287        let dest_hash = [0xAA; 16];
288        node.register_destination(dest_hash, rns_core::constants::DESTINATION_SINGLE)
289            .unwrap();
290
291        // Give time for event processing
292        thread::sleep(Duration::from_millis(100));
293
294        node.shutdown();
295    }
296
297    #[test]
298    fn shared_client_send_packet() {
299        let port = find_free_port();
300        let next_id = Arc::new(AtomicU64::new(52000));
301        let (server_tx, server_rx) = crate::event::channel();
302
303        let server_config = LocalServerConfig {
304            instance_name: "test-shared-send".into(),
305            port,
306            interface_id: rns_core::transport::types::InterfaceId(97),
307        };
308
309        crate::interface::local::start_server(server_config, server_tx, next_id).unwrap();
310        thread::sleep(Duration::from_millis(50));
311
312        let config = SharedClientConfig {
313            instance_name: "test-shared-send".into(),
314            port,
315            rpc_port: 0,
316        };
317
318        let node = RnsNode::connect_shared(config, Box::new(NoopCallbacks)).unwrap();
319
320        // Build a minimal packet and send it
321        let raw = vec![0x00, 0x00, 0xAA, 0xBB, 0xCC, 0xDD]; // minimal raw packet
322        node.send_raw(raw, rns_core::constants::DESTINATION_PLAIN, None)
323            .unwrap();
324
325        // Server should receive a Frame event from the client
326        // (the packet will be HDLC-framed over the local connection)
327        for _ in 0..10 {
328            match server_rx.recv_timeout(Duration::from_secs(1)) {
329                Ok(crate::event::Event::Frame { .. }) => {
330                    break;
331                }
332                Ok(_) => continue,
333                Err(_) => break,
334            }
335        }
336        // The packet may or may not arrive as a Frame depending on transport
337        // routing, so we don't assert on it — the important thing is no crash.
338
339        node.shutdown();
340    }
341
342    #[test]
343    fn shared_client_replays_single_announces_after_reconnect() {
344        let port = find_free_port();
345        let addr = format!("127.0.0.1:{}", port);
346        let instance_name = format!("test-shared-replay-{}", port);
347
348        let listener1 = std::net::TcpListener::bind(&addr).unwrap();
349        let (accepted1_tx, accepted1_rx) = mpsc::channel();
350        thread::spawn(move || {
351            let (stream, _) = listener1.accept().unwrap();
352            accepted1_tx.send(stream).unwrap();
353        });
354
355        let node = RnsNode::connect_shared(
356            SharedClientConfig {
357                instance_name,
358                port,
359                rpc_port: 0,
360            },
361            Box::new(NoopCallbacks),
362        )
363        .unwrap();
364
365        let identity = Identity::new(&mut OsRng);
366        let dest = crate::destination::Destination::single_in(
367            "shared-replay",
368            &["echo"],
369            IdentityHash(*identity.hash()),
370        );
371        node.register_destination(dest.hash.0, dest.dest_type.to_wire_constant())
372            .unwrap();
373        node.announce(&dest, &identity, Some(b"hello")).unwrap();
374
375        let mut stream1 = accepted1_rx.recv_timeout(Duration::from_secs(2)).unwrap();
376        stream1
377            .set_read_timeout(Some(Duration::from_secs(2)))
378            .unwrap();
379
380        let mut decoder = hdlc::Decoder::new();
381        let mut buf = [0u8; 4096];
382        let n = stream1.read(&mut buf).unwrap();
383        let frames = decoder.feed(&buf[..n]);
384        assert!(!frames.is_empty(), "expected initial announce frame");
385        let packet1 = RawPacket::unpack(&frames[0]).unwrap();
386        assert_eq!(packet1.destination_hash, dest.hash.0);
387        assert_eq!(packet1.context, rns_core::constants::CONTEXT_NONE);
388
389        drop(stream1);
390
391        let listener2 = std::net::TcpListener::bind(&addr).unwrap();
392        let (accepted2_tx, accepted2_rx) = mpsc::channel();
393        thread::spawn(move || {
394            let (stream, _) = listener2.accept().unwrap();
395            accepted2_tx.send(stream).unwrap();
396        });
397
398        let mut stream2 = accepted2_rx.recv_timeout(Duration::from_secs(15)).unwrap();
399        stream2
400            .set_read_timeout(Some(Duration::from_secs(15)))
401            .unwrap();
402
403        let mut decoder = hdlc::Decoder::new();
404        let n = stream2.read(&mut buf).unwrap();
405        let frames = decoder.feed(&buf[..n]);
406        assert!(!frames.is_empty(), "expected replayed announce frame");
407        let packet2 = RawPacket::unpack(&frames[0]).unwrap();
408        assert_eq!(packet2.destination_hash, dest.hash.0);
409        assert_eq!(packet2.context, rns_core::constants::CONTEXT_PATH_RESPONSE);
410
411        node.shutdown();
412    }
413
414    #[test]
415    fn connect_shared_fails_no_server() {
416        let port = find_free_port();
417
418        let config = SharedClientConfig {
419            instance_name: "nonexistent-instance-12345".into(),
420            port,
421            rpc_port: 0,
422        };
423
424        // Should fail because no server is running
425        let result = RnsNode::connect_shared(config, Box::new(NoopCallbacks));
426        assert!(result.is_err());
427    }
428
429    #[test]
430    fn shared_config_defaults() {
431        let config = SharedClientConfig::default();
432        assert_eq!(config.instance_name, "default");
433        assert_eq!(config.port, 37428);
434        assert_eq!(config.rpc_port, 37429);
435    }
436}