Skip to main content

rns_net/
remote_management.rs

1//! Client-side remote management helpers.
2//!
3//! These helpers connect to a local shared instance, derive upstream-compatible
4//! remote management destinations from a remote transport identity hash, and
5//! query the remote node over a Reticulum link.
6
7use std::fmt;
8use std::path::Path;
9use std::sync::mpsc;
10use std::sync::{Arc, Mutex};
11use std::time::{Duration, Instant};
12
13use rns_core::msgpack::{self, Value};
14use rns_core::types::{DestHash, LinkId};
15use rns_crypto::identity::Identity;
16
17use crate::destination::AnnouncedIdentity;
18use crate::pickle::PickleValue;
19use crate::shared_client::SharedClientConfig;
20use crate::{Callbacks, RnsNode, TeardownReason};
21
22#[derive(Debug, Clone, Copy, PartialEq, Eq)]
23enum RemotePurpose {
24    Management,
25    Blackhole,
26}
27
28#[derive(Debug, Clone)]
29pub enum RemoteManagementError {
30    InvalidHash(String),
31    MissingIdentity,
32    IdentityLoad(String),
33    Config(String),
34    ConnectShared,
35    PathTimeout([u8; 16]),
36    RecallTimeout([u8; 16]),
37    LinkTimeout([u8; 16]),
38    RequestTimeout(String),
39    LinkClosed,
40    SendFailed,
41    MalformedResponse(String),
42    Unsupported(String),
43}
44
45impl fmt::Display for RemoteManagementError {
46    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
47        match self {
48            Self::InvalidHash(s) => {
49                write!(
50                    f,
51                    "invalid transport identity hash: {s} (expected 32 hex chars)"
52                )
53            }
54            Self::MissingIdentity => {
55                write!(
56                    f,
57                    "remote management requires an identity file; use -i PATH"
58                )
59            }
60            Self::IdentityLoad(e) => write!(f, "could not load management identity: {e}"),
61            Self::Config(e) => write!(f, "could not load local Reticulum config: {e}"),
62            Self::ConnectShared => {
63                write!(f, "could not connect to local shared Reticulum instance")
64            }
65            Self::PathTimeout(hash) => write!(f, "timed out waiting for path to {}", hex(hash)),
66            Self::RecallTimeout(hash) => {
67                write!(f, "timed out recalling identity for {}", hex(hash))
68            }
69            Self::LinkTimeout(hash) => write!(f, "timed out establishing link to {}", hex(hash)),
70            Self::RequestTimeout(path) => write!(f, "remote request to {path} timed out"),
71            Self::LinkClosed => write!(f, "remote link closed before request completed"),
72            Self::SendFailed => write!(f, "could not send remote management request"),
73            Self::MalformedResponse(e) => write!(f, "malformed remote response: {e}"),
74            Self::Unsupported(e) => write!(f, "{e}"),
75        }
76    }
77}
78
79impl std::error::Error for RemoteManagementError {}
80
81#[derive(Debug, Clone)]
82pub struct RemoteStatus {
83    pub stats: PickleValue,
84    pub link_count: Option<i64>,
85}
86
87struct RemoteCallbacks {
88    link_established_tx: mpsc::Sender<LinkId>,
89    response_tx: mpsc::Sender<(LinkId, [u8; 16], Vec<u8>)>,
90    link_closed_tx: mpsc::Sender<LinkId>,
91}
92
93impl Callbacks for RemoteCallbacks {
94    fn on_announce(&mut self, _announced: AnnouncedIdentity) {}
95
96    fn on_path_updated(&mut self, _dest_hash: DestHash, _hops: u8) {}
97
98    fn on_local_delivery(
99        &mut self,
100        _dest_hash: DestHash,
101        _raw: Vec<u8>,
102        _packet_hash: crate::PacketHash,
103    ) {
104    }
105
106    fn on_link_established(
107        &mut self,
108        link_id: LinkId,
109        _dest_hash: DestHash,
110        _rtt: f64,
111        _is_initiator: bool,
112    ) {
113        let _ = self.link_established_tx.send(link_id);
114    }
115
116    fn on_response(&mut self, link_id: LinkId, request_id: [u8; 16], data: Vec<u8>) {
117        let _ = self.response_tx.send((link_id, request_id, data));
118    }
119
120    fn on_link_closed(&mut self, link_id: LinkId, reason: Option<TeardownReason>) {
121        if reason == Some(TeardownReason::InitiatorClosed) {
122            return;
123        }
124        let _ = self.link_closed_tx.send(link_id);
125    }
126}
127
128pub struct RemoteManagementClient {
129    node: RnsNode,
130    management_identity: Option<Identity>,
131    timeout: Duration,
132    link_rx: mpsc::Receiver<LinkId>,
133    response_rx: mpsc::Receiver<(LinkId, [u8; 16], Vec<u8>)>,
134    closed_rx: mpsc::Receiver<LinkId>,
135    management_link: Arc<Mutex<Option<[u8; 16]>>>,
136    blackhole_link: Arc<Mutex<Option<[u8; 16]>>>,
137}
138
139impl RemoteManagementClient {
140    pub fn connect(
141        config_path: Option<&Path>,
142        management_identity_path: Option<&Path>,
143        timeout: Duration,
144    ) -> Result<Self, RemoteManagementError> {
145        let management_identity = match management_identity_path {
146            Some(path) => Some(
147                crate::storage::load_identity(path)
148                    .map_err(|e| RemoteManagementError::IdentityLoad(e.to_string()))?,
149            ),
150            None => None,
151        };
152
153        let (link_tx, link_rx) = mpsc::channel();
154        let (response_tx, response_rx) = mpsc::channel();
155        let (closed_tx, closed_rx) = mpsc::channel();
156        let callbacks = RemoteCallbacks {
157            link_established_tx: link_tx,
158            response_tx,
159            link_closed_tx: closed_tx,
160        };
161
162        let config_dir = crate::storage::resolve_config_dir(config_path);
163        let config_file = config_dir.join("config");
164        let rns_config = if config_file.exists() {
165            crate::config::parse_file(&config_file)
166                .map_err(|e| RemoteManagementError::Config(e.to_string()))?
167        } else {
168            crate::config::parse("").map_err(|e| RemoteManagementError::Config(e.to_string()))?
169        };
170
171        let shared_config = SharedClientConfig {
172            instance_name: rns_config.reticulum.instance_name.clone(),
173            port: rns_config.reticulum.shared_instance_port,
174            rpc_port: rns_config.reticulum.instance_control_port,
175        };
176
177        let node = RnsNode::connect_shared(shared_config, Box::new(callbacks))
178            .map_err(|_| RemoteManagementError::ConnectShared)?;
179
180        Ok(Self {
181            node,
182            management_identity,
183            timeout,
184            link_rx,
185            response_rx,
186            closed_rx,
187            management_link: Arc::new(Mutex::new(None)),
188            blackhole_link: Arc::new(Mutex::new(None)),
189        })
190    }
191
192    pub fn status(
193        &mut self,
194        transport_identity_hash: [u8; 16],
195        include_link_count: bool,
196    ) -> Result<RemoteStatus, RemoteManagementError> {
197        let data = msgpack::pack(&Value::Array(vec![Value::Bool(include_link_count)]));
198        let response = self.request_management(transport_identity_hash, "/status", &data)?;
199        decode_status_response(&response)
200    }
201
202    pub fn path_table(
203        &mut self,
204        transport_identity_hash: [u8; 16],
205        destination_filter: Option<[u8; 16]>,
206        max_hops: Option<u8>,
207    ) -> Result<PickleValue, RemoteManagementError> {
208        let mut request = vec![Value::Str("table".into())];
209        if destination_filter.is_some() || max_hops.is_some() {
210            request.push(match destination_filter {
211                Some(hash) => Value::Bin(hash.to_vec()),
212                None => Value::Nil,
213            });
214        }
215        if let Some(hops) = max_hops {
216            request.push(Value::UInt(hops as u64));
217        }
218        let data = msgpack::pack(&Value::Array(request));
219        let response = self.request_management(transport_identity_hash, "/path", &data)?;
220        let value = msgpack::unpack_exact(&response)
221            .map_err(|e| RemoteManagementError::MalformedResponse(e.to_string()))?;
222        Ok(msgpack_to_pickle(&value))
223    }
224
225    pub fn rate_table(
226        &mut self,
227        transport_identity_hash: [u8; 16],
228        destination_filter: Option<[u8; 16]>,
229    ) -> Result<PickleValue, RemoteManagementError> {
230        let mut request = vec![Value::Str("rates".into())];
231        if let Some(hash) = destination_filter {
232            request.push(Value::Bin(hash.to_vec()));
233        }
234        let data = msgpack::pack(&Value::Array(request));
235        let response = self.request_management(transport_identity_hash, "/path", &data)?;
236        let value = msgpack::unpack_exact(&response)
237            .map_err(|e| RemoteManagementError::MalformedResponse(e.to_string()))?;
238        Ok(msgpack_to_pickle(&value))
239    }
240
241    pub fn published_blackhole_list(
242        &mut self,
243        transport_identity_hash: [u8; 16],
244    ) -> Result<PickleValue, RemoteManagementError> {
245        let dest_hash = crate::management::blackhole_dest_hash(&transport_identity_hash);
246        let response = self.request(
247            RemotePurpose::Blackhole,
248            dest_hash,
249            None,
250            "/list",
251            &[],
252            false,
253        )?;
254        let value = msgpack::unpack_exact(&response)
255            .map_err(|e| RemoteManagementError::MalformedResponse(e.to_string()))?;
256        Ok(blackhole_map_to_list(&value))
257    }
258
259    fn request_management(
260        &mut self,
261        transport_identity_hash: [u8; 16],
262        path: &str,
263        data: &[u8],
264    ) -> Result<Vec<u8>, RemoteManagementError> {
265        let dest_hash = crate::management::management_dest_hash(&transport_identity_hash);
266        let prv_key = self
267            .management_identity
268            .as_ref()
269            .and_then(|id| id.get_private_key())
270            .ok_or(RemoteManagementError::MissingIdentity)?;
271        self.request(
272            RemotePurpose::Management,
273            dest_hash,
274            Some(prv_key),
275            path,
276            data,
277            true,
278        )
279    }
280
281    fn request(
282        &mut self,
283        purpose: RemotePurpose,
284        dest_hash: [u8; 16],
285        identity_prv_key: Option<[u8; 64]>,
286        path: &str,
287        data: &[u8],
288        identify_on_new_link: bool,
289    ) -> Result<Vec<u8>, RemoteManagementError> {
290        let link_slot = match purpose {
291            RemotePurpose::Management => Arc::clone(&self.management_link),
292            RemotePurpose::Blackhole => Arc::clone(&self.blackhole_link),
293        };
294
295        if let Some(link_id) = *lock_link(&link_slot) {
296            if self.node.send_request(link_id, path, data).is_ok() {
297                return self.wait_for_response(link_id, path);
298            }
299            *lock_link(&link_slot) = None;
300        }
301
302        let announced = self.wait_for_destination(dest_hash)?;
303        let sig_pub: [u8; 32] = announced.public_key[32..64]
304            .try_into()
305            .expect("slice length checked");
306        let link_id = self
307            .node
308            .create_link(dest_hash, sig_pub)
309            .map_err(|_| RemoteManagementError::LinkTimeout(dest_hash))?;
310        self.wait_for_link_established(link_id, dest_hash)?;
311
312        if identify_on_new_link {
313            let prv_key = identity_prv_key.ok_or(RemoteManagementError::MissingIdentity)?;
314            self.node
315                .identify_on_link(link_id, prv_key)
316                .map_err(|_| RemoteManagementError::SendFailed)?;
317            std::thread::sleep(Duration::from_millis(200));
318        }
319
320        *lock_link(&link_slot) = Some(link_id);
321        self.node
322            .send_request(link_id, path, data)
323            .map_err(|_| RemoteManagementError::SendFailed)?;
324        self.wait_for_response(link_id, path)
325    }
326
327    fn wait_for_destination(
328        &self,
329        dest_hash: [u8; 16],
330    ) -> Result<AnnouncedIdentity, RemoteManagementError> {
331        let dest = DestHash(dest_hash);
332        let deadline = Instant::now() + self.timeout;
333        let _ = self.node.request_path(&dest);
334
335        loop {
336            if let Ok(Some(announced)) = self.node.recall_identity(&dest) {
337                return Ok(announced);
338            }
339            if self.node.has_path(&dest).unwrap_or(false) {
340                if let Ok(Some(announced)) = self.node.recall_identity(&dest) {
341                    return Ok(announced);
342                }
343            }
344            let Some(remaining) = deadline.checked_duration_since(Instant::now()) else {
345                return Err(RemoteManagementError::RecallTimeout(dest_hash));
346            };
347            if remaining.is_zero() {
348                return Err(RemoteManagementError::PathTimeout(dest_hash));
349            }
350            std::thread::sleep(remaining.min(Duration::from_millis(100)));
351        }
352    }
353
354    fn wait_for_link_established(
355        &self,
356        expected_link_id: [u8; 16],
357        dest_hash: [u8; 16],
358    ) -> Result<(), RemoteManagementError> {
359        let deadline = Instant::now() + self.timeout;
360        loop {
361            if let Ok(closed) = self.closed_rx.try_recv() {
362                if closed.0 == expected_link_id {
363                    return Err(RemoteManagementError::LinkClosed);
364                }
365            }
366            let Some(remaining) = deadline.checked_duration_since(Instant::now()) else {
367                return Err(RemoteManagementError::LinkTimeout(dest_hash));
368            };
369            match self
370                .link_rx
371                .recv_timeout(remaining.min(Duration::from_millis(50)))
372            {
373                Ok(link_id) if link_id.0 == expected_link_id => return Ok(()),
374                Ok(_) => continue,
375                Err(mpsc::RecvTimeoutError::Timeout) => continue,
376                Err(mpsc::RecvTimeoutError::Disconnected) => {
377                    return Err(RemoteManagementError::LinkTimeout(dest_hash));
378                }
379            }
380        }
381    }
382
383    fn wait_for_response(
384        &self,
385        link_id: [u8; 16],
386        path: &str,
387    ) -> Result<Vec<u8>, RemoteManagementError> {
388        let deadline = Instant::now() + self.timeout;
389        loop {
390            if let Ok(closed) = self.closed_rx.try_recv() {
391                if closed.0 == link_id {
392                    return Err(RemoteManagementError::LinkClosed);
393                }
394            }
395            let Some(remaining) = deadline.checked_duration_since(Instant::now()) else {
396                return Err(RemoteManagementError::RequestTimeout(path.into()));
397            };
398            match self
399                .response_rx
400                .recv_timeout(remaining.min(Duration::from_millis(50)))
401            {
402                Ok((resp_link_id, _request_id, data)) if resp_link_id.0 == link_id => {
403                    return Ok(data);
404                }
405                Ok(_) => continue,
406                Err(mpsc::RecvTimeoutError::Timeout) => continue,
407                Err(mpsc::RecvTimeoutError::Disconnected) => {
408                    return Err(RemoteManagementError::RequestTimeout(path.into()));
409                }
410            }
411        }
412    }
413}
414
415pub fn parse_transport_identity_hash(s: &str) -> Result<[u8; 16], RemoteManagementError> {
416    let trimmed = s.trim();
417    if trimmed.len() != 32 {
418        return Err(RemoteManagementError::InvalidHash(s.into()));
419    }
420    let mut out = [0u8; 16];
421    for i in 0..16 {
422        out[i] = u8::from_str_radix(&trimmed[i * 2..i * 2 + 2], 16)
423            .map_err(|_| RemoteManagementError::InvalidHash(s.into()))?;
424    }
425    Ok(out)
426}
427
428pub fn msgpack_to_pickle(value: &Value) -> PickleValue {
429    match value {
430        Value::Nil => PickleValue::None,
431        Value::Bool(v) => PickleValue::Bool(*v),
432        Value::UInt(v) => PickleValue::Int((*v).min(i64::MAX as u64) as i64),
433        Value::Int(v) => PickleValue::Int(*v),
434        Value::Float(v) => PickleValue::Float(*v),
435        Value::Bin(v) => PickleValue::Bytes(v.clone()),
436        Value::Str(v) => PickleValue::String(v.clone()),
437        Value::Array(items) => PickleValue::List(items.iter().map(msgpack_to_pickle).collect()),
438        Value::Map(entries) => PickleValue::Dict(
439            entries
440                .iter()
441                .map(|(k, v)| (msgpack_to_pickle(k), msgpack_to_pickle(v)))
442                .collect(),
443        ),
444    }
445}
446
447fn decode_status_response(data: &[u8]) -> Result<RemoteStatus, RemoteManagementError> {
448    let value = msgpack::unpack_exact(data)
449        .map_err(|e| RemoteManagementError::MalformedResponse(e.to_string()))?;
450    let arr = value
451        .as_array()
452        .ok_or_else(|| RemoteManagementError::MalformedResponse("expected status array".into()))?;
453    let stats = arr
454        .first()
455        .ok_or_else(|| RemoteManagementError::MalformedResponse("missing status dict".into()))?;
456    let mut stats = msgpack_to_pickle(stats);
457    normalize_remote_status(&mut stats);
458    let link_count = arr.get(1).and_then(|v| v.as_integer());
459    Ok(RemoteStatus { stats, link_count })
460}
461
462fn normalize_remote_status(value: &mut PickleValue) {
463    let PickleValue::Dict(entries) = value else {
464        return;
465    };
466    if !entries
467        .iter()
468        .any(|(k, _)| string_key(k) == Some("transport_enabled"))
469    {
470        entries.push((
471            PickleValue::String("transport_enabled".into()),
472            PickleValue::Bool(true),
473        ));
474    }
475    if let Some((_, PickleValue::List(ifaces))) = entries
476        .iter_mut()
477        .find(|(k, _)| string_key(k) == Some("interfaces"))
478    {
479        for iface in ifaces {
480            normalize_remote_interface(iface);
481        }
482    }
483}
484
485fn normalize_remote_interface(value: &mut PickleValue) {
486    let PickleValue::Dict(entries) = value else {
487        return;
488    };
489    copy_key(entries, "incoming_announce_freq", "ia_freq");
490    copy_key(entries, "outgoing_announce_freq", "oa_freq");
491    copy_key(entries, "incoming_path_request_freq", "ip_freq");
492    copy_key(entries, "outgoing_path_request_freq", "op_freq");
493}
494
495fn copy_key(entries: &mut Vec<(PickleValue, PickleValue)>, from: &str, to: &str) {
496    if entries.iter().any(|(k, _)| string_key(k) == Some(to)) {
497        return;
498    }
499    if let Some((_, value)) = entries.iter().find(|(k, _)| string_key(k) == Some(from)) {
500        entries.push((PickleValue::String(to.into()), value.clone()));
501    }
502}
503
504fn blackhole_map_to_list(value: &Value) -> PickleValue {
505    let Some(entries) = value.as_map() else {
506        return msgpack_to_pickle(value);
507    };
508    let mut list = Vec::new();
509    for (key, info) in entries {
510        let Some(identity_hash) = key.as_bin() else {
511            continue;
512        };
513        let mut item = vec![(
514            PickleValue::String("identity_hash".into()),
515            PickleValue::Bytes(identity_hash.to_vec()),
516        )];
517        if let Some(info_entries) = info.as_map() {
518            for (k, v) in info_entries {
519                if let Some(key) = k.as_str() {
520                    let out_key = if key == "created" { "created" } else { key };
521                    item.push((PickleValue::String(out_key.into()), msgpack_to_pickle(v)));
522                }
523            }
524        }
525        list.push(PickleValue::Dict(item));
526    }
527    PickleValue::List(list)
528}
529
530fn lock_link<'a>(
531    link: &'a Arc<Mutex<Option<[u8; 16]>>>,
532) -> std::sync::MutexGuard<'a, Option<[u8; 16]>> {
533    match link.lock() {
534        Ok(guard) => guard,
535        Err(poisoned) => poisoned.into_inner(),
536    }
537}
538
539fn string_key(value: &PickleValue) -> Option<&str> {
540    match value {
541        PickleValue::String(s) => Some(s),
542        _ => None,
543    }
544}
545
546fn hex(bytes: &[u8]) -> String {
547    let mut out = String::with_capacity(bytes.len() * 2);
548    for byte in bytes {
549        use std::fmt::Write;
550        let _ = write!(&mut out, "{byte:02x}");
551    }
552    out
553}
554
555#[cfg(test)]
556mod tests {
557    use super::*;
558
559    #[test]
560    fn parse_transport_identity_hash_accepts_32_hex_chars() {
561        let hash = parse_transport_identity_hash("00112233445566778899aabbccddeeff").unwrap();
562        assert_eq!(hash[0], 0x00);
563        assert_eq!(hash[15], 0xff);
564    }
565
566    #[test]
567    fn parse_transport_identity_hash_rejects_invalid_input() {
568        assert!(parse_transport_identity_hash("short").is_err());
569        assert!(parse_transport_identity_hash("00112233445566778899aabbccddeeg").is_err());
570    }
571
572    #[test]
573    fn status_response_normalizes_renderer_keys() {
574        let status = Value::Map(vec![
575            (
576                Value::Str("interfaces".into()),
577                Value::Array(vec![Value::Map(vec![
578                    (Value::Str("name".into()), Value::Str("if0".into())),
579                    (
580                        Value::Str("incoming_announce_freq".into()),
581                        Value::Float(1.5),
582                    ),
583                ])]),
584            ),
585            (Value::Str("rxb".into()), Value::UInt(1)),
586            (Value::Str("txb".into()), Value::UInt(2)),
587        ]);
588        let data = msgpack::pack(&Value::Array(vec![status, Value::UInt(7)]));
589        let decoded = decode_status_response(&data).unwrap();
590        assert_eq!(decoded.link_count, Some(7));
591        assert_eq!(
592            decoded
593                .stats
594                .get("transport_enabled")
595                .and_then(|v| v.as_bool()),
596            Some(true)
597        );
598        let iface = &decoded
599            .stats
600            .get("interfaces")
601            .and_then(|v| v.as_list())
602            .unwrap()[0];
603        assert_eq!(iface.get("ia_freq").and_then(|v| v.as_float()), Some(1.5));
604    }
605
606    #[test]
607    fn blackhole_map_converts_to_renderer_list() {
608        let value = Value::Map(vec![(
609            Value::Bin(vec![0x11; 16]),
610            Value::Map(vec![
611                (Value::Str("expires".into()), Value::Float(123.0)),
612                (Value::Str("reason".into()), Value::Str("test".into())),
613            ]),
614        )]);
615        let list = blackhole_map_to_list(&value);
616        let items = list.as_list().unwrap();
617        assert_eq!(items.len(), 1);
618        assert_eq!(
619            items[0]
620                .get("identity_hash")
621                .and_then(|v| v.as_bytes())
622                .unwrap(),
623            &[0x11; 16]
624        );
625    }
626}