1use std::io::{self, Read, Write};
12use std::net::{IpAddr, TcpListener, TcpStream};
13use std::sync::mpsc;
14use std::sync::{
15 atomic::{AtomicBool, Ordering},
16 Arc,
17};
18use std::thread;
19use std::time::Duration;
20
21use rns_core::msgpack::{self, Value as MsgpackValue};
22use rns_crypto::hmac::hmac_sha256;
23use rns_crypto::sha256::sha256;
24
25use crate::event::{
26 BackboneInterfaceEntry, BackbonePeerStateEntry, BlackholeInfo, DrainStatus, Event, EventSender,
27 HookInfo, InterfaceStatsResponse, KnownDestinationEntry, LifecycleState, PathTableEntry,
28 ProviderBridgeStats, QueryRequest, QueryResponse, RateTableEntry, RuntimeConfigApplyMode,
29 RuntimeConfigEntry, RuntimeConfigError, RuntimeConfigErrorCode, RuntimeConfigSource,
30 RuntimeConfigValue, SingleInterfaceStat,
31};
32use crate::md5::hmac_md5;
33use crate::pickle::{self, PickleValue};
34
/// Prefix of the challenge frame the server sends to start authentication.
const CHALLENGE_PREFIX: &[u8] = b"#CHALLENGE#";
/// Frame sent to the client once its challenge response has been accepted.
const WELCOME: &[u8] = b"#WELCOME#";
/// Frame sent to the client when its challenge response is rejected.
const FAILURE: &[u8] = b"#FAILURE#";
/// Number of random bytes carried in each challenge.
const CHALLENGE_LEN: usize = 40;
39
/// Wire encoding of an RPC payload.
///
/// The codec detected while decoding a request is the one used to encode the
/// response to that request.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RpcCodec {
    /// Payload encoded with MessagePack.
    Msgpack,
    /// Payload encoded with pickle.
    Pickle,
}
45
/// Address the RPC server listens on.
#[derive(Debug, Clone)]
pub enum RpcAddr {
    /// TCP endpoint given as a host string and a port.
    Tcp(String, u16),
}
51
/// Handle to the background thread that serves RPC connections.
pub struct RpcServer {
    /// Flag polled by the serving thread; storing `true` asks it to exit.
    shutdown: Arc<AtomicBool>,
    /// Join handle of the serving thread; `None` once it has been joined.
    thread: Option<thread::JoinHandle<()>>,
}
57
58impl RpcServer {
59 pub fn start(addr: &RpcAddr, auth_key: [u8; 32], event_tx: EventSender) -> io::Result<Self> {
61 let shutdown = Arc::new(AtomicBool::new(false));
62 let shutdown2 = shutdown.clone();
63
64 let listener = match addr {
65 RpcAddr::Tcp(host, port) => {
66 let l = TcpListener::bind((host.as_str(), *port))?;
67 l.set_nonblocking(true)?;
69 l
70 }
71 };
72
73 let thread = thread::Builder::new()
74 .name("rpc-server".into())
75 .spawn(move || {
76 rpc_server_loop(listener, auth_key, event_tx, shutdown2);
77 })
78 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
79
80 Ok(RpcServer {
81 shutdown,
82 thread: Some(thread),
83 })
84 }
85
86 pub fn stop(&mut self) {
88 self.shutdown.store(true, Ordering::Relaxed);
89 if let Some(handle) = self.thread.take() {
90 let _ = handle.join();
91 }
92 }
93}
94
/// Stops the serving thread when the server handle is dropped.
impl Drop for RpcServer {
    fn drop(&mut self) {
        // `stop` takes the join handle, so this is a no-op join if the server
        // was already stopped explicitly.
        self.stop();
    }
}
100
101fn rpc_server_loop(
102 listener: TcpListener,
103 auth_key: [u8; 32],
104 event_tx: EventSender,
105 shutdown: Arc<AtomicBool>,
106) {
107 loop {
108 if shutdown.load(Ordering::Relaxed) {
109 break;
110 }
111
112 match listener.accept() {
113 Ok((stream, _addr)) => {
114 let _ = stream.set_nonblocking(false);
116 let _ = stream.set_read_timeout(Some(std::time::Duration::from_secs(10)));
117 let _ = stream.set_write_timeout(Some(std::time::Duration::from_secs(10)));
118
119 if let Err(e) = handle_connection(stream, &auth_key, &event_tx) {
120 log::debug!("RPC connection error: {}", e);
121 }
122 }
123 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
124 thread::sleep(std::time::Duration::from_millis(100));
126 }
127 Err(e) => {
128 log::error!("RPC accept error: {}", e);
129 thread::sleep(std::time::Duration::from_millis(100));
130 }
131 }
132 }
133}
134
135fn handle_connection(
136 mut stream: TcpStream,
137 auth_key: &[u8; 32],
138 event_tx: &EventSender,
139) -> io::Result<()> {
140 server_auth(&mut stream, auth_key)?;
142
143 let request_bytes = recv_bytes(&mut stream)?;
145 let (request, codec) = decode_rpc_payload(&request_bytes)?;
146
147 let response = handle_rpc_request(&request, event_tx)?;
149
150 let response_bytes = encode_rpc_payload(&response, codec);
152 send_bytes(&mut stream, &response_bytes)?;
153
154 Ok(())
155}
156
157fn decode_rpc_payload(data: &[u8]) -> io::Result<(PickleValue, RpcCodec)> {
158 if let Ok(value) = msgpack::unpack_exact(data) {
159 return Ok((msgpack_to_pickle(&value), RpcCodec::Msgpack));
160 }
161 pickle::decode(data)
162 .map(|value| (value, RpcCodec::Pickle))
163 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))
164}
165
166fn encode_rpc_payload(value: &PickleValue, codec: RpcCodec) -> Vec<u8> {
167 match codec {
168 RpcCodec::Msgpack => msgpack::pack(&pickle_to_msgpack(value)),
169 RpcCodec::Pickle => pickle::encode(value),
170 }
171}
172
173fn pickle_to_msgpack(value: &PickleValue) -> MsgpackValue {
174 match value {
175 PickleValue::None => MsgpackValue::Nil,
176 PickleValue::Bool(v) => MsgpackValue::Bool(*v),
177 PickleValue::Int(v) if *v >= 0 => MsgpackValue::UInt(*v as u64),
178 PickleValue::Int(v) => MsgpackValue::Int(*v),
179 PickleValue::Float(v) => MsgpackValue::Float(*v),
180 PickleValue::String(v) => MsgpackValue::Str(v.clone()),
181 PickleValue::Bytes(v) => MsgpackValue::Bin(v.clone()),
182 PickleValue::List(values) => {
183 MsgpackValue::Array(values.iter().map(pickle_to_msgpack).collect())
184 }
185 PickleValue::Dict(values) => MsgpackValue::Map(
186 values
187 .iter()
188 .map(|(key, value)| (pickle_to_msgpack(key), pickle_to_msgpack(value)))
189 .collect(),
190 ),
191 }
192}
193
194fn msgpack_to_pickle(value: &MsgpackValue) -> PickleValue {
195 match value {
196 MsgpackValue::Nil => PickleValue::None,
197 MsgpackValue::Bool(v) => PickleValue::Bool(*v),
198 MsgpackValue::UInt(v) if *v <= i64::MAX as u64 => PickleValue::Int(*v as i64),
199 MsgpackValue::UInt(v) => PickleValue::Float(*v as f64),
200 MsgpackValue::Int(v) => PickleValue::Int(*v),
201 MsgpackValue::Float(v) => PickleValue::Float(*v),
202 MsgpackValue::Bin(v) => PickleValue::Bytes(v.clone()),
203 MsgpackValue::Str(v) => PickleValue::String(v.clone()),
204 MsgpackValue::Array(values) => {
205 PickleValue::List(values.iter().map(msgpack_to_pickle).collect())
206 }
207 MsgpackValue::Map(values) => PickleValue::Dict(
208 values
209 .iter()
210 .map(|(key, value)| (msgpack_to_pickle(key), msgpack_to_pickle(value)))
211 .collect(),
212 ),
213 }
214}
215
216fn server_auth(stream: &mut TcpStream, auth_key: &[u8; 32]) -> io::Result<()> {
218 let mut random_bytes = [0u8; CHALLENGE_LEN];
220 {
222 let mut f = std::fs::File::open("/dev/urandom")?;
223 f.read_exact(&mut random_bytes)?;
224 }
225
226 let mut challenge_message = Vec::with_capacity(CHALLENGE_PREFIX.len() + 8 + CHALLENGE_LEN);
227 challenge_message.extend_from_slice(CHALLENGE_PREFIX);
228 challenge_message.extend_from_slice(b"{sha256}");
229 challenge_message.extend_from_slice(&random_bytes);
230
231 send_bytes(stream, &challenge_message)?;
232
233 let response = recv_bytes(stream)?;
235
236 let message = &challenge_message[CHALLENGE_PREFIX.len()..];
239
240 if verify_response(auth_key, message, &response) {
241 send_bytes(stream, WELCOME)?;
242 Ok(())
243 } else {
244 send_bytes(stream, FAILURE)?;
245 Err(io::Error::new(
246 io::ErrorKind::PermissionDenied,
247 "auth failed",
248 ))
249 }
250}
251
252fn verify_response(auth_key: &[u8; 32], message: &[u8], response: &[u8]) -> bool {
254 if response.starts_with(b"{sha256}") {
256 let digest = &response[8..];
257 let expected = hmac_sha256(auth_key, message);
258 constant_time_eq(digest, &expected)
259 }
260 else if response.len() == 16 {
262 let expected = hmac_md5(auth_key, message);
263 constant_time_eq(response, &expected)
264 }
265 else if response.starts_with(b"{md5}") {
267 let digest = &response[5..];
268 let expected = hmac_md5(auth_key, message);
269 constant_time_eq(digest, &expected)
270 } else {
271 false
272 }
273}
274
/// Compares two byte slices without exiting early on the first mismatch.
///
/// Slices of different lengths compare unequal immediately; for equal lengths
/// every byte pair is XOR-ed into an accumulator that is checked at the end.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    a.len() == b.len()
        && a.iter()
            .zip(b)
            .fold(0u8, |acc, (lhs, rhs)| acc | (lhs ^ rhs))
            == 0
}
286
/// Writes one length-prefixed frame to `stream` and flushes it.
///
/// Payloads up to `i32::MAX` bytes are preceded by their length as a 4-byte
/// big-endian signed integer. Larger payloads are preceded by `-1` in that
/// 4-byte slot followed by the length as an 8-byte big-endian unsigned
/// integer — the extended form `recv_bytes` in this file already understands.
/// Previously the length was cast straight to `i32`, which wrapped for
/// oversized payloads and produced a corrupt header.
///
/// # Errors
///
/// Returns the underlying I/O error if any write or the flush fails.
fn send_bytes(stream: &mut TcpStream, data: &[u8]) -> io::Result<()> {
    if data.len() <= i32::MAX as usize {
        stream.write_all(&(data.len() as i32).to_be_bytes())?;
    } else {
        // A negative short header announces that an 8-byte length follows.
        stream.write_all(&(-1i32).to_be_bytes())?;
        stream.write_all(&(data.len() as u64).to_be_bytes())?;
    }
    stream.write_all(data)?;
    stream.flush()
}
294
/// Reads one length-prefixed frame from `stream`.
///
/// The frame starts with a 4-byte big-endian signed length. A negative value
/// means the real length follows as an 8-byte big-endian unsigned integer.
/// Frames larger than 64 MiB are rejected before any payload buffer is
/// allocated.
///
/// The size check is done on the 64-bit value before narrowing to `usize`, so
/// an oversized extended length cannot be truncated into an acceptable one on
/// targets where `usize` is narrower than 64 bits.
///
/// # Errors
///
/// Returns `InvalidData` for oversized frames, or the underlying I/O error if
/// the stream ends or times out before the frame is complete.
fn recv_bytes(stream: &mut TcpStream) -> io::Result<Vec<u8>> {
    const MAX_MESSAGE_LEN: u64 = 64 * 1024 * 1024;

    let mut header = [0u8; 4];
    stream.read_exact(&mut header)?;
    let short_len = i32::from_be_bytes(header);

    let len = if short_len < 0 {
        let mut extended = [0u8; 8];
        stream.read_exact(&mut extended)?;
        u64::from_be_bytes(extended)
    } else {
        short_len as u64
    };

    if len > MAX_MESSAGE_LEN {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "message too large",
        ));
    }

    // `len` is at most 64 MiB here, so the narrowing cast cannot truncate.
    let mut buf = vec![0u8; len as usize];
    stream.read_exact(&mut buf)?;
    Ok(buf)
}
328
329fn handle_rpc_request(request: &PickleValue, event_tx: &EventSender) -> io::Result<PickleValue> {
331 if let Some(get_val) = request.get("get") {
333 if let Some(path) = get_val.as_str() {
334 return match path {
335 "interface_stats" => {
336 let resp = send_query(event_tx, QueryRequest::InterfaceStats)?;
337 if let QueryResponse::InterfaceStats(stats) = resp {
338 Ok(interface_stats_to_pickle(&stats))
339 } else {
340 Ok(PickleValue::None)
341 }
342 }
343 "path_table" => {
344 let max_hops = request
345 .get("max_hops")
346 .and_then(|v| v.as_int().map(|n| n as u8));
347 let resp = send_query(event_tx, QueryRequest::PathTable { max_hops })?;
348 if let QueryResponse::PathTable(entries) = resp {
349 Ok(path_table_to_pickle(&entries))
350 } else {
351 Ok(PickleValue::None)
352 }
353 }
354 "rate_table" => {
355 let resp = send_query(event_tx, QueryRequest::RateTable)?;
356 if let QueryResponse::RateTable(entries) = resp {
357 Ok(rate_table_to_pickle(&entries))
358 } else {
359 Ok(PickleValue::None)
360 }
361 }
362 "next_hop" => {
363 let hash = extract_dest_hash(request, "destination_hash")?;
364 let resp = send_query(event_tx, QueryRequest::NextHop { dest_hash: hash })?;
365 if let QueryResponse::NextHop(Some(nh)) = resp {
366 Ok(PickleValue::Bytes(nh.next_hop.to_vec()))
367 } else {
368 Ok(PickleValue::None)
369 }
370 }
371 "next_hop_if_name" => {
372 let hash = extract_dest_hash(request, "destination_hash")?;
373 let resp =
374 send_query(event_tx, QueryRequest::NextHopIfName { dest_hash: hash })?;
375 if let QueryResponse::NextHopIfName(Some(name)) = resp {
376 Ok(PickleValue::String(name))
377 } else {
378 Ok(PickleValue::None)
379 }
380 }
381 "link_count" => {
382 let resp = send_query(event_tx, QueryRequest::LinkCount)?;
383 if let QueryResponse::LinkCount(n) = resp {
384 Ok(PickleValue::Int(n as i64))
385 } else {
386 Ok(PickleValue::None)
387 }
388 }
389 "transport_identity" => {
390 let resp = send_query(event_tx, QueryRequest::TransportIdentity)?;
391 if let QueryResponse::TransportIdentity(Some(hash)) = resp {
392 Ok(PickleValue::Bytes(hash.to_vec()))
393 } else {
394 Ok(PickleValue::None)
395 }
396 }
397 "blackholed" => {
398 let resp = send_query(event_tx, QueryRequest::GetBlackholed)?;
399 if let QueryResponse::Blackholed(entries) = resp {
400 Ok(blackholed_to_pickle(&entries))
401 } else {
402 Ok(PickleValue::None)
403 }
404 }
405 "discovered_interfaces" => {
406 let only_available = request
407 .get("only_available")
408 .and_then(|v| v.as_bool())
409 .unwrap_or(false);
410 let only_transport = request
411 .get("only_transport")
412 .and_then(|v| v.as_bool())
413 .unwrap_or(false);
414 let resp = send_query(
415 event_tx,
416 QueryRequest::DiscoveredInterfaces {
417 only_available,
418 only_transport,
419 },
420 )?;
421 if let QueryResponse::DiscoveredInterfaces(interfaces) = resp {
422 Ok(discovered_interfaces_to_pickle(&interfaces))
423 } else {
424 Ok(PickleValue::None)
425 }
426 }
427 "hooks" => {
428 let (response_tx, response_rx) = mpsc::channel();
429 event_tx
430 .send(Event::ListHooks { response_tx })
431 .map_err(|_| {
432 io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down")
433 })?;
434 let hooks = response_rx
435 .recv_timeout(std::time::Duration::from_secs(5))
436 .map_err(|_| {
437 io::Error::new(io::ErrorKind::TimedOut, "list hooks timed out")
438 })?;
439 Ok(hooks_to_pickle(&hooks))
440 }
441 "runtime_config" => {
442 let resp = send_query(event_tx, QueryRequest::ListRuntimeConfig)?;
443 if let QueryResponse::RuntimeConfigList(entries) = resp {
444 Ok(runtime_config_list_to_pickle(&entries))
445 } else {
446 Ok(PickleValue::None)
447 }
448 }
449 "known_destinations" => {
450 let resp = send_query(event_tx, QueryRequest::KnownDestinations)?;
451 if let QueryResponse::KnownDestinations(entries) = resp {
452 Ok(known_destinations_to_pickle(&entries))
453 } else {
454 Ok(PickleValue::None)
455 }
456 }
457 "runtime_config_entry" => {
458 let key = request
459 .get("key")
460 .and_then(|v| v.as_str())
461 .unwrap_or_default()
462 .to_string();
463 let resp = send_query(event_tx, QueryRequest::GetRuntimeConfig { key })?;
464 if let QueryResponse::RuntimeConfigEntry(entry) = resp {
465 Ok(entry
466 .as_ref()
467 .map(runtime_config_entry_to_pickle)
468 .unwrap_or(PickleValue::None))
469 } else {
470 Ok(PickleValue::None)
471 }
472 }
473 "backbone_peer_state" => {
474 let interface_name = request
475 .get("interface")
476 .and_then(|v| v.as_str())
477 .map(|s| s.to_string());
478 let resp =
479 send_query(event_tx, QueryRequest::BackbonePeerState { interface_name })?;
480 if let QueryResponse::BackbonePeerState(entries) = resp {
481 Ok(backbone_peer_state_to_pickle(&entries))
482 } else {
483 Ok(PickleValue::None)
484 }
485 }
486 "backbone_interfaces" => {
487 let resp = send_query(event_tx, QueryRequest::BackboneInterfaces)?;
488 if let QueryResponse::BackboneInterfaces(entries) = resp {
489 Ok(backbone_interfaces_to_pickle(&entries))
490 } else {
491 Ok(PickleValue::None)
492 }
493 }
494 "provider_bridge_stats" => {
495 let resp = send_query(event_tx, QueryRequest::ProviderBridgeStats)?;
496 if let QueryResponse::ProviderBridgeStats(stats) = resp {
497 Ok(stats
498 .as_ref()
499 .map(provider_bridge_stats_to_pickle)
500 .unwrap_or(PickleValue::None))
501 } else {
502 Ok(PickleValue::None)
503 }
504 }
505 "drain_status" => {
506 let resp = send_query(event_tx, QueryRequest::DrainStatus)?;
507 if let QueryResponse::DrainStatus(status) = resp {
508 Ok(drain_status_to_pickle(&status))
509 } else {
510 Ok(PickleValue::None)
511 }
512 }
513 _ => Ok(PickleValue::None),
514 };
515 }
516 }
517
518 if let Some(begin_val) = request.get("begin_drain") {
519 let timeout_secs = begin_val
520 .as_float()
521 .or_else(|| begin_val.as_int().map(|value| value as f64))
522 .unwrap_or(0.0)
523 .max(0.0);
524 let timeout = Duration::from_secs_f64(timeout_secs);
525 let _ = event_tx.send(Event::BeginDrain { timeout });
526 return Ok(PickleValue::Bool(true));
527 }
528
529 if let Some(set_val) = request.get("set").and_then(|v| v.as_str()) {
530 if set_val == "known_destination_retained" {
531 let dest_hash = extract_dest_hash(request, "dest_hash")?;
532 let resp = send_query(event_tx, QueryRequest::RetainKnownDestination { dest_hash })?;
533 return if let QueryResponse::RetainKnownDestination(ok) = resp {
534 Ok(PickleValue::Bool(ok))
535 } else {
536 Ok(PickleValue::None)
537 };
538 }
539 if set_val == "identity_retained" {
540 let identity_hash = extract_dest_hash(request, "identity_hash")?;
541 let resp = send_query(event_tx, QueryRequest::RetainIdentity { identity_hash })?;
542 return if let QueryResponse::RetainIdentity(ok) = resp {
543 Ok(PickleValue::Bool(ok))
544 } else {
545 Ok(PickleValue::None)
546 };
547 }
548 if set_val == "known_destination_used" {
549 let dest_hash = extract_dest_hash(request, "dest_hash")?;
550 let resp = send_query(
551 event_tx,
552 QueryRequest::MarkKnownDestinationUsed { dest_hash },
553 )?;
554 return if let QueryResponse::MarkKnownDestinationUsed(ok) = resp {
555 Ok(PickleValue::Bool(ok))
556 } else {
557 Ok(PickleValue::None)
558 };
559 }
560 if set_val == "runtime_config" {
561 let key = request
562 .get("key")
563 .and_then(|v| v.as_str())
564 .unwrap_or_default()
565 .to_string();
566 let Some(value) = request
567 .get("value")
568 .and_then(runtime_config_value_from_pickle)
569 else {
570 return Ok(runtime_config_error_to_pickle(&RuntimeConfigError {
571 code: RuntimeConfigErrorCode::InvalidType,
572 message: "runtime-config set requires a scalar value".into(),
573 }));
574 };
575 let resp = send_query(event_tx, QueryRequest::SetRuntimeConfig { key, value })?;
576 return if let QueryResponse::RuntimeConfigSet(result) = resp {
577 Ok(runtime_config_result_to_pickle(result))
578 } else {
579 Ok(PickleValue::None)
580 };
581 }
582 }
583
584 if let Some(reset_val) = request.get("reset").and_then(|v| v.as_str()) {
585 if reset_val == "runtime_config" {
586 let key = request
587 .get("key")
588 .and_then(|v| v.as_str())
589 .unwrap_or_default()
590 .to_string();
591 let resp = send_query(event_tx, QueryRequest::ResetRuntimeConfig { key })?;
592 return if let QueryResponse::RuntimeConfigReset(result) = resp {
593 Ok(runtime_config_result_to_pickle(result))
594 } else {
595 Ok(PickleValue::None)
596 };
597 }
598 }
599
600 if let Some(clear_val) = request.get("clear").and_then(|v| v.as_str()) {
601 if clear_val == "known_destination_retained" {
602 let dest_hash = extract_dest_hash(request, "dest_hash")?;
603 let resp = send_query(
604 event_tx,
605 QueryRequest::UnretainKnownDestination { dest_hash },
606 )?;
607 return if let QueryResponse::UnretainKnownDestination(ok) = resp {
608 Ok(PickleValue::Bool(ok))
609 } else {
610 Ok(PickleValue::None)
611 };
612 }
613 if clear_val == "backbone_peer_state" {
614 let interface_name = required_string(request, "interface")?;
615 let peer_ip = required_string(request, "ip")?;
616 let peer_ip = peer_ip
617 .parse()
618 .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "invalid peer IP"))?;
619 let resp = send_query(
620 event_tx,
621 QueryRequest::ClearBackbonePeerState {
622 interface_name,
623 peer_ip,
624 },
625 )?;
626 return if let QueryResponse::ClearBackbonePeerState(ok) = resp {
627 Ok(PickleValue::Bool(ok))
628 } else {
629 Ok(PickleValue::None)
630 };
631 }
632 }
633
634 if let Some(set_val) = request.get("set").and_then(|v| v.as_str()) {
635 if set_val == "backbone_peer_blacklist" {
636 let interface_name = required_string(request, "interface")?;
637 let peer_ip = required_string(request, "ip")?;
638 let peer_ip: IpAddr = peer_ip
639 .parse()
640 .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "invalid peer IP"))?;
641 let duration_secs = request
642 .get("duration_secs")
643 .and_then(|v| v.as_int())
644 .ok_or_else(|| {
645 io::Error::new(io::ErrorKind::InvalidInput, "missing duration_secs")
646 })?;
647 let reason = request
648 .get("reason")
649 .and_then(|v| v.as_str())
650 .unwrap_or("sentinel blacklist")
651 .to_string();
652 let penalty_level = request
653 .get("penalty_level")
654 .and_then(|v| v.as_int())
655 .unwrap_or(0)
656 .clamp(0, u8::MAX as i64) as u8;
657 let resp = send_query(
658 event_tx,
659 QueryRequest::BlacklistBackbonePeer {
660 interface_name,
661 peer_ip,
662 duration: Duration::from_secs(duration_secs as u64),
663 reason,
664 penalty_level,
665 },
666 )?;
667 return if let QueryResponse::BlacklistBackbonePeer(ok) = resp {
668 Ok(PickleValue::Bool(ok))
669 } else {
670 Ok(PickleValue::None)
671 };
672 }
673 }
674
675 if let Some(hash_val) = request.get("request_path") {
677 if let Some(hash_bytes) = hash_val.as_bytes() {
678 if hash_bytes.len() >= 16 {
679 let mut dest_hash = [0u8; 16];
680 dest_hash.copy_from_slice(&hash_bytes[..16]);
681 let _ = event_tx.send(crate::event::Event::RequestPath { dest_hash });
682 return Ok(PickleValue::Bool(true));
683 }
684 }
685 }
686
687 if let Some(hash_val) = request.get("send_probe") {
689 if let Some(hash_bytes) = hash_val.as_bytes() {
690 if hash_bytes.len() >= 16 {
691 let mut dest_hash = [0u8; 16];
692 dest_hash.copy_from_slice(&hash_bytes[..16]);
693 let payload_size = request
694 .get("size")
695 .and_then(|v| v.as_int())
696 .and_then(|n| {
697 if n > 0 && n <= 400 {
698 Some(n as usize)
699 } else {
700 None
701 }
702 })
703 .unwrap_or(16);
704 let resp = send_query(
705 event_tx,
706 QueryRequest::SendProbe {
707 dest_hash,
708 payload_size,
709 },
710 )?;
711 if let QueryResponse::SendProbe(Some((packet_hash, hops))) = resp {
712 return Ok(PickleValue::Dict(vec![
713 (
714 PickleValue::String("packet_hash".into()),
715 PickleValue::Bytes(packet_hash.to_vec()),
716 ),
717 (
718 PickleValue::String("hops".into()),
719 PickleValue::Int(hops as i64),
720 ),
721 ]));
722 } else {
723 return Ok(PickleValue::None);
724 }
725 }
726 }
727 }
728
729 if let Some(hash_val) = request.get("check_proof") {
731 if let Some(hash_bytes) = hash_val.as_bytes() {
732 if hash_bytes.len() >= 32 {
733 let mut packet_hash = [0u8; 32];
734 packet_hash.copy_from_slice(&hash_bytes[..32]);
735 let resp = send_query(event_tx, QueryRequest::CheckProof { packet_hash })?;
736 if let QueryResponse::CheckProof(Some(rtt)) = resp {
737 return Ok(PickleValue::Float(rtt));
738 } else {
739 return Ok(PickleValue::None);
740 }
741 }
742 }
743 }
744
745 if let Some(hash_val) = request.get("blackhole") {
747 if let Some(hash_bytes) = hash_val.as_bytes() {
748 if hash_bytes.len() >= 16 {
749 let mut identity_hash = [0u8; 16];
750 identity_hash.copy_from_slice(&hash_bytes[..16]);
751 let duration_hours = request.get("duration").and_then(|v| v.as_float());
752 let reason = request
753 .get("reason")
754 .and_then(|v| v.as_str())
755 .map(|s| s.to_string());
756 let resp = send_query(
757 event_tx,
758 QueryRequest::BlackholeIdentity {
759 identity_hash,
760 duration_hours,
761 reason,
762 },
763 )?;
764 return Ok(PickleValue::Bool(matches!(
765 resp,
766 QueryResponse::BlackholeResult(true)
767 )));
768 }
769 }
770 }
771
772 if let Some(hash_val) = request.get("unblackhole") {
774 if let Some(hash_bytes) = hash_val.as_bytes() {
775 if hash_bytes.len() >= 16 {
776 let mut identity_hash = [0u8; 16];
777 identity_hash.copy_from_slice(&hash_bytes[..16]);
778 let resp = send_query(
779 event_tx,
780 QueryRequest::UnblackholeIdentity { identity_hash },
781 )?;
782 return Ok(PickleValue::Bool(matches!(
783 resp,
784 QueryResponse::UnblackholeResult(true)
785 )));
786 }
787 }
788 }
789
790 if let Some(drop_val) = request.get("drop") {
792 if let Some(path) = drop_val.as_str() {
793 return match path {
794 "path" => {
795 let hash = extract_dest_hash(request, "destination_hash")?;
796 let resp = send_query(event_tx, QueryRequest::DropPath { dest_hash: hash })?;
797 if let QueryResponse::DropPath(ok) = resp {
798 Ok(PickleValue::Bool(ok))
799 } else {
800 Ok(PickleValue::None)
801 }
802 }
803 "all_via" => {
804 let hash = extract_dest_hash(request, "destination_hash")?;
805 let resp = send_query(
806 event_tx,
807 QueryRequest::DropAllVia {
808 transport_hash: hash,
809 },
810 )?;
811 if let QueryResponse::DropAllVia(n) = resp {
812 Ok(PickleValue::Int(n as i64))
813 } else {
814 Ok(PickleValue::None)
815 }
816 }
817 "announce_queues" => {
818 let resp = send_query(event_tx, QueryRequest::DropAnnounceQueues)?;
819 if let QueryResponse::DropAnnounceQueues = resp {
820 Ok(PickleValue::Bool(true))
821 } else {
822 Ok(PickleValue::None)
823 }
824 }
825 _ => Ok(PickleValue::None),
826 };
827 }
828 }
829
830 if let Some(hook_val) = request.get("hook").and_then(|v| v.as_str()) {
831 return handle_hook_rpc_request(hook_val, request, event_tx);
832 }
833
834 Ok(PickleValue::None)
835}
836
837fn handle_hook_rpc_request(
838 op: &str,
839 request: &PickleValue,
840 event_tx: &EventSender,
841) -> io::Result<PickleValue> {
842 match op {
843 "load" => {
844 let name = required_string(request, "name")?;
845 let attach_point = required_string(request, "attach_point")?;
846 let priority = request
847 .get("priority")
848 .and_then(|v| v.as_int())
849 .unwrap_or(0) as i32;
850 let wasm = request
851 .get("wasm")
852 .and_then(|v| v.as_bytes())
853 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing wasm"))?
854 .to_vec();
855 let (response_tx, response_rx) = mpsc::channel();
856 event_tx
857 .send(Event::LoadHook {
858 name,
859 wasm_bytes: wasm,
860 attach_point,
861 priority,
862 response_tx,
863 })
864 .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down"))?;
865 let response = response_rx
866 .recv_timeout(std::time::Duration::from_secs(5))
867 .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "hook load timed out"))?;
868 Ok(hook_result_to_pickle(response))
869 }
870 "load_builtin" => {
871 let name = required_string(request, "name")?;
872 let attach_point = required_string(request, "attach_point")?;
873 let builtin_id = required_string(request, "builtin_id")?;
874 let priority = request
875 .get("priority")
876 .and_then(|v| v.as_int())
877 .unwrap_or(0) as i32;
878 let (response_tx, response_rx) = mpsc::channel();
879 event_tx
880 .send(Event::LoadBuiltinHook {
881 name,
882 builtin_id,
883 attach_point,
884 priority,
885 response_tx,
886 })
887 .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down"))?;
888 let response = response_rx
889 .recv_timeout(std::time::Duration::from_secs(5))
890 .map_err(|_| {
891 io::Error::new(io::ErrorKind::TimedOut, "built-in hook load timed out")
892 })?;
893 Ok(hook_result_to_pickle(response))
894 }
895 "unload" => {
896 let name = required_string(request, "name")?;
897 let attach_point = required_string(request, "attach_point")?;
898 let (response_tx, response_rx) = mpsc::channel();
899 event_tx
900 .send(Event::UnloadHook {
901 name,
902 attach_point,
903 response_tx,
904 })
905 .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down"))?;
906 let response = response_rx
907 .recv_timeout(std::time::Duration::from_secs(5))
908 .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "hook unload timed out"))?;
909 Ok(hook_result_to_pickle(response))
910 }
911 "enable" | "disable" => {
912 let name = required_string(request, "name")?;
913 let attach_point = required_string(request, "attach_point")?;
914 let enabled = op == "enable";
915 let (response_tx, response_rx) = mpsc::channel();
916 event_tx
917 .send(Event::SetHookEnabled {
918 name,
919 attach_point,
920 enabled,
921 response_tx,
922 })
923 .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down"))?;
924 let response = response_rx
925 .recv_timeout(std::time::Duration::from_secs(5))
926 .map_err(|_| {
927 io::Error::new(io::ErrorKind::TimedOut, "hook enable/disable timed out")
928 })?;
929 Ok(hook_result_to_pickle(response))
930 }
931 "set_priority" => {
932 let name = required_string(request, "name")?;
933 let attach_point = required_string(request, "attach_point")?;
934 let priority = request
935 .get("priority")
936 .and_then(|v| v.as_int())
937 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing priority"))?
938 as i32;
939 let (response_tx, response_rx) = mpsc::channel();
940 event_tx
941 .send(Event::SetHookPriority {
942 name,
943 attach_point,
944 priority,
945 response_tx,
946 })
947 .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down"))?;
948 let response = response_rx
949 .recv_timeout(std::time::Duration::from_secs(5))
950 .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "hook priority timed out"))?;
951 Ok(hook_result_to_pickle(response))
952 }
953 _ => Ok(PickleValue::None),
954 }
955}
956
957fn send_query(event_tx: &EventSender, request: QueryRequest) -> io::Result<QueryResponse> {
959 let (resp_tx, resp_rx) = mpsc::channel();
960 event_tx
961 .send(Event::Query(request, resp_tx))
962 .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down"))?;
963 resp_rx
964 .recv_timeout(std::time::Duration::from_secs(5))
965 .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "query timed out"))
966}
967
968fn extract_dest_hash(request: &PickleValue, key: &str) -> io::Result<[u8; 16]> {
970 let bytes = request
971 .get(key)
972 .and_then(|v| v.as_bytes())
973 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing destination_hash"))?;
974 if bytes.len() < 16 {
975 return Err(io::Error::new(io::ErrorKind::InvalidData, "hash too short"));
976 }
977 let mut hash = [0u8; 16];
978 hash.copy_from_slice(&bytes[..16]);
979 Ok(hash)
980}
981
982fn required_string(request: &PickleValue, key: &str) -> io::Result<String> {
983 request
984 .get(key)
985 .and_then(|v| v.as_str())
986 .map(|s| s.to_string())
987 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, format!("missing {}", key)))
988}
989
990fn hook_result_to_pickle(result: Result<(), String>) -> PickleValue {
991 match result {
992 Ok(()) => PickleValue::Dict(vec![(
993 PickleValue::String("ok".into()),
994 PickleValue::Bool(true),
995 )]),
996 Err(error) => PickleValue::Dict(vec![
997 (PickleValue::String("ok".into()), PickleValue::Bool(false)),
998 (
999 PickleValue::String("error".into()),
1000 PickleValue::String(error),
1001 ),
1002 ]),
1003 }
1004}
1005
1006fn interface_stats_to_pickle(stats: &InterfaceStatsResponse) -> PickleValue {
1009 let mut ifaces = Vec::new();
1010 for iface in &stats.interfaces {
1011 ifaces.push(single_iface_to_pickle(iface));
1012 }
1013
1014 let mut dict = vec![
1015 (
1016 PickleValue::String("interfaces".into()),
1017 PickleValue::List(ifaces),
1018 ),
1019 (
1020 PickleValue::String("transport_enabled".into()),
1021 PickleValue::Bool(stats.transport_enabled),
1022 ),
1023 (
1024 PickleValue::String("transport_uptime".into()),
1025 PickleValue::Float(stats.transport_uptime),
1026 ),
1027 (
1028 PickleValue::String("rxb".into()),
1029 PickleValue::Int(stats.total_rxb as i64),
1030 ),
1031 (
1032 PickleValue::String("txb".into()),
1033 PickleValue::Int(stats.total_txb as i64),
1034 ),
1035 ];
1036
1037 if let Some(tid) = stats.transport_id {
1038 dict.push((
1039 PickleValue::String("transport_id".into()),
1040 PickleValue::Bytes(tid.to_vec()),
1041 ));
1042 } else {
1043 dict.push((
1044 PickleValue::String("transport_id".into()),
1045 PickleValue::None,
1046 ));
1047 }
1048
1049 if let Some(pr) = stats.probe_responder {
1050 dict.push((
1051 PickleValue::String("probe_responder".into()),
1052 PickleValue::Bytes(pr.to_vec()),
1053 ));
1054 } else {
1055 dict.push((
1056 PickleValue::String("probe_responder".into()),
1057 PickleValue::None,
1058 ));
1059 }
1060
1061 if let Some(pool) = &stats.backbone_peer_pool {
1062 let members = pool
1063 .members
1064 .iter()
1065 .map(|member| {
1066 let mut member_dict = vec![
1067 (
1068 PickleValue::String("name".into()),
1069 PickleValue::String(member.name.clone()),
1070 ),
1071 (
1072 PickleValue::String("remote".into()),
1073 PickleValue::String(member.remote.clone()),
1074 ),
1075 (
1076 PickleValue::String("source".into()),
1077 PickleValue::String(member.source.clone()),
1078 ),
1079 (
1080 PickleValue::String("priority".into()),
1081 PickleValue::Int(member.priority as i64),
1082 ),
1083 (
1084 PickleValue::String("state".into()),
1085 PickleValue::String(member.state.clone()),
1086 ),
1087 (
1088 PickleValue::String("failure_count".into()),
1089 PickleValue::Int(member.failure_count as i64),
1090 ),
1091 ];
1092 member_dict.push((
1093 PickleValue::String("interface_id".into()),
1094 member
1095 .interface_id
1096 .map(|id| PickleValue::Int(id as i64))
1097 .unwrap_or(PickleValue::None),
1098 ));
1099 member_dict.push((
1100 PickleValue::String("last_error".into()),
1101 member
1102 .last_error
1103 .as_ref()
1104 .map(|err| PickleValue::String(err.clone()))
1105 .unwrap_or(PickleValue::None),
1106 ));
1107 member_dict.push((
1108 PickleValue::String("cooldown_remaining_seconds".into()),
1109 member
1110 .cooldown_remaining_seconds
1111 .map(PickleValue::Float)
1112 .unwrap_or(PickleValue::None),
1113 ));
1114 PickleValue::Dict(member_dict)
1115 })
1116 .collect();
1117 dict.push((
1118 PickleValue::String("backbone_peer_pool".into()),
1119 PickleValue::Dict(vec![
1120 (
1121 PickleValue::String("max_connected".into()),
1122 PickleValue::Int(pool.max_connected as i64),
1123 ),
1124 (
1125 PickleValue::String("active_count".into()),
1126 PickleValue::Int(pool.active_count as i64),
1127 ),
1128 (
1129 PickleValue::String("standby_count".into()),
1130 PickleValue::Int(pool.standby_count as i64),
1131 ),
1132 (
1133 PickleValue::String("cooldown_count".into()),
1134 PickleValue::Int(pool.cooldown_count as i64),
1135 ),
1136 (
1137 PickleValue::String("members".into()),
1138 PickleValue::List(members),
1139 ),
1140 ]),
1141 ));
1142 } else {
1143 dict.push((
1144 PickleValue::String("backbone_peer_pool".into()),
1145 PickleValue::None,
1146 ));
1147 }
1148
1149 PickleValue::Dict(dict)
1150}
1151
1152fn single_iface_to_pickle(s: &SingleInterfaceStat) -> PickleValue {
1153 let mut dict = vec![
1154 (
1155 PickleValue::String("id".into()),
1156 PickleValue::Int(s.id as i64),
1157 ),
1158 (
1159 PickleValue::String("name".into()),
1160 PickleValue::String(s.name.clone()),
1161 ),
1162 (
1163 PickleValue::String("status".into()),
1164 PickleValue::Bool(s.status),
1165 ),
1166 (
1167 PickleValue::String("mode".into()),
1168 PickleValue::Int(s.mode as i64),
1169 ),
1170 (
1171 PickleValue::String("rxb".into()),
1172 PickleValue::Int(s.rxb as i64),
1173 ),
1174 (
1175 PickleValue::String("txb".into()),
1176 PickleValue::Int(s.txb as i64),
1177 ),
1178 (
1179 PickleValue::String("rx_packets".into()),
1180 PickleValue::Int(s.rx_packets as i64),
1181 ),
1182 (
1183 PickleValue::String("tx_packets".into()),
1184 PickleValue::Int(s.tx_packets as i64),
1185 ),
1186 (
1187 PickleValue::String("started".into()),
1188 PickleValue::Float(s.started),
1189 ),
1190 (
1191 PickleValue::String("ia_freq".into()),
1192 PickleValue::Float(s.ia_freq),
1193 ),
1194 (
1195 PickleValue::String("oa_freq".into()),
1196 PickleValue::Float(s.oa_freq),
1197 ),
1198 (
1199 PickleValue::String("ip_freq".into()),
1200 PickleValue::Float(s.ip_freq),
1201 ),
1202 (
1203 PickleValue::String("op_freq".into()),
1204 PickleValue::Float(s.op_freq),
1205 ),
1206 (
1207 PickleValue::String("burst_active".into()),
1208 PickleValue::Bool(s.burst_active),
1209 ),
1210 (
1211 PickleValue::String("burst_activated".into()),
1212 PickleValue::Float(s.burst_activated),
1213 ),
1214 (
1215 PickleValue::String("pr_burst_active".into()),
1216 PickleValue::Bool(s.pr_burst_active),
1217 ),
1218 (
1219 PickleValue::String("pr_burst_activated".into()),
1220 PickleValue::Float(s.pr_burst_activated),
1221 ),
1222 (
1223 PickleValue::String("clients".into()),
1224 s.clients
1225 .map(|clients| PickleValue::Int(clients as i64))
1226 .unwrap_or(PickleValue::None),
1227 ),
1228 (
1229 PickleValue::String("announce_rate_grace".into()),
1230 PickleValue::Int(s.announce_rate_grace as i64),
1231 ),
1232 (
1233 PickleValue::String("announce_rate_penalty".into()),
1234 PickleValue::Float(s.announce_rate_penalty),
1235 ),
1236 ];
1237
1238 match s.announce_rate_target {
1239 Some(target) => dict.push((
1240 PickleValue::String("announce_rate_target".into()),
1241 PickleValue::Float(target),
1242 )),
1243 None => dict.push((
1244 PickleValue::String("announce_rate_target".into()),
1245 PickleValue::None,
1246 )),
1247 }
1248
1249 match s.bitrate {
1250 Some(br) => dict.push((
1251 PickleValue::String("bitrate".into()),
1252 PickleValue::Int(br as i64),
1253 )),
1254 None => dict.push((PickleValue::String("bitrate".into()), PickleValue::None)),
1255 }
1256
1257 match s.ifac_size {
1258 Some(sz) => dict.push((
1259 PickleValue::String("ifac_size".into()),
1260 PickleValue::Int(sz as i64),
1261 )),
1262 None => dict.push((PickleValue::String("ifac_size".into()), PickleValue::None)),
1263 }
1264
1265 PickleValue::Dict(dict)
1266}
1267
1268fn path_table_to_pickle(entries: &[PathTableEntry]) -> PickleValue {
1269 let list: Vec<PickleValue> = entries
1270 .iter()
1271 .map(|e| {
1272 PickleValue::Dict(vec![
1273 (
1274 PickleValue::String("hash".into()),
1275 PickleValue::Bytes(e.hash.to_vec()),
1276 ),
1277 (
1278 PickleValue::String("timestamp".into()),
1279 PickleValue::Float(e.timestamp),
1280 ),
1281 (
1282 PickleValue::String("via".into()),
1283 PickleValue::Bytes(e.via.to_vec()),
1284 ),
1285 (
1286 PickleValue::String("hops".into()),
1287 PickleValue::Int(e.hops as i64),
1288 ),
1289 (
1290 PickleValue::String("expires".into()),
1291 PickleValue::Float(e.expires),
1292 ),
1293 (
1294 PickleValue::String("interface".into()),
1295 PickleValue::String(e.interface_name.clone()),
1296 ),
1297 ])
1298 })
1299 .collect();
1300 PickleValue::List(list)
1301}
1302
1303fn rate_table_to_pickle(entries: &[RateTableEntry]) -> PickleValue {
1304 let list: Vec<PickleValue> = entries
1305 .iter()
1306 .map(|e| {
1307 PickleValue::Dict(vec![
1308 (
1309 PickleValue::String("hash".into()),
1310 PickleValue::Bytes(e.hash.to_vec()),
1311 ),
1312 (
1313 PickleValue::String("last".into()),
1314 PickleValue::Float(e.last),
1315 ),
1316 (
1317 PickleValue::String("rate_violations".into()),
1318 PickleValue::Int(e.rate_violations as i64),
1319 ),
1320 (
1321 PickleValue::String("blocked_until".into()),
1322 PickleValue::Float(e.blocked_until),
1323 ),
1324 (
1325 PickleValue::String("timestamps".into()),
1326 PickleValue::List(
1327 e.timestamps
1328 .iter()
1329 .map(|&t| PickleValue::Float(t))
1330 .collect(),
1331 ),
1332 ),
1333 ])
1334 })
1335 .collect();
1336 PickleValue::List(list)
1337}
1338
1339fn blackholed_to_pickle(entries: &[BlackholeInfo]) -> PickleValue {
1340 let list: Vec<PickleValue> = entries
1341 .iter()
1342 .map(|e| {
1343 let mut dict = vec![
1344 (
1345 PickleValue::String("identity_hash".into()),
1346 PickleValue::Bytes(e.identity_hash.to_vec()),
1347 ),
1348 (
1349 PickleValue::String("created".into()),
1350 PickleValue::Float(e.created),
1351 ),
1352 (
1353 PickleValue::String("expires".into()),
1354 PickleValue::Float(e.expires),
1355 ),
1356 ];
1357 if let Some(ref reason) = e.reason {
1358 dict.push((
1359 PickleValue::String("reason".into()),
1360 PickleValue::String(reason.clone()),
1361 ));
1362 } else {
1363 dict.push((PickleValue::String("reason".into()), PickleValue::None));
1364 }
1365 PickleValue::Dict(dict)
1366 })
1367 .collect();
1368 PickleValue::List(list)
1369}
1370
1371fn discovered_interfaces_to_pickle(
1372 interfaces: &[crate::discovery::DiscoveredInterface],
1373) -> PickleValue {
1374 let list: Vec<PickleValue> = interfaces
1375 .iter()
1376 .map(|iface| {
1377 let mut dict = vec![
1378 (
1379 PickleValue::String("type".into()),
1380 PickleValue::String(iface.interface_type.clone()),
1381 ),
1382 (
1383 PickleValue::String("transport".into()),
1384 PickleValue::Bool(iface.transport),
1385 ),
1386 (
1387 PickleValue::String("name".into()),
1388 PickleValue::String(iface.name.clone()),
1389 ),
1390 (
1391 PickleValue::String("discovered".into()),
1392 PickleValue::Float(iface.discovered),
1393 ),
1394 (
1395 PickleValue::String("last_heard".into()),
1396 PickleValue::Float(iface.last_heard),
1397 ),
1398 (
1399 PickleValue::String("heard_count".into()),
1400 PickleValue::Int(iface.heard_count as i64),
1401 ),
1402 (
1403 PickleValue::String("status".into()),
1404 PickleValue::String(iface.status.as_str().into()),
1405 ),
1406 (
1407 PickleValue::String("stamp".into()),
1408 PickleValue::Bytes(iface.stamp.clone()),
1409 ),
1410 (
1411 PickleValue::String("value".into()),
1412 PickleValue::Int(iface.stamp_value as i64),
1413 ),
1414 (
1415 PickleValue::String("transport_id".into()),
1416 PickleValue::Bytes(iface.transport_id.to_vec()),
1417 ),
1418 (
1419 PickleValue::String("network_id".into()),
1420 PickleValue::Bytes(iface.network_id.to_vec()),
1421 ),
1422 (
1423 PickleValue::String("hops".into()),
1424 PickleValue::Int(iface.hops as i64),
1425 ),
1426 ];
1427
1428 if let Some(v) = iface.latitude {
1430 dict.push((
1431 PickleValue::String("latitude".into()),
1432 PickleValue::Float(v),
1433 ));
1434 } else {
1435 dict.push((PickleValue::String("latitude".into()), PickleValue::None));
1436 }
1437 if let Some(v) = iface.longitude {
1438 dict.push((
1439 PickleValue::String("longitude".into()),
1440 PickleValue::Float(v),
1441 ));
1442 } else {
1443 dict.push((PickleValue::String("longitude".into()), PickleValue::None));
1444 }
1445 if let Some(v) = iface.height {
1446 dict.push((PickleValue::String("height".into()), PickleValue::Float(v)));
1447 } else {
1448 dict.push((PickleValue::String("height".into()), PickleValue::None));
1449 }
1450
1451 if let Some(ref v) = iface.reachable_on {
1453 dict.push((
1454 PickleValue::String("reachable_on".into()),
1455 PickleValue::String(v.clone()),
1456 ));
1457 } else {
1458 dict.push((
1459 PickleValue::String("reachable_on".into()),
1460 PickleValue::None,
1461 ));
1462 }
1463 if let Some(v) = iface.port {
1464 dict.push((
1465 PickleValue::String("port".into()),
1466 PickleValue::Int(v as i64),
1467 ));
1468 } else {
1469 dict.push((PickleValue::String("port".into()), PickleValue::None));
1470 }
1471
1472 if let Some(v) = iface.frequency {
1474 dict.push((
1475 PickleValue::String("frequency".into()),
1476 PickleValue::Int(v as i64),
1477 ));
1478 } else {
1479 dict.push((PickleValue::String("frequency".into()), PickleValue::None));
1480 }
1481 if let Some(v) = iface.bandwidth {
1482 dict.push((
1483 PickleValue::String("bandwidth".into()),
1484 PickleValue::Int(v as i64),
1485 ));
1486 } else {
1487 dict.push((PickleValue::String("bandwidth".into()), PickleValue::None));
1488 }
1489 if let Some(v) = iface.spreading_factor {
1490 dict.push((PickleValue::String("sf".into()), PickleValue::Int(v as i64)));
1491 } else {
1492 dict.push((PickleValue::String("sf".into()), PickleValue::None));
1493 }
1494 if let Some(v) = iface.coding_rate {
1495 dict.push((PickleValue::String("cr".into()), PickleValue::Int(v as i64)));
1496 } else {
1497 dict.push((PickleValue::String("cr".into()), PickleValue::None));
1498 }
1499 if let Some(ref v) = iface.modulation {
1500 dict.push((
1501 PickleValue::String("modulation".into()),
1502 PickleValue::String(v.clone()),
1503 ));
1504 } else {
1505 dict.push((PickleValue::String("modulation".into()), PickleValue::None));
1506 }
1507 if let Some(v) = iface.channel {
1508 dict.push((
1509 PickleValue::String("channel".into()),
1510 PickleValue::Int(v as i64),
1511 ));
1512 } else {
1513 dict.push((PickleValue::String("channel".into()), PickleValue::None));
1514 }
1515
1516 if let Some(ref v) = iface.ifac_netname {
1518 dict.push((
1519 PickleValue::String("ifac_netname".into()),
1520 PickleValue::String(v.clone()),
1521 ));
1522 } else {
1523 dict.push((
1524 PickleValue::String("ifac_netname".into()),
1525 PickleValue::None,
1526 ));
1527 }
1528 if let Some(ref v) = iface.ifac_netkey {
1529 dict.push((
1530 PickleValue::String("ifac_netkey".into()),
1531 PickleValue::String(v.clone()),
1532 ));
1533 } else {
1534 dict.push((PickleValue::String("ifac_netkey".into()), PickleValue::None));
1535 }
1536
1537 if let Some(ref v) = iface.config_entry {
1539 dict.push((
1540 PickleValue::String("config_entry".into()),
1541 PickleValue::String(v.clone()),
1542 ));
1543 } else {
1544 dict.push((
1545 PickleValue::String("config_entry".into()),
1546 PickleValue::None,
1547 ));
1548 }
1549
1550 dict.push((
1551 PickleValue::String("discovery_hash".into()),
1552 PickleValue::Bytes(iface.discovery_hash.to_vec()),
1553 ));
1554
1555 PickleValue::Dict(dict)
1556 })
1557 .collect();
1558 PickleValue::List(list)
1559}
1560
1561fn hooks_to_pickle(hooks: &[HookInfo]) -> PickleValue {
1562 PickleValue::List(
1563 hooks
1564 .iter()
1565 .map(|hook| {
1566 PickleValue::Dict(vec![
1567 (
1568 PickleValue::String("name".into()),
1569 PickleValue::String(hook.name.clone()),
1570 ),
1571 (
1572 PickleValue::String("type".into()),
1573 PickleValue::String(hook.hook_type.clone()),
1574 ),
1575 (
1576 PickleValue::String("attach_point".into()),
1577 PickleValue::String(hook.attach_point.clone()),
1578 ),
1579 (
1580 PickleValue::String("priority".into()),
1581 PickleValue::Int(hook.priority as i64),
1582 ),
1583 (
1584 PickleValue::String("enabled".into()),
1585 PickleValue::Bool(hook.enabled),
1586 ),
1587 (
1588 PickleValue::String("consecutive_traps".into()),
1589 PickleValue::Int(hook.consecutive_traps as i64),
1590 ),
1591 ])
1592 })
1593 .collect(),
1594 )
1595}
1596
1597fn backbone_peer_state_to_pickle(entries: &[BackbonePeerStateEntry]) -> PickleValue {
1598 PickleValue::List(
1599 entries
1600 .iter()
1601 .map(|entry| {
1602 PickleValue::Dict(vec![
1603 (
1604 PickleValue::String("interface".into()),
1605 PickleValue::String(entry.interface_name.clone()),
1606 ),
1607 (
1608 PickleValue::String("ip".into()),
1609 PickleValue::String(entry.peer_ip.to_string()),
1610 ),
1611 (
1612 PickleValue::String("connected_count".into()),
1613 PickleValue::Int(entry.connected_count as i64),
1614 ),
1615 (
1616 PickleValue::String("blacklisted_remaining_secs".into()),
1617 entry
1618 .blacklisted_remaining_secs
1619 .map(PickleValue::Float)
1620 .unwrap_or(PickleValue::None),
1621 ),
1622 (
1623 PickleValue::String("blacklist_reason".into()),
1624 entry
1625 .blacklist_reason
1626 .as_ref()
1627 .map(|v: &String| PickleValue::String(v.clone()))
1628 .unwrap_or(PickleValue::None),
1629 ),
1630 (
1631 PickleValue::String("reject_count".into()),
1632 PickleValue::Int(entry.reject_count as i64),
1633 ),
1634 ])
1635 })
1636 .collect(),
1637 )
1638}
1639
1640fn backbone_interfaces_to_pickle(entries: &[BackboneInterfaceEntry]) -> PickleValue {
1641 PickleValue::List(
1642 entries
1643 .iter()
1644 .map(|entry| {
1645 PickleValue::Dict(vec![
1646 (
1647 PickleValue::String("id".into()),
1648 PickleValue::Int(entry.interface_id.0 as i64),
1649 ),
1650 (
1651 PickleValue::String("name".into()),
1652 PickleValue::String(entry.interface_name.clone()),
1653 ),
1654 ])
1655 })
1656 .collect(),
1657 )
1658}
1659
1660fn provider_bridge_stats_to_pickle(stats: &ProviderBridgeStats) -> PickleValue {
1661 PickleValue::Dict(vec![
1662 (
1663 PickleValue::String("connected".into()),
1664 PickleValue::Bool(stats.connected),
1665 ),
1666 (
1667 PickleValue::String("consumer_count".into()),
1668 PickleValue::Int(stats.consumer_count as i64),
1669 ),
1670 (
1671 PickleValue::String("queue_max_events".into()),
1672 PickleValue::Int(stats.queue_max_events as i64),
1673 ),
1674 (
1675 PickleValue::String("queue_max_bytes".into()),
1676 PickleValue::Int(stats.queue_max_bytes as i64),
1677 ),
1678 (
1679 PickleValue::String("backlog_len".into()),
1680 PickleValue::Int(stats.backlog_len as i64),
1681 ),
1682 (
1683 PickleValue::String("backlog_bytes".into()),
1684 PickleValue::Int(stats.backlog_bytes as i64),
1685 ),
1686 (
1687 PickleValue::String("backlog_dropped_pending".into()),
1688 PickleValue::Int(stats.backlog_dropped_pending as i64),
1689 ),
1690 (
1691 PickleValue::String("backlog_dropped_total".into()),
1692 PickleValue::Int(stats.backlog_dropped_total as i64),
1693 ),
1694 (
1695 PickleValue::String("total_disconnect_count".into()),
1696 PickleValue::Int(stats.total_disconnect_count as i64),
1697 ),
1698 (
1699 PickleValue::String("consumers".into()),
1700 PickleValue::List(
1701 stats
1702 .consumers
1703 .iter()
1704 .map(|consumer| {
1705 PickleValue::Dict(vec![
1706 (
1707 PickleValue::String("id".into()),
1708 PickleValue::Int(consumer.id as i64),
1709 ),
1710 (
1711 PickleValue::String("connected".into()),
1712 PickleValue::Bool(consumer.connected),
1713 ),
1714 (
1715 PickleValue::String("queue_len".into()),
1716 PickleValue::Int(consumer.queue_len as i64),
1717 ),
1718 (
1719 PickleValue::String("queued_bytes".into()),
1720 PickleValue::Int(consumer.queued_bytes as i64),
1721 ),
1722 (
1723 PickleValue::String("dropped_pending".into()),
1724 PickleValue::Int(consumer.dropped_pending as i64),
1725 ),
1726 (
1727 PickleValue::String("dropped_total".into()),
1728 PickleValue::Int(consumer.dropped_total as i64),
1729 ),
1730 (
1731 PickleValue::String("queue_max_events".into()),
1732 PickleValue::Int(consumer.queue_max_events as i64),
1733 ),
1734 (
1735 PickleValue::String("queue_max_bytes".into()),
1736 PickleValue::Int(consumer.queue_max_bytes as i64),
1737 ),
1738 ])
1739 })
1740 .collect(),
1741 ),
1742 ),
1743 ])
1744}
1745
1746fn lifecycle_state_name(state: LifecycleState) -> &'static str {
1747 match state {
1748 LifecycleState::Active => "active",
1749 LifecycleState::Draining => "draining",
1750 LifecycleState::Stopping => "stopping",
1751 LifecycleState::Stopped => "stopped",
1752 }
1753}
1754
1755fn drain_status_to_pickle(status: &DrainStatus) -> PickleValue {
1756 PickleValue::Dict(vec![
1757 (
1758 PickleValue::String("state".into()),
1759 PickleValue::String(lifecycle_state_name(status.state).into()),
1760 ),
1761 (
1762 PickleValue::String("drain_age_seconds".into()),
1763 status
1764 .drain_age_seconds
1765 .map(PickleValue::Float)
1766 .unwrap_or(PickleValue::None),
1767 ),
1768 (
1769 PickleValue::String("deadline_remaining_seconds".into()),
1770 status
1771 .deadline_remaining_seconds
1772 .map(PickleValue::Float)
1773 .unwrap_or(PickleValue::None),
1774 ),
1775 (
1776 PickleValue::String("drain_complete".into()),
1777 PickleValue::Bool(status.drain_complete),
1778 ),
1779 (
1780 PickleValue::String("interface_writer_queued_frames".into()),
1781 PickleValue::Int(status.interface_writer_queued_frames as i64),
1782 ),
1783 (
1784 PickleValue::String("provider_backlog_events".into()),
1785 PickleValue::Int(status.provider_backlog_events as i64),
1786 ),
1787 (
1788 PickleValue::String("provider_consumer_queued_events".into()),
1789 PickleValue::Int(status.provider_consumer_queued_events as i64),
1790 ),
1791 (
1792 PickleValue::String("detail".into()),
1793 status
1794 .detail
1795 .as_ref()
1796 .map(|detail| PickleValue::String(detail.clone()))
1797 .unwrap_or(PickleValue::None),
1798 ),
1799 ])
1800}
1801
1802fn runtime_config_value_to_pickle(value: &RuntimeConfigValue) -> PickleValue {
1803 match value {
1804 RuntimeConfigValue::Int(v) => PickleValue::Int(*v),
1805 RuntimeConfigValue::Float(v) => PickleValue::Float(*v),
1806 RuntimeConfigValue::Bool(v) => PickleValue::Bool(*v),
1807 RuntimeConfigValue::String(v) => PickleValue::String(v.clone()),
1808 RuntimeConfigValue::Null => PickleValue::None,
1809 }
1810}
1811
1812fn runtime_config_value_from_pickle(value: &PickleValue) -> Option<RuntimeConfigValue> {
1813 match value {
1814 PickleValue::Int(v) => Some(RuntimeConfigValue::Int(*v)),
1815 PickleValue::Float(v) => Some(RuntimeConfigValue::Float(*v)),
1816 PickleValue::Bool(v) => Some(RuntimeConfigValue::Bool(*v)),
1817 PickleValue::String(v) => Some(RuntimeConfigValue::String(v.clone())),
1818 PickleValue::None => Some(RuntimeConfigValue::Null),
1819 _ => None,
1820 }
1821}
1822
1823fn runtime_config_entry_to_pickle(entry: &RuntimeConfigEntry) -> PickleValue {
1824 PickleValue::Dict(vec![
1825 (
1826 PickleValue::String("key".into()),
1827 PickleValue::String(entry.key.clone()),
1828 ),
1829 (
1830 PickleValue::String("value".into()),
1831 runtime_config_value_to_pickle(&entry.value),
1832 ),
1833 (
1834 PickleValue::String("default".into()),
1835 runtime_config_value_to_pickle(&entry.default),
1836 ),
1837 (
1838 PickleValue::String("source".into()),
1839 PickleValue::String(match entry.source {
1840 RuntimeConfigSource::Startup => "startup".into(),
1841 RuntimeConfigSource::RuntimeOverride => "runtime_override".into(),
1842 }),
1843 ),
1844 (
1845 PickleValue::String("apply_mode".into()),
1846 PickleValue::String(match entry.apply_mode {
1847 RuntimeConfigApplyMode::Immediate => "immediate".into(),
1848 RuntimeConfigApplyMode::NewConnectionsOnly => "new_connections_only".into(),
1849 RuntimeConfigApplyMode::NextReconnect => "next_reconnect".into(),
1850 RuntimeConfigApplyMode::RestartRequired => "restart_required".into(),
1851 }),
1852 ),
1853 (
1854 PickleValue::String("description".into()),
1855 entry
1856 .description
1857 .as_ref()
1858 .map(|v| PickleValue::String(v.clone()))
1859 .unwrap_or(PickleValue::None),
1860 ),
1861 ])
1862}
1863
1864fn runtime_config_list_to_pickle(entries: &[RuntimeConfigEntry]) -> PickleValue {
1865 PickleValue::List(entries.iter().map(runtime_config_entry_to_pickle).collect())
1866}
1867
1868fn runtime_config_error_to_pickle(error: &RuntimeConfigError) -> PickleValue {
1869 PickleValue::Dict(vec![
1870 (
1871 PickleValue::String("error".into()),
1872 PickleValue::String(match error.code {
1873 RuntimeConfigErrorCode::UnknownKey => "unknown_key".into(),
1874 RuntimeConfigErrorCode::InvalidType => "invalid_type".into(),
1875 RuntimeConfigErrorCode::InvalidValue => "invalid_value".into(),
1876 RuntimeConfigErrorCode::Unsupported => "unsupported".into(),
1877 RuntimeConfigErrorCode::NotFound => "not_found".into(),
1878 RuntimeConfigErrorCode::ApplyFailed => "apply_failed".into(),
1879 }),
1880 ),
1881 (
1882 PickleValue::String("message".into()),
1883 PickleValue::String(error.message.clone()),
1884 ),
1885 ])
1886}
1887
1888fn runtime_config_result_to_pickle(
1889 result: Result<RuntimeConfigEntry, RuntimeConfigError>,
1890) -> PickleValue {
1891 match result {
1892 Ok(entry) => runtime_config_entry_to_pickle(&entry),
1893 Err(error) => runtime_config_error_to_pickle(&error),
1894 }
1895}
1896
1897fn known_destination_entry_to_pickle(entry: &KnownDestinationEntry) -> PickleValue {
1898 PickleValue::Dict(vec![
1899 (
1900 PickleValue::String("dest_hash".into()),
1901 PickleValue::Bytes(entry.dest_hash.to_vec()),
1902 ),
1903 (
1904 PickleValue::String("identity_hash".into()),
1905 PickleValue::Bytes(entry.identity_hash.to_vec()),
1906 ),
1907 (
1908 PickleValue::String("public_key".into()),
1909 PickleValue::Bytes(entry.public_key.to_vec()),
1910 ),
1911 (
1912 PickleValue::String("app_data".into()),
1913 entry
1914 .app_data
1915 .as_ref()
1916 .map(|data: &Vec<u8>| PickleValue::Bytes(data.clone()))
1917 .unwrap_or(PickleValue::None),
1918 ),
1919 (
1920 PickleValue::String("hops".into()),
1921 PickleValue::Int(entry.hops as i64),
1922 ),
1923 (
1924 PickleValue::String("received_at".into()),
1925 PickleValue::Float(entry.received_at),
1926 ),
1927 (
1928 PickleValue::String("receiving_interface".into()),
1929 PickleValue::Int(entry.receiving_interface.0 as i64),
1930 ),
1931 (
1932 PickleValue::String("was_used".into()),
1933 PickleValue::Bool(entry.was_used),
1934 ),
1935 (
1936 PickleValue::String("last_used_at".into()),
1937 entry
1938 .last_used_at
1939 .map(PickleValue::Float)
1940 .unwrap_or(PickleValue::None),
1941 ),
1942 (
1943 PickleValue::String("retained".into()),
1944 PickleValue::Bool(entry.retained),
1945 ),
1946 ])
1947}
1948
1949fn known_destinations_to_pickle(entries: &[KnownDestinationEntry]) -> PickleValue {
1950 PickleValue::List(
1951 entries
1952 .iter()
1953 .map(known_destination_entry_to_pickle)
1954 .collect(),
1955 )
1956}
1957
1958fn parse_known_destination_entry(value: &PickleValue) -> io::Result<KnownDestinationEntry> {
1959 let get_bytes = |key: &str, len: usize| -> io::Result<Vec<u8>> {
1960 let value = value.get(key).ok_or_else(|| {
1961 io::Error::new(io::ErrorKind::InvalidData, format!("missing {}", key))
1962 })?;
1963 let bytes = value.as_bytes().ok_or_else(|| {
1964 io::Error::new(io::ErrorKind::InvalidData, format!("invalid {}", key))
1965 })?;
1966 if bytes.len() != len {
1967 return Err(io::Error::new(
1968 io::ErrorKind::InvalidData,
1969 format!("invalid {} length", key),
1970 ));
1971 }
1972 Ok(bytes.to_vec())
1973 };
1974 let get_int = |key: &str| -> io::Result<i64> {
1975 value
1976 .get(key)
1977 .and_then(|v| v.as_int())
1978 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, format!("invalid {}", key)))
1979 };
1980 let get_float = |key: &str| -> io::Result<f64> {
1981 value
1982 .get(key)
1983 .and_then(|v| v.as_float())
1984 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, format!("invalid {}", key)))
1985 };
1986 let get_bool = |key: &str| -> io::Result<bool> {
1987 value
1988 .get(key)
1989 .and_then(|v| v.as_bool())
1990 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, format!("invalid {}", key)))
1991 };
1992
1993 let mut dest_hash = [0u8; 16];
1994 dest_hash.copy_from_slice(&get_bytes("dest_hash", 16)?);
1995 let mut identity_hash = [0u8; 16];
1996 identity_hash.copy_from_slice(&get_bytes("identity_hash", 16)?);
1997 let mut public_key = [0u8; 64];
1998 public_key.copy_from_slice(&get_bytes("public_key", 64)?);
1999 let app_data = value
2000 .get("app_data")
2001 .and_then(|v| v.as_bytes())
2002 .map(|bytes| bytes.to_vec());
2003 let last_used_at = value.get("last_used_at").and_then(|v| v.as_float());
2004
2005 Ok(KnownDestinationEntry {
2006 dest_hash,
2007 identity_hash,
2008 public_key,
2009 app_data,
2010 hops: get_int("hops")? as u8,
2011 received_at: get_float("received_at")?,
2012 receiving_interface: rns_core::transport::types::InterfaceId(
2013 get_int("receiving_interface")? as u64,
2014 ),
2015 was_used: get_bool("was_used")?,
2016 last_used_at,
2017 retained: get_bool("retained")?,
2018 })
2019}
2020
2021fn parse_known_destination_list(value: &PickleValue) -> io::Result<Vec<KnownDestinationEntry>> {
2022 let list = value
2023 .as_list()
2024 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "expected list"))?;
2025 list.iter().map(parse_known_destination_entry).collect()
2026}
2027
/// Client end of an RPC connection.
///
/// Wraps the TCP stream that carries requests and responses; instances are
/// created by `RpcClient::connect`.
pub struct RpcClient {
    /// Stream to the RPC server, authenticated by `connect` before the
    /// client is constructed.
    stream: TcpStream,
}
2034
2035impl RpcClient {
2036 pub fn connect(addr: &RpcAddr, auth_key: &[u8; 32]) -> io::Result<Self> {
2038 let mut stream = match addr {
2039 RpcAddr::Tcp(host, port) => TcpStream::connect((host.as_str(), *port))?,
2040 };
2041
2042 stream.set_read_timeout(Some(std::time::Duration::from_secs(10)))?;
2043 stream.set_write_timeout(Some(std::time::Duration::from_secs(10)))?;
2044
2045 client_auth(&mut stream, auth_key)?;
2047
2048 Ok(RpcClient { stream })
2049 }
2050
2051 pub fn call(&mut self, request: &PickleValue) -> io::Result<PickleValue> {
2053 let request_bytes = encode_rpc_payload(request, RpcCodec::Msgpack);
2054 send_bytes(&mut self.stream, &request_bytes)?;
2055
2056 let response_bytes = recv_bytes(&mut self.stream)?;
2057 decode_rpc_payload(&response_bytes).map(|(value, _)| value)
2058 }
2059
2060 pub fn list_hooks(&mut self) -> io::Result<Vec<HookInfo>> {
2061 let response = self.call(&PickleValue::Dict(vec![(
2062 PickleValue::String("get".into()),
2063 PickleValue::String("hooks".into()),
2064 )]))?;
2065 parse_hook_list(&response)
2066 }
2067
2068 pub fn begin_drain(&mut self, timeout: Duration) -> io::Result<bool> {
2069 let response = self.call(&PickleValue::Dict(vec![(
2070 PickleValue::String("begin_drain".into()),
2071 PickleValue::Float(timeout.as_secs_f64()),
2072 )]))?;
2073 Ok(response.as_bool().unwrap_or(false))
2074 }
2075
2076 pub fn drain_status(&mut self) -> io::Result<Option<DrainStatus>> {
2077 let response = self.call(&PickleValue::Dict(vec![(
2078 PickleValue::String("get".into()),
2079 PickleValue::String("drain_status".into()),
2080 )]))?;
2081 parse_drain_status(&response)
2082 }
2083
2084 pub fn provider_bridge_stats(&mut self) -> io::Result<PickleValue> {
2085 self.call(&PickleValue::Dict(vec![(
2086 PickleValue::String("get".into()),
2087 PickleValue::String("provider_bridge_stats".into()),
2088 )]))
2089 }
2090
2091 pub fn load_hook(
2092 &mut self,
2093 name: &str,
2094 attach_point: &str,
2095 priority: i32,
2096 wasm: &[u8],
2097 ) -> io::Result<Result<(), String>> {
2098 let response = self.call(&PickleValue::Dict(vec![
2099 (
2100 PickleValue::String("hook".into()),
2101 PickleValue::String("load".into()),
2102 ),
2103 (
2104 PickleValue::String("name".into()),
2105 PickleValue::String(name.to_string()),
2106 ),
2107 (
2108 PickleValue::String("attach_point".into()),
2109 PickleValue::String(attach_point.to_string()),
2110 ),
2111 (
2112 PickleValue::String("priority".into()),
2113 PickleValue::Int(priority as i64),
2114 ),
2115 (
2116 PickleValue::String("wasm".into()),
2117 PickleValue::Bytes(wasm.to_vec()),
2118 ),
2119 ]))?;
2120 parse_hook_result(&response)
2121 }
2122
2123 pub fn load_builtin_hook(
2124 &mut self,
2125 name: &str,
2126 attach_point: &str,
2127 priority: i32,
2128 builtin_id: &str,
2129 ) -> io::Result<Result<(), String>> {
2130 let response = self.call(&PickleValue::Dict(vec![
2131 (
2132 PickleValue::String("hook".into()),
2133 PickleValue::String("load_builtin".into()),
2134 ),
2135 (
2136 PickleValue::String("name".into()),
2137 PickleValue::String(name.to_string()),
2138 ),
2139 (
2140 PickleValue::String("attach_point".into()),
2141 PickleValue::String(attach_point.to_string()),
2142 ),
2143 (
2144 PickleValue::String("priority".into()),
2145 PickleValue::Int(priority as i64),
2146 ),
2147 (
2148 PickleValue::String("builtin_id".into()),
2149 PickleValue::String(builtin_id.to_string()),
2150 ),
2151 ]))?;
2152 parse_hook_result(&response)
2153 }
2154
2155 pub fn unload_hook(
2156 &mut self,
2157 name: &str,
2158 attach_point: &str,
2159 ) -> io::Result<Result<(), String>> {
2160 let response = self.call(&PickleValue::Dict(vec![
2161 (
2162 PickleValue::String("hook".into()),
2163 PickleValue::String("unload".into()),
2164 ),
2165 (
2166 PickleValue::String("name".into()),
2167 PickleValue::String(name.to_string()),
2168 ),
2169 (
2170 PickleValue::String("attach_point".into()),
2171 PickleValue::String(attach_point.to_string()),
2172 ),
2173 ]))?;
2174 parse_hook_result(&response)
2175 }
2176
2177 pub fn set_hook_enabled(
2178 &mut self,
2179 name: &str,
2180 attach_point: &str,
2181 enabled: bool,
2182 ) -> io::Result<Result<(), String>> {
2183 let op = if enabled { "enable" } else { "disable" };
2184 let response = self.call(&PickleValue::Dict(vec![
2185 (
2186 PickleValue::String("hook".into()),
2187 PickleValue::String(op.into()),
2188 ),
2189 (
2190 PickleValue::String("name".into()),
2191 PickleValue::String(name.to_string()),
2192 ),
2193 (
2194 PickleValue::String("attach_point".into()),
2195 PickleValue::String(attach_point.to_string()),
2196 ),
2197 ]))?;
2198 parse_hook_result(&response)
2199 }
2200
2201 pub fn set_hook_priority(
2202 &mut self,
2203 name: &str,
2204 attach_point: &str,
2205 priority: i32,
2206 ) -> io::Result<Result<(), String>> {
2207 let response = self.call(&PickleValue::Dict(vec![
2208 (
2209 PickleValue::String("hook".into()),
2210 PickleValue::String("set_priority".into()),
2211 ),
2212 (
2213 PickleValue::String("name".into()),
2214 PickleValue::String(name.to_string()),
2215 ),
2216 (
2217 PickleValue::String("attach_point".into()),
2218 PickleValue::String(attach_point.to_string()),
2219 ),
2220 (
2221 PickleValue::String("priority".into()),
2222 PickleValue::Int(priority as i64),
2223 ),
2224 ]))?;
2225 parse_hook_result(&response)
2226 }
2227
2228 pub fn blacklist_backbone_peer(
2229 &mut self,
2230 interface: &str,
2231 ip: &str,
2232 duration_secs: u64,
2233 reason: Option<&str>,
2234 penalty_level: Option<u8>,
2235 ) -> io::Result<bool> {
2236 let mut request = vec![
2237 (
2238 PickleValue::String("set".into()),
2239 PickleValue::String("backbone_peer_blacklist".into()),
2240 ),
2241 (
2242 PickleValue::String("interface".into()),
2243 PickleValue::String(interface.to_string()),
2244 ),
2245 (
2246 PickleValue::String("ip".into()),
2247 PickleValue::String(ip.to_string()),
2248 ),
2249 (
2250 PickleValue::String("duration_secs".into()),
2251 PickleValue::Int(duration_secs as i64),
2252 ),
2253 ];
2254 if let Some(reason) = reason {
2255 request.push((
2256 PickleValue::String("reason".into()),
2257 PickleValue::String(reason.to_string()),
2258 ));
2259 }
2260 if let Some(level) = penalty_level {
2261 request.push((
2262 PickleValue::String("penalty_level".into()),
2263 PickleValue::Int(level as i64),
2264 ));
2265 }
2266 let response = self.call(&PickleValue::Dict(request))?;
2267 Ok(response.as_bool().unwrap_or(false))
2268 }
2269
2270 pub fn known_destinations(&mut self) -> io::Result<Vec<KnownDestinationEntry>> {
2271 let response = self.call(&PickleValue::Dict(vec![(
2272 PickleValue::String("get".into()),
2273 PickleValue::String("known_destinations".into()),
2274 )]))?;
2275 parse_known_destination_list(&response)
2276 }
2277
2278 pub fn retain_known_destination(&mut self, dest_hash: [u8; 16]) -> io::Result<bool> {
2279 let response = self.call(&PickleValue::Dict(vec![
2280 (
2281 PickleValue::String("set".into()),
2282 PickleValue::String("known_destination_retained".into()),
2283 ),
2284 (
2285 PickleValue::String("dest_hash".into()),
2286 PickleValue::Bytes(dest_hash.to_vec()),
2287 ),
2288 ]))?;
2289 Ok(response.as_bool().unwrap_or(false))
2290 }
2291
2292 pub fn retain_identity(&mut self, identity_hash: [u8; 16]) -> io::Result<bool> {
2293 let response = self.call(&PickleValue::Dict(vec![
2294 (
2295 PickleValue::String("set".into()),
2296 PickleValue::String("identity_retained".into()),
2297 ),
2298 (
2299 PickleValue::String("identity_hash".into()),
2300 PickleValue::Bytes(identity_hash.to_vec()),
2301 ),
2302 ]))?;
2303 Ok(response.as_bool().unwrap_or(false))
2304 }
2305
2306 pub fn unretain_known_destination(&mut self, dest_hash: [u8; 16]) -> io::Result<bool> {
2307 let response = self.call(&PickleValue::Dict(vec![
2308 (
2309 PickleValue::String("clear".into()),
2310 PickleValue::String("known_destination_retained".into()),
2311 ),
2312 (
2313 PickleValue::String("dest_hash".into()),
2314 PickleValue::Bytes(dest_hash.to_vec()),
2315 ),
2316 ]))?;
2317 Ok(response.as_bool().unwrap_or(false))
2318 }
2319
2320 pub fn mark_known_destination_used(&mut self, dest_hash: [u8; 16]) -> io::Result<bool> {
2321 let response = self.call(&PickleValue::Dict(vec![
2322 (
2323 PickleValue::String("set".into()),
2324 PickleValue::String("known_destination_used".into()),
2325 ),
2326 (
2327 PickleValue::String("dest_hash".into()),
2328 PickleValue::Bytes(dest_hash.to_vec()),
2329 ),
2330 ]))?;
2331 Ok(response.as_bool().unwrap_or(false))
2332 }
2333}
2334
2335fn parse_lifecycle_state(value: &str) -> Option<LifecycleState> {
2336 match value {
2337 "active" => Some(LifecycleState::Active),
2338 "draining" => Some(LifecycleState::Draining),
2339 "stopping" => Some(LifecycleState::Stopping),
2340 "stopped" => Some(LifecycleState::Stopped),
2341 _ => None,
2342 }
2343}
2344
2345fn parse_drain_status(value: &PickleValue) -> io::Result<Option<DrainStatus>> {
2346 if !matches!(value, PickleValue::Dict(_)) {
2347 return Ok(None);
2348 }
2349 let state = value
2350 .get("state")
2351 .and_then(|entry| entry.as_str())
2352 .and_then(parse_lifecycle_state)
2353 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing drain state"))?;
2354 let drain_age_seconds = value.get("drain_age_seconds").and_then(|entry| {
2355 entry
2356 .as_float()
2357 .or_else(|| entry.as_int().map(|v| v as f64))
2358 });
2359 let deadline_remaining_seconds = value.get("deadline_remaining_seconds").and_then(|entry| {
2360 entry
2361 .as_float()
2362 .or_else(|| entry.as_int().map(|v| v as f64))
2363 });
2364 let drain_complete = value
2365 .get("drain_complete")
2366 .and_then(|entry| entry.as_bool())
2367 .unwrap_or(false);
2368 let interface_writer_queued_frames = value
2369 .get("interface_writer_queued_frames")
2370 .and_then(|entry| entry.as_int())
2371 .unwrap_or(0)
2372 .max(0) as usize;
2373 let provider_backlog_events = value
2374 .get("provider_backlog_events")
2375 .and_then(|entry| entry.as_int())
2376 .unwrap_or(0)
2377 .max(0) as usize;
2378 let provider_consumer_queued_events = value
2379 .get("provider_consumer_queued_events")
2380 .and_then(|entry| entry.as_int())
2381 .unwrap_or(0)
2382 .max(0) as usize;
2383 let detail = value
2384 .get("detail")
2385 .and_then(|entry| entry.as_str().map(|v| v.to_string()));
2386 Ok(Some(DrainStatus {
2387 state,
2388 drain_age_seconds,
2389 deadline_remaining_seconds,
2390 drain_complete,
2391 interface_writer_queued_frames,
2392 provider_backlog_events,
2393 provider_consumer_queued_events,
2394 detail,
2395 }))
2396}
2397
2398fn parse_hook_result(response: &PickleValue) -> io::Result<Result<(), String>> {
2399 let ok = response
2400 .get("ok")
2401 .and_then(|v| v.as_bool())
2402 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "invalid hook response"))?;
2403 if ok {
2404 Ok(Ok(()))
2405 } else {
2406 Ok(Err(response
2407 .get("error")
2408 .and_then(|v| v.as_str())
2409 .unwrap_or("unknown hook error")
2410 .to_string()))
2411 }
2412}
2413
2414fn parse_hook_list(response: &PickleValue) -> io::Result<Vec<HookInfo>> {
2415 let list = response
2416 .as_list()
2417 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "invalid hooks response"))?;
2418 let mut hooks = Vec::with_capacity(list.len());
2419 for item in list {
2420 hooks.push(HookInfo {
2421 name: item
2422 .get("name")
2423 .and_then(|v| v.as_str())
2424 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing hook name"))?
2425 .to_string(),
2426 hook_type: item
2427 .get("type")
2428 .and_then(|v| v.as_str())
2429 .unwrap_or(default_hook_type())
2430 .to_string(),
2431 attach_point: item
2432 .get("attach_point")
2433 .and_then(|v| v.as_str())
2434 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing attach_point"))?
2435 .to_string(),
2436 priority: item
2437 .get("priority")
2438 .and_then(|v| v.as_int())
2439 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing priority"))?
2440 as i32,
2441 enabled: item
2442 .get("enabled")
2443 .and_then(|v| v.as_bool())
2444 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing enabled"))?,
2445 consecutive_traps: item
2446 .get("consecutive_traps")
2447 .and_then(|v| v.as_int())
2448 .ok_or_else(|| {
2449 io::Error::new(io::ErrorKind::InvalidData, "missing consecutive_traps")
2450 })? as u32,
2451 });
2452 }
2453 Ok(hooks)
2454}
2455
/// Hook type assumed when a hook-list entry carries no `type` field.
///
/// Yields `"native"` when the crate is built with the `rns-hooks-native` feature and
/// `"wasm"` in every other configuration.
fn default_hook_type() -> &'static str {
    if cfg!(feature = "rns-hooks-native") {
        "native"
    } else {
        "wasm"
    }
}
2470
2471fn client_auth(stream: &mut TcpStream, auth_key: &[u8; 32]) -> io::Result<()> {
2473 let challenge = recv_bytes(stream)?;
2475
2476 if !challenge.starts_with(CHALLENGE_PREFIX) {
2477 return Err(io::Error::new(
2478 io::ErrorKind::InvalidData,
2479 "expected challenge",
2480 ));
2481 }
2482
2483 let message = &challenge[CHALLENGE_PREFIX.len()..];
2484
2485 let response = create_response(auth_key, message);
2487 send_bytes(stream, &response)?;
2488
2489 let result = recv_bytes(stream)?;
2491 if result == WELCOME {
2492 Ok(())
2493 } else {
2494 Err(io::Error::new(
2495 io::ErrorKind::PermissionDenied,
2496 "authentication failed",
2497 ))
2498 }
2499}
2500
2501fn create_response(auth_key: &[u8; 32], message: &[u8]) -> Vec<u8> {
2503 if message.starts_with(b"{sha256}") || message.len() > 20 {
2505 let digest = hmac_sha256(auth_key, message);
2507 let mut response = Vec::with_capacity(8 + 32);
2508 response.extend_from_slice(b"{sha256}");
2509 response.extend_from_slice(&digest);
2510 response
2511 } else {
2512 let digest = hmac_md5(auth_key, message);
2514 digest.to_vec()
2515 }
2516}
2517
2518pub fn derive_auth_key(private_key: &[u8]) -> [u8; 32] {
2520 sha256(private_key)
2521}
2522
2523#[cfg(test)]
2524mod tests {
2525 use super::*;
2526
#[test]
fn send_recv_bytes_roundtrip() {
    // A framed message written on one end must arrive unchanged on the other.
    let (mut sender, mut receiver) = tcp_pair();
    send_bytes(&mut sender, b"hello world").unwrap();
    let received = recv_bytes(&mut receiver).unwrap();
    assert_eq!(received, b"hello world".to_vec());
}
2535
#[test]
fn send_recv_empty() {
    // A zero-length payload is a valid frame and comes back empty.
    let (mut sender, mut receiver) = tcp_pair();
    send_bytes(&mut sender, b"").unwrap();
    let received = recv_bytes(&mut receiver).unwrap();
    assert_eq!(received.len(), 0);
}
2543
#[test]
fn auth_success() {
    // Both ends share the same key, so the handshake must succeed on both sides.
    let key = derive_auth_key(b"test-private-key");
    let (mut server, mut client) = tcp_pair();

    // `key` is `Copy`, so the closure gets its own copy.
    let client_side = thread::spawn(move || client_auth(&mut client, &key));

    server_auth(&mut server, &key).unwrap();
    client_side.join().unwrap().unwrap();
}
2557
#[test]
fn auth_failure_wrong_key() {
    // Mismatched keys: both the server and the client must report an error.
    let (mut server, mut client) = tcp_pair();
    let client_key = derive_auth_key(b"wrong-key");
    let server_key = derive_auth_key(b"server-key");

    let client_side = thread::spawn(move || client_auth(&mut client, &client_key).is_err());

    assert!(server_auth(&mut server, &server_key).is_err());
    assert!(client_side.join().unwrap());
}
2573
#[test]
fn verify_sha256_response() {
    // A `{sha256}`-tagged challenge yields a tagged response that verifies.
    let challenge = b"{sha256}abcdefghijklmnopqrstuvwxyz0123456789ABCD";
    let key = derive_auth_key(b"mykey");

    let answer = create_response(&key, challenge);

    assert!(answer.starts_with(b"{sha256}"));
    assert!(verify_response(&key, challenge, &answer));
}
2582
#[test]
fn verify_legacy_md5_response() {
    // A 20-byte untagged challenge is answered with a bare HMAC-MD5 digest.
    let challenge = b"01234567890123456789";
    let key = derive_auth_key(b"mykey");

    let legacy_digest = hmac_md5(&key, challenge);

    assert!(verify_response(&key, challenge, &legacy_digest));
}
2592
#[test]
fn constant_time_eq_works() {
    // (left, right, expected): equal, same length but different, different length.
    let cases: [(&[u8], &[u8], bool); 3] = [
        (b"hello", b"hello", true),
        (b"hello", b"world", false),
        (b"hello", b"hell", false),
    ];
    for &(left, right, expected) in &cases {
        assert_eq!(constant_time_eq(left, right), expected);
    }
}
2599
#[test]
fn rpc_payload_accepts_msgpack_and_legacy_pickle() {
    let request = PickleValue::Dict(vec![
        (
            PickleValue::String("get".to_string()),
            PickleValue::String("link_count".to_string()),
        ),
        (
            PickleValue::String("max_hops".to_string()),
            PickleValue::Int(3),
        ),
    ]);

    // Each codec must round-trip the request and be detected on decode.
    for &codec in &[RpcCodec::Msgpack, RpcCodec::Pickle] {
        let bytes = encode_rpc_payload(&request, codec);
        let (decoded, detected) = decode_rpc_payload(&bytes).unwrap();
        assert_eq!(decoded, request);
        assert_eq!(detected, codec);
    }
}
2620
#[test]
fn rpc_roundtrip() {
    // Builds the single-entry `{"get": <what>}` query used by both calls below.
    fn get_query(what: &str) -> PickleValue {
        PickleValue::Dict(vec![(
            PickleValue::String("get".to_string()),
            PickleValue::String(what.to_string()),
        )])
    }

    let key = derive_auth_key(b"test-key");
    let (event_tx, event_rx) = crate::event::channel();

    // Bind to an ephemeral port and remember it for the clients.
    let listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let port = listener.local_addr().unwrap().port();
    listener.set_nonblocking(true).unwrap();

    let stop_flag = Arc::new(AtomicBool::new(false));

    // Driver stand-in: answers the two queries this test issues. It stops on anything
    // else, including the receive timeout.
    let driver_thread = thread::spawn(move || {
        while let Ok(Event::Query(query, resp_tx)) =
            event_rx.recv_timeout(Duration::from_secs(5))
        {
            match query {
                QueryRequest::LinkCount => {
                    let _ = resp_tx.send(QueryResponse::LinkCount(42));
                }
                QueryRequest::InterfaceStats => {
                    let iface = SingleInterfaceStat {
                        id: 7,
                        name: "TestInterface".into(),
                        status: true,
                        mode: 1,
                        rxb: 1000,
                        txb: 2000,
                        rx_packets: 10,
                        tx_packets: 20,
                        bitrate: Some(10_000_000),
                        ifac_size: None,
                        started: 1000.0,
                        ia_freq: 0.0,
                        ip_freq: 0.0,
                        op_freq: 0.0,
                        op_samples: 0,
                        burst_active: false,
                        burst_activated: 0.0,
                        pr_burst_active: false,
                        pr_burst_activated: 0.0,
                        oa_freq: 0.0,
                        clients: Some(2),
                        announce_rate_target: Some(3600.0),
                        announce_rate_grace: 5,
                        announce_rate_penalty: 0.0,
                        interface_type: "TestInterface".into(),
                    };
                    let stats = InterfaceStatsResponse {
                        interfaces: vec![iface],
                        transport_id: None,
                        transport_enabled: true,
                        transport_uptime: 3600.0,
                        total_rxb: 1000,
                        total_txb: 2000,
                        probe_responder: None,
                        backbone_peer_pool: None,
                    };
                    let _ = resp_tx.send(QueryResponse::InterfaceStats(stats));
                }
                _ => break,
            }
        }
    });

    let server_flag = Arc::clone(&stop_flag);
    let server_thread = thread::spawn(move || {
        rpc_server_loop(listener, key, event_tx, server_flag);
    });

    // Give the server thread a moment to start before connecting.
    thread::sleep(Duration::from_millis(50));

    let server_addr = RpcAddr::Tcp("127.0.0.1".into(), port);

    // First connection: link count.
    let mut client = RpcClient::connect(&server_addr, &key).unwrap();
    let link_count = client.call(&get_query("link_count")).unwrap();
    assert_eq!(link_count.as_int().unwrap(), 42);
    drop(client);

    // Second connection: interface statistics.
    let mut client2 = RpcClient::connect(&server_addr, &key).unwrap();
    let stats = client2.call(&get_query("interface_stats")).unwrap();
    let ifaces = stats.get("interfaces").unwrap().as_list().unwrap();
    assert_eq!(ifaces.len(), 1);
    let first = &ifaces[0];
    assert_eq!(
        first.get("name").unwrap().as_str().unwrap(),
        "TestInterface"
    );
    assert_eq!(first.get("rxb").unwrap().as_int().unwrap(), 1000);
    drop(client2);

    // Ask the server loop to exit, then wait for both helper threads.
    stop_flag.store(true, Ordering::Relaxed);
    server_thread.join().unwrap();
    driver_thread.join().unwrap();
}
2727
#[test]
fn derive_auth_key_deterministic() {
    // Same input gives the same key.
    let first = derive_auth_key(b"test");
    let second = derive_auth_key(b"test");
    assert_eq!(first, second);

    // Different input gives a different key.
    let other = derive_auth_key(b"other");
    assert_ne!(first, other);
}
2737
#[test]
fn pickle_request_handling() {
    let (event_tx, event_rx) = crate::event::channel();

    // Driver stand-in: confirms the drop-path query for the expected hash.
    let driver = thread::spawn(move || match event_rx.recv() {
        Ok(Event::Query(QueryRequest::DropPath { dest_hash }, resp_tx)) => {
            assert_eq!(dest_hash, [1u8; 16]);
            let _ = resp_tx.send(QueryResponse::DropPath(true));
        }
        _ => {}
    });

    let text = |s: &str| PickleValue::String(s.to_string());
    let request = PickleValue::Dict(vec![
        (text("drop"), text("path")),
        (text("destination_hash"), PickleValue::Bytes(vec![1u8; 16])),
    ]);

    let response = handle_rpc_request(&request, &event_tx).unwrap();
    assert_eq!(response, PickleValue::Bool(true));
    driver.join().unwrap();
}
2766
#[test]
fn hook_list_request_handling() {
    let (event_tx, event_rx) = crate::event::channel();

    // Driver stand-in: replies to the first `ListHooks` event with a single hook.
    let driver = thread::spawn(move || match event_rx.recv() {
        Ok(Event::ListHooks { response_tx }) => {
            let hook = HookInfo {
                name: "stats".into(),
                hook_type: "wasm".into(),
                attach_point: "PreIngress".into(),
                priority: 7,
                enabled: true,
                consecutive_traps: 0,
            };
            let _ = response_tx.send(vec![hook]);
        }
        _ => {}
    });

    let request = PickleValue::Dict(vec![(
        PickleValue::String("get".to_string()),
        PickleValue::String("hooks".to_string()),
    )]);

    // The server-side encoding must be readable by the client-side parser.
    let response = handle_rpc_request(&request, &event_tx).unwrap();
    let hooks = parse_hook_list(&response).unwrap();
    assert_eq!(hooks.len(), 1);
    assert_eq!(hooks[0].name, "stats");
    driver.join().unwrap();
}
2794
#[test]
fn hook_load_request_handling() {
    let (event_tx, event_rx) = crate::event::channel();

    // Driver stand-in: checks every field of the `LoadHook` event, then accepts it.
    let driver = thread::spawn(move || match event_rx.recv() {
        Ok(Event::LoadHook {
            name,
            wasm_bytes,
            attach_point,
            priority,
            response_tx,
        }) => {
            assert_eq!(name, "stats");
            assert_eq!(attach_point, "PreIngress");
            assert_eq!(priority, 11);
            assert_eq!(wasm_bytes, vec![1, 2, 3]);
            let _ = response_tx.send(Ok(()));
        }
        _ => {}
    });

    let text = |s: &str| PickleValue::String(s.to_string());
    let request = PickleValue::Dict(vec![
        (text("hook"), text("load")),
        (text("name"), text("stats")),
        (text("attach_point"), text("PreIngress")),
        (text("priority"), PickleValue::Int(11)),
        (text("wasm"), PickleValue::Bytes(vec![1, 2, 3])),
    ]);

    let response = handle_rpc_request(&request, &event_tx).unwrap();
    assert_eq!(parse_hook_result(&response).unwrap(), Ok(()));
    driver.join().unwrap();
}
2839
#[test]
fn interface_stats_pickle_format() {
    let iface_stat = SingleInterfaceStat {
        id: 1,
        name: "TCP".into(),
        status: true,
        mode: 1,
        rxb: 100,
        txb: 200,
        rx_packets: 5,
        tx_packets: 10,
        bitrate: Some(1000000),
        ifac_size: Some(16),
        started: 1000.0,
        ia_freq: 1.0,
        ip_freq: 2.0,
        op_freq: 3.0,
        op_samples: 0,
        burst_active: true,
        burst_activated: 1200.0,
        pr_burst_active: true,
        pr_burst_activated: 1300.0,
        oa_freq: 4.0,
        clients: Some(3),
        announce_rate_target: Some(3600.0),
        announce_rate_grace: 5,
        announce_rate_penalty: 0.0,
        interface_type: "TCPClientInterface".into(),
    };
    let stats = InterfaceStatsResponse {
        interfaces: vec![iface_stat],
        transport_id: Some([0xAB; 16]),
        transport_enabled: true,
        transport_uptime: 3600.0,
        total_rxb: 100,
        total_txb: 200,
        probe_responder: None,
        backbone_peer_pool: None,
    };

    // Encode and decode again so the assertions see what a peer would receive.
    let encoded = pickle::encode(&interface_stats_to_pickle(&stats));
    let decoded = pickle::decode(&encoded).unwrap();

    assert!(decoded.get("transport_enabled").unwrap().as_bool().unwrap());

    let ifaces = decoded.get("interfaces").unwrap().as_list().unwrap();
    let iface = &ifaces[0];
    // Typed accessors for the fields of the single interface entry.
    let float_of = |field: &str| iface.get(field).unwrap().as_float().unwrap();
    let bool_of = |field: &str| iface.get(field).unwrap().as_bool().unwrap();
    let int_of = |field: &str| iface.get(field).unwrap().as_int().unwrap();

    assert_eq!(int_of("id"), 1);
    assert_eq!(iface.get("name").unwrap().as_str().unwrap(), "TCP");
    assert_eq!(float_of("ia_freq"), 1.0);
    assert_eq!(float_of("ip_freq"), 2.0);
    assert_eq!(float_of("op_freq"), 3.0);
    assert_eq!(float_of("oa_freq"), 4.0);
    assert!(bool_of("burst_active"));
    assert_eq!(float_of("burst_activated"), 1200.0);
    assert!(bool_of("pr_burst_active"));
    assert_eq!(float_of("pr_burst_activated"), 1300.0);
    assert_eq!(float_of("announce_rate_target"), 3600.0);
    assert_eq!(int_of("announce_rate_grace"), 5);
    assert_eq!(float_of("announce_rate_penalty"), 0.0);
    assert_eq!(int_of("clients"), 3);
}
2945
#[test]
fn interface_stats_pickle_includes_backbone_peer_pool_priority() {
    let member = crate::event::BackbonePeerPoolMemberStatus {
        name: "peer".into(),
        remote: "127.0.0.1:4242".into(),
        source: "configured".into(),
        priority: 77,
        state: "active".into(),
        interface_id: Some(42),
        failure_count: 0,
        last_error: None,
        cooldown_remaining_seconds: None,
    };
    let pool = crate::event::BackbonePeerPoolStatus {
        max_connected: 1,
        active_count: 1,
        standby_count: 0,
        cooldown_count: 0,
        members: vec![member],
    };
    let stats = InterfaceStatsResponse {
        interfaces: vec![],
        transport_id: None,
        transport_enabled: true,
        transport_uptime: 1.0,
        total_rxb: 0,
        total_txb: 0,
        probe_responder: None,
        backbone_peer_pool: Some(pool),
    };

    // The member's priority must survive an encode/decode round trip.
    let encoded = pickle::encode(&interface_stats_to_pickle(&stats));
    let decoded = pickle::decode(&encoded).unwrap();
    let members = decoded
        .get("backbone_peer_pool")
        .unwrap()
        .get("members")
        .unwrap()
        .as_list()
        .unwrap();
    assert_eq!(members[0].get("priority").unwrap().as_int().unwrap(), 77);
}
2982
#[test]
fn send_probe_rpc_unknown_dest() {
    let (event_tx, event_rx) = crate::event::channel();

    // Driver stand-in: expects a payload size of 16 when the request gives no `size`,
    // and reports that the probe could not be sent.
    let driver = thread::spawn(move || match event_rx.recv() {
        Ok(Event::Query(
            QueryRequest::SendProbe {
                dest_hash,
                payload_size,
            },
            resp_tx,
        )) => {
            assert_eq!(dest_hash, [0xAA; 16]);
            assert_eq!(payload_size, 16);
            let _ = resp_tx.send(QueryResponse::SendProbe(None));
        }
        _ => {}
    });

    let request = PickleValue::Dict(vec![(
        PickleValue::String("send_probe".to_string()),
        PickleValue::Bytes(vec![0xAA; 16]),
    )]);

    let response = handle_rpc_request(&request, &event_tx).unwrap();
    assert_eq!(response, PickleValue::None);
    driver.join().unwrap();
}
3011
#[test]
fn send_probe_rpc_with_result() {
    let (event_tx, event_rx) = crate::event::channel();
    let packet_hash = [0xBB; 32];

    // Driver stand-in: checks the explicit size and answers with a hash and hop count.
    let driver = thread::spawn(move || match event_rx.recv() {
        Ok(Event::Query(
            QueryRequest::SendProbe {
                dest_hash,
                payload_size,
            },
            resp_tx,
        )) => {
            assert_eq!(dest_hash, [0xCC; 16]);
            assert_eq!(payload_size, 32);
            let _ = resp_tx.send(QueryResponse::SendProbe(Some((packet_hash, 3))));
        }
        _ => {}
    });

    let request = PickleValue::Dict(vec![
        (
            PickleValue::String("send_probe".to_string()),
            PickleValue::Bytes(vec![0xCC; 16]),
        ),
        (PickleValue::String("size".to_string()), PickleValue::Int(32)),
    ]);

    let response = handle_rpc_request(&request, &event_tx).unwrap();
    let returned_hash = response.get("packet_hash").unwrap().as_bytes().unwrap();
    assert_eq!(returned_hash, &[0xBB; 32]);
    assert_eq!(response.get("hops").unwrap().as_int().unwrap(), 3);
    driver.join().unwrap();
}
3046
#[test]
fn retain_identity_rpc_sends_identity_hash_query() {
    let identity_hash = [0x44; 16];
    let (event_tx, event_rx) = crate::event::channel();

    // Driver stand-in: the query must carry the hash from the request unchanged.
    let driver = thread::spawn(move || match event_rx.recv() {
        Ok(Event::Query(
            QueryRequest::RetainIdentity {
                identity_hash: hash,
            },
            resp_tx,
        )) => {
            assert_eq!(hash, identity_hash);
            let _ = resp_tx.send(QueryResponse::RetainIdentity(true));
        }
        _ => {}
    });

    let text = |s: &str| PickleValue::String(s.to_string());
    let request = PickleValue::Dict(vec![
        (text("set"), text("identity_retained")),
        (
            text("identity_hash"),
            PickleValue::Bytes(identity_hash.to_vec()),
        ),
    ]);

    let response = handle_rpc_request(&request, &event_tx).unwrap();
    assert_eq!(response, PickleValue::Bool(true));
    driver.join().unwrap();
}
3080
#[test]
fn send_probe_rpc_size_validation() {
    let (event_tx, event_rx) = crate::event::channel();

    // Driver stand-in: a negative `size` in the request must arrive as 16.
    let driver = thread::spawn(move || match event_rx.recv() {
        Ok(Event::Query(QueryRequest::SendProbe { payload_size, .. }, resp_tx)) => {
            assert_eq!(payload_size, 16);
            let _ = resp_tx.send(QueryResponse::SendProbe(None));
        }
        _ => {}
    });

    let request = PickleValue::Dict(vec![
        (
            PickleValue::String("send_probe".to_string()),
            PickleValue::Bytes(vec![0xDD; 16]),
        ),
        (PickleValue::String("size".to_string()), PickleValue::Int(-1)),
    ]);

    let response = handle_rpc_request(&request, &event_tx).unwrap();
    assert_eq!(response, PickleValue::None);
    driver.join().unwrap();
}
3107
#[test]
fn send_probe_rpc_size_too_large() {
    let (event_tx, event_rx) = crate::event::channel();

    // Driver stand-in: a `size` of 999 in the request must arrive as 16.
    let driver = thread::spawn(move || match event_rx.recv() {
        Ok(Event::Query(QueryRequest::SendProbe { payload_size, .. }, resp_tx)) => {
            assert_eq!(payload_size, 16);
            let _ = resp_tx.send(QueryResponse::SendProbe(None));
        }
        _ => {}
    });

    let request = PickleValue::Dict(vec![
        (
            PickleValue::String("send_probe".to_string()),
            PickleValue::Bytes(vec![0xDD; 16]),
        ),
        (PickleValue::String("size".to_string()), PickleValue::Int(999)),
    ]);

    let response = handle_rpc_request(&request, &event_tx).unwrap();
    assert_eq!(response, PickleValue::None);
    driver.join().unwrap();
}
3134
#[test]
fn check_proof_rpc_not_found() {
    let (event_tx, event_rx) = crate::event::channel();

    // Driver stand-in: no proof is known for the requested packet hash.
    let driver = thread::spawn(move || match event_rx.recv() {
        Ok(Event::Query(QueryRequest::CheckProof { packet_hash }, resp_tx)) => {
            assert_eq!(packet_hash, [0xEE; 32]);
            let _ = resp_tx.send(QueryResponse::CheckProof(None));
        }
        _ => {}
    });

    let request = PickleValue::Dict(vec![(
        PickleValue::String("check_proof".to_string()),
        PickleValue::Bytes(vec![0xEE; 32]),
    )]);

    let response = handle_rpc_request(&request, &event_tx).unwrap();
    assert_eq!(response, PickleValue::None);
    driver.join().unwrap();
}
3157
#[test]
fn check_proof_rpc_found() {
    let (event_tx, event_rx) = crate::event::channel();

    // Driver stand-in: a proof exists, and its value is 0.352.
    let driver = thread::spawn(move || match event_rx.recv() {
        Ok(Event::Query(QueryRequest::CheckProof { packet_hash }, resp_tx)) => {
            assert_eq!(packet_hash, [0xFF; 32]);
            let _ = resp_tx.send(QueryResponse::CheckProof(Some(0.352)));
        }
        _ => {}
    });

    let request = PickleValue::Dict(vec![(
        PickleValue::String("check_proof".to_string()),
        PickleValue::Bytes(vec![0xFF; 32]),
    )]);

    // The value must come back as a float, within a small tolerance.
    match handle_rpc_request(&request, &event_tx).unwrap() {
        PickleValue::Float(rtt) => assert!((rtt - 0.352).abs() < 0.001),
        other => panic!("Expected Float, got {:?}", other),
    }
    driver.join().unwrap();
}
3184
#[test]
fn request_path_rpc() {
    let (event_tx, event_rx) = crate::event::channel();

    // Driver stand-in: the request must surface as a `RequestPath` event.
    let driver = thread::spawn(move || {
        let received = event_rx.recv_timeout(Duration::from_secs(5));
        match received {
            Ok(Event::RequestPath { dest_hash }) => assert_eq!(dest_hash, [0x11; 16]),
            unexpected => panic!("Expected RequestPath event, got {:?}", unexpected),
        }
    });

    let request = PickleValue::Dict(vec![(
        PickleValue::String("request_path".to_string()),
        PickleValue::Bytes(vec![0x11; 16]),
    )]);

    let response = handle_rpc_request(&request, &event_tx).unwrap();
    assert_eq!(response, PickleValue::Bool(true));
    driver.join().unwrap();
}
3208
#[test]
fn begin_drain_rpc_emits_event() {
    let (event_tx, event_rx) = crate::event::channel();

    // Driver stand-in: the float in the request must arrive as a 1.5 s timeout.
    let driver = thread::spawn(move || {
        let received = event_rx.recv_timeout(Duration::from_secs(5));
        match received {
            Ok(Event::BeginDrain { timeout }) => {
                assert!((timeout.as_secs_f64() - 1.5).abs() < 0.001);
            }
            unexpected => panic!("Expected BeginDrain event, got {:?}", unexpected),
        }
    });

    let request = PickleValue::Dict(vec![(
        PickleValue::String("begin_drain".to_string()),
        PickleValue::Float(1.5),
    )]);

    let response = handle_rpc_request(&request, &event_tx).unwrap();
    assert_eq!(response, PickleValue::Bool(true));
    driver.join().unwrap();
}
3231
#[test]
fn drain_status_rpc_roundtrips_fields() {
    let (event_tx, event_rx) = crate::event::channel();

    // Driver stand-in: reports a node that is in the middle of draining.
    let driver = thread::spawn(move || match event_rx.recv() {
        Ok(Event::Query(QueryRequest::DrainStatus, resp_tx)) => {
            let status = DrainStatus {
                state: LifecycleState::Draining,
                drain_age_seconds: Some(0.75),
                deadline_remaining_seconds: Some(2.25),
                drain_complete: false,
                interface_writer_queued_frames: 3,
                provider_backlog_events: 4,
                provider_consumer_queued_events: 5,
                detail: Some("node is draining existing work".into()),
            };
            let _ = resp_tx.send(QueryResponse::DrainStatus(status));
        }
        _ => {}
    });

    let request = PickleValue::Dict(vec![(
        PickleValue::String("get".to_string()),
        PickleValue::String("drain_status".to_string()),
    )]);
    let response = handle_rpc_request(&request, &event_tx).unwrap();

    // Integer-valued fields share one accessor.
    let int_field = |name: &str| response.get(name).unwrap().as_int();

    assert_eq!(response.get("state").unwrap().as_str(), Some("draining"));
    assert_eq!(
        response.get("drain_complete").unwrap().as_bool(),
        Some(false)
    );
    assert_eq!(
        response
            .get("deadline_remaining_seconds")
            .unwrap()
            .as_float(),
        Some(2.25)
    );
    assert_eq!(int_field("interface_writer_queued_frames"), Some(3));
    assert_eq!(int_field("provider_backlog_events"), Some(4));
    assert_eq!(int_field("provider_consumer_queued_events"), Some(5));
    assert_eq!(
        response.get("detail").unwrap().as_str(),
        Some("node is draining existing work")
    );
    driver.join().unwrap();
}
3293
#[test]
fn interface_stats_with_probe_responder() {
    let probe_hash = [0x42; 16];
    let stats = InterfaceStatsResponse {
        interfaces: vec![],
        transport_id: None,
        transport_enabled: true,
        transport_uptime: 100.0,
        total_rxb: 0,
        total_txb: 0,
        probe_responder: Some(probe_hash),
        backbone_peer_pool: None,
    };

    // The probe responder hash must survive an encode/decode round trip as bytes.
    let encoded = pickle::encode(&interface_stats_to_pickle(&stats));
    let decoded = pickle::decode(&encoded).unwrap();

    let responder = decoded.get("probe_responder").unwrap().as_bytes().unwrap();
    assert_eq!(responder, &probe_hash);
}
3315
#[test]
fn runtime_config_get_and_set_rpc() {
    // Builds the entry the driver stand-in returns. Only the current value and its
    // source differ between the get and the set reply.
    fn tick_entry(key: String, value: i64, source: RuntimeConfigSource) -> RuntimeConfigEntry {
        RuntimeConfigEntry {
            key,
            value: RuntimeConfigValue::Int(value),
            default: RuntimeConfigValue::Int(1000),
            source,
            apply_mode: RuntimeConfigApplyMode::Immediate,
            description: Some("tick".into()),
        }
    }

    let (event_tx, event_rx) = crate::event::channel();

    // Driver stand-in: expects exactly one get followed by one set, in that order.
    let driver = thread::spawn(move || {
        match event_rx.recv() {
            Ok(Event::Query(QueryRequest::GetRuntimeConfig { key }, resp_tx)) => {
                assert_eq!(key, "global.tick_interval_ms");
                let entry = tick_entry(key, 1000, RuntimeConfigSource::Startup);
                let _ = resp_tx.send(QueryResponse::RuntimeConfigEntry(Some(entry)));
            }
            _ => panic!("expected GetRuntimeConfig query"),
        }

        match event_rx.recv() {
            Ok(Event::Query(QueryRequest::SetRuntimeConfig { key, value }, resp_tx)) => {
                assert_eq!(key, "global.tick_interval_ms");
                assert_eq!(value, RuntimeConfigValue::Int(250));
                let entry = tick_entry(key, 250, RuntimeConfigSource::RuntimeOverride);
                let _ = resp_tx.send(QueryResponse::RuntimeConfigSet(Ok(entry)));
            }
            _ => panic!("expected SetRuntimeConfig query"),
        }
    });

    let text = |s: &str| PickleValue::String(s.to_string());

    let get_request = PickleValue::Dict(vec![
        (text("get"), text("runtime_config_entry")),
        (text("key"), text("global.tick_interval_ms")),
    ]);
    let get_response = handle_rpc_request(&get_request, &event_tx).unwrap();
    assert_eq!(
        get_response.get("key").and_then(|v| v.as_str()),
        Some("global.tick_interval_ms")
    );

    let set_request = PickleValue::Dict(vec![
        (text("set"), text("runtime_config")),
        (text("key"), text("global.tick_interval_ms")),
        (text("value"), PickleValue::Int(250)),
    ]);
    let set_response = handle_rpc_request(&set_request, &event_tx).unwrap();
    assert_eq!(
        set_response.get("value").and_then(|v| v.as_int()),
        Some(250)
    );

    driver.join().unwrap();
}
3392
// Creates a connected loopback TCP pair, returned as `(server, client)`. Both ends get
// a five-second read timeout.
fn tcp_pair() -> (TcpStream, TcpStream) {
    let read_timeout = Some(Duration::from_secs(5));

    let listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let addr = listener.local_addr().unwrap();
    let client = TcpStream::connect(addr).unwrap();
    let (server, _) = listener.accept().unwrap();

    for stream in &[&client, &server] {
        stream.set_read_timeout(read_timeout).unwrap();
    }
    (server, client)
}
3407}