1use std::io::{self, Read, Write};
12use std::net::{IpAddr, TcpListener, TcpStream};
13use std::sync::mpsc;
14use std::sync::{
15 atomic::{AtomicBool, Ordering},
16 Arc,
17};
18use std::thread;
19use std::time::Duration;
20
21use rns_crypto::hmac::hmac_sha256;
22use rns_crypto::sha256::sha256;
23
24use crate::event::{
25 BackboneInterfaceEntry, BackbonePeerStateEntry, BlackholeInfo, DrainStatus, Event, EventSender,
26 HookInfo, InterfaceStatsResponse, KnownDestinationEntry, LifecycleState, PathTableEntry,
27 ProviderBridgeStats, QueryRequest, QueryResponse, RateTableEntry, RuntimeConfigApplyMode,
28 RuntimeConfigEntry, RuntimeConfigError, RuntimeConfigErrorCode, RuntimeConfigSource,
29 RuntimeConfigValue, SingleInterfaceStat,
30};
31use crate::md5::hmac_md5;
32use crate::pickle::{self, PickleValue};
33
34const CHALLENGE_PREFIX: &[u8] = b"#CHALLENGE#";
35const WELCOME: &[u8] = b"#WELCOME#";
36const FAILURE: &[u8] = b"#FAILURE#";
37const CHALLENGE_LEN: usize = 40;
38
39#[derive(Debug, Clone)]
41pub enum RpcAddr {
42 Tcp(String, u16),
43}
44
45pub struct RpcServer {
47 shutdown: Arc<AtomicBool>,
48 thread: Option<thread::JoinHandle<()>>,
49}
50
51impl RpcServer {
52 pub fn start(addr: &RpcAddr, auth_key: [u8; 32], event_tx: EventSender) -> io::Result<Self> {
54 let shutdown = Arc::new(AtomicBool::new(false));
55 let shutdown2 = shutdown.clone();
56
57 let listener = match addr {
58 RpcAddr::Tcp(host, port) => {
59 let l = TcpListener::bind((host.as_str(), *port))?;
60 l.set_nonblocking(true)?;
62 l
63 }
64 };
65
66 let thread = thread::Builder::new()
67 .name("rpc-server".into())
68 .spawn(move || {
69 rpc_server_loop(listener, auth_key, event_tx, shutdown2);
70 })
71 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
72
73 Ok(RpcServer {
74 shutdown,
75 thread: Some(thread),
76 })
77 }
78
79 pub fn stop(&mut self) {
81 self.shutdown.store(true, Ordering::Relaxed);
82 if let Some(handle) = self.thread.take() {
83 let _ = handle.join();
84 }
85 }
86}
87
88impl Drop for RpcServer {
89 fn drop(&mut self) {
90 self.stop();
91 }
92}
93
94fn rpc_server_loop(
95 listener: TcpListener,
96 auth_key: [u8; 32],
97 event_tx: EventSender,
98 shutdown: Arc<AtomicBool>,
99) {
100 loop {
101 if shutdown.load(Ordering::Relaxed) {
102 break;
103 }
104
105 match listener.accept() {
106 Ok((stream, _addr)) => {
107 let _ = stream.set_nonblocking(false);
109 let _ = stream.set_read_timeout(Some(std::time::Duration::from_secs(10)));
110 let _ = stream.set_write_timeout(Some(std::time::Duration::from_secs(10)));
111
112 if let Err(e) = handle_connection(stream, &auth_key, &event_tx) {
113 log::debug!("RPC connection error: {}", e);
114 }
115 }
116 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
117 thread::sleep(std::time::Duration::from_millis(100));
119 }
120 Err(e) => {
121 log::error!("RPC accept error: {}", e);
122 thread::sleep(std::time::Duration::from_millis(100));
123 }
124 }
125 }
126}
127
128fn handle_connection(
129 mut stream: TcpStream,
130 auth_key: &[u8; 32],
131 event_tx: &EventSender,
132) -> io::Result<()> {
133 server_auth(&mut stream, auth_key)?;
135
136 let request_bytes = recv_bytes(&mut stream)?;
138 let request = pickle::decode(&request_bytes)
139 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?;
140
141 let response = handle_rpc_request(&request, event_tx)?;
143
144 let response_bytes = pickle::encode(&response);
146 send_bytes(&mut stream, &response_bytes)?;
147
148 Ok(())
149}
150
151fn server_auth(stream: &mut TcpStream, auth_key: &[u8; 32]) -> io::Result<()> {
153 let mut random_bytes = [0u8; CHALLENGE_LEN];
155 {
157 let mut f = std::fs::File::open("/dev/urandom")?;
158 f.read_exact(&mut random_bytes)?;
159 }
160
161 let mut challenge_message = Vec::with_capacity(CHALLENGE_PREFIX.len() + 8 + CHALLENGE_LEN);
162 challenge_message.extend_from_slice(CHALLENGE_PREFIX);
163 challenge_message.extend_from_slice(b"{sha256}");
164 challenge_message.extend_from_slice(&random_bytes);
165
166 send_bytes(stream, &challenge_message)?;
167
168 let response = recv_bytes(stream)?;
170
171 let message = &challenge_message[CHALLENGE_PREFIX.len()..];
174
175 if verify_response(auth_key, message, &response) {
176 send_bytes(stream, WELCOME)?;
177 Ok(())
178 } else {
179 send_bytes(stream, FAILURE)?;
180 Err(io::Error::new(
181 io::ErrorKind::PermissionDenied,
182 "auth failed",
183 ))
184 }
185}
186
187fn verify_response(auth_key: &[u8; 32], message: &[u8], response: &[u8]) -> bool {
189 if response.starts_with(b"{sha256}") {
191 let digest = &response[8..];
192 let expected = hmac_sha256(auth_key, message);
193 constant_time_eq(digest, &expected)
194 }
195 else if response.len() == 16 {
197 let expected = hmac_md5(auth_key, message);
198 constant_time_eq(response, &expected)
199 }
200 else if response.starts_with(b"{md5}") {
202 let digest = &response[5..];
203 let expected = hmac_md5(auth_key, message);
204 constant_time_eq(digest, &expected)
205 } else {
206 false
207 }
208}
209
210fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
212 if a.len() != b.len() {
213 return false;
214 }
215 let mut diff = 0u8;
216 for (x, y) in a.iter().zip(b.iter()) {
217 diff |= x ^ y;
218 }
219 diff == 0
220}
221
222fn send_bytes(stream: &mut TcpStream, data: &[u8]) -> io::Result<()> {
224 let len = data.len() as i32;
225 stream.write_all(&len.to_be_bytes())?;
226 stream.write_all(data)?;
227 stream.flush()
228}
229
230fn recv_bytes(stream: &mut TcpStream) -> io::Result<Vec<u8>> {
232 let mut len_buf = [0u8; 4];
233 stream.read_exact(&mut len_buf)?;
234 let len = i32::from_be_bytes(len_buf);
235
236 if len < 0 {
237 let mut len8_buf = [0u8; 8];
239 stream.read_exact(&mut len8_buf)?;
240 let len = u64::from_be_bytes(len8_buf) as usize;
241 if len > 64 * 1024 * 1024 {
242 return Err(io::Error::new(
243 io::ErrorKind::InvalidData,
244 "message too large",
245 ));
246 }
247 let mut buf = vec![0u8; len];
248 stream.read_exact(&mut buf)?;
249 Ok(buf)
250 } else {
251 let len = len as usize;
252 if len > 64 * 1024 * 1024 {
253 return Err(io::Error::new(
254 io::ErrorKind::InvalidData,
255 "message too large",
256 ));
257 }
258 let mut buf = vec![0u8; len];
259 stream.read_exact(&mut buf)?;
260 Ok(buf)
261 }
262}
263
264fn handle_rpc_request(request: &PickleValue, event_tx: &EventSender) -> io::Result<PickleValue> {
266 if let Some(get_val) = request.get("get") {
268 if let Some(path) = get_val.as_str() {
269 return match path {
270 "interface_stats" => {
271 let resp = send_query(event_tx, QueryRequest::InterfaceStats)?;
272 if let QueryResponse::InterfaceStats(stats) = resp {
273 Ok(interface_stats_to_pickle(&stats))
274 } else {
275 Ok(PickleValue::None)
276 }
277 }
278 "path_table" => {
279 let max_hops = request
280 .get("max_hops")
281 .and_then(|v| v.as_int().map(|n| n as u8));
282 let resp = send_query(event_tx, QueryRequest::PathTable { max_hops })?;
283 if let QueryResponse::PathTable(entries) = resp {
284 Ok(path_table_to_pickle(&entries))
285 } else {
286 Ok(PickleValue::None)
287 }
288 }
289 "rate_table" => {
290 let resp = send_query(event_tx, QueryRequest::RateTable)?;
291 if let QueryResponse::RateTable(entries) = resp {
292 Ok(rate_table_to_pickle(&entries))
293 } else {
294 Ok(PickleValue::None)
295 }
296 }
297 "next_hop" => {
298 let hash = extract_dest_hash(request, "destination_hash")?;
299 let resp = send_query(event_tx, QueryRequest::NextHop { dest_hash: hash })?;
300 if let QueryResponse::NextHop(Some(nh)) = resp {
301 Ok(PickleValue::Bytes(nh.next_hop.to_vec()))
302 } else {
303 Ok(PickleValue::None)
304 }
305 }
306 "next_hop_if_name" => {
307 let hash = extract_dest_hash(request, "destination_hash")?;
308 let resp =
309 send_query(event_tx, QueryRequest::NextHopIfName { dest_hash: hash })?;
310 if let QueryResponse::NextHopIfName(Some(name)) = resp {
311 Ok(PickleValue::String(name))
312 } else {
313 Ok(PickleValue::None)
314 }
315 }
316 "link_count" => {
317 let resp = send_query(event_tx, QueryRequest::LinkCount)?;
318 if let QueryResponse::LinkCount(n) = resp {
319 Ok(PickleValue::Int(n as i64))
320 } else {
321 Ok(PickleValue::None)
322 }
323 }
324 "transport_identity" => {
325 let resp = send_query(event_tx, QueryRequest::TransportIdentity)?;
326 if let QueryResponse::TransportIdentity(Some(hash)) = resp {
327 Ok(PickleValue::Bytes(hash.to_vec()))
328 } else {
329 Ok(PickleValue::None)
330 }
331 }
332 "blackholed" => {
333 let resp = send_query(event_tx, QueryRequest::GetBlackholed)?;
334 if let QueryResponse::Blackholed(entries) = resp {
335 Ok(blackholed_to_pickle(&entries))
336 } else {
337 Ok(PickleValue::None)
338 }
339 }
340 "discovered_interfaces" => {
341 let only_available = request
342 .get("only_available")
343 .and_then(|v| v.as_bool())
344 .unwrap_or(false);
345 let only_transport = request
346 .get("only_transport")
347 .and_then(|v| v.as_bool())
348 .unwrap_or(false);
349 let resp = send_query(
350 event_tx,
351 QueryRequest::DiscoveredInterfaces {
352 only_available,
353 only_transport,
354 },
355 )?;
356 if let QueryResponse::DiscoveredInterfaces(interfaces) = resp {
357 Ok(discovered_interfaces_to_pickle(&interfaces))
358 } else {
359 Ok(PickleValue::None)
360 }
361 }
362 "hooks" => {
363 let (response_tx, response_rx) = mpsc::channel();
364 event_tx
365 .send(Event::ListHooks { response_tx })
366 .map_err(|_| {
367 io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down")
368 })?;
369 let hooks = response_rx
370 .recv_timeout(std::time::Duration::from_secs(5))
371 .map_err(|_| {
372 io::Error::new(io::ErrorKind::TimedOut, "list hooks timed out")
373 })?;
374 Ok(hooks_to_pickle(&hooks))
375 }
376 "runtime_config" => {
377 let resp = send_query(event_tx, QueryRequest::ListRuntimeConfig)?;
378 if let QueryResponse::RuntimeConfigList(entries) = resp {
379 Ok(runtime_config_list_to_pickle(&entries))
380 } else {
381 Ok(PickleValue::None)
382 }
383 }
384 "known_destinations" => {
385 let resp = send_query(event_tx, QueryRequest::KnownDestinations)?;
386 if let QueryResponse::KnownDestinations(entries) = resp {
387 Ok(known_destinations_to_pickle(&entries))
388 } else {
389 Ok(PickleValue::None)
390 }
391 }
392 "runtime_config_entry" => {
393 let key = request
394 .get("key")
395 .and_then(|v| v.as_str())
396 .unwrap_or_default()
397 .to_string();
398 let resp = send_query(event_tx, QueryRequest::GetRuntimeConfig { key })?;
399 if let QueryResponse::RuntimeConfigEntry(entry) = resp {
400 Ok(entry
401 .as_ref()
402 .map(runtime_config_entry_to_pickle)
403 .unwrap_or(PickleValue::None))
404 } else {
405 Ok(PickleValue::None)
406 }
407 }
408 "backbone_peer_state" => {
409 let interface_name = request
410 .get("interface")
411 .and_then(|v| v.as_str())
412 .map(|s| s.to_string());
413 let resp =
414 send_query(event_tx, QueryRequest::BackbonePeerState { interface_name })?;
415 if let QueryResponse::BackbonePeerState(entries) = resp {
416 Ok(backbone_peer_state_to_pickle(&entries))
417 } else {
418 Ok(PickleValue::None)
419 }
420 }
421 "backbone_interfaces" => {
422 let resp = send_query(event_tx, QueryRequest::BackboneInterfaces)?;
423 if let QueryResponse::BackboneInterfaces(entries) = resp {
424 Ok(backbone_interfaces_to_pickle(&entries))
425 } else {
426 Ok(PickleValue::None)
427 }
428 }
429 "provider_bridge_stats" => {
430 let resp = send_query(event_tx, QueryRequest::ProviderBridgeStats)?;
431 if let QueryResponse::ProviderBridgeStats(stats) = resp {
432 Ok(stats
433 .as_ref()
434 .map(provider_bridge_stats_to_pickle)
435 .unwrap_or(PickleValue::None))
436 } else {
437 Ok(PickleValue::None)
438 }
439 }
440 "drain_status" => {
441 let resp = send_query(event_tx, QueryRequest::DrainStatus)?;
442 if let QueryResponse::DrainStatus(status) = resp {
443 Ok(drain_status_to_pickle(&status))
444 } else {
445 Ok(PickleValue::None)
446 }
447 }
448 _ => Ok(PickleValue::None),
449 };
450 }
451 }
452
453 if let Some(begin_val) = request.get("begin_drain") {
454 let timeout_secs = begin_val
455 .as_float()
456 .or_else(|| begin_val.as_int().map(|value| value as f64))
457 .unwrap_or(0.0)
458 .max(0.0);
459 let timeout = Duration::from_secs_f64(timeout_secs);
460 let _ = event_tx.send(Event::BeginDrain { timeout });
461 return Ok(PickleValue::Bool(true));
462 }
463
464 if let Some(set_val) = request.get("set").and_then(|v| v.as_str()) {
465 if set_val == "known_destination_retained" {
466 let dest_hash = extract_dest_hash(request, "dest_hash")?;
467 let resp = send_query(event_tx, QueryRequest::RetainKnownDestination { dest_hash })?;
468 return if let QueryResponse::RetainKnownDestination(ok) = resp {
469 Ok(PickleValue::Bool(ok))
470 } else {
471 Ok(PickleValue::None)
472 };
473 }
474 if set_val == "known_destination_used" {
475 let dest_hash = extract_dest_hash(request, "dest_hash")?;
476 let resp = send_query(
477 event_tx,
478 QueryRequest::MarkKnownDestinationUsed { dest_hash },
479 )?;
480 return if let QueryResponse::MarkKnownDestinationUsed(ok) = resp {
481 Ok(PickleValue::Bool(ok))
482 } else {
483 Ok(PickleValue::None)
484 };
485 }
486 if set_val == "runtime_config" {
487 let key = request
488 .get("key")
489 .and_then(|v| v.as_str())
490 .unwrap_or_default()
491 .to_string();
492 let Some(value) = request
493 .get("value")
494 .and_then(runtime_config_value_from_pickle)
495 else {
496 return Ok(runtime_config_error_to_pickle(&RuntimeConfigError {
497 code: RuntimeConfigErrorCode::InvalidType,
498 message: "runtime-config set requires a scalar value".into(),
499 }));
500 };
501 let resp = send_query(event_tx, QueryRequest::SetRuntimeConfig { key, value })?;
502 return if let QueryResponse::RuntimeConfigSet(result) = resp {
503 Ok(runtime_config_result_to_pickle(result))
504 } else {
505 Ok(PickleValue::None)
506 };
507 }
508 }
509
510 if let Some(reset_val) = request.get("reset").and_then(|v| v.as_str()) {
511 if reset_val == "runtime_config" {
512 let key = request
513 .get("key")
514 .and_then(|v| v.as_str())
515 .unwrap_or_default()
516 .to_string();
517 let resp = send_query(event_tx, QueryRequest::ResetRuntimeConfig { key })?;
518 return if let QueryResponse::RuntimeConfigReset(result) = resp {
519 Ok(runtime_config_result_to_pickle(result))
520 } else {
521 Ok(PickleValue::None)
522 };
523 }
524 }
525
526 if let Some(clear_val) = request.get("clear").and_then(|v| v.as_str()) {
527 if clear_val == "known_destination_retained" {
528 let dest_hash = extract_dest_hash(request, "dest_hash")?;
529 let resp = send_query(
530 event_tx,
531 QueryRequest::UnretainKnownDestination { dest_hash },
532 )?;
533 return if let QueryResponse::UnretainKnownDestination(ok) = resp {
534 Ok(PickleValue::Bool(ok))
535 } else {
536 Ok(PickleValue::None)
537 };
538 }
539 if clear_val == "backbone_peer_state" {
540 let interface_name = required_string(request, "interface")?;
541 let peer_ip = required_string(request, "ip")?;
542 let peer_ip = peer_ip
543 .parse()
544 .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "invalid peer IP"))?;
545 let resp = send_query(
546 event_tx,
547 QueryRequest::ClearBackbonePeerState {
548 interface_name,
549 peer_ip,
550 },
551 )?;
552 return if let QueryResponse::ClearBackbonePeerState(ok) = resp {
553 Ok(PickleValue::Bool(ok))
554 } else {
555 Ok(PickleValue::None)
556 };
557 }
558 }
559
560 if let Some(set_val) = request.get("set").and_then(|v| v.as_str()) {
561 if set_val == "backbone_peer_blacklist" {
562 let interface_name = required_string(request, "interface")?;
563 let peer_ip = required_string(request, "ip")?;
564 let peer_ip: IpAddr = peer_ip
565 .parse()
566 .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "invalid peer IP"))?;
567 let duration_secs = request
568 .get("duration_secs")
569 .and_then(|v| v.as_int())
570 .ok_or_else(|| {
571 io::Error::new(io::ErrorKind::InvalidInput, "missing duration_secs")
572 })?;
573 let reason = request
574 .get("reason")
575 .and_then(|v| v.as_str())
576 .unwrap_or("sentinel blacklist")
577 .to_string();
578 let penalty_level = request
579 .get("penalty_level")
580 .and_then(|v| v.as_int())
581 .unwrap_or(0)
582 .clamp(0, u8::MAX as i64) as u8;
583 let resp = send_query(
584 event_tx,
585 QueryRequest::BlacklistBackbonePeer {
586 interface_name,
587 peer_ip,
588 duration: Duration::from_secs(duration_secs as u64),
589 reason,
590 penalty_level,
591 },
592 )?;
593 return if let QueryResponse::BlacklistBackbonePeer(ok) = resp {
594 Ok(PickleValue::Bool(ok))
595 } else {
596 Ok(PickleValue::None)
597 };
598 }
599 }
600
601 if let Some(hash_val) = request.get("request_path") {
603 if let Some(hash_bytes) = hash_val.as_bytes() {
604 if hash_bytes.len() >= 16 {
605 let mut dest_hash = [0u8; 16];
606 dest_hash.copy_from_slice(&hash_bytes[..16]);
607 let _ = event_tx.send(crate::event::Event::RequestPath { dest_hash });
608 return Ok(PickleValue::Bool(true));
609 }
610 }
611 }
612
613 if let Some(hash_val) = request.get("send_probe") {
615 if let Some(hash_bytes) = hash_val.as_bytes() {
616 if hash_bytes.len() >= 16 {
617 let mut dest_hash = [0u8; 16];
618 dest_hash.copy_from_slice(&hash_bytes[..16]);
619 let payload_size = request
620 .get("size")
621 .and_then(|v| v.as_int())
622 .and_then(|n| {
623 if n > 0 && n <= 400 {
624 Some(n as usize)
625 } else {
626 None
627 }
628 })
629 .unwrap_or(16);
630 let resp = send_query(
631 event_tx,
632 QueryRequest::SendProbe {
633 dest_hash,
634 payload_size,
635 },
636 )?;
637 if let QueryResponse::SendProbe(Some((packet_hash, hops))) = resp {
638 return Ok(PickleValue::Dict(vec![
639 (
640 PickleValue::String("packet_hash".into()),
641 PickleValue::Bytes(packet_hash.to_vec()),
642 ),
643 (
644 PickleValue::String("hops".into()),
645 PickleValue::Int(hops as i64),
646 ),
647 ]));
648 } else {
649 return Ok(PickleValue::None);
650 }
651 }
652 }
653 }
654
655 if let Some(hash_val) = request.get("check_proof") {
657 if let Some(hash_bytes) = hash_val.as_bytes() {
658 if hash_bytes.len() >= 32 {
659 let mut packet_hash = [0u8; 32];
660 packet_hash.copy_from_slice(&hash_bytes[..32]);
661 let resp = send_query(event_tx, QueryRequest::CheckProof { packet_hash })?;
662 if let QueryResponse::CheckProof(Some(rtt)) = resp {
663 return Ok(PickleValue::Float(rtt));
664 } else {
665 return Ok(PickleValue::None);
666 }
667 }
668 }
669 }
670
671 if let Some(hash_val) = request.get("blackhole") {
673 if let Some(hash_bytes) = hash_val.as_bytes() {
674 if hash_bytes.len() >= 16 {
675 let mut identity_hash = [0u8; 16];
676 identity_hash.copy_from_slice(&hash_bytes[..16]);
677 let duration_hours = request.get("duration").and_then(|v| v.as_float());
678 let reason = request
679 .get("reason")
680 .and_then(|v| v.as_str())
681 .map(|s| s.to_string());
682 let resp = send_query(
683 event_tx,
684 QueryRequest::BlackholeIdentity {
685 identity_hash,
686 duration_hours,
687 reason,
688 },
689 )?;
690 return Ok(PickleValue::Bool(matches!(
691 resp,
692 QueryResponse::BlackholeResult(true)
693 )));
694 }
695 }
696 }
697
698 if let Some(hash_val) = request.get("unblackhole") {
700 if let Some(hash_bytes) = hash_val.as_bytes() {
701 if hash_bytes.len() >= 16 {
702 let mut identity_hash = [0u8; 16];
703 identity_hash.copy_from_slice(&hash_bytes[..16]);
704 let resp = send_query(
705 event_tx,
706 QueryRequest::UnblackholeIdentity { identity_hash },
707 )?;
708 return Ok(PickleValue::Bool(matches!(
709 resp,
710 QueryResponse::UnblackholeResult(true)
711 )));
712 }
713 }
714 }
715
716 if let Some(drop_val) = request.get("drop") {
718 if let Some(path) = drop_val.as_str() {
719 return match path {
720 "path" => {
721 let hash = extract_dest_hash(request, "destination_hash")?;
722 let resp = send_query(event_tx, QueryRequest::DropPath { dest_hash: hash })?;
723 if let QueryResponse::DropPath(ok) = resp {
724 Ok(PickleValue::Bool(ok))
725 } else {
726 Ok(PickleValue::None)
727 }
728 }
729 "all_via" => {
730 let hash = extract_dest_hash(request, "destination_hash")?;
731 let resp = send_query(
732 event_tx,
733 QueryRequest::DropAllVia {
734 transport_hash: hash,
735 },
736 )?;
737 if let QueryResponse::DropAllVia(n) = resp {
738 Ok(PickleValue::Int(n as i64))
739 } else {
740 Ok(PickleValue::None)
741 }
742 }
743 "announce_queues" => {
744 let resp = send_query(event_tx, QueryRequest::DropAnnounceQueues)?;
745 if let QueryResponse::DropAnnounceQueues = resp {
746 Ok(PickleValue::Bool(true))
747 } else {
748 Ok(PickleValue::None)
749 }
750 }
751 _ => Ok(PickleValue::None),
752 };
753 }
754 }
755
756 if let Some(hook_val) = request.get("hook").and_then(|v| v.as_str()) {
757 return handle_hook_rpc_request(hook_val, request, event_tx);
758 }
759
760 Ok(PickleValue::None)
761}
762
763fn handle_hook_rpc_request(
764 op: &str,
765 request: &PickleValue,
766 event_tx: &EventSender,
767) -> io::Result<PickleValue> {
768 match op {
769 "load" => {
770 let name = required_string(request, "name")?;
771 let attach_point = required_string(request, "attach_point")?;
772 let priority = request
773 .get("priority")
774 .and_then(|v| v.as_int())
775 .unwrap_or(0) as i32;
776 let wasm = request
777 .get("wasm")
778 .and_then(|v| v.as_bytes())
779 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing wasm"))?
780 .to_vec();
781 let (response_tx, response_rx) = mpsc::channel();
782 event_tx
783 .send(Event::LoadHook {
784 name,
785 wasm_bytes: wasm,
786 attach_point,
787 priority,
788 response_tx,
789 })
790 .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down"))?;
791 let response = response_rx
792 .recv_timeout(std::time::Duration::from_secs(5))
793 .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "hook load timed out"))?;
794 Ok(hook_result_to_pickle(response))
795 }
796 "load_builtin" => {
797 let name = required_string(request, "name")?;
798 let attach_point = required_string(request, "attach_point")?;
799 let builtin_id = required_string(request, "builtin_id")?;
800 let priority = request
801 .get("priority")
802 .and_then(|v| v.as_int())
803 .unwrap_or(0) as i32;
804 let (response_tx, response_rx) = mpsc::channel();
805 event_tx
806 .send(Event::LoadBuiltinHook {
807 name,
808 builtin_id,
809 attach_point,
810 priority,
811 response_tx,
812 })
813 .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down"))?;
814 let response = response_rx
815 .recv_timeout(std::time::Duration::from_secs(5))
816 .map_err(|_| {
817 io::Error::new(io::ErrorKind::TimedOut, "built-in hook load timed out")
818 })?;
819 Ok(hook_result_to_pickle(response))
820 }
821 "unload" => {
822 let name = required_string(request, "name")?;
823 let attach_point = required_string(request, "attach_point")?;
824 let (response_tx, response_rx) = mpsc::channel();
825 event_tx
826 .send(Event::UnloadHook {
827 name,
828 attach_point,
829 response_tx,
830 })
831 .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down"))?;
832 let response = response_rx
833 .recv_timeout(std::time::Duration::from_secs(5))
834 .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "hook unload timed out"))?;
835 Ok(hook_result_to_pickle(response))
836 }
837 "enable" | "disable" => {
838 let name = required_string(request, "name")?;
839 let attach_point = required_string(request, "attach_point")?;
840 let enabled = op == "enable";
841 let (response_tx, response_rx) = mpsc::channel();
842 event_tx
843 .send(Event::SetHookEnabled {
844 name,
845 attach_point,
846 enabled,
847 response_tx,
848 })
849 .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down"))?;
850 let response = response_rx
851 .recv_timeout(std::time::Duration::from_secs(5))
852 .map_err(|_| {
853 io::Error::new(io::ErrorKind::TimedOut, "hook enable/disable timed out")
854 })?;
855 Ok(hook_result_to_pickle(response))
856 }
857 "set_priority" => {
858 let name = required_string(request, "name")?;
859 let attach_point = required_string(request, "attach_point")?;
860 let priority = request
861 .get("priority")
862 .and_then(|v| v.as_int())
863 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing priority"))?
864 as i32;
865 let (response_tx, response_rx) = mpsc::channel();
866 event_tx
867 .send(Event::SetHookPriority {
868 name,
869 attach_point,
870 priority,
871 response_tx,
872 })
873 .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down"))?;
874 let response = response_rx
875 .recv_timeout(std::time::Duration::from_secs(5))
876 .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "hook priority timed out"))?;
877 Ok(hook_result_to_pickle(response))
878 }
879 _ => Ok(PickleValue::None),
880 }
881}
882
883fn send_query(event_tx: &EventSender, request: QueryRequest) -> io::Result<QueryResponse> {
885 let (resp_tx, resp_rx) = mpsc::channel();
886 event_tx
887 .send(Event::Query(request, resp_tx))
888 .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down"))?;
889 resp_rx
890 .recv_timeout(std::time::Duration::from_secs(5))
891 .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "query timed out"))
892}
893
894fn extract_dest_hash(request: &PickleValue, key: &str) -> io::Result<[u8; 16]> {
896 let bytes = request
897 .get(key)
898 .and_then(|v| v.as_bytes())
899 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing destination_hash"))?;
900 if bytes.len() < 16 {
901 return Err(io::Error::new(io::ErrorKind::InvalidData, "hash too short"));
902 }
903 let mut hash = [0u8; 16];
904 hash.copy_from_slice(&bytes[..16]);
905 Ok(hash)
906}
907
908fn required_string(request: &PickleValue, key: &str) -> io::Result<String> {
909 request
910 .get(key)
911 .and_then(|v| v.as_str())
912 .map(|s| s.to_string())
913 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, format!("missing {}", key)))
914}
915
916fn hook_result_to_pickle(result: Result<(), String>) -> PickleValue {
917 match result {
918 Ok(()) => PickleValue::Dict(vec![(
919 PickleValue::String("ok".into()),
920 PickleValue::Bool(true),
921 )]),
922 Err(error) => PickleValue::Dict(vec![
923 (PickleValue::String("ok".into()), PickleValue::Bool(false)),
924 (
925 PickleValue::String("error".into()),
926 PickleValue::String(error),
927 ),
928 ]),
929 }
930}
931
932fn interface_stats_to_pickle(stats: &InterfaceStatsResponse) -> PickleValue {
935 let mut ifaces = Vec::new();
936 for iface in &stats.interfaces {
937 ifaces.push(single_iface_to_pickle(iface));
938 }
939
940 let mut dict = vec![
941 (
942 PickleValue::String("interfaces".into()),
943 PickleValue::List(ifaces),
944 ),
945 (
946 PickleValue::String("transport_enabled".into()),
947 PickleValue::Bool(stats.transport_enabled),
948 ),
949 (
950 PickleValue::String("transport_uptime".into()),
951 PickleValue::Float(stats.transport_uptime),
952 ),
953 (
954 PickleValue::String("rxb".into()),
955 PickleValue::Int(stats.total_rxb as i64),
956 ),
957 (
958 PickleValue::String("txb".into()),
959 PickleValue::Int(stats.total_txb as i64),
960 ),
961 ];
962
963 if let Some(tid) = stats.transport_id {
964 dict.push((
965 PickleValue::String("transport_id".into()),
966 PickleValue::Bytes(tid.to_vec()),
967 ));
968 } else {
969 dict.push((
970 PickleValue::String("transport_id".into()),
971 PickleValue::None,
972 ));
973 }
974
975 if let Some(pr) = stats.probe_responder {
976 dict.push((
977 PickleValue::String("probe_responder".into()),
978 PickleValue::Bytes(pr.to_vec()),
979 ));
980 } else {
981 dict.push((
982 PickleValue::String("probe_responder".into()),
983 PickleValue::None,
984 ));
985 }
986
987 if let Some(pool) = &stats.backbone_peer_pool {
988 let members = pool
989 .members
990 .iter()
991 .map(|member| {
992 let mut member_dict = vec![
993 (
994 PickleValue::String("name".into()),
995 PickleValue::String(member.name.clone()),
996 ),
997 (
998 PickleValue::String("remote".into()),
999 PickleValue::String(member.remote.clone()),
1000 ),
1001 (
1002 PickleValue::String("state".into()),
1003 PickleValue::String(member.state.clone()),
1004 ),
1005 (
1006 PickleValue::String("failure_count".into()),
1007 PickleValue::Int(member.failure_count as i64),
1008 ),
1009 ];
1010 member_dict.push((
1011 PickleValue::String("interface_id".into()),
1012 member
1013 .interface_id
1014 .map(|id| PickleValue::Int(id as i64))
1015 .unwrap_or(PickleValue::None),
1016 ));
1017 member_dict.push((
1018 PickleValue::String("last_error".into()),
1019 member
1020 .last_error
1021 .as_ref()
1022 .map(|err| PickleValue::String(err.clone()))
1023 .unwrap_or(PickleValue::None),
1024 ));
1025 member_dict.push((
1026 PickleValue::String("cooldown_remaining_seconds".into()),
1027 member
1028 .cooldown_remaining_seconds
1029 .map(PickleValue::Float)
1030 .unwrap_or(PickleValue::None),
1031 ));
1032 PickleValue::Dict(member_dict)
1033 })
1034 .collect();
1035 dict.push((
1036 PickleValue::String("backbone_peer_pool".into()),
1037 PickleValue::Dict(vec![
1038 (
1039 PickleValue::String("max_connected".into()),
1040 PickleValue::Int(pool.max_connected as i64),
1041 ),
1042 (
1043 PickleValue::String("active_count".into()),
1044 PickleValue::Int(pool.active_count as i64),
1045 ),
1046 (
1047 PickleValue::String("standby_count".into()),
1048 PickleValue::Int(pool.standby_count as i64),
1049 ),
1050 (
1051 PickleValue::String("cooldown_count".into()),
1052 PickleValue::Int(pool.cooldown_count as i64),
1053 ),
1054 (
1055 PickleValue::String("members".into()),
1056 PickleValue::List(members),
1057 ),
1058 ]),
1059 ));
1060 } else {
1061 dict.push((
1062 PickleValue::String("backbone_peer_pool".into()),
1063 PickleValue::None,
1064 ));
1065 }
1066
1067 PickleValue::Dict(dict)
1068}
1069
1070fn single_iface_to_pickle(s: &SingleInterfaceStat) -> PickleValue {
1071 let mut dict = vec![
1072 (
1073 PickleValue::String("id".into()),
1074 PickleValue::Int(s.id as i64),
1075 ),
1076 (
1077 PickleValue::String("name".into()),
1078 PickleValue::String(s.name.clone()),
1079 ),
1080 (
1081 PickleValue::String("status".into()),
1082 PickleValue::Bool(s.status),
1083 ),
1084 (
1085 PickleValue::String("mode".into()),
1086 PickleValue::Int(s.mode as i64),
1087 ),
1088 (
1089 PickleValue::String("rxb".into()),
1090 PickleValue::Int(s.rxb as i64),
1091 ),
1092 (
1093 PickleValue::String("txb".into()),
1094 PickleValue::Int(s.txb as i64),
1095 ),
1096 (
1097 PickleValue::String("rx_packets".into()),
1098 PickleValue::Int(s.rx_packets as i64),
1099 ),
1100 (
1101 PickleValue::String("tx_packets".into()),
1102 PickleValue::Int(s.tx_packets as i64),
1103 ),
1104 (
1105 PickleValue::String("started".into()),
1106 PickleValue::Float(s.started),
1107 ),
1108 (
1109 PickleValue::String("ia_freq".into()),
1110 PickleValue::Float(s.ia_freq),
1111 ),
1112 (
1113 PickleValue::String("oa_freq".into()),
1114 PickleValue::Float(s.oa_freq),
1115 ),
1116 ];
1117
1118 match s.bitrate {
1119 Some(br) => dict.push((
1120 PickleValue::String("bitrate".into()),
1121 PickleValue::Int(br as i64),
1122 )),
1123 None => dict.push((PickleValue::String("bitrate".into()), PickleValue::None)),
1124 }
1125
1126 match s.ifac_size {
1127 Some(sz) => dict.push((
1128 PickleValue::String("ifac_size".into()),
1129 PickleValue::Int(sz as i64),
1130 )),
1131 None => dict.push((PickleValue::String("ifac_size".into()), PickleValue::None)),
1132 }
1133
1134 PickleValue::Dict(dict)
1135}
1136
1137fn path_table_to_pickle(entries: &[PathTableEntry]) -> PickleValue {
1138 let list: Vec<PickleValue> = entries
1139 .iter()
1140 .map(|e| {
1141 PickleValue::Dict(vec![
1142 (
1143 PickleValue::String("hash".into()),
1144 PickleValue::Bytes(e.hash.to_vec()),
1145 ),
1146 (
1147 PickleValue::String("timestamp".into()),
1148 PickleValue::Float(e.timestamp),
1149 ),
1150 (
1151 PickleValue::String("via".into()),
1152 PickleValue::Bytes(e.via.to_vec()),
1153 ),
1154 (
1155 PickleValue::String("hops".into()),
1156 PickleValue::Int(e.hops as i64),
1157 ),
1158 (
1159 PickleValue::String("expires".into()),
1160 PickleValue::Float(e.expires),
1161 ),
1162 (
1163 PickleValue::String("interface".into()),
1164 PickleValue::String(e.interface_name.clone()),
1165 ),
1166 ])
1167 })
1168 .collect();
1169 PickleValue::List(list)
1170}
1171
1172fn rate_table_to_pickle(entries: &[RateTableEntry]) -> PickleValue {
1173 let list: Vec<PickleValue> = entries
1174 .iter()
1175 .map(|e| {
1176 PickleValue::Dict(vec![
1177 (
1178 PickleValue::String("hash".into()),
1179 PickleValue::Bytes(e.hash.to_vec()),
1180 ),
1181 (
1182 PickleValue::String("last".into()),
1183 PickleValue::Float(e.last),
1184 ),
1185 (
1186 PickleValue::String("rate_violations".into()),
1187 PickleValue::Int(e.rate_violations as i64),
1188 ),
1189 (
1190 PickleValue::String("blocked_until".into()),
1191 PickleValue::Float(e.blocked_until),
1192 ),
1193 (
1194 PickleValue::String("timestamps".into()),
1195 PickleValue::List(
1196 e.timestamps
1197 .iter()
1198 .map(|&t| PickleValue::Float(t))
1199 .collect(),
1200 ),
1201 ),
1202 ])
1203 })
1204 .collect();
1205 PickleValue::List(list)
1206}
1207
1208fn blackholed_to_pickle(entries: &[BlackholeInfo]) -> PickleValue {
1209 let list: Vec<PickleValue> = entries
1210 .iter()
1211 .map(|e| {
1212 let mut dict = vec![
1213 (
1214 PickleValue::String("identity_hash".into()),
1215 PickleValue::Bytes(e.identity_hash.to_vec()),
1216 ),
1217 (
1218 PickleValue::String("created".into()),
1219 PickleValue::Float(e.created),
1220 ),
1221 (
1222 PickleValue::String("expires".into()),
1223 PickleValue::Float(e.expires),
1224 ),
1225 ];
1226 if let Some(ref reason) = e.reason {
1227 dict.push((
1228 PickleValue::String("reason".into()),
1229 PickleValue::String(reason.clone()),
1230 ));
1231 } else {
1232 dict.push((PickleValue::String("reason".into()), PickleValue::None));
1233 }
1234 PickleValue::Dict(dict)
1235 })
1236 .collect();
1237 PickleValue::List(list)
1238}
1239
1240fn discovered_interfaces_to_pickle(
1241 interfaces: &[crate::discovery::DiscoveredInterface],
1242) -> PickleValue {
1243 let list: Vec<PickleValue> = interfaces
1244 .iter()
1245 .map(|iface| {
1246 let mut dict = vec![
1247 (
1248 PickleValue::String("type".into()),
1249 PickleValue::String(iface.interface_type.clone()),
1250 ),
1251 (
1252 PickleValue::String("transport".into()),
1253 PickleValue::Bool(iface.transport),
1254 ),
1255 (
1256 PickleValue::String("name".into()),
1257 PickleValue::String(iface.name.clone()),
1258 ),
1259 (
1260 PickleValue::String("discovered".into()),
1261 PickleValue::Float(iface.discovered),
1262 ),
1263 (
1264 PickleValue::String("last_heard".into()),
1265 PickleValue::Float(iface.last_heard),
1266 ),
1267 (
1268 PickleValue::String("heard_count".into()),
1269 PickleValue::Int(iface.heard_count as i64),
1270 ),
1271 (
1272 PickleValue::String("status".into()),
1273 PickleValue::String(iface.status.as_str().into()),
1274 ),
1275 (
1276 PickleValue::String("stamp".into()),
1277 PickleValue::Bytes(iface.stamp.clone()),
1278 ),
1279 (
1280 PickleValue::String("value".into()),
1281 PickleValue::Int(iface.stamp_value as i64),
1282 ),
1283 (
1284 PickleValue::String("transport_id".into()),
1285 PickleValue::Bytes(iface.transport_id.to_vec()),
1286 ),
1287 (
1288 PickleValue::String("network_id".into()),
1289 PickleValue::Bytes(iface.network_id.to_vec()),
1290 ),
1291 (
1292 PickleValue::String("hops".into()),
1293 PickleValue::Int(iface.hops as i64),
1294 ),
1295 ];
1296
1297 if let Some(v) = iface.latitude {
1299 dict.push((
1300 PickleValue::String("latitude".into()),
1301 PickleValue::Float(v),
1302 ));
1303 } else {
1304 dict.push((PickleValue::String("latitude".into()), PickleValue::None));
1305 }
1306 if let Some(v) = iface.longitude {
1307 dict.push((
1308 PickleValue::String("longitude".into()),
1309 PickleValue::Float(v),
1310 ));
1311 } else {
1312 dict.push((PickleValue::String("longitude".into()), PickleValue::None));
1313 }
1314 if let Some(v) = iface.height {
1315 dict.push((PickleValue::String("height".into()), PickleValue::Float(v)));
1316 } else {
1317 dict.push((PickleValue::String("height".into()), PickleValue::None));
1318 }
1319
1320 if let Some(ref v) = iface.reachable_on {
1322 dict.push((
1323 PickleValue::String("reachable_on".into()),
1324 PickleValue::String(v.clone()),
1325 ));
1326 } else {
1327 dict.push((
1328 PickleValue::String("reachable_on".into()),
1329 PickleValue::None,
1330 ));
1331 }
1332 if let Some(v) = iface.port {
1333 dict.push((
1334 PickleValue::String("port".into()),
1335 PickleValue::Int(v as i64),
1336 ));
1337 } else {
1338 dict.push((PickleValue::String("port".into()), PickleValue::None));
1339 }
1340
1341 if let Some(v) = iface.frequency {
1343 dict.push((
1344 PickleValue::String("frequency".into()),
1345 PickleValue::Int(v as i64),
1346 ));
1347 } else {
1348 dict.push((PickleValue::String("frequency".into()), PickleValue::None));
1349 }
1350 if let Some(v) = iface.bandwidth {
1351 dict.push((
1352 PickleValue::String("bandwidth".into()),
1353 PickleValue::Int(v as i64),
1354 ));
1355 } else {
1356 dict.push((PickleValue::String("bandwidth".into()), PickleValue::None));
1357 }
1358 if let Some(v) = iface.spreading_factor {
1359 dict.push((PickleValue::String("sf".into()), PickleValue::Int(v as i64)));
1360 } else {
1361 dict.push((PickleValue::String("sf".into()), PickleValue::None));
1362 }
1363 if let Some(v) = iface.coding_rate {
1364 dict.push((PickleValue::String("cr".into()), PickleValue::Int(v as i64)));
1365 } else {
1366 dict.push((PickleValue::String("cr".into()), PickleValue::None));
1367 }
1368 if let Some(ref v) = iface.modulation {
1369 dict.push((
1370 PickleValue::String("modulation".into()),
1371 PickleValue::String(v.clone()),
1372 ));
1373 } else {
1374 dict.push((PickleValue::String("modulation".into()), PickleValue::None));
1375 }
1376 if let Some(v) = iface.channel {
1377 dict.push((
1378 PickleValue::String("channel".into()),
1379 PickleValue::Int(v as i64),
1380 ));
1381 } else {
1382 dict.push((PickleValue::String("channel".into()), PickleValue::None));
1383 }
1384
1385 if let Some(ref v) = iface.ifac_netname {
1387 dict.push((
1388 PickleValue::String("ifac_netname".into()),
1389 PickleValue::String(v.clone()),
1390 ));
1391 } else {
1392 dict.push((
1393 PickleValue::String("ifac_netname".into()),
1394 PickleValue::None,
1395 ));
1396 }
1397 if let Some(ref v) = iface.ifac_netkey {
1398 dict.push((
1399 PickleValue::String("ifac_netkey".into()),
1400 PickleValue::String(v.clone()),
1401 ));
1402 } else {
1403 dict.push((PickleValue::String("ifac_netkey".into()), PickleValue::None));
1404 }
1405
1406 if let Some(ref v) = iface.config_entry {
1408 dict.push((
1409 PickleValue::String("config_entry".into()),
1410 PickleValue::String(v.clone()),
1411 ));
1412 } else {
1413 dict.push((
1414 PickleValue::String("config_entry".into()),
1415 PickleValue::None,
1416 ));
1417 }
1418
1419 dict.push((
1420 PickleValue::String("discovery_hash".into()),
1421 PickleValue::Bytes(iface.discovery_hash.to_vec()),
1422 ));
1423
1424 PickleValue::Dict(dict)
1425 })
1426 .collect();
1427 PickleValue::List(list)
1428}
1429
1430fn hooks_to_pickle(hooks: &[HookInfo]) -> PickleValue {
1431 PickleValue::List(
1432 hooks
1433 .iter()
1434 .map(|hook| {
1435 PickleValue::Dict(vec![
1436 (
1437 PickleValue::String("name".into()),
1438 PickleValue::String(hook.name.clone()),
1439 ),
1440 (
1441 PickleValue::String("type".into()),
1442 PickleValue::String(hook.hook_type.clone()),
1443 ),
1444 (
1445 PickleValue::String("attach_point".into()),
1446 PickleValue::String(hook.attach_point.clone()),
1447 ),
1448 (
1449 PickleValue::String("priority".into()),
1450 PickleValue::Int(hook.priority as i64),
1451 ),
1452 (
1453 PickleValue::String("enabled".into()),
1454 PickleValue::Bool(hook.enabled),
1455 ),
1456 (
1457 PickleValue::String("consecutive_traps".into()),
1458 PickleValue::Int(hook.consecutive_traps as i64),
1459 ),
1460 ])
1461 })
1462 .collect(),
1463 )
1464}
1465
1466fn backbone_peer_state_to_pickle(entries: &[BackbonePeerStateEntry]) -> PickleValue {
1467 PickleValue::List(
1468 entries
1469 .iter()
1470 .map(|entry| {
1471 PickleValue::Dict(vec![
1472 (
1473 PickleValue::String("interface".into()),
1474 PickleValue::String(entry.interface_name.clone()),
1475 ),
1476 (
1477 PickleValue::String("ip".into()),
1478 PickleValue::String(entry.peer_ip.to_string()),
1479 ),
1480 (
1481 PickleValue::String("connected_count".into()),
1482 PickleValue::Int(entry.connected_count as i64),
1483 ),
1484 (
1485 PickleValue::String("blacklisted_remaining_secs".into()),
1486 entry
1487 .blacklisted_remaining_secs
1488 .map(PickleValue::Float)
1489 .unwrap_or(PickleValue::None),
1490 ),
1491 (
1492 PickleValue::String("blacklist_reason".into()),
1493 entry
1494 .blacklist_reason
1495 .as_ref()
1496 .map(|v: &String| PickleValue::String(v.clone()))
1497 .unwrap_or(PickleValue::None),
1498 ),
1499 (
1500 PickleValue::String("reject_count".into()),
1501 PickleValue::Int(entry.reject_count as i64),
1502 ),
1503 ])
1504 })
1505 .collect(),
1506 )
1507}
1508
1509fn backbone_interfaces_to_pickle(entries: &[BackboneInterfaceEntry]) -> PickleValue {
1510 PickleValue::List(
1511 entries
1512 .iter()
1513 .map(|entry| {
1514 PickleValue::Dict(vec![
1515 (
1516 PickleValue::String("id".into()),
1517 PickleValue::Int(entry.interface_id.0 as i64),
1518 ),
1519 (
1520 PickleValue::String("name".into()),
1521 PickleValue::String(entry.interface_name.clone()),
1522 ),
1523 ])
1524 })
1525 .collect(),
1526 )
1527}
1528
1529fn provider_bridge_stats_to_pickle(stats: &ProviderBridgeStats) -> PickleValue {
1530 PickleValue::Dict(vec![
1531 (
1532 PickleValue::String("connected".into()),
1533 PickleValue::Bool(stats.connected),
1534 ),
1535 (
1536 PickleValue::String("consumer_count".into()),
1537 PickleValue::Int(stats.consumer_count as i64),
1538 ),
1539 (
1540 PickleValue::String("queue_max_events".into()),
1541 PickleValue::Int(stats.queue_max_events as i64),
1542 ),
1543 (
1544 PickleValue::String("queue_max_bytes".into()),
1545 PickleValue::Int(stats.queue_max_bytes as i64),
1546 ),
1547 (
1548 PickleValue::String("backlog_len".into()),
1549 PickleValue::Int(stats.backlog_len as i64),
1550 ),
1551 (
1552 PickleValue::String("backlog_bytes".into()),
1553 PickleValue::Int(stats.backlog_bytes as i64),
1554 ),
1555 (
1556 PickleValue::String("backlog_dropped_pending".into()),
1557 PickleValue::Int(stats.backlog_dropped_pending as i64),
1558 ),
1559 (
1560 PickleValue::String("backlog_dropped_total".into()),
1561 PickleValue::Int(stats.backlog_dropped_total as i64),
1562 ),
1563 (
1564 PickleValue::String("total_disconnect_count".into()),
1565 PickleValue::Int(stats.total_disconnect_count as i64),
1566 ),
1567 (
1568 PickleValue::String("consumers".into()),
1569 PickleValue::List(
1570 stats
1571 .consumers
1572 .iter()
1573 .map(|consumer| {
1574 PickleValue::Dict(vec![
1575 (
1576 PickleValue::String("id".into()),
1577 PickleValue::Int(consumer.id as i64),
1578 ),
1579 (
1580 PickleValue::String("connected".into()),
1581 PickleValue::Bool(consumer.connected),
1582 ),
1583 (
1584 PickleValue::String("queue_len".into()),
1585 PickleValue::Int(consumer.queue_len as i64),
1586 ),
1587 (
1588 PickleValue::String("queued_bytes".into()),
1589 PickleValue::Int(consumer.queued_bytes as i64),
1590 ),
1591 (
1592 PickleValue::String("dropped_pending".into()),
1593 PickleValue::Int(consumer.dropped_pending as i64),
1594 ),
1595 (
1596 PickleValue::String("dropped_total".into()),
1597 PickleValue::Int(consumer.dropped_total as i64),
1598 ),
1599 (
1600 PickleValue::String("queue_max_events".into()),
1601 PickleValue::Int(consumer.queue_max_events as i64),
1602 ),
1603 (
1604 PickleValue::String("queue_max_bytes".into()),
1605 PickleValue::Int(consumer.queue_max_bytes as i64),
1606 ),
1607 ])
1608 })
1609 .collect(),
1610 ),
1611 ),
1612 ])
1613}
1614
1615fn lifecycle_state_name(state: LifecycleState) -> &'static str {
1616 match state {
1617 LifecycleState::Active => "active",
1618 LifecycleState::Draining => "draining",
1619 LifecycleState::Stopping => "stopping",
1620 LifecycleState::Stopped => "stopped",
1621 }
1622}
1623
1624fn drain_status_to_pickle(status: &DrainStatus) -> PickleValue {
1625 PickleValue::Dict(vec![
1626 (
1627 PickleValue::String("state".into()),
1628 PickleValue::String(lifecycle_state_name(status.state).into()),
1629 ),
1630 (
1631 PickleValue::String("drain_age_seconds".into()),
1632 status
1633 .drain_age_seconds
1634 .map(PickleValue::Float)
1635 .unwrap_or(PickleValue::None),
1636 ),
1637 (
1638 PickleValue::String("deadline_remaining_seconds".into()),
1639 status
1640 .deadline_remaining_seconds
1641 .map(PickleValue::Float)
1642 .unwrap_or(PickleValue::None),
1643 ),
1644 (
1645 PickleValue::String("drain_complete".into()),
1646 PickleValue::Bool(status.drain_complete),
1647 ),
1648 (
1649 PickleValue::String("interface_writer_queued_frames".into()),
1650 PickleValue::Int(status.interface_writer_queued_frames as i64),
1651 ),
1652 (
1653 PickleValue::String("provider_backlog_events".into()),
1654 PickleValue::Int(status.provider_backlog_events as i64),
1655 ),
1656 (
1657 PickleValue::String("provider_consumer_queued_events".into()),
1658 PickleValue::Int(status.provider_consumer_queued_events as i64),
1659 ),
1660 (
1661 PickleValue::String("detail".into()),
1662 status
1663 .detail
1664 .as_ref()
1665 .map(|detail| PickleValue::String(detail.clone()))
1666 .unwrap_or(PickleValue::None),
1667 ),
1668 ])
1669}
1670
1671fn runtime_config_value_to_pickle(value: &RuntimeConfigValue) -> PickleValue {
1672 match value {
1673 RuntimeConfigValue::Int(v) => PickleValue::Int(*v),
1674 RuntimeConfigValue::Float(v) => PickleValue::Float(*v),
1675 RuntimeConfigValue::Bool(v) => PickleValue::Bool(*v),
1676 RuntimeConfigValue::String(v) => PickleValue::String(v.clone()),
1677 RuntimeConfigValue::Null => PickleValue::None,
1678 }
1679}
1680
1681fn runtime_config_value_from_pickle(value: &PickleValue) -> Option<RuntimeConfigValue> {
1682 match value {
1683 PickleValue::Int(v) => Some(RuntimeConfigValue::Int(*v)),
1684 PickleValue::Float(v) => Some(RuntimeConfigValue::Float(*v)),
1685 PickleValue::Bool(v) => Some(RuntimeConfigValue::Bool(*v)),
1686 PickleValue::String(v) => Some(RuntimeConfigValue::String(v.clone())),
1687 PickleValue::None => Some(RuntimeConfigValue::Null),
1688 _ => None,
1689 }
1690}
1691
1692fn runtime_config_entry_to_pickle(entry: &RuntimeConfigEntry) -> PickleValue {
1693 PickleValue::Dict(vec![
1694 (
1695 PickleValue::String("key".into()),
1696 PickleValue::String(entry.key.clone()),
1697 ),
1698 (
1699 PickleValue::String("value".into()),
1700 runtime_config_value_to_pickle(&entry.value),
1701 ),
1702 (
1703 PickleValue::String("default".into()),
1704 runtime_config_value_to_pickle(&entry.default),
1705 ),
1706 (
1707 PickleValue::String("source".into()),
1708 PickleValue::String(match entry.source {
1709 RuntimeConfigSource::Startup => "startup".into(),
1710 RuntimeConfigSource::RuntimeOverride => "runtime_override".into(),
1711 }),
1712 ),
1713 (
1714 PickleValue::String("apply_mode".into()),
1715 PickleValue::String(match entry.apply_mode {
1716 RuntimeConfigApplyMode::Immediate => "immediate".into(),
1717 RuntimeConfigApplyMode::NewConnectionsOnly => "new_connections_only".into(),
1718 RuntimeConfigApplyMode::NextReconnect => "next_reconnect".into(),
1719 RuntimeConfigApplyMode::RestartRequired => "restart_required".into(),
1720 }),
1721 ),
1722 (
1723 PickleValue::String("description".into()),
1724 entry
1725 .description
1726 .as_ref()
1727 .map(|v| PickleValue::String(v.clone()))
1728 .unwrap_or(PickleValue::None),
1729 ),
1730 ])
1731}
1732
1733fn runtime_config_list_to_pickle(entries: &[RuntimeConfigEntry]) -> PickleValue {
1734 PickleValue::List(entries.iter().map(runtime_config_entry_to_pickle).collect())
1735}
1736
1737fn runtime_config_error_to_pickle(error: &RuntimeConfigError) -> PickleValue {
1738 PickleValue::Dict(vec![
1739 (
1740 PickleValue::String("error".into()),
1741 PickleValue::String(match error.code {
1742 RuntimeConfigErrorCode::UnknownKey => "unknown_key".into(),
1743 RuntimeConfigErrorCode::InvalidType => "invalid_type".into(),
1744 RuntimeConfigErrorCode::InvalidValue => "invalid_value".into(),
1745 RuntimeConfigErrorCode::Unsupported => "unsupported".into(),
1746 RuntimeConfigErrorCode::NotFound => "not_found".into(),
1747 RuntimeConfigErrorCode::ApplyFailed => "apply_failed".into(),
1748 }),
1749 ),
1750 (
1751 PickleValue::String("message".into()),
1752 PickleValue::String(error.message.clone()),
1753 ),
1754 ])
1755}
1756
1757fn runtime_config_result_to_pickle(
1758 result: Result<RuntimeConfigEntry, RuntimeConfigError>,
1759) -> PickleValue {
1760 match result {
1761 Ok(entry) => runtime_config_entry_to_pickle(&entry),
1762 Err(error) => runtime_config_error_to_pickle(&error),
1763 }
1764}
1765
1766fn known_destination_entry_to_pickle(entry: &KnownDestinationEntry) -> PickleValue {
1767 PickleValue::Dict(vec![
1768 (
1769 PickleValue::String("dest_hash".into()),
1770 PickleValue::Bytes(entry.dest_hash.to_vec()),
1771 ),
1772 (
1773 PickleValue::String("identity_hash".into()),
1774 PickleValue::Bytes(entry.identity_hash.to_vec()),
1775 ),
1776 (
1777 PickleValue::String("public_key".into()),
1778 PickleValue::Bytes(entry.public_key.to_vec()),
1779 ),
1780 (
1781 PickleValue::String("app_data".into()),
1782 entry
1783 .app_data
1784 .as_ref()
1785 .map(|data: &Vec<u8>| PickleValue::Bytes(data.clone()))
1786 .unwrap_or(PickleValue::None),
1787 ),
1788 (
1789 PickleValue::String("hops".into()),
1790 PickleValue::Int(entry.hops as i64),
1791 ),
1792 (
1793 PickleValue::String("received_at".into()),
1794 PickleValue::Float(entry.received_at),
1795 ),
1796 (
1797 PickleValue::String("receiving_interface".into()),
1798 PickleValue::Int(entry.receiving_interface.0 as i64),
1799 ),
1800 (
1801 PickleValue::String("was_used".into()),
1802 PickleValue::Bool(entry.was_used),
1803 ),
1804 (
1805 PickleValue::String("last_used_at".into()),
1806 entry
1807 .last_used_at
1808 .map(PickleValue::Float)
1809 .unwrap_or(PickleValue::None),
1810 ),
1811 (
1812 PickleValue::String("retained".into()),
1813 PickleValue::Bool(entry.retained),
1814 ),
1815 ])
1816}
1817
1818fn known_destinations_to_pickle(entries: &[KnownDestinationEntry]) -> PickleValue {
1819 PickleValue::List(
1820 entries
1821 .iter()
1822 .map(known_destination_entry_to_pickle)
1823 .collect(),
1824 )
1825}
1826
1827fn parse_known_destination_entry(value: &PickleValue) -> io::Result<KnownDestinationEntry> {
1828 let get_bytes = |key: &str, len: usize| -> io::Result<Vec<u8>> {
1829 let value = value.get(key).ok_or_else(|| {
1830 io::Error::new(io::ErrorKind::InvalidData, format!("missing {}", key))
1831 })?;
1832 let bytes = value.as_bytes().ok_or_else(|| {
1833 io::Error::new(io::ErrorKind::InvalidData, format!("invalid {}", key))
1834 })?;
1835 if bytes.len() != len {
1836 return Err(io::Error::new(
1837 io::ErrorKind::InvalidData,
1838 format!("invalid {} length", key),
1839 ));
1840 }
1841 Ok(bytes.to_vec())
1842 };
1843 let get_int = |key: &str| -> io::Result<i64> {
1844 value
1845 .get(key)
1846 .and_then(|v| v.as_int())
1847 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, format!("invalid {}", key)))
1848 };
1849 let get_float = |key: &str| -> io::Result<f64> {
1850 value
1851 .get(key)
1852 .and_then(|v| v.as_float())
1853 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, format!("invalid {}", key)))
1854 };
1855 let get_bool = |key: &str| -> io::Result<bool> {
1856 value
1857 .get(key)
1858 .and_then(|v| v.as_bool())
1859 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, format!("invalid {}", key)))
1860 };
1861
1862 let mut dest_hash = [0u8; 16];
1863 dest_hash.copy_from_slice(&get_bytes("dest_hash", 16)?);
1864 let mut identity_hash = [0u8; 16];
1865 identity_hash.copy_from_slice(&get_bytes("identity_hash", 16)?);
1866 let mut public_key = [0u8; 64];
1867 public_key.copy_from_slice(&get_bytes("public_key", 64)?);
1868 let app_data = value
1869 .get("app_data")
1870 .and_then(|v| v.as_bytes())
1871 .map(|bytes| bytes.to_vec());
1872 let last_used_at = value.get("last_used_at").and_then(|v| v.as_float());
1873
1874 Ok(KnownDestinationEntry {
1875 dest_hash,
1876 identity_hash,
1877 public_key,
1878 app_data,
1879 hops: get_int("hops")? as u8,
1880 received_at: get_float("received_at")?,
1881 receiving_interface: rns_core::transport::types::InterfaceId(
1882 get_int("receiving_interface")? as u64,
1883 ),
1884 was_used: get_bool("was_used")?,
1885 last_used_at,
1886 retained: get_bool("retained")?,
1887 })
1888}
1889
1890fn parse_known_destination_list(value: &PickleValue) -> io::Result<Vec<KnownDestinationEntry>> {
1891 let list = value
1892 .as_list()
1893 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "expected list"))?;
1894 list.iter().map(parse_known_destination_entry).collect()
1895}
1896
1897pub struct RpcClient {
1901 stream: TcpStream,
1902}
1903
1904impl RpcClient {
1905 pub fn connect(addr: &RpcAddr, auth_key: &[u8; 32]) -> io::Result<Self> {
1907 let mut stream = match addr {
1908 RpcAddr::Tcp(host, port) => TcpStream::connect((host.as_str(), *port))?,
1909 };
1910
1911 stream.set_read_timeout(Some(std::time::Duration::from_secs(10)))?;
1912 stream.set_write_timeout(Some(std::time::Duration::from_secs(10)))?;
1913
1914 client_auth(&mut stream, auth_key)?;
1916
1917 Ok(RpcClient { stream })
1918 }
1919
1920 pub fn call(&mut self, request: &PickleValue) -> io::Result<PickleValue> {
1922 let request_bytes = pickle::encode(request);
1923 send_bytes(&mut self.stream, &request_bytes)?;
1924
1925 let response_bytes = recv_bytes(&mut self.stream)?;
1926 pickle::decode(&response_bytes)
1927 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))
1928 }
1929
1930 pub fn list_hooks(&mut self) -> io::Result<Vec<HookInfo>> {
1931 let response = self.call(&PickleValue::Dict(vec![(
1932 PickleValue::String("get".into()),
1933 PickleValue::String("hooks".into()),
1934 )]))?;
1935 parse_hook_list(&response)
1936 }
1937
1938 pub fn begin_drain(&mut self, timeout: Duration) -> io::Result<bool> {
1939 let response = self.call(&PickleValue::Dict(vec![(
1940 PickleValue::String("begin_drain".into()),
1941 PickleValue::Float(timeout.as_secs_f64()),
1942 )]))?;
1943 Ok(response.as_bool().unwrap_or(false))
1944 }
1945
1946 pub fn drain_status(&mut self) -> io::Result<Option<DrainStatus>> {
1947 let response = self.call(&PickleValue::Dict(vec![(
1948 PickleValue::String("get".into()),
1949 PickleValue::String("drain_status".into()),
1950 )]))?;
1951 parse_drain_status(&response)
1952 }
1953
1954 pub fn provider_bridge_stats(&mut self) -> io::Result<PickleValue> {
1955 self.call(&PickleValue::Dict(vec![(
1956 PickleValue::String("get".into()),
1957 PickleValue::String("provider_bridge_stats".into()),
1958 )]))
1959 }
1960
1961 pub fn load_hook(
1962 &mut self,
1963 name: &str,
1964 attach_point: &str,
1965 priority: i32,
1966 wasm: &[u8],
1967 ) -> io::Result<Result<(), String>> {
1968 let response = self.call(&PickleValue::Dict(vec![
1969 (
1970 PickleValue::String("hook".into()),
1971 PickleValue::String("load".into()),
1972 ),
1973 (
1974 PickleValue::String("name".into()),
1975 PickleValue::String(name.to_string()),
1976 ),
1977 (
1978 PickleValue::String("attach_point".into()),
1979 PickleValue::String(attach_point.to_string()),
1980 ),
1981 (
1982 PickleValue::String("priority".into()),
1983 PickleValue::Int(priority as i64),
1984 ),
1985 (
1986 PickleValue::String("wasm".into()),
1987 PickleValue::Bytes(wasm.to_vec()),
1988 ),
1989 ]))?;
1990 parse_hook_result(&response)
1991 }
1992
1993 pub fn load_builtin_hook(
1994 &mut self,
1995 name: &str,
1996 attach_point: &str,
1997 priority: i32,
1998 builtin_id: &str,
1999 ) -> io::Result<Result<(), String>> {
2000 let response = self.call(&PickleValue::Dict(vec![
2001 (
2002 PickleValue::String("hook".into()),
2003 PickleValue::String("load_builtin".into()),
2004 ),
2005 (
2006 PickleValue::String("name".into()),
2007 PickleValue::String(name.to_string()),
2008 ),
2009 (
2010 PickleValue::String("attach_point".into()),
2011 PickleValue::String(attach_point.to_string()),
2012 ),
2013 (
2014 PickleValue::String("priority".into()),
2015 PickleValue::Int(priority as i64),
2016 ),
2017 (
2018 PickleValue::String("builtin_id".into()),
2019 PickleValue::String(builtin_id.to_string()),
2020 ),
2021 ]))?;
2022 parse_hook_result(&response)
2023 }
2024
2025 pub fn unload_hook(
2026 &mut self,
2027 name: &str,
2028 attach_point: &str,
2029 ) -> io::Result<Result<(), String>> {
2030 let response = self.call(&PickleValue::Dict(vec![
2031 (
2032 PickleValue::String("hook".into()),
2033 PickleValue::String("unload".into()),
2034 ),
2035 (
2036 PickleValue::String("name".into()),
2037 PickleValue::String(name.to_string()),
2038 ),
2039 (
2040 PickleValue::String("attach_point".into()),
2041 PickleValue::String(attach_point.to_string()),
2042 ),
2043 ]))?;
2044 parse_hook_result(&response)
2045 }
2046
2047 pub fn set_hook_enabled(
2048 &mut self,
2049 name: &str,
2050 attach_point: &str,
2051 enabled: bool,
2052 ) -> io::Result<Result<(), String>> {
2053 let op = if enabled { "enable" } else { "disable" };
2054 let response = self.call(&PickleValue::Dict(vec![
2055 (
2056 PickleValue::String("hook".into()),
2057 PickleValue::String(op.into()),
2058 ),
2059 (
2060 PickleValue::String("name".into()),
2061 PickleValue::String(name.to_string()),
2062 ),
2063 (
2064 PickleValue::String("attach_point".into()),
2065 PickleValue::String(attach_point.to_string()),
2066 ),
2067 ]))?;
2068 parse_hook_result(&response)
2069 }
2070
2071 pub fn set_hook_priority(
2072 &mut self,
2073 name: &str,
2074 attach_point: &str,
2075 priority: i32,
2076 ) -> io::Result<Result<(), String>> {
2077 let response = self.call(&PickleValue::Dict(vec![
2078 (
2079 PickleValue::String("hook".into()),
2080 PickleValue::String("set_priority".into()),
2081 ),
2082 (
2083 PickleValue::String("name".into()),
2084 PickleValue::String(name.to_string()),
2085 ),
2086 (
2087 PickleValue::String("attach_point".into()),
2088 PickleValue::String(attach_point.to_string()),
2089 ),
2090 (
2091 PickleValue::String("priority".into()),
2092 PickleValue::Int(priority as i64),
2093 ),
2094 ]))?;
2095 parse_hook_result(&response)
2096 }
2097
2098 pub fn blacklist_backbone_peer(
2099 &mut self,
2100 interface: &str,
2101 ip: &str,
2102 duration_secs: u64,
2103 reason: Option<&str>,
2104 penalty_level: Option<u8>,
2105 ) -> io::Result<bool> {
2106 let mut request = vec![
2107 (
2108 PickleValue::String("set".into()),
2109 PickleValue::String("backbone_peer_blacklist".into()),
2110 ),
2111 (
2112 PickleValue::String("interface".into()),
2113 PickleValue::String(interface.to_string()),
2114 ),
2115 (
2116 PickleValue::String("ip".into()),
2117 PickleValue::String(ip.to_string()),
2118 ),
2119 (
2120 PickleValue::String("duration_secs".into()),
2121 PickleValue::Int(duration_secs as i64),
2122 ),
2123 ];
2124 if let Some(reason) = reason {
2125 request.push((
2126 PickleValue::String("reason".into()),
2127 PickleValue::String(reason.to_string()),
2128 ));
2129 }
2130 if let Some(level) = penalty_level {
2131 request.push((
2132 PickleValue::String("penalty_level".into()),
2133 PickleValue::Int(level as i64),
2134 ));
2135 }
2136 let response = self.call(&PickleValue::Dict(request))?;
2137 Ok(response.as_bool().unwrap_or(false))
2138 }
2139
2140 pub fn known_destinations(&mut self) -> io::Result<Vec<KnownDestinationEntry>> {
2141 let response = self.call(&PickleValue::Dict(vec![(
2142 PickleValue::String("get".into()),
2143 PickleValue::String("known_destinations".into()),
2144 )]))?;
2145 parse_known_destination_list(&response)
2146 }
2147
2148 pub fn retain_known_destination(&mut self, dest_hash: [u8; 16]) -> io::Result<bool> {
2149 let response = self.call(&PickleValue::Dict(vec![
2150 (
2151 PickleValue::String("set".into()),
2152 PickleValue::String("known_destination_retained".into()),
2153 ),
2154 (
2155 PickleValue::String("dest_hash".into()),
2156 PickleValue::Bytes(dest_hash.to_vec()),
2157 ),
2158 ]))?;
2159 Ok(response.as_bool().unwrap_or(false))
2160 }
2161
2162 pub fn unretain_known_destination(&mut self, dest_hash: [u8; 16]) -> io::Result<bool> {
2163 let response = self.call(&PickleValue::Dict(vec![
2164 (
2165 PickleValue::String("clear".into()),
2166 PickleValue::String("known_destination_retained".into()),
2167 ),
2168 (
2169 PickleValue::String("dest_hash".into()),
2170 PickleValue::Bytes(dest_hash.to_vec()),
2171 ),
2172 ]))?;
2173 Ok(response.as_bool().unwrap_or(false))
2174 }
2175
2176 pub fn mark_known_destination_used(&mut self, dest_hash: [u8; 16]) -> io::Result<bool> {
2177 let response = self.call(&PickleValue::Dict(vec![
2178 (
2179 PickleValue::String("set".into()),
2180 PickleValue::String("known_destination_used".into()),
2181 ),
2182 (
2183 PickleValue::String("dest_hash".into()),
2184 PickleValue::Bytes(dest_hash.to_vec()),
2185 ),
2186 ]))?;
2187 Ok(response.as_bool().unwrap_or(false))
2188 }
2189}
2190
2191fn parse_lifecycle_state(value: &str) -> Option<LifecycleState> {
2192 match value {
2193 "active" => Some(LifecycleState::Active),
2194 "draining" => Some(LifecycleState::Draining),
2195 "stopping" => Some(LifecycleState::Stopping),
2196 "stopped" => Some(LifecycleState::Stopped),
2197 _ => None,
2198 }
2199}
2200
2201fn parse_drain_status(value: &PickleValue) -> io::Result<Option<DrainStatus>> {
2202 if !matches!(value, PickleValue::Dict(_)) {
2203 return Ok(None);
2204 }
2205 let state = value
2206 .get("state")
2207 .and_then(|entry| entry.as_str())
2208 .and_then(parse_lifecycle_state)
2209 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing drain state"))?;
2210 let drain_age_seconds = value.get("drain_age_seconds").and_then(|entry| {
2211 entry
2212 .as_float()
2213 .or_else(|| entry.as_int().map(|v| v as f64))
2214 });
2215 let deadline_remaining_seconds = value.get("deadline_remaining_seconds").and_then(|entry| {
2216 entry
2217 .as_float()
2218 .or_else(|| entry.as_int().map(|v| v as f64))
2219 });
2220 let drain_complete = value
2221 .get("drain_complete")
2222 .and_then(|entry| entry.as_bool())
2223 .unwrap_or(false);
2224 let interface_writer_queued_frames = value
2225 .get("interface_writer_queued_frames")
2226 .and_then(|entry| entry.as_int())
2227 .unwrap_or(0)
2228 .max(0) as usize;
2229 let provider_backlog_events = value
2230 .get("provider_backlog_events")
2231 .and_then(|entry| entry.as_int())
2232 .unwrap_or(0)
2233 .max(0) as usize;
2234 let provider_consumer_queued_events = value
2235 .get("provider_consumer_queued_events")
2236 .and_then(|entry| entry.as_int())
2237 .unwrap_or(0)
2238 .max(0) as usize;
2239 let detail = value
2240 .get("detail")
2241 .and_then(|entry| entry.as_str().map(|v| v.to_string()));
2242 Ok(Some(DrainStatus {
2243 state,
2244 drain_age_seconds,
2245 deadline_remaining_seconds,
2246 drain_complete,
2247 interface_writer_queued_frames,
2248 provider_backlog_events,
2249 provider_consumer_queued_events,
2250 detail,
2251 }))
2252}
2253
2254fn parse_hook_result(response: &PickleValue) -> io::Result<Result<(), String>> {
2255 let ok = response
2256 .get("ok")
2257 .and_then(|v| v.as_bool())
2258 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "invalid hook response"))?;
2259 if ok {
2260 Ok(Ok(()))
2261 } else {
2262 Ok(Err(response
2263 .get("error")
2264 .and_then(|v| v.as_str())
2265 .unwrap_or("unknown hook error")
2266 .to_string()))
2267 }
2268}
2269
2270fn parse_hook_list(response: &PickleValue) -> io::Result<Vec<HookInfo>> {
2271 let list = response
2272 .as_list()
2273 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "invalid hooks response"))?;
2274 let mut hooks = Vec::with_capacity(list.len());
2275 for item in list {
2276 hooks.push(HookInfo {
2277 name: item
2278 .get("name")
2279 .and_then(|v| v.as_str())
2280 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing hook name"))?
2281 .to_string(),
2282 hook_type: item
2283 .get("type")
2284 .and_then(|v| v.as_str())
2285 .unwrap_or(default_hook_type())
2286 .to_string(),
2287 attach_point: item
2288 .get("attach_point")
2289 .and_then(|v| v.as_str())
2290 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing attach_point"))?
2291 .to_string(),
2292 priority: item
2293 .get("priority")
2294 .and_then(|v| v.as_int())
2295 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing priority"))?
2296 as i32,
2297 enabled: item
2298 .get("enabled")
2299 .and_then(|v| v.as_bool())
2300 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing enabled"))?,
2301 consecutive_traps: item
2302 .get("consecutive_traps")
2303 .and_then(|v| v.as_int())
2304 .ok_or_else(|| {
2305 io::Error::new(io::ErrorKind::InvalidData, "missing consecutive_traps")
2306 })? as u32,
2307 });
2308 }
2309 Ok(hooks)
2310}
2311
2312fn default_hook_type() -> &'static str {
2313 #[cfg(feature = "rns-hooks-native")]
2314 {
2315 return "native";
2316 }
2317 #[cfg(all(not(feature = "rns-hooks-native"), feature = "rns-hooks-wasm"))]
2318 {
2319 return "wasm";
2320 }
2321 #[cfg(all(not(feature = "rns-hooks-native"), not(feature = "rns-hooks-wasm")))]
2322 {
2323 "wasm"
2324 }
2325}
2326
2327fn client_auth(stream: &mut TcpStream, auth_key: &[u8; 32]) -> io::Result<()> {
2329 let challenge = recv_bytes(stream)?;
2331
2332 if !challenge.starts_with(CHALLENGE_PREFIX) {
2333 return Err(io::Error::new(
2334 io::ErrorKind::InvalidData,
2335 "expected challenge",
2336 ));
2337 }
2338
2339 let message = &challenge[CHALLENGE_PREFIX.len()..];
2340
2341 let response = create_response(auth_key, message);
2343 send_bytes(stream, &response)?;
2344
2345 let result = recv_bytes(stream)?;
2347 if result == WELCOME {
2348 Ok(())
2349 } else {
2350 Err(io::Error::new(
2351 io::ErrorKind::PermissionDenied,
2352 "authentication failed",
2353 ))
2354 }
2355}
2356
2357fn create_response(auth_key: &[u8; 32], message: &[u8]) -> Vec<u8> {
2359 if message.starts_with(b"{sha256}") || message.len() > 20 {
2361 let digest = hmac_sha256(auth_key, message);
2363 let mut response = Vec::with_capacity(8 + 32);
2364 response.extend_from_slice(b"{sha256}");
2365 response.extend_from_slice(&digest);
2366 response
2367 } else {
2368 let digest = hmac_md5(auth_key, message);
2370 digest.to_vec()
2371 }
2372}
2373
2374pub fn derive_auth_key(private_key: &[u8]) -> [u8; 32] {
2376 sha256(private_key)
2377}
2378
2379#[cfg(test)]
2380mod tests {
2381 use super::*;
2382
2383 #[test]
2384 fn send_recv_bytes_roundtrip() {
2385 let (mut c1, mut c2) = tcp_pair();
2386 let data = b"hello world";
2387 send_bytes(&mut c1, data).unwrap();
2388 let received = recv_bytes(&mut c2).unwrap();
2389 assert_eq!(&received, data);
2390 }
2391
2392 #[test]
2393 fn send_recv_empty() {
2394 let (mut c1, mut c2) = tcp_pair();
2395 send_bytes(&mut c1, b"").unwrap();
2396 let received = recv_bytes(&mut c2).unwrap();
2397 assert!(received.is_empty());
2398 }
2399
2400 #[test]
2401 fn auth_success() {
2402 let key = derive_auth_key(b"test-private-key");
2403 let (mut server, mut client) = tcp_pair();
2404
2405 let key2 = key;
2406 let t = thread::spawn(move || {
2407 client_auth(&mut client, &key2).unwrap();
2408 });
2409
2410 server_auth(&mut server, &key).unwrap();
2411 t.join().unwrap();
2412 }
2413
2414 #[test]
2415 fn auth_failure_wrong_key() {
2416 let server_key = derive_auth_key(b"server-key");
2417 let client_key = derive_auth_key(b"wrong-key");
2418 let (mut server, mut client) = tcp_pair();
2419
2420 let t = thread::spawn(move || {
2421 let result = client_auth(&mut client, &client_key);
2422 assert!(result.is_err());
2423 });
2424
2425 let result = server_auth(&mut server, &server_key);
2426 assert!(result.is_err());
2427 t.join().unwrap();
2428 }
2429
2430 #[test]
2431 fn verify_sha256_response() {
2432 let key = derive_auth_key(b"mykey");
2433 let message = b"{sha256}abcdefghijklmnopqrstuvwxyz0123456789ABCD";
2434 let response = create_response(&key, message);
2435 assert!(response.starts_with(b"{sha256}"));
2436 assert!(verify_response(&key, message, &response));
2437 }
2438
2439 #[test]
2440 fn verify_legacy_md5_response() {
2441 let key = derive_auth_key(b"mykey");
2442 let message = b"01234567890123456789";
2444 let digest = hmac_md5(&key, message);
2446 assert!(verify_response(&key, message, &digest));
2447 }
2448
2449 #[test]
2450 fn constant_time_eq_works() {
2451 assert!(constant_time_eq(b"hello", b"hello"));
2452 assert!(!constant_time_eq(b"hello", b"world"));
2453 assert!(!constant_time_eq(b"hello", b"hell"));
2454 }
2455
2456 #[test]
2457 fn rpc_roundtrip() {
2458 let key = derive_auth_key(b"test-key");
2459 let (event_tx, event_rx) = crate::event::channel();
2460
2461 let listener = TcpListener::bind("127.0.0.1:0").unwrap();
2464 let port = listener.local_addr().unwrap().port();
2465 listener.set_nonblocking(true).unwrap();
2466
2467 let shutdown = Arc::new(AtomicBool::new(false));
2468 let shutdown2 = shutdown.clone();
2469
2470 let driver_thread = thread::spawn(move || loop {
2472 match event_rx.recv_timeout(std::time::Duration::from_secs(5)) {
2473 Ok(Event::Query(QueryRequest::LinkCount, resp_tx)) => {
2474 let _ = resp_tx.send(QueryResponse::LinkCount(42));
2475 }
2476 Ok(Event::Query(QueryRequest::InterfaceStats, resp_tx)) => {
2477 let _ = resp_tx.send(QueryResponse::InterfaceStats(InterfaceStatsResponse {
2478 interfaces: vec![SingleInterfaceStat {
2479 id: 7,
2480 name: "TestInterface".into(),
2481 status: true,
2482 mode: 1,
2483 rxb: 1000,
2484 txb: 2000,
2485 rx_packets: 10,
2486 tx_packets: 20,
2487 bitrate: Some(10_000_000),
2488 ifac_size: None,
2489 started: 1000.0,
2490 ia_freq: 0.0,
2491 oa_freq: 0.0,
2492 interface_type: "TestInterface".into(),
2493 }],
2494 transport_id: None,
2495 transport_enabled: true,
2496 transport_uptime: 3600.0,
2497 total_rxb: 1000,
2498 total_txb: 2000,
2499 probe_responder: None,
2500 backbone_peer_pool: None,
2501 }));
2502 }
2503 _ => break,
2504 }
2505 });
2506
2507 let key2 = key;
2508 let shutdown3 = shutdown2.clone();
2509 let server_thread = thread::spawn(move || {
2510 rpc_server_loop(listener, key2, event_tx, shutdown3);
2511 });
2512
2513 thread::sleep(std::time::Duration::from_millis(50));
2515
2516 let server_addr = RpcAddr::Tcp("127.0.0.1".into(), port);
2518 let mut client = RpcClient::connect(&server_addr, &key).unwrap();
2519 let response = client
2520 .call(&PickleValue::Dict(vec![(
2521 PickleValue::String("get".into()),
2522 PickleValue::String("link_count".into()),
2523 )]))
2524 .unwrap();
2525 assert_eq!(response.as_int().unwrap(), 42);
2526 drop(client);
2527
2528 let mut client2 = RpcClient::connect(&server_addr, &key).unwrap();
2530 let response2 = client2
2531 .call(&PickleValue::Dict(vec![(
2532 PickleValue::String("get".into()),
2533 PickleValue::String("interface_stats".into()),
2534 )]))
2535 .unwrap();
2536 let ifaces = response2.get("interfaces").unwrap().as_list().unwrap();
2537 assert_eq!(ifaces.len(), 1);
2538 let iface = &ifaces[0];
2539 assert_eq!(
2540 iface.get("name").unwrap().as_str().unwrap(),
2541 "TestInterface"
2542 );
2543 assert_eq!(iface.get("rxb").unwrap().as_int().unwrap(), 1000);
2544 drop(client2);
2545
2546 shutdown2.store(true, Ordering::Relaxed);
2548 server_thread.join().unwrap();
2549 driver_thread.join().unwrap();
2550 }
2551
2552 #[test]
2553 fn derive_auth_key_deterministic() {
2554 let key1 = derive_auth_key(b"test");
2555 let key2 = derive_auth_key(b"test");
2556 assert_eq!(key1, key2);
2557 let key3 = derive_auth_key(b"other");
2559 assert_ne!(key1, key3);
2560 }
2561
2562 #[test]
2563 fn pickle_request_handling() {
2564 let (event_tx, event_rx) = crate::event::channel();
2566
2567 let driver = thread::spawn(move || {
2568 if let Ok(Event::Query(QueryRequest::DropPath { dest_hash }, resp_tx)) = event_rx.recv()
2569 {
2570 assert_eq!(dest_hash, [1u8; 16]);
2571 let _ = resp_tx.send(QueryResponse::DropPath(true));
2572 }
2573 });
2574
2575 let request = PickleValue::Dict(vec![
2576 (
2577 PickleValue::String("drop".into()),
2578 PickleValue::String("path".into()),
2579 ),
2580 (
2581 PickleValue::String("destination_hash".into()),
2582 PickleValue::Bytes(vec![1u8; 16]),
2583 ),
2584 ]);
2585
2586 let response = handle_rpc_request(&request, &event_tx).unwrap();
2587 assert_eq!(response, PickleValue::Bool(true));
2588 driver.join().unwrap();
2589 }
2590
2591 #[test]
2592 fn hook_list_request_handling() {
2593 let (event_tx, event_rx) = crate::event::channel();
2594
2595 let driver = thread::spawn(move || {
2596 if let Ok(Event::ListHooks { response_tx }) = event_rx.recv() {
2597 let _ = response_tx.send(vec![HookInfo {
2598 name: "stats".into(),
2599 hook_type: "wasm".into(),
2600 attach_point: "PreIngress".into(),
2601 priority: 7,
2602 enabled: true,
2603 consecutive_traps: 0,
2604 }]);
2605 }
2606 });
2607
2608 let request = PickleValue::Dict(vec![(
2609 PickleValue::String("get".into()),
2610 PickleValue::String("hooks".into()),
2611 )]);
2612 let response = handle_rpc_request(&request, &event_tx).unwrap();
2613 let hooks = parse_hook_list(&response).unwrap();
2614 assert_eq!(hooks.len(), 1);
2615 assert_eq!(hooks[0].name, "stats");
2616 driver.join().unwrap();
2617 }
2618
2619 #[test]
2620 fn hook_load_request_handling() {
2621 let (event_tx, event_rx) = crate::event::channel();
2622
2623 let driver = thread::spawn(move || {
2624 if let Ok(Event::LoadHook {
2625 name,
2626 wasm_bytes,
2627 attach_point,
2628 priority,
2629 response_tx,
2630 }) = event_rx.recv()
2631 {
2632 assert_eq!(name, "stats");
2633 assert_eq!(attach_point, "PreIngress");
2634 assert_eq!(priority, 11);
2635 assert_eq!(wasm_bytes, vec![1, 2, 3]);
2636 let _ = response_tx.send(Ok(()));
2637 }
2638 });
2639
2640 let request = PickleValue::Dict(vec![
2641 (
2642 PickleValue::String("hook".into()),
2643 PickleValue::String("load".into()),
2644 ),
2645 (
2646 PickleValue::String("name".into()),
2647 PickleValue::String("stats".into()),
2648 ),
2649 (
2650 PickleValue::String("attach_point".into()),
2651 PickleValue::String("PreIngress".into()),
2652 ),
2653 (PickleValue::String("priority".into()), PickleValue::Int(11)),
2654 (
2655 PickleValue::String("wasm".into()),
2656 PickleValue::Bytes(vec![1, 2, 3]),
2657 ),
2658 ]);
2659 let response = handle_rpc_request(&request, &event_tx).unwrap();
2660 assert_eq!(parse_hook_result(&response).unwrap(), Ok(()));
2661 driver.join().unwrap();
2662 }
2663
2664 #[test]
2665 fn interface_stats_pickle_format() {
2666 let stats = InterfaceStatsResponse {
2667 interfaces: vec![SingleInterfaceStat {
2668 id: 1,
2669 name: "TCP".into(),
2670 status: true,
2671 mode: 1,
2672 rxb: 100,
2673 txb: 200,
2674 rx_packets: 5,
2675 tx_packets: 10,
2676 bitrate: Some(1000000),
2677 ifac_size: Some(16),
2678 started: 1000.0,
2679 ia_freq: 0.0,
2680 oa_freq: 0.0,
2681 interface_type: "TCPClientInterface".into(),
2682 }],
2683 transport_id: Some([0xAB; 16]),
2684 transport_enabled: true,
2685 transport_uptime: 3600.0,
2686 total_rxb: 100,
2687 total_txb: 200,
2688 probe_responder: None,
2689 backbone_peer_pool: None,
2690 };
2691
2692 let pickle = interface_stats_to_pickle(&stats);
2693
2694 let encoded = pickle::encode(&pickle);
2696 let decoded = pickle::decode(&encoded).unwrap();
2697 assert_eq!(
2698 decoded.get("transport_enabled").unwrap().as_bool().unwrap(),
2699 true
2700 );
2701 let ifaces = decoded.get("interfaces").unwrap().as_list().unwrap();
2702 assert_eq!(ifaces[0].get("id").unwrap().as_int().unwrap(), 1);
2703 assert_eq!(ifaces[0].get("name").unwrap().as_str().unwrap(), "TCP");
2704 }
2705
2706 #[test]
2707 fn send_probe_rpc_unknown_dest() {
2708 let (event_tx, event_rx) = crate::event::channel();
2709
2710 let driver = thread::spawn(move || {
2711 if let Ok(Event::Query(
2712 QueryRequest::SendProbe {
2713 dest_hash,
2714 payload_size,
2715 },
2716 resp_tx,
2717 )) = event_rx.recv()
2718 {
2719 assert_eq!(dest_hash, [0xAA; 16]);
2720 assert_eq!(payload_size, 16); let _ = resp_tx.send(QueryResponse::SendProbe(None));
2722 }
2723 });
2724
2725 let request = PickleValue::Dict(vec![(
2726 PickleValue::String("send_probe".into()),
2727 PickleValue::Bytes(vec![0xAA; 16]),
2728 )]);
2729
2730 let response = handle_rpc_request(&request, &event_tx).unwrap();
2731 assert_eq!(response, PickleValue::None);
2732 driver.join().unwrap();
2733 }
2734
2735 #[test]
2736 fn send_probe_rpc_with_result() {
2737 let (event_tx, event_rx) = crate::event::channel();
2738
2739 let packet_hash = [0xBB; 32];
2740 let driver = thread::spawn(move || {
2741 if let Ok(Event::Query(
2742 QueryRequest::SendProbe {
2743 dest_hash,
2744 payload_size,
2745 },
2746 resp_tx,
2747 )) = event_rx.recv()
2748 {
2749 assert_eq!(dest_hash, [0xCC; 16]);
2750 assert_eq!(payload_size, 32);
2751 let _ = resp_tx.send(QueryResponse::SendProbe(Some((packet_hash, 3))));
2752 }
2753 });
2754
2755 let request = PickleValue::Dict(vec![
2756 (
2757 PickleValue::String("send_probe".into()),
2758 PickleValue::Bytes(vec![0xCC; 16]),
2759 ),
2760 (PickleValue::String("size".into()), PickleValue::Int(32)),
2761 ]);
2762
2763 let response = handle_rpc_request(&request, &event_tx).unwrap();
2764 let ph = response.get("packet_hash").unwrap().as_bytes().unwrap();
2765 assert_eq!(ph, &[0xBB; 32]);
2766 assert_eq!(response.get("hops").unwrap().as_int().unwrap(), 3);
2767 driver.join().unwrap();
2768 }
2769
2770 #[test]
2771 fn send_probe_rpc_size_validation() {
2772 let (event_tx, event_rx) = crate::event::channel();
2773
2774 let driver = thread::spawn(move || {
2776 if let Ok(Event::Query(QueryRequest::SendProbe { payload_size, .. }, resp_tx)) =
2777 event_rx.recv()
2778 {
2779 assert_eq!(payload_size, 16); let _ = resp_tx.send(QueryResponse::SendProbe(None));
2781 }
2782 });
2783
2784 let request = PickleValue::Dict(vec![
2785 (
2786 PickleValue::String("send_probe".into()),
2787 PickleValue::Bytes(vec![0xDD; 16]),
2788 ),
2789 (PickleValue::String("size".into()), PickleValue::Int(-1)),
2790 ]);
2791
2792 let response = handle_rpc_request(&request, &event_tx).unwrap();
2793 assert_eq!(response, PickleValue::None);
2794 driver.join().unwrap();
2795 }
2796
2797 #[test]
2798 fn send_probe_rpc_size_too_large() {
2799 let (event_tx, event_rx) = crate::event::channel();
2800
2801 let driver = thread::spawn(move || {
2803 if let Ok(Event::Query(QueryRequest::SendProbe { payload_size, .. }, resp_tx)) =
2804 event_rx.recv()
2805 {
2806 assert_eq!(payload_size, 16); let _ = resp_tx.send(QueryResponse::SendProbe(None));
2808 }
2809 });
2810
2811 let request = PickleValue::Dict(vec![
2812 (
2813 PickleValue::String("send_probe".into()),
2814 PickleValue::Bytes(vec![0xDD; 16]),
2815 ),
2816 (PickleValue::String("size".into()), PickleValue::Int(999)),
2817 ]);
2818
2819 let response = handle_rpc_request(&request, &event_tx).unwrap();
2820 assert_eq!(response, PickleValue::None);
2821 driver.join().unwrap();
2822 }
2823
2824 #[test]
2825 fn check_proof_rpc_not_found() {
2826 let (event_tx, event_rx) = crate::event::channel();
2827
2828 let driver = thread::spawn(move || {
2829 if let Ok(Event::Query(QueryRequest::CheckProof { packet_hash }, resp_tx)) =
2830 event_rx.recv()
2831 {
2832 assert_eq!(packet_hash, [0xEE; 32]);
2833 let _ = resp_tx.send(QueryResponse::CheckProof(None));
2834 }
2835 });
2836
2837 let request = PickleValue::Dict(vec![(
2838 PickleValue::String("check_proof".into()),
2839 PickleValue::Bytes(vec![0xEE; 32]),
2840 )]);
2841
2842 let response = handle_rpc_request(&request, &event_tx).unwrap();
2843 assert_eq!(response, PickleValue::None);
2844 driver.join().unwrap();
2845 }
2846
2847 #[test]
2848 fn check_proof_rpc_found() {
2849 let (event_tx, event_rx) = crate::event::channel();
2850
2851 let driver = thread::spawn(move || {
2852 if let Ok(Event::Query(QueryRequest::CheckProof { packet_hash }, resp_tx)) =
2853 event_rx.recv()
2854 {
2855 assert_eq!(packet_hash, [0xFF; 32]);
2856 let _ = resp_tx.send(QueryResponse::CheckProof(Some(0.352)));
2857 }
2858 });
2859
2860 let request = PickleValue::Dict(vec![(
2861 PickleValue::String("check_proof".into()),
2862 PickleValue::Bytes(vec![0xFF; 32]),
2863 )]);
2864
2865 let response = handle_rpc_request(&request, &event_tx).unwrap();
2866 if let PickleValue::Float(rtt) = response {
2867 assert!((rtt - 0.352).abs() < 0.001);
2868 } else {
2869 panic!("Expected Float, got {:?}", response);
2870 }
2871 driver.join().unwrap();
2872 }
2873
2874 #[test]
2875 fn request_path_rpc() {
2876 let (event_tx, event_rx) = crate::event::channel();
2877
2878 let driver =
2879 thread::spawn(
2880 move || match event_rx.recv_timeout(std::time::Duration::from_secs(5)) {
2881 Ok(Event::RequestPath { dest_hash }) => {
2882 assert_eq!(dest_hash, [0x11; 16]);
2883 }
2884 other => panic!("Expected RequestPath event, got {:?}", other),
2885 },
2886 );
2887
2888 let request = PickleValue::Dict(vec![(
2889 PickleValue::String("request_path".into()),
2890 PickleValue::Bytes(vec![0x11; 16]),
2891 )]);
2892
2893 let response = handle_rpc_request(&request, &event_tx).unwrap();
2894 assert_eq!(response, PickleValue::Bool(true));
2895 driver.join().unwrap();
2896 }
2897
2898 #[test]
2899 fn begin_drain_rpc_emits_event() {
2900 let (event_tx, event_rx) = crate::event::channel();
2901
2902 let driver = thread::spawn(
2903 move || match event_rx.recv_timeout(Duration::from_secs(5)) {
2904 Ok(Event::BeginDrain { timeout }) => {
2905 assert!((timeout.as_secs_f64() - 1.5).abs() < 0.001);
2906 }
2907 other => panic!("Expected BeginDrain event, got {:?}", other),
2908 },
2909 );
2910
2911 let request = PickleValue::Dict(vec![(
2912 PickleValue::String("begin_drain".into()),
2913 PickleValue::Float(1.5),
2914 )]);
2915
2916 let response = handle_rpc_request(&request, &event_tx).unwrap();
2917 assert_eq!(response, PickleValue::Bool(true));
2918 driver.join().unwrap();
2919 }
2920
2921 #[test]
2922 fn drain_status_rpc_roundtrips_fields() {
2923 let (event_tx, event_rx) = crate::event::channel();
2924
2925 let driver = thread::spawn(move || {
2926 if let Ok(Event::Query(QueryRequest::DrainStatus, resp_tx)) = event_rx.recv() {
2927 let _ = resp_tx.send(QueryResponse::DrainStatus(DrainStatus {
2928 state: LifecycleState::Draining,
2929 drain_age_seconds: Some(0.75),
2930 deadline_remaining_seconds: Some(2.25),
2931 drain_complete: false,
2932 interface_writer_queued_frames: 3,
2933 provider_backlog_events: 4,
2934 provider_consumer_queued_events: 5,
2935 detail: Some("node is draining existing work".into()),
2936 }));
2937 }
2938 });
2939
2940 let request = PickleValue::Dict(vec![(
2941 PickleValue::String("get".into()),
2942 PickleValue::String("drain_status".into()),
2943 )]);
2944
2945 let response = handle_rpc_request(&request, &event_tx).unwrap();
2946 assert_eq!(response.get("state").unwrap().as_str(), Some("draining"));
2947 assert_eq!(
2948 response.get("drain_complete").unwrap().as_bool(),
2949 Some(false)
2950 );
2951 assert_eq!(
2952 response
2953 .get("deadline_remaining_seconds")
2954 .unwrap()
2955 .as_float(),
2956 Some(2.25)
2957 );
2958 assert_eq!(
2959 response
2960 .get("interface_writer_queued_frames")
2961 .unwrap()
2962 .as_int(),
2963 Some(3)
2964 );
2965 assert_eq!(
2966 response.get("provider_backlog_events").unwrap().as_int(),
2967 Some(4)
2968 );
2969 assert_eq!(
2970 response
2971 .get("provider_consumer_queued_events")
2972 .unwrap()
2973 .as_int(),
2974 Some(5)
2975 );
2976 assert_eq!(
2977 response.get("detail").unwrap().as_str(),
2978 Some("node is draining existing work")
2979 );
2980 driver.join().unwrap();
2981 }
2982
2983 #[test]
2984 fn interface_stats_with_probe_responder() {
2985 let probe_hash = [0x42; 16];
2986 let stats = InterfaceStatsResponse {
2987 interfaces: vec![],
2988 transport_id: None,
2989 transport_enabled: true,
2990 transport_uptime: 100.0,
2991 total_rxb: 0,
2992 total_txb: 0,
2993 probe_responder: Some(probe_hash),
2994 backbone_peer_pool: None,
2995 };
2996
2997 let pickle = interface_stats_to_pickle(&stats);
2998 let encoded = pickle::encode(&pickle);
2999 let decoded = pickle::decode(&encoded).unwrap();
3000
3001 let pr = decoded.get("probe_responder").unwrap().as_bytes().unwrap();
3002 assert_eq!(pr, &probe_hash);
3003 }
3004
3005 #[test]
3006 fn runtime_config_get_and_set_rpc() {
3007 let (event_tx, event_rx) = crate::event::channel();
3008
3009 let driver = thread::spawn(move || {
3010 if let Ok(Event::Query(QueryRequest::GetRuntimeConfig { key }, resp_tx)) =
3011 event_rx.recv()
3012 {
3013 assert_eq!(key, "global.tick_interval_ms");
3014 let _ = resp_tx.send(QueryResponse::RuntimeConfigEntry(Some(
3015 RuntimeConfigEntry {
3016 key,
3017 value: RuntimeConfigValue::Int(1000),
3018 default: RuntimeConfigValue::Int(1000),
3019 source: RuntimeConfigSource::Startup,
3020 apply_mode: RuntimeConfigApplyMode::Immediate,
3021 description: Some("tick".into()),
3022 },
3023 )));
3024 } else {
3025 panic!("expected GetRuntimeConfig query");
3026 }
3027
3028 if let Ok(Event::Query(QueryRequest::SetRuntimeConfig { key, value }, resp_tx)) =
3029 event_rx.recv()
3030 {
3031 assert_eq!(key, "global.tick_interval_ms");
3032 assert_eq!(value, RuntimeConfigValue::Int(250));
3033 let _ = resp_tx.send(QueryResponse::RuntimeConfigSet(Ok(RuntimeConfigEntry {
3034 key,
3035 value: RuntimeConfigValue::Int(250),
3036 default: RuntimeConfigValue::Int(1000),
3037 source: RuntimeConfigSource::RuntimeOverride,
3038 apply_mode: RuntimeConfigApplyMode::Immediate,
3039 description: Some("tick".into()),
3040 })));
3041 } else {
3042 panic!("expected SetRuntimeConfig query");
3043 }
3044 });
3045
3046 let get_request = PickleValue::Dict(vec![
3047 (
3048 PickleValue::String("get".into()),
3049 PickleValue::String("runtime_config_entry".into()),
3050 ),
3051 (
3052 PickleValue::String("key".into()),
3053 PickleValue::String("global.tick_interval_ms".into()),
3054 ),
3055 ]);
3056 let get_response = handle_rpc_request(&get_request, &event_tx).unwrap();
3057 assert_eq!(
3058 get_response.get("key").and_then(|v| v.as_str()),
3059 Some("global.tick_interval_ms")
3060 );
3061
3062 let set_request = PickleValue::Dict(vec![
3063 (
3064 PickleValue::String("set".into()),
3065 PickleValue::String("runtime_config".into()),
3066 ),
3067 (
3068 PickleValue::String("key".into()),
3069 PickleValue::String("global.tick_interval_ms".into()),
3070 ),
3071 (PickleValue::String("value".into()), PickleValue::Int(250)),
3072 ]);
3073 let set_response = handle_rpc_request(&set_request, &event_tx).unwrap();
3074 assert_eq!(
3075 set_response.get("value").and_then(|v| v.as_int()),
3076 Some(250)
3077 );
3078
3079 driver.join().unwrap();
3080 }
3081
3082 fn tcp_pair() -> (TcpStream, TcpStream) {
3084 let listener = TcpListener::bind("127.0.0.1:0").unwrap();
3085 let port = listener.local_addr().unwrap().port();
3086 let client = TcpStream::connect(("127.0.0.1", port)).unwrap();
3087 let (server, _) = listener.accept().unwrap();
3088 client
3089 .set_read_timeout(Some(std::time::Duration::from_secs(5)))
3090 .unwrap();
3091 server
3092 .set_read_timeout(Some(std::time::Duration::from_secs(5)))
3093 .unwrap();
3094 (server, client)
3095 }
3096}