1use std::io::{self, Read, Write};
12use std::net::{IpAddr, TcpListener, TcpStream};
13use std::sync::mpsc;
14use std::sync::{
15 atomic::{AtomicBool, Ordering},
16 Arc,
17};
18use std::thread;
19use std::time::Duration;
20
21use rns_crypto::hmac::hmac_sha256;
22use rns_crypto::sha256::sha256;
23
24use crate::event::{
25 BackboneInterfaceEntry, BackbonePeerStateEntry, BlackholeInfo, DrainStatus, Event, EventSender,
26 HookInfo, InterfaceStatsResponse, KnownDestinationEntry, LifecycleState, PathTableEntry,
27 ProviderBridgeStats, QueryRequest, QueryResponse, RateTableEntry, RuntimeConfigApplyMode,
28 RuntimeConfigEntry, RuntimeConfigError, RuntimeConfigErrorCode, RuntimeConfigSource,
29 RuntimeConfigValue, SingleInterfaceStat,
30};
31use crate::md5::hmac_md5;
32use crate::pickle::{self, PickleValue};
33
/// Marker placed at the start of every authentication challenge sent to a client.
const CHALLENGE_PREFIX: &[u8] = b"#CHALLENGE#";
/// Message sent to the client after its challenge response has been verified.
const WELCOME: &[u8] = b"#WELCOME#";
/// Message sent to the client when its challenge response is rejected.
const FAILURE: &[u8] = b"#FAILURE#";
/// Number of random bytes generated for each challenge.
const CHALLENGE_LEN: usize = 40;
38
/// Address the RPC server listens on.
#[derive(Debug, Clone)]
pub enum RpcAddr {
    /// TCP endpoint as `(host, port)`; handed to `TcpListener::bind` by `RpcServer::start`.
    Tcp(String, u16),
}
44
/// Handle to a running RPC server thread.
///
/// The server is stopped by calling `stop`, which also happens when the handle
/// is dropped.
pub struct RpcServer {
    /// Flag polled by the accept loop; storing `true` requests shutdown.
    shutdown: Arc<AtomicBool>,
    /// Join handle of the server thread; `None` once the thread has been joined.
    thread: Option<thread::JoinHandle<()>>,
}
50
51impl RpcServer {
52 pub fn start(addr: &RpcAddr, auth_key: [u8; 32], event_tx: EventSender) -> io::Result<Self> {
54 let shutdown = Arc::new(AtomicBool::new(false));
55 let shutdown2 = shutdown.clone();
56
57 let listener = match addr {
58 RpcAddr::Tcp(host, port) => {
59 let l = TcpListener::bind((host.as_str(), *port))?;
60 l.set_nonblocking(true)?;
62 l
63 }
64 };
65
66 let thread = thread::Builder::new()
67 .name("rpc-server".into())
68 .spawn(move || {
69 rpc_server_loop(listener, auth_key, event_tx, shutdown2);
70 })
71 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
72
73 Ok(RpcServer {
74 shutdown,
75 thread: Some(thread),
76 })
77 }
78
79 pub fn stop(&mut self) {
81 self.shutdown.store(true, Ordering::Relaxed);
82 if let Some(handle) = self.thread.take() {
83 let _ = handle.join();
84 }
85 }
86}
87
88impl Drop for RpcServer {
89 fn drop(&mut self) {
90 self.stop();
91 }
92}
93
94fn rpc_server_loop(
95 listener: TcpListener,
96 auth_key: [u8; 32],
97 event_tx: EventSender,
98 shutdown: Arc<AtomicBool>,
99) {
100 loop {
101 if shutdown.load(Ordering::Relaxed) {
102 break;
103 }
104
105 match listener.accept() {
106 Ok((stream, _addr)) => {
107 let _ = stream.set_nonblocking(false);
109 let _ = stream.set_read_timeout(Some(std::time::Duration::from_secs(10)));
110 let _ = stream.set_write_timeout(Some(std::time::Duration::from_secs(10)));
111
112 if let Err(e) = handle_connection(stream, &auth_key, &event_tx) {
113 log::debug!("RPC connection error: {}", e);
114 }
115 }
116 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
117 thread::sleep(std::time::Duration::from_millis(100));
119 }
120 Err(e) => {
121 log::error!("RPC accept error: {}", e);
122 thread::sleep(std::time::Duration::from_millis(100));
123 }
124 }
125 }
126}
127
128fn handle_connection(
129 mut stream: TcpStream,
130 auth_key: &[u8; 32],
131 event_tx: &EventSender,
132) -> io::Result<()> {
133 server_auth(&mut stream, auth_key)?;
135
136 let request_bytes = recv_bytes(&mut stream)?;
138 let request = pickle::decode(&request_bytes)
139 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?;
140
141 let response = handle_rpc_request(&request, event_tx)?;
143
144 let response_bytes = pickle::encode(&response);
146 send_bytes(&mut stream, &response_bytes)?;
147
148 Ok(())
149}
150
151fn server_auth(stream: &mut TcpStream, auth_key: &[u8; 32]) -> io::Result<()> {
153 let mut random_bytes = [0u8; CHALLENGE_LEN];
155 {
157 let mut f = std::fs::File::open("/dev/urandom")?;
158 f.read_exact(&mut random_bytes)?;
159 }
160
161 let mut challenge_message = Vec::with_capacity(CHALLENGE_PREFIX.len() + 8 + CHALLENGE_LEN);
162 challenge_message.extend_from_slice(CHALLENGE_PREFIX);
163 challenge_message.extend_from_slice(b"{sha256}");
164 challenge_message.extend_from_slice(&random_bytes);
165
166 send_bytes(stream, &challenge_message)?;
167
168 let response = recv_bytes(stream)?;
170
171 let message = &challenge_message[CHALLENGE_PREFIX.len()..];
174
175 if verify_response(auth_key, message, &response) {
176 send_bytes(stream, WELCOME)?;
177 Ok(())
178 } else {
179 send_bytes(stream, FAILURE)?;
180 Err(io::Error::new(
181 io::ErrorKind::PermissionDenied,
182 "auth failed",
183 ))
184 }
185}
186
187fn verify_response(auth_key: &[u8; 32], message: &[u8], response: &[u8]) -> bool {
189 if response.starts_with(b"{sha256}") {
191 let digest = &response[8..];
192 let expected = hmac_sha256(auth_key, message);
193 constant_time_eq(digest, &expected)
194 }
195 else if response.len() == 16 {
197 let expected = hmac_md5(auth_key, message);
198 constant_time_eq(response, &expected)
199 }
200 else if response.starts_with(b"{md5}") {
202 let digest = &response[5..];
203 let expected = hmac_md5(auth_key, message);
204 constant_time_eq(digest, &expected)
205 } else {
206 false
207 }
208}
209
/// Compares two byte slices without stopping at the first mismatch.
///
/// Slices of different length are unequal (that check returns immediately).
/// For equal lengths every byte pair is XOR-ed and the differences are OR-ed
/// together, so the whole input is always visited.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    if a.len() != b.len() {
        return false;
    }
    a.iter()
        .zip(b)
        .fold(0u8, |acc, (lhs, rhs)| acc | (lhs ^ rhs))
        == 0
}
221
/// Writes one length-prefixed message to `stream` and flushes it.
///
/// Payloads of up to `i32::MAX` bytes are preceded by their length as a
/// big-endian signed 32-bit integer. Larger payloads are preceded by `-1` in
/// that 32-bit field followed by the length as a big-endian unsigned 64-bit
/// integer, which is the extended form `recv_bytes` understands. (Previously
/// the length was cast straight to `i32`, silently corrupting the header for
/// such payloads.)
///
/// # Errors
///
/// Returns the I/O error from writing to or flushing the stream.
fn send_bytes(stream: &mut TcpStream, data: &[u8]) -> io::Result<()> {
    if data.len() > i32::MAX as usize {
        stream.write_all(&(-1i32).to_be_bytes())?;
        stream.write_all(&(data.len() as u64).to_be_bytes())?;
    } else {
        // Guarded above, so the narrowing cast cannot truncate.
        stream.write_all(&(data.len() as i32).to_be_bytes())?;
    }
    stream.write_all(data)?;
    stream.flush()
}
229
/// Reads one length-prefixed message from `stream`.
///
/// The header is a big-endian signed 32-bit length. A negative value means the
/// real length follows as a big-endian unsigned 64-bit integer. Messages
/// announcing more than 64 MiB are refused before any payload buffer is
/// allocated.
///
/// # Errors
///
/// Returns `InvalidData` ("message too large") when the announced length
/// exceeds the limit, or the I/O error from reading the stream.
fn recv_bytes(stream: &mut TcpStream) -> io::Result<Vec<u8>> {
    /// Largest payload accepted from a peer (64 MiB).
    const MAX_MESSAGE_LEN: u64 = 64 * 1024 * 1024;

    let mut header = [0u8; 4];
    stream.read_exact(&mut header)?;
    let short_len = i32::from_be_bytes(header);

    let announced: u64 = if short_len < 0 {
        let mut long_header = [0u8; 8];
        stream.read_exact(&mut long_header)?;
        u64::from_be_bytes(long_header)
    } else {
        short_len as u64
    };

    // Check the limit on the full 64-bit value. Narrowing to `usize` first
    // would let an oversized length wrap below the limit on 32-bit targets.
    if announced > MAX_MESSAGE_LEN {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "message too large",
        ));
    }

    let mut payload = vec![0u8; announced as usize];
    stream.read_exact(&mut payload)?;
    Ok(payload)
}
263
264fn handle_rpc_request(request: &PickleValue, event_tx: &EventSender) -> io::Result<PickleValue> {
266 if let Some(get_val) = request.get("get") {
268 if let Some(path) = get_val.as_str() {
269 return match path {
270 "interface_stats" => {
271 let resp = send_query(event_tx, QueryRequest::InterfaceStats)?;
272 if let QueryResponse::InterfaceStats(stats) = resp {
273 Ok(interface_stats_to_pickle(&stats))
274 } else {
275 Ok(PickleValue::None)
276 }
277 }
278 "path_table" => {
279 let max_hops = request
280 .get("max_hops")
281 .and_then(|v| v.as_int().map(|n| n as u8));
282 let resp = send_query(event_tx, QueryRequest::PathTable { max_hops })?;
283 if let QueryResponse::PathTable(entries) = resp {
284 Ok(path_table_to_pickle(&entries))
285 } else {
286 Ok(PickleValue::None)
287 }
288 }
289 "rate_table" => {
290 let resp = send_query(event_tx, QueryRequest::RateTable)?;
291 if let QueryResponse::RateTable(entries) = resp {
292 Ok(rate_table_to_pickle(&entries))
293 } else {
294 Ok(PickleValue::None)
295 }
296 }
297 "next_hop" => {
298 let hash = extract_dest_hash(request, "destination_hash")?;
299 let resp = send_query(event_tx, QueryRequest::NextHop { dest_hash: hash })?;
300 if let QueryResponse::NextHop(Some(nh)) = resp {
301 Ok(PickleValue::Bytes(nh.next_hop.to_vec()))
302 } else {
303 Ok(PickleValue::None)
304 }
305 }
306 "next_hop_if_name" => {
307 let hash = extract_dest_hash(request, "destination_hash")?;
308 let resp =
309 send_query(event_tx, QueryRequest::NextHopIfName { dest_hash: hash })?;
310 if let QueryResponse::NextHopIfName(Some(name)) = resp {
311 Ok(PickleValue::String(name))
312 } else {
313 Ok(PickleValue::None)
314 }
315 }
316 "link_count" => {
317 let resp = send_query(event_tx, QueryRequest::LinkCount)?;
318 if let QueryResponse::LinkCount(n) = resp {
319 Ok(PickleValue::Int(n as i64))
320 } else {
321 Ok(PickleValue::None)
322 }
323 }
324 "transport_identity" => {
325 let resp = send_query(event_tx, QueryRequest::TransportIdentity)?;
326 if let QueryResponse::TransportIdentity(Some(hash)) = resp {
327 Ok(PickleValue::Bytes(hash.to_vec()))
328 } else {
329 Ok(PickleValue::None)
330 }
331 }
332 "blackholed" => {
333 let resp = send_query(event_tx, QueryRequest::GetBlackholed)?;
334 if let QueryResponse::Blackholed(entries) = resp {
335 Ok(blackholed_to_pickle(&entries))
336 } else {
337 Ok(PickleValue::None)
338 }
339 }
340 "discovered_interfaces" => {
341 let only_available = request
342 .get("only_available")
343 .and_then(|v| v.as_bool())
344 .unwrap_or(false);
345 let only_transport = request
346 .get("only_transport")
347 .and_then(|v| v.as_bool())
348 .unwrap_or(false);
349 let resp = send_query(
350 event_tx,
351 QueryRequest::DiscoveredInterfaces {
352 only_available,
353 only_transport,
354 },
355 )?;
356 if let QueryResponse::DiscoveredInterfaces(interfaces) = resp {
357 Ok(discovered_interfaces_to_pickle(&interfaces))
358 } else {
359 Ok(PickleValue::None)
360 }
361 }
362 "hooks" => {
363 let (response_tx, response_rx) = mpsc::channel();
364 event_tx
365 .send(Event::ListHooks { response_tx })
366 .map_err(|_| {
367 io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down")
368 })?;
369 let hooks = response_rx
370 .recv_timeout(std::time::Duration::from_secs(5))
371 .map_err(|_| {
372 io::Error::new(io::ErrorKind::TimedOut, "list hooks timed out")
373 })?;
374 Ok(hooks_to_pickle(&hooks))
375 }
376 "runtime_config" => {
377 let resp = send_query(event_tx, QueryRequest::ListRuntimeConfig)?;
378 if let QueryResponse::RuntimeConfigList(entries) = resp {
379 Ok(runtime_config_list_to_pickle(&entries))
380 } else {
381 Ok(PickleValue::None)
382 }
383 }
384 "known_destinations" => {
385 let resp = send_query(event_tx, QueryRequest::KnownDestinations)?;
386 if let QueryResponse::KnownDestinations(entries) = resp {
387 Ok(known_destinations_to_pickle(&entries))
388 } else {
389 Ok(PickleValue::None)
390 }
391 }
392 "runtime_config_entry" => {
393 let key = request
394 .get("key")
395 .and_then(|v| v.as_str())
396 .unwrap_or_default()
397 .to_string();
398 let resp = send_query(event_tx, QueryRequest::GetRuntimeConfig { key })?;
399 if let QueryResponse::RuntimeConfigEntry(entry) = resp {
400 Ok(entry
401 .as_ref()
402 .map(runtime_config_entry_to_pickle)
403 .unwrap_or(PickleValue::None))
404 } else {
405 Ok(PickleValue::None)
406 }
407 }
408 "backbone_peer_state" => {
409 let interface_name = request
410 .get("interface")
411 .and_then(|v| v.as_str())
412 .map(|s| s.to_string());
413 let resp =
414 send_query(event_tx, QueryRequest::BackbonePeerState { interface_name })?;
415 if let QueryResponse::BackbonePeerState(entries) = resp {
416 Ok(backbone_peer_state_to_pickle(&entries))
417 } else {
418 Ok(PickleValue::None)
419 }
420 }
421 "backbone_interfaces" => {
422 let resp = send_query(event_tx, QueryRequest::BackboneInterfaces)?;
423 if let QueryResponse::BackboneInterfaces(entries) = resp {
424 Ok(backbone_interfaces_to_pickle(&entries))
425 } else {
426 Ok(PickleValue::None)
427 }
428 }
429 "provider_bridge_stats" => {
430 let resp = send_query(event_tx, QueryRequest::ProviderBridgeStats)?;
431 if let QueryResponse::ProviderBridgeStats(stats) = resp {
432 Ok(stats
433 .as_ref()
434 .map(provider_bridge_stats_to_pickle)
435 .unwrap_or(PickleValue::None))
436 } else {
437 Ok(PickleValue::None)
438 }
439 }
440 "drain_status" => {
441 let resp = send_query(event_tx, QueryRequest::DrainStatus)?;
442 if let QueryResponse::DrainStatus(status) = resp {
443 Ok(drain_status_to_pickle(&status))
444 } else {
445 Ok(PickleValue::None)
446 }
447 }
448 _ => Ok(PickleValue::None),
449 };
450 }
451 }
452
453 if let Some(begin_val) = request.get("begin_drain") {
454 let timeout_secs = begin_val
455 .as_float()
456 .or_else(|| begin_val.as_int().map(|value| value as f64))
457 .unwrap_or(0.0)
458 .max(0.0);
459 let timeout = Duration::from_secs_f64(timeout_secs);
460 let _ = event_tx.send(Event::BeginDrain { timeout });
461 return Ok(PickleValue::Bool(true));
462 }
463
464 if let Some(set_val) = request.get("set").and_then(|v| v.as_str()) {
465 if set_val == "known_destination_retained" {
466 let dest_hash = extract_dest_hash(request, "dest_hash")?;
467 let resp = send_query(event_tx, QueryRequest::RetainKnownDestination { dest_hash })?;
468 return if let QueryResponse::RetainKnownDestination(ok) = resp {
469 Ok(PickleValue::Bool(ok))
470 } else {
471 Ok(PickleValue::None)
472 };
473 }
474 if set_val == "identity_retained" {
475 let identity_hash = extract_dest_hash(request, "identity_hash")?;
476 let resp = send_query(event_tx, QueryRequest::RetainIdentity { identity_hash })?;
477 return if let QueryResponse::RetainIdentity(ok) = resp {
478 Ok(PickleValue::Bool(ok))
479 } else {
480 Ok(PickleValue::None)
481 };
482 }
483 if set_val == "known_destination_used" {
484 let dest_hash = extract_dest_hash(request, "dest_hash")?;
485 let resp = send_query(
486 event_tx,
487 QueryRequest::MarkKnownDestinationUsed { dest_hash },
488 )?;
489 return if let QueryResponse::MarkKnownDestinationUsed(ok) = resp {
490 Ok(PickleValue::Bool(ok))
491 } else {
492 Ok(PickleValue::None)
493 };
494 }
495 if set_val == "runtime_config" {
496 let key = request
497 .get("key")
498 .and_then(|v| v.as_str())
499 .unwrap_or_default()
500 .to_string();
501 let Some(value) = request
502 .get("value")
503 .and_then(runtime_config_value_from_pickle)
504 else {
505 return Ok(runtime_config_error_to_pickle(&RuntimeConfigError {
506 code: RuntimeConfigErrorCode::InvalidType,
507 message: "runtime-config set requires a scalar value".into(),
508 }));
509 };
510 let resp = send_query(event_tx, QueryRequest::SetRuntimeConfig { key, value })?;
511 return if let QueryResponse::RuntimeConfigSet(result) = resp {
512 Ok(runtime_config_result_to_pickle(result))
513 } else {
514 Ok(PickleValue::None)
515 };
516 }
517 }
518
519 if let Some(reset_val) = request.get("reset").and_then(|v| v.as_str()) {
520 if reset_val == "runtime_config" {
521 let key = request
522 .get("key")
523 .and_then(|v| v.as_str())
524 .unwrap_or_default()
525 .to_string();
526 let resp = send_query(event_tx, QueryRequest::ResetRuntimeConfig { key })?;
527 return if let QueryResponse::RuntimeConfigReset(result) = resp {
528 Ok(runtime_config_result_to_pickle(result))
529 } else {
530 Ok(PickleValue::None)
531 };
532 }
533 }
534
535 if let Some(clear_val) = request.get("clear").and_then(|v| v.as_str()) {
536 if clear_val == "known_destination_retained" {
537 let dest_hash = extract_dest_hash(request, "dest_hash")?;
538 let resp = send_query(
539 event_tx,
540 QueryRequest::UnretainKnownDestination { dest_hash },
541 )?;
542 return if let QueryResponse::UnretainKnownDestination(ok) = resp {
543 Ok(PickleValue::Bool(ok))
544 } else {
545 Ok(PickleValue::None)
546 };
547 }
548 if clear_val == "backbone_peer_state" {
549 let interface_name = required_string(request, "interface")?;
550 let peer_ip = required_string(request, "ip")?;
551 let peer_ip = peer_ip
552 .parse()
553 .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "invalid peer IP"))?;
554 let resp = send_query(
555 event_tx,
556 QueryRequest::ClearBackbonePeerState {
557 interface_name,
558 peer_ip,
559 },
560 )?;
561 return if let QueryResponse::ClearBackbonePeerState(ok) = resp {
562 Ok(PickleValue::Bool(ok))
563 } else {
564 Ok(PickleValue::None)
565 };
566 }
567 }
568
569 if let Some(set_val) = request.get("set").and_then(|v| v.as_str()) {
570 if set_val == "backbone_peer_blacklist" {
571 let interface_name = required_string(request, "interface")?;
572 let peer_ip = required_string(request, "ip")?;
573 let peer_ip: IpAddr = peer_ip
574 .parse()
575 .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "invalid peer IP"))?;
576 let duration_secs = request
577 .get("duration_secs")
578 .and_then(|v| v.as_int())
579 .ok_or_else(|| {
580 io::Error::new(io::ErrorKind::InvalidInput, "missing duration_secs")
581 })?;
582 let reason = request
583 .get("reason")
584 .and_then(|v| v.as_str())
585 .unwrap_or("sentinel blacklist")
586 .to_string();
587 let penalty_level = request
588 .get("penalty_level")
589 .and_then(|v| v.as_int())
590 .unwrap_or(0)
591 .clamp(0, u8::MAX as i64) as u8;
592 let resp = send_query(
593 event_tx,
594 QueryRequest::BlacklistBackbonePeer {
595 interface_name,
596 peer_ip,
597 duration: Duration::from_secs(duration_secs as u64),
598 reason,
599 penalty_level,
600 },
601 )?;
602 return if let QueryResponse::BlacklistBackbonePeer(ok) = resp {
603 Ok(PickleValue::Bool(ok))
604 } else {
605 Ok(PickleValue::None)
606 };
607 }
608 }
609
610 if let Some(hash_val) = request.get("request_path") {
612 if let Some(hash_bytes) = hash_val.as_bytes() {
613 if hash_bytes.len() >= 16 {
614 let mut dest_hash = [0u8; 16];
615 dest_hash.copy_from_slice(&hash_bytes[..16]);
616 let _ = event_tx.send(crate::event::Event::RequestPath { dest_hash });
617 return Ok(PickleValue::Bool(true));
618 }
619 }
620 }
621
622 if let Some(hash_val) = request.get("send_probe") {
624 if let Some(hash_bytes) = hash_val.as_bytes() {
625 if hash_bytes.len() >= 16 {
626 let mut dest_hash = [0u8; 16];
627 dest_hash.copy_from_slice(&hash_bytes[..16]);
628 let payload_size = request
629 .get("size")
630 .and_then(|v| v.as_int())
631 .and_then(|n| {
632 if n > 0 && n <= 400 {
633 Some(n as usize)
634 } else {
635 None
636 }
637 })
638 .unwrap_or(16);
639 let resp = send_query(
640 event_tx,
641 QueryRequest::SendProbe {
642 dest_hash,
643 payload_size,
644 },
645 )?;
646 if let QueryResponse::SendProbe(Some((packet_hash, hops))) = resp {
647 return Ok(PickleValue::Dict(vec![
648 (
649 PickleValue::String("packet_hash".into()),
650 PickleValue::Bytes(packet_hash.to_vec()),
651 ),
652 (
653 PickleValue::String("hops".into()),
654 PickleValue::Int(hops as i64),
655 ),
656 ]));
657 } else {
658 return Ok(PickleValue::None);
659 }
660 }
661 }
662 }
663
664 if let Some(hash_val) = request.get("check_proof") {
666 if let Some(hash_bytes) = hash_val.as_bytes() {
667 if hash_bytes.len() >= 32 {
668 let mut packet_hash = [0u8; 32];
669 packet_hash.copy_from_slice(&hash_bytes[..32]);
670 let resp = send_query(event_tx, QueryRequest::CheckProof { packet_hash })?;
671 if let QueryResponse::CheckProof(Some(rtt)) = resp {
672 return Ok(PickleValue::Float(rtt));
673 } else {
674 return Ok(PickleValue::None);
675 }
676 }
677 }
678 }
679
680 if let Some(hash_val) = request.get("blackhole") {
682 if let Some(hash_bytes) = hash_val.as_bytes() {
683 if hash_bytes.len() >= 16 {
684 let mut identity_hash = [0u8; 16];
685 identity_hash.copy_from_slice(&hash_bytes[..16]);
686 let duration_hours = request.get("duration").and_then(|v| v.as_float());
687 let reason = request
688 .get("reason")
689 .and_then(|v| v.as_str())
690 .map(|s| s.to_string());
691 let resp = send_query(
692 event_tx,
693 QueryRequest::BlackholeIdentity {
694 identity_hash,
695 duration_hours,
696 reason,
697 },
698 )?;
699 return Ok(PickleValue::Bool(matches!(
700 resp,
701 QueryResponse::BlackholeResult(true)
702 )));
703 }
704 }
705 }
706
707 if let Some(hash_val) = request.get("unblackhole") {
709 if let Some(hash_bytes) = hash_val.as_bytes() {
710 if hash_bytes.len() >= 16 {
711 let mut identity_hash = [0u8; 16];
712 identity_hash.copy_from_slice(&hash_bytes[..16]);
713 let resp = send_query(
714 event_tx,
715 QueryRequest::UnblackholeIdentity { identity_hash },
716 )?;
717 return Ok(PickleValue::Bool(matches!(
718 resp,
719 QueryResponse::UnblackholeResult(true)
720 )));
721 }
722 }
723 }
724
725 if let Some(drop_val) = request.get("drop") {
727 if let Some(path) = drop_val.as_str() {
728 return match path {
729 "path" => {
730 let hash = extract_dest_hash(request, "destination_hash")?;
731 let resp = send_query(event_tx, QueryRequest::DropPath { dest_hash: hash })?;
732 if let QueryResponse::DropPath(ok) = resp {
733 Ok(PickleValue::Bool(ok))
734 } else {
735 Ok(PickleValue::None)
736 }
737 }
738 "all_via" => {
739 let hash = extract_dest_hash(request, "destination_hash")?;
740 let resp = send_query(
741 event_tx,
742 QueryRequest::DropAllVia {
743 transport_hash: hash,
744 },
745 )?;
746 if let QueryResponse::DropAllVia(n) = resp {
747 Ok(PickleValue::Int(n as i64))
748 } else {
749 Ok(PickleValue::None)
750 }
751 }
752 "announce_queues" => {
753 let resp = send_query(event_tx, QueryRequest::DropAnnounceQueues)?;
754 if let QueryResponse::DropAnnounceQueues = resp {
755 Ok(PickleValue::Bool(true))
756 } else {
757 Ok(PickleValue::None)
758 }
759 }
760 _ => Ok(PickleValue::None),
761 };
762 }
763 }
764
765 if let Some(hook_val) = request.get("hook").and_then(|v| v.as_str()) {
766 return handle_hook_rpc_request(hook_val, request, event_tx);
767 }
768
769 Ok(PickleValue::None)
770}
771
772fn handle_hook_rpc_request(
773 op: &str,
774 request: &PickleValue,
775 event_tx: &EventSender,
776) -> io::Result<PickleValue> {
777 match op {
778 "load" => {
779 let name = required_string(request, "name")?;
780 let attach_point = required_string(request, "attach_point")?;
781 let priority = request
782 .get("priority")
783 .and_then(|v| v.as_int())
784 .unwrap_or(0) as i32;
785 let wasm = request
786 .get("wasm")
787 .and_then(|v| v.as_bytes())
788 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing wasm"))?
789 .to_vec();
790 let (response_tx, response_rx) = mpsc::channel();
791 event_tx
792 .send(Event::LoadHook {
793 name,
794 wasm_bytes: wasm,
795 attach_point,
796 priority,
797 response_tx,
798 })
799 .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down"))?;
800 let response = response_rx
801 .recv_timeout(std::time::Duration::from_secs(5))
802 .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "hook load timed out"))?;
803 Ok(hook_result_to_pickle(response))
804 }
805 "load_builtin" => {
806 let name = required_string(request, "name")?;
807 let attach_point = required_string(request, "attach_point")?;
808 let builtin_id = required_string(request, "builtin_id")?;
809 let priority = request
810 .get("priority")
811 .and_then(|v| v.as_int())
812 .unwrap_or(0) as i32;
813 let (response_tx, response_rx) = mpsc::channel();
814 event_tx
815 .send(Event::LoadBuiltinHook {
816 name,
817 builtin_id,
818 attach_point,
819 priority,
820 response_tx,
821 })
822 .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down"))?;
823 let response = response_rx
824 .recv_timeout(std::time::Duration::from_secs(5))
825 .map_err(|_| {
826 io::Error::new(io::ErrorKind::TimedOut, "built-in hook load timed out")
827 })?;
828 Ok(hook_result_to_pickle(response))
829 }
830 "unload" => {
831 let name = required_string(request, "name")?;
832 let attach_point = required_string(request, "attach_point")?;
833 let (response_tx, response_rx) = mpsc::channel();
834 event_tx
835 .send(Event::UnloadHook {
836 name,
837 attach_point,
838 response_tx,
839 })
840 .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down"))?;
841 let response = response_rx
842 .recv_timeout(std::time::Duration::from_secs(5))
843 .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "hook unload timed out"))?;
844 Ok(hook_result_to_pickle(response))
845 }
846 "enable" | "disable" => {
847 let name = required_string(request, "name")?;
848 let attach_point = required_string(request, "attach_point")?;
849 let enabled = op == "enable";
850 let (response_tx, response_rx) = mpsc::channel();
851 event_tx
852 .send(Event::SetHookEnabled {
853 name,
854 attach_point,
855 enabled,
856 response_tx,
857 })
858 .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down"))?;
859 let response = response_rx
860 .recv_timeout(std::time::Duration::from_secs(5))
861 .map_err(|_| {
862 io::Error::new(io::ErrorKind::TimedOut, "hook enable/disable timed out")
863 })?;
864 Ok(hook_result_to_pickle(response))
865 }
866 "set_priority" => {
867 let name = required_string(request, "name")?;
868 let attach_point = required_string(request, "attach_point")?;
869 let priority = request
870 .get("priority")
871 .and_then(|v| v.as_int())
872 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing priority"))?
873 as i32;
874 let (response_tx, response_rx) = mpsc::channel();
875 event_tx
876 .send(Event::SetHookPriority {
877 name,
878 attach_point,
879 priority,
880 response_tx,
881 })
882 .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down"))?;
883 let response = response_rx
884 .recv_timeout(std::time::Duration::from_secs(5))
885 .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "hook priority timed out"))?;
886 Ok(hook_result_to_pickle(response))
887 }
888 _ => Ok(PickleValue::None),
889 }
890}
891
892fn send_query(event_tx: &EventSender, request: QueryRequest) -> io::Result<QueryResponse> {
894 let (resp_tx, resp_rx) = mpsc::channel();
895 event_tx
896 .send(Event::Query(request, resp_tx))
897 .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down"))?;
898 resp_rx
899 .recv_timeout(std::time::Duration::from_secs(5))
900 .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "query timed out"))
901}
902
903fn extract_dest_hash(request: &PickleValue, key: &str) -> io::Result<[u8; 16]> {
905 let bytes = request
906 .get(key)
907 .and_then(|v| v.as_bytes())
908 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing destination_hash"))?;
909 if bytes.len() < 16 {
910 return Err(io::Error::new(io::ErrorKind::InvalidData, "hash too short"));
911 }
912 let mut hash = [0u8; 16];
913 hash.copy_from_slice(&bytes[..16]);
914 Ok(hash)
915}
916
917fn required_string(request: &PickleValue, key: &str) -> io::Result<String> {
918 request
919 .get(key)
920 .and_then(|v| v.as_str())
921 .map(|s| s.to_string())
922 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, format!("missing {}", key)))
923}
924
925fn hook_result_to_pickle(result: Result<(), String>) -> PickleValue {
926 match result {
927 Ok(()) => PickleValue::Dict(vec![(
928 PickleValue::String("ok".into()),
929 PickleValue::Bool(true),
930 )]),
931 Err(error) => PickleValue::Dict(vec![
932 (PickleValue::String("ok".into()), PickleValue::Bool(false)),
933 (
934 PickleValue::String("error".into()),
935 PickleValue::String(error),
936 ),
937 ]),
938 }
939}
940
941fn interface_stats_to_pickle(stats: &InterfaceStatsResponse) -> PickleValue {
944 let mut ifaces = Vec::new();
945 for iface in &stats.interfaces {
946 ifaces.push(single_iface_to_pickle(iface));
947 }
948
949 let mut dict = vec![
950 (
951 PickleValue::String("interfaces".into()),
952 PickleValue::List(ifaces),
953 ),
954 (
955 PickleValue::String("transport_enabled".into()),
956 PickleValue::Bool(stats.transport_enabled),
957 ),
958 (
959 PickleValue::String("transport_uptime".into()),
960 PickleValue::Float(stats.transport_uptime),
961 ),
962 (
963 PickleValue::String("rxb".into()),
964 PickleValue::Int(stats.total_rxb as i64),
965 ),
966 (
967 PickleValue::String("txb".into()),
968 PickleValue::Int(stats.total_txb as i64),
969 ),
970 ];
971
972 if let Some(tid) = stats.transport_id {
973 dict.push((
974 PickleValue::String("transport_id".into()),
975 PickleValue::Bytes(tid.to_vec()),
976 ));
977 } else {
978 dict.push((
979 PickleValue::String("transport_id".into()),
980 PickleValue::None,
981 ));
982 }
983
984 if let Some(pr) = stats.probe_responder {
985 dict.push((
986 PickleValue::String("probe_responder".into()),
987 PickleValue::Bytes(pr.to_vec()),
988 ));
989 } else {
990 dict.push((
991 PickleValue::String("probe_responder".into()),
992 PickleValue::None,
993 ));
994 }
995
996 if let Some(pool) = &stats.backbone_peer_pool {
997 let members = pool
998 .members
999 .iter()
1000 .map(|member| {
1001 let mut member_dict = vec![
1002 (
1003 PickleValue::String("name".into()),
1004 PickleValue::String(member.name.clone()),
1005 ),
1006 (
1007 PickleValue::String("remote".into()),
1008 PickleValue::String(member.remote.clone()),
1009 ),
1010 (
1011 PickleValue::String("source".into()),
1012 PickleValue::String(member.source.clone()),
1013 ),
1014 (
1015 PickleValue::String("priority".into()),
1016 PickleValue::Int(member.priority as i64),
1017 ),
1018 (
1019 PickleValue::String("state".into()),
1020 PickleValue::String(member.state.clone()),
1021 ),
1022 (
1023 PickleValue::String("failure_count".into()),
1024 PickleValue::Int(member.failure_count as i64),
1025 ),
1026 ];
1027 member_dict.push((
1028 PickleValue::String("interface_id".into()),
1029 member
1030 .interface_id
1031 .map(|id| PickleValue::Int(id as i64))
1032 .unwrap_or(PickleValue::None),
1033 ));
1034 member_dict.push((
1035 PickleValue::String("last_error".into()),
1036 member
1037 .last_error
1038 .as_ref()
1039 .map(|err| PickleValue::String(err.clone()))
1040 .unwrap_or(PickleValue::None),
1041 ));
1042 member_dict.push((
1043 PickleValue::String("cooldown_remaining_seconds".into()),
1044 member
1045 .cooldown_remaining_seconds
1046 .map(PickleValue::Float)
1047 .unwrap_or(PickleValue::None),
1048 ));
1049 PickleValue::Dict(member_dict)
1050 })
1051 .collect();
1052 dict.push((
1053 PickleValue::String("backbone_peer_pool".into()),
1054 PickleValue::Dict(vec![
1055 (
1056 PickleValue::String("max_connected".into()),
1057 PickleValue::Int(pool.max_connected as i64),
1058 ),
1059 (
1060 PickleValue::String("active_count".into()),
1061 PickleValue::Int(pool.active_count as i64),
1062 ),
1063 (
1064 PickleValue::String("standby_count".into()),
1065 PickleValue::Int(pool.standby_count as i64),
1066 ),
1067 (
1068 PickleValue::String("cooldown_count".into()),
1069 PickleValue::Int(pool.cooldown_count as i64),
1070 ),
1071 (
1072 PickleValue::String("members".into()),
1073 PickleValue::List(members),
1074 ),
1075 ]),
1076 ));
1077 } else {
1078 dict.push((
1079 PickleValue::String("backbone_peer_pool".into()),
1080 PickleValue::None,
1081 ));
1082 }
1083
1084 PickleValue::Dict(dict)
1085}
1086
1087fn single_iface_to_pickle(s: &SingleInterfaceStat) -> PickleValue {
1088 let mut dict = vec![
1089 (
1090 PickleValue::String("id".into()),
1091 PickleValue::Int(s.id as i64),
1092 ),
1093 (
1094 PickleValue::String("name".into()),
1095 PickleValue::String(s.name.clone()),
1096 ),
1097 (
1098 PickleValue::String("status".into()),
1099 PickleValue::Bool(s.status),
1100 ),
1101 (
1102 PickleValue::String("mode".into()),
1103 PickleValue::Int(s.mode as i64),
1104 ),
1105 (
1106 PickleValue::String("rxb".into()),
1107 PickleValue::Int(s.rxb as i64),
1108 ),
1109 (
1110 PickleValue::String("txb".into()),
1111 PickleValue::Int(s.txb as i64),
1112 ),
1113 (
1114 PickleValue::String("rx_packets".into()),
1115 PickleValue::Int(s.rx_packets as i64),
1116 ),
1117 (
1118 PickleValue::String("tx_packets".into()),
1119 PickleValue::Int(s.tx_packets as i64),
1120 ),
1121 (
1122 PickleValue::String("started".into()),
1123 PickleValue::Float(s.started),
1124 ),
1125 (
1126 PickleValue::String("ia_freq".into()),
1127 PickleValue::Float(s.ia_freq),
1128 ),
1129 (
1130 PickleValue::String("oa_freq".into()),
1131 PickleValue::Float(s.oa_freq),
1132 ),
1133 (
1134 PickleValue::String("ip_freq".into()),
1135 PickleValue::Float(s.ip_freq),
1136 ),
1137 (
1138 PickleValue::String("op_freq".into()),
1139 PickleValue::Float(s.op_freq),
1140 ),
1141 (
1142 PickleValue::String("burst_active".into()),
1143 PickleValue::Bool(s.burst_active),
1144 ),
1145 (
1146 PickleValue::String("burst_activated".into()),
1147 PickleValue::Float(s.burst_activated),
1148 ),
1149 (
1150 PickleValue::String("pr_burst_active".into()),
1151 PickleValue::Bool(s.pr_burst_active),
1152 ),
1153 (
1154 PickleValue::String("pr_burst_activated".into()),
1155 PickleValue::Float(s.pr_burst_activated),
1156 ),
1157 (
1158 PickleValue::String("clients".into()),
1159 s.clients
1160 .map(|clients| PickleValue::Int(clients as i64))
1161 .unwrap_or(PickleValue::None),
1162 ),
1163 (
1164 PickleValue::String("announce_rate_grace".into()),
1165 PickleValue::Int(s.announce_rate_grace as i64),
1166 ),
1167 (
1168 PickleValue::String("announce_rate_penalty".into()),
1169 PickleValue::Float(s.announce_rate_penalty),
1170 ),
1171 ];
1172
1173 match s.announce_rate_target {
1174 Some(target) => dict.push((
1175 PickleValue::String("announce_rate_target".into()),
1176 PickleValue::Float(target),
1177 )),
1178 None => dict.push((
1179 PickleValue::String("announce_rate_target".into()),
1180 PickleValue::None,
1181 )),
1182 }
1183
1184 match s.bitrate {
1185 Some(br) => dict.push((
1186 PickleValue::String("bitrate".into()),
1187 PickleValue::Int(br as i64),
1188 )),
1189 None => dict.push((PickleValue::String("bitrate".into()), PickleValue::None)),
1190 }
1191
1192 match s.ifac_size {
1193 Some(sz) => dict.push((
1194 PickleValue::String("ifac_size".into()),
1195 PickleValue::Int(sz as i64),
1196 )),
1197 None => dict.push((PickleValue::String("ifac_size".into()), PickleValue::None)),
1198 }
1199
1200 PickleValue::Dict(dict)
1201}
1202
1203fn path_table_to_pickle(entries: &[PathTableEntry]) -> PickleValue {
1204 let list: Vec<PickleValue> = entries
1205 .iter()
1206 .map(|e| {
1207 PickleValue::Dict(vec![
1208 (
1209 PickleValue::String("hash".into()),
1210 PickleValue::Bytes(e.hash.to_vec()),
1211 ),
1212 (
1213 PickleValue::String("timestamp".into()),
1214 PickleValue::Float(e.timestamp),
1215 ),
1216 (
1217 PickleValue::String("via".into()),
1218 PickleValue::Bytes(e.via.to_vec()),
1219 ),
1220 (
1221 PickleValue::String("hops".into()),
1222 PickleValue::Int(e.hops as i64),
1223 ),
1224 (
1225 PickleValue::String("expires".into()),
1226 PickleValue::Float(e.expires),
1227 ),
1228 (
1229 PickleValue::String("interface".into()),
1230 PickleValue::String(e.interface_name.clone()),
1231 ),
1232 ])
1233 })
1234 .collect();
1235 PickleValue::List(list)
1236}
1237
1238fn rate_table_to_pickle(entries: &[RateTableEntry]) -> PickleValue {
1239 let list: Vec<PickleValue> = entries
1240 .iter()
1241 .map(|e| {
1242 PickleValue::Dict(vec![
1243 (
1244 PickleValue::String("hash".into()),
1245 PickleValue::Bytes(e.hash.to_vec()),
1246 ),
1247 (
1248 PickleValue::String("last".into()),
1249 PickleValue::Float(e.last),
1250 ),
1251 (
1252 PickleValue::String("rate_violations".into()),
1253 PickleValue::Int(e.rate_violations as i64),
1254 ),
1255 (
1256 PickleValue::String("blocked_until".into()),
1257 PickleValue::Float(e.blocked_until),
1258 ),
1259 (
1260 PickleValue::String("timestamps".into()),
1261 PickleValue::List(
1262 e.timestamps
1263 .iter()
1264 .map(|&t| PickleValue::Float(t))
1265 .collect(),
1266 ),
1267 ),
1268 ])
1269 })
1270 .collect();
1271 PickleValue::List(list)
1272}
1273
1274fn blackholed_to_pickle(entries: &[BlackholeInfo]) -> PickleValue {
1275 let list: Vec<PickleValue> = entries
1276 .iter()
1277 .map(|e| {
1278 let mut dict = vec![
1279 (
1280 PickleValue::String("identity_hash".into()),
1281 PickleValue::Bytes(e.identity_hash.to_vec()),
1282 ),
1283 (
1284 PickleValue::String("created".into()),
1285 PickleValue::Float(e.created),
1286 ),
1287 (
1288 PickleValue::String("expires".into()),
1289 PickleValue::Float(e.expires),
1290 ),
1291 ];
1292 if let Some(ref reason) = e.reason {
1293 dict.push((
1294 PickleValue::String("reason".into()),
1295 PickleValue::String(reason.clone()),
1296 ));
1297 } else {
1298 dict.push((PickleValue::String("reason".into()), PickleValue::None));
1299 }
1300 PickleValue::Dict(dict)
1301 })
1302 .collect();
1303 PickleValue::List(list)
1304}
1305
1306fn discovered_interfaces_to_pickle(
1307 interfaces: &[crate::discovery::DiscoveredInterface],
1308) -> PickleValue {
1309 let list: Vec<PickleValue> = interfaces
1310 .iter()
1311 .map(|iface| {
1312 let mut dict = vec![
1313 (
1314 PickleValue::String("type".into()),
1315 PickleValue::String(iface.interface_type.clone()),
1316 ),
1317 (
1318 PickleValue::String("transport".into()),
1319 PickleValue::Bool(iface.transport),
1320 ),
1321 (
1322 PickleValue::String("name".into()),
1323 PickleValue::String(iface.name.clone()),
1324 ),
1325 (
1326 PickleValue::String("discovered".into()),
1327 PickleValue::Float(iface.discovered),
1328 ),
1329 (
1330 PickleValue::String("last_heard".into()),
1331 PickleValue::Float(iface.last_heard),
1332 ),
1333 (
1334 PickleValue::String("heard_count".into()),
1335 PickleValue::Int(iface.heard_count as i64),
1336 ),
1337 (
1338 PickleValue::String("status".into()),
1339 PickleValue::String(iface.status.as_str().into()),
1340 ),
1341 (
1342 PickleValue::String("stamp".into()),
1343 PickleValue::Bytes(iface.stamp.clone()),
1344 ),
1345 (
1346 PickleValue::String("value".into()),
1347 PickleValue::Int(iface.stamp_value as i64),
1348 ),
1349 (
1350 PickleValue::String("transport_id".into()),
1351 PickleValue::Bytes(iface.transport_id.to_vec()),
1352 ),
1353 (
1354 PickleValue::String("network_id".into()),
1355 PickleValue::Bytes(iface.network_id.to_vec()),
1356 ),
1357 (
1358 PickleValue::String("hops".into()),
1359 PickleValue::Int(iface.hops as i64),
1360 ),
1361 ];
1362
1363 if let Some(v) = iface.latitude {
1365 dict.push((
1366 PickleValue::String("latitude".into()),
1367 PickleValue::Float(v),
1368 ));
1369 } else {
1370 dict.push((PickleValue::String("latitude".into()), PickleValue::None));
1371 }
1372 if let Some(v) = iface.longitude {
1373 dict.push((
1374 PickleValue::String("longitude".into()),
1375 PickleValue::Float(v),
1376 ));
1377 } else {
1378 dict.push((PickleValue::String("longitude".into()), PickleValue::None));
1379 }
1380 if let Some(v) = iface.height {
1381 dict.push((PickleValue::String("height".into()), PickleValue::Float(v)));
1382 } else {
1383 dict.push((PickleValue::String("height".into()), PickleValue::None));
1384 }
1385
1386 if let Some(ref v) = iface.reachable_on {
1388 dict.push((
1389 PickleValue::String("reachable_on".into()),
1390 PickleValue::String(v.clone()),
1391 ));
1392 } else {
1393 dict.push((
1394 PickleValue::String("reachable_on".into()),
1395 PickleValue::None,
1396 ));
1397 }
1398 if let Some(v) = iface.port {
1399 dict.push((
1400 PickleValue::String("port".into()),
1401 PickleValue::Int(v as i64),
1402 ));
1403 } else {
1404 dict.push((PickleValue::String("port".into()), PickleValue::None));
1405 }
1406
1407 if let Some(v) = iface.frequency {
1409 dict.push((
1410 PickleValue::String("frequency".into()),
1411 PickleValue::Int(v as i64),
1412 ));
1413 } else {
1414 dict.push((PickleValue::String("frequency".into()), PickleValue::None));
1415 }
1416 if let Some(v) = iface.bandwidth {
1417 dict.push((
1418 PickleValue::String("bandwidth".into()),
1419 PickleValue::Int(v as i64),
1420 ));
1421 } else {
1422 dict.push((PickleValue::String("bandwidth".into()), PickleValue::None));
1423 }
1424 if let Some(v) = iface.spreading_factor {
1425 dict.push((PickleValue::String("sf".into()), PickleValue::Int(v as i64)));
1426 } else {
1427 dict.push((PickleValue::String("sf".into()), PickleValue::None));
1428 }
1429 if let Some(v) = iface.coding_rate {
1430 dict.push((PickleValue::String("cr".into()), PickleValue::Int(v as i64)));
1431 } else {
1432 dict.push((PickleValue::String("cr".into()), PickleValue::None));
1433 }
1434 if let Some(ref v) = iface.modulation {
1435 dict.push((
1436 PickleValue::String("modulation".into()),
1437 PickleValue::String(v.clone()),
1438 ));
1439 } else {
1440 dict.push((PickleValue::String("modulation".into()), PickleValue::None));
1441 }
1442 if let Some(v) = iface.channel {
1443 dict.push((
1444 PickleValue::String("channel".into()),
1445 PickleValue::Int(v as i64),
1446 ));
1447 } else {
1448 dict.push((PickleValue::String("channel".into()), PickleValue::None));
1449 }
1450
1451 if let Some(ref v) = iface.ifac_netname {
1453 dict.push((
1454 PickleValue::String("ifac_netname".into()),
1455 PickleValue::String(v.clone()),
1456 ));
1457 } else {
1458 dict.push((
1459 PickleValue::String("ifac_netname".into()),
1460 PickleValue::None,
1461 ));
1462 }
1463 if let Some(ref v) = iface.ifac_netkey {
1464 dict.push((
1465 PickleValue::String("ifac_netkey".into()),
1466 PickleValue::String(v.clone()),
1467 ));
1468 } else {
1469 dict.push((PickleValue::String("ifac_netkey".into()), PickleValue::None));
1470 }
1471
1472 if let Some(ref v) = iface.config_entry {
1474 dict.push((
1475 PickleValue::String("config_entry".into()),
1476 PickleValue::String(v.clone()),
1477 ));
1478 } else {
1479 dict.push((
1480 PickleValue::String("config_entry".into()),
1481 PickleValue::None,
1482 ));
1483 }
1484
1485 dict.push((
1486 PickleValue::String("discovery_hash".into()),
1487 PickleValue::Bytes(iface.discovery_hash.to_vec()),
1488 ));
1489
1490 PickleValue::Dict(dict)
1491 })
1492 .collect();
1493 PickleValue::List(list)
1494}
1495
1496fn hooks_to_pickle(hooks: &[HookInfo]) -> PickleValue {
1497 PickleValue::List(
1498 hooks
1499 .iter()
1500 .map(|hook| {
1501 PickleValue::Dict(vec![
1502 (
1503 PickleValue::String("name".into()),
1504 PickleValue::String(hook.name.clone()),
1505 ),
1506 (
1507 PickleValue::String("type".into()),
1508 PickleValue::String(hook.hook_type.clone()),
1509 ),
1510 (
1511 PickleValue::String("attach_point".into()),
1512 PickleValue::String(hook.attach_point.clone()),
1513 ),
1514 (
1515 PickleValue::String("priority".into()),
1516 PickleValue::Int(hook.priority as i64),
1517 ),
1518 (
1519 PickleValue::String("enabled".into()),
1520 PickleValue::Bool(hook.enabled),
1521 ),
1522 (
1523 PickleValue::String("consecutive_traps".into()),
1524 PickleValue::Int(hook.consecutive_traps as i64),
1525 ),
1526 ])
1527 })
1528 .collect(),
1529 )
1530}
1531
1532fn backbone_peer_state_to_pickle(entries: &[BackbonePeerStateEntry]) -> PickleValue {
1533 PickleValue::List(
1534 entries
1535 .iter()
1536 .map(|entry| {
1537 PickleValue::Dict(vec![
1538 (
1539 PickleValue::String("interface".into()),
1540 PickleValue::String(entry.interface_name.clone()),
1541 ),
1542 (
1543 PickleValue::String("ip".into()),
1544 PickleValue::String(entry.peer_ip.to_string()),
1545 ),
1546 (
1547 PickleValue::String("connected_count".into()),
1548 PickleValue::Int(entry.connected_count as i64),
1549 ),
1550 (
1551 PickleValue::String("blacklisted_remaining_secs".into()),
1552 entry
1553 .blacklisted_remaining_secs
1554 .map(PickleValue::Float)
1555 .unwrap_or(PickleValue::None),
1556 ),
1557 (
1558 PickleValue::String("blacklist_reason".into()),
1559 entry
1560 .blacklist_reason
1561 .as_ref()
1562 .map(|v: &String| PickleValue::String(v.clone()))
1563 .unwrap_or(PickleValue::None),
1564 ),
1565 (
1566 PickleValue::String("reject_count".into()),
1567 PickleValue::Int(entry.reject_count as i64),
1568 ),
1569 ])
1570 })
1571 .collect(),
1572 )
1573}
1574
1575fn backbone_interfaces_to_pickle(entries: &[BackboneInterfaceEntry]) -> PickleValue {
1576 PickleValue::List(
1577 entries
1578 .iter()
1579 .map(|entry| {
1580 PickleValue::Dict(vec![
1581 (
1582 PickleValue::String("id".into()),
1583 PickleValue::Int(entry.interface_id.0 as i64),
1584 ),
1585 (
1586 PickleValue::String("name".into()),
1587 PickleValue::String(entry.interface_name.clone()),
1588 ),
1589 ])
1590 })
1591 .collect(),
1592 )
1593}
1594
1595fn provider_bridge_stats_to_pickle(stats: &ProviderBridgeStats) -> PickleValue {
1596 PickleValue::Dict(vec![
1597 (
1598 PickleValue::String("connected".into()),
1599 PickleValue::Bool(stats.connected),
1600 ),
1601 (
1602 PickleValue::String("consumer_count".into()),
1603 PickleValue::Int(stats.consumer_count as i64),
1604 ),
1605 (
1606 PickleValue::String("queue_max_events".into()),
1607 PickleValue::Int(stats.queue_max_events as i64),
1608 ),
1609 (
1610 PickleValue::String("queue_max_bytes".into()),
1611 PickleValue::Int(stats.queue_max_bytes as i64),
1612 ),
1613 (
1614 PickleValue::String("backlog_len".into()),
1615 PickleValue::Int(stats.backlog_len as i64),
1616 ),
1617 (
1618 PickleValue::String("backlog_bytes".into()),
1619 PickleValue::Int(stats.backlog_bytes as i64),
1620 ),
1621 (
1622 PickleValue::String("backlog_dropped_pending".into()),
1623 PickleValue::Int(stats.backlog_dropped_pending as i64),
1624 ),
1625 (
1626 PickleValue::String("backlog_dropped_total".into()),
1627 PickleValue::Int(stats.backlog_dropped_total as i64),
1628 ),
1629 (
1630 PickleValue::String("total_disconnect_count".into()),
1631 PickleValue::Int(stats.total_disconnect_count as i64),
1632 ),
1633 (
1634 PickleValue::String("consumers".into()),
1635 PickleValue::List(
1636 stats
1637 .consumers
1638 .iter()
1639 .map(|consumer| {
1640 PickleValue::Dict(vec![
1641 (
1642 PickleValue::String("id".into()),
1643 PickleValue::Int(consumer.id as i64),
1644 ),
1645 (
1646 PickleValue::String("connected".into()),
1647 PickleValue::Bool(consumer.connected),
1648 ),
1649 (
1650 PickleValue::String("queue_len".into()),
1651 PickleValue::Int(consumer.queue_len as i64),
1652 ),
1653 (
1654 PickleValue::String("queued_bytes".into()),
1655 PickleValue::Int(consumer.queued_bytes as i64),
1656 ),
1657 (
1658 PickleValue::String("dropped_pending".into()),
1659 PickleValue::Int(consumer.dropped_pending as i64),
1660 ),
1661 (
1662 PickleValue::String("dropped_total".into()),
1663 PickleValue::Int(consumer.dropped_total as i64),
1664 ),
1665 (
1666 PickleValue::String("queue_max_events".into()),
1667 PickleValue::Int(consumer.queue_max_events as i64),
1668 ),
1669 (
1670 PickleValue::String("queue_max_bytes".into()),
1671 PickleValue::Int(consumer.queue_max_bytes as i64),
1672 ),
1673 ])
1674 })
1675 .collect(),
1676 ),
1677 ),
1678 ])
1679}
1680
1681fn lifecycle_state_name(state: LifecycleState) -> &'static str {
1682 match state {
1683 LifecycleState::Active => "active",
1684 LifecycleState::Draining => "draining",
1685 LifecycleState::Stopping => "stopping",
1686 LifecycleState::Stopped => "stopped",
1687 }
1688}
1689
1690fn drain_status_to_pickle(status: &DrainStatus) -> PickleValue {
1691 PickleValue::Dict(vec![
1692 (
1693 PickleValue::String("state".into()),
1694 PickleValue::String(lifecycle_state_name(status.state).into()),
1695 ),
1696 (
1697 PickleValue::String("drain_age_seconds".into()),
1698 status
1699 .drain_age_seconds
1700 .map(PickleValue::Float)
1701 .unwrap_or(PickleValue::None),
1702 ),
1703 (
1704 PickleValue::String("deadline_remaining_seconds".into()),
1705 status
1706 .deadline_remaining_seconds
1707 .map(PickleValue::Float)
1708 .unwrap_or(PickleValue::None),
1709 ),
1710 (
1711 PickleValue::String("drain_complete".into()),
1712 PickleValue::Bool(status.drain_complete),
1713 ),
1714 (
1715 PickleValue::String("interface_writer_queued_frames".into()),
1716 PickleValue::Int(status.interface_writer_queued_frames as i64),
1717 ),
1718 (
1719 PickleValue::String("provider_backlog_events".into()),
1720 PickleValue::Int(status.provider_backlog_events as i64),
1721 ),
1722 (
1723 PickleValue::String("provider_consumer_queued_events".into()),
1724 PickleValue::Int(status.provider_consumer_queued_events as i64),
1725 ),
1726 (
1727 PickleValue::String("detail".into()),
1728 status
1729 .detail
1730 .as_ref()
1731 .map(|detail| PickleValue::String(detail.clone()))
1732 .unwrap_or(PickleValue::None),
1733 ),
1734 ])
1735}
1736
1737fn runtime_config_value_to_pickle(value: &RuntimeConfigValue) -> PickleValue {
1738 match value {
1739 RuntimeConfigValue::Int(v) => PickleValue::Int(*v),
1740 RuntimeConfigValue::Float(v) => PickleValue::Float(*v),
1741 RuntimeConfigValue::Bool(v) => PickleValue::Bool(*v),
1742 RuntimeConfigValue::String(v) => PickleValue::String(v.clone()),
1743 RuntimeConfigValue::Null => PickleValue::None,
1744 }
1745}
1746
1747fn runtime_config_value_from_pickle(value: &PickleValue) -> Option<RuntimeConfigValue> {
1748 match value {
1749 PickleValue::Int(v) => Some(RuntimeConfigValue::Int(*v)),
1750 PickleValue::Float(v) => Some(RuntimeConfigValue::Float(*v)),
1751 PickleValue::Bool(v) => Some(RuntimeConfigValue::Bool(*v)),
1752 PickleValue::String(v) => Some(RuntimeConfigValue::String(v.clone())),
1753 PickleValue::None => Some(RuntimeConfigValue::Null),
1754 _ => None,
1755 }
1756}
1757
1758fn runtime_config_entry_to_pickle(entry: &RuntimeConfigEntry) -> PickleValue {
1759 PickleValue::Dict(vec![
1760 (
1761 PickleValue::String("key".into()),
1762 PickleValue::String(entry.key.clone()),
1763 ),
1764 (
1765 PickleValue::String("value".into()),
1766 runtime_config_value_to_pickle(&entry.value),
1767 ),
1768 (
1769 PickleValue::String("default".into()),
1770 runtime_config_value_to_pickle(&entry.default),
1771 ),
1772 (
1773 PickleValue::String("source".into()),
1774 PickleValue::String(match entry.source {
1775 RuntimeConfigSource::Startup => "startup".into(),
1776 RuntimeConfigSource::RuntimeOverride => "runtime_override".into(),
1777 }),
1778 ),
1779 (
1780 PickleValue::String("apply_mode".into()),
1781 PickleValue::String(match entry.apply_mode {
1782 RuntimeConfigApplyMode::Immediate => "immediate".into(),
1783 RuntimeConfigApplyMode::NewConnectionsOnly => "new_connections_only".into(),
1784 RuntimeConfigApplyMode::NextReconnect => "next_reconnect".into(),
1785 RuntimeConfigApplyMode::RestartRequired => "restart_required".into(),
1786 }),
1787 ),
1788 (
1789 PickleValue::String("description".into()),
1790 entry
1791 .description
1792 .as_ref()
1793 .map(|v| PickleValue::String(v.clone()))
1794 .unwrap_or(PickleValue::None),
1795 ),
1796 ])
1797}
1798
1799fn runtime_config_list_to_pickle(entries: &[RuntimeConfigEntry]) -> PickleValue {
1800 PickleValue::List(entries.iter().map(runtime_config_entry_to_pickle).collect())
1801}
1802
1803fn runtime_config_error_to_pickle(error: &RuntimeConfigError) -> PickleValue {
1804 PickleValue::Dict(vec![
1805 (
1806 PickleValue::String("error".into()),
1807 PickleValue::String(match error.code {
1808 RuntimeConfigErrorCode::UnknownKey => "unknown_key".into(),
1809 RuntimeConfigErrorCode::InvalidType => "invalid_type".into(),
1810 RuntimeConfigErrorCode::InvalidValue => "invalid_value".into(),
1811 RuntimeConfigErrorCode::Unsupported => "unsupported".into(),
1812 RuntimeConfigErrorCode::NotFound => "not_found".into(),
1813 RuntimeConfigErrorCode::ApplyFailed => "apply_failed".into(),
1814 }),
1815 ),
1816 (
1817 PickleValue::String("message".into()),
1818 PickleValue::String(error.message.clone()),
1819 ),
1820 ])
1821}
1822
1823fn runtime_config_result_to_pickle(
1824 result: Result<RuntimeConfigEntry, RuntimeConfigError>,
1825) -> PickleValue {
1826 match result {
1827 Ok(entry) => runtime_config_entry_to_pickle(&entry),
1828 Err(error) => runtime_config_error_to_pickle(&error),
1829 }
1830}
1831
1832fn known_destination_entry_to_pickle(entry: &KnownDestinationEntry) -> PickleValue {
1833 PickleValue::Dict(vec![
1834 (
1835 PickleValue::String("dest_hash".into()),
1836 PickleValue::Bytes(entry.dest_hash.to_vec()),
1837 ),
1838 (
1839 PickleValue::String("identity_hash".into()),
1840 PickleValue::Bytes(entry.identity_hash.to_vec()),
1841 ),
1842 (
1843 PickleValue::String("public_key".into()),
1844 PickleValue::Bytes(entry.public_key.to_vec()),
1845 ),
1846 (
1847 PickleValue::String("app_data".into()),
1848 entry
1849 .app_data
1850 .as_ref()
1851 .map(|data: &Vec<u8>| PickleValue::Bytes(data.clone()))
1852 .unwrap_or(PickleValue::None),
1853 ),
1854 (
1855 PickleValue::String("hops".into()),
1856 PickleValue::Int(entry.hops as i64),
1857 ),
1858 (
1859 PickleValue::String("received_at".into()),
1860 PickleValue::Float(entry.received_at),
1861 ),
1862 (
1863 PickleValue::String("receiving_interface".into()),
1864 PickleValue::Int(entry.receiving_interface.0 as i64),
1865 ),
1866 (
1867 PickleValue::String("was_used".into()),
1868 PickleValue::Bool(entry.was_used),
1869 ),
1870 (
1871 PickleValue::String("last_used_at".into()),
1872 entry
1873 .last_used_at
1874 .map(PickleValue::Float)
1875 .unwrap_or(PickleValue::None),
1876 ),
1877 (
1878 PickleValue::String("retained".into()),
1879 PickleValue::Bool(entry.retained),
1880 ),
1881 ])
1882}
1883
1884fn known_destinations_to_pickle(entries: &[KnownDestinationEntry]) -> PickleValue {
1885 PickleValue::List(
1886 entries
1887 .iter()
1888 .map(known_destination_entry_to_pickle)
1889 .collect(),
1890 )
1891}
1892
1893fn parse_known_destination_entry(value: &PickleValue) -> io::Result<KnownDestinationEntry> {
1894 let get_bytes = |key: &str, len: usize| -> io::Result<Vec<u8>> {
1895 let value = value.get(key).ok_or_else(|| {
1896 io::Error::new(io::ErrorKind::InvalidData, format!("missing {}", key))
1897 })?;
1898 let bytes = value.as_bytes().ok_or_else(|| {
1899 io::Error::new(io::ErrorKind::InvalidData, format!("invalid {}", key))
1900 })?;
1901 if bytes.len() != len {
1902 return Err(io::Error::new(
1903 io::ErrorKind::InvalidData,
1904 format!("invalid {} length", key),
1905 ));
1906 }
1907 Ok(bytes.to_vec())
1908 };
1909 let get_int = |key: &str| -> io::Result<i64> {
1910 value
1911 .get(key)
1912 .and_then(|v| v.as_int())
1913 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, format!("invalid {}", key)))
1914 };
1915 let get_float = |key: &str| -> io::Result<f64> {
1916 value
1917 .get(key)
1918 .and_then(|v| v.as_float())
1919 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, format!("invalid {}", key)))
1920 };
1921 let get_bool = |key: &str| -> io::Result<bool> {
1922 value
1923 .get(key)
1924 .and_then(|v| v.as_bool())
1925 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, format!("invalid {}", key)))
1926 };
1927
1928 let mut dest_hash = [0u8; 16];
1929 dest_hash.copy_from_slice(&get_bytes("dest_hash", 16)?);
1930 let mut identity_hash = [0u8; 16];
1931 identity_hash.copy_from_slice(&get_bytes("identity_hash", 16)?);
1932 let mut public_key = [0u8; 64];
1933 public_key.copy_from_slice(&get_bytes("public_key", 64)?);
1934 let app_data = value
1935 .get("app_data")
1936 .and_then(|v| v.as_bytes())
1937 .map(|bytes| bytes.to_vec());
1938 let last_used_at = value.get("last_used_at").and_then(|v| v.as_float());
1939
1940 Ok(KnownDestinationEntry {
1941 dest_hash,
1942 identity_hash,
1943 public_key,
1944 app_data,
1945 hops: get_int("hops")? as u8,
1946 received_at: get_float("received_at")?,
1947 receiving_interface: rns_core::transport::types::InterfaceId(
1948 get_int("receiving_interface")? as u64,
1949 ),
1950 was_used: get_bool("was_used")?,
1951 last_used_at,
1952 retained: get_bool("retained")?,
1953 })
1954}
1955
1956fn parse_known_destination_list(value: &PickleValue) -> io::Result<Vec<KnownDestinationEntry>> {
1957 let list = value
1958 .as_list()
1959 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "expected list"))?;
1960 list.iter().map(parse_known_destination_entry).collect()
1961}
1962
1963pub struct RpcClient {
1967 stream: TcpStream,
1968}
1969
1970impl RpcClient {
1971 pub fn connect(addr: &RpcAddr, auth_key: &[u8; 32]) -> io::Result<Self> {
1973 let mut stream = match addr {
1974 RpcAddr::Tcp(host, port) => TcpStream::connect((host.as_str(), *port))?,
1975 };
1976
1977 stream.set_read_timeout(Some(std::time::Duration::from_secs(10)))?;
1978 stream.set_write_timeout(Some(std::time::Duration::from_secs(10)))?;
1979
1980 client_auth(&mut stream, auth_key)?;
1982
1983 Ok(RpcClient { stream })
1984 }
1985
1986 pub fn call(&mut self, request: &PickleValue) -> io::Result<PickleValue> {
1988 let request_bytes = pickle::encode(request);
1989 send_bytes(&mut self.stream, &request_bytes)?;
1990
1991 let response_bytes = recv_bytes(&mut self.stream)?;
1992 pickle::decode(&response_bytes)
1993 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))
1994 }
1995
1996 pub fn list_hooks(&mut self) -> io::Result<Vec<HookInfo>> {
1997 let response = self.call(&PickleValue::Dict(vec![(
1998 PickleValue::String("get".into()),
1999 PickleValue::String("hooks".into()),
2000 )]))?;
2001 parse_hook_list(&response)
2002 }
2003
2004 pub fn begin_drain(&mut self, timeout: Duration) -> io::Result<bool> {
2005 let response = self.call(&PickleValue::Dict(vec![(
2006 PickleValue::String("begin_drain".into()),
2007 PickleValue::Float(timeout.as_secs_f64()),
2008 )]))?;
2009 Ok(response.as_bool().unwrap_or(false))
2010 }
2011
2012 pub fn drain_status(&mut self) -> io::Result<Option<DrainStatus>> {
2013 let response = self.call(&PickleValue::Dict(vec![(
2014 PickleValue::String("get".into()),
2015 PickleValue::String("drain_status".into()),
2016 )]))?;
2017 parse_drain_status(&response)
2018 }
2019
2020 pub fn provider_bridge_stats(&mut self) -> io::Result<PickleValue> {
2021 self.call(&PickleValue::Dict(vec![(
2022 PickleValue::String("get".into()),
2023 PickleValue::String("provider_bridge_stats".into()),
2024 )]))
2025 }
2026
2027 pub fn load_hook(
2028 &mut self,
2029 name: &str,
2030 attach_point: &str,
2031 priority: i32,
2032 wasm: &[u8],
2033 ) -> io::Result<Result<(), String>> {
2034 let response = self.call(&PickleValue::Dict(vec![
2035 (
2036 PickleValue::String("hook".into()),
2037 PickleValue::String("load".into()),
2038 ),
2039 (
2040 PickleValue::String("name".into()),
2041 PickleValue::String(name.to_string()),
2042 ),
2043 (
2044 PickleValue::String("attach_point".into()),
2045 PickleValue::String(attach_point.to_string()),
2046 ),
2047 (
2048 PickleValue::String("priority".into()),
2049 PickleValue::Int(priority as i64),
2050 ),
2051 (
2052 PickleValue::String("wasm".into()),
2053 PickleValue::Bytes(wasm.to_vec()),
2054 ),
2055 ]))?;
2056 parse_hook_result(&response)
2057 }
2058
2059 pub fn load_builtin_hook(
2060 &mut self,
2061 name: &str,
2062 attach_point: &str,
2063 priority: i32,
2064 builtin_id: &str,
2065 ) -> io::Result<Result<(), String>> {
2066 let response = self.call(&PickleValue::Dict(vec![
2067 (
2068 PickleValue::String("hook".into()),
2069 PickleValue::String("load_builtin".into()),
2070 ),
2071 (
2072 PickleValue::String("name".into()),
2073 PickleValue::String(name.to_string()),
2074 ),
2075 (
2076 PickleValue::String("attach_point".into()),
2077 PickleValue::String(attach_point.to_string()),
2078 ),
2079 (
2080 PickleValue::String("priority".into()),
2081 PickleValue::Int(priority as i64),
2082 ),
2083 (
2084 PickleValue::String("builtin_id".into()),
2085 PickleValue::String(builtin_id.to_string()),
2086 ),
2087 ]))?;
2088 parse_hook_result(&response)
2089 }
2090
2091 pub fn unload_hook(
2092 &mut self,
2093 name: &str,
2094 attach_point: &str,
2095 ) -> io::Result<Result<(), String>> {
2096 let response = self.call(&PickleValue::Dict(vec![
2097 (
2098 PickleValue::String("hook".into()),
2099 PickleValue::String("unload".into()),
2100 ),
2101 (
2102 PickleValue::String("name".into()),
2103 PickleValue::String(name.to_string()),
2104 ),
2105 (
2106 PickleValue::String("attach_point".into()),
2107 PickleValue::String(attach_point.to_string()),
2108 ),
2109 ]))?;
2110 parse_hook_result(&response)
2111 }
2112
2113 pub fn set_hook_enabled(
2114 &mut self,
2115 name: &str,
2116 attach_point: &str,
2117 enabled: bool,
2118 ) -> io::Result<Result<(), String>> {
2119 let op = if enabled { "enable" } else { "disable" };
2120 let response = self.call(&PickleValue::Dict(vec![
2121 (
2122 PickleValue::String("hook".into()),
2123 PickleValue::String(op.into()),
2124 ),
2125 (
2126 PickleValue::String("name".into()),
2127 PickleValue::String(name.to_string()),
2128 ),
2129 (
2130 PickleValue::String("attach_point".into()),
2131 PickleValue::String(attach_point.to_string()),
2132 ),
2133 ]))?;
2134 parse_hook_result(&response)
2135 }
2136
2137 pub fn set_hook_priority(
2138 &mut self,
2139 name: &str,
2140 attach_point: &str,
2141 priority: i32,
2142 ) -> io::Result<Result<(), String>> {
2143 let response = self.call(&PickleValue::Dict(vec![
2144 (
2145 PickleValue::String("hook".into()),
2146 PickleValue::String("set_priority".into()),
2147 ),
2148 (
2149 PickleValue::String("name".into()),
2150 PickleValue::String(name.to_string()),
2151 ),
2152 (
2153 PickleValue::String("attach_point".into()),
2154 PickleValue::String(attach_point.to_string()),
2155 ),
2156 (
2157 PickleValue::String("priority".into()),
2158 PickleValue::Int(priority as i64),
2159 ),
2160 ]))?;
2161 parse_hook_result(&response)
2162 }
2163
2164 pub fn blacklist_backbone_peer(
2165 &mut self,
2166 interface: &str,
2167 ip: &str,
2168 duration_secs: u64,
2169 reason: Option<&str>,
2170 penalty_level: Option<u8>,
2171 ) -> io::Result<bool> {
2172 let mut request = vec![
2173 (
2174 PickleValue::String("set".into()),
2175 PickleValue::String("backbone_peer_blacklist".into()),
2176 ),
2177 (
2178 PickleValue::String("interface".into()),
2179 PickleValue::String(interface.to_string()),
2180 ),
2181 (
2182 PickleValue::String("ip".into()),
2183 PickleValue::String(ip.to_string()),
2184 ),
2185 (
2186 PickleValue::String("duration_secs".into()),
2187 PickleValue::Int(duration_secs as i64),
2188 ),
2189 ];
2190 if let Some(reason) = reason {
2191 request.push((
2192 PickleValue::String("reason".into()),
2193 PickleValue::String(reason.to_string()),
2194 ));
2195 }
2196 if let Some(level) = penalty_level {
2197 request.push((
2198 PickleValue::String("penalty_level".into()),
2199 PickleValue::Int(level as i64),
2200 ));
2201 }
2202 let response = self.call(&PickleValue::Dict(request))?;
2203 Ok(response.as_bool().unwrap_or(false))
2204 }
2205
2206 pub fn known_destinations(&mut self) -> io::Result<Vec<KnownDestinationEntry>> {
2207 let response = self.call(&PickleValue::Dict(vec![(
2208 PickleValue::String("get".into()),
2209 PickleValue::String("known_destinations".into()),
2210 )]))?;
2211 parse_known_destination_list(&response)
2212 }
2213
2214 pub fn retain_known_destination(&mut self, dest_hash: [u8; 16]) -> io::Result<bool> {
2215 let response = self.call(&PickleValue::Dict(vec![
2216 (
2217 PickleValue::String("set".into()),
2218 PickleValue::String("known_destination_retained".into()),
2219 ),
2220 (
2221 PickleValue::String("dest_hash".into()),
2222 PickleValue::Bytes(dest_hash.to_vec()),
2223 ),
2224 ]))?;
2225 Ok(response.as_bool().unwrap_or(false))
2226 }
2227
2228 pub fn retain_identity(&mut self, identity_hash: [u8; 16]) -> io::Result<bool> {
2229 let response = self.call(&PickleValue::Dict(vec![
2230 (
2231 PickleValue::String("set".into()),
2232 PickleValue::String("identity_retained".into()),
2233 ),
2234 (
2235 PickleValue::String("identity_hash".into()),
2236 PickleValue::Bytes(identity_hash.to_vec()),
2237 ),
2238 ]))?;
2239 Ok(response.as_bool().unwrap_or(false))
2240 }
2241
2242 pub fn unretain_known_destination(&mut self, dest_hash: [u8; 16]) -> io::Result<bool> {
2243 let response = self.call(&PickleValue::Dict(vec![
2244 (
2245 PickleValue::String("clear".into()),
2246 PickleValue::String("known_destination_retained".into()),
2247 ),
2248 (
2249 PickleValue::String("dest_hash".into()),
2250 PickleValue::Bytes(dest_hash.to_vec()),
2251 ),
2252 ]))?;
2253 Ok(response.as_bool().unwrap_or(false))
2254 }
2255
2256 pub fn mark_known_destination_used(&mut self, dest_hash: [u8; 16]) -> io::Result<bool> {
2257 let response = self.call(&PickleValue::Dict(vec![
2258 (
2259 PickleValue::String("set".into()),
2260 PickleValue::String("known_destination_used".into()),
2261 ),
2262 (
2263 PickleValue::String("dest_hash".into()),
2264 PickleValue::Bytes(dest_hash.to_vec()),
2265 ),
2266 ]))?;
2267 Ok(response.as_bool().unwrap_or(false))
2268 }
2269}
2270
2271fn parse_lifecycle_state(value: &str) -> Option<LifecycleState> {
2272 match value {
2273 "active" => Some(LifecycleState::Active),
2274 "draining" => Some(LifecycleState::Draining),
2275 "stopping" => Some(LifecycleState::Stopping),
2276 "stopped" => Some(LifecycleState::Stopped),
2277 _ => None,
2278 }
2279}
2280
2281fn parse_drain_status(value: &PickleValue) -> io::Result<Option<DrainStatus>> {
2282 if !matches!(value, PickleValue::Dict(_)) {
2283 return Ok(None);
2284 }
2285 let state = value
2286 .get("state")
2287 .and_then(|entry| entry.as_str())
2288 .and_then(parse_lifecycle_state)
2289 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing drain state"))?;
2290 let drain_age_seconds = value.get("drain_age_seconds").and_then(|entry| {
2291 entry
2292 .as_float()
2293 .or_else(|| entry.as_int().map(|v| v as f64))
2294 });
2295 let deadline_remaining_seconds = value.get("deadline_remaining_seconds").and_then(|entry| {
2296 entry
2297 .as_float()
2298 .or_else(|| entry.as_int().map(|v| v as f64))
2299 });
2300 let drain_complete = value
2301 .get("drain_complete")
2302 .and_then(|entry| entry.as_bool())
2303 .unwrap_or(false);
2304 let interface_writer_queued_frames = value
2305 .get("interface_writer_queued_frames")
2306 .and_then(|entry| entry.as_int())
2307 .unwrap_or(0)
2308 .max(0) as usize;
2309 let provider_backlog_events = value
2310 .get("provider_backlog_events")
2311 .and_then(|entry| entry.as_int())
2312 .unwrap_or(0)
2313 .max(0) as usize;
2314 let provider_consumer_queued_events = value
2315 .get("provider_consumer_queued_events")
2316 .and_then(|entry| entry.as_int())
2317 .unwrap_or(0)
2318 .max(0) as usize;
2319 let detail = value
2320 .get("detail")
2321 .and_then(|entry| entry.as_str().map(|v| v.to_string()));
2322 Ok(Some(DrainStatus {
2323 state,
2324 drain_age_seconds,
2325 deadline_remaining_seconds,
2326 drain_complete,
2327 interface_writer_queued_frames,
2328 provider_backlog_events,
2329 provider_consumer_queued_events,
2330 detail,
2331 }))
2332}
2333
2334fn parse_hook_result(response: &PickleValue) -> io::Result<Result<(), String>> {
2335 let ok = response
2336 .get("ok")
2337 .and_then(|v| v.as_bool())
2338 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "invalid hook response"))?;
2339 if ok {
2340 Ok(Ok(()))
2341 } else {
2342 Ok(Err(response
2343 .get("error")
2344 .and_then(|v| v.as_str())
2345 .unwrap_or("unknown hook error")
2346 .to_string()))
2347 }
2348}
2349
2350fn parse_hook_list(response: &PickleValue) -> io::Result<Vec<HookInfo>> {
2351 let list = response
2352 .as_list()
2353 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "invalid hooks response"))?;
2354 let mut hooks = Vec::with_capacity(list.len());
2355 for item in list {
2356 hooks.push(HookInfo {
2357 name: item
2358 .get("name")
2359 .and_then(|v| v.as_str())
2360 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing hook name"))?
2361 .to_string(),
2362 hook_type: item
2363 .get("type")
2364 .and_then(|v| v.as_str())
2365 .unwrap_or(default_hook_type())
2366 .to_string(),
2367 attach_point: item
2368 .get("attach_point")
2369 .and_then(|v| v.as_str())
2370 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing attach_point"))?
2371 .to_string(),
2372 priority: item
2373 .get("priority")
2374 .and_then(|v| v.as_int())
2375 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing priority"))?
2376 as i32,
2377 enabled: item
2378 .get("enabled")
2379 .and_then(|v| v.as_bool())
2380 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing enabled"))?,
2381 consecutive_traps: item
2382 .get("consecutive_traps")
2383 .and_then(|v| v.as_int())
2384 .ok_or_else(|| {
2385 io::Error::new(io::ErrorKind::InvalidData, "missing consecutive_traps")
2386 })? as u32,
2387 });
2388 }
2389 Ok(hooks)
2390}
2391
/// Returns the hook type name used when a hook entry does not state one.
///
/// The result is `"native"` when the crate is built with the
/// `rns-hooks-native` feature and `"wasm"` in every other configuration
/// (with or without `rns-hooks-wasm`).
fn default_hook_type() -> &'static str {
    if cfg!(feature = "rns-hooks-native") {
        "native"
    } else {
        "wasm"
    }
}
2406
2407fn client_auth(stream: &mut TcpStream, auth_key: &[u8; 32]) -> io::Result<()> {
2409 let challenge = recv_bytes(stream)?;
2411
2412 if !challenge.starts_with(CHALLENGE_PREFIX) {
2413 return Err(io::Error::new(
2414 io::ErrorKind::InvalidData,
2415 "expected challenge",
2416 ));
2417 }
2418
2419 let message = &challenge[CHALLENGE_PREFIX.len()..];
2420
2421 let response = create_response(auth_key, message);
2423 send_bytes(stream, &response)?;
2424
2425 let result = recv_bytes(stream)?;
2427 if result == WELCOME {
2428 Ok(())
2429 } else {
2430 Err(io::Error::new(
2431 io::ErrorKind::PermissionDenied,
2432 "authentication failed",
2433 ))
2434 }
2435}
2436
2437fn create_response(auth_key: &[u8; 32], message: &[u8]) -> Vec<u8> {
2439 if message.starts_with(b"{sha256}") || message.len() > 20 {
2441 let digest = hmac_sha256(auth_key, message);
2443 let mut response = Vec::with_capacity(8 + 32);
2444 response.extend_from_slice(b"{sha256}");
2445 response.extend_from_slice(&digest);
2446 response
2447 } else {
2448 let digest = hmac_md5(auth_key, message);
2450 digest.to_vec()
2451 }
2452}
2453
2454pub fn derive_auth_key(private_key: &[u8]) -> [u8; 32] {
2456 sha256(private_key)
2457}
2458
#[cfg(test)]
mod tests {
    use super::*;

    // ---------------------------------------------------------------------
    // Shared fixtures
    // ---------------------------------------------------------------------

    /// Wraps a string literal in `PickleValue::String`.
    fn text(value: &'static str) -> PickleValue {
        PickleValue::String(value.into())
    }

    /// Builds a `PickleValue::Dict` with string keys, keeping the entry order.
    fn rpc_request(entries: Vec<(&'static str, PickleValue)>) -> PickleValue {
        PickleValue::Dict(
            entries
                .into_iter()
                .map(|(key, value)| (text(key), value))
                .collect(),
        )
    }

    /// Returns a connected `(server, client)` loopback pair; both ends get a
    /// 5-second read timeout so a broken test fails instead of hanging.
    fn tcp_pair() -> (TcpStream, TcpStream) {
        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
        let port = listener.local_addr().unwrap().port();
        let client = TcpStream::connect(("127.0.0.1", port)).unwrap();
        let (server, _) = listener.accept().unwrap();
        let read_timeout = Some(Duration::from_secs(5));
        client.set_read_timeout(read_timeout).unwrap();
        server.set_read_timeout(read_timeout).unwrap();
        (server, client)
    }

    /// Canned statistics served by the driver thread in `rpc_roundtrip`.
    fn roundtrip_interface_stats() -> InterfaceStatsResponse {
        InterfaceStatsResponse {
            interfaces: vec![SingleInterfaceStat {
                id: 7,
                name: "TestInterface".into(),
                status: true,
                mode: 1,
                rxb: 1000,
                txb: 2000,
                rx_packets: 10,
                tx_packets: 20,
                bitrate: Some(10_000_000),
                ifac_size: None,
                started: 1000.0,
                ia_freq: 0.0,
                ip_freq: 0.0,
                op_freq: 0.0,
                op_samples: 0,
                burst_active: false,
                burst_activated: 0.0,
                pr_burst_active: false,
                pr_burst_activated: 0.0,
                oa_freq: 0.0,
                clients: Some(2),
                announce_rate_target: Some(3600.0),
                announce_rate_grace: 5,
                announce_rate_penalty: 0.0,
                interface_type: "TestInterface".into(),
            }],
            transport_id: None,
            transport_enabled: true,
            transport_uptime: 3600.0,
            total_rxb: 1000,
            total_txb: 2000,
            probe_responder: None,
            backbone_peer_pool: None,
        }
    }

    // ---------------------------------------------------------------------
    // Framing and authentication
    // ---------------------------------------------------------------------

    #[test]
    fn send_recv_bytes_roundtrip() {
        let (mut sender, mut receiver) = tcp_pair();
        let payload = b"hello world";
        send_bytes(&mut sender, payload).unwrap();
        let echoed = recv_bytes(&mut receiver).unwrap();
        assert_eq!(&echoed, payload);
    }

    #[test]
    fn send_recv_empty() {
        let (mut sender, mut receiver) = tcp_pair();
        send_bytes(&mut sender, b"").unwrap();
        let echoed = recv_bytes(&mut receiver).unwrap();
        assert!(echoed.is_empty());
    }

    #[test]
    fn auth_success() {
        let key = derive_auth_key(b"test-private-key");
        let (mut server, mut client) = tcp_pair();

        // `key` is a `Copy` array, so the closure works on its own copy.
        let client_thread = thread::spawn(move || {
            client_auth(&mut client, &key).unwrap();
        });

        server_auth(&mut server, &key).unwrap();
        client_thread.join().unwrap();
    }

    #[test]
    fn auth_failure_wrong_key() {
        let server_key = derive_auth_key(b"server-key");
        let client_key = derive_auth_key(b"wrong-key");
        let (mut server, mut client) = tcp_pair();

        let client_thread = thread::spawn(move || {
            assert!(client_auth(&mut client, &client_key).is_err());
        });

        assert!(server_auth(&mut server, &server_key).is_err());
        client_thread.join().unwrap();
    }

    #[test]
    fn verify_sha256_response() {
        let key = derive_auth_key(b"mykey");
        let message = b"{sha256}abcdefghijklmnopqrstuvwxyz0123456789ABCD";
        let response = create_response(&key, message);
        assert!(response.starts_with(b"{sha256}"));
        assert!(verify_response(&key, message, &response));
    }

    #[test]
    fn verify_legacy_md5_response() {
        let key = derive_auth_key(b"mykey");
        // 20 bytes without a tag: the untagged HMAC-MD5 digest must verify.
        let message = b"01234567890123456789";
        let digest = hmac_md5(&key, message);
        assert!(verify_response(&key, message, &digest));
    }

    #[test]
    fn constant_time_eq_works() {
        assert!(constant_time_eq(b"hello", b"hello"));
        assert!(!constant_time_eq(b"hello", b"world"));
        assert!(!constant_time_eq(b"hello", b"hell"));
    }

    #[test]
    fn derive_auth_key_deterministic() {
        let first = derive_auth_key(b"test");
        let second = derive_auth_key(b"test");
        assert_eq!(first, second);
        let different = derive_auth_key(b"other");
        assert_ne!(first, different);
    }

    // ---------------------------------------------------------------------
    // End-to-end server loop
    // ---------------------------------------------------------------------

    #[test]
    fn rpc_roundtrip() {
        let key = derive_auth_key(b"test-key");
        let (event_tx, event_rx) = crate::event::channel();

        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
        let port = listener.local_addr().unwrap().port();
        listener.set_nonblocking(true).unwrap();

        let shutdown = Arc::new(AtomicBool::new(false));
        let server_shutdown = Arc::clone(&shutdown);

        // Answers the two queries this test issues and stops on anything else
        // (including a receive timeout or a closed channel).
        let driver_thread = thread::spawn(move || loop {
            let (query, resp_tx) = match event_rx.recv_timeout(Duration::from_secs(5)) {
                Ok(Event::Query(query, resp_tx)) => (query, resp_tx),
                _ => break,
            };
            let reply = match query {
                QueryRequest::LinkCount => QueryResponse::LinkCount(42),
                QueryRequest::InterfaceStats => {
                    QueryResponse::InterfaceStats(roundtrip_interface_stats())
                }
                _ => break,
            };
            let _ = resp_tx.send(reply);
        });

        let server_thread = thread::spawn(move || {
            rpc_server_loop(listener, key, event_tx, server_shutdown);
        });

        thread::sleep(Duration::from_millis(50));

        let server_addr = RpcAddr::Tcp("127.0.0.1".into(), port);

        let mut link_client = RpcClient::connect(&server_addr, &key).unwrap();
        let link_count = link_client
            .call(&rpc_request(vec![("get", text("link_count"))]))
            .unwrap();
        assert_eq!(link_count.as_int().unwrap(), 42);
        drop(link_client);

        let mut stats_client = RpcClient::connect(&server_addr, &key).unwrap();
        let stats = stats_client
            .call(&rpc_request(vec![("get", text("interface_stats"))]))
            .unwrap();
        let ifaces = stats.get("interfaces").unwrap().as_list().unwrap();
        assert_eq!(ifaces.len(), 1);
        let iface = &ifaces[0];
        assert_eq!(
            iface.get("name").unwrap().as_str().unwrap(),
            "TestInterface"
        );
        assert_eq!(iface.get("rxb").unwrap().as_int().unwrap(), 1000);
        drop(stats_client);

        shutdown.store(true, Ordering::Relaxed);
        server_thread.join().unwrap();
        driver_thread.join().unwrap();
    }

    // ---------------------------------------------------------------------
    // Request dispatch
    // ---------------------------------------------------------------------

    #[test]
    fn pickle_request_handling() {
        let (event_tx, event_rx) = crate::event::channel();

        let driver = thread::spawn(move || {
            if let Ok(Event::Query(QueryRequest::DropPath { dest_hash }, resp_tx)) = event_rx.recv()
            {
                assert_eq!(dest_hash, [1u8; 16]);
                let _ = resp_tx.send(QueryResponse::DropPath(true));
            }
        });

        let drop_path = rpc_request(vec![
            ("drop", text("path")),
            ("destination_hash", PickleValue::Bytes(vec![1u8; 16])),
        ]);

        let response = handle_rpc_request(&drop_path, &event_tx).unwrap();
        assert_eq!(response, PickleValue::Bool(true));
        driver.join().unwrap();
    }

    #[test]
    fn hook_list_request_handling() {
        let (event_tx, event_rx) = crate::event::channel();

        let driver = thread::spawn(move || {
            if let Ok(Event::ListHooks { response_tx }) = event_rx.recv() {
                let stats_hook = HookInfo {
                    name: "stats".into(),
                    hook_type: "wasm".into(),
                    attach_point: "PreIngress".into(),
                    priority: 7,
                    enabled: true,
                    consecutive_traps: 0,
                };
                let _ = response_tx.send(vec![stats_hook]);
            }
        });

        let list_hooks = rpc_request(vec![("get", text("hooks"))]);
        let response = handle_rpc_request(&list_hooks, &event_tx).unwrap();
        let hooks = parse_hook_list(&response).unwrap();
        assert_eq!(hooks.len(), 1);
        assert_eq!(hooks[0].name, "stats");
        driver.join().unwrap();
    }

    #[test]
    fn hook_load_request_handling() {
        let (event_tx, event_rx) = crate::event::channel();

        let driver = thread::spawn(move || {
            if let Ok(Event::LoadHook {
                name,
                wasm_bytes,
                attach_point,
                priority,
                response_tx,
            }) = event_rx.recv()
            {
                assert_eq!(name, "stats");
                assert_eq!(attach_point, "PreIngress");
                assert_eq!(priority, 11);
                assert_eq!(wasm_bytes, vec![1, 2, 3]);
                let _ = response_tx.send(Ok(()));
            }
        });

        let load_hook = rpc_request(vec![
            ("hook", text("load")),
            ("name", text("stats")),
            ("attach_point", text("PreIngress")),
            ("priority", PickleValue::Int(11)),
            ("wasm", PickleValue::Bytes(vec![1, 2, 3])),
        ]);
        let response = handle_rpc_request(&load_hook, &event_tx).unwrap();
        assert_eq!(parse_hook_result(&response).unwrap(), Ok(()));
        driver.join().unwrap();
    }

    #[test]
    fn send_probe_rpc_unknown_dest() {
        let (event_tx, event_rx) = crate::event::channel();

        let driver = thread::spawn(move || {
            if let Ok(Event::Query(
                QueryRequest::SendProbe {
                    dest_hash,
                    payload_size,
                },
                resp_tx,
            )) = event_rx.recv()
            {
                assert_eq!(dest_hash, [0xAA; 16]);
                // The request carries no `size`; the handler is expected to pass 16.
                assert_eq!(payload_size, 16);
                let _ = resp_tx.send(QueryResponse::SendProbe(None));
            }
        });

        let probe = rpc_request(vec![("send_probe", PickleValue::Bytes(vec![0xAA; 16]))]);

        let response = handle_rpc_request(&probe, &event_tx).unwrap();
        assert_eq!(response, PickleValue::None);
        driver.join().unwrap();
    }

    #[test]
    fn send_probe_rpc_with_result() {
        let (event_tx, event_rx) = crate::event::channel();

        let packet_hash = [0xBB; 32];
        let driver = thread::spawn(move || {
            if let Ok(Event::Query(
                QueryRequest::SendProbe {
                    dest_hash,
                    payload_size,
                },
                resp_tx,
            )) = event_rx.recv()
            {
                assert_eq!(dest_hash, [0xCC; 16]);
                assert_eq!(payload_size, 32);
                let _ = resp_tx.send(QueryResponse::SendProbe(Some((packet_hash, 3))));
            }
        });

        let probe = rpc_request(vec![
            ("send_probe", PickleValue::Bytes(vec![0xCC; 16])),
            ("size", PickleValue::Int(32)),
        ]);

        let response = handle_rpc_request(&probe, &event_tx).unwrap();
        let ph = response.get("packet_hash").unwrap().as_bytes().unwrap();
        assert_eq!(ph, &[0xBB; 32]);
        assert_eq!(response.get("hops").unwrap().as_int().unwrap(), 3);
        driver.join().unwrap();
    }

    #[test]
    fn retain_identity_rpc_sends_identity_hash_query() {
        let (event_tx, event_rx) = crate::event::channel();
        let identity_hash = [0x44; 16];

        let driver = thread::spawn(move || {
            if let Ok(Event::Query(
                QueryRequest::RetainIdentity {
                    identity_hash: hash,
                },
                resp_tx,
            )) = event_rx.recv()
            {
                assert_eq!(hash, identity_hash);
                let _ = resp_tx.send(QueryResponse::RetainIdentity(true));
            }
        });

        let retain = rpc_request(vec![
            ("set", text("identity_retained")),
            ("identity_hash", PickleValue::Bytes(identity_hash.to_vec())),
        ]);

        let response = handle_rpc_request(&retain, &event_tx).unwrap();
        assert_eq!(response, PickleValue::Bool(true));
        driver.join().unwrap();
    }

    #[test]
    fn send_probe_rpc_size_validation() {
        let (event_tx, event_rx) = crate::event::channel();

        let driver = thread::spawn(move || {
            if let Ok(Event::Query(QueryRequest::SendProbe { payload_size, .. }, resp_tx)) =
                event_rx.recv()
            {
                // A negative `size` is expected to be replaced by 16.
                assert_eq!(payload_size, 16);
                let _ = resp_tx.send(QueryResponse::SendProbe(None));
            }
        });

        let probe = rpc_request(vec![
            ("send_probe", PickleValue::Bytes(vec![0xDD; 16])),
            ("size", PickleValue::Int(-1)),
        ]);

        let response = handle_rpc_request(&probe, &event_tx).unwrap();
        assert_eq!(response, PickleValue::None);
        driver.join().unwrap();
    }

    #[test]
    fn send_probe_rpc_size_too_large() {
        let (event_tx, event_rx) = crate::event::channel();

        let driver = thread::spawn(move || {
            if let Ok(Event::Query(QueryRequest::SendProbe { payload_size, .. }, resp_tx)) =
                event_rx.recv()
            {
                // An oversized `size` (999) is expected to be replaced by 16.
                assert_eq!(payload_size, 16);
                let _ = resp_tx.send(QueryResponse::SendProbe(None));
            }
        });

        let probe = rpc_request(vec![
            ("send_probe", PickleValue::Bytes(vec![0xDD; 16])),
            ("size", PickleValue::Int(999)),
        ]);

        let response = handle_rpc_request(&probe, &event_tx).unwrap();
        assert_eq!(response, PickleValue::None);
        driver.join().unwrap();
    }

    #[test]
    fn check_proof_rpc_not_found() {
        let (event_tx, event_rx) = crate::event::channel();

        let driver = thread::spawn(move || {
            if let Ok(Event::Query(QueryRequest::CheckProof { packet_hash }, resp_tx)) =
                event_rx.recv()
            {
                assert_eq!(packet_hash, [0xEE; 32]);
                let _ = resp_tx.send(QueryResponse::CheckProof(None));
            }
        });

        let check = rpc_request(vec![("check_proof", PickleValue::Bytes(vec![0xEE; 32]))]);

        let response = handle_rpc_request(&check, &event_tx).unwrap();
        assert_eq!(response, PickleValue::None);
        driver.join().unwrap();
    }

    #[test]
    fn check_proof_rpc_found() {
        let (event_tx, event_rx) = crate::event::channel();

        let driver = thread::spawn(move || {
            if let Ok(Event::Query(QueryRequest::CheckProof { packet_hash }, resp_tx)) =
                event_rx.recv()
            {
                assert_eq!(packet_hash, [0xFF; 32]);
                let _ = resp_tx.send(QueryResponse::CheckProof(Some(0.352)));
            }
        });

        let check = rpc_request(vec![("check_proof", PickleValue::Bytes(vec![0xFF; 32]))]);

        match handle_rpc_request(&check, &event_tx).unwrap() {
            PickleValue::Float(rtt) => assert!((rtt - 0.352).abs() < 0.001),
            response => panic!("Expected Float, got {:?}", response),
        }
        driver.join().unwrap();
    }

    #[test]
    fn request_path_rpc() {
        let (event_tx, event_rx) = crate::event::channel();

        let driver = thread::spawn(move || {
            match event_rx.recv_timeout(Duration::from_secs(5)) {
                Ok(Event::RequestPath { dest_hash }) => {
                    assert_eq!(dest_hash, [0x11; 16]);
                }
                other => panic!("Expected RequestPath event, got {:?}", other),
            }
        });

        let path_request = rpc_request(vec![("request_path", PickleValue::Bytes(vec![0x11; 16]))]);

        let response = handle_rpc_request(&path_request, &event_tx).unwrap();
        assert_eq!(response, PickleValue::Bool(true));
        driver.join().unwrap();
    }

    #[test]
    fn begin_drain_rpc_emits_event() {
        let (event_tx, event_rx) = crate::event::channel();

        let driver = thread::spawn(move || {
            match event_rx.recv_timeout(Duration::from_secs(5)) {
                Ok(Event::BeginDrain { timeout }) => {
                    assert!((timeout.as_secs_f64() - 1.5).abs() < 0.001);
                }
                other => panic!("Expected BeginDrain event, got {:?}", other),
            }
        });

        let begin_drain = rpc_request(vec![("begin_drain", PickleValue::Float(1.5))]);

        let response = handle_rpc_request(&begin_drain, &event_tx).unwrap();
        assert_eq!(response, PickleValue::Bool(true));
        driver.join().unwrap();
    }

    #[test]
    fn drain_status_rpc_roundtrips_fields() {
        let (event_tx, event_rx) = crate::event::channel();

        let driver = thread::spawn(move || {
            if let Ok(Event::Query(QueryRequest::DrainStatus, resp_tx)) = event_rx.recv() {
                let status = DrainStatus {
                    state: LifecycleState::Draining,
                    drain_age_seconds: Some(0.75),
                    deadline_remaining_seconds: Some(2.25),
                    drain_complete: false,
                    interface_writer_queued_frames: 3,
                    provider_backlog_events: 4,
                    provider_consumer_queued_events: 5,
                    detail: Some("node is draining existing work".into()),
                };
                let _ = resp_tx.send(QueryResponse::DrainStatus(status));
            }
        });

        let status_request = rpc_request(vec![("get", text("drain_status"))]);

        let response = handle_rpc_request(&status_request, &event_tx).unwrap();
        let field = |key: &str| response.get(key).unwrap();
        assert_eq!(field("state").as_str(), Some("draining"));
        assert_eq!(field("drain_complete").as_bool(), Some(false));
        assert_eq!(field("deadline_remaining_seconds").as_float(), Some(2.25));
        assert_eq!(field("interface_writer_queued_frames").as_int(), Some(3));
        assert_eq!(field("provider_backlog_events").as_int(), Some(4));
        assert_eq!(field("provider_consumer_queued_events").as_int(), Some(5));
        assert_eq!(
            field("detail").as_str(),
            Some("node is draining existing work")
        );
        driver.join().unwrap();
    }

    #[test]
    fn runtime_config_get_and_set_rpc() {
        let (event_tx, event_rx) = crate::event::channel();

        let driver = thread::spawn(move || {
            match event_rx.recv() {
                Ok(Event::Query(QueryRequest::GetRuntimeConfig { key }, resp_tx)) => {
                    assert_eq!(key, "global.tick_interval_ms");
                    let entry = RuntimeConfigEntry {
                        key,
                        value: RuntimeConfigValue::Int(1000),
                        default: RuntimeConfigValue::Int(1000),
                        source: RuntimeConfigSource::Startup,
                        apply_mode: RuntimeConfigApplyMode::Immediate,
                        description: Some("tick".into()),
                    };
                    let _ = resp_tx.send(QueryResponse::RuntimeConfigEntry(Some(entry)));
                }
                _ => panic!("expected GetRuntimeConfig query"),
            }

            match event_rx.recv() {
                Ok(Event::Query(QueryRequest::SetRuntimeConfig { key, value }, resp_tx)) => {
                    assert_eq!(key, "global.tick_interval_ms");
                    assert_eq!(value, RuntimeConfigValue::Int(250));
                    let entry = RuntimeConfigEntry {
                        key,
                        value: RuntimeConfigValue::Int(250),
                        default: RuntimeConfigValue::Int(1000),
                        source: RuntimeConfigSource::RuntimeOverride,
                        apply_mode: RuntimeConfigApplyMode::Immediate,
                        description: Some("tick".into()),
                    };
                    let _ = resp_tx.send(QueryResponse::RuntimeConfigSet(Ok(entry)));
                }
                _ => panic!("expected SetRuntimeConfig query"),
            }
        });

        let get_request = rpc_request(vec![
            ("get", text("runtime_config_entry")),
            ("key", text("global.tick_interval_ms")),
        ]);
        let get_response = handle_rpc_request(&get_request, &event_tx).unwrap();
        assert_eq!(
            get_response.get("key").and_then(|v| v.as_str()),
            Some("global.tick_interval_ms")
        );

        let set_request = rpc_request(vec![
            ("set", text("runtime_config")),
            ("key", text("global.tick_interval_ms")),
            ("value", PickleValue::Int(250)),
        ]);
        let set_response = handle_rpc_request(&set_request, &event_tx).unwrap();
        assert_eq!(
            set_response.get("value").and_then(|v| v.as_int()),
            Some(250)
        );

        driver.join().unwrap();
    }

    // ---------------------------------------------------------------------
    // Interface statistics encoding
    // ---------------------------------------------------------------------

    #[test]
    fn interface_stats_pickle_format() {
        let stats = InterfaceStatsResponse {
            interfaces: vec![SingleInterfaceStat {
                id: 1,
                name: "TCP".into(),
                status: true,
                mode: 1,
                rxb: 100,
                txb: 200,
                rx_packets: 5,
                tx_packets: 10,
                bitrate: Some(1000000),
                ifac_size: Some(16),
                started: 1000.0,
                ia_freq: 1.0,
                ip_freq: 2.0,
                op_freq: 3.0,
                op_samples: 0,
                burst_active: true,
                burst_activated: 1200.0,
                pr_burst_active: true,
                pr_burst_activated: 1300.0,
                oa_freq: 4.0,
                clients: Some(3),
                announce_rate_target: Some(3600.0),
                announce_rate_grace: 5,
                announce_rate_penalty: 0.0,
                interface_type: "TCPClientInterface".into(),
            }],
            transport_id: Some([0xAB; 16]),
            transport_enabled: true,
            transport_uptime: 3600.0,
            total_rxb: 100,
            total_txb: 200,
            probe_responder: None,
            backbone_peer_pool: None,
        };

        // Encode and decode again so the assertions see what a peer would read.
        let value = interface_stats_to_pickle(&stats);
        let encoded = pickle::encode(&value);
        let decoded = pickle::decode(&encoded).unwrap();

        assert!(decoded.get("transport_enabled").unwrap().as_bool().unwrap());

        let ifaces = decoded.get("interfaces").unwrap().as_list().unwrap();
        let iface = &ifaces[0];
        let int_field = |key: &str| iface.get(key).unwrap().as_int().unwrap();
        let float_field = |key: &str| iface.get(key).unwrap().as_float().unwrap();
        let bool_field = |key: &str| iface.get(key).unwrap().as_bool().unwrap();

        assert_eq!(int_field("id"), 1);
        assert_eq!(iface.get("name").unwrap().as_str().unwrap(), "TCP");
        assert_eq!(float_field("ia_freq"), 1.0);
        assert_eq!(float_field("ip_freq"), 2.0);
        assert_eq!(float_field("op_freq"), 3.0);
        assert_eq!(float_field("oa_freq"), 4.0);
        assert!(bool_field("burst_active"));
        assert_eq!(float_field("burst_activated"), 1200.0);
        assert!(bool_field("pr_burst_active"));
        assert_eq!(float_field("pr_burst_activated"), 1300.0);
        assert_eq!(float_field("announce_rate_target"), 3600.0);
        assert_eq!(int_field("announce_rate_grace"), 5);
        assert_eq!(float_field("announce_rate_penalty"), 0.0);
        assert_eq!(int_field("clients"), 3);
    }

    #[test]
    fn interface_stats_pickle_includes_backbone_peer_pool_priority() {
        let member = crate::event::BackbonePeerPoolMemberStatus {
            name: "peer".into(),
            remote: "127.0.0.1:4242".into(),
            source: "configured".into(),
            priority: 77,
            state: "active".into(),
            interface_id: Some(42),
            failure_count: 0,
            last_error: None,
            cooldown_remaining_seconds: None,
        };
        let stats = InterfaceStatsResponse {
            interfaces: vec![],
            transport_id: None,
            transport_enabled: true,
            transport_uptime: 1.0,
            total_rxb: 0,
            total_txb: 0,
            probe_responder: None,
            backbone_peer_pool: Some(crate::event::BackbonePeerPoolStatus {
                max_connected: 1,
                active_count: 1,
                standby_count: 0,
                cooldown_count: 0,
                members: vec![member],
            }),
        };

        let value = interface_stats_to_pickle(&stats);
        let encoded = pickle::encode(&value);
        let decoded = pickle::decode(&encoded).unwrap();
        let pool = decoded.get("backbone_peer_pool").unwrap();
        let members = pool.get("members").unwrap().as_list().unwrap();
        assert_eq!(members[0].get("priority").unwrap().as_int().unwrap(), 77);
    }

    #[test]
    fn interface_stats_with_probe_responder() {
        let probe_hash = [0x42; 16];
        let stats = InterfaceStatsResponse {
            interfaces: vec![],
            transport_id: None,
            transport_enabled: true,
            transport_uptime: 100.0,
            total_rxb: 0,
            total_txb: 0,
            probe_responder: Some(probe_hash),
            backbone_peer_pool: None,
        };

        let value = interface_stats_to_pickle(&stats);
        let encoded = pickle::encode(&value);
        let decoded = pickle::decode(&encoded).unwrap();

        let pr = decoded.get("probe_responder").unwrap().as_bytes().unwrap();
        assert_eq!(pr, &probe_hash);
    }
}