1use std::fmt;
8use std::path::Path;
9use std::sync::mpsc;
10use std::sync::{Arc, Mutex};
11use std::time::{Duration, Instant};
12
13use rns_core::msgpack::{self, Value};
14use rns_core::types::{DestHash, LinkId};
15use rns_crypto::identity::Identity;
16
17use crate::destination::AnnouncedIdentity;
18use crate::pickle::PickleValue;
19use crate::shared_client::SharedClientConfig;
20use crate::{Callbacks, RnsNode, TeardownReason};
21
22#[derive(Debug, Clone, Copy, PartialEq, Eq)]
23enum RemotePurpose {
24 Management,
25 Blackhole,
26}
27
28#[derive(Debug, Clone)]
29pub enum RemoteManagementError {
30 InvalidHash(String),
31 MissingIdentity,
32 IdentityLoad(String),
33 Config(String),
34 ConnectShared,
35 PathTimeout([u8; 16]),
36 RecallTimeout([u8; 16]),
37 LinkTimeout([u8; 16]),
38 RequestTimeout(String),
39 LinkClosed,
40 SendFailed,
41 MalformedResponse(String),
42 Unsupported(String),
43}
44
45impl fmt::Display for RemoteManagementError {
46 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
47 match self {
48 Self::InvalidHash(s) => {
49 write!(
50 f,
51 "invalid transport identity hash: {s} (expected 32 hex chars)"
52 )
53 }
54 Self::MissingIdentity => {
55 write!(
56 f,
57 "remote management requires an identity file; use -i PATH"
58 )
59 }
60 Self::IdentityLoad(e) => write!(f, "could not load management identity: {e}"),
61 Self::Config(e) => write!(f, "could not load local Reticulum config: {e}"),
62 Self::ConnectShared => {
63 write!(f, "could not connect to local shared Reticulum instance")
64 }
65 Self::PathTimeout(hash) => write!(f, "timed out waiting for path to {}", hex(hash)),
66 Self::RecallTimeout(hash) => {
67 write!(f, "timed out recalling identity for {}", hex(hash))
68 }
69 Self::LinkTimeout(hash) => write!(f, "timed out establishing link to {}", hex(hash)),
70 Self::RequestTimeout(path) => write!(f, "remote request to {path} timed out"),
71 Self::LinkClosed => write!(f, "remote link closed before request completed"),
72 Self::SendFailed => write!(f, "could not send remote management request"),
73 Self::MalformedResponse(e) => write!(f, "malformed remote response: {e}"),
74 Self::Unsupported(e) => write!(f, "{e}"),
75 }
76 }
77}
78
79impl std::error::Error for RemoteManagementError {}
80
81#[derive(Debug, Clone)]
82pub struct RemoteStatus {
83 pub stats: PickleValue,
84 pub link_count: Option<i64>,
85}
86
87struct RemoteCallbacks {
88 link_established_tx: mpsc::Sender<LinkId>,
89 response_tx: mpsc::Sender<(LinkId, [u8; 16], Vec<u8>)>,
90 link_closed_tx: mpsc::Sender<LinkId>,
91}
92
93impl Callbacks for RemoteCallbacks {
94 fn on_announce(&mut self, _announced: AnnouncedIdentity) {}
95
96 fn on_path_updated(&mut self, _dest_hash: DestHash, _hops: u8) {}
97
98 fn on_local_delivery(
99 &mut self,
100 _dest_hash: DestHash,
101 _raw: Vec<u8>,
102 _packet_hash: crate::PacketHash,
103 ) {
104 }
105
106 fn on_link_established(
107 &mut self,
108 link_id: LinkId,
109 _dest_hash: DestHash,
110 _rtt: f64,
111 _is_initiator: bool,
112 ) {
113 let _ = self.link_established_tx.send(link_id);
114 }
115
116 fn on_response(&mut self, link_id: LinkId, request_id: [u8; 16], data: Vec<u8>) {
117 let _ = self.response_tx.send((link_id, request_id, data));
118 }
119
120 fn on_link_closed(&mut self, link_id: LinkId, reason: Option<TeardownReason>) {
121 if reason == Some(TeardownReason::InitiatorClosed) {
122 return;
123 }
124 let _ = self.link_closed_tx.send(link_id);
125 }
126}
127
128pub struct RemoteManagementClient {
129 node: RnsNode,
130 management_identity: Option<Identity>,
131 timeout: Duration,
132 link_rx: mpsc::Receiver<LinkId>,
133 response_rx: mpsc::Receiver<(LinkId, [u8; 16], Vec<u8>)>,
134 closed_rx: mpsc::Receiver<LinkId>,
135 management_link: Arc<Mutex<Option<[u8; 16]>>>,
136 blackhole_link: Arc<Mutex<Option<[u8; 16]>>>,
137}
138
139impl RemoteManagementClient {
140 pub fn connect(
141 config_path: Option<&Path>,
142 management_identity_path: Option<&Path>,
143 timeout: Duration,
144 ) -> Result<Self, RemoteManagementError> {
145 let management_identity = match management_identity_path {
146 Some(path) => Some(
147 crate::storage::load_identity(path)
148 .map_err(|e| RemoteManagementError::IdentityLoad(e.to_string()))?,
149 ),
150 None => None,
151 };
152
153 let (link_tx, link_rx) = mpsc::channel();
154 let (response_tx, response_rx) = mpsc::channel();
155 let (closed_tx, closed_rx) = mpsc::channel();
156 let callbacks = RemoteCallbacks {
157 link_established_tx: link_tx,
158 response_tx,
159 link_closed_tx: closed_tx,
160 };
161
162 let config_dir = crate::storage::resolve_config_dir(config_path);
163 let config_file = config_dir.join("config");
164 let rns_config = if config_file.exists() {
165 crate::config::parse_file(&config_file)
166 .map_err(|e| RemoteManagementError::Config(e.to_string()))?
167 } else {
168 crate::config::parse("").map_err(|e| RemoteManagementError::Config(e.to_string()))?
169 };
170
171 let shared_config = SharedClientConfig {
172 instance_name: rns_config.reticulum.instance_name.clone(),
173 port: rns_config.reticulum.shared_instance_port,
174 rpc_port: rns_config.reticulum.instance_control_port,
175 };
176
177 let node = RnsNode::connect_shared(shared_config, Box::new(callbacks))
178 .map_err(|_| RemoteManagementError::ConnectShared)?;
179
180 Ok(Self {
181 node,
182 management_identity,
183 timeout,
184 link_rx,
185 response_rx,
186 closed_rx,
187 management_link: Arc::new(Mutex::new(None)),
188 blackhole_link: Arc::new(Mutex::new(None)),
189 })
190 }
191
192 pub fn status(
193 &mut self,
194 transport_identity_hash: [u8; 16],
195 include_link_count: bool,
196 ) -> Result<RemoteStatus, RemoteManagementError> {
197 let data = msgpack::pack(&Value::Array(vec![Value::Bool(include_link_count)]));
198 let response = self.request_management(transport_identity_hash, "/status", &data)?;
199 decode_status_response(&response)
200 }
201
202 pub fn path_table(
203 &mut self,
204 transport_identity_hash: [u8; 16],
205 destination_filter: Option<[u8; 16]>,
206 max_hops: Option<u8>,
207 ) -> Result<PickleValue, RemoteManagementError> {
208 let mut request = vec![Value::Str("table".into())];
209 if destination_filter.is_some() || max_hops.is_some() {
210 request.push(match destination_filter {
211 Some(hash) => Value::Bin(hash.to_vec()),
212 None => Value::Nil,
213 });
214 }
215 if let Some(hops) = max_hops {
216 request.push(Value::UInt(hops as u64));
217 }
218 let data = msgpack::pack(&Value::Array(request));
219 let response = self.request_management(transport_identity_hash, "/path", &data)?;
220 let value = msgpack::unpack_exact(&response)
221 .map_err(|e| RemoteManagementError::MalformedResponse(e.to_string()))?;
222 Ok(msgpack_to_pickle(&value))
223 }
224
225 pub fn rate_table(
226 &mut self,
227 transport_identity_hash: [u8; 16],
228 destination_filter: Option<[u8; 16]>,
229 ) -> Result<PickleValue, RemoteManagementError> {
230 let mut request = vec![Value::Str("rates".into())];
231 if let Some(hash) = destination_filter {
232 request.push(Value::Bin(hash.to_vec()));
233 }
234 let data = msgpack::pack(&Value::Array(request));
235 let response = self.request_management(transport_identity_hash, "/path", &data)?;
236 let value = msgpack::unpack_exact(&response)
237 .map_err(|e| RemoteManagementError::MalformedResponse(e.to_string()))?;
238 Ok(msgpack_to_pickle(&value))
239 }
240
241 pub fn published_blackhole_list(
242 &mut self,
243 transport_identity_hash: [u8; 16],
244 ) -> Result<PickleValue, RemoteManagementError> {
245 let dest_hash = crate::management::blackhole_dest_hash(&transport_identity_hash);
246 let response = self.request(
247 RemotePurpose::Blackhole,
248 dest_hash,
249 None,
250 "/list",
251 &[],
252 false,
253 )?;
254 let value = msgpack::unpack_exact(&response)
255 .map_err(|e| RemoteManagementError::MalformedResponse(e.to_string()))?;
256 Ok(blackhole_map_to_list(&value))
257 }
258
259 fn request_management(
260 &mut self,
261 transport_identity_hash: [u8; 16],
262 path: &str,
263 data: &[u8],
264 ) -> Result<Vec<u8>, RemoteManagementError> {
265 let dest_hash = crate::management::management_dest_hash(&transport_identity_hash);
266 let prv_key = self
267 .management_identity
268 .as_ref()
269 .and_then(|id| id.get_private_key())
270 .ok_or(RemoteManagementError::MissingIdentity)?;
271 self.request(
272 RemotePurpose::Management,
273 dest_hash,
274 Some(prv_key),
275 path,
276 data,
277 true,
278 )
279 }
280
281 fn request(
282 &mut self,
283 purpose: RemotePurpose,
284 dest_hash: [u8; 16],
285 identity_prv_key: Option<[u8; 64]>,
286 path: &str,
287 data: &[u8],
288 identify_on_new_link: bool,
289 ) -> Result<Vec<u8>, RemoteManagementError> {
290 let link_slot = match purpose {
291 RemotePurpose::Management => Arc::clone(&self.management_link),
292 RemotePurpose::Blackhole => Arc::clone(&self.blackhole_link),
293 };
294
295 if let Some(link_id) = *lock_link(&link_slot) {
296 if self.node.send_request(link_id, path, data).is_ok() {
297 return self.wait_for_response(link_id, path);
298 }
299 *lock_link(&link_slot) = None;
300 }
301
302 let announced = self.wait_for_destination(dest_hash)?;
303 let sig_pub: [u8; 32] = announced.public_key[32..64]
304 .try_into()
305 .expect("slice length checked");
306 let link_id = self
307 .node
308 .create_link(dest_hash, sig_pub)
309 .map_err(|_| RemoteManagementError::LinkTimeout(dest_hash))?;
310 self.wait_for_link_established(link_id, dest_hash)?;
311
312 if identify_on_new_link {
313 let prv_key = identity_prv_key.ok_or(RemoteManagementError::MissingIdentity)?;
314 self.node
315 .identify_on_link(link_id, prv_key)
316 .map_err(|_| RemoteManagementError::SendFailed)?;
317 std::thread::sleep(Duration::from_millis(200));
318 }
319
320 *lock_link(&link_slot) = Some(link_id);
321 self.node
322 .send_request(link_id, path, data)
323 .map_err(|_| RemoteManagementError::SendFailed)?;
324 self.wait_for_response(link_id, path)
325 }
326
327 fn wait_for_destination(
328 &self,
329 dest_hash: [u8; 16],
330 ) -> Result<AnnouncedIdentity, RemoteManagementError> {
331 let dest = DestHash(dest_hash);
332 let deadline = Instant::now() + self.timeout;
333 let _ = self.node.request_path(&dest);
334
335 loop {
336 if let Ok(Some(announced)) = self.node.recall_identity(&dest) {
337 return Ok(announced);
338 }
339 if self.node.has_path(&dest).unwrap_or(false) {
340 if let Ok(Some(announced)) = self.node.recall_identity(&dest) {
341 return Ok(announced);
342 }
343 }
344 let Some(remaining) = deadline.checked_duration_since(Instant::now()) else {
345 return Err(RemoteManagementError::RecallTimeout(dest_hash));
346 };
347 if remaining.is_zero() {
348 return Err(RemoteManagementError::PathTimeout(dest_hash));
349 }
350 std::thread::sleep(remaining.min(Duration::from_millis(100)));
351 }
352 }
353
354 fn wait_for_link_established(
355 &self,
356 expected_link_id: [u8; 16],
357 dest_hash: [u8; 16],
358 ) -> Result<(), RemoteManagementError> {
359 let deadline = Instant::now() + self.timeout;
360 loop {
361 if let Ok(closed) = self.closed_rx.try_recv() {
362 if closed.0 == expected_link_id {
363 return Err(RemoteManagementError::LinkClosed);
364 }
365 }
366 let Some(remaining) = deadline.checked_duration_since(Instant::now()) else {
367 return Err(RemoteManagementError::LinkTimeout(dest_hash));
368 };
369 match self
370 .link_rx
371 .recv_timeout(remaining.min(Duration::from_millis(50)))
372 {
373 Ok(link_id) if link_id.0 == expected_link_id => return Ok(()),
374 Ok(_) => continue,
375 Err(mpsc::RecvTimeoutError::Timeout) => continue,
376 Err(mpsc::RecvTimeoutError::Disconnected) => {
377 return Err(RemoteManagementError::LinkTimeout(dest_hash));
378 }
379 }
380 }
381 }
382
383 fn wait_for_response(
384 &self,
385 link_id: [u8; 16],
386 path: &str,
387 ) -> Result<Vec<u8>, RemoteManagementError> {
388 let deadline = Instant::now() + self.timeout;
389 loop {
390 if let Ok(closed) = self.closed_rx.try_recv() {
391 if closed.0 == link_id {
392 return Err(RemoteManagementError::LinkClosed);
393 }
394 }
395 let Some(remaining) = deadline.checked_duration_since(Instant::now()) else {
396 return Err(RemoteManagementError::RequestTimeout(path.into()));
397 };
398 match self
399 .response_rx
400 .recv_timeout(remaining.min(Duration::from_millis(50)))
401 {
402 Ok((resp_link_id, _request_id, data)) if resp_link_id.0 == link_id => {
403 return Ok(data);
404 }
405 Ok(_) => continue,
406 Err(mpsc::RecvTimeoutError::Timeout) => continue,
407 Err(mpsc::RecvTimeoutError::Disconnected) => {
408 return Err(RemoteManagementError::RequestTimeout(path.into()));
409 }
410 }
411 }
412 }
413}
414
415pub fn parse_transport_identity_hash(s: &str) -> Result<[u8; 16], RemoteManagementError> {
416 let trimmed = s.trim();
417 if trimmed.len() != 32 {
418 return Err(RemoteManagementError::InvalidHash(s.into()));
419 }
420 let mut out = [0u8; 16];
421 for i in 0..16 {
422 out[i] = u8::from_str_radix(&trimmed[i * 2..i * 2 + 2], 16)
423 .map_err(|_| RemoteManagementError::InvalidHash(s.into()))?;
424 }
425 Ok(out)
426}
427
428pub fn msgpack_to_pickle(value: &Value) -> PickleValue {
429 match value {
430 Value::Nil => PickleValue::None,
431 Value::Bool(v) => PickleValue::Bool(*v),
432 Value::UInt(v) => PickleValue::Int((*v).min(i64::MAX as u64) as i64),
433 Value::Int(v) => PickleValue::Int(*v),
434 Value::Float(v) => PickleValue::Float(*v),
435 Value::Bin(v) => PickleValue::Bytes(v.clone()),
436 Value::Str(v) => PickleValue::String(v.clone()),
437 Value::Array(items) => PickleValue::List(items.iter().map(msgpack_to_pickle).collect()),
438 Value::Map(entries) => PickleValue::Dict(
439 entries
440 .iter()
441 .map(|(k, v)| (msgpack_to_pickle(k), msgpack_to_pickle(v)))
442 .collect(),
443 ),
444 }
445}
446
447fn decode_status_response(data: &[u8]) -> Result<RemoteStatus, RemoteManagementError> {
448 let value = msgpack::unpack_exact(data)
449 .map_err(|e| RemoteManagementError::MalformedResponse(e.to_string()))?;
450 let arr = value
451 .as_array()
452 .ok_or_else(|| RemoteManagementError::MalformedResponse("expected status array".into()))?;
453 let stats = arr
454 .first()
455 .ok_or_else(|| RemoteManagementError::MalformedResponse("missing status dict".into()))?;
456 let mut stats = msgpack_to_pickle(stats);
457 normalize_remote_status(&mut stats);
458 let link_count = arr.get(1).and_then(|v| v.as_integer());
459 Ok(RemoteStatus { stats, link_count })
460}
461
462fn normalize_remote_status(value: &mut PickleValue) {
463 let PickleValue::Dict(entries) = value else {
464 return;
465 };
466 if !entries
467 .iter()
468 .any(|(k, _)| string_key(k) == Some("transport_enabled"))
469 {
470 entries.push((
471 PickleValue::String("transport_enabled".into()),
472 PickleValue::Bool(true),
473 ));
474 }
475 if let Some((_, PickleValue::List(ifaces))) = entries
476 .iter_mut()
477 .find(|(k, _)| string_key(k) == Some("interfaces"))
478 {
479 for iface in ifaces {
480 normalize_remote_interface(iface);
481 }
482 }
483}
484
485fn normalize_remote_interface(value: &mut PickleValue) {
486 let PickleValue::Dict(entries) = value else {
487 return;
488 };
489 copy_key(entries, "incoming_announce_freq", "ia_freq");
490 copy_key(entries, "outgoing_announce_freq", "oa_freq");
491 copy_key(entries, "incoming_path_request_freq", "ip_freq");
492 copy_key(entries, "outgoing_path_request_freq", "op_freq");
493}
494
495fn copy_key(entries: &mut Vec<(PickleValue, PickleValue)>, from: &str, to: &str) {
496 if entries.iter().any(|(k, _)| string_key(k) == Some(to)) {
497 return;
498 }
499 if let Some((_, value)) = entries.iter().find(|(k, _)| string_key(k) == Some(from)) {
500 entries.push((PickleValue::String(to.into()), value.clone()));
501 }
502}
503
504fn blackhole_map_to_list(value: &Value) -> PickleValue {
505 let Some(entries) = value.as_map() else {
506 return msgpack_to_pickle(value);
507 };
508 let mut list = Vec::new();
509 for (key, info) in entries {
510 let Some(identity_hash) = key.as_bin() else {
511 continue;
512 };
513 let mut item = vec![(
514 PickleValue::String("identity_hash".into()),
515 PickleValue::Bytes(identity_hash.to_vec()),
516 )];
517 if let Some(info_entries) = info.as_map() {
518 for (k, v) in info_entries {
519 if let Some(key) = k.as_str() {
520 let out_key = if key == "created" { "created" } else { key };
521 item.push((PickleValue::String(out_key.into()), msgpack_to_pickle(v)));
522 }
523 }
524 }
525 list.push(PickleValue::Dict(item));
526 }
527 PickleValue::List(list)
528}
529
530fn lock_link<'a>(
531 link: &'a Arc<Mutex<Option<[u8; 16]>>>,
532) -> std::sync::MutexGuard<'a, Option<[u8; 16]>> {
533 match link.lock() {
534 Ok(guard) => guard,
535 Err(poisoned) => poisoned.into_inner(),
536 }
537}
538
539fn string_key(value: &PickleValue) -> Option<&str> {
540 match value {
541 PickleValue::String(s) => Some(s),
542 _ => None,
543 }
544}
545
546fn hex(bytes: &[u8]) -> String {
547 let mut out = String::with_capacity(bytes.len() * 2);
548 for byte in bytes {
549 use std::fmt::Write;
550 let _ = write!(&mut out, "{byte:02x}");
551 }
552 out
553}
554
555#[cfg(test)]
556mod tests {
557 use super::*;
558
559 #[test]
560 fn parse_transport_identity_hash_accepts_32_hex_chars() {
561 let hash = parse_transport_identity_hash("00112233445566778899aabbccddeeff").unwrap();
562 assert_eq!(hash[0], 0x00);
563 assert_eq!(hash[15], 0xff);
564 }
565
566 #[test]
567 fn parse_transport_identity_hash_rejects_invalid_input() {
568 assert!(parse_transport_identity_hash("short").is_err());
569 assert!(parse_transport_identity_hash("00112233445566778899aabbccddeeg").is_err());
570 }
571
572 #[test]
573 fn status_response_normalizes_renderer_keys() {
574 let status = Value::Map(vec![
575 (
576 Value::Str("interfaces".into()),
577 Value::Array(vec![Value::Map(vec![
578 (Value::Str("name".into()), Value::Str("if0".into())),
579 (
580 Value::Str("incoming_announce_freq".into()),
581 Value::Float(1.5),
582 ),
583 ])]),
584 ),
585 (Value::Str("rxb".into()), Value::UInt(1)),
586 (Value::Str("txb".into()), Value::UInt(2)),
587 ]);
588 let data = msgpack::pack(&Value::Array(vec![status, Value::UInt(7)]));
589 let decoded = decode_status_response(&data).unwrap();
590 assert_eq!(decoded.link_count, Some(7));
591 assert_eq!(
592 decoded
593 .stats
594 .get("transport_enabled")
595 .and_then(|v| v.as_bool()),
596 Some(true)
597 );
598 let iface = &decoded
599 .stats
600 .get("interfaces")
601 .and_then(|v| v.as_list())
602 .unwrap()[0];
603 assert_eq!(iface.get("ia_freq").and_then(|v| v.as_float()), Some(1.5));
604 }
605
606 #[test]
607 fn blackhole_map_converts_to_renderer_list() {
608 let value = Value::Map(vec![(
609 Value::Bin(vec![0x11; 16]),
610 Value::Map(vec![
611 (Value::Str("expires".into()), Value::Float(123.0)),
612 (Value::Str("reason".into()), Value::Str("test".into())),
613 ]),
614 )]);
615 let list = blackhole_map_to_list(&value);
616 let items = list.as_list().unwrap();
617 assert_eq!(items.len(), 1);
618 assert_eq!(
619 items[0]
620 .get("identity_hash")
621 .and_then(|v| v.as_bytes())
622 .unwrap(),
623 &[0x11; 16]
624 );
625 }
626}