Skip to main content

rns_ctl/cmd/
backbone.rs

1use std::path::Path;
2use std::process;
3
4use crate::args::Args;
5use rns_net::config;
6use rns_net::pickle::PickleValue;
7use rns_net::rpc::derive_auth_key;
8use rns_net::storage;
9use rns_net::{RpcAddr, RpcClient};
10use serde_json::{json, Value};
11
12pub fn run(args: Args) {
13    if args.has("help") || args.positional.is_empty() {
14        print_usage();
15        return;
16    }
17
18    let json_output = args.has("j") || args.has("json");
19    let mut client = connect(args.config_path());
20
21    match args.positional.first().map(|s| s.as_str()) {
22        Some("blacklist") => run_blacklist(&args, &mut client, json_output),
23        Some("provider") => run_provider(&mut client, json_output),
24        Some(other) => {
25            eprintln!("Unknown backbone subcommand: {}", other);
26            print_usage();
27            process::exit(1);
28        }
29        None => print_usage(),
30    }
31}
32
33fn run_blacklist(args: &Args, client: &mut RpcClient, json_output: bool) {
34    match args.positional.get(1).map(|s| s.as_str()) {
35        Some("list") => {
36            let mut request = vec![(
37                PickleValue::String("get".into()),
38                PickleValue::String("backbone_peer_state".into()),
39            )];
40            if let Some(interface) = args.positional.get(2) {
41                request.push((
42                    PickleValue::String("interface".into()),
43                    PickleValue::String(interface.clone()),
44                ));
45            }
46            let response = rpc_call(client, PickleValue::Dict(request));
47            if json_output {
48                println!(
49                    "{}",
50                    serde_json::to_string_pretty(&pickle_list_to_json(&response))
51                        .unwrap_or_default()
52                );
53            } else {
54                print_blacklist(&response);
55            }
56        }
57        Some("clear") => {
58            let interface = args.positional.get(2).cloned().unwrap_or_else(|| {
59                eprintln!("Missing interface name");
60                process::exit(1);
61            });
62            let ip = args.positional.get(3).cloned().unwrap_or_else(|| {
63                eprintln!("Missing peer IP");
64                process::exit(1);
65            });
66            let response = rpc_call(
67                client,
68                PickleValue::Dict(vec![
69                    (
70                        PickleValue::String("clear".into()),
71                        PickleValue::String("backbone_peer_state".into()),
72                    ),
73                    (
74                        PickleValue::String("interface".into()),
75                        PickleValue::String(interface),
76                    ),
77                    (PickleValue::String("ip".into()), PickleValue::String(ip)),
78                ]),
79            );
80            match response {
81                PickleValue::Bool(true) => println!("Cleared"),
82                PickleValue::Bool(false) => {
83                    eprintln!("No matching backbone peer state entry");
84                    process::exit(1);
85                }
86                _ => {
87                    eprintln!("Unexpected response");
88                    process::exit(1);
89                }
90            }
91        }
92        _ => {
93            eprintln!("Unknown backbone blacklist action");
94            print_usage();
95            process::exit(1);
96        }
97    }
98}
99
100fn run_provider(client: &mut RpcClient, json_output: bool) {
101    let response = rpc_call(
102        client,
103        PickleValue::Dict(vec![(
104            PickleValue::String("get".into()),
105            PickleValue::String("provider_bridge_stats".into()),
106        )]),
107    );
108    if json_output {
109        println!(
110            "{}",
111            serde_json::to_string_pretty(&provider_stats_to_json(&response)).unwrap_or_default()
112        );
113    } else {
114        print_provider_stats(&response);
115    }
116}
117
118fn print_blacklist(response: &PickleValue) {
119    let Some(entries) = response.as_list() else {
120        eprintln!("Unexpected response");
121        process::exit(1);
122    };
123    if entries.is_empty() {
124        println!("No backbone peer state entries");
125        return;
126    }
127
128    println!(
129        "{:<24} {:<40} {:>5} {:>9} {:>8}  {}",
130        "Interface", "IP", "Conn", "BlkSecs", "Rejects", "Reason"
131    );
132    println!("{}", "-".repeat(96));
133    for entry in entries {
134        let interface = entry
135            .get("interface")
136            .and_then(|v| v.as_str())
137            .unwrap_or("-");
138        let ip = entry.get("ip").and_then(|v| v.as_str()).unwrap_or("-");
139        let connected = entry
140            .get("connected_count")
141            .and_then(|v| v.as_int())
142            .unwrap_or(0);
143        let blacklist = entry
144            .get("blacklisted_remaining_secs")
145            .and_then(|v| v.as_float())
146            .map(|v| format!("{:.0}", v))
147            .unwrap_or_else(|| "-".into());
148        let rejects = entry
149            .get("reject_count")
150            .and_then(|v| v.as_int())
151            .unwrap_or(0);
152        let reason = entry
153            .get("blacklist_reason")
154            .and_then(|v| v.as_str())
155            .unwrap_or("");
156        println!(
157            "{:<24} {:<40} {:>5} {:>9} {:>8}  {}",
158            interface, ip, connected, blacklist, rejects, reason
159        );
160    }
161}
162
163fn pickle_list_to_json(value: &PickleValue) -> Value {
164    let Some(entries) = value.as_list() else {
165        return Value::Null;
166    };
167    Value::Array(
168        entries
169            .iter()
170            .map(|entry| {
171                json!({
172                    "interface": entry.get("interface").and_then(|v| v.as_str()),
173                    "ip": entry.get("ip").and_then(|v| v.as_str()),
174                    "connected_count": entry.get("connected_count").and_then(|v| v.as_int()),
175                    "blacklisted_remaining_secs": entry.get("blacklisted_remaining_secs").and_then(|v| v.as_float()),
176                    "blacklist_reason": entry.get("blacklist_reason").and_then(|v| v.as_str()),
177                    "reject_count": entry.get("reject_count").and_then(|v| v.as_int()),
178                })
179            })
180            .collect(),
181    )
182}
183
184fn provider_stats_to_json(value: &PickleValue) -> Value {
185    let Some(consumers) = value.get("consumers").and_then(|v| v.as_list()) else {
186        return Value::Null;
187    };
188
189    json!({
190        "connected": value.get("connected").and_then(|v| v.as_bool()),
191        "consumer_count": value.get("consumer_count").and_then(|v| v.as_int()),
192        "queue_max_events": value.get("queue_max_events").and_then(|v| v.as_int()),
193        "queue_max_bytes": value.get("queue_max_bytes").and_then(|v| v.as_int()),
194        "backlog_len": value.get("backlog_len").and_then(|v| v.as_int()),
195        "backlog_bytes": value.get("backlog_bytes").and_then(|v| v.as_int()),
196        "backlog_dropped_pending": value.get("backlog_dropped_pending").and_then(|v| v.as_int()),
197        "backlog_dropped_total": value.get("backlog_dropped_total").and_then(|v| v.as_int()),
198        "total_disconnect_count": value.get("total_disconnect_count").and_then(|v| v.as_int()),
199        "consumers": consumers.iter().map(|consumer| {
200            json!({
201                "id": consumer.get("id").and_then(|v| v.as_int()),
202                "connected": consumer.get("connected").and_then(|v| v.as_bool()),
203                "queue_len": consumer.get("queue_len").and_then(|v| v.as_int()),
204                "queued_bytes": consumer.get("queued_bytes").and_then(|v| v.as_int()),
205                "dropped_pending": consumer.get("dropped_pending").and_then(|v| v.as_int()),
206                "dropped_total": consumer.get("dropped_total").and_then(|v| v.as_int()),
207                "queue_max_events": consumer.get("queue_max_events").and_then(|v| v.as_int()),
208                "queue_max_bytes": consumer.get("queue_max_bytes").and_then(|v| v.as_int()),
209            })
210        }).collect::<Vec<_>>(),
211    })
212}
213
214fn print_provider_stats(response: &PickleValue) {
215    if matches!(response, PickleValue::None) {
216        println!("Provider bridge disabled or unavailable");
217        return;
218    }
219
220    let Some(consumers) = response.get("consumers").and_then(|v| v.as_list()) else {
221        eprintln!("Unexpected response");
222        process::exit(1);
223    };
224
225    println!(
226        "Provider bridge: connected={} consumers={} queue_max_events={} queue_max_bytes={} backlog_len={} backlog_bytes={} backlog_dropped_pending={} backlog_dropped_total={} disconnects={}",
227        response
228            .get("connected")
229            .and_then(|v| v.as_bool())
230            .unwrap_or(false),
231        response
232            .get("consumer_count")
233            .and_then(|v| v.as_int())
234            .unwrap_or(0),
235        response
236            .get("queue_max_events")
237            .and_then(|v| v.as_int())
238            .unwrap_or(0),
239        response
240            .get("queue_max_bytes")
241            .and_then(|v| v.as_int())
242            .unwrap_or(0),
243        response
244            .get("backlog_len")
245            .and_then(|v| v.as_int())
246            .unwrap_or(0),
247        response
248            .get("backlog_bytes")
249            .and_then(|v| v.as_int())
250            .unwrap_or(0),
251        response
252            .get("backlog_dropped_pending")
253            .and_then(|v| v.as_int())
254            .unwrap_or(0),
255        response
256            .get("backlog_dropped_total")
257            .and_then(|v| v.as_int())
258            .unwrap_or(0),
259        response
260            .get("total_disconnect_count")
261            .and_then(|v| v.as_int())
262            .unwrap_or(0),
263    );
264
265    if consumers.is_empty() {
266        println!("No connected provider consumers");
267        return;
268    }
269
270    println!(
271        "{:<4} {:<9} {:>8} {:>12} {:>15} {:>13} {:>11} {:>10}",
272        "ID",
273        "Connected",
274        "Queue",
275        "QueuedBytes",
276        "DroppedPending",
277        "DroppedTotal",
278        "MaxEvents",
279        "MaxBytes"
280    );
281    println!("{}", "-".repeat(96));
282    for consumer in consumers {
283        println!(
284            "{:<4} {:<9} {:>8} {:>12} {:>15} {:>13} {:>11} {:>10}",
285            consumer.get("id").and_then(|v| v.as_int()).unwrap_or(0),
286            if consumer
287                .get("connected")
288                .and_then(|v| v.as_bool())
289                .unwrap_or(false)
290            {
291                "yes"
292            } else {
293                "no"
294            },
295            consumer
296                .get("queue_len")
297                .and_then(|v| v.as_int())
298                .unwrap_or(0),
299            consumer
300                .get("queued_bytes")
301                .and_then(|v| v.as_int())
302                .unwrap_or(0),
303            consumer
304                .get("dropped_pending")
305                .and_then(|v| v.as_int())
306                .unwrap_or(0),
307            consumer
308                .get("dropped_total")
309                .and_then(|v| v.as_int())
310                .unwrap_or(0),
311            consumer
312                .get("queue_max_events")
313                .and_then(|v| v.as_int())
314                .unwrap_or(0),
315            consumer
316                .get("queue_max_bytes")
317                .and_then(|v| v.as_int())
318                .unwrap_or(0),
319        );
320    }
321}
322
323fn connect(config_path: Option<&str>) -> RpcClient {
324    let config_dir = storage::resolve_config_dir(config_path.map(Path::new));
325    let config_file = config_dir.join("config");
326    let rns_config = if config_file.exists() {
327        match config::parse_file(&config_file) {
328            Ok(c) => c,
329            Err(e) => {
330                eprintln!("Error reading config: {}", e);
331                process::exit(1);
332            }
333        }
334    } else {
335        match config::parse("") {
336            Ok(c) => c,
337            Err(e) => {
338                eprintln!("Error: {}", e);
339                process::exit(1);
340            }
341        }
342    };
343
344    let paths = match storage::ensure_storage_dirs(&config_dir) {
345        Ok(p) => p,
346        Err(e) => {
347            eprintln!("Error: {}", e);
348            process::exit(1);
349        }
350    };
351
352    let identity = match storage::load_or_create_identity(&paths.identities) {
353        Ok(id) => id,
354        Err(e) => {
355            eprintln!("Error loading identity: {}", e);
356            process::exit(1);
357        }
358    };
359
360    let auth_key = derive_auth_key(&identity.get_private_key().unwrap_or([0u8; 64]));
361    let rpc_addr = RpcAddr::Tcp(
362        "127.0.0.1".into(),
363        rns_config.reticulum.instance_control_port,
364    );
365    match RpcClient::connect(&rpc_addr, &auth_key) {
366        Ok(client) => client,
367        Err(e) => {
368            eprintln!("Could not connect to rnsd: {}", e);
369            eprintln!("Is rnsd running?");
370            process::exit(1);
371        }
372    }
373}
374
375fn rpc_call(client: &mut RpcClient, request: PickleValue) -> PickleValue {
376    match client.call(&request) {
377        Ok(response) => response,
378        Err(e) => {
379            eprintln!("RPC error: {}", e);
380            process::exit(1);
381        }
382    }
383}
384
385fn print_usage() {
386    println!("Usage:");
387    println!("    rns-ctl backbone blacklist list [INTERFACE] [--json]");
388    println!("    rns-ctl backbone blacklist clear <INTERFACE> <IP>");
389    println!("    rns-ctl backbone provider [--json]");
390}