1use std::io::{self, Read, Write};
12use std::net::{IpAddr, TcpListener, TcpStream};
13use std::sync::mpsc;
14use std::sync::{
15 atomic::{AtomicBool, Ordering},
16 Arc,
17};
18use std::thread;
19use std::time::Duration;
20
21use rns_crypto::hmac::hmac_sha256;
22use rns_crypto::sha256::sha256;
23
24use crate::event::{
25 BackboneInterfaceEntry, BackbonePeerStateEntry, BlackholeInfo, DrainStatus, Event, EventSender,
26 HookInfo, InterfaceStatsResponse, KnownDestinationEntry, LifecycleState, PathTableEntry,
27 ProviderBridgeStats, QueryRequest, QueryResponse, RateTableEntry, RuntimeConfigApplyMode,
28 RuntimeConfigEntry, RuntimeConfigError, RuntimeConfigErrorCode, RuntimeConfigSource,
29 RuntimeConfigValue, SingleInterfaceStat,
30};
31use crate::md5::hmac_md5;
32use crate::pickle::{self, PickleValue};
33
34const CHALLENGE_PREFIX: &[u8] = b"#CHALLENGE#";
35const WELCOME: &[u8] = b"#WELCOME#";
36const FAILURE: &[u8] = b"#FAILURE#";
37const CHALLENGE_LEN: usize = 40;
38
39#[derive(Debug, Clone)]
41pub enum RpcAddr {
42 Tcp(String, u16),
43}
44
45pub struct RpcServer {
47 shutdown: Arc<AtomicBool>,
48 thread: Option<thread::JoinHandle<()>>,
49}
50
51impl RpcServer {
52 pub fn start(addr: &RpcAddr, auth_key: [u8; 32], event_tx: EventSender) -> io::Result<Self> {
54 let shutdown = Arc::new(AtomicBool::new(false));
55 let shutdown2 = shutdown.clone();
56
57 let listener = match addr {
58 RpcAddr::Tcp(host, port) => {
59 let l = TcpListener::bind((host.as_str(), *port))?;
60 l.set_nonblocking(true)?;
62 l
63 }
64 };
65
66 let thread = thread::Builder::new()
67 .name("rpc-server".into())
68 .spawn(move || {
69 rpc_server_loop(listener, auth_key, event_tx, shutdown2);
70 })
71 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
72
73 Ok(RpcServer {
74 shutdown,
75 thread: Some(thread),
76 })
77 }
78
79 pub fn stop(&mut self) {
81 self.shutdown.store(true, Ordering::Relaxed);
82 if let Some(handle) = self.thread.take() {
83 let _ = handle.join();
84 }
85 }
86}
87
88impl Drop for RpcServer {
89 fn drop(&mut self) {
90 self.stop();
91 }
92}
93
94fn rpc_server_loop(
95 listener: TcpListener,
96 auth_key: [u8; 32],
97 event_tx: EventSender,
98 shutdown: Arc<AtomicBool>,
99) {
100 loop {
101 if shutdown.load(Ordering::Relaxed) {
102 break;
103 }
104
105 match listener.accept() {
106 Ok((stream, _addr)) => {
107 let _ = stream.set_nonblocking(false);
109 let _ = stream.set_read_timeout(Some(std::time::Duration::from_secs(10)));
110 let _ = stream.set_write_timeout(Some(std::time::Duration::from_secs(10)));
111
112 if let Err(e) = handle_connection(stream, &auth_key, &event_tx) {
113 log::debug!("RPC connection error: {}", e);
114 }
115 }
116 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
117 thread::sleep(std::time::Duration::from_millis(100));
119 }
120 Err(e) => {
121 log::error!("RPC accept error: {}", e);
122 thread::sleep(std::time::Duration::from_millis(100));
123 }
124 }
125 }
126}
127
128fn handle_connection(
129 mut stream: TcpStream,
130 auth_key: &[u8; 32],
131 event_tx: &EventSender,
132) -> io::Result<()> {
133 server_auth(&mut stream, auth_key)?;
135
136 let request_bytes = recv_bytes(&mut stream)?;
138 let request = pickle::decode(&request_bytes)
139 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?;
140
141 let response = handle_rpc_request(&request, event_tx)?;
143
144 let response_bytes = pickle::encode(&response);
146 send_bytes(&mut stream, &response_bytes)?;
147
148 Ok(())
149}
150
151fn server_auth(stream: &mut TcpStream, auth_key: &[u8; 32]) -> io::Result<()> {
153 let mut random_bytes = [0u8; CHALLENGE_LEN];
155 {
157 let mut f = std::fs::File::open("/dev/urandom")?;
158 f.read_exact(&mut random_bytes)?;
159 }
160
161 let mut challenge_message = Vec::with_capacity(CHALLENGE_PREFIX.len() + 8 + CHALLENGE_LEN);
162 challenge_message.extend_from_slice(CHALLENGE_PREFIX);
163 challenge_message.extend_from_slice(b"{sha256}");
164 challenge_message.extend_from_slice(&random_bytes);
165
166 send_bytes(stream, &challenge_message)?;
167
168 let response = recv_bytes(stream)?;
170
171 let message = &challenge_message[CHALLENGE_PREFIX.len()..];
174
175 if verify_response(auth_key, message, &response) {
176 send_bytes(stream, WELCOME)?;
177 Ok(())
178 } else {
179 send_bytes(stream, FAILURE)?;
180 Err(io::Error::new(
181 io::ErrorKind::PermissionDenied,
182 "auth failed",
183 ))
184 }
185}
186
187fn verify_response(auth_key: &[u8; 32], message: &[u8], response: &[u8]) -> bool {
189 if response.starts_with(b"{sha256}") {
191 let digest = &response[8..];
192 let expected = hmac_sha256(auth_key, message);
193 constant_time_eq(digest, &expected)
194 }
195 else if response.len() == 16 {
197 let expected = hmac_md5(auth_key, message);
198 constant_time_eq(response, &expected)
199 }
200 else if response.starts_with(b"{md5}") {
202 let digest = &response[5..];
203 let expected = hmac_md5(auth_key, message);
204 constant_time_eq(digest, &expected)
205 } else {
206 false
207 }
208}
209
210fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
212 if a.len() != b.len() {
213 return false;
214 }
215 let mut diff = 0u8;
216 for (x, y) in a.iter().zip(b.iter()) {
217 diff |= x ^ y;
218 }
219 diff == 0
220}
221
222fn send_bytes(stream: &mut TcpStream, data: &[u8]) -> io::Result<()> {
224 let len = data.len() as i32;
225 stream.write_all(&len.to_be_bytes())?;
226 stream.write_all(data)?;
227 stream.flush()
228}
229
230fn recv_bytes(stream: &mut TcpStream) -> io::Result<Vec<u8>> {
232 let mut len_buf = [0u8; 4];
233 stream.read_exact(&mut len_buf)?;
234 let len = i32::from_be_bytes(len_buf);
235
236 if len < 0 {
237 let mut len8_buf = [0u8; 8];
239 stream.read_exact(&mut len8_buf)?;
240 let len = u64::from_be_bytes(len8_buf) as usize;
241 if len > 64 * 1024 * 1024 {
242 return Err(io::Error::new(
243 io::ErrorKind::InvalidData,
244 "message too large",
245 ));
246 }
247 let mut buf = vec![0u8; len];
248 stream.read_exact(&mut buf)?;
249 Ok(buf)
250 } else {
251 let len = len as usize;
252 if len > 64 * 1024 * 1024 {
253 return Err(io::Error::new(
254 io::ErrorKind::InvalidData,
255 "message too large",
256 ));
257 }
258 let mut buf = vec![0u8; len];
259 stream.read_exact(&mut buf)?;
260 Ok(buf)
261 }
262}
263
264fn handle_rpc_request(request: &PickleValue, event_tx: &EventSender) -> io::Result<PickleValue> {
266 if let Some(get_val) = request.get("get") {
268 if let Some(path) = get_val.as_str() {
269 return match path {
270 "interface_stats" => {
271 let resp = send_query(event_tx, QueryRequest::InterfaceStats)?;
272 if let QueryResponse::InterfaceStats(stats) = resp {
273 Ok(interface_stats_to_pickle(&stats))
274 } else {
275 Ok(PickleValue::None)
276 }
277 }
278 "path_table" => {
279 let max_hops = request
280 .get("max_hops")
281 .and_then(|v| v.as_int().map(|n| n as u8));
282 let resp = send_query(event_tx, QueryRequest::PathTable { max_hops })?;
283 if let QueryResponse::PathTable(entries) = resp {
284 Ok(path_table_to_pickle(&entries))
285 } else {
286 Ok(PickleValue::None)
287 }
288 }
289 "rate_table" => {
290 let resp = send_query(event_tx, QueryRequest::RateTable)?;
291 if let QueryResponse::RateTable(entries) = resp {
292 Ok(rate_table_to_pickle(&entries))
293 } else {
294 Ok(PickleValue::None)
295 }
296 }
297 "next_hop" => {
298 let hash = extract_dest_hash(request, "destination_hash")?;
299 let resp = send_query(event_tx, QueryRequest::NextHop { dest_hash: hash })?;
300 if let QueryResponse::NextHop(Some(nh)) = resp {
301 Ok(PickleValue::Bytes(nh.next_hop.to_vec()))
302 } else {
303 Ok(PickleValue::None)
304 }
305 }
306 "next_hop_if_name" => {
307 let hash = extract_dest_hash(request, "destination_hash")?;
308 let resp =
309 send_query(event_tx, QueryRequest::NextHopIfName { dest_hash: hash })?;
310 if let QueryResponse::NextHopIfName(Some(name)) = resp {
311 Ok(PickleValue::String(name))
312 } else {
313 Ok(PickleValue::None)
314 }
315 }
316 "link_count" => {
317 let resp = send_query(event_tx, QueryRequest::LinkCount)?;
318 if let QueryResponse::LinkCount(n) = resp {
319 Ok(PickleValue::Int(n as i64))
320 } else {
321 Ok(PickleValue::None)
322 }
323 }
324 "transport_identity" => {
325 let resp = send_query(event_tx, QueryRequest::TransportIdentity)?;
326 if let QueryResponse::TransportIdentity(Some(hash)) = resp {
327 Ok(PickleValue::Bytes(hash.to_vec()))
328 } else {
329 Ok(PickleValue::None)
330 }
331 }
332 "blackholed" => {
333 let resp = send_query(event_tx, QueryRequest::GetBlackholed)?;
334 if let QueryResponse::Blackholed(entries) = resp {
335 Ok(blackholed_to_pickle(&entries))
336 } else {
337 Ok(PickleValue::None)
338 }
339 }
340 "discovered_interfaces" => {
341 let only_available = request
342 .get("only_available")
343 .and_then(|v| v.as_bool())
344 .unwrap_or(false);
345 let only_transport = request
346 .get("only_transport")
347 .and_then(|v| v.as_bool())
348 .unwrap_or(false);
349 let resp = send_query(
350 event_tx,
351 QueryRequest::DiscoveredInterfaces {
352 only_available,
353 only_transport,
354 },
355 )?;
356 if let QueryResponse::DiscoveredInterfaces(interfaces) = resp {
357 Ok(discovered_interfaces_to_pickle(&interfaces))
358 } else {
359 Ok(PickleValue::None)
360 }
361 }
362 "hooks" => {
363 let (response_tx, response_rx) = mpsc::channel();
364 event_tx
365 .send(Event::ListHooks { response_tx })
366 .map_err(|_| {
367 io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down")
368 })?;
369 let hooks = response_rx
370 .recv_timeout(std::time::Duration::from_secs(5))
371 .map_err(|_| {
372 io::Error::new(io::ErrorKind::TimedOut, "list hooks timed out")
373 })?;
374 Ok(hooks_to_pickle(&hooks))
375 }
376 "runtime_config" => {
377 let resp = send_query(event_tx, QueryRequest::ListRuntimeConfig)?;
378 if let QueryResponse::RuntimeConfigList(entries) = resp {
379 Ok(runtime_config_list_to_pickle(&entries))
380 } else {
381 Ok(PickleValue::None)
382 }
383 }
384 "known_destinations" => {
385 let resp = send_query(event_tx, QueryRequest::KnownDestinations)?;
386 if let QueryResponse::KnownDestinations(entries) = resp {
387 Ok(known_destinations_to_pickle(&entries))
388 } else {
389 Ok(PickleValue::None)
390 }
391 }
392 "runtime_config_entry" => {
393 let key = request
394 .get("key")
395 .and_then(|v| v.as_str())
396 .unwrap_or_default()
397 .to_string();
398 let resp = send_query(event_tx, QueryRequest::GetRuntimeConfig { key })?;
399 if let QueryResponse::RuntimeConfigEntry(entry) = resp {
400 Ok(entry
401 .as_ref()
402 .map(runtime_config_entry_to_pickle)
403 .unwrap_or(PickleValue::None))
404 } else {
405 Ok(PickleValue::None)
406 }
407 }
408 "backbone_peer_state" => {
409 let interface_name = request
410 .get("interface")
411 .and_then(|v| v.as_str())
412 .map(|s| s.to_string());
413 let resp =
414 send_query(event_tx, QueryRequest::BackbonePeerState { interface_name })?;
415 if let QueryResponse::BackbonePeerState(entries) = resp {
416 Ok(backbone_peer_state_to_pickle(&entries))
417 } else {
418 Ok(PickleValue::None)
419 }
420 }
421 "backbone_interfaces" => {
422 let resp = send_query(event_tx, QueryRequest::BackboneInterfaces)?;
423 if let QueryResponse::BackboneInterfaces(entries) = resp {
424 Ok(backbone_interfaces_to_pickle(&entries))
425 } else {
426 Ok(PickleValue::None)
427 }
428 }
429 "provider_bridge_stats" => {
430 let resp = send_query(event_tx, QueryRequest::ProviderBridgeStats)?;
431 if let QueryResponse::ProviderBridgeStats(stats) = resp {
432 Ok(stats
433 .as_ref()
434 .map(provider_bridge_stats_to_pickle)
435 .unwrap_or(PickleValue::None))
436 } else {
437 Ok(PickleValue::None)
438 }
439 }
440 "drain_status" => {
441 let resp = send_query(event_tx, QueryRequest::DrainStatus)?;
442 if let QueryResponse::DrainStatus(status) = resp {
443 Ok(drain_status_to_pickle(&status))
444 } else {
445 Ok(PickleValue::None)
446 }
447 }
448 _ => Ok(PickleValue::None),
449 };
450 }
451 }
452
453 if let Some(begin_val) = request.get("begin_drain") {
454 let timeout_secs = begin_val
455 .as_float()
456 .or_else(|| begin_val.as_int().map(|value| value as f64))
457 .unwrap_or(0.0)
458 .max(0.0);
459 let timeout = Duration::from_secs_f64(timeout_secs);
460 let _ = event_tx.send(Event::BeginDrain { timeout });
461 return Ok(PickleValue::Bool(true));
462 }
463
464 if let Some(set_val) = request.get("set").and_then(|v| v.as_str()) {
465 if set_val == "known_destination_retained" {
466 let dest_hash = extract_dest_hash(request, "dest_hash")?;
467 let resp = send_query(event_tx, QueryRequest::RetainKnownDestination { dest_hash })?;
468 return if let QueryResponse::RetainKnownDestination(ok) = resp {
469 Ok(PickleValue::Bool(ok))
470 } else {
471 Ok(PickleValue::None)
472 };
473 }
474 if set_val == "known_destination_used" {
475 let dest_hash = extract_dest_hash(request, "dest_hash")?;
476 let resp = send_query(
477 event_tx,
478 QueryRequest::MarkKnownDestinationUsed { dest_hash },
479 )?;
480 return if let QueryResponse::MarkKnownDestinationUsed(ok) = resp {
481 Ok(PickleValue::Bool(ok))
482 } else {
483 Ok(PickleValue::None)
484 };
485 }
486 if set_val == "runtime_config" {
487 let key = request
488 .get("key")
489 .and_then(|v| v.as_str())
490 .unwrap_or_default()
491 .to_string();
492 let Some(value) = request
493 .get("value")
494 .and_then(runtime_config_value_from_pickle)
495 else {
496 return Ok(runtime_config_error_to_pickle(&RuntimeConfigError {
497 code: RuntimeConfigErrorCode::InvalidType,
498 message: "runtime-config set requires a scalar value".into(),
499 }));
500 };
501 let resp = send_query(event_tx, QueryRequest::SetRuntimeConfig { key, value })?;
502 return if let QueryResponse::RuntimeConfigSet(result) = resp {
503 Ok(runtime_config_result_to_pickle(result))
504 } else {
505 Ok(PickleValue::None)
506 };
507 }
508 }
509
510 if let Some(reset_val) = request.get("reset").and_then(|v| v.as_str()) {
511 if reset_val == "runtime_config" {
512 let key = request
513 .get("key")
514 .and_then(|v| v.as_str())
515 .unwrap_or_default()
516 .to_string();
517 let resp = send_query(event_tx, QueryRequest::ResetRuntimeConfig { key })?;
518 return if let QueryResponse::RuntimeConfigReset(result) = resp {
519 Ok(runtime_config_result_to_pickle(result))
520 } else {
521 Ok(PickleValue::None)
522 };
523 }
524 }
525
526 if let Some(clear_val) = request.get("clear").and_then(|v| v.as_str()) {
527 if clear_val == "known_destination_retained" {
528 let dest_hash = extract_dest_hash(request, "dest_hash")?;
529 let resp = send_query(
530 event_tx,
531 QueryRequest::UnretainKnownDestination { dest_hash },
532 )?;
533 return if let QueryResponse::UnretainKnownDestination(ok) = resp {
534 Ok(PickleValue::Bool(ok))
535 } else {
536 Ok(PickleValue::None)
537 };
538 }
539 if clear_val == "backbone_peer_state" {
540 let interface_name = required_string(request, "interface")?;
541 let peer_ip = required_string(request, "ip")?;
542 let peer_ip = peer_ip
543 .parse()
544 .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "invalid peer IP"))?;
545 let resp = send_query(
546 event_tx,
547 QueryRequest::ClearBackbonePeerState {
548 interface_name,
549 peer_ip,
550 },
551 )?;
552 return if let QueryResponse::ClearBackbonePeerState(ok) = resp {
553 Ok(PickleValue::Bool(ok))
554 } else {
555 Ok(PickleValue::None)
556 };
557 }
558 }
559
560 if let Some(set_val) = request.get("set").and_then(|v| v.as_str()) {
561 if set_val == "backbone_peer_blacklist" {
562 let interface_name = required_string(request, "interface")?;
563 let peer_ip = required_string(request, "ip")?;
564 let peer_ip: IpAddr = peer_ip
565 .parse()
566 .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "invalid peer IP"))?;
567 let duration_secs = request
568 .get("duration_secs")
569 .and_then(|v| v.as_int())
570 .ok_or_else(|| {
571 io::Error::new(io::ErrorKind::InvalidInput, "missing duration_secs")
572 })?;
573 let reason = request
574 .get("reason")
575 .and_then(|v| v.as_str())
576 .unwrap_or("sentinel blacklist")
577 .to_string();
578 let penalty_level = request
579 .get("penalty_level")
580 .and_then(|v| v.as_int())
581 .unwrap_or(0)
582 .clamp(0, u8::MAX as i64) as u8;
583 let resp = send_query(
584 event_tx,
585 QueryRequest::BlacklistBackbonePeer {
586 interface_name,
587 peer_ip,
588 duration: Duration::from_secs(duration_secs as u64),
589 reason,
590 penalty_level,
591 },
592 )?;
593 return if let QueryResponse::BlacklistBackbonePeer(ok) = resp {
594 Ok(PickleValue::Bool(ok))
595 } else {
596 Ok(PickleValue::None)
597 };
598 }
599 }
600
601 if let Some(hash_val) = request.get("request_path") {
603 if let Some(hash_bytes) = hash_val.as_bytes() {
604 if hash_bytes.len() >= 16 {
605 let mut dest_hash = [0u8; 16];
606 dest_hash.copy_from_slice(&hash_bytes[..16]);
607 let _ = event_tx.send(crate::event::Event::RequestPath { dest_hash });
608 return Ok(PickleValue::Bool(true));
609 }
610 }
611 }
612
613 if let Some(hash_val) = request.get("send_probe") {
615 if let Some(hash_bytes) = hash_val.as_bytes() {
616 if hash_bytes.len() >= 16 {
617 let mut dest_hash = [0u8; 16];
618 dest_hash.copy_from_slice(&hash_bytes[..16]);
619 let payload_size = request
620 .get("size")
621 .and_then(|v| v.as_int())
622 .and_then(|n| {
623 if n > 0 && n <= 400 {
624 Some(n as usize)
625 } else {
626 None
627 }
628 })
629 .unwrap_or(16);
630 let resp = send_query(
631 event_tx,
632 QueryRequest::SendProbe {
633 dest_hash,
634 payload_size,
635 },
636 )?;
637 if let QueryResponse::SendProbe(Some((packet_hash, hops))) = resp {
638 return Ok(PickleValue::Dict(vec![
639 (
640 PickleValue::String("packet_hash".into()),
641 PickleValue::Bytes(packet_hash.to_vec()),
642 ),
643 (
644 PickleValue::String("hops".into()),
645 PickleValue::Int(hops as i64),
646 ),
647 ]));
648 } else {
649 return Ok(PickleValue::None);
650 }
651 }
652 }
653 }
654
655 if let Some(hash_val) = request.get("check_proof") {
657 if let Some(hash_bytes) = hash_val.as_bytes() {
658 if hash_bytes.len() >= 32 {
659 let mut packet_hash = [0u8; 32];
660 packet_hash.copy_from_slice(&hash_bytes[..32]);
661 let resp = send_query(event_tx, QueryRequest::CheckProof { packet_hash })?;
662 if let QueryResponse::CheckProof(Some(rtt)) = resp {
663 return Ok(PickleValue::Float(rtt));
664 } else {
665 return Ok(PickleValue::None);
666 }
667 }
668 }
669 }
670
671 if let Some(hash_val) = request.get("blackhole") {
673 if let Some(hash_bytes) = hash_val.as_bytes() {
674 if hash_bytes.len() >= 16 {
675 let mut identity_hash = [0u8; 16];
676 identity_hash.copy_from_slice(&hash_bytes[..16]);
677 let duration_hours = request.get("duration").and_then(|v| v.as_float());
678 let reason = request
679 .get("reason")
680 .and_then(|v| v.as_str())
681 .map(|s| s.to_string());
682 let resp = send_query(
683 event_tx,
684 QueryRequest::BlackholeIdentity {
685 identity_hash,
686 duration_hours,
687 reason,
688 },
689 )?;
690 return Ok(PickleValue::Bool(matches!(
691 resp,
692 QueryResponse::BlackholeResult(true)
693 )));
694 }
695 }
696 }
697
698 if let Some(hash_val) = request.get("unblackhole") {
700 if let Some(hash_bytes) = hash_val.as_bytes() {
701 if hash_bytes.len() >= 16 {
702 let mut identity_hash = [0u8; 16];
703 identity_hash.copy_from_slice(&hash_bytes[..16]);
704 let resp = send_query(
705 event_tx,
706 QueryRequest::UnblackholeIdentity { identity_hash },
707 )?;
708 return Ok(PickleValue::Bool(matches!(
709 resp,
710 QueryResponse::UnblackholeResult(true)
711 )));
712 }
713 }
714 }
715
716 if let Some(drop_val) = request.get("drop") {
718 if let Some(path) = drop_val.as_str() {
719 return match path {
720 "path" => {
721 let hash = extract_dest_hash(request, "destination_hash")?;
722 let resp = send_query(event_tx, QueryRequest::DropPath { dest_hash: hash })?;
723 if let QueryResponse::DropPath(ok) = resp {
724 Ok(PickleValue::Bool(ok))
725 } else {
726 Ok(PickleValue::None)
727 }
728 }
729 "all_via" => {
730 let hash = extract_dest_hash(request, "destination_hash")?;
731 let resp = send_query(
732 event_tx,
733 QueryRequest::DropAllVia {
734 transport_hash: hash,
735 },
736 )?;
737 if let QueryResponse::DropAllVia(n) = resp {
738 Ok(PickleValue::Int(n as i64))
739 } else {
740 Ok(PickleValue::None)
741 }
742 }
743 "announce_queues" => {
744 let resp = send_query(event_tx, QueryRequest::DropAnnounceQueues)?;
745 if let QueryResponse::DropAnnounceQueues = resp {
746 Ok(PickleValue::Bool(true))
747 } else {
748 Ok(PickleValue::None)
749 }
750 }
751 _ => Ok(PickleValue::None),
752 };
753 }
754 }
755
756 if let Some(hook_val) = request.get("hook").and_then(|v| v.as_str()) {
757 return handle_hook_rpc_request(hook_val, request, event_tx);
758 }
759
760 Ok(PickleValue::None)
761}
762
763fn handle_hook_rpc_request(
764 op: &str,
765 request: &PickleValue,
766 event_tx: &EventSender,
767) -> io::Result<PickleValue> {
768 match op {
769 "load" => {
770 let name = required_string(request, "name")?;
771 let attach_point = required_string(request, "attach_point")?;
772 let priority = request
773 .get("priority")
774 .and_then(|v| v.as_int())
775 .unwrap_or(0) as i32;
776 let wasm = request
777 .get("wasm")
778 .and_then(|v| v.as_bytes())
779 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing wasm"))?
780 .to_vec();
781 let (response_tx, response_rx) = mpsc::channel();
782 event_tx
783 .send(Event::LoadHook {
784 name,
785 wasm_bytes: wasm,
786 attach_point,
787 priority,
788 response_tx,
789 })
790 .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down"))?;
791 let response = response_rx
792 .recv_timeout(std::time::Duration::from_secs(5))
793 .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "hook load timed out"))?;
794 Ok(hook_result_to_pickle(response))
795 }
796 "unload" => {
797 let name = required_string(request, "name")?;
798 let attach_point = required_string(request, "attach_point")?;
799 let (response_tx, response_rx) = mpsc::channel();
800 event_tx
801 .send(Event::UnloadHook {
802 name,
803 attach_point,
804 response_tx,
805 })
806 .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down"))?;
807 let response = response_rx
808 .recv_timeout(std::time::Duration::from_secs(5))
809 .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "hook unload timed out"))?;
810 Ok(hook_result_to_pickle(response))
811 }
812 "enable" | "disable" => {
813 let name = required_string(request, "name")?;
814 let attach_point = required_string(request, "attach_point")?;
815 let enabled = op == "enable";
816 let (response_tx, response_rx) = mpsc::channel();
817 event_tx
818 .send(Event::SetHookEnabled {
819 name,
820 attach_point,
821 enabled,
822 response_tx,
823 })
824 .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down"))?;
825 let response = response_rx
826 .recv_timeout(std::time::Duration::from_secs(5))
827 .map_err(|_| {
828 io::Error::new(io::ErrorKind::TimedOut, "hook enable/disable timed out")
829 })?;
830 Ok(hook_result_to_pickle(response))
831 }
832 "set_priority" => {
833 let name = required_string(request, "name")?;
834 let attach_point = required_string(request, "attach_point")?;
835 let priority = request
836 .get("priority")
837 .and_then(|v| v.as_int())
838 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing priority"))?
839 as i32;
840 let (response_tx, response_rx) = mpsc::channel();
841 event_tx
842 .send(Event::SetHookPriority {
843 name,
844 attach_point,
845 priority,
846 response_tx,
847 })
848 .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down"))?;
849 let response = response_rx
850 .recv_timeout(std::time::Duration::from_secs(5))
851 .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "hook priority timed out"))?;
852 Ok(hook_result_to_pickle(response))
853 }
854 _ => Ok(PickleValue::None),
855 }
856}
857
858fn send_query(event_tx: &EventSender, request: QueryRequest) -> io::Result<QueryResponse> {
860 let (resp_tx, resp_rx) = mpsc::channel();
861 event_tx
862 .send(Event::Query(request, resp_tx))
863 .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down"))?;
864 resp_rx
865 .recv_timeout(std::time::Duration::from_secs(5))
866 .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "query timed out"))
867}
868
869fn extract_dest_hash(request: &PickleValue, key: &str) -> io::Result<[u8; 16]> {
871 let bytes = request
872 .get(key)
873 .and_then(|v| v.as_bytes())
874 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing destination_hash"))?;
875 if bytes.len() < 16 {
876 return Err(io::Error::new(io::ErrorKind::InvalidData, "hash too short"));
877 }
878 let mut hash = [0u8; 16];
879 hash.copy_from_slice(&bytes[..16]);
880 Ok(hash)
881}
882
883fn required_string(request: &PickleValue, key: &str) -> io::Result<String> {
884 request
885 .get(key)
886 .and_then(|v| v.as_str())
887 .map(|s| s.to_string())
888 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, format!("missing {}", key)))
889}
890
891fn hook_result_to_pickle(result: Result<(), String>) -> PickleValue {
892 match result {
893 Ok(()) => PickleValue::Dict(vec![(
894 PickleValue::String("ok".into()),
895 PickleValue::Bool(true),
896 )]),
897 Err(error) => PickleValue::Dict(vec![
898 (PickleValue::String("ok".into()), PickleValue::Bool(false)),
899 (
900 PickleValue::String("error".into()),
901 PickleValue::String(error),
902 ),
903 ]),
904 }
905}
906
907fn interface_stats_to_pickle(stats: &InterfaceStatsResponse) -> PickleValue {
910 let mut ifaces = Vec::new();
911 for iface in &stats.interfaces {
912 ifaces.push(single_iface_to_pickle(iface));
913 }
914
915 let mut dict = vec![
916 (
917 PickleValue::String("interfaces".into()),
918 PickleValue::List(ifaces),
919 ),
920 (
921 PickleValue::String("transport_enabled".into()),
922 PickleValue::Bool(stats.transport_enabled),
923 ),
924 (
925 PickleValue::String("transport_uptime".into()),
926 PickleValue::Float(stats.transport_uptime),
927 ),
928 (
929 PickleValue::String("rxb".into()),
930 PickleValue::Int(stats.total_rxb as i64),
931 ),
932 (
933 PickleValue::String("txb".into()),
934 PickleValue::Int(stats.total_txb as i64),
935 ),
936 ];
937
938 if let Some(tid) = stats.transport_id {
939 dict.push((
940 PickleValue::String("transport_id".into()),
941 PickleValue::Bytes(tid.to_vec()),
942 ));
943 } else {
944 dict.push((
945 PickleValue::String("transport_id".into()),
946 PickleValue::None,
947 ));
948 }
949
950 if let Some(pr) = stats.probe_responder {
951 dict.push((
952 PickleValue::String("probe_responder".into()),
953 PickleValue::Bytes(pr.to_vec()),
954 ));
955 } else {
956 dict.push((
957 PickleValue::String("probe_responder".into()),
958 PickleValue::None,
959 ));
960 }
961
962 if let Some(pool) = &stats.backbone_peer_pool {
963 let members = pool
964 .members
965 .iter()
966 .map(|member| {
967 let mut member_dict = vec![
968 (
969 PickleValue::String("name".into()),
970 PickleValue::String(member.name.clone()),
971 ),
972 (
973 PickleValue::String("remote".into()),
974 PickleValue::String(member.remote.clone()),
975 ),
976 (
977 PickleValue::String("state".into()),
978 PickleValue::String(member.state.clone()),
979 ),
980 (
981 PickleValue::String("failure_count".into()),
982 PickleValue::Int(member.failure_count as i64),
983 ),
984 ];
985 member_dict.push((
986 PickleValue::String("interface_id".into()),
987 member
988 .interface_id
989 .map(|id| PickleValue::Int(id as i64))
990 .unwrap_or(PickleValue::None),
991 ));
992 member_dict.push((
993 PickleValue::String("last_error".into()),
994 member
995 .last_error
996 .as_ref()
997 .map(|err| PickleValue::String(err.clone()))
998 .unwrap_or(PickleValue::None),
999 ));
1000 member_dict.push((
1001 PickleValue::String("cooldown_remaining_seconds".into()),
1002 member
1003 .cooldown_remaining_seconds
1004 .map(PickleValue::Float)
1005 .unwrap_or(PickleValue::None),
1006 ));
1007 PickleValue::Dict(member_dict)
1008 })
1009 .collect();
1010 dict.push((
1011 PickleValue::String("backbone_peer_pool".into()),
1012 PickleValue::Dict(vec![
1013 (
1014 PickleValue::String("max_connected".into()),
1015 PickleValue::Int(pool.max_connected as i64),
1016 ),
1017 (
1018 PickleValue::String("active_count".into()),
1019 PickleValue::Int(pool.active_count as i64),
1020 ),
1021 (
1022 PickleValue::String("standby_count".into()),
1023 PickleValue::Int(pool.standby_count as i64),
1024 ),
1025 (
1026 PickleValue::String("cooldown_count".into()),
1027 PickleValue::Int(pool.cooldown_count as i64),
1028 ),
1029 (
1030 PickleValue::String("members".into()),
1031 PickleValue::List(members),
1032 ),
1033 ]),
1034 ));
1035 } else {
1036 dict.push((
1037 PickleValue::String("backbone_peer_pool".into()),
1038 PickleValue::None,
1039 ));
1040 }
1041
1042 PickleValue::Dict(dict)
1043}
1044
1045fn single_iface_to_pickle(s: &SingleInterfaceStat) -> PickleValue {
1046 let mut dict = vec![
1047 (
1048 PickleValue::String("id".into()),
1049 PickleValue::Int(s.id as i64),
1050 ),
1051 (
1052 PickleValue::String("name".into()),
1053 PickleValue::String(s.name.clone()),
1054 ),
1055 (
1056 PickleValue::String("status".into()),
1057 PickleValue::Bool(s.status),
1058 ),
1059 (
1060 PickleValue::String("mode".into()),
1061 PickleValue::Int(s.mode as i64),
1062 ),
1063 (
1064 PickleValue::String("rxb".into()),
1065 PickleValue::Int(s.rxb as i64),
1066 ),
1067 (
1068 PickleValue::String("txb".into()),
1069 PickleValue::Int(s.txb as i64),
1070 ),
1071 (
1072 PickleValue::String("rx_packets".into()),
1073 PickleValue::Int(s.rx_packets as i64),
1074 ),
1075 (
1076 PickleValue::String("tx_packets".into()),
1077 PickleValue::Int(s.tx_packets as i64),
1078 ),
1079 (
1080 PickleValue::String("started".into()),
1081 PickleValue::Float(s.started),
1082 ),
1083 (
1084 PickleValue::String("ia_freq".into()),
1085 PickleValue::Float(s.ia_freq),
1086 ),
1087 (
1088 PickleValue::String("oa_freq".into()),
1089 PickleValue::Float(s.oa_freq),
1090 ),
1091 ];
1092
1093 match s.bitrate {
1094 Some(br) => dict.push((
1095 PickleValue::String("bitrate".into()),
1096 PickleValue::Int(br as i64),
1097 )),
1098 None => dict.push((PickleValue::String("bitrate".into()), PickleValue::None)),
1099 }
1100
1101 match s.ifac_size {
1102 Some(sz) => dict.push((
1103 PickleValue::String("ifac_size".into()),
1104 PickleValue::Int(sz as i64),
1105 )),
1106 None => dict.push((PickleValue::String("ifac_size".into()), PickleValue::None)),
1107 }
1108
1109 PickleValue::Dict(dict)
1110}
1111
1112fn path_table_to_pickle(entries: &[PathTableEntry]) -> PickleValue {
1113 let list: Vec<PickleValue> = entries
1114 .iter()
1115 .map(|e| {
1116 PickleValue::Dict(vec![
1117 (
1118 PickleValue::String("hash".into()),
1119 PickleValue::Bytes(e.hash.to_vec()),
1120 ),
1121 (
1122 PickleValue::String("timestamp".into()),
1123 PickleValue::Float(e.timestamp),
1124 ),
1125 (
1126 PickleValue::String("via".into()),
1127 PickleValue::Bytes(e.via.to_vec()),
1128 ),
1129 (
1130 PickleValue::String("hops".into()),
1131 PickleValue::Int(e.hops as i64),
1132 ),
1133 (
1134 PickleValue::String("expires".into()),
1135 PickleValue::Float(e.expires),
1136 ),
1137 (
1138 PickleValue::String("interface".into()),
1139 PickleValue::String(e.interface_name.clone()),
1140 ),
1141 ])
1142 })
1143 .collect();
1144 PickleValue::List(list)
1145}
1146
1147fn rate_table_to_pickle(entries: &[RateTableEntry]) -> PickleValue {
1148 let list: Vec<PickleValue> = entries
1149 .iter()
1150 .map(|e| {
1151 PickleValue::Dict(vec![
1152 (
1153 PickleValue::String("hash".into()),
1154 PickleValue::Bytes(e.hash.to_vec()),
1155 ),
1156 (
1157 PickleValue::String("last".into()),
1158 PickleValue::Float(e.last),
1159 ),
1160 (
1161 PickleValue::String("rate_violations".into()),
1162 PickleValue::Int(e.rate_violations as i64),
1163 ),
1164 (
1165 PickleValue::String("blocked_until".into()),
1166 PickleValue::Float(e.blocked_until),
1167 ),
1168 (
1169 PickleValue::String("timestamps".into()),
1170 PickleValue::List(
1171 e.timestamps
1172 .iter()
1173 .map(|&t| PickleValue::Float(t))
1174 .collect(),
1175 ),
1176 ),
1177 ])
1178 })
1179 .collect();
1180 PickleValue::List(list)
1181}
1182
1183fn blackholed_to_pickle(entries: &[BlackholeInfo]) -> PickleValue {
1184 let list: Vec<PickleValue> = entries
1185 .iter()
1186 .map(|e| {
1187 let mut dict = vec![
1188 (
1189 PickleValue::String("identity_hash".into()),
1190 PickleValue::Bytes(e.identity_hash.to_vec()),
1191 ),
1192 (
1193 PickleValue::String("created".into()),
1194 PickleValue::Float(e.created),
1195 ),
1196 (
1197 PickleValue::String("expires".into()),
1198 PickleValue::Float(e.expires),
1199 ),
1200 ];
1201 if let Some(ref reason) = e.reason {
1202 dict.push((
1203 PickleValue::String("reason".into()),
1204 PickleValue::String(reason.clone()),
1205 ));
1206 } else {
1207 dict.push((PickleValue::String("reason".into()), PickleValue::None));
1208 }
1209 PickleValue::Dict(dict)
1210 })
1211 .collect();
1212 PickleValue::List(list)
1213}
1214
1215fn discovered_interfaces_to_pickle(
1216 interfaces: &[crate::discovery::DiscoveredInterface],
1217) -> PickleValue {
1218 let list: Vec<PickleValue> = interfaces
1219 .iter()
1220 .map(|iface| {
1221 let mut dict = vec![
1222 (
1223 PickleValue::String("type".into()),
1224 PickleValue::String(iface.interface_type.clone()),
1225 ),
1226 (
1227 PickleValue::String("transport".into()),
1228 PickleValue::Bool(iface.transport),
1229 ),
1230 (
1231 PickleValue::String("name".into()),
1232 PickleValue::String(iface.name.clone()),
1233 ),
1234 (
1235 PickleValue::String("discovered".into()),
1236 PickleValue::Float(iface.discovered),
1237 ),
1238 (
1239 PickleValue::String("last_heard".into()),
1240 PickleValue::Float(iface.last_heard),
1241 ),
1242 (
1243 PickleValue::String("heard_count".into()),
1244 PickleValue::Int(iface.heard_count as i64),
1245 ),
1246 (
1247 PickleValue::String("status".into()),
1248 PickleValue::String(iface.status.as_str().into()),
1249 ),
1250 (
1251 PickleValue::String("stamp".into()),
1252 PickleValue::Bytes(iface.stamp.clone()),
1253 ),
1254 (
1255 PickleValue::String("value".into()),
1256 PickleValue::Int(iface.stamp_value as i64),
1257 ),
1258 (
1259 PickleValue::String("transport_id".into()),
1260 PickleValue::Bytes(iface.transport_id.to_vec()),
1261 ),
1262 (
1263 PickleValue::String("network_id".into()),
1264 PickleValue::Bytes(iface.network_id.to_vec()),
1265 ),
1266 (
1267 PickleValue::String("hops".into()),
1268 PickleValue::Int(iface.hops as i64),
1269 ),
1270 ];
1271
1272 if let Some(v) = iface.latitude {
1274 dict.push((
1275 PickleValue::String("latitude".into()),
1276 PickleValue::Float(v),
1277 ));
1278 } else {
1279 dict.push((PickleValue::String("latitude".into()), PickleValue::None));
1280 }
1281 if let Some(v) = iface.longitude {
1282 dict.push((
1283 PickleValue::String("longitude".into()),
1284 PickleValue::Float(v),
1285 ));
1286 } else {
1287 dict.push((PickleValue::String("longitude".into()), PickleValue::None));
1288 }
1289 if let Some(v) = iface.height {
1290 dict.push((PickleValue::String("height".into()), PickleValue::Float(v)));
1291 } else {
1292 dict.push((PickleValue::String("height".into()), PickleValue::None));
1293 }
1294
1295 if let Some(ref v) = iface.reachable_on {
1297 dict.push((
1298 PickleValue::String("reachable_on".into()),
1299 PickleValue::String(v.clone()),
1300 ));
1301 } else {
1302 dict.push((
1303 PickleValue::String("reachable_on".into()),
1304 PickleValue::None,
1305 ));
1306 }
1307 if let Some(v) = iface.port {
1308 dict.push((
1309 PickleValue::String("port".into()),
1310 PickleValue::Int(v as i64),
1311 ));
1312 } else {
1313 dict.push((PickleValue::String("port".into()), PickleValue::None));
1314 }
1315
1316 if let Some(v) = iface.frequency {
1318 dict.push((
1319 PickleValue::String("frequency".into()),
1320 PickleValue::Int(v as i64),
1321 ));
1322 } else {
1323 dict.push((PickleValue::String("frequency".into()), PickleValue::None));
1324 }
1325 if let Some(v) = iface.bandwidth {
1326 dict.push((
1327 PickleValue::String("bandwidth".into()),
1328 PickleValue::Int(v as i64),
1329 ));
1330 } else {
1331 dict.push((PickleValue::String("bandwidth".into()), PickleValue::None));
1332 }
1333 if let Some(v) = iface.spreading_factor {
1334 dict.push((PickleValue::String("sf".into()), PickleValue::Int(v as i64)));
1335 } else {
1336 dict.push((PickleValue::String("sf".into()), PickleValue::None));
1337 }
1338 if let Some(v) = iface.coding_rate {
1339 dict.push((PickleValue::String("cr".into()), PickleValue::Int(v as i64)));
1340 } else {
1341 dict.push((PickleValue::String("cr".into()), PickleValue::None));
1342 }
1343 if let Some(ref v) = iface.modulation {
1344 dict.push((
1345 PickleValue::String("modulation".into()),
1346 PickleValue::String(v.clone()),
1347 ));
1348 } else {
1349 dict.push((PickleValue::String("modulation".into()), PickleValue::None));
1350 }
1351 if let Some(v) = iface.channel {
1352 dict.push((
1353 PickleValue::String("channel".into()),
1354 PickleValue::Int(v as i64),
1355 ));
1356 } else {
1357 dict.push((PickleValue::String("channel".into()), PickleValue::None));
1358 }
1359
1360 if let Some(ref v) = iface.ifac_netname {
1362 dict.push((
1363 PickleValue::String("ifac_netname".into()),
1364 PickleValue::String(v.clone()),
1365 ));
1366 } else {
1367 dict.push((
1368 PickleValue::String("ifac_netname".into()),
1369 PickleValue::None,
1370 ));
1371 }
1372 if let Some(ref v) = iface.ifac_netkey {
1373 dict.push((
1374 PickleValue::String("ifac_netkey".into()),
1375 PickleValue::String(v.clone()),
1376 ));
1377 } else {
1378 dict.push((PickleValue::String("ifac_netkey".into()), PickleValue::None));
1379 }
1380
1381 if let Some(ref v) = iface.config_entry {
1383 dict.push((
1384 PickleValue::String("config_entry".into()),
1385 PickleValue::String(v.clone()),
1386 ));
1387 } else {
1388 dict.push((
1389 PickleValue::String("config_entry".into()),
1390 PickleValue::None,
1391 ));
1392 }
1393
1394 dict.push((
1395 PickleValue::String("discovery_hash".into()),
1396 PickleValue::Bytes(iface.discovery_hash.to_vec()),
1397 ));
1398
1399 PickleValue::Dict(dict)
1400 })
1401 .collect();
1402 PickleValue::List(list)
1403}
1404
1405fn hooks_to_pickle(hooks: &[HookInfo]) -> PickleValue {
1406 PickleValue::List(
1407 hooks
1408 .iter()
1409 .map(|hook| {
1410 PickleValue::Dict(vec![
1411 (
1412 PickleValue::String("name".into()),
1413 PickleValue::String(hook.name.clone()),
1414 ),
1415 (
1416 PickleValue::String("attach_point".into()),
1417 PickleValue::String(hook.attach_point.clone()),
1418 ),
1419 (
1420 PickleValue::String("priority".into()),
1421 PickleValue::Int(hook.priority as i64),
1422 ),
1423 (
1424 PickleValue::String("enabled".into()),
1425 PickleValue::Bool(hook.enabled),
1426 ),
1427 (
1428 PickleValue::String("consecutive_traps".into()),
1429 PickleValue::Int(hook.consecutive_traps as i64),
1430 ),
1431 ])
1432 })
1433 .collect(),
1434 )
1435}
1436
1437fn backbone_peer_state_to_pickle(entries: &[BackbonePeerStateEntry]) -> PickleValue {
1438 PickleValue::List(
1439 entries
1440 .iter()
1441 .map(|entry| {
1442 PickleValue::Dict(vec![
1443 (
1444 PickleValue::String("interface".into()),
1445 PickleValue::String(entry.interface_name.clone()),
1446 ),
1447 (
1448 PickleValue::String("ip".into()),
1449 PickleValue::String(entry.peer_ip.to_string()),
1450 ),
1451 (
1452 PickleValue::String("connected_count".into()),
1453 PickleValue::Int(entry.connected_count as i64),
1454 ),
1455 (
1456 PickleValue::String("blacklisted_remaining_secs".into()),
1457 entry
1458 .blacklisted_remaining_secs
1459 .map(PickleValue::Float)
1460 .unwrap_or(PickleValue::None),
1461 ),
1462 (
1463 PickleValue::String("blacklist_reason".into()),
1464 entry
1465 .blacklist_reason
1466 .as_ref()
1467 .map(|v: &String| PickleValue::String(v.clone()))
1468 .unwrap_or(PickleValue::None),
1469 ),
1470 (
1471 PickleValue::String("reject_count".into()),
1472 PickleValue::Int(entry.reject_count as i64),
1473 ),
1474 ])
1475 })
1476 .collect(),
1477 )
1478}
1479
1480fn backbone_interfaces_to_pickle(entries: &[BackboneInterfaceEntry]) -> PickleValue {
1481 PickleValue::List(
1482 entries
1483 .iter()
1484 .map(|entry| {
1485 PickleValue::Dict(vec![
1486 (
1487 PickleValue::String("id".into()),
1488 PickleValue::Int(entry.interface_id.0 as i64),
1489 ),
1490 (
1491 PickleValue::String("name".into()),
1492 PickleValue::String(entry.interface_name.clone()),
1493 ),
1494 ])
1495 })
1496 .collect(),
1497 )
1498}
1499
1500fn provider_bridge_stats_to_pickle(stats: &ProviderBridgeStats) -> PickleValue {
1501 PickleValue::Dict(vec![
1502 (
1503 PickleValue::String("connected".into()),
1504 PickleValue::Bool(stats.connected),
1505 ),
1506 (
1507 PickleValue::String("consumer_count".into()),
1508 PickleValue::Int(stats.consumer_count as i64),
1509 ),
1510 (
1511 PickleValue::String("queue_max_events".into()),
1512 PickleValue::Int(stats.queue_max_events as i64),
1513 ),
1514 (
1515 PickleValue::String("queue_max_bytes".into()),
1516 PickleValue::Int(stats.queue_max_bytes as i64),
1517 ),
1518 (
1519 PickleValue::String("backlog_len".into()),
1520 PickleValue::Int(stats.backlog_len as i64),
1521 ),
1522 (
1523 PickleValue::String("backlog_bytes".into()),
1524 PickleValue::Int(stats.backlog_bytes as i64),
1525 ),
1526 (
1527 PickleValue::String("backlog_dropped_pending".into()),
1528 PickleValue::Int(stats.backlog_dropped_pending as i64),
1529 ),
1530 (
1531 PickleValue::String("backlog_dropped_total".into()),
1532 PickleValue::Int(stats.backlog_dropped_total as i64),
1533 ),
1534 (
1535 PickleValue::String("total_disconnect_count".into()),
1536 PickleValue::Int(stats.total_disconnect_count as i64),
1537 ),
1538 (
1539 PickleValue::String("consumers".into()),
1540 PickleValue::List(
1541 stats
1542 .consumers
1543 .iter()
1544 .map(|consumer| {
1545 PickleValue::Dict(vec![
1546 (
1547 PickleValue::String("id".into()),
1548 PickleValue::Int(consumer.id as i64),
1549 ),
1550 (
1551 PickleValue::String("connected".into()),
1552 PickleValue::Bool(consumer.connected),
1553 ),
1554 (
1555 PickleValue::String("queue_len".into()),
1556 PickleValue::Int(consumer.queue_len as i64),
1557 ),
1558 (
1559 PickleValue::String("queued_bytes".into()),
1560 PickleValue::Int(consumer.queued_bytes as i64),
1561 ),
1562 (
1563 PickleValue::String("dropped_pending".into()),
1564 PickleValue::Int(consumer.dropped_pending as i64),
1565 ),
1566 (
1567 PickleValue::String("dropped_total".into()),
1568 PickleValue::Int(consumer.dropped_total as i64),
1569 ),
1570 (
1571 PickleValue::String("queue_max_events".into()),
1572 PickleValue::Int(consumer.queue_max_events as i64),
1573 ),
1574 (
1575 PickleValue::String("queue_max_bytes".into()),
1576 PickleValue::Int(consumer.queue_max_bytes as i64),
1577 ),
1578 ])
1579 })
1580 .collect(),
1581 ),
1582 ),
1583 ])
1584}
1585
1586fn lifecycle_state_name(state: LifecycleState) -> &'static str {
1587 match state {
1588 LifecycleState::Active => "active",
1589 LifecycleState::Draining => "draining",
1590 LifecycleState::Stopping => "stopping",
1591 LifecycleState::Stopped => "stopped",
1592 }
1593}
1594
1595fn drain_status_to_pickle(status: &DrainStatus) -> PickleValue {
1596 PickleValue::Dict(vec![
1597 (
1598 PickleValue::String("state".into()),
1599 PickleValue::String(lifecycle_state_name(status.state).into()),
1600 ),
1601 (
1602 PickleValue::String("drain_age_seconds".into()),
1603 status
1604 .drain_age_seconds
1605 .map(PickleValue::Float)
1606 .unwrap_or(PickleValue::None),
1607 ),
1608 (
1609 PickleValue::String("deadline_remaining_seconds".into()),
1610 status
1611 .deadline_remaining_seconds
1612 .map(PickleValue::Float)
1613 .unwrap_or(PickleValue::None),
1614 ),
1615 (
1616 PickleValue::String("drain_complete".into()),
1617 PickleValue::Bool(status.drain_complete),
1618 ),
1619 (
1620 PickleValue::String("interface_writer_queued_frames".into()),
1621 PickleValue::Int(status.interface_writer_queued_frames as i64),
1622 ),
1623 (
1624 PickleValue::String("provider_backlog_events".into()),
1625 PickleValue::Int(status.provider_backlog_events as i64),
1626 ),
1627 (
1628 PickleValue::String("provider_consumer_queued_events".into()),
1629 PickleValue::Int(status.provider_consumer_queued_events as i64),
1630 ),
1631 (
1632 PickleValue::String("detail".into()),
1633 status
1634 .detail
1635 .as_ref()
1636 .map(|detail| PickleValue::String(detail.clone()))
1637 .unwrap_or(PickleValue::None),
1638 ),
1639 ])
1640}
1641
1642fn runtime_config_value_to_pickle(value: &RuntimeConfigValue) -> PickleValue {
1643 match value {
1644 RuntimeConfigValue::Int(v) => PickleValue::Int(*v),
1645 RuntimeConfigValue::Float(v) => PickleValue::Float(*v),
1646 RuntimeConfigValue::Bool(v) => PickleValue::Bool(*v),
1647 RuntimeConfigValue::String(v) => PickleValue::String(v.clone()),
1648 RuntimeConfigValue::Null => PickleValue::None,
1649 }
1650}
1651
1652fn runtime_config_value_from_pickle(value: &PickleValue) -> Option<RuntimeConfigValue> {
1653 match value {
1654 PickleValue::Int(v) => Some(RuntimeConfigValue::Int(*v)),
1655 PickleValue::Float(v) => Some(RuntimeConfigValue::Float(*v)),
1656 PickleValue::Bool(v) => Some(RuntimeConfigValue::Bool(*v)),
1657 PickleValue::String(v) => Some(RuntimeConfigValue::String(v.clone())),
1658 PickleValue::None => Some(RuntimeConfigValue::Null),
1659 _ => None,
1660 }
1661}
1662
1663fn runtime_config_entry_to_pickle(entry: &RuntimeConfigEntry) -> PickleValue {
1664 PickleValue::Dict(vec![
1665 (
1666 PickleValue::String("key".into()),
1667 PickleValue::String(entry.key.clone()),
1668 ),
1669 (
1670 PickleValue::String("value".into()),
1671 runtime_config_value_to_pickle(&entry.value),
1672 ),
1673 (
1674 PickleValue::String("default".into()),
1675 runtime_config_value_to_pickle(&entry.default),
1676 ),
1677 (
1678 PickleValue::String("source".into()),
1679 PickleValue::String(match entry.source {
1680 RuntimeConfigSource::Startup => "startup".into(),
1681 RuntimeConfigSource::RuntimeOverride => "runtime_override".into(),
1682 }),
1683 ),
1684 (
1685 PickleValue::String("apply_mode".into()),
1686 PickleValue::String(match entry.apply_mode {
1687 RuntimeConfigApplyMode::Immediate => "immediate".into(),
1688 RuntimeConfigApplyMode::NewConnectionsOnly => "new_connections_only".into(),
1689 RuntimeConfigApplyMode::NextReconnect => "next_reconnect".into(),
1690 RuntimeConfigApplyMode::RestartRequired => "restart_required".into(),
1691 }),
1692 ),
1693 (
1694 PickleValue::String("description".into()),
1695 entry
1696 .description
1697 .as_ref()
1698 .map(|v| PickleValue::String(v.clone()))
1699 .unwrap_or(PickleValue::None),
1700 ),
1701 ])
1702}
1703
1704fn runtime_config_list_to_pickle(entries: &[RuntimeConfigEntry]) -> PickleValue {
1705 PickleValue::List(entries.iter().map(runtime_config_entry_to_pickle).collect())
1706}
1707
1708fn runtime_config_error_to_pickle(error: &RuntimeConfigError) -> PickleValue {
1709 PickleValue::Dict(vec![
1710 (
1711 PickleValue::String("error".into()),
1712 PickleValue::String(match error.code {
1713 RuntimeConfigErrorCode::UnknownKey => "unknown_key".into(),
1714 RuntimeConfigErrorCode::InvalidType => "invalid_type".into(),
1715 RuntimeConfigErrorCode::InvalidValue => "invalid_value".into(),
1716 RuntimeConfigErrorCode::Unsupported => "unsupported".into(),
1717 RuntimeConfigErrorCode::NotFound => "not_found".into(),
1718 RuntimeConfigErrorCode::ApplyFailed => "apply_failed".into(),
1719 }),
1720 ),
1721 (
1722 PickleValue::String("message".into()),
1723 PickleValue::String(error.message.clone()),
1724 ),
1725 ])
1726}
1727
1728fn runtime_config_result_to_pickle(
1729 result: Result<RuntimeConfigEntry, RuntimeConfigError>,
1730) -> PickleValue {
1731 match result {
1732 Ok(entry) => runtime_config_entry_to_pickle(&entry),
1733 Err(error) => runtime_config_error_to_pickle(&error),
1734 }
1735}
1736
1737fn known_destination_entry_to_pickle(entry: &KnownDestinationEntry) -> PickleValue {
1738 PickleValue::Dict(vec![
1739 (
1740 PickleValue::String("dest_hash".into()),
1741 PickleValue::Bytes(entry.dest_hash.to_vec()),
1742 ),
1743 (
1744 PickleValue::String("identity_hash".into()),
1745 PickleValue::Bytes(entry.identity_hash.to_vec()),
1746 ),
1747 (
1748 PickleValue::String("public_key".into()),
1749 PickleValue::Bytes(entry.public_key.to_vec()),
1750 ),
1751 (
1752 PickleValue::String("app_data".into()),
1753 entry
1754 .app_data
1755 .as_ref()
1756 .map(|data: &Vec<u8>| PickleValue::Bytes(data.clone()))
1757 .unwrap_or(PickleValue::None),
1758 ),
1759 (
1760 PickleValue::String("hops".into()),
1761 PickleValue::Int(entry.hops as i64),
1762 ),
1763 (
1764 PickleValue::String("received_at".into()),
1765 PickleValue::Float(entry.received_at),
1766 ),
1767 (
1768 PickleValue::String("receiving_interface".into()),
1769 PickleValue::Int(entry.receiving_interface.0 as i64),
1770 ),
1771 (
1772 PickleValue::String("was_used".into()),
1773 PickleValue::Bool(entry.was_used),
1774 ),
1775 (
1776 PickleValue::String("last_used_at".into()),
1777 entry
1778 .last_used_at
1779 .map(PickleValue::Float)
1780 .unwrap_or(PickleValue::None),
1781 ),
1782 (
1783 PickleValue::String("retained".into()),
1784 PickleValue::Bool(entry.retained),
1785 ),
1786 ])
1787}
1788
1789fn known_destinations_to_pickle(entries: &[KnownDestinationEntry]) -> PickleValue {
1790 PickleValue::List(
1791 entries
1792 .iter()
1793 .map(known_destination_entry_to_pickle)
1794 .collect(),
1795 )
1796}
1797
1798fn parse_known_destination_entry(value: &PickleValue) -> io::Result<KnownDestinationEntry> {
1799 let get_bytes = |key: &str, len: usize| -> io::Result<Vec<u8>> {
1800 let value = value.get(key).ok_or_else(|| {
1801 io::Error::new(io::ErrorKind::InvalidData, format!("missing {}", key))
1802 })?;
1803 let bytes = value.as_bytes().ok_or_else(|| {
1804 io::Error::new(io::ErrorKind::InvalidData, format!("invalid {}", key))
1805 })?;
1806 if bytes.len() != len {
1807 return Err(io::Error::new(
1808 io::ErrorKind::InvalidData,
1809 format!("invalid {} length", key),
1810 ));
1811 }
1812 Ok(bytes.to_vec())
1813 };
1814 let get_int = |key: &str| -> io::Result<i64> {
1815 value
1816 .get(key)
1817 .and_then(|v| v.as_int())
1818 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, format!("invalid {}", key)))
1819 };
1820 let get_float = |key: &str| -> io::Result<f64> {
1821 value
1822 .get(key)
1823 .and_then(|v| v.as_float())
1824 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, format!("invalid {}", key)))
1825 };
1826 let get_bool = |key: &str| -> io::Result<bool> {
1827 value
1828 .get(key)
1829 .and_then(|v| v.as_bool())
1830 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, format!("invalid {}", key)))
1831 };
1832
1833 let mut dest_hash = [0u8; 16];
1834 dest_hash.copy_from_slice(&get_bytes("dest_hash", 16)?);
1835 let mut identity_hash = [0u8; 16];
1836 identity_hash.copy_from_slice(&get_bytes("identity_hash", 16)?);
1837 let mut public_key = [0u8; 64];
1838 public_key.copy_from_slice(&get_bytes("public_key", 64)?);
1839 let app_data = value
1840 .get("app_data")
1841 .and_then(|v| v.as_bytes())
1842 .map(|bytes| bytes.to_vec());
1843 let last_used_at = value.get("last_used_at").and_then(|v| v.as_float());
1844
1845 Ok(KnownDestinationEntry {
1846 dest_hash,
1847 identity_hash,
1848 public_key,
1849 app_data,
1850 hops: get_int("hops")? as u8,
1851 received_at: get_float("received_at")?,
1852 receiving_interface: rns_core::transport::types::InterfaceId(
1853 get_int("receiving_interface")? as u64,
1854 ),
1855 was_used: get_bool("was_used")?,
1856 last_used_at,
1857 retained: get_bool("retained")?,
1858 })
1859}
1860
1861fn parse_known_destination_list(value: &PickleValue) -> io::Result<Vec<KnownDestinationEntry>> {
1862 let list = value
1863 .as_list()
1864 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "expected list"))?;
1865 list.iter().map(parse_known_destination_entry).collect()
1866}
1867
1868pub struct RpcClient {
1872 stream: TcpStream,
1873}
1874
1875impl RpcClient {
1876 pub fn connect(addr: &RpcAddr, auth_key: &[u8; 32]) -> io::Result<Self> {
1878 let mut stream = match addr {
1879 RpcAddr::Tcp(host, port) => TcpStream::connect((host.as_str(), *port))?,
1880 };
1881
1882 stream.set_read_timeout(Some(std::time::Duration::from_secs(10)))?;
1883 stream.set_write_timeout(Some(std::time::Duration::from_secs(10)))?;
1884
1885 client_auth(&mut stream, auth_key)?;
1887
1888 Ok(RpcClient { stream })
1889 }
1890
1891 pub fn call(&mut self, request: &PickleValue) -> io::Result<PickleValue> {
1893 let request_bytes = pickle::encode(request);
1894 send_bytes(&mut self.stream, &request_bytes)?;
1895
1896 let response_bytes = recv_bytes(&mut self.stream)?;
1897 pickle::decode(&response_bytes)
1898 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))
1899 }
1900
1901 pub fn list_hooks(&mut self) -> io::Result<Vec<HookInfo>> {
1902 let response = self.call(&PickleValue::Dict(vec![(
1903 PickleValue::String("get".into()),
1904 PickleValue::String("hooks".into()),
1905 )]))?;
1906 parse_hook_list(&response)
1907 }
1908
1909 pub fn begin_drain(&mut self, timeout: Duration) -> io::Result<bool> {
1910 let response = self.call(&PickleValue::Dict(vec![(
1911 PickleValue::String("begin_drain".into()),
1912 PickleValue::Float(timeout.as_secs_f64()),
1913 )]))?;
1914 Ok(response.as_bool().unwrap_or(false))
1915 }
1916
1917 pub fn drain_status(&mut self) -> io::Result<Option<DrainStatus>> {
1918 let response = self.call(&PickleValue::Dict(vec![(
1919 PickleValue::String("get".into()),
1920 PickleValue::String("drain_status".into()),
1921 )]))?;
1922 parse_drain_status(&response)
1923 }
1924
1925 pub fn provider_bridge_stats(&mut self) -> io::Result<PickleValue> {
1926 self.call(&PickleValue::Dict(vec![(
1927 PickleValue::String("get".into()),
1928 PickleValue::String("provider_bridge_stats".into()),
1929 )]))
1930 }
1931
1932 pub fn load_hook(
1933 &mut self,
1934 name: &str,
1935 attach_point: &str,
1936 priority: i32,
1937 wasm: &[u8],
1938 ) -> io::Result<Result<(), String>> {
1939 let response = self.call(&PickleValue::Dict(vec![
1940 (
1941 PickleValue::String("hook".into()),
1942 PickleValue::String("load".into()),
1943 ),
1944 (
1945 PickleValue::String("name".into()),
1946 PickleValue::String(name.to_string()),
1947 ),
1948 (
1949 PickleValue::String("attach_point".into()),
1950 PickleValue::String(attach_point.to_string()),
1951 ),
1952 (
1953 PickleValue::String("priority".into()),
1954 PickleValue::Int(priority as i64),
1955 ),
1956 (
1957 PickleValue::String("wasm".into()),
1958 PickleValue::Bytes(wasm.to_vec()),
1959 ),
1960 ]))?;
1961 parse_hook_result(&response)
1962 }
1963
1964 pub fn unload_hook(
1965 &mut self,
1966 name: &str,
1967 attach_point: &str,
1968 ) -> io::Result<Result<(), String>> {
1969 let response = self.call(&PickleValue::Dict(vec![
1970 (
1971 PickleValue::String("hook".into()),
1972 PickleValue::String("unload".into()),
1973 ),
1974 (
1975 PickleValue::String("name".into()),
1976 PickleValue::String(name.to_string()),
1977 ),
1978 (
1979 PickleValue::String("attach_point".into()),
1980 PickleValue::String(attach_point.to_string()),
1981 ),
1982 ]))?;
1983 parse_hook_result(&response)
1984 }
1985
1986 pub fn set_hook_enabled(
1987 &mut self,
1988 name: &str,
1989 attach_point: &str,
1990 enabled: bool,
1991 ) -> io::Result<Result<(), String>> {
1992 let op = if enabled { "enable" } else { "disable" };
1993 let response = self.call(&PickleValue::Dict(vec![
1994 (
1995 PickleValue::String("hook".into()),
1996 PickleValue::String(op.into()),
1997 ),
1998 (
1999 PickleValue::String("name".into()),
2000 PickleValue::String(name.to_string()),
2001 ),
2002 (
2003 PickleValue::String("attach_point".into()),
2004 PickleValue::String(attach_point.to_string()),
2005 ),
2006 ]))?;
2007 parse_hook_result(&response)
2008 }
2009
2010 pub fn set_hook_priority(
2011 &mut self,
2012 name: &str,
2013 attach_point: &str,
2014 priority: i32,
2015 ) -> io::Result<Result<(), String>> {
2016 let response = self.call(&PickleValue::Dict(vec![
2017 (
2018 PickleValue::String("hook".into()),
2019 PickleValue::String("set_priority".into()),
2020 ),
2021 (
2022 PickleValue::String("name".into()),
2023 PickleValue::String(name.to_string()),
2024 ),
2025 (
2026 PickleValue::String("attach_point".into()),
2027 PickleValue::String(attach_point.to_string()),
2028 ),
2029 (
2030 PickleValue::String("priority".into()),
2031 PickleValue::Int(priority as i64),
2032 ),
2033 ]))?;
2034 parse_hook_result(&response)
2035 }
2036
2037 pub fn blacklist_backbone_peer(
2038 &mut self,
2039 interface: &str,
2040 ip: &str,
2041 duration_secs: u64,
2042 reason: Option<&str>,
2043 penalty_level: Option<u8>,
2044 ) -> io::Result<bool> {
2045 let mut request = vec![
2046 (
2047 PickleValue::String("set".into()),
2048 PickleValue::String("backbone_peer_blacklist".into()),
2049 ),
2050 (
2051 PickleValue::String("interface".into()),
2052 PickleValue::String(interface.to_string()),
2053 ),
2054 (
2055 PickleValue::String("ip".into()),
2056 PickleValue::String(ip.to_string()),
2057 ),
2058 (
2059 PickleValue::String("duration_secs".into()),
2060 PickleValue::Int(duration_secs as i64),
2061 ),
2062 ];
2063 if let Some(reason) = reason {
2064 request.push((
2065 PickleValue::String("reason".into()),
2066 PickleValue::String(reason.to_string()),
2067 ));
2068 }
2069 if let Some(level) = penalty_level {
2070 request.push((
2071 PickleValue::String("penalty_level".into()),
2072 PickleValue::Int(level as i64),
2073 ));
2074 }
2075 let response = self.call(&PickleValue::Dict(request))?;
2076 Ok(response.as_bool().unwrap_or(false))
2077 }
2078
2079 pub fn known_destinations(&mut self) -> io::Result<Vec<KnownDestinationEntry>> {
2080 let response = self.call(&PickleValue::Dict(vec![(
2081 PickleValue::String("get".into()),
2082 PickleValue::String("known_destinations".into()),
2083 )]))?;
2084 parse_known_destination_list(&response)
2085 }
2086
2087 pub fn retain_known_destination(&mut self, dest_hash: [u8; 16]) -> io::Result<bool> {
2088 let response = self.call(&PickleValue::Dict(vec![
2089 (
2090 PickleValue::String("set".into()),
2091 PickleValue::String("known_destination_retained".into()),
2092 ),
2093 (
2094 PickleValue::String("dest_hash".into()),
2095 PickleValue::Bytes(dest_hash.to_vec()),
2096 ),
2097 ]))?;
2098 Ok(response.as_bool().unwrap_or(false))
2099 }
2100
2101 pub fn unretain_known_destination(&mut self, dest_hash: [u8; 16]) -> io::Result<bool> {
2102 let response = self.call(&PickleValue::Dict(vec![
2103 (
2104 PickleValue::String("clear".into()),
2105 PickleValue::String("known_destination_retained".into()),
2106 ),
2107 (
2108 PickleValue::String("dest_hash".into()),
2109 PickleValue::Bytes(dest_hash.to_vec()),
2110 ),
2111 ]))?;
2112 Ok(response.as_bool().unwrap_or(false))
2113 }
2114
2115 pub fn mark_known_destination_used(&mut self, dest_hash: [u8; 16]) -> io::Result<bool> {
2116 let response = self.call(&PickleValue::Dict(vec![
2117 (
2118 PickleValue::String("set".into()),
2119 PickleValue::String("known_destination_used".into()),
2120 ),
2121 (
2122 PickleValue::String("dest_hash".into()),
2123 PickleValue::Bytes(dest_hash.to_vec()),
2124 ),
2125 ]))?;
2126 Ok(response.as_bool().unwrap_or(false))
2127 }
2128}
2129
2130fn parse_lifecycle_state(value: &str) -> Option<LifecycleState> {
2131 match value {
2132 "active" => Some(LifecycleState::Active),
2133 "draining" => Some(LifecycleState::Draining),
2134 "stopping" => Some(LifecycleState::Stopping),
2135 "stopped" => Some(LifecycleState::Stopped),
2136 _ => None,
2137 }
2138}
2139
2140fn parse_drain_status(value: &PickleValue) -> io::Result<Option<DrainStatus>> {
2141 if !matches!(value, PickleValue::Dict(_)) {
2142 return Ok(None);
2143 }
2144 let state = value
2145 .get("state")
2146 .and_then(|entry| entry.as_str())
2147 .and_then(parse_lifecycle_state)
2148 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing drain state"))?;
2149 let drain_age_seconds = value.get("drain_age_seconds").and_then(|entry| {
2150 entry
2151 .as_float()
2152 .or_else(|| entry.as_int().map(|v| v as f64))
2153 });
2154 let deadline_remaining_seconds = value.get("deadline_remaining_seconds").and_then(|entry| {
2155 entry
2156 .as_float()
2157 .or_else(|| entry.as_int().map(|v| v as f64))
2158 });
2159 let drain_complete = value
2160 .get("drain_complete")
2161 .and_then(|entry| entry.as_bool())
2162 .unwrap_or(false);
2163 let interface_writer_queued_frames = value
2164 .get("interface_writer_queued_frames")
2165 .and_then(|entry| entry.as_int())
2166 .unwrap_or(0)
2167 .max(0) as usize;
2168 let provider_backlog_events = value
2169 .get("provider_backlog_events")
2170 .and_then(|entry| entry.as_int())
2171 .unwrap_or(0)
2172 .max(0) as usize;
2173 let provider_consumer_queued_events = value
2174 .get("provider_consumer_queued_events")
2175 .and_then(|entry| entry.as_int())
2176 .unwrap_or(0)
2177 .max(0) as usize;
2178 let detail = value
2179 .get("detail")
2180 .and_then(|entry| entry.as_str().map(|v| v.to_string()));
2181 Ok(Some(DrainStatus {
2182 state,
2183 drain_age_seconds,
2184 deadline_remaining_seconds,
2185 drain_complete,
2186 interface_writer_queued_frames,
2187 provider_backlog_events,
2188 provider_consumer_queued_events,
2189 detail,
2190 }))
2191}
2192
2193fn parse_hook_result(response: &PickleValue) -> io::Result<Result<(), String>> {
2194 let ok = response
2195 .get("ok")
2196 .and_then(|v| v.as_bool())
2197 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "invalid hook response"))?;
2198 if ok {
2199 Ok(Ok(()))
2200 } else {
2201 Ok(Err(response
2202 .get("error")
2203 .and_then(|v| v.as_str())
2204 .unwrap_or("unknown hook error")
2205 .to_string()))
2206 }
2207}
2208
2209fn parse_hook_list(response: &PickleValue) -> io::Result<Vec<HookInfo>> {
2210 let list = response
2211 .as_list()
2212 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "invalid hooks response"))?;
2213 let mut hooks = Vec::with_capacity(list.len());
2214 for item in list {
2215 hooks.push(HookInfo {
2216 name: item
2217 .get("name")
2218 .and_then(|v| v.as_str())
2219 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing hook name"))?
2220 .to_string(),
2221 attach_point: item
2222 .get("attach_point")
2223 .and_then(|v| v.as_str())
2224 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing attach_point"))?
2225 .to_string(),
2226 priority: item
2227 .get("priority")
2228 .and_then(|v| v.as_int())
2229 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing priority"))?
2230 as i32,
2231 enabled: item
2232 .get("enabled")
2233 .and_then(|v| v.as_bool())
2234 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing enabled"))?,
2235 consecutive_traps: item
2236 .get("consecutive_traps")
2237 .and_then(|v| v.as_int())
2238 .ok_or_else(|| {
2239 io::Error::new(io::ErrorKind::InvalidData, "missing consecutive_traps")
2240 })? as u32,
2241 });
2242 }
2243 Ok(hooks)
2244}
2245
2246fn client_auth(stream: &mut TcpStream, auth_key: &[u8; 32]) -> io::Result<()> {
2248 let challenge = recv_bytes(stream)?;
2250
2251 if !challenge.starts_with(CHALLENGE_PREFIX) {
2252 return Err(io::Error::new(
2253 io::ErrorKind::InvalidData,
2254 "expected challenge",
2255 ));
2256 }
2257
2258 let message = &challenge[CHALLENGE_PREFIX.len()..];
2259
2260 let response = create_response(auth_key, message);
2262 send_bytes(stream, &response)?;
2263
2264 let result = recv_bytes(stream)?;
2266 if result == WELCOME {
2267 Ok(())
2268 } else {
2269 Err(io::Error::new(
2270 io::ErrorKind::PermissionDenied,
2271 "authentication failed",
2272 ))
2273 }
2274}
2275
2276fn create_response(auth_key: &[u8; 32], message: &[u8]) -> Vec<u8> {
2278 if message.starts_with(b"{sha256}") || message.len() > 20 {
2280 let digest = hmac_sha256(auth_key, message);
2282 let mut response = Vec::with_capacity(8 + 32);
2283 response.extend_from_slice(b"{sha256}");
2284 response.extend_from_slice(&digest);
2285 response
2286 } else {
2287 let digest = hmac_md5(auth_key, message);
2289 digest.to_vec()
2290 }
2291}
2292
2293pub fn derive_auth_key(private_key: &[u8]) -> [u8; 32] {
2295 sha256(private_key)
2296}
2297
2298#[cfg(test)]
2299mod tests {
2300 use super::*;
2301
2302 #[test]
2303 fn send_recv_bytes_roundtrip() {
2304 let (mut c1, mut c2) = tcp_pair();
2305 let data = b"hello world";
2306 send_bytes(&mut c1, data).unwrap();
2307 let received = recv_bytes(&mut c2).unwrap();
2308 assert_eq!(&received, data);
2309 }
2310
2311 #[test]
2312 fn send_recv_empty() {
2313 let (mut c1, mut c2) = tcp_pair();
2314 send_bytes(&mut c1, b"").unwrap();
2315 let received = recv_bytes(&mut c2).unwrap();
2316 assert!(received.is_empty());
2317 }
2318
2319 #[test]
2320 fn auth_success() {
2321 let key = derive_auth_key(b"test-private-key");
2322 let (mut server, mut client) = tcp_pair();
2323
2324 let key2 = key;
2325 let t = thread::spawn(move || {
2326 client_auth(&mut client, &key2).unwrap();
2327 });
2328
2329 server_auth(&mut server, &key).unwrap();
2330 t.join().unwrap();
2331 }
2332
2333 #[test]
2334 fn auth_failure_wrong_key() {
2335 let server_key = derive_auth_key(b"server-key");
2336 let client_key = derive_auth_key(b"wrong-key");
2337 let (mut server, mut client) = tcp_pair();
2338
2339 let t = thread::spawn(move || {
2340 let result = client_auth(&mut client, &client_key);
2341 assert!(result.is_err());
2342 });
2343
2344 let result = server_auth(&mut server, &server_key);
2345 assert!(result.is_err());
2346 t.join().unwrap();
2347 }
2348
2349 #[test]
2350 fn verify_sha256_response() {
2351 let key = derive_auth_key(b"mykey");
2352 let message = b"{sha256}abcdefghijklmnopqrstuvwxyz0123456789ABCD";
2353 let response = create_response(&key, message);
2354 assert!(response.starts_with(b"{sha256}"));
2355 assert!(verify_response(&key, message, &response));
2356 }
2357
2358 #[test]
2359 fn verify_legacy_md5_response() {
2360 let key = derive_auth_key(b"mykey");
2361 let message = b"01234567890123456789";
2363 let digest = hmac_md5(&key, message);
2365 assert!(verify_response(&key, message, &digest));
2366 }
2367
2368 #[test]
2369 fn constant_time_eq_works() {
2370 assert!(constant_time_eq(b"hello", b"hello"));
2371 assert!(!constant_time_eq(b"hello", b"world"));
2372 assert!(!constant_time_eq(b"hello", b"hell"));
2373 }
2374
2375 #[test]
2376 fn rpc_roundtrip() {
2377 let key = derive_auth_key(b"test-key");
2378 let (event_tx, event_rx) = crate::event::channel();
2379
2380 let listener = TcpListener::bind("127.0.0.1:0").unwrap();
2383 let port = listener.local_addr().unwrap().port();
2384 listener.set_nonblocking(true).unwrap();
2385
2386 let shutdown = Arc::new(AtomicBool::new(false));
2387 let shutdown2 = shutdown.clone();
2388
2389 let driver_thread = thread::spawn(move || loop {
2391 match event_rx.recv_timeout(std::time::Duration::from_secs(5)) {
2392 Ok(Event::Query(QueryRequest::LinkCount, resp_tx)) => {
2393 let _ = resp_tx.send(QueryResponse::LinkCount(42));
2394 }
2395 Ok(Event::Query(QueryRequest::InterfaceStats, resp_tx)) => {
2396 let _ = resp_tx.send(QueryResponse::InterfaceStats(InterfaceStatsResponse {
2397 interfaces: vec![SingleInterfaceStat {
2398 id: 7,
2399 name: "TestInterface".into(),
2400 status: true,
2401 mode: 1,
2402 rxb: 1000,
2403 txb: 2000,
2404 rx_packets: 10,
2405 tx_packets: 20,
2406 bitrate: Some(10_000_000),
2407 ifac_size: None,
2408 started: 1000.0,
2409 ia_freq: 0.0,
2410 oa_freq: 0.0,
2411 interface_type: "TestInterface".into(),
2412 }],
2413 transport_id: None,
2414 transport_enabled: true,
2415 transport_uptime: 3600.0,
2416 total_rxb: 1000,
2417 total_txb: 2000,
2418 probe_responder: None,
2419 backbone_peer_pool: None,
2420 }));
2421 }
2422 _ => break,
2423 }
2424 });
2425
2426 let key2 = key;
2427 let shutdown3 = shutdown2.clone();
2428 let server_thread = thread::spawn(move || {
2429 rpc_server_loop(listener, key2, event_tx, shutdown3);
2430 });
2431
2432 thread::sleep(std::time::Duration::from_millis(50));
2434
2435 let server_addr = RpcAddr::Tcp("127.0.0.1".into(), port);
2437 let mut client = RpcClient::connect(&server_addr, &key).unwrap();
2438 let response = client
2439 .call(&PickleValue::Dict(vec![(
2440 PickleValue::String("get".into()),
2441 PickleValue::String("link_count".into()),
2442 )]))
2443 .unwrap();
2444 assert_eq!(response.as_int().unwrap(), 42);
2445 drop(client);
2446
2447 let mut client2 = RpcClient::connect(&server_addr, &key).unwrap();
2449 let response2 = client2
2450 .call(&PickleValue::Dict(vec![(
2451 PickleValue::String("get".into()),
2452 PickleValue::String("interface_stats".into()),
2453 )]))
2454 .unwrap();
2455 let ifaces = response2.get("interfaces").unwrap().as_list().unwrap();
2456 assert_eq!(ifaces.len(), 1);
2457 let iface = &ifaces[0];
2458 assert_eq!(
2459 iface.get("name").unwrap().as_str().unwrap(),
2460 "TestInterface"
2461 );
2462 assert_eq!(iface.get("rxb").unwrap().as_int().unwrap(), 1000);
2463 drop(client2);
2464
2465 shutdown2.store(true, Ordering::Relaxed);
2467 server_thread.join().unwrap();
2468 driver_thread.join().unwrap();
2469 }
2470
2471 #[test]
2472 fn derive_auth_key_deterministic() {
2473 let key1 = derive_auth_key(b"test");
2474 let key2 = derive_auth_key(b"test");
2475 assert_eq!(key1, key2);
2476 let key3 = derive_auth_key(b"other");
2478 assert_ne!(key1, key3);
2479 }
2480
2481 #[test]
2482 fn pickle_request_handling() {
2483 let (event_tx, event_rx) = crate::event::channel();
2485
2486 let driver = thread::spawn(move || {
2487 if let Ok(Event::Query(QueryRequest::DropPath { dest_hash }, resp_tx)) = event_rx.recv()
2488 {
2489 assert_eq!(dest_hash, [1u8; 16]);
2490 let _ = resp_tx.send(QueryResponse::DropPath(true));
2491 }
2492 });
2493
2494 let request = PickleValue::Dict(vec![
2495 (
2496 PickleValue::String("drop".into()),
2497 PickleValue::String("path".into()),
2498 ),
2499 (
2500 PickleValue::String("destination_hash".into()),
2501 PickleValue::Bytes(vec![1u8; 16]),
2502 ),
2503 ]);
2504
2505 let response = handle_rpc_request(&request, &event_tx).unwrap();
2506 assert_eq!(response, PickleValue::Bool(true));
2507 driver.join().unwrap();
2508 }
2509
2510 #[test]
2511 fn hook_list_request_handling() {
2512 let (event_tx, event_rx) = crate::event::channel();
2513
2514 let driver = thread::spawn(move || {
2515 if let Ok(Event::ListHooks { response_tx }) = event_rx.recv() {
2516 let _ = response_tx.send(vec![HookInfo {
2517 name: "stats".into(),
2518 attach_point: "PreIngress".into(),
2519 priority: 7,
2520 enabled: true,
2521 consecutive_traps: 0,
2522 }]);
2523 }
2524 });
2525
2526 let request = PickleValue::Dict(vec![(
2527 PickleValue::String("get".into()),
2528 PickleValue::String("hooks".into()),
2529 )]);
2530 let response = handle_rpc_request(&request, &event_tx).unwrap();
2531 let hooks = parse_hook_list(&response).unwrap();
2532 assert_eq!(hooks.len(), 1);
2533 assert_eq!(hooks[0].name, "stats");
2534 driver.join().unwrap();
2535 }
2536
2537 #[test]
2538 fn hook_load_request_handling() {
2539 let (event_tx, event_rx) = crate::event::channel();
2540
2541 let driver = thread::spawn(move || {
2542 if let Ok(Event::LoadHook {
2543 name,
2544 wasm_bytes,
2545 attach_point,
2546 priority,
2547 response_tx,
2548 }) = event_rx.recv()
2549 {
2550 assert_eq!(name, "stats");
2551 assert_eq!(attach_point, "PreIngress");
2552 assert_eq!(priority, 11);
2553 assert_eq!(wasm_bytes, vec![1, 2, 3]);
2554 let _ = response_tx.send(Ok(()));
2555 }
2556 });
2557
2558 let request = PickleValue::Dict(vec![
2559 (
2560 PickleValue::String("hook".into()),
2561 PickleValue::String("load".into()),
2562 ),
2563 (
2564 PickleValue::String("name".into()),
2565 PickleValue::String("stats".into()),
2566 ),
2567 (
2568 PickleValue::String("attach_point".into()),
2569 PickleValue::String("PreIngress".into()),
2570 ),
2571 (PickleValue::String("priority".into()), PickleValue::Int(11)),
2572 (
2573 PickleValue::String("wasm".into()),
2574 PickleValue::Bytes(vec![1, 2, 3]),
2575 ),
2576 ]);
2577 let response = handle_rpc_request(&request, &event_tx).unwrap();
2578 assert_eq!(parse_hook_result(&response).unwrap(), Ok(()));
2579 driver.join().unwrap();
2580 }
2581
2582 #[test]
2583 fn interface_stats_pickle_format() {
2584 let stats = InterfaceStatsResponse {
2585 interfaces: vec![SingleInterfaceStat {
2586 id: 1,
2587 name: "TCP".into(),
2588 status: true,
2589 mode: 1,
2590 rxb: 100,
2591 txb: 200,
2592 rx_packets: 5,
2593 tx_packets: 10,
2594 bitrate: Some(1000000),
2595 ifac_size: Some(16),
2596 started: 1000.0,
2597 ia_freq: 0.0,
2598 oa_freq: 0.0,
2599 interface_type: "TCPClientInterface".into(),
2600 }],
2601 transport_id: Some([0xAB; 16]),
2602 transport_enabled: true,
2603 transport_uptime: 3600.0,
2604 total_rxb: 100,
2605 total_txb: 200,
2606 probe_responder: None,
2607 backbone_peer_pool: None,
2608 };
2609
2610 let pickle = interface_stats_to_pickle(&stats);
2611
2612 let encoded = pickle::encode(&pickle);
2614 let decoded = pickle::decode(&encoded).unwrap();
2615 assert_eq!(
2616 decoded.get("transport_enabled").unwrap().as_bool().unwrap(),
2617 true
2618 );
2619 let ifaces = decoded.get("interfaces").unwrap().as_list().unwrap();
2620 assert_eq!(ifaces[0].get("id").unwrap().as_int().unwrap(), 1);
2621 assert_eq!(ifaces[0].get("name").unwrap().as_str().unwrap(), "TCP");
2622 }
2623
2624 #[test]
2625 fn send_probe_rpc_unknown_dest() {
2626 let (event_tx, event_rx) = crate::event::channel();
2627
2628 let driver = thread::spawn(move || {
2629 if let Ok(Event::Query(
2630 QueryRequest::SendProbe {
2631 dest_hash,
2632 payload_size,
2633 },
2634 resp_tx,
2635 )) = event_rx.recv()
2636 {
2637 assert_eq!(dest_hash, [0xAA; 16]);
2638 assert_eq!(payload_size, 16); let _ = resp_tx.send(QueryResponse::SendProbe(None));
2640 }
2641 });
2642
2643 let request = PickleValue::Dict(vec![(
2644 PickleValue::String("send_probe".into()),
2645 PickleValue::Bytes(vec![0xAA; 16]),
2646 )]);
2647
2648 let response = handle_rpc_request(&request, &event_tx).unwrap();
2649 assert_eq!(response, PickleValue::None);
2650 driver.join().unwrap();
2651 }
2652
2653 #[test]
2654 fn send_probe_rpc_with_result() {
2655 let (event_tx, event_rx) = crate::event::channel();
2656
2657 let packet_hash = [0xBB; 32];
2658 let driver = thread::spawn(move || {
2659 if let Ok(Event::Query(
2660 QueryRequest::SendProbe {
2661 dest_hash,
2662 payload_size,
2663 },
2664 resp_tx,
2665 )) = event_rx.recv()
2666 {
2667 assert_eq!(dest_hash, [0xCC; 16]);
2668 assert_eq!(payload_size, 32);
2669 let _ = resp_tx.send(QueryResponse::SendProbe(Some((packet_hash, 3))));
2670 }
2671 });
2672
2673 let request = PickleValue::Dict(vec![
2674 (
2675 PickleValue::String("send_probe".into()),
2676 PickleValue::Bytes(vec![0xCC; 16]),
2677 ),
2678 (PickleValue::String("size".into()), PickleValue::Int(32)),
2679 ]);
2680
2681 let response = handle_rpc_request(&request, &event_tx).unwrap();
2682 let ph = response.get("packet_hash").unwrap().as_bytes().unwrap();
2683 assert_eq!(ph, &[0xBB; 32]);
2684 assert_eq!(response.get("hops").unwrap().as_int().unwrap(), 3);
2685 driver.join().unwrap();
2686 }
2687
2688 #[test]
2689 fn send_probe_rpc_size_validation() {
2690 let (event_tx, event_rx) = crate::event::channel();
2691
2692 let driver = thread::spawn(move || {
2694 if let Ok(Event::Query(QueryRequest::SendProbe { payload_size, .. }, resp_tx)) =
2695 event_rx.recv()
2696 {
2697 assert_eq!(payload_size, 16); let _ = resp_tx.send(QueryResponse::SendProbe(None));
2699 }
2700 });
2701
2702 let request = PickleValue::Dict(vec![
2703 (
2704 PickleValue::String("send_probe".into()),
2705 PickleValue::Bytes(vec![0xDD; 16]),
2706 ),
2707 (PickleValue::String("size".into()), PickleValue::Int(-1)),
2708 ]);
2709
2710 let response = handle_rpc_request(&request, &event_tx).unwrap();
2711 assert_eq!(response, PickleValue::None);
2712 driver.join().unwrap();
2713 }
2714
2715 #[test]
2716 fn send_probe_rpc_size_too_large() {
2717 let (event_tx, event_rx) = crate::event::channel();
2718
2719 let driver = thread::spawn(move || {
2721 if let Ok(Event::Query(QueryRequest::SendProbe { payload_size, .. }, resp_tx)) =
2722 event_rx.recv()
2723 {
2724 assert_eq!(payload_size, 16); let _ = resp_tx.send(QueryResponse::SendProbe(None));
2726 }
2727 });
2728
2729 let request = PickleValue::Dict(vec![
2730 (
2731 PickleValue::String("send_probe".into()),
2732 PickleValue::Bytes(vec![0xDD; 16]),
2733 ),
2734 (PickleValue::String("size".into()), PickleValue::Int(999)),
2735 ]);
2736
2737 let response = handle_rpc_request(&request, &event_tx).unwrap();
2738 assert_eq!(response, PickleValue::None);
2739 driver.join().unwrap();
2740 }
2741
2742 #[test]
2743 fn check_proof_rpc_not_found() {
2744 let (event_tx, event_rx) = crate::event::channel();
2745
2746 let driver = thread::spawn(move || {
2747 if let Ok(Event::Query(QueryRequest::CheckProof { packet_hash }, resp_tx)) =
2748 event_rx.recv()
2749 {
2750 assert_eq!(packet_hash, [0xEE; 32]);
2751 let _ = resp_tx.send(QueryResponse::CheckProof(None));
2752 }
2753 });
2754
2755 let request = PickleValue::Dict(vec![(
2756 PickleValue::String("check_proof".into()),
2757 PickleValue::Bytes(vec![0xEE; 32]),
2758 )]);
2759
2760 let response = handle_rpc_request(&request, &event_tx).unwrap();
2761 assert_eq!(response, PickleValue::None);
2762 driver.join().unwrap();
2763 }
2764
2765 #[test]
2766 fn check_proof_rpc_found() {
2767 let (event_tx, event_rx) = crate::event::channel();
2768
2769 let driver = thread::spawn(move || {
2770 if let Ok(Event::Query(QueryRequest::CheckProof { packet_hash }, resp_tx)) =
2771 event_rx.recv()
2772 {
2773 assert_eq!(packet_hash, [0xFF; 32]);
2774 let _ = resp_tx.send(QueryResponse::CheckProof(Some(0.352)));
2775 }
2776 });
2777
2778 let request = PickleValue::Dict(vec![(
2779 PickleValue::String("check_proof".into()),
2780 PickleValue::Bytes(vec![0xFF; 32]),
2781 )]);
2782
2783 let response = handle_rpc_request(&request, &event_tx).unwrap();
2784 if let PickleValue::Float(rtt) = response {
2785 assert!((rtt - 0.352).abs() < 0.001);
2786 } else {
2787 panic!("Expected Float, got {:?}", response);
2788 }
2789 driver.join().unwrap();
2790 }
2791
2792 #[test]
2793 fn request_path_rpc() {
2794 let (event_tx, event_rx) = crate::event::channel();
2795
2796 let driver =
2797 thread::spawn(
2798 move || match event_rx.recv_timeout(std::time::Duration::from_secs(5)) {
2799 Ok(Event::RequestPath { dest_hash }) => {
2800 assert_eq!(dest_hash, [0x11; 16]);
2801 }
2802 other => panic!("Expected RequestPath event, got {:?}", other),
2803 },
2804 );
2805
2806 let request = PickleValue::Dict(vec![(
2807 PickleValue::String("request_path".into()),
2808 PickleValue::Bytes(vec![0x11; 16]),
2809 )]);
2810
2811 let response = handle_rpc_request(&request, &event_tx).unwrap();
2812 assert_eq!(response, PickleValue::Bool(true));
2813 driver.join().unwrap();
2814 }
2815
2816 #[test]
2817 fn begin_drain_rpc_emits_event() {
2818 let (event_tx, event_rx) = crate::event::channel();
2819
2820 let driver = thread::spawn(
2821 move || match event_rx.recv_timeout(Duration::from_secs(5)) {
2822 Ok(Event::BeginDrain { timeout }) => {
2823 assert!((timeout.as_secs_f64() - 1.5).abs() < 0.001);
2824 }
2825 other => panic!("Expected BeginDrain event, got {:?}", other),
2826 },
2827 );
2828
2829 let request = PickleValue::Dict(vec![(
2830 PickleValue::String("begin_drain".into()),
2831 PickleValue::Float(1.5),
2832 )]);
2833
2834 let response = handle_rpc_request(&request, &event_tx).unwrap();
2835 assert_eq!(response, PickleValue::Bool(true));
2836 driver.join().unwrap();
2837 }
2838
2839 #[test]
2840 fn drain_status_rpc_roundtrips_fields() {
2841 let (event_tx, event_rx) = crate::event::channel();
2842
2843 let driver = thread::spawn(move || {
2844 if let Ok(Event::Query(QueryRequest::DrainStatus, resp_tx)) = event_rx.recv() {
2845 let _ = resp_tx.send(QueryResponse::DrainStatus(DrainStatus {
2846 state: LifecycleState::Draining,
2847 drain_age_seconds: Some(0.75),
2848 deadline_remaining_seconds: Some(2.25),
2849 drain_complete: false,
2850 interface_writer_queued_frames: 3,
2851 provider_backlog_events: 4,
2852 provider_consumer_queued_events: 5,
2853 detail: Some("node is draining existing work".into()),
2854 }));
2855 }
2856 });
2857
2858 let request = PickleValue::Dict(vec![(
2859 PickleValue::String("get".into()),
2860 PickleValue::String("drain_status".into()),
2861 )]);
2862
2863 let response = handle_rpc_request(&request, &event_tx).unwrap();
2864 assert_eq!(response.get("state").unwrap().as_str(), Some("draining"));
2865 assert_eq!(
2866 response.get("drain_complete").unwrap().as_bool(),
2867 Some(false)
2868 );
2869 assert_eq!(
2870 response
2871 .get("deadline_remaining_seconds")
2872 .unwrap()
2873 .as_float(),
2874 Some(2.25)
2875 );
2876 assert_eq!(
2877 response
2878 .get("interface_writer_queued_frames")
2879 .unwrap()
2880 .as_int(),
2881 Some(3)
2882 );
2883 assert_eq!(
2884 response.get("provider_backlog_events").unwrap().as_int(),
2885 Some(4)
2886 );
2887 assert_eq!(
2888 response
2889 .get("provider_consumer_queued_events")
2890 .unwrap()
2891 .as_int(),
2892 Some(5)
2893 );
2894 assert_eq!(
2895 response.get("detail").unwrap().as_str(),
2896 Some("node is draining existing work")
2897 );
2898 driver.join().unwrap();
2899 }
2900
2901 #[test]
2902 fn interface_stats_with_probe_responder() {
2903 let probe_hash = [0x42; 16];
2904 let stats = InterfaceStatsResponse {
2905 interfaces: vec![],
2906 transport_id: None,
2907 transport_enabled: true,
2908 transport_uptime: 100.0,
2909 total_rxb: 0,
2910 total_txb: 0,
2911 probe_responder: Some(probe_hash),
2912 backbone_peer_pool: None,
2913 };
2914
2915 let pickle = interface_stats_to_pickle(&stats);
2916 let encoded = pickle::encode(&pickle);
2917 let decoded = pickle::decode(&encoded).unwrap();
2918
2919 let pr = decoded.get("probe_responder").unwrap().as_bytes().unwrap();
2920 assert_eq!(pr, &probe_hash);
2921 }
2922
2923 #[test]
2924 fn runtime_config_get_and_set_rpc() {
2925 let (event_tx, event_rx) = crate::event::channel();
2926
2927 let driver = thread::spawn(move || {
2928 if let Ok(Event::Query(QueryRequest::GetRuntimeConfig { key }, resp_tx)) =
2929 event_rx.recv()
2930 {
2931 assert_eq!(key, "global.tick_interval_ms");
2932 let _ = resp_tx.send(QueryResponse::RuntimeConfigEntry(Some(
2933 RuntimeConfigEntry {
2934 key,
2935 value: RuntimeConfigValue::Int(1000),
2936 default: RuntimeConfigValue::Int(1000),
2937 source: RuntimeConfigSource::Startup,
2938 apply_mode: RuntimeConfigApplyMode::Immediate,
2939 description: Some("tick".into()),
2940 },
2941 )));
2942 } else {
2943 panic!("expected GetRuntimeConfig query");
2944 }
2945
2946 if let Ok(Event::Query(QueryRequest::SetRuntimeConfig { key, value }, resp_tx)) =
2947 event_rx.recv()
2948 {
2949 assert_eq!(key, "global.tick_interval_ms");
2950 assert_eq!(value, RuntimeConfigValue::Int(250));
2951 let _ = resp_tx.send(QueryResponse::RuntimeConfigSet(Ok(RuntimeConfigEntry {
2952 key,
2953 value: RuntimeConfigValue::Int(250),
2954 default: RuntimeConfigValue::Int(1000),
2955 source: RuntimeConfigSource::RuntimeOverride,
2956 apply_mode: RuntimeConfigApplyMode::Immediate,
2957 description: Some("tick".into()),
2958 })));
2959 } else {
2960 panic!("expected SetRuntimeConfig query");
2961 }
2962 });
2963
2964 let get_request = PickleValue::Dict(vec![
2965 (
2966 PickleValue::String("get".into()),
2967 PickleValue::String("runtime_config_entry".into()),
2968 ),
2969 (
2970 PickleValue::String("key".into()),
2971 PickleValue::String("global.tick_interval_ms".into()),
2972 ),
2973 ]);
2974 let get_response = handle_rpc_request(&get_request, &event_tx).unwrap();
2975 assert_eq!(
2976 get_response.get("key").and_then(|v| v.as_str()),
2977 Some("global.tick_interval_ms")
2978 );
2979
2980 let set_request = PickleValue::Dict(vec![
2981 (
2982 PickleValue::String("set".into()),
2983 PickleValue::String("runtime_config".into()),
2984 ),
2985 (
2986 PickleValue::String("key".into()),
2987 PickleValue::String("global.tick_interval_ms".into()),
2988 ),
2989 (PickleValue::String("value".into()), PickleValue::Int(250)),
2990 ]);
2991 let set_response = handle_rpc_request(&set_request, &event_tx).unwrap();
2992 assert_eq!(
2993 set_response.get("value").and_then(|v| v.as_int()),
2994 Some(250)
2995 );
2996
2997 driver.join().unwrap();
2998 }
2999
3000 fn tcp_pair() -> (TcpStream, TcpStream) {
3002 let listener = TcpListener::bind("127.0.0.1:0").unwrap();
3003 let port = listener.local_addr().unwrap().port();
3004 let client = TcpStream::connect(("127.0.0.1", port)).unwrap();
3005 let (server, _) = listener.accept().unwrap();
3006 client
3007 .set_read_timeout(Some(std::time::Duration::from_secs(5)))
3008 .unwrap();
3009 server
3010 .set_read_timeout(Some(std::time::Duration::from_secs(5)))
3011 .unwrap();
3012 (server, client)
3013 }
3014}