1use std::io::{self, Read, Write};
12use std::net::{IpAddr, TcpListener, TcpStream};
13use std::sync::mpsc;
14use std::sync::{
15 atomic::{AtomicBool, Ordering},
16 Arc,
17};
18use std::thread;
19use std::time::Duration;
20
21use rns_crypto::hmac::hmac_sha256;
22use rns_crypto::sha256::sha256;
23
24use crate::event::{
25 BackboneInterfaceEntry, BackbonePeerStateEntry, BlackholeInfo, DrainStatus, Event, EventSender,
26 HookInfo, InterfaceStatsResponse, LifecycleState, PathTableEntry, ProviderBridgeStats,
27 QueryRequest, QueryResponse, RateTableEntry, RuntimeConfigApplyMode, RuntimeConfigEntry,
28 RuntimeConfigError, RuntimeConfigErrorCode, RuntimeConfigSource, RuntimeConfigValue,
29 SingleInterfaceStat,
30};
31use crate::md5::hmac_md5;
32use crate::pickle::{self, PickleValue};
33
34const CHALLENGE_PREFIX: &[u8] = b"#CHALLENGE#";
35const WELCOME: &[u8] = b"#WELCOME#";
36const FAILURE: &[u8] = b"#FAILURE#";
37const CHALLENGE_LEN: usize = 40;
38
39#[derive(Debug, Clone)]
41pub enum RpcAddr {
42 Tcp(String, u16),
43}
44
45pub struct RpcServer {
47 shutdown: Arc<AtomicBool>,
48 thread: Option<thread::JoinHandle<()>>,
49}
50
51impl RpcServer {
52 pub fn start(addr: &RpcAddr, auth_key: [u8; 32], event_tx: EventSender) -> io::Result<Self> {
54 let shutdown = Arc::new(AtomicBool::new(false));
55 let shutdown2 = shutdown.clone();
56
57 let listener = match addr {
58 RpcAddr::Tcp(host, port) => {
59 let l = TcpListener::bind((host.as_str(), *port))?;
60 l.set_nonblocking(true)?;
62 l
63 }
64 };
65
66 let thread = thread::Builder::new()
67 .name("rpc-server".into())
68 .spawn(move || {
69 rpc_server_loop(listener, auth_key, event_tx, shutdown2);
70 })
71 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
72
73 Ok(RpcServer {
74 shutdown,
75 thread: Some(thread),
76 })
77 }
78
79 pub fn stop(&mut self) {
81 self.shutdown.store(true, Ordering::Relaxed);
82 if let Some(handle) = self.thread.take() {
83 let _ = handle.join();
84 }
85 }
86}
87
88impl Drop for RpcServer {
89 fn drop(&mut self) {
90 self.stop();
91 }
92}
93
94fn rpc_server_loop(
95 listener: TcpListener,
96 auth_key: [u8; 32],
97 event_tx: EventSender,
98 shutdown: Arc<AtomicBool>,
99) {
100 loop {
101 if shutdown.load(Ordering::Relaxed) {
102 break;
103 }
104
105 match listener.accept() {
106 Ok((stream, _addr)) => {
107 let _ = stream.set_nonblocking(false);
109 let _ = stream.set_read_timeout(Some(std::time::Duration::from_secs(10)));
110 let _ = stream.set_write_timeout(Some(std::time::Duration::from_secs(10)));
111
112 if let Err(e) = handle_connection(stream, &auth_key, &event_tx) {
113 log::debug!("RPC connection error: {}", e);
114 }
115 }
116 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
117 thread::sleep(std::time::Duration::from_millis(100));
119 }
120 Err(e) => {
121 log::error!("RPC accept error: {}", e);
122 thread::sleep(std::time::Duration::from_millis(100));
123 }
124 }
125 }
126}
127
128fn handle_connection(
129 mut stream: TcpStream,
130 auth_key: &[u8; 32],
131 event_tx: &EventSender,
132) -> io::Result<()> {
133 server_auth(&mut stream, auth_key)?;
135
136 let request_bytes = recv_bytes(&mut stream)?;
138 let request = pickle::decode(&request_bytes)
139 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?;
140
141 let response = handle_rpc_request(&request, event_tx)?;
143
144 let response_bytes = pickle::encode(&response);
146 send_bytes(&mut stream, &response_bytes)?;
147
148 Ok(())
149}
150
151fn server_auth(stream: &mut TcpStream, auth_key: &[u8; 32]) -> io::Result<()> {
153 let mut random_bytes = [0u8; CHALLENGE_LEN];
155 {
157 let mut f = std::fs::File::open("/dev/urandom")?;
158 f.read_exact(&mut random_bytes)?;
159 }
160
161 let mut challenge_message = Vec::with_capacity(CHALLENGE_PREFIX.len() + 8 + CHALLENGE_LEN);
162 challenge_message.extend_from_slice(CHALLENGE_PREFIX);
163 challenge_message.extend_from_slice(b"{sha256}");
164 challenge_message.extend_from_slice(&random_bytes);
165
166 send_bytes(stream, &challenge_message)?;
167
168 let response = recv_bytes(stream)?;
170
171 let message = &challenge_message[CHALLENGE_PREFIX.len()..];
174
175 if verify_response(auth_key, message, &response) {
176 send_bytes(stream, WELCOME)?;
177 Ok(())
178 } else {
179 send_bytes(stream, FAILURE)?;
180 Err(io::Error::new(
181 io::ErrorKind::PermissionDenied,
182 "auth failed",
183 ))
184 }
185}
186
187fn verify_response(auth_key: &[u8; 32], message: &[u8], response: &[u8]) -> bool {
189 if response.starts_with(b"{sha256}") {
191 let digest = &response[8..];
192 let expected = hmac_sha256(auth_key, message);
193 constant_time_eq(digest, &expected)
194 }
195 else if response.len() == 16 {
197 let expected = hmac_md5(auth_key, message);
198 constant_time_eq(response, &expected)
199 }
200 else if response.starts_with(b"{md5}") {
202 let digest = &response[5..];
203 let expected = hmac_md5(auth_key, message);
204 constant_time_eq(digest, &expected)
205 } else {
206 false
207 }
208}
209
210fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
212 if a.len() != b.len() {
213 return false;
214 }
215 let mut diff = 0u8;
216 for (x, y) in a.iter().zip(b.iter()) {
217 diff |= x ^ y;
218 }
219 diff == 0
220}
221
222fn send_bytes(stream: &mut TcpStream, data: &[u8]) -> io::Result<()> {
224 let len = data.len() as i32;
225 stream.write_all(&len.to_be_bytes())?;
226 stream.write_all(data)?;
227 stream.flush()
228}
229
230fn recv_bytes(stream: &mut TcpStream) -> io::Result<Vec<u8>> {
232 let mut len_buf = [0u8; 4];
233 stream.read_exact(&mut len_buf)?;
234 let len = i32::from_be_bytes(len_buf);
235
236 if len < 0 {
237 let mut len8_buf = [0u8; 8];
239 stream.read_exact(&mut len8_buf)?;
240 let len = u64::from_be_bytes(len8_buf) as usize;
241 if len > 64 * 1024 * 1024 {
242 return Err(io::Error::new(
243 io::ErrorKind::InvalidData,
244 "message too large",
245 ));
246 }
247 let mut buf = vec![0u8; len];
248 stream.read_exact(&mut buf)?;
249 Ok(buf)
250 } else {
251 let len = len as usize;
252 if len > 64 * 1024 * 1024 {
253 return Err(io::Error::new(
254 io::ErrorKind::InvalidData,
255 "message too large",
256 ));
257 }
258 let mut buf = vec![0u8; len];
259 stream.read_exact(&mut buf)?;
260 Ok(buf)
261 }
262}
263
264fn handle_rpc_request(request: &PickleValue, event_tx: &EventSender) -> io::Result<PickleValue> {
266 if let Some(get_val) = request.get("get") {
268 if let Some(path) = get_val.as_str() {
269 return match path {
270 "interface_stats" => {
271 let resp = send_query(event_tx, QueryRequest::InterfaceStats)?;
272 if let QueryResponse::InterfaceStats(stats) = resp {
273 Ok(interface_stats_to_pickle(&stats))
274 } else {
275 Ok(PickleValue::None)
276 }
277 }
278 "path_table" => {
279 let max_hops = request
280 .get("max_hops")
281 .and_then(|v| v.as_int().map(|n| n as u8));
282 let resp = send_query(event_tx, QueryRequest::PathTable { max_hops })?;
283 if let QueryResponse::PathTable(entries) = resp {
284 Ok(path_table_to_pickle(&entries))
285 } else {
286 Ok(PickleValue::None)
287 }
288 }
289 "rate_table" => {
290 let resp = send_query(event_tx, QueryRequest::RateTable)?;
291 if let QueryResponse::RateTable(entries) = resp {
292 Ok(rate_table_to_pickle(&entries))
293 } else {
294 Ok(PickleValue::None)
295 }
296 }
297 "next_hop" => {
298 let hash = extract_dest_hash(request, "destination_hash")?;
299 let resp = send_query(event_tx, QueryRequest::NextHop { dest_hash: hash })?;
300 if let QueryResponse::NextHop(Some(nh)) = resp {
301 Ok(PickleValue::Bytes(nh.next_hop.to_vec()))
302 } else {
303 Ok(PickleValue::None)
304 }
305 }
306 "next_hop_if_name" => {
307 let hash = extract_dest_hash(request, "destination_hash")?;
308 let resp =
309 send_query(event_tx, QueryRequest::NextHopIfName { dest_hash: hash })?;
310 if let QueryResponse::NextHopIfName(Some(name)) = resp {
311 Ok(PickleValue::String(name))
312 } else {
313 Ok(PickleValue::None)
314 }
315 }
316 "link_count" => {
317 let resp = send_query(event_tx, QueryRequest::LinkCount)?;
318 if let QueryResponse::LinkCount(n) = resp {
319 Ok(PickleValue::Int(n as i64))
320 } else {
321 Ok(PickleValue::None)
322 }
323 }
324 "transport_identity" => {
325 let resp = send_query(event_tx, QueryRequest::TransportIdentity)?;
326 if let QueryResponse::TransportIdentity(Some(hash)) = resp {
327 Ok(PickleValue::Bytes(hash.to_vec()))
328 } else {
329 Ok(PickleValue::None)
330 }
331 }
332 "blackholed" => {
333 let resp = send_query(event_tx, QueryRequest::GetBlackholed)?;
334 if let QueryResponse::Blackholed(entries) = resp {
335 Ok(blackholed_to_pickle(&entries))
336 } else {
337 Ok(PickleValue::None)
338 }
339 }
340 "discovered_interfaces" => {
341 let only_available = request
342 .get("only_available")
343 .and_then(|v| v.as_bool())
344 .unwrap_or(false);
345 let only_transport = request
346 .get("only_transport")
347 .and_then(|v| v.as_bool())
348 .unwrap_or(false);
349 let resp = send_query(
350 event_tx,
351 QueryRequest::DiscoveredInterfaces {
352 only_available,
353 only_transport,
354 },
355 )?;
356 if let QueryResponse::DiscoveredInterfaces(interfaces) = resp {
357 Ok(discovered_interfaces_to_pickle(&interfaces))
358 } else {
359 Ok(PickleValue::None)
360 }
361 }
362 "hooks" => {
363 let (response_tx, response_rx) = mpsc::channel();
364 event_tx
365 .send(Event::ListHooks { response_tx })
366 .map_err(|_| {
367 io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down")
368 })?;
369 let hooks = response_rx
370 .recv_timeout(std::time::Duration::from_secs(5))
371 .map_err(|_| {
372 io::Error::new(io::ErrorKind::TimedOut, "list hooks timed out")
373 })?;
374 Ok(hooks_to_pickle(&hooks))
375 }
376 "runtime_config" => {
377 let resp = send_query(event_tx, QueryRequest::ListRuntimeConfig)?;
378 if let QueryResponse::RuntimeConfigList(entries) = resp {
379 Ok(runtime_config_list_to_pickle(&entries))
380 } else {
381 Ok(PickleValue::None)
382 }
383 }
384 "runtime_config_entry" => {
385 let key = request
386 .get("key")
387 .and_then(|v| v.as_str())
388 .unwrap_or_default()
389 .to_string();
390 let resp = send_query(event_tx, QueryRequest::GetRuntimeConfig { key })?;
391 if let QueryResponse::RuntimeConfigEntry(entry) = resp {
392 Ok(entry
393 .as_ref()
394 .map(runtime_config_entry_to_pickle)
395 .unwrap_or(PickleValue::None))
396 } else {
397 Ok(PickleValue::None)
398 }
399 }
400 "backbone_peer_state" => {
401 let interface_name = request
402 .get("interface")
403 .and_then(|v| v.as_str())
404 .map(|s| s.to_string());
405 let resp =
406 send_query(event_tx, QueryRequest::BackbonePeerState { interface_name })?;
407 if let QueryResponse::BackbonePeerState(entries) = resp {
408 Ok(backbone_peer_state_to_pickle(&entries))
409 } else {
410 Ok(PickleValue::None)
411 }
412 }
413 "backbone_interfaces" => {
414 let resp = send_query(event_tx, QueryRequest::BackboneInterfaces)?;
415 if let QueryResponse::BackboneInterfaces(entries) = resp {
416 Ok(backbone_interfaces_to_pickle(&entries))
417 } else {
418 Ok(PickleValue::None)
419 }
420 }
421 "provider_bridge_stats" => {
422 let resp = send_query(event_tx, QueryRequest::ProviderBridgeStats)?;
423 if let QueryResponse::ProviderBridgeStats(stats) = resp {
424 Ok(stats
425 .as_ref()
426 .map(provider_bridge_stats_to_pickle)
427 .unwrap_or(PickleValue::None))
428 } else {
429 Ok(PickleValue::None)
430 }
431 }
432 "drain_status" => {
433 let resp = send_query(event_tx, QueryRequest::DrainStatus)?;
434 if let QueryResponse::DrainStatus(status) = resp {
435 Ok(drain_status_to_pickle(&status))
436 } else {
437 Ok(PickleValue::None)
438 }
439 }
440 _ => Ok(PickleValue::None),
441 };
442 }
443 }
444
445 if let Some(begin_val) = request.get("begin_drain") {
446 let timeout_secs = begin_val
447 .as_float()
448 .or_else(|| begin_val.as_int().map(|value| value as f64))
449 .unwrap_or(0.0)
450 .max(0.0);
451 let timeout = Duration::from_secs_f64(timeout_secs);
452 let _ = event_tx.send(Event::BeginDrain { timeout });
453 return Ok(PickleValue::Bool(true));
454 }
455
456 if let Some(set_val) = request.get("set").and_then(|v| v.as_str()) {
457 if set_val == "runtime_config" {
458 let key = request
459 .get("key")
460 .and_then(|v| v.as_str())
461 .unwrap_or_default()
462 .to_string();
463 let Some(value) = request
464 .get("value")
465 .and_then(runtime_config_value_from_pickle)
466 else {
467 return Ok(runtime_config_error_to_pickle(&RuntimeConfigError {
468 code: RuntimeConfigErrorCode::InvalidType,
469 message: "runtime-config set requires a scalar value".into(),
470 }));
471 };
472 let resp = send_query(event_tx, QueryRequest::SetRuntimeConfig { key, value })?;
473 return if let QueryResponse::RuntimeConfigSet(result) = resp {
474 Ok(runtime_config_result_to_pickle(result))
475 } else {
476 Ok(PickleValue::None)
477 };
478 }
479 }
480
481 if let Some(reset_val) = request.get("reset").and_then(|v| v.as_str()) {
482 if reset_val == "runtime_config" {
483 let key = request
484 .get("key")
485 .and_then(|v| v.as_str())
486 .unwrap_or_default()
487 .to_string();
488 let resp = send_query(event_tx, QueryRequest::ResetRuntimeConfig { key })?;
489 return if let QueryResponse::RuntimeConfigReset(result) = resp {
490 Ok(runtime_config_result_to_pickle(result))
491 } else {
492 Ok(PickleValue::None)
493 };
494 }
495 }
496
497 if let Some(clear_val) = request.get("clear").and_then(|v| v.as_str()) {
498 if clear_val == "backbone_peer_state" {
499 let interface_name = required_string(request, "interface")?;
500 let peer_ip = required_string(request, "ip")?;
501 let peer_ip = peer_ip
502 .parse()
503 .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "invalid peer IP"))?;
504 let resp = send_query(
505 event_tx,
506 QueryRequest::ClearBackbonePeerState {
507 interface_name,
508 peer_ip,
509 },
510 )?;
511 return if let QueryResponse::ClearBackbonePeerState(ok) = resp {
512 Ok(PickleValue::Bool(ok))
513 } else {
514 Ok(PickleValue::None)
515 };
516 }
517 }
518
519 if let Some(set_val) = request.get("set").and_then(|v| v.as_str()) {
520 if set_val == "backbone_peer_blacklist" {
521 let interface_name = required_string(request, "interface")?;
522 let peer_ip = required_string(request, "ip")?;
523 let peer_ip: IpAddr = peer_ip
524 .parse()
525 .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "invalid peer IP"))?;
526 let duration_secs = request
527 .get("duration_secs")
528 .and_then(|v| v.as_int())
529 .ok_or_else(|| {
530 io::Error::new(io::ErrorKind::InvalidInput, "missing duration_secs")
531 })?;
532 let reason = request
533 .get("reason")
534 .and_then(|v| v.as_str())
535 .unwrap_or("sentinel blacklist")
536 .to_string();
537 let penalty_level = request
538 .get("penalty_level")
539 .and_then(|v| v.as_int())
540 .unwrap_or(0)
541 .clamp(0, u8::MAX as i64) as u8;
542 let resp = send_query(
543 event_tx,
544 QueryRequest::BlacklistBackbonePeer {
545 interface_name,
546 peer_ip,
547 duration: Duration::from_secs(duration_secs as u64),
548 reason,
549 penalty_level,
550 },
551 )?;
552 return if let QueryResponse::BlacklistBackbonePeer(ok) = resp {
553 Ok(PickleValue::Bool(ok))
554 } else {
555 Ok(PickleValue::None)
556 };
557 }
558 }
559
560 if let Some(hash_val) = request.get("request_path") {
562 if let Some(hash_bytes) = hash_val.as_bytes() {
563 if hash_bytes.len() >= 16 {
564 let mut dest_hash = [0u8; 16];
565 dest_hash.copy_from_slice(&hash_bytes[..16]);
566 let _ = event_tx.send(crate::event::Event::RequestPath { dest_hash });
567 return Ok(PickleValue::Bool(true));
568 }
569 }
570 }
571
572 if let Some(hash_val) = request.get("send_probe") {
574 if let Some(hash_bytes) = hash_val.as_bytes() {
575 if hash_bytes.len() >= 16 {
576 let mut dest_hash = [0u8; 16];
577 dest_hash.copy_from_slice(&hash_bytes[..16]);
578 let payload_size = request
579 .get("size")
580 .and_then(|v| v.as_int())
581 .and_then(|n| {
582 if n > 0 && n <= 400 {
583 Some(n as usize)
584 } else {
585 None
586 }
587 })
588 .unwrap_or(16);
589 let resp = send_query(
590 event_tx,
591 QueryRequest::SendProbe {
592 dest_hash,
593 payload_size,
594 },
595 )?;
596 if let QueryResponse::SendProbe(Some((packet_hash, hops))) = resp {
597 return Ok(PickleValue::Dict(vec![
598 (
599 PickleValue::String("packet_hash".into()),
600 PickleValue::Bytes(packet_hash.to_vec()),
601 ),
602 (
603 PickleValue::String("hops".into()),
604 PickleValue::Int(hops as i64),
605 ),
606 ]));
607 } else {
608 return Ok(PickleValue::None);
609 }
610 }
611 }
612 }
613
614 if let Some(hash_val) = request.get("check_proof") {
616 if let Some(hash_bytes) = hash_val.as_bytes() {
617 if hash_bytes.len() >= 32 {
618 let mut packet_hash = [0u8; 32];
619 packet_hash.copy_from_slice(&hash_bytes[..32]);
620 let resp = send_query(event_tx, QueryRequest::CheckProof { packet_hash })?;
621 if let QueryResponse::CheckProof(Some(rtt)) = resp {
622 return Ok(PickleValue::Float(rtt));
623 } else {
624 return Ok(PickleValue::None);
625 }
626 }
627 }
628 }
629
630 if let Some(hash_val) = request.get("blackhole") {
632 if let Some(hash_bytes) = hash_val.as_bytes() {
633 if hash_bytes.len() >= 16 {
634 let mut identity_hash = [0u8; 16];
635 identity_hash.copy_from_slice(&hash_bytes[..16]);
636 let duration_hours = request.get("duration").and_then(|v| v.as_float());
637 let reason = request
638 .get("reason")
639 .and_then(|v| v.as_str())
640 .map(|s| s.to_string());
641 let resp = send_query(
642 event_tx,
643 QueryRequest::BlackholeIdentity {
644 identity_hash,
645 duration_hours,
646 reason,
647 },
648 )?;
649 return Ok(PickleValue::Bool(matches!(
650 resp,
651 QueryResponse::BlackholeResult(true)
652 )));
653 }
654 }
655 }
656
657 if let Some(hash_val) = request.get("unblackhole") {
659 if let Some(hash_bytes) = hash_val.as_bytes() {
660 if hash_bytes.len() >= 16 {
661 let mut identity_hash = [0u8; 16];
662 identity_hash.copy_from_slice(&hash_bytes[..16]);
663 let resp = send_query(
664 event_tx,
665 QueryRequest::UnblackholeIdentity { identity_hash },
666 )?;
667 return Ok(PickleValue::Bool(matches!(
668 resp,
669 QueryResponse::UnblackholeResult(true)
670 )));
671 }
672 }
673 }
674
675 if let Some(drop_val) = request.get("drop") {
677 if let Some(path) = drop_val.as_str() {
678 return match path {
679 "path" => {
680 let hash = extract_dest_hash(request, "destination_hash")?;
681 let resp = send_query(event_tx, QueryRequest::DropPath { dest_hash: hash })?;
682 if let QueryResponse::DropPath(ok) = resp {
683 Ok(PickleValue::Bool(ok))
684 } else {
685 Ok(PickleValue::None)
686 }
687 }
688 "all_via" => {
689 let hash = extract_dest_hash(request, "destination_hash")?;
690 let resp = send_query(
691 event_tx,
692 QueryRequest::DropAllVia {
693 transport_hash: hash,
694 },
695 )?;
696 if let QueryResponse::DropAllVia(n) = resp {
697 Ok(PickleValue::Int(n as i64))
698 } else {
699 Ok(PickleValue::None)
700 }
701 }
702 "announce_queues" => {
703 let resp = send_query(event_tx, QueryRequest::DropAnnounceQueues)?;
704 if let QueryResponse::DropAnnounceQueues = resp {
705 Ok(PickleValue::Bool(true))
706 } else {
707 Ok(PickleValue::None)
708 }
709 }
710 _ => Ok(PickleValue::None),
711 };
712 }
713 }
714
715 if let Some(hook_val) = request.get("hook").and_then(|v| v.as_str()) {
716 return handle_hook_rpc_request(hook_val, request, event_tx);
717 }
718
719 Ok(PickleValue::None)
720}
721
722fn handle_hook_rpc_request(
723 op: &str,
724 request: &PickleValue,
725 event_tx: &EventSender,
726) -> io::Result<PickleValue> {
727 match op {
728 "load" => {
729 let name = required_string(request, "name")?;
730 let attach_point = required_string(request, "attach_point")?;
731 let priority = request
732 .get("priority")
733 .and_then(|v| v.as_int())
734 .unwrap_or(0) as i32;
735 let wasm = request
736 .get("wasm")
737 .and_then(|v| v.as_bytes())
738 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing wasm"))?
739 .to_vec();
740 let (response_tx, response_rx) = mpsc::channel();
741 event_tx
742 .send(Event::LoadHook {
743 name,
744 wasm_bytes: wasm,
745 attach_point,
746 priority,
747 response_tx,
748 })
749 .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down"))?;
750 let response = response_rx
751 .recv_timeout(std::time::Duration::from_secs(5))
752 .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "hook load timed out"))?;
753 Ok(hook_result_to_pickle(response))
754 }
755 "unload" => {
756 let name = required_string(request, "name")?;
757 let attach_point = required_string(request, "attach_point")?;
758 let (response_tx, response_rx) = mpsc::channel();
759 event_tx
760 .send(Event::UnloadHook {
761 name,
762 attach_point,
763 response_tx,
764 })
765 .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down"))?;
766 let response = response_rx
767 .recv_timeout(std::time::Duration::from_secs(5))
768 .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "hook unload timed out"))?;
769 Ok(hook_result_to_pickle(response))
770 }
771 "enable" | "disable" => {
772 let name = required_string(request, "name")?;
773 let attach_point = required_string(request, "attach_point")?;
774 let enabled = op == "enable";
775 let (response_tx, response_rx) = mpsc::channel();
776 event_tx
777 .send(Event::SetHookEnabled {
778 name,
779 attach_point,
780 enabled,
781 response_tx,
782 })
783 .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down"))?;
784 let response = response_rx
785 .recv_timeout(std::time::Duration::from_secs(5))
786 .map_err(|_| {
787 io::Error::new(io::ErrorKind::TimedOut, "hook enable/disable timed out")
788 })?;
789 Ok(hook_result_to_pickle(response))
790 }
791 "set_priority" => {
792 let name = required_string(request, "name")?;
793 let attach_point = required_string(request, "attach_point")?;
794 let priority = request
795 .get("priority")
796 .and_then(|v| v.as_int())
797 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing priority"))?
798 as i32;
799 let (response_tx, response_rx) = mpsc::channel();
800 event_tx
801 .send(Event::SetHookPriority {
802 name,
803 attach_point,
804 priority,
805 response_tx,
806 })
807 .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down"))?;
808 let response = response_rx
809 .recv_timeout(std::time::Duration::from_secs(5))
810 .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "hook priority timed out"))?;
811 Ok(hook_result_to_pickle(response))
812 }
813 _ => Ok(PickleValue::None),
814 }
815}
816
817fn send_query(event_tx: &EventSender, request: QueryRequest) -> io::Result<QueryResponse> {
819 let (resp_tx, resp_rx) = mpsc::channel();
820 event_tx
821 .send(Event::Query(request, resp_tx))
822 .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down"))?;
823 resp_rx
824 .recv_timeout(std::time::Duration::from_secs(5))
825 .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "query timed out"))
826}
827
828fn extract_dest_hash(request: &PickleValue, key: &str) -> io::Result<[u8; 16]> {
830 let bytes = request
831 .get(key)
832 .and_then(|v| v.as_bytes())
833 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing destination_hash"))?;
834 if bytes.len() < 16 {
835 return Err(io::Error::new(io::ErrorKind::InvalidData, "hash too short"));
836 }
837 let mut hash = [0u8; 16];
838 hash.copy_from_slice(&bytes[..16]);
839 Ok(hash)
840}
841
842fn required_string(request: &PickleValue, key: &str) -> io::Result<String> {
843 request
844 .get(key)
845 .and_then(|v| v.as_str())
846 .map(|s| s.to_string())
847 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, format!("missing {}", key)))
848}
849
850fn hook_result_to_pickle(result: Result<(), String>) -> PickleValue {
851 match result {
852 Ok(()) => PickleValue::Dict(vec![(
853 PickleValue::String("ok".into()),
854 PickleValue::Bool(true),
855 )]),
856 Err(error) => PickleValue::Dict(vec![
857 (PickleValue::String("ok".into()), PickleValue::Bool(false)),
858 (
859 PickleValue::String("error".into()),
860 PickleValue::String(error),
861 ),
862 ]),
863 }
864}
865
866fn interface_stats_to_pickle(stats: &InterfaceStatsResponse) -> PickleValue {
869 let mut ifaces = Vec::new();
870 for iface in &stats.interfaces {
871 ifaces.push(single_iface_to_pickle(iface));
872 }
873
874 let mut dict = vec![
875 (
876 PickleValue::String("interfaces".into()),
877 PickleValue::List(ifaces),
878 ),
879 (
880 PickleValue::String("transport_enabled".into()),
881 PickleValue::Bool(stats.transport_enabled),
882 ),
883 (
884 PickleValue::String("transport_uptime".into()),
885 PickleValue::Float(stats.transport_uptime),
886 ),
887 (
888 PickleValue::String("rxb".into()),
889 PickleValue::Int(stats.total_rxb as i64),
890 ),
891 (
892 PickleValue::String("txb".into()),
893 PickleValue::Int(stats.total_txb as i64),
894 ),
895 ];
896
897 if let Some(tid) = stats.transport_id {
898 dict.push((
899 PickleValue::String("transport_id".into()),
900 PickleValue::Bytes(tid.to_vec()),
901 ));
902 } else {
903 dict.push((
904 PickleValue::String("transport_id".into()),
905 PickleValue::None,
906 ));
907 }
908
909 if let Some(pr) = stats.probe_responder {
910 dict.push((
911 PickleValue::String("probe_responder".into()),
912 PickleValue::Bytes(pr.to_vec()),
913 ));
914 } else {
915 dict.push((
916 PickleValue::String("probe_responder".into()),
917 PickleValue::None,
918 ));
919 }
920
921 PickleValue::Dict(dict)
922}
923
924fn single_iface_to_pickle(s: &SingleInterfaceStat) -> PickleValue {
925 let mut dict = vec![
926 (
927 PickleValue::String("id".into()),
928 PickleValue::Int(s.id as i64),
929 ),
930 (
931 PickleValue::String("name".into()),
932 PickleValue::String(s.name.clone()),
933 ),
934 (
935 PickleValue::String("status".into()),
936 PickleValue::Bool(s.status),
937 ),
938 (
939 PickleValue::String("mode".into()),
940 PickleValue::Int(s.mode as i64),
941 ),
942 (
943 PickleValue::String("rxb".into()),
944 PickleValue::Int(s.rxb as i64),
945 ),
946 (
947 PickleValue::String("txb".into()),
948 PickleValue::Int(s.txb as i64),
949 ),
950 (
951 PickleValue::String("rx_packets".into()),
952 PickleValue::Int(s.rx_packets as i64),
953 ),
954 (
955 PickleValue::String("tx_packets".into()),
956 PickleValue::Int(s.tx_packets as i64),
957 ),
958 (
959 PickleValue::String("started".into()),
960 PickleValue::Float(s.started),
961 ),
962 (
963 PickleValue::String("ia_freq".into()),
964 PickleValue::Float(s.ia_freq),
965 ),
966 (
967 PickleValue::String("oa_freq".into()),
968 PickleValue::Float(s.oa_freq),
969 ),
970 ];
971
972 match s.bitrate {
973 Some(br) => dict.push((
974 PickleValue::String("bitrate".into()),
975 PickleValue::Int(br as i64),
976 )),
977 None => dict.push((PickleValue::String("bitrate".into()), PickleValue::None)),
978 }
979
980 match s.ifac_size {
981 Some(sz) => dict.push((
982 PickleValue::String("ifac_size".into()),
983 PickleValue::Int(sz as i64),
984 )),
985 None => dict.push((PickleValue::String("ifac_size".into()), PickleValue::None)),
986 }
987
988 PickleValue::Dict(dict)
989}
990
991fn path_table_to_pickle(entries: &[PathTableEntry]) -> PickleValue {
992 let list: Vec<PickleValue> = entries
993 .iter()
994 .map(|e| {
995 PickleValue::Dict(vec![
996 (
997 PickleValue::String("hash".into()),
998 PickleValue::Bytes(e.hash.to_vec()),
999 ),
1000 (
1001 PickleValue::String("timestamp".into()),
1002 PickleValue::Float(e.timestamp),
1003 ),
1004 (
1005 PickleValue::String("via".into()),
1006 PickleValue::Bytes(e.via.to_vec()),
1007 ),
1008 (
1009 PickleValue::String("hops".into()),
1010 PickleValue::Int(e.hops as i64),
1011 ),
1012 (
1013 PickleValue::String("expires".into()),
1014 PickleValue::Float(e.expires),
1015 ),
1016 (
1017 PickleValue::String("interface".into()),
1018 PickleValue::String(e.interface_name.clone()),
1019 ),
1020 ])
1021 })
1022 .collect();
1023 PickleValue::List(list)
1024}
1025
1026fn rate_table_to_pickle(entries: &[RateTableEntry]) -> PickleValue {
1027 let list: Vec<PickleValue> = entries
1028 .iter()
1029 .map(|e| {
1030 PickleValue::Dict(vec![
1031 (
1032 PickleValue::String("hash".into()),
1033 PickleValue::Bytes(e.hash.to_vec()),
1034 ),
1035 (
1036 PickleValue::String("last".into()),
1037 PickleValue::Float(e.last),
1038 ),
1039 (
1040 PickleValue::String("rate_violations".into()),
1041 PickleValue::Int(e.rate_violations as i64),
1042 ),
1043 (
1044 PickleValue::String("blocked_until".into()),
1045 PickleValue::Float(e.blocked_until),
1046 ),
1047 (
1048 PickleValue::String("timestamps".into()),
1049 PickleValue::List(
1050 e.timestamps
1051 .iter()
1052 .map(|&t| PickleValue::Float(t))
1053 .collect(),
1054 ),
1055 ),
1056 ])
1057 })
1058 .collect();
1059 PickleValue::List(list)
1060}
1061
1062fn blackholed_to_pickle(entries: &[BlackholeInfo]) -> PickleValue {
1063 let list: Vec<PickleValue> = entries
1064 .iter()
1065 .map(|e| {
1066 let mut dict = vec![
1067 (
1068 PickleValue::String("identity_hash".into()),
1069 PickleValue::Bytes(e.identity_hash.to_vec()),
1070 ),
1071 (
1072 PickleValue::String("created".into()),
1073 PickleValue::Float(e.created),
1074 ),
1075 (
1076 PickleValue::String("expires".into()),
1077 PickleValue::Float(e.expires),
1078 ),
1079 ];
1080 if let Some(ref reason) = e.reason {
1081 dict.push((
1082 PickleValue::String("reason".into()),
1083 PickleValue::String(reason.clone()),
1084 ));
1085 } else {
1086 dict.push((PickleValue::String("reason".into()), PickleValue::None));
1087 }
1088 PickleValue::Dict(dict)
1089 })
1090 .collect();
1091 PickleValue::List(list)
1092}
1093
1094fn discovered_interfaces_to_pickle(
1095 interfaces: &[crate::discovery::DiscoveredInterface],
1096) -> PickleValue {
1097 let list: Vec<PickleValue> = interfaces
1098 .iter()
1099 .map(|iface| {
1100 let mut dict = vec![
1101 (
1102 PickleValue::String("type".into()),
1103 PickleValue::String(iface.interface_type.clone()),
1104 ),
1105 (
1106 PickleValue::String("transport".into()),
1107 PickleValue::Bool(iface.transport),
1108 ),
1109 (
1110 PickleValue::String("name".into()),
1111 PickleValue::String(iface.name.clone()),
1112 ),
1113 (
1114 PickleValue::String("discovered".into()),
1115 PickleValue::Float(iface.discovered),
1116 ),
1117 (
1118 PickleValue::String("last_heard".into()),
1119 PickleValue::Float(iface.last_heard),
1120 ),
1121 (
1122 PickleValue::String("heard_count".into()),
1123 PickleValue::Int(iface.heard_count as i64),
1124 ),
1125 (
1126 PickleValue::String("status".into()),
1127 PickleValue::String(iface.status.as_str().into()),
1128 ),
1129 (
1130 PickleValue::String("stamp".into()),
1131 PickleValue::Bytes(iface.stamp.clone()),
1132 ),
1133 (
1134 PickleValue::String("value".into()),
1135 PickleValue::Int(iface.stamp_value as i64),
1136 ),
1137 (
1138 PickleValue::String("transport_id".into()),
1139 PickleValue::Bytes(iface.transport_id.to_vec()),
1140 ),
1141 (
1142 PickleValue::String("network_id".into()),
1143 PickleValue::Bytes(iface.network_id.to_vec()),
1144 ),
1145 (
1146 PickleValue::String("hops".into()),
1147 PickleValue::Int(iface.hops as i64),
1148 ),
1149 ];
1150
1151 if let Some(v) = iface.latitude {
1153 dict.push((
1154 PickleValue::String("latitude".into()),
1155 PickleValue::Float(v),
1156 ));
1157 } else {
1158 dict.push((PickleValue::String("latitude".into()), PickleValue::None));
1159 }
1160 if let Some(v) = iface.longitude {
1161 dict.push((
1162 PickleValue::String("longitude".into()),
1163 PickleValue::Float(v),
1164 ));
1165 } else {
1166 dict.push((PickleValue::String("longitude".into()), PickleValue::None));
1167 }
1168 if let Some(v) = iface.height {
1169 dict.push((PickleValue::String("height".into()), PickleValue::Float(v)));
1170 } else {
1171 dict.push((PickleValue::String("height".into()), PickleValue::None));
1172 }
1173
1174 if let Some(ref v) = iface.reachable_on {
1176 dict.push((
1177 PickleValue::String("reachable_on".into()),
1178 PickleValue::String(v.clone()),
1179 ));
1180 } else {
1181 dict.push((
1182 PickleValue::String("reachable_on".into()),
1183 PickleValue::None,
1184 ));
1185 }
1186 if let Some(v) = iface.port {
1187 dict.push((
1188 PickleValue::String("port".into()),
1189 PickleValue::Int(v as i64),
1190 ));
1191 } else {
1192 dict.push((PickleValue::String("port".into()), PickleValue::None));
1193 }
1194
1195 if let Some(v) = iface.frequency {
1197 dict.push((
1198 PickleValue::String("frequency".into()),
1199 PickleValue::Int(v as i64),
1200 ));
1201 } else {
1202 dict.push((PickleValue::String("frequency".into()), PickleValue::None));
1203 }
1204 if let Some(v) = iface.bandwidth {
1205 dict.push((
1206 PickleValue::String("bandwidth".into()),
1207 PickleValue::Int(v as i64),
1208 ));
1209 } else {
1210 dict.push((PickleValue::String("bandwidth".into()), PickleValue::None));
1211 }
1212 if let Some(v) = iface.spreading_factor {
1213 dict.push((PickleValue::String("sf".into()), PickleValue::Int(v as i64)));
1214 } else {
1215 dict.push((PickleValue::String("sf".into()), PickleValue::None));
1216 }
1217 if let Some(v) = iface.coding_rate {
1218 dict.push((PickleValue::String("cr".into()), PickleValue::Int(v as i64)));
1219 } else {
1220 dict.push((PickleValue::String("cr".into()), PickleValue::None));
1221 }
1222 if let Some(ref v) = iface.modulation {
1223 dict.push((
1224 PickleValue::String("modulation".into()),
1225 PickleValue::String(v.clone()),
1226 ));
1227 } else {
1228 dict.push((PickleValue::String("modulation".into()), PickleValue::None));
1229 }
1230 if let Some(v) = iface.channel {
1231 dict.push((
1232 PickleValue::String("channel".into()),
1233 PickleValue::Int(v as i64),
1234 ));
1235 } else {
1236 dict.push((PickleValue::String("channel".into()), PickleValue::None));
1237 }
1238
1239 if let Some(ref v) = iface.ifac_netname {
1241 dict.push((
1242 PickleValue::String("ifac_netname".into()),
1243 PickleValue::String(v.clone()),
1244 ));
1245 } else {
1246 dict.push((
1247 PickleValue::String("ifac_netname".into()),
1248 PickleValue::None,
1249 ));
1250 }
1251 if let Some(ref v) = iface.ifac_netkey {
1252 dict.push((
1253 PickleValue::String("ifac_netkey".into()),
1254 PickleValue::String(v.clone()),
1255 ));
1256 } else {
1257 dict.push((PickleValue::String("ifac_netkey".into()), PickleValue::None));
1258 }
1259
1260 if let Some(ref v) = iface.config_entry {
1262 dict.push((
1263 PickleValue::String("config_entry".into()),
1264 PickleValue::String(v.clone()),
1265 ));
1266 } else {
1267 dict.push((
1268 PickleValue::String("config_entry".into()),
1269 PickleValue::None,
1270 ));
1271 }
1272
1273 dict.push((
1274 PickleValue::String("discovery_hash".into()),
1275 PickleValue::Bytes(iface.discovery_hash.to_vec()),
1276 ));
1277
1278 PickleValue::Dict(dict)
1279 })
1280 .collect();
1281 PickleValue::List(list)
1282}
1283
1284fn hooks_to_pickle(hooks: &[HookInfo]) -> PickleValue {
1285 PickleValue::List(
1286 hooks
1287 .iter()
1288 .map(|hook| {
1289 PickleValue::Dict(vec![
1290 (
1291 PickleValue::String("name".into()),
1292 PickleValue::String(hook.name.clone()),
1293 ),
1294 (
1295 PickleValue::String("attach_point".into()),
1296 PickleValue::String(hook.attach_point.clone()),
1297 ),
1298 (
1299 PickleValue::String("priority".into()),
1300 PickleValue::Int(hook.priority as i64),
1301 ),
1302 (
1303 PickleValue::String("enabled".into()),
1304 PickleValue::Bool(hook.enabled),
1305 ),
1306 (
1307 PickleValue::String("consecutive_traps".into()),
1308 PickleValue::Int(hook.consecutive_traps as i64),
1309 ),
1310 ])
1311 })
1312 .collect(),
1313 )
1314}
1315
1316fn backbone_peer_state_to_pickle(entries: &[BackbonePeerStateEntry]) -> PickleValue {
1317 PickleValue::List(
1318 entries
1319 .iter()
1320 .map(|entry| {
1321 PickleValue::Dict(vec![
1322 (
1323 PickleValue::String("interface".into()),
1324 PickleValue::String(entry.interface_name.clone()),
1325 ),
1326 (
1327 PickleValue::String("ip".into()),
1328 PickleValue::String(entry.peer_ip.to_string()),
1329 ),
1330 (
1331 PickleValue::String("connected_count".into()),
1332 PickleValue::Int(entry.connected_count as i64),
1333 ),
1334 (
1335 PickleValue::String("blacklisted_remaining_secs".into()),
1336 entry
1337 .blacklisted_remaining_secs
1338 .map(PickleValue::Float)
1339 .unwrap_or(PickleValue::None),
1340 ),
1341 (
1342 PickleValue::String("blacklist_reason".into()),
1343 entry
1344 .blacklist_reason
1345 .as_ref()
1346 .map(|v: &String| PickleValue::String(v.clone()))
1347 .unwrap_or(PickleValue::None),
1348 ),
1349 (
1350 PickleValue::String("reject_count".into()),
1351 PickleValue::Int(entry.reject_count as i64),
1352 ),
1353 ])
1354 })
1355 .collect(),
1356 )
1357}
1358
1359fn backbone_interfaces_to_pickle(entries: &[BackboneInterfaceEntry]) -> PickleValue {
1360 PickleValue::List(
1361 entries
1362 .iter()
1363 .map(|entry| {
1364 PickleValue::Dict(vec![
1365 (
1366 PickleValue::String("id".into()),
1367 PickleValue::Int(entry.interface_id.0 as i64),
1368 ),
1369 (
1370 PickleValue::String("name".into()),
1371 PickleValue::String(entry.interface_name.clone()),
1372 ),
1373 ])
1374 })
1375 .collect(),
1376 )
1377}
1378
1379fn provider_bridge_stats_to_pickle(stats: &ProviderBridgeStats) -> PickleValue {
1380 PickleValue::Dict(vec![
1381 (
1382 PickleValue::String("connected".into()),
1383 PickleValue::Bool(stats.connected),
1384 ),
1385 (
1386 PickleValue::String("consumer_count".into()),
1387 PickleValue::Int(stats.consumer_count as i64),
1388 ),
1389 (
1390 PickleValue::String("queue_max_events".into()),
1391 PickleValue::Int(stats.queue_max_events as i64),
1392 ),
1393 (
1394 PickleValue::String("queue_max_bytes".into()),
1395 PickleValue::Int(stats.queue_max_bytes as i64),
1396 ),
1397 (
1398 PickleValue::String("backlog_len".into()),
1399 PickleValue::Int(stats.backlog_len as i64),
1400 ),
1401 (
1402 PickleValue::String("backlog_bytes".into()),
1403 PickleValue::Int(stats.backlog_bytes as i64),
1404 ),
1405 (
1406 PickleValue::String("backlog_dropped_pending".into()),
1407 PickleValue::Int(stats.backlog_dropped_pending as i64),
1408 ),
1409 (
1410 PickleValue::String("backlog_dropped_total".into()),
1411 PickleValue::Int(stats.backlog_dropped_total as i64),
1412 ),
1413 (
1414 PickleValue::String("total_disconnect_count".into()),
1415 PickleValue::Int(stats.total_disconnect_count as i64),
1416 ),
1417 (
1418 PickleValue::String("consumers".into()),
1419 PickleValue::List(
1420 stats
1421 .consumers
1422 .iter()
1423 .map(|consumer| {
1424 PickleValue::Dict(vec![
1425 (
1426 PickleValue::String("id".into()),
1427 PickleValue::Int(consumer.id as i64),
1428 ),
1429 (
1430 PickleValue::String("connected".into()),
1431 PickleValue::Bool(consumer.connected),
1432 ),
1433 (
1434 PickleValue::String("queue_len".into()),
1435 PickleValue::Int(consumer.queue_len as i64),
1436 ),
1437 (
1438 PickleValue::String("queued_bytes".into()),
1439 PickleValue::Int(consumer.queued_bytes as i64),
1440 ),
1441 (
1442 PickleValue::String("dropped_pending".into()),
1443 PickleValue::Int(consumer.dropped_pending as i64),
1444 ),
1445 (
1446 PickleValue::String("dropped_total".into()),
1447 PickleValue::Int(consumer.dropped_total as i64),
1448 ),
1449 (
1450 PickleValue::String("queue_max_events".into()),
1451 PickleValue::Int(consumer.queue_max_events as i64),
1452 ),
1453 (
1454 PickleValue::String("queue_max_bytes".into()),
1455 PickleValue::Int(consumer.queue_max_bytes as i64),
1456 ),
1457 ])
1458 })
1459 .collect(),
1460 ),
1461 ),
1462 ])
1463}
1464
1465fn lifecycle_state_name(state: LifecycleState) -> &'static str {
1466 match state {
1467 LifecycleState::Active => "active",
1468 LifecycleState::Draining => "draining",
1469 LifecycleState::Stopping => "stopping",
1470 LifecycleState::Stopped => "stopped",
1471 }
1472}
1473
1474fn drain_status_to_pickle(status: &DrainStatus) -> PickleValue {
1475 PickleValue::Dict(vec![
1476 (
1477 PickleValue::String("state".into()),
1478 PickleValue::String(lifecycle_state_name(status.state).into()),
1479 ),
1480 (
1481 PickleValue::String("drain_age_seconds".into()),
1482 status
1483 .drain_age_seconds
1484 .map(PickleValue::Float)
1485 .unwrap_or(PickleValue::None),
1486 ),
1487 (
1488 PickleValue::String("deadline_remaining_seconds".into()),
1489 status
1490 .deadline_remaining_seconds
1491 .map(PickleValue::Float)
1492 .unwrap_or(PickleValue::None),
1493 ),
1494 (
1495 PickleValue::String("drain_complete".into()),
1496 PickleValue::Bool(status.drain_complete),
1497 ),
1498 (
1499 PickleValue::String("interface_writer_queued_frames".into()),
1500 PickleValue::Int(status.interface_writer_queued_frames as i64),
1501 ),
1502 (
1503 PickleValue::String("provider_backlog_events".into()),
1504 PickleValue::Int(status.provider_backlog_events as i64),
1505 ),
1506 (
1507 PickleValue::String("provider_consumer_queued_events".into()),
1508 PickleValue::Int(status.provider_consumer_queued_events as i64),
1509 ),
1510 (
1511 PickleValue::String("detail".into()),
1512 status
1513 .detail
1514 .as_ref()
1515 .map(|detail| PickleValue::String(detail.clone()))
1516 .unwrap_or(PickleValue::None),
1517 ),
1518 ])
1519}
1520
1521fn runtime_config_value_to_pickle(value: &RuntimeConfigValue) -> PickleValue {
1522 match value {
1523 RuntimeConfigValue::Int(v) => PickleValue::Int(*v),
1524 RuntimeConfigValue::Float(v) => PickleValue::Float(*v),
1525 RuntimeConfigValue::Bool(v) => PickleValue::Bool(*v),
1526 RuntimeConfigValue::String(v) => PickleValue::String(v.clone()),
1527 RuntimeConfigValue::Null => PickleValue::None,
1528 }
1529}
1530
1531fn runtime_config_value_from_pickle(value: &PickleValue) -> Option<RuntimeConfigValue> {
1532 match value {
1533 PickleValue::Int(v) => Some(RuntimeConfigValue::Int(*v)),
1534 PickleValue::Float(v) => Some(RuntimeConfigValue::Float(*v)),
1535 PickleValue::Bool(v) => Some(RuntimeConfigValue::Bool(*v)),
1536 PickleValue::String(v) => Some(RuntimeConfigValue::String(v.clone())),
1537 PickleValue::None => Some(RuntimeConfigValue::Null),
1538 _ => None,
1539 }
1540}
1541
1542fn runtime_config_entry_to_pickle(entry: &RuntimeConfigEntry) -> PickleValue {
1543 PickleValue::Dict(vec![
1544 (
1545 PickleValue::String("key".into()),
1546 PickleValue::String(entry.key.clone()),
1547 ),
1548 (
1549 PickleValue::String("value".into()),
1550 runtime_config_value_to_pickle(&entry.value),
1551 ),
1552 (
1553 PickleValue::String("default".into()),
1554 runtime_config_value_to_pickle(&entry.default),
1555 ),
1556 (
1557 PickleValue::String("source".into()),
1558 PickleValue::String(match entry.source {
1559 RuntimeConfigSource::Startup => "startup".into(),
1560 RuntimeConfigSource::RuntimeOverride => "runtime_override".into(),
1561 }),
1562 ),
1563 (
1564 PickleValue::String("apply_mode".into()),
1565 PickleValue::String(match entry.apply_mode {
1566 RuntimeConfigApplyMode::Immediate => "immediate".into(),
1567 RuntimeConfigApplyMode::NewConnectionsOnly => "new_connections_only".into(),
1568 RuntimeConfigApplyMode::NextReconnect => "next_reconnect".into(),
1569 RuntimeConfigApplyMode::RestartRequired => "restart_required".into(),
1570 }),
1571 ),
1572 (
1573 PickleValue::String("description".into()),
1574 entry
1575 .description
1576 .as_ref()
1577 .map(|v| PickleValue::String(v.clone()))
1578 .unwrap_or(PickleValue::None),
1579 ),
1580 ])
1581}
1582
1583fn runtime_config_list_to_pickle(entries: &[RuntimeConfigEntry]) -> PickleValue {
1584 PickleValue::List(entries.iter().map(runtime_config_entry_to_pickle).collect())
1585}
1586
1587fn runtime_config_error_to_pickle(error: &RuntimeConfigError) -> PickleValue {
1588 PickleValue::Dict(vec![
1589 (
1590 PickleValue::String("error".into()),
1591 PickleValue::String(match error.code {
1592 RuntimeConfigErrorCode::UnknownKey => "unknown_key".into(),
1593 RuntimeConfigErrorCode::InvalidType => "invalid_type".into(),
1594 RuntimeConfigErrorCode::InvalidValue => "invalid_value".into(),
1595 RuntimeConfigErrorCode::Unsupported => "unsupported".into(),
1596 RuntimeConfigErrorCode::NotFound => "not_found".into(),
1597 RuntimeConfigErrorCode::ApplyFailed => "apply_failed".into(),
1598 }),
1599 ),
1600 (
1601 PickleValue::String("message".into()),
1602 PickleValue::String(error.message.clone()),
1603 ),
1604 ])
1605}
1606
1607fn runtime_config_result_to_pickle(
1608 result: Result<RuntimeConfigEntry, RuntimeConfigError>,
1609) -> PickleValue {
1610 match result {
1611 Ok(entry) => runtime_config_entry_to_pickle(&entry),
1612 Err(error) => runtime_config_error_to_pickle(&error),
1613 }
1614}
1615
1616pub struct RpcClient {
1620 stream: TcpStream,
1621}
1622
1623impl RpcClient {
1624 pub fn connect(addr: &RpcAddr, auth_key: &[u8; 32]) -> io::Result<Self> {
1626 let mut stream = match addr {
1627 RpcAddr::Tcp(host, port) => TcpStream::connect((host.as_str(), *port))?,
1628 };
1629
1630 stream.set_read_timeout(Some(std::time::Duration::from_secs(10)))?;
1631 stream.set_write_timeout(Some(std::time::Duration::from_secs(10)))?;
1632
1633 client_auth(&mut stream, auth_key)?;
1635
1636 Ok(RpcClient { stream })
1637 }
1638
1639 pub fn call(&mut self, request: &PickleValue) -> io::Result<PickleValue> {
1641 let request_bytes = pickle::encode(request);
1642 send_bytes(&mut self.stream, &request_bytes)?;
1643
1644 let response_bytes = recv_bytes(&mut self.stream)?;
1645 pickle::decode(&response_bytes)
1646 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))
1647 }
1648
1649 pub fn list_hooks(&mut self) -> io::Result<Vec<HookInfo>> {
1650 let response = self.call(&PickleValue::Dict(vec![(
1651 PickleValue::String("get".into()),
1652 PickleValue::String("hooks".into()),
1653 )]))?;
1654 parse_hook_list(&response)
1655 }
1656
1657 pub fn begin_drain(&mut self, timeout: Duration) -> io::Result<bool> {
1658 let response = self.call(&PickleValue::Dict(vec![(
1659 PickleValue::String("begin_drain".into()),
1660 PickleValue::Float(timeout.as_secs_f64()),
1661 )]))?;
1662 Ok(response.as_bool().unwrap_or(false))
1663 }
1664
1665 pub fn drain_status(&mut self) -> io::Result<Option<DrainStatus>> {
1666 let response = self.call(&PickleValue::Dict(vec![(
1667 PickleValue::String("get".into()),
1668 PickleValue::String("drain_status".into()),
1669 )]))?;
1670 parse_drain_status(&response)
1671 }
1672
1673 pub fn provider_bridge_stats(&mut self) -> io::Result<PickleValue> {
1674 self.call(&PickleValue::Dict(vec![(
1675 PickleValue::String("get".into()),
1676 PickleValue::String("provider_bridge_stats".into()),
1677 )]))
1678 }
1679
1680 pub fn load_hook(
1681 &mut self,
1682 name: &str,
1683 attach_point: &str,
1684 priority: i32,
1685 wasm: &[u8],
1686 ) -> io::Result<Result<(), String>> {
1687 let response = self.call(&PickleValue::Dict(vec![
1688 (
1689 PickleValue::String("hook".into()),
1690 PickleValue::String("load".into()),
1691 ),
1692 (
1693 PickleValue::String("name".into()),
1694 PickleValue::String(name.to_string()),
1695 ),
1696 (
1697 PickleValue::String("attach_point".into()),
1698 PickleValue::String(attach_point.to_string()),
1699 ),
1700 (
1701 PickleValue::String("priority".into()),
1702 PickleValue::Int(priority as i64),
1703 ),
1704 (
1705 PickleValue::String("wasm".into()),
1706 PickleValue::Bytes(wasm.to_vec()),
1707 ),
1708 ]))?;
1709 parse_hook_result(&response)
1710 }
1711
1712 pub fn unload_hook(
1713 &mut self,
1714 name: &str,
1715 attach_point: &str,
1716 ) -> io::Result<Result<(), String>> {
1717 let response = self.call(&PickleValue::Dict(vec![
1718 (
1719 PickleValue::String("hook".into()),
1720 PickleValue::String("unload".into()),
1721 ),
1722 (
1723 PickleValue::String("name".into()),
1724 PickleValue::String(name.to_string()),
1725 ),
1726 (
1727 PickleValue::String("attach_point".into()),
1728 PickleValue::String(attach_point.to_string()),
1729 ),
1730 ]))?;
1731 parse_hook_result(&response)
1732 }
1733
1734 pub fn set_hook_enabled(
1735 &mut self,
1736 name: &str,
1737 attach_point: &str,
1738 enabled: bool,
1739 ) -> io::Result<Result<(), String>> {
1740 let op = if enabled { "enable" } else { "disable" };
1741 let response = self.call(&PickleValue::Dict(vec![
1742 (
1743 PickleValue::String("hook".into()),
1744 PickleValue::String(op.into()),
1745 ),
1746 (
1747 PickleValue::String("name".into()),
1748 PickleValue::String(name.to_string()),
1749 ),
1750 (
1751 PickleValue::String("attach_point".into()),
1752 PickleValue::String(attach_point.to_string()),
1753 ),
1754 ]))?;
1755 parse_hook_result(&response)
1756 }
1757
1758 pub fn set_hook_priority(
1759 &mut self,
1760 name: &str,
1761 attach_point: &str,
1762 priority: i32,
1763 ) -> io::Result<Result<(), String>> {
1764 let response = self.call(&PickleValue::Dict(vec![
1765 (
1766 PickleValue::String("hook".into()),
1767 PickleValue::String("set_priority".into()),
1768 ),
1769 (
1770 PickleValue::String("name".into()),
1771 PickleValue::String(name.to_string()),
1772 ),
1773 (
1774 PickleValue::String("attach_point".into()),
1775 PickleValue::String(attach_point.to_string()),
1776 ),
1777 (
1778 PickleValue::String("priority".into()),
1779 PickleValue::Int(priority as i64),
1780 ),
1781 ]))?;
1782 parse_hook_result(&response)
1783 }
1784
1785 pub fn blacklist_backbone_peer(
1786 &mut self,
1787 interface: &str,
1788 ip: &str,
1789 duration_secs: u64,
1790 reason: Option<&str>,
1791 penalty_level: Option<u8>,
1792 ) -> io::Result<bool> {
1793 let mut request = vec![
1794 (
1795 PickleValue::String("set".into()),
1796 PickleValue::String("backbone_peer_blacklist".into()),
1797 ),
1798 (
1799 PickleValue::String("interface".into()),
1800 PickleValue::String(interface.to_string()),
1801 ),
1802 (
1803 PickleValue::String("ip".into()),
1804 PickleValue::String(ip.to_string()),
1805 ),
1806 (
1807 PickleValue::String("duration_secs".into()),
1808 PickleValue::Int(duration_secs as i64),
1809 ),
1810 ];
1811 if let Some(reason) = reason {
1812 request.push((
1813 PickleValue::String("reason".into()),
1814 PickleValue::String(reason.to_string()),
1815 ));
1816 }
1817 if let Some(level) = penalty_level {
1818 request.push((
1819 PickleValue::String("penalty_level".into()),
1820 PickleValue::Int(level as i64),
1821 ));
1822 }
1823 let response = self.call(&PickleValue::Dict(request))?;
1824 Ok(response.as_bool().unwrap_or(false))
1825 }
1826}
1827
1828fn parse_lifecycle_state(value: &str) -> Option<LifecycleState> {
1829 match value {
1830 "active" => Some(LifecycleState::Active),
1831 "draining" => Some(LifecycleState::Draining),
1832 "stopping" => Some(LifecycleState::Stopping),
1833 "stopped" => Some(LifecycleState::Stopped),
1834 _ => None,
1835 }
1836}
1837
1838fn parse_drain_status(value: &PickleValue) -> io::Result<Option<DrainStatus>> {
1839 if !matches!(value, PickleValue::Dict(_)) {
1840 return Ok(None);
1841 }
1842 let state = value
1843 .get("state")
1844 .and_then(|entry| entry.as_str())
1845 .and_then(parse_lifecycle_state)
1846 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing drain state"))?;
1847 let drain_age_seconds = value
1848 .get("drain_age_seconds")
1849 .and_then(|entry| entry.as_float().or_else(|| entry.as_int().map(|v| v as f64)));
1850 let deadline_remaining_seconds = value.get("deadline_remaining_seconds").and_then(|entry| {
1851 entry
1852 .as_float()
1853 .or_else(|| entry.as_int().map(|v| v as f64))
1854 });
1855 let drain_complete = value
1856 .get("drain_complete")
1857 .and_then(|entry| entry.as_bool())
1858 .unwrap_or(false);
1859 let interface_writer_queued_frames = value
1860 .get("interface_writer_queued_frames")
1861 .and_then(|entry| entry.as_int())
1862 .unwrap_or(0)
1863 .max(0) as usize;
1864 let provider_backlog_events = value
1865 .get("provider_backlog_events")
1866 .and_then(|entry| entry.as_int())
1867 .unwrap_or(0)
1868 .max(0) as usize;
1869 let provider_consumer_queued_events = value
1870 .get("provider_consumer_queued_events")
1871 .and_then(|entry| entry.as_int())
1872 .unwrap_or(0)
1873 .max(0) as usize;
1874 let detail = value
1875 .get("detail")
1876 .and_then(|entry| entry.as_str().map(|v| v.to_string()));
1877 Ok(Some(DrainStatus {
1878 state,
1879 drain_age_seconds,
1880 deadline_remaining_seconds,
1881 drain_complete,
1882 interface_writer_queued_frames,
1883 provider_backlog_events,
1884 provider_consumer_queued_events,
1885 detail,
1886 }))
1887}
1888
1889fn parse_hook_result(response: &PickleValue) -> io::Result<Result<(), String>> {
1890 let ok = response
1891 .get("ok")
1892 .and_then(|v| v.as_bool())
1893 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "invalid hook response"))?;
1894 if ok {
1895 Ok(Ok(()))
1896 } else {
1897 Ok(Err(response
1898 .get("error")
1899 .and_then(|v| v.as_str())
1900 .unwrap_or("unknown hook error")
1901 .to_string()))
1902 }
1903}
1904
1905fn parse_hook_list(response: &PickleValue) -> io::Result<Vec<HookInfo>> {
1906 let list = response
1907 .as_list()
1908 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "invalid hooks response"))?;
1909 let mut hooks = Vec::with_capacity(list.len());
1910 for item in list {
1911 hooks.push(HookInfo {
1912 name: item
1913 .get("name")
1914 .and_then(|v| v.as_str())
1915 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing hook name"))?
1916 .to_string(),
1917 attach_point: item
1918 .get("attach_point")
1919 .and_then(|v| v.as_str())
1920 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing attach_point"))?
1921 .to_string(),
1922 priority: item
1923 .get("priority")
1924 .and_then(|v| v.as_int())
1925 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing priority"))?
1926 as i32,
1927 enabled: item
1928 .get("enabled")
1929 .and_then(|v| v.as_bool())
1930 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing enabled"))?,
1931 consecutive_traps: item
1932 .get("consecutive_traps")
1933 .and_then(|v| v.as_int())
1934 .ok_or_else(|| {
1935 io::Error::new(io::ErrorKind::InvalidData, "missing consecutive_traps")
1936 })? as u32,
1937 });
1938 }
1939 Ok(hooks)
1940}
1941
1942fn client_auth(stream: &mut TcpStream, auth_key: &[u8; 32]) -> io::Result<()> {
1944 let challenge = recv_bytes(stream)?;
1946
1947 if !challenge.starts_with(CHALLENGE_PREFIX) {
1948 return Err(io::Error::new(
1949 io::ErrorKind::InvalidData,
1950 "expected challenge",
1951 ));
1952 }
1953
1954 let message = &challenge[CHALLENGE_PREFIX.len()..];
1955
1956 let response = create_response(auth_key, message);
1958 send_bytes(stream, &response)?;
1959
1960 let result = recv_bytes(stream)?;
1962 if result == WELCOME {
1963 Ok(())
1964 } else {
1965 Err(io::Error::new(
1966 io::ErrorKind::PermissionDenied,
1967 "authentication failed",
1968 ))
1969 }
1970}
1971
1972fn create_response(auth_key: &[u8; 32], message: &[u8]) -> Vec<u8> {
1974 if message.starts_with(b"{sha256}") || message.len() > 20 {
1976 let digest = hmac_sha256(auth_key, message);
1978 let mut response = Vec::with_capacity(8 + 32);
1979 response.extend_from_slice(b"{sha256}");
1980 response.extend_from_slice(&digest);
1981 response
1982 } else {
1983 let digest = hmac_md5(auth_key, message);
1985 digest.to_vec()
1986 }
1987}
1988
1989pub fn derive_auth_key(private_key: &[u8]) -> [u8; 32] {
1991 sha256(private_key)
1992}
1993
1994#[cfg(test)]
1995mod tests {
1996 use super::*;
1997
1998 #[test]
1999 fn send_recv_bytes_roundtrip() {
2000 let (mut c1, mut c2) = tcp_pair();
2001 let data = b"hello world";
2002 send_bytes(&mut c1, data).unwrap();
2003 let received = recv_bytes(&mut c2).unwrap();
2004 assert_eq!(&received, data);
2005 }
2006
2007 #[test]
2008 fn send_recv_empty() {
2009 let (mut c1, mut c2) = tcp_pair();
2010 send_bytes(&mut c1, b"").unwrap();
2011 let received = recv_bytes(&mut c2).unwrap();
2012 assert!(received.is_empty());
2013 }
2014
2015 #[test]
2016 fn auth_success() {
2017 let key = derive_auth_key(b"test-private-key");
2018 let (mut server, mut client) = tcp_pair();
2019
2020 let key2 = key;
2021 let t = thread::spawn(move || {
2022 client_auth(&mut client, &key2).unwrap();
2023 });
2024
2025 server_auth(&mut server, &key).unwrap();
2026 t.join().unwrap();
2027 }
2028
2029 #[test]
2030 fn auth_failure_wrong_key() {
2031 let server_key = derive_auth_key(b"server-key");
2032 let client_key = derive_auth_key(b"wrong-key");
2033 let (mut server, mut client) = tcp_pair();
2034
2035 let t = thread::spawn(move || {
2036 let result = client_auth(&mut client, &client_key);
2037 assert!(result.is_err());
2038 });
2039
2040 let result = server_auth(&mut server, &server_key);
2041 assert!(result.is_err());
2042 t.join().unwrap();
2043 }
2044
2045 #[test]
2046 fn verify_sha256_response() {
2047 let key = derive_auth_key(b"mykey");
2048 let message = b"{sha256}abcdefghijklmnopqrstuvwxyz0123456789ABCD";
2049 let response = create_response(&key, message);
2050 assert!(response.starts_with(b"{sha256}"));
2051 assert!(verify_response(&key, message, &response));
2052 }
2053
2054 #[test]
2055 fn verify_legacy_md5_response() {
2056 let key = derive_auth_key(b"mykey");
2057 let message = b"01234567890123456789";
2059 let digest = hmac_md5(&key, message);
2061 assert!(verify_response(&key, message, &digest));
2062 }
2063
2064 #[test]
2065 fn constant_time_eq_works() {
2066 assert!(constant_time_eq(b"hello", b"hello"));
2067 assert!(!constant_time_eq(b"hello", b"world"));
2068 assert!(!constant_time_eq(b"hello", b"hell"));
2069 }
2070
2071 #[test]
2072 fn rpc_roundtrip() {
2073 let key = derive_auth_key(b"test-key");
2074 let (event_tx, event_rx) = crate::event::channel();
2075
2076 let listener = TcpListener::bind("127.0.0.1:0").unwrap();
2079 let port = listener.local_addr().unwrap().port();
2080 listener.set_nonblocking(true).unwrap();
2081
2082 let shutdown = Arc::new(AtomicBool::new(false));
2083 let shutdown2 = shutdown.clone();
2084
2085 let driver_thread = thread::spawn(move || loop {
2087 match event_rx.recv_timeout(std::time::Duration::from_secs(5)) {
2088 Ok(Event::Query(QueryRequest::LinkCount, resp_tx)) => {
2089 let _ = resp_tx.send(QueryResponse::LinkCount(42));
2090 }
2091 Ok(Event::Query(QueryRequest::InterfaceStats, resp_tx)) => {
2092 let _ = resp_tx.send(QueryResponse::InterfaceStats(InterfaceStatsResponse {
2093 interfaces: vec![SingleInterfaceStat {
2094 id: 7,
2095 name: "TestInterface".into(),
2096 status: true,
2097 mode: 1,
2098 rxb: 1000,
2099 txb: 2000,
2100 rx_packets: 10,
2101 tx_packets: 20,
2102 bitrate: Some(10_000_000),
2103 ifac_size: None,
2104 started: 1000.0,
2105 ia_freq: 0.0,
2106 oa_freq: 0.0,
2107 interface_type: "TestInterface".into(),
2108 }],
2109 transport_id: None,
2110 transport_enabled: true,
2111 transport_uptime: 3600.0,
2112 total_rxb: 1000,
2113 total_txb: 2000,
2114 probe_responder: None,
2115 }));
2116 }
2117 _ => break,
2118 }
2119 });
2120
2121 let key2 = key;
2122 let shutdown3 = shutdown2.clone();
2123 let server_thread = thread::spawn(move || {
2124 rpc_server_loop(listener, key2, event_tx, shutdown3);
2125 });
2126
2127 thread::sleep(std::time::Duration::from_millis(50));
2129
2130 let server_addr = RpcAddr::Tcp("127.0.0.1".into(), port);
2132 let mut client = RpcClient::connect(&server_addr, &key).unwrap();
2133 let response = client
2134 .call(&PickleValue::Dict(vec![(
2135 PickleValue::String("get".into()),
2136 PickleValue::String("link_count".into()),
2137 )]))
2138 .unwrap();
2139 assert_eq!(response.as_int().unwrap(), 42);
2140 drop(client);
2141
2142 let mut client2 = RpcClient::connect(&server_addr, &key).unwrap();
2144 let response2 = client2
2145 .call(&PickleValue::Dict(vec![(
2146 PickleValue::String("get".into()),
2147 PickleValue::String("interface_stats".into()),
2148 )]))
2149 .unwrap();
2150 let ifaces = response2.get("interfaces").unwrap().as_list().unwrap();
2151 assert_eq!(ifaces.len(), 1);
2152 let iface = &ifaces[0];
2153 assert_eq!(
2154 iface.get("name").unwrap().as_str().unwrap(),
2155 "TestInterface"
2156 );
2157 assert_eq!(iface.get("rxb").unwrap().as_int().unwrap(), 1000);
2158 drop(client2);
2159
2160 shutdown2.store(true, Ordering::Relaxed);
2162 server_thread.join().unwrap();
2163 driver_thread.join().unwrap();
2164 }
2165
2166 #[test]
2167 fn derive_auth_key_deterministic() {
2168 let key1 = derive_auth_key(b"test");
2169 let key2 = derive_auth_key(b"test");
2170 assert_eq!(key1, key2);
2171 let key3 = derive_auth_key(b"other");
2173 assert_ne!(key1, key3);
2174 }
2175
2176 #[test]
2177 fn pickle_request_handling() {
2178 let (event_tx, event_rx) = crate::event::channel();
2180
2181 let driver = thread::spawn(move || {
2182 if let Ok(Event::Query(QueryRequest::DropPath { dest_hash }, resp_tx)) = event_rx.recv()
2183 {
2184 assert_eq!(dest_hash, [1u8; 16]);
2185 let _ = resp_tx.send(QueryResponse::DropPath(true));
2186 }
2187 });
2188
2189 let request = PickleValue::Dict(vec![
2190 (
2191 PickleValue::String("drop".into()),
2192 PickleValue::String("path".into()),
2193 ),
2194 (
2195 PickleValue::String("destination_hash".into()),
2196 PickleValue::Bytes(vec![1u8; 16]),
2197 ),
2198 ]);
2199
2200 let response = handle_rpc_request(&request, &event_tx).unwrap();
2201 assert_eq!(response, PickleValue::Bool(true));
2202 driver.join().unwrap();
2203 }
2204
2205 #[test]
2206 fn hook_list_request_handling() {
2207 let (event_tx, event_rx) = crate::event::channel();
2208
2209 let driver = thread::spawn(move || {
2210 if let Ok(Event::ListHooks { response_tx }) = event_rx.recv() {
2211 let _ = response_tx.send(vec![HookInfo {
2212 name: "stats".into(),
2213 attach_point: "PreIngress".into(),
2214 priority: 7,
2215 enabled: true,
2216 consecutive_traps: 0,
2217 }]);
2218 }
2219 });
2220
2221 let request = PickleValue::Dict(vec![(
2222 PickleValue::String("get".into()),
2223 PickleValue::String("hooks".into()),
2224 )]);
2225 let response = handle_rpc_request(&request, &event_tx).unwrap();
2226 let hooks = parse_hook_list(&response).unwrap();
2227 assert_eq!(hooks.len(), 1);
2228 assert_eq!(hooks[0].name, "stats");
2229 driver.join().unwrap();
2230 }
2231
2232 #[test]
2233 fn hook_load_request_handling() {
2234 let (event_tx, event_rx) = crate::event::channel();
2235
2236 let driver = thread::spawn(move || {
2237 if let Ok(Event::LoadHook {
2238 name,
2239 wasm_bytes,
2240 attach_point,
2241 priority,
2242 response_tx,
2243 }) = event_rx.recv()
2244 {
2245 assert_eq!(name, "stats");
2246 assert_eq!(attach_point, "PreIngress");
2247 assert_eq!(priority, 11);
2248 assert_eq!(wasm_bytes, vec![1, 2, 3]);
2249 let _ = response_tx.send(Ok(()));
2250 }
2251 });
2252
2253 let request = PickleValue::Dict(vec![
2254 (
2255 PickleValue::String("hook".into()),
2256 PickleValue::String("load".into()),
2257 ),
2258 (
2259 PickleValue::String("name".into()),
2260 PickleValue::String("stats".into()),
2261 ),
2262 (
2263 PickleValue::String("attach_point".into()),
2264 PickleValue::String("PreIngress".into()),
2265 ),
2266 (PickleValue::String("priority".into()), PickleValue::Int(11)),
2267 (
2268 PickleValue::String("wasm".into()),
2269 PickleValue::Bytes(vec![1, 2, 3]),
2270 ),
2271 ]);
2272 let response = handle_rpc_request(&request, &event_tx).unwrap();
2273 assert_eq!(parse_hook_result(&response).unwrap(), Ok(()));
2274 driver.join().unwrap();
2275 }
2276
2277 #[test]
2278 fn interface_stats_pickle_format() {
2279 let stats = InterfaceStatsResponse {
2280 interfaces: vec![SingleInterfaceStat {
2281 id: 1,
2282 name: "TCP".into(),
2283 status: true,
2284 mode: 1,
2285 rxb: 100,
2286 txb: 200,
2287 rx_packets: 5,
2288 tx_packets: 10,
2289 bitrate: Some(1000000),
2290 ifac_size: Some(16),
2291 started: 1000.0,
2292 ia_freq: 0.0,
2293 oa_freq: 0.0,
2294 interface_type: "TCPClientInterface".into(),
2295 }],
2296 transport_id: Some([0xAB; 16]),
2297 transport_enabled: true,
2298 transport_uptime: 3600.0,
2299 total_rxb: 100,
2300 total_txb: 200,
2301 probe_responder: None,
2302 };
2303
2304 let pickle = interface_stats_to_pickle(&stats);
2305
2306 let encoded = pickle::encode(&pickle);
2308 let decoded = pickle::decode(&encoded).unwrap();
2309 assert_eq!(
2310 decoded.get("transport_enabled").unwrap().as_bool().unwrap(),
2311 true
2312 );
2313 let ifaces = decoded.get("interfaces").unwrap().as_list().unwrap();
2314 assert_eq!(ifaces[0].get("id").unwrap().as_int().unwrap(), 1);
2315 assert_eq!(ifaces[0].get("name").unwrap().as_str().unwrap(), "TCP");
2316 }
2317
2318 #[test]
2319 fn send_probe_rpc_unknown_dest() {
2320 let (event_tx, event_rx) = crate::event::channel();
2321
2322 let driver = thread::spawn(move || {
2323 if let Ok(Event::Query(
2324 QueryRequest::SendProbe {
2325 dest_hash,
2326 payload_size,
2327 },
2328 resp_tx,
2329 )) = event_rx.recv()
2330 {
2331 assert_eq!(dest_hash, [0xAA; 16]);
2332 assert_eq!(payload_size, 16); let _ = resp_tx.send(QueryResponse::SendProbe(None));
2334 }
2335 });
2336
2337 let request = PickleValue::Dict(vec![(
2338 PickleValue::String("send_probe".into()),
2339 PickleValue::Bytes(vec![0xAA; 16]),
2340 )]);
2341
2342 let response = handle_rpc_request(&request, &event_tx).unwrap();
2343 assert_eq!(response, PickleValue::None);
2344 driver.join().unwrap();
2345 }
2346
2347 #[test]
2348 fn send_probe_rpc_with_result() {
2349 let (event_tx, event_rx) = crate::event::channel();
2350
2351 let packet_hash = [0xBB; 32];
2352 let driver = thread::spawn(move || {
2353 if let Ok(Event::Query(
2354 QueryRequest::SendProbe {
2355 dest_hash,
2356 payload_size,
2357 },
2358 resp_tx,
2359 )) = event_rx.recv()
2360 {
2361 assert_eq!(dest_hash, [0xCC; 16]);
2362 assert_eq!(payload_size, 32);
2363 let _ = resp_tx.send(QueryResponse::SendProbe(Some((packet_hash, 3))));
2364 }
2365 });
2366
2367 let request = PickleValue::Dict(vec![
2368 (
2369 PickleValue::String("send_probe".into()),
2370 PickleValue::Bytes(vec![0xCC; 16]),
2371 ),
2372 (PickleValue::String("size".into()), PickleValue::Int(32)),
2373 ]);
2374
2375 let response = handle_rpc_request(&request, &event_tx).unwrap();
2376 let ph = response.get("packet_hash").unwrap().as_bytes().unwrap();
2377 assert_eq!(ph, &[0xBB; 32]);
2378 assert_eq!(response.get("hops").unwrap().as_int().unwrap(), 3);
2379 driver.join().unwrap();
2380 }
2381
2382 #[test]
2383 fn send_probe_rpc_size_validation() {
2384 let (event_tx, event_rx) = crate::event::channel();
2385
2386 let driver = thread::spawn(move || {
2388 if let Ok(Event::Query(QueryRequest::SendProbe { payload_size, .. }, resp_tx)) =
2389 event_rx.recv()
2390 {
2391 assert_eq!(payload_size, 16); let _ = resp_tx.send(QueryResponse::SendProbe(None));
2393 }
2394 });
2395
2396 let request = PickleValue::Dict(vec![
2397 (
2398 PickleValue::String("send_probe".into()),
2399 PickleValue::Bytes(vec![0xDD; 16]),
2400 ),
2401 (PickleValue::String("size".into()), PickleValue::Int(-1)),
2402 ]);
2403
2404 let response = handle_rpc_request(&request, &event_tx).unwrap();
2405 assert_eq!(response, PickleValue::None);
2406 driver.join().unwrap();
2407 }
2408
2409 #[test]
2410 fn send_probe_rpc_size_too_large() {
2411 let (event_tx, event_rx) = crate::event::channel();
2412
2413 let driver = thread::spawn(move || {
2415 if let Ok(Event::Query(QueryRequest::SendProbe { payload_size, .. }, resp_tx)) =
2416 event_rx.recv()
2417 {
2418 assert_eq!(payload_size, 16); let _ = resp_tx.send(QueryResponse::SendProbe(None));
2420 }
2421 });
2422
2423 let request = PickleValue::Dict(vec![
2424 (
2425 PickleValue::String("send_probe".into()),
2426 PickleValue::Bytes(vec![0xDD; 16]),
2427 ),
2428 (PickleValue::String("size".into()), PickleValue::Int(999)),
2429 ]);
2430
2431 let response = handle_rpc_request(&request, &event_tx).unwrap();
2432 assert_eq!(response, PickleValue::None);
2433 driver.join().unwrap();
2434 }
2435
2436 #[test]
2437 fn check_proof_rpc_not_found() {
2438 let (event_tx, event_rx) = crate::event::channel();
2439
2440 let driver = thread::spawn(move || {
2441 if let Ok(Event::Query(QueryRequest::CheckProof { packet_hash }, resp_tx)) =
2442 event_rx.recv()
2443 {
2444 assert_eq!(packet_hash, [0xEE; 32]);
2445 let _ = resp_tx.send(QueryResponse::CheckProof(None));
2446 }
2447 });
2448
2449 let request = PickleValue::Dict(vec![(
2450 PickleValue::String("check_proof".into()),
2451 PickleValue::Bytes(vec![0xEE; 32]),
2452 )]);
2453
2454 let response = handle_rpc_request(&request, &event_tx).unwrap();
2455 assert_eq!(response, PickleValue::None);
2456 driver.join().unwrap();
2457 }
2458
2459 #[test]
2460 fn check_proof_rpc_found() {
2461 let (event_tx, event_rx) = crate::event::channel();
2462
2463 let driver = thread::spawn(move || {
2464 if let Ok(Event::Query(QueryRequest::CheckProof { packet_hash }, resp_tx)) =
2465 event_rx.recv()
2466 {
2467 assert_eq!(packet_hash, [0xFF; 32]);
2468 let _ = resp_tx.send(QueryResponse::CheckProof(Some(0.352)));
2469 }
2470 });
2471
2472 let request = PickleValue::Dict(vec![(
2473 PickleValue::String("check_proof".into()),
2474 PickleValue::Bytes(vec![0xFF; 32]),
2475 )]);
2476
2477 let response = handle_rpc_request(&request, &event_tx).unwrap();
2478 if let PickleValue::Float(rtt) = response {
2479 assert!((rtt - 0.352).abs() < 0.001);
2480 } else {
2481 panic!("Expected Float, got {:?}", response);
2482 }
2483 driver.join().unwrap();
2484 }
2485
2486 #[test]
2487 fn request_path_rpc() {
2488 let (event_tx, event_rx) = crate::event::channel();
2489
2490 let driver =
2491 thread::spawn(
2492 move || match event_rx.recv_timeout(std::time::Duration::from_secs(5)) {
2493 Ok(Event::RequestPath { dest_hash }) => {
2494 assert_eq!(dest_hash, [0x11; 16]);
2495 }
2496 other => panic!("Expected RequestPath event, got {:?}", other),
2497 },
2498 );
2499
2500 let request = PickleValue::Dict(vec![(
2501 PickleValue::String("request_path".into()),
2502 PickleValue::Bytes(vec![0x11; 16]),
2503 )]);
2504
2505 let response = handle_rpc_request(&request, &event_tx).unwrap();
2506 assert_eq!(response, PickleValue::Bool(true));
2507 driver.join().unwrap();
2508 }
2509
2510 #[test]
2511 fn begin_drain_rpc_emits_event() {
2512 let (event_tx, event_rx) = crate::event::channel();
2513
2514 let driver = thread::spawn(move || match event_rx.recv_timeout(Duration::from_secs(5)) {
2515 Ok(Event::BeginDrain { timeout }) => {
2516 assert!((timeout.as_secs_f64() - 1.5).abs() < 0.001);
2517 }
2518 other => panic!("Expected BeginDrain event, got {:?}", other),
2519 });
2520
2521 let request = PickleValue::Dict(vec![(
2522 PickleValue::String("begin_drain".into()),
2523 PickleValue::Float(1.5),
2524 )]);
2525
2526 let response = handle_rpc_request(&request, &event_tx).unwrap();
2527 assert_eq!(response, PickleValue::Bool(true));
2528 driver.join().unwrap();
2529 }
2530
2531 #[test]
2532 fn drain_status_rpc_roundtrips_fields() {
2533 let (event_tx, event_rx) = crate::event::channel();
2534
2535 let driver = thread::spawn(move || {
2536 if let Ok(Event::Query(QueryRequest::DrainStatus, resp_tx)) = event_rx.recv() {
2537 let _ = resp_tx.send(QueryResponse::DrainStatus(DrainStatus {
2538 state: LifecycleState::Draining,
2539 drain_age_seconds: Some(0.75),
2540 deadline_remaining_seconds: Some(2.25),
2541 drain_complete: false,
2542 interface_writer_queued_frames: 3,
2543 provider_backlog_events: 4,
2544 provider_consumer_queued_events: 5,
2545 detail: Some("node is draining existing work".into()),
2546 }));
2547 }
2548 });
2549
2550 let request = PickleValue::Dict(vec![(
2551 PickleValue::String("get".into()),
2552 PickleValue::String("drain_status".into()),
2553 )]);
2554
2555 let response = handle_rpc_request(&request, &event_tx).unwrap();
2556 assert_eq!(response.get("state").unwrap().as_str(), Some("draining"));
2557 assert_eq!(
2558 response.get("drain_complete").unwrap().as_bool(),
2559 Some(false)
2560 );
2561 assert_eq!(
2562 response
2563 .get("deadline_remaining_seconds")
2564 .unwrap()
2565 .as_float(),
2566 Some(2.25)
2567 );
2568 assert_eq!(
2569 response
2570 .get("interface_writer_queued_frames")
2571 .unwrap()
2572 .as_int(),
2573 Some(3)
2574 );
2575 assert_eq!(
2576 response
2577 .get("provider_backlog_events")
2578 .unwrap()
2579 .as_int(),
2580 Some(4)
2581 );
2582 assert_eq!(
2583 response
2584 .get("provider_consumer_queued_events")
2585 .unwrap()
2586 .as_int(),
2587 Some(5)
2588 );
2589 assert_eq!(
2590 response.get("detail").unwrap().as_str(),
2591 Some("node is draining existing work")
2592 );
2593 driver.join().unwrap();
2594 }
2595
2596 #[test]
2597 fn interface_stats_with_probe_responder() {
2598 let probe_hash = [0x42; 16];
2599 let stats = InterfaceStatsResponse {
2600 interfaces: vec![],
2601 transport_id: None,
2602 transport_enabled: true,
2603 transport_uptime: 100.0,
2604 total_rxb: 0,
2605 total_txb: 0,
2606 probe_responder: Some(probe_hash),
2607 };
2608
2609 let pickle = interface_stats_to_pickle(&stats);
2610 let encoded = pickle::encode(&pickle);
2611 let decoded = pickle::decode(&encoded).unwrap();
2612
2613 let pr = decoded.get("probe_responder").unwrap().as_bytes().unwrap();
2614 assert_eq!(pr, &probe_hash);
2615 }
2616
2617 #[test]
2618 fn runtime_config_get_and_set_rpc() {
2619 let (event_tx, event_rx) = crate::event::channel();
2620
2621 let driver = thread::spawn(move || {
2622 if let Ok(Event::Query(QueryRequest::GetRuntimeConfig { key }, resp_tx)) =
2623 event_rx.recv()
2624 {
2625 assert_eq!(key, "global.tick_interval_ms");
2626 let _ = resp_tx.send(QueryResponse::RuntimeConfigEntry(Some(
2627 RuntimeConfigEntry {
2628 key,
2629 value: RuntimeConfigValue::Int(1000),
2630 default: RuntimeConfigValue::Int(1000),
2631 source: RuntimeConfigSource::Startup,
2632 apply_mode: RuntimeConfigApplyMode::Immediate,
2633 description: Some("tick".into()),
2634 },
2635 )));
2636 } else {
2637 panic!("expected GetRuntimeConfig query");
2638 }
2639
2640 if let Ok(Event::Query(QueryRequest::SetRuntimeConfig { key, value }, resp_tx)) =
2641 event_rx.recv()
2642 {
2643 assert_eq!(key, "global.tick_interval_ms");
2644 assert_eq!(value, RuntimeConfigValue::Int(250));
2645 let _ = resp_tx.send(QueryResponse::RuntimeConfigSet(Ok(RuntimeConfigEntry {
2646 key,
2647 value: RuntimeConfigValue::Int(250),
2648 default: RuntimeConfigValue::Int(1000),
2649 source: RuntimeConfigSource::RuntimeOverride,
2650 apply_mode: RuntimeConfigApplyMode::Immediate,
2651 description: Some("tick".into()),
2652 })));
2653 } else {
2654 panic!("expected SetRuntimeConfig query");
2655 }
2656 });
2657
2658 let get_request = PickleValue::Dict(vec![
2659 (
2660 PickleValue::String("get".into()),
2661 PickleValue::String("runtime_config_entry".into()),
2662 ),
2663 (
2664 PickleValue::String("key".into()),
2665 PickleValue::String("global.tick_interval_ms".into()),
2666 ),
2667 ]);
2668 let get_response = handle_rpc_request(&get_request, &event_tx).unwrap();
2669 assert_eq!(
2670 get_response.get("key").and_then(|v| v.as_str()),
2671 Some("global.tick_interval_ms")
2672 );
2673
2674 let set_request = PickleValue::Dict(vec![
2675 (
2676 PickleValue::String("set".into()),
2677 PickleValue::String("runtime_config".into()),
2678 ),
2679 (
2680 PickleValue::String("key".into()),
2681 PickleValue::String("global.tick_interval_ms".into()),
2682 ),
2683 (PickleValue::String("value".into()), PickleValue::Int(250)),
2684 ]);
2685 let set_response = handle_rpc_request(&set_request, &event_tx).unwrap();
2686 assert_eq!(
2687 set_response.get("value").and_then(|v| v.as_int()),
2688 Some(250)
2689 );
2690
2691 driver.join().unwrap();
2692 }
2693
2694 fn tcp_pair() -> (TcpStream, TcpStream) {
2696 let listener = TcpListener::bind("127.0.0.1:0").unwrap();
2697 let port = listener.local_addr().unwrap().port();
2698 let client = TcpStream::connect(("127.0.0.1", port)).unwrap();
2699 let (server, _) = listener.accept().unwrap();
2700 client
2701 .set_read_timeout(Some(std::time::Duration::from_secs(5)))
2702 .unwrap();
2703 server
2704 .set_read_timeout(Some(std::time::Duration::from_secs(5)))
2705 .unwrap();
2706 (server, client)
2707 }
2708}