1use std::path::Path;
2use std::process;
3
4use crate::args::Args;
5use rns_net::config;
6use rns_net::pickle::PickleValue;
7use rns_net::rpc::derive_auth_key;
8use rns_net::storage;
9use rns_net::{RpcAddr, RpcClient};
10use serde_json::{json, Value};
11
12pub fn run(args: Args) {
13 if args.has("help") || args.positional.is_empty() {
14 print_usage();
15 return;
16 }
17
18 let json_output = args.has("j") || args.has("json");
19 let mut client = connect(args.config_path());
20
21 match args.positional.first().map(|s| s.as_str()) {
22 Some("blacklist") => run_blacklist(&args, &mut client, json_output),
23 Some("provider") => run_provider(&mut client, json_output),
24 Some(other) => {
25 eprintln!("Unknown backbone subcommand: {}", other);
26 print_usage();
27 process::exit(1);
28 }
29 None => print_usage(),
30 }
31}
32
33fn run_blacklist(args: &Args, client: &mut RpcClient, json_output: bool) {
34 match args.positional.get(1).map(|s| s.as_str()) {
35 Some("list") => {
36 let mut request = vec![(
37 PickleValue::String("get".into()),
38 PickleValue::String("backbone_peer_state".into()),
39 )];
40 if let Some(interface) = args.positional.get(2) {
41 request.push((
42 PickleValue::String("interface".into()),
43 PickleValue::String(interface.clone()),
44 ));
45 }
46 let response = rpc_call(client, PickleValue::Dict(request));
47 if json_output {
48 println!(
49 "{}",
50 serde_json::to_string_pretty(&pickle_list_to_json(&response))
51 .unwrap_or_default()
52 );
53 } else {
54 print_blacklist(&response);
55 }
56 }
57 Some("clear") => {
58 let interface = args.positional.get(2).cloned().unwrap_or_else(|| {
59 eprintln!("Missing interface name");
60 process::exit(1);
61 });
62 let ip = args.positional.get(3).cloned().unwrap_or_else(|| {
63 eprintln!("Missing peer IP");
64 process::exit(1);
65 });
66 let response = rpc_call(
67 client,
68 PickleValue::Dict(vec![
69 (
70 PickleValue::String("clear".into()),
71 PickleValue::String("backbone_peer_state".into()),
72 ),
73 (
74 PickleValue::String("interface".into()),
75 PickleValue::String(interface),
76 ),
77 (PickleValue::String("ip".into()), PickleValue::String(ip)),
78 ]),
79 );
80 match response {
81 PickleValue::Bool(true) => println!("Cleared"),
82 PickleValue::Bool(false) => {
83 eprintln!("No matching backbone peer state entry");
84 process::exit(1);
85 }
86 _ => {
87 eprintln!("Unexpected response");
88 process::exit(1);
89 }
90 }
91 }
92 _ => {
93 eprintln!("Unknown backbone blacklist action");
94 print_usage();
95 process::exit(1);
96 }
97 }
98}
99
100fn run_provider(client: &mut RpcClient, json_output: bool) {
101 let response = rpc_call(
102 client,
103 PickleValue::Dict(vec![(
104 PickleValue::String("get".into()),
105 PickleValue::String("provider_bridge_stats".into()),
106 )]),
107 );
108 if json_output {
109 println!(
110 "{}",
111 serde_json::to_string_pretty(&provider_stats_to_json(&response)).unwrap_or_default()
112 );
113 } else {
114 print_provider_stats(&response);
115 }
116}
117
118fn print_blacklist(response: &PickleValue) {
119 let Some(entries) = response.as_list() else {
120 eprintln!("Unexpected response");
121 process::exit(1);
122 };
123 if entries.is_empty() {
124 println!("No backbone peer state entries");
125 return;
126 }
127
128 println!(
129 "{:<24} {:<40} {:>5} {:>9} {:>8} {}",
130 "Interface", "IP", "Conn", "BlkSecs", "Rejects", "Reason"
131 );
132 println!("{}", "-".repeat(96));
133 for entry in entries {
134 let interface = entry
135 .get("interface")
136 .and_then(|v| v.as_str())
137 .unwrap_or("-");
138 let ip = entry.get("ip").and_then(|v| v.as_str()).unwrap_or("-");
139 let connected = entry
140 .get("connected_count")
141 .and_then(|v| v.as_int())
142 .unwrap_or(0);
143 let blacklist = entry
144 .get("blacklisted_remaining_secs")
145 .and_then(|v| v.as_float())
146 .map(|v| format!("{:.0}", v))
147 .unwrap_or_else(|| "-".into());
148 let rejects = entry
149 .get("reject_count")
150 .and_then(|v| v.as_int())
151 .unwrap_or(0);
152 let reason = entry
153 .get("blacklist_reason")
154 .and_then(|v| v.as_str())
155 .unwrap_or("");
156 println!(
157 "{:<24} {:<40} {:>5} {:>9} {:>8} {}",
158 interface, ip, connected, blacklist, rejects, reason
159 );
160 }
161}
162
163fn pickle_list_to_json(value: &PickleValue) -> Value {
164 let Some(entries) = value.as_list() else {
165 return Value::Null;
166 };
167 Value::Array(
168 entries
169 .iter()
170 .map(|entry| {
171 json!({
172 "interface": entry.get("interface").and_then(|v| v.as_str()),
173 "ip": entry.get("ip").and_then(|v| v.as_str()),
174 "connected_count": entry.get("connected_count").and_then(|v| v.as_int()),
175 "blacklisted_remaining_secs": entry.get("blacklisted_remaining_secs").and_then(|v| v.as_float()),
176 "blacklist_reason": entry.get("blacklist_reason").and_then(|v| v.as_str()),
177 "reject_count": entry.get("reject_count").and_then(|v| v.as_int()),
178 })
179 })
180 .collect(),
181 )
182}
183
184fn provider_stats_to_json(value: &PickleValue) -> Value {
185 let Some(consumers) = value.get("consumers").and_then(|v| v.as_list()) else {
186 return Value::Null;
187 };
188
189 json!({
190 "connected": value.get("connected").and_then(|v| v.as_bool()),
191 "consumer_count": value.get("consumer_count").and_then(|v| v.as_int()),
192 "queue_max_events": value.get("queue_max_events").and_then(|v| v.as_int()),
193 "queue_max_bytes": value.get("queue_max_bytes").and_then(|v| v.as_int()),
194 "backlog_len": value.get("backlog_len").and_then(|v| v.as_int()),
195 "backlog_bytes": value.get("backlog_bytes").and_then(|v| v.as_int()),
196 "backlog_dropped_pending": value.get("backlog_dropped_pending").and_then(|v| v.as_int()),
197 "backlog_dropped_total": value.get("backlog_dropped_total").and_then(|v| v.as_int()),
198 "total_disconnect_count": value.get("total_disconnect_count").and_then(|v| v.as_int()),
199 "consumers": consumers.iter().map(|consumer| {
200 json!({
201 "id": consumer.get("id").and_then(|v| v.as_int()),
202 "connected": consumer.get("connected").and_then(|v| v.as_bool()),
203 "queue_len": consumer.get("queue_len").and_then(|v| v.as_int()),
204 "queued_bytes": consumer.get("queued_bytes").and_then(|v| v.as_int()),
205 "dropped_pending": consumer.get("dropped_pending").and_then(|v| v.as_int()),
206 "dropped_total": consumer.get("dropped_total").and_then(|v| v.as_int()),
207 "queue_max_events": consumer.get("queue_max_events").and_then(|v| v.as_int()),
208 "queue_max_bytes": consumer.get("queue_max_bytes").and_then(|v| v.as_int()),
209 })
210 }).collect::<Vec<_>>(),
211 })
212}
213
214fn print_provider_stats(response: &PickleValue) {
215 if matches!(response, PickleValue::None) {
216 println!("Provider bridge disabled or unavailable");
217 return;
218 }
219
220 let Some(consumers) = response.get("consumers").and_then(|v| v.as_list()) else {
221 eprintln!("Unexpected response");
222 process::exit(1);
223 };
224
225 println!(
226 "Provider bridge: connected={} consumers={} queue_max_events={} queue_max_bytes={} backlog_len={} backlog_bytes={} backlog_dropped_pending={} backlog_dropped_total={} disconnects={}",
227 response
228 .get("connected")
229 .and_then(|v| v.as_bool())
230 .unwrap_or(false),
231 response
232 .get("consumer_count")
233 .and_then(|v| v.as_int())
234 .unwrap_or(0),
235 response
236 .get("queue_max_events")
237 .and_then(|v| v.as_int())
238 .unwrap_or(0),
239 response
240 .get("queue_max_bytes")
241 .and_then(|v| v.as_int())
242 .unwrap_or(0),
243 response
244 .get("backlog_len")
245 .and_then(|v| v.as_int())
246 .unwrap_or(0),
247 response
248 .get("backlog_bytes")
249 .and_then(|v| v.as_int())
250 .unwrap_or(0),
251 response
252 .get("backlog_dropped_pending")
253 .and_then(|v| v.as_int())
254 .unwrap_or(0),
255 response
256 .get("backlog_dropped_total")
257 .and_then(|v| v.as_int())
258 .unwrap_or(0),
259 response
260 .get("total_disconnect_count")
261 .and_then(|v| v.as_int())
262 .unwrap_or(0),
263 );
264
265 if consumers.is_empty() {
266 println!("No connected provider consumers");
267 return;
268 }
269
270 println!(
271 "{:<4} {:<9} {:>8} {:>12} {:>15} {:>13} {:>11} {:>10}",
272 "ID",
273 "Connected",
274 "Queue",
275 "QueuedBytes",
276 "DroppedPending",
277 "DroppedTotal",
278 "MaxEvents",
279 "MaxBytes"
280 );
281 println!("{}", "-".repeat(96));
282 for consumer in consumers {
283 println!(
284 "{:<4} {:<9} {:>8} {:>12} {:>15} {:>13} {:>11} {:>10}",
285 consumer.get("id").and_then(|v| v.as_int()).unwrap_or(0),
286 if consumer
287 .get("connected")
288 .and_then(|v| v.as_bool())
289 .unwrap_or(false)
290 {
291 "yes"
292 } else {
293 "no"
294 },
295 consumer
296 .get("queue_len")
297 .and_then(|v| v.as_int())
298 .unwrap_or(0),
299 consumer
300 .get("queued_bytes")
301 .and_then(|v| v.as_int())
302 .unwrap_or(0),
303 consumer
304 .get("dropped_pending")
305 .and_then(|v| v.as_int())
306 .unwrap_or(0),
307 consumer
308 .get("dropped_total")
309 .and_then(|v| v.as_int())
310 .unwrap_or(0),
311 consumer
312 .get("queue_max_events")
313 .and_then(|v| v.as_int())
314 .unwrap_or(0),
315 consumer
316 .get("queue_max_bytes")
317 .and_then(|v| v.as_int())
318 .unwrap_or(0),
319 );
320 }
321}
322
323fn connect(config_path: Option<&str>) -> RpcClient {
324 let config_dir = storage::resolve_config_dir(config_path.map(Path::new));
325 let config_file = config_dir.join("config");
326 let rns_config = if config_file.exists() {
327 match config::parse_file(&config_file) {
328 Ok(c) => c,
329 Err(e) => {
330 eprintln!("Error reading config: {}", e);
331 process::exit(1);
332 }
333 }
334 } else {
335 match config::parse("") {
336 Ok(c) => c,
337 Err(e) => {
338 eprintln!("Error: {}", e);
339 process::exit(1);
340 }
341 }
342 };
343
344 let paths = match storage::ensure_storage_dirs(&config_dir) {
345 Ok(p) => p,
346 Err(e) => {
347 eprintln!("Error: {}", e);
348 process::exit(1);
349 }
350 };
351
352 let identity = match storage::load_or_create_identity(&paths.identities) {
353 Ok(id) => id,
354 Err(e) => {
355 eprintln!("Error loading identity: {}", e);
356 process::exit(1);
357 }
358 };
359
360 let auth_key = derive_auth_key(&identity.get_private_key().unwrap_or([0u8; 64]));
361 let rpc_addr = RpcAddr::Tcp(
362 "127.0.0.1".into(),
363 rns_config.reticulum.instance_control_port,
364 );
365 match RpcClient::connect(&rpc_addr, &auth_key) {
366 Ok(client) => client,
367 Err(e) => {
368 eprintln!("Could not connect to rnsd: {}", e);
369 eprintln!("Is rnsd running?");
370 process::exit(1);
371 }
372 }
373}
374
375fn rpc_call(client: &mut RpcClient, request: PickleValue) -> PickleValue {
376 match client.call(&request) {
377 Ok(response) => response,
378 Err(e) => {
379 eprintln!("RPC error: {}", e);
380 process::exit(1);
381 }
382 }
383}
384
385fn print_usage() {
386 println!("Usage:");
387 println!(" rns-ctl backbone blacklist list [INTERFACE] [--json]");
388 println!(" rns-ctl backbone blacklist clear <INTERFACE> <IP>");
389 println!(" rns-ctl backbone provider [--json]");
390}