1use std::io;
14use std::path::Path;
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::sync::Arc;
17use std::thread;
18use std::time::Duration;
19
20use rns_core::transport::types::TransportConfig;
21
22use crate::driver::{Callbacks, Driver};
23use crate::event;
24use crate::interface::local::LocalClientConfig;
25use crate::interface::{InterfaceEntry, InterfaceStats};
26use crate::node::RnsNode;
27use crate::storage;
28use crate::time;
29
/// Settings used by `RnsNode::connect_shared` to attach to a shared instance.
///
/// The `Default` implementation yields instance name `"default"`, port `37428`
/// and RPC port `37429`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SharedClientConfig {
    /// Name of the shared instance; forwarded to the local client interface
    /// as `LocalClientConfig::instance_name`.
    pub instance_name: String,
    /// Port of the shared instance; forwarded to the local client interface
    /// as `LocalClientConfig::port`.
    pub port: u16,
    /// Control (RPC) port of the shared instance. `connect_shared_from_config`
    /// fills it from the `instance_control_port` setting; `connect_shared`
    /// itself does not read it.
    pub rpc_port: u16,
}

impl Default for SharedClientConfig {
    /// Returns the configuration for the instance named `"default"` on ports
    /// `37428` (data) and `37429` (RPC).
    fn default() -> Self {
        Self {
            instance_name: String::from("default"),
            port: 37428,
            rpc_port: 37429,
        }
    }
}
49
50impl RnsNode {
51 pub fn connect_shared(
57 config: SharedClientConfig,
58 callbacks: Box<dyn Callbacks>,
59 ) -> io::Result<Self> {
60 let transport_config = TransportConfig {
61 transport_enabled: false,
62 identity_hash: None,
63 prefer_shorter_path: false,
64 max_paths_per_destination: 1,
65 packet_hashlist_max_entries: rns_core::constants::HASHLIST_MAXSIZE,
66 max_discovery_pr_tags: rns_core::constants::MAX_PR_TAGS,
67 max_path_destinations: rns_core::transport::types::DEFAULT_MAX_PATH_DESTINATIONS,
68 max_tunnel_destinations_total: usize::MAX,
69 destination_timeout_secs: rns_core::constants::DESTINATION_TIMEOUT,
70 announce_table_ttl_secs: rns_core::constants::ANNOUNCE_TABLE_TTL,
71 announce_table_max_bytes: rns_core::constants::ANNOUNCE_TABLE_MAX_BYTES,
72 announce_sig_cache_enabled: true,
73 announce_sig_cache_max_entries: rns_core::constants::ANNOUNCE_SIG_CACHE_MAXSIZE,
74 announce_sig_cache_ttl_secs: rns_core::constants::ANNOUNCE_SIG_CACHE_TTL,
75 announce_queue_max_entries: 256,
76 announce_queue_max_interfaces: 1024,
77 };
78
79 let (tx, rx) = event::channel();
80 let tick_interval_ms = Arc::new(AtomicU64::new(1000));
81 let mut driver = Driver::new(transport_config, rx, tx.clone(), callbacks);
82 driver.set_tick_interval_handle(Arc::clone(&tick_interval_ms));
83
84 let local_config = LocalClientConfig {
86 name: "Local shared instance".into(),
87 instance_name: config.instance_name.clone(),
88 port: config.port,
89 interface_id: rns_core::transport::types::InterfaceId(1),
90 reconnect_wait: Duration::from_secs(8),
91 };
92
93 let id = local_config.interface_id;
94 let info = rns_core::transport::types::InterfaceInfo {
95 id,
96 name: "LocalInterface".into(),
97 mode: rns_core::constants::MODE_FULL,
98 out_capable: true,
99 in_capable: true,
100 bitrate: Some(1_000_000_000),
101 announce_rate_target: None,
102 announce_rate_grace: 0,
103 announce_rate_penalty: 0.0,
104 announce_cap: rns_core::constants::ANNOUNCE_CAP,
105 is_local_client: true,
106 wants_tunnel: false,
107 tunnel_id: None,
108 mtu: 65535,
109 ia_freq: 0.0,
110 started: time::now(),
111 ingress_control: false,
112 };
113
114 let writer = crate::interface::local::start_client(local_config, tx.clone())?;
115
116 driver.engine.register_interface(info.clone());
117 driver.interfaces.insert(
118 id,
119 InterfaceEntry {
120 id,
121 info,
122 writer,
123 async_writer_metrics: None,
124 enabled: true,
125 online: false,
126 dynamic: false,
127 ifac: None,
128 stats: InterfaceStats {
129 started: time::now(),
130 ..Default::default()
131 },
132 interface_type: "LocalClientInterface".to_string(),
133 send_retry_at: None,
134 send_retry_backoff: Duration::ZERO,
135 },
136 );
137
138 let timer_tx = tx.clone();
140 let timer_interval = Arc::clone(&tick_interval_ms);
141 thread::Builder::new()
142 .name("rns-timer-client".into())
143 .spawn(move || loop {
144 let ms = timer_interval.load(Ordering::Relaxed);
145 thread::sleep(Duration::from_millis(ms));
146 if timer_tx.send(event::Event::Tick).is_err() {
147 break;
148 }
149 })?;
150
151 let driver_handle = thread::Builder::new()
153 .name("rns-driver-client".into())
154 .spawn(move || {
155 driver.run();
156 })?;
157
158 Ok(RnsNode::from_parts(
159 tx,
160 driver_handle,
161 None,
162 tick_interval_ms,
163 ))
164 }
165
166 pub fn connect_shared_from_config(
170 config_path: Option<&Path>,
171 callbacks: Box<dyn Callbacks>,
172 ) -> io::Result<Self> {
173 let config_dir = storage::resolve_config_dir(config_path);
174
175 let config_file = config_dir.join("config");
177 let rns_config = if config_file.exists() {
178 crate::config::parse_file(&config_file)
179 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, format!("{}", e)))?
180 } else {
181 crate::config::parse("")
182 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, format!("{}", e)))?
183 };
184
185 let shared_config = SharedClientConfig {
186 instance_name: rns_config.reticulum.instance_name.clone(),
187 port: rns_config.reticulum.shared_instance_port,
188 rpc_port: rns_config.reticulum.instance_control_port,
189 };
190
191 Self::connect_shared(shared_config, callbacks)
192 }
193}
194
#[cfg(test)]
mod tests {
    use super::*;
    use crate::hdlc;
    use rns_core::packet::RawPacket;
    use rns_core::types::IdentityHash;
    use rns_crypto::identity::Identity;
    use rns_crypto::OsRng;
    use std::io::Read;
    use std::sync::atomic::AtomicU64;
    use std::sync::mpsc;
    use std::sync::Arc;

    use crate::interface::local::LocalServerConfig;

    /// Callback implementation that discards every notification.
    struct NoopCallbacks;

    impl Callbacks for NoopCallbacks {
        fn on_announce(&mut self, _announced: crate::destination::AnnouncedIdentity) {}

        fn on_path_updated(&mut self, _dest: rns_core::types::DestHash, _hops: u8) {}

        fn on_local_delivery(
            &mut self,
            _dest: rns_core::types::DestHash,
            _payload: Vec<u8>,
            _packet: rns_core::types::PacketHash,
        ) {
        }
    }

    /// Asks the OS for an ephemeral loopback port and returns its number.
    /// The probing listener is closed again when this function returns.
    fn find_free_port() -> u16 {
        let probe = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
        let bound = probe.local_addr().unwrap();
        bound.port()
    }

    /// Attempts a shared-client connection with no-op callbacks and
    /// `rpc_port` set to 0.
    fn connect_client(instance_name: &str, port: u16) -> std::io::Result<RnsNode> {
        let config = SharedClientConfig {
            instance_name: instance_name.into(),
            port,
            rpc_port: 0,
        };
        RnsNode::connect_shared(config, Box::new(NoopCallbacks))
    }

    /// Accepts exactly one connection on `listener` in a background thread
    /// and hands the accepted stream back through the returned channel.
    fn accept_once(listener: std::net::TcpListener) -> mpsc::Receiver<std::net::TcpStream> {
        let (accepted_tx, accepted_rx) = mpsc::channel();
        thread::spawn(move || {
            let (stream, _) = listener.accept().unwrap();
            accepted_tx.send(stream).unwrap();
        });
        accepted_rx
    }

    #[test]
    fn connect_shared_to_tcp_server() {
        let port = find_free_port();
        let next_id = Arc::new(AtomicU64::new(50000));
        let (server_tx, server_rx) = crate::event::channel();

        crate::interface::local::start_server(
            LocalServerConfig {
                instance_name: "test-shared-connect".into(),
                port,
                interface_id: rns_core::transport::types::InterfaceId(99),
            },
            server_tx,
            next_id,
        )
        .unwrap();
        // Short pause after starting the server, before the client connects.
        thread::sleep(Duration::from_millis(50));

        let node = connect_client("test-shared-connect", port).unwrap();

        // The first server-side event must report the new client interface.
        let event = server_rx.recv_timeout(Duration::from_secs(2)).unwrap();
        assert!(matches!(event, crate::event::Event::InterfaceUp(_, _, _)));

        node.shutdown();
    }

    #[test]
    fn shared_client_register_destination() {
        let port = find_free_port();
        let next_id = Arc::new(AtomicU64::new(51000));
        // The receiver is kept alive for the duration of the test but never read.
        let (server_tx, _server_rx) = crate::event::channel();

        crate::interface::local::start_server(
            LocalServerConfig {
                instance_name: "test-shared-reg".into(),
                port,
                interface_id: rns_core::transport::types::InterfaceId(98),
            },
            server_tx,
            next_id,
        )
        .unwrap();
        thread::sleep(Duration::from_millis(50));

        let node = connect_client("test-shared-reg", port).unwrap();

        let dest_hash = [0xAA; 16];
        node.register_destination(dest_hash, rns_core::constants::DESTINATION_SINGLE)
            .unwrap();

        // Brief wait before shutting the node down.
        thread::sleep(Duration::from_millis(100));

        node.shutdown();
    }

    #[test]
    fn shared_client_send_packet() {
        let port = find_free_port();
        let next_id = Arc::new(AtomicU64::new(52000));
        let (server_tx, server_rx) = crate::event::channel();

        crate::interface::local::start_server(
            LocalServerConfig {
                instance_name: "test-shared-send".into(),
                port,
                interface_id: rns_core::transport::types::InterfaceId(97),
            },
            server_tx,
            next_id,
        )
        .unwrap();
        thread::sleep(Duration::from_millis(50));

        let node = connect_client("test-shared-send", port).unwrap();

        let raw = vec![0x00, 0x00, 0xAA, 0xBB, 0xCC, 0xDD];
        node.send_raw(raw, rns_core::constants::DESTINATION_PLAIN, None)
            .unwrap();

        // Poll up to ten server events, stopping at the first `Frame` or at
        // the first receive error/timeout. Nothing is asserted about whether
        // a frame was actually seen.
        for _ in 0..10 {
            match server_rx.recv_timeout(Duration::from_secs(1)) {
                Ok(crate::event::Event::Frame { .. }) | Err(_) => break,
                Ok(_) => {}
            }
        }
        node.shutdown();
    }

    #[test]
    fn shared_client_replays_single_announces_after_reconnect() {
        let port = find_free_port();
        let addr = format!("127.0.0.1:{}", port);
        let instance_name = format!("test-shared-replay-{}", port);

        // First raw TCP listener standing in for the shared instance.
        let first_accept = accept_once(std::net::TcpListener::bind(&addr).unwrap());

        let node = connect_client(&instance_name, port).unwrap();

        let identity = Identity::new(&mut OsRng);
        let dest = crate::destination::Destination::single_in(
            "shared-replay",
            &["echo"],
            IdentityHash(*identity.hash()),
        );
        node.register_destination(dest.hash.0, dest.dest_type.to_wire_constant())
            .unwrap();
        node.announce(&dest, &identity, Some(b"hello")).unwrap();

        let mut stream1 = first_accept
            .recv_timeout(Duration::from_secs(2))
            .unwrap();
        stream1
            .set_read_timeout(Some(Duration::from_secs(2)))
            .unwrap();

        // The first frame on the initial connection is the plain announce.
        let mut buf = [0u8; 4096];
        let mut decoder = hdlc::Decoder::new();
        let n = stream1.read(&mut buf).unwrap();
        let frames = decoder.feed(&buf[..n]);
        assert!(!frames.is_empty(), "expected initial announce frame");
        let packet1 = RawPacket::unpack(&frames[0]).unwrap();
        assert_eq!(packet1.destination_hash, dest.hash.0);
        assert_eq!(packet1.context, rns_core::constants::CONTEXT_NONE);

        // Close the connection, then listen again on the same address and
        // wait for the client to come back.
        drop(stream1);

        let second_accept = accept_once(std::net::TcpListener::bind(&addr).unwrap());

        // The 15 s timeouts presumably allow for the client's 8 s
        // `reconnect_wait` set in `connect_shared` (assumption).
        let mut stream2 = second_accept
            .recv_timeout(Duration::from_secs(15))
            .unwrap();
        stream2
            .set_read_timeout(Some(Duration::from_secs(15)))
            .unwrap();

        // After reconnecting, the announce is expected again, this time
        // carrying the path-response context.
        let mut decoder = hdlc::Decoder::new();
        let n = stream2.read(&mut buf).unwrap();
        let frames = decoder.feed(&buf[..n]);
        assert!(!frames.is_empty(), "expected replayed announce frame");
        let packet2 = RawPacket::unpack(&frames[0]).unwrap();
        assert_eq!(packet2.destination_hash, dest.hash.0);
        assert_eq!(packet2.context, rns_core::constants::CONTEXT_PATH_RESPONSE);

        node.shutdown();
    }

    #[test]
    fn connect_shared_fails_no_server() {
        // Nothing listens on this port once `find_free_port` has returned.
        let port = find_free_port();

        let result = connect_client("nonexistent-instance-12345", port);
        assert!(result.is_err());
    }

    #[test]
    fn shared_config_defaults() {
        let SharedClientConfig {
            instance_name,
            port,
            rpc_port,
        } = SharedClientConfig::default();
        assert_eq!(instance_name, "default");
        assert_eq!(port, 37428);
        assert_eq!(rpc_port, 37429);
    }
}