Skip to main content

rns_net/
rpc.rs

1//! RPC server and client for cross-process daemon communication.
2//!
3//! Implements Python `multiprocessing.connection` wire protocol:
4//! - 4-byte big-endian signed i32 length prefix + payload
5//! - HMAC-SHA256 challenge-response authentication
6//! - Pickle serialization for request/response dictionaries
7//!
8//! Server translates pickle dicts into [`QueryRequest`] events, sends
9//! them through the driver event channel, and returns pickle responses.
10
11use std::io::{self, Read, Write};
12use std::net::{IpAddr, TcpListener, TcpStream};
13use std::sync::mpsc;
14use std::sync::{
15    atomic::{AtomicBool, Ordering},
16    Arc,
17};
18use std::thread;
19use std::time::Duration;
20
21use rns_crypto::hmac::hmac_sha256;
22use rns_crypto::sha256::sha256;
23
24use crate::event::{
25    BackboneInterfaceEntry, BackbonePeerStateEntry, BlackholeInfo, DrainStatus, Event, EventSender,
26    HookInfo, InterfaceStatsResponse, KnownDestinationEntry, LifecycleState, PathTableEntry,
27    ProviderBridgeStats, QueryRequest, QueryResponse, RateTableEntry, RuntimeConfigApplyMode,
28    RuntimeConfigEntry, RuntimeConfigError, RuntimeConfigErrorCode, RuntimeConfigSource,
29    RuntimeConfigValue, SingleInterfaceStat,
30};
31use crate::md5::hmac_md5;
32use crate::pickle::{self, PickleValue};
33
/// Prefix of the challenge message the server sends when authentication starts.
const CHALLENGE_PREFIX: &[u8] = b"#CHALLENGE#";
/// Message sent to the client once its challenge response has been verified.
const WELCOME: &[u8] = b"#WELCOME#";
/// Message sent to the client when its challenge response is rejected.
const FAILURE: &[u8] = b"#FAILURE#";
/// Number of random bytes placed in each challenge.
const CHALLENGE_LEN: usize = 40;
38
/// RPC address types.
///
/// TCP is the only transport defined here.
#[derive(Debug, Clone)]
pub enum RpcAddr {
    /// TCP endpoint: host string and port number, passed to `TcpListener::bind`
    /// by [`RpcServer::start`].
    Tcp(String, u16),
}
44
/// RPC server that listens for incoming connections and handles queries.
///
/// The accept loop runs on a dedicated thread that is stopped and joined by
/// `stop` (also invoked on drop).
pub struct RpcServer {
    /// Flag polled by the server thread; storing `true` asks it to exit.
    shutdown: Arc<AtomicBool>,
    /// Handle of the server thread; `None` once the thread has been joined.
    thread: Option<thread::JoinHandle<()>>,
}
50
51impl RpcServer {
52    /// Start the RPC server on the given address.
53    pub fn start(addr: &RpcAddr, auth_key: [u8; 32], event_tx: EventSender) -> io::Result<Self> {
54        let shutdown = Arc::new(AtomicBool::new(false));
55        let shutdown2 = shutdown.clone();
56
57        let listener = match addr {
58            RpcAddr::Tcp(host, port) => {
59                let l = TcpListener::bind((host.as_str(), *port))?;
60                // Non-blocking so we can check shutdown flag
61                l.set_nonblocking(true)?;
62                l
63            }
64        };
65
66        let thread = thread::Builder::new()
67            .name("rpc-server".into())
68            .spawn(move || {
69                rpc_server_loop(listener, auth_key, event_tx, shutdown2);
70            })
71            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
72
73        Ok(RpcServer {
74            shutdown,
75            thread: Some(thread),
76        })
77    }
78
79    /// Stop the RPC server.
80    pub fn stop(&mut self) {
81        self.shutdown.store(true, Ordering::Relaxed);
82        if let Some(handle) = self.thread.take() {
83            let _ = handle.join();
84        }
85    }
86}
87
impl Drop for RpcServer {
    /// Stop and join the server thread when the handle goes out of scope.
    fn drop(&mut self) {
        self.stop();
    }
}
93
94fn rpc_server_loop(
95    listener: TcpListener,
96    auth_key: [u8; 32],
97    event_tx: EventSender,
98    shutdown: Arc<AtomicBool>,
99) {
100    loop {
101        if shutdown.load(Ordering::Relaxed) {
102            break;
103        }
104
105        match listener.accept() {
106            Ok((stream, _addr)) => {
107                // Set blocking for this connection
108                let _ = stream.set_nonblocking(false);
109                let _ = stream.set_read_timeout(Some(std::time::Duration::from_secs(10)));
110                let _ = stream.set_write_timeout(Some(std::time::Duration::from_secs(10)));
111
112                if let Err(e) = handle_connection(stream, &auth_key, &event_tx) {
113                    log::debug!("RPC connection error: {}", e);
114                }
115            }
116            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
117                // No pending connection, sleep briefly and retry
118                thread::sleep(std::time::Duration::from_millis(100));
119            }
120            Err(e) => {
121                log::error!("RPC accept error: {}", e);
122                thread::sleep(std::time::Duration::from_millis(100));
123            }
124        }
125    }
126}
127
128fn handle_connection(
129    mut stream: TcpStream,
130    auth_key: &[u8; 32],
131    event_tx: &EventSender,
132) -> io::Result<()> {
133    // Authentication: send challenge, verify response
134    server_auth(&mut stream, auth_key)?;
135
136    // Read request (pickle dict)
137    let request_bytes = recv_bytes(&mut stream)?;
138    let request = pickle::decode(&request_bytes)
139        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?;
140
141    // Translate pickle dict to query, send to driver, get response
142    let response = handle_rpc_request(&request, event_tx)?;
143
144    // Encode response and send
145    let response_bytes = pickle::encode(&response);
146    send_bytes(&mut stream, &response_bytes)?;
147
148    Ok(())
149}
150
151/// Server-side authentication: challenge-response.
152fn server_auth(stream: &mut TcpStream, auth_key: &[u8; 32]) -> io::Result<()> {
153    // Generate challenge: #CHALLENGE#{sha256}<40 random bytes>
154    let mut random_bytes = [0u8; CHALLENGE_LEN];
155    // Use /dev/urandom for randomness
156    {
157        let mut f = std::fs::File::open("/dev/urandom")?;
158        f.read_exact(&mut random_bytes)?;
159    }
160
161    let mut challenge_message = Vec::with_capacity(CHALLENGE_PREFIX.len() + 8 + CHALLENGE_LEN);
162    challenge_message.extend_from_slice(CHALLENGE_PREFIX);
163    challenge_message.extend_from_slice(b"{sha256}");
164    challenge_message.extend_from_slice(&random_bytes);
165
166    send_bytes(stream, &challenge_message)?;
167
168    // Read response (max 256 bytes)
169    let response = recv_bytes(stream)?;
170
171    // Verify response
172    // The message to HMAC is everything after #CHALLENGE# (i.e. {sha256}<random>)
173    let message = &challenge_message[CHALLENGE_PREFIX.len()..];
174
175    if verify_response(auth_key, message, &response) {
176        send_bytes(stream, WELCOME)?;
177        Ok(())
178    } else {
179        send_bytes(stream, FAILURE)?;
180        Err(io::Error::new(
181            io::ErrorKind::PermissionDenied,
182            "auth failed",
183        ))
184    }
185}
186
187/// Verify a client's HMAC response.
188fn verify_response(auth_key: &[u8; 32], message: &[u8], response: &[u8]) -> bool {
189    // Modern protocol: response = {sha256}<hmac-sha256 digest>
190    if response.starts_with(b"{sha256}") {
191        let digest = &response[8..];
192        let expected = hmac_sha256(auth_key, message);
193        constant_time_eq(digest, &expected)
194    }
195    // Legacy protocol: response = raw 16-byte HMAC-MD5 digest
196    else if response.len() == 16 {
197        let expected = hmac_md5(auth_key, message);
198        constant_time_eq(response, &expected)
199    }
200    // Legacy with {md5} prefix
201    else if response.starts_with(b"{md5}") {
202        let digest = &response[5..];
203        let expected = hmac_md5(auth_key, message);
204        constant_time_eq(digest, &expected)
205    } else {
206        false
207    }
208}
209
/// Compare two byte slices without exiting early on the first differing byte.
///
/// Slices of different length compare unequal immediately. For equal lengths
/// every byte pair is XOR-ed and the differences are OR-ed together, so the
/// amount of work does not depend on where the slices differ.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    if a.len() != b.len() {
        return false;
    }
    let accumulated = a
        .iter()
        .zip(b)
        .fold(0u8, |acc, (lhs, rhs)| acc | (lhs ^ rhs));
    accumulated == 0
}
221
/// Send bytes with a big-endian length prefix.
///
/// Payloads up to `i32::MAX` bytes use the regular 4-byte signed length
/// header. Larger payloads use the extended framing of Python's
/// `multiprocessing.connection`: a 4-byte header of `-1` followed by an 8-byte
/// unsigned length. This replaces a plain `as i32` cast, which would wrap for
/// large payloads and put a wrong (possibly negative) length on the wire.
///
/// # Errors
///
/// Returns any error produced while writing to or flushing `stream`.
fn send_bytes(stream: &mut TcpStream, data: &[u8]) -> io::Result<()> {
    if data.len() <= i32::MAX as usize {
        // Fits the signed 32-bit header, so the cast cannot wrap.
        let len = data.len() as i32;
        stream.write_all(&len.to_be_bytes())?;
    } else {
        // Extended format: -1 marker, then the real length as 8 bytes.
        stream.write_all(&(-1i32).to_be_bytes())?;
        stream.write_all(&(data.len() as u64).to_be_bytes())?;
    }
    stream.write_all(data)?;
    stream.flush()
}
229
/// Receive bytes with a big-endian length prefix.
///
/// Reads a 4-byte signed length header. A negative header selects the extended
/// format, where the real length follows as an 8-byte unsigned integer.
/// Messages longer than 64 MiB are rejected before any buffer is allocated.
///
/// The limit is checked on the full 64-bit length *before* narrowing to
/// `usize`, so an oversized extended length cannot wrap around and slip under
/// the limit on 32-bit targets.
///
/// # Errors
///
/// Returns `InvalidData` with "message too large" when the declared length
/// exceeds the limit, or any error from reading `stream` (including timeouts
/// and a peer that closes the connection early).
fn recv_bytes(stream: &mut TcpStream) -> io::Result<Vec<u8>> {
    // Upper bound on a single message, in bytes.
    const MAX_MESSAGE_LEN: u64 = 64 * 1024 * 1024;

    let mut len_buf = [0u8; 4];
    stream.read_exact(&mut len_buf)?;
    let short_len = i32::from_be_bytes(len_buf);

    let declared_len: u64 = if short_len < 0 {
        // Extended format: 8-byte length
        let mut len8_buf = [0u8; 8];
        stream.read_exact(&mut len8_buf)?;
        u64::from_be_bytes(len8_buf)
    } else {
        // Non-negative i32 always fits in u64.
        short_len as u64
    };

    if declared_len > MAX_MESSAGE_LEN {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "message too large",
        ));
    }

    // Bounded by MAX_MESSAGE_LEN above, so the narrowing cast is lossless.
    let mut buf = vec![0u8; declared_len as usize];
    stream.read_exact(&mut buf)?;
    Ok(buf)
}
263
264/// Translate a pickle request dict to a query event and get response.
265fn handle_rpc_request(request: &PickleValue, event_tx: &EventSender) -> io::Result<PickleValue> {
266    // Handle "get" requests
267    if let Some(get_val) = request.get("get") {
268        if let Some(path) = get_val.as_str() {
269            return match path {
270                "interface_stats" => {
271                    let resp = send_query(event_tx, QueryRequest::InterfaceStats)?;
272                    if let QueryResponse::InterfaceStats(stats) = resp {
273                        Ok(interface_stats_to_pickle(&stats))
274                    } else {
275                        Ok(PickleValue::None)
276                    }
277                }
278                "path_table" => {
279                    let max_hops = request
280                        .get("max_hops")
281                        .and_then(|v| v.as_int().map(|n| n as u8));
282                    let resp = send_query(event_tx, QueryRequest::PathTable { max_hops })?;
283                    if let QueryResponse::PathTable(entries) = resp {
284                        Ok(path_table_to_pickle(&entries))
285                    } else {
286                        Ok(PickleValue::None)
287                    }
288                }
289                "rate_table" => {
290                    let resp = send_query(event_tx, QueryRequest::RateTable)?;
291                    if let QueryResponse::RateTable(entries) = resp {
292                        Ok(rate_table_to_pickle(&entries))
293                    } else {
294                        Ok(PickleValue::None)
295                    }
296                }
297                "next_hop" => {
298                    let hash = extract_dest_hash(request, "destination_hash")?;
299                    let resp = send_query(event_tx, QueryRequest::NextHop { dest_hash: hash })?;
300                    if let QueryResponse::NextHop(Some(nh)) = resp {
301                        Ok(PickleValue::Bytes(nh.next_hop.to_vec()))
302                    } else {
303                        Ok(PickleValue::None)
304                    }
305                }
306                "next_hop_if_name" => {
307                    let hash = extract_dest_hash(request, "destination_hash")?;
308                    let resp =
309                        send_query(event_tx, QueryRequest::NextHopIfName { dest_hash: hash })?;
310                    if let QueryResponse::NextHopIfName(Some(name)) = resp {
311                        Ok(PickleValue::String(name))
312                    } else {
313                        Ok(PickleValue::None)
314                    }
315                }
316                "link_count" => {
317                    let resp = send_query(event_tx, QueryRequest::LinkCount)?;
318                    if let QueryResponse::LinkCount(n) = resp {
319                        Ok(PickleValue::Int(n as i64))
320                    } else {
321                        Ok(PickleValue::None)
322                    }
323                }
324                "transport_identity" => {
325                    let resp = send_query(event_tx, QueryRequest::TransportIdentity)?;
326                    if let QueryResponse::TransportIdentity(Some(hash)) = resp {
327                        Ok(PickleValue::Bytes(hash.to_vec()))
328                    } else {
329                        Ok(PickleValue::None)
330                    }
331                }
332                "blackholed" => {
333                    let resp = send_query(event_tx, QueryRequest::GetBlackholed)?;
334                    if let QueryResponse::Blackholed(entries) = resp {
335                        Ok(blackholed_to_pickle(&entries))
336                    } else {
337                        Ok(PickleValue::None)
338                    }
339                }
340                "discovered_interfaces" => {
341                    let only_available = request
342                        .get("only_available")
343                        .and_then(|v| v.as_bool())
344                        .unwrap_or(false);
345                    let only_transport = request
346                        .get("only_transport")
347                        .and_then(|v| v.as_bool())
348                        .unwrap_or(false);
349                    let resp = send_query(
350                        event_tx,
351                        QueryRequest::DiscoveredInterfaces {
352                            only_available,
353                            only_transport,
354                        },
355                    )?;
356                    if let QueryResponse::DiscoveredInterfaces(interfaces) = resp {
357                        Ok(discovered_interfaces_to_pickle(&interfaces))
358                    } else {
359                        Ok(PickleValue::None)
360                    }
361                }
362                "hooks" => {
363                    let (response_tx, response_rx) = mpsc::channel();
364                    event_tx
365                        .send(Event::ListHooks { response_tx })
366                        .map_err(|_| {
367                            io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down")
368                        })?;
369                    let hooks = response_rx
370                        .recv_timeout(std::time::Duration::from_secs(5))
371                        .map_err(|_| {
372                            io::Error::new(io::ErrorKind::TimedOut, "list hooks timed out")
373                        })?;
374                    Ok(hooks_to_pickle(&hooks))
375                }
376                "runtime_config" => {
377                    let resp = send_query(event_tx, QueryRequest::ListRuntimeConfig)?;
378                    if let QueryResponse::RuntimeConfigList(entries) = resp {
379                        Ok(runtime_config_list_to_pickle(&entries))
380                    } else {
381                        Ok(PickleValue::None)
382                    }
383                }
384                "known_destinations" => {
385                    let resp = send_query(event_tx, QueryRequest::KnownDestinations)?;
386                    if let QueryResponse::KnownDestinations(entries) = resp {
387                        Ok(known_destinations_to_pickle(&entries))
388                    } else {
389                        Ok(PickleValue::None)
390                    }
391                }
392                "runtime_config_entry" => {
393                    let key = request
394                        .get("key")
395                        .and_then(|v| v.as_str())
396                        .unwrap_or_default()
397                        .to_string();
398                    let resp = send_query(event_tx, QueryRequest::GetRuntimeConfig { key })?;
399                    if let QueryResponse::RuntimeConfigEntry(entry) = resp {
400                        Ok(entry
401                            .as_ref()
402                            .map(runtime_config_entry_to_pickle)
403                            .unwrap_or(PickleValue::None))
404                    } else {
405                        Ok(PickleValue::None)
406                    }
407                }
408                "backbone_peer_state" => {
409                    let interface_name = request
410                        .get("interface")
411                        .and_then(|v| v.as_str())
412                        .map(|s| s.to_string());
413                    let resp =
414                        send_query(event_tx, QueryRequest::BackbonePeerState { interface_name })?;
415                    if let QueryResponse::BackbonePeerState(entries) = resp {
416                        Ok(backbone_peer_state_to_pickle(&entries))
417                    } else {
418                        Ok(PickleValue::None)
419                    }
420                }
421                "backbone_interfaces" => {
422                    let resp = send_query(event_tx, QueryRequest::BackboneInterfaces)?;
423                    if let QueryResponse::BackboneInterfaces(entries) = resp {
424                        Ok(backbone_interfaces_to_pickle(&entries))
425                    } else {
426                        Ok(PickleValue::None)
427                    }
428                }
429                "provider_bridge_stats" => {
430                    let resp = send_query(event_tx, QueryRequest::ProviderBridgeStats)?;
431                    if let QueryResponse::ProviderBridgeStats(stats) = resp {
432                        Ok(stats
433                            .as_ref()
434                            .map(provider_bridge_stats_to_pickle)
435                            .unwrap_or(PickleValue::None))
436                    } else {
437                        Ok(PickleValue::None)
438                    }
439                }
440                "drain_status" => {
441                    let resp = send_query(event_tx, QueryRequest::DrainStatus)?;
442                    if let QueryResponse::DrainStatus(status) = resp {
443                        Ok(drain_status_to_pickle(&status))
444                    } else {
445                        Ok(PickleValue::None)
446                    }
447                }
448                _ => Ok(PickleValue::None),
449            };
450        }
451    }
452
453    if let Some(begin_val) = request.get("begin_drain") {
454        let timeout_secs = begin_val
455            .as_float()
456            .or_else(|| begin_val.as_int().map(|value| value as f64))
457            .unwrap_or(0.0)
458            .max(0.0);
459        let timeout = Duration::from_secs_f64(timeout_secs);
460        let _ = event_tx.send(Event::BeginDrain { timeout });
461        return Ok(PickleValue::Bool(true));
462    }
463
464    if let Some(set_val) = request.get("set").and_then(|v| v.as_str()) {
465        if set_val == "known_destination_retained" {
466            let dest_hash = extract_dest_hash(request, "dest_hash")?;
467            let resp = send_query(event_tx, QueryRequest::RetainKnownDestination { dest_hash })?;
468            return if let QueryResponse::RetainKnownDestination(ok) = resp {
469                Ok(PickleValue::Bool(ok))
470            } else {
471                Ok(PickleValue::None)
472            };
473        }
474        if set_val == "known_destination_used" {
475            let dest_hash = extract_dest_hash(request, "dest_hash")?;
476            let resp = send_query(
477                event_tx,
478                QueryRequest::MarkKnownDestinationUsed { dest_hash },
479            )?;
480            return if let QueryResponse::MarkKnownDestinationUsed(ok) = resp {
481                Ok(PickleValue::Bool(ok))
482            } else {
483                Ok(PickleValue::None)
484            };
485        }
486        if set_val == "runtime_config" {
487            let key = request
488                .get("key")
489                .and_then(|v| v.as_str())
490                .unwrap_or_default()
491                .to_string();
492            let Some(value) = request
493                .get("value")
494                .and_then(runtime_config_value_from_pickle)
495            else {
496                return Ok(runtime_config_error_to_pickle(&RuntimeConfigError {
497                    code: RuntimeConfigErrorCode::InvalidType,
498                    message: "runtime-config set requires a scalar value".into(),
499                }));
500            };
501            let resp = send_query(event_tx, QueryRequest::SetRuntimeConfig { key, value })?;
502            return if let QueryResponse::RuntimeConfigSet(result) = resp {
503                Ok(runtime_config_result_to_pickle(result))
504            } else {
505                Ok(PickleValue::None)
506            };
507        }
508    }
509
510    if let Some(reset_val) = request.get("reset").and_then(|v| v.as_str()) {
511        if reset_val == "runtime_config" {
512            let key = request
513                .get("key")
514                .and_then(|v| v.as_str())
515                .unwrap_or_default()
516                .to_string();
517            let resp = send_query(event_tx, QueryRequest::ResetRuntimeConfig { key })?;
518            return if let QueryResponse::RuntimeConfigReset(result) = resp {
519                Ok(runtime_config_result_to_pickle(result))
520            } else {
521                Ok(PickleValue::None)
522            };
523        }
524    }
525
526    if let Some(clear_val) = request.get("clear").and_then(|v| v.as_str()) {
527        if clear_val == "known_destination_retained" {
528            let dest_hash = extract_dest_hash(request, "dest_hash")?;
529            let resp = send_query(
530                event_tx,
531                QueryRequest::UnretainKnownDestination { dest_hash },
532            )?;
533            return if let QueryResponse::UnretainKnownDestination(ok) = resp {
534                Ok(PickleValue::Bool(ok))
535            } else {
536                Ok(PickleValue::None)
537            };
538        }
539        if clear_val == "backbone_peer_state" {
540            let interface_name = required_string(request, "interface")?;
541            let peer_ip = required_string(request, "ip")?;
542            let peer_ip = peer_ip
543                .parse()
544                .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "invalid peer IP"))?;
545            let resp = send_query(
546                event_tx,
547                QueryRequest::ClearBackbonePeerState {
548                    interface_name,
549                    peer_ip,
550                },
551            )?;
552            return if let QueryResponse::ClearBackbonePeerState(ok) = resp {
553                Ok(PickleValue::Bool(ok))
554            } else {
555                Ok(PickleValue::None)
556            };
557        }
558    }
559
560    if let Some(set_val) = request.get("set").and_then(|v| v.as_str()) {
561        if set_val == "backbone_peer_blacklist" {
562            let interface_name = required_string(request, "interface")?;
563            let peer_ip = required_string(request, "ip")?;
564            let peer_ip: IpAddr = peer_ip
565                .parse()
566                .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "invalid peer IP"))?;
567            let duration_secs = request
568                .get("duration_secs")
569                .and_then(|v| v.as_int())
570                .ok_or_else(|| {
571                    io::Error::new(io::ErrorKind::InvalidInput, "missing duration_secs")
572                })?;
573            let reason = request
574                .get("reason")
575                .and_then(|v| v.as_str())
576                .unwrap_or("sentinel blacklist")
577                .to_string();
578            let penalty_level = request
579                .get("penalty_level")
580                .and_then(|v| v.as_int())
581                .unwrap_or(0)
582                .clamp(0, u8::MAX as i64) as u8;
583            let resp = send_query(
584                event_tx,
585                QueryRequest::BlacklistBackbonePeer {
586                    interface_name,
587                    peer_ip,
588                    duration: Duration::from_secs(duration_secs as u64),
589                    reason,
590                    penalty_level,
591                },
592            )?;
593            return if let QueryResponse::BlacklistBackbonePeer(ok) = resp {
594                Ok(PickleValue::Bool(ok))
595            } else {
596                Ok(PickleValue::None)
597            };
598        }
599    }
600
601    // Handle "request_path" -- trigger a path request to the network
602    if let Some(hash_val) = request.get("request_path") {
603        if let Some(hash_bytes) = hash_val.as_bytes() {
604            if hash_bytes.len() >= 16 {
605                let mut dest_hash = [0u8; 16];
606                dest_hash.copy_from_slice(&hash_bytes[..16]);
607                let _ = event_tx.send(crate::event::Event::RequestPath { dest_hash });
608                return Ok(PickleValue::Bool(true));
609            }
610        }
611    }
612
613    // Handle "send_probe" requests
614    if let Some(hash_val) = request.get("send_probe") {
615        if let Some(hash_bytes) = hash_val.as_bytes() {
616            if hash_bytes.len() >= 16 {
617                let mut dest_hash = [0u8; 16];
618                dest_hash.copy_from_slice(&hash_bytes[..16]);
619                let payload_size = request
620                    .get("size")
621                    .and_then(|v| v.as_int())
622                    .and_then(|n| {
623                        if n > 0 && n <= 400 {
624                            Some(n as usize)
625                        } else {
626                            None
627                        }
628                    })
629                    .unwrap_or(16);
630                let resp = send_query(
631                    event_tx,
632                    QueryRequest::SendProbe {
633                        dest_hash,
634                        payload_size,
635                    },
636                )?;
637                if let QueryResponse::SendProbe(Some((packet_hash, hops))) = resp {
638                    return Ok(PickleValue::Dict(vec![
639                        (
640                            PickleValue::String("packet_hash".into()),
641                            PickleValue::Bytes(packet_hash.to_vec()),
642                        ),
643                        (
644                            PickleValue::String("hops".into()),
645                            PickleValue::Int(hops as i64),
646                        ),
647                    ]));
648                } else {
649                    return Ok(PickleValue::None);
650                }
651            }
652        }
653    }
654
655    // Handle "check_proof" requests
656    if let Some(hash_val) = request.get("check_proof") {
657        if let Some(hash_bytes) = hash_val.as_bytes() {
658            if hash_bytes.len() >= 32 {
659                let mut packet_hash = [0u8; 32];
660                packet_hash.copy_from_slice(&hash_bytes[..32]);
661                let resp = send_query(event_tx, QueryRequest::CheckProof { packet_hash })?;
662                if let QueryResponse::CheckProof(Some(rtt)) = resp {
663                    return Ok(PickleValue::Float(rtt));
664                } else {
665                    return Ok(PickleValue::None);
666                }
667            }
668        }
669    }
670
671    // Handle "blackhole" requests
672    if let Some(hash_val) = request.get("blackhole") {
673        if let Some(hash_bytes) = hash_val.as_bytes() {
674            if hash_bytes.len() >= 16 {
675                let mut identity_hash = [0u8; 16];
676                identity_hash.copy_from_slice(&hash_bytes[..16]);
677                let duration_hours = request.get("duration").and_then(|v| v.as_float());
678                let reason = request
679                    .get("reason")
680                    .and_then(|v| v.as_str())
681                    .map(|s| s.to_string());
682                let resp = send_query(
683                    event_tx,
684                    QueryRequest::BlackholeIdentity {
685                        identity_hash,
686                        duration_hours,
687                        reason,
688                    },
689                )?;
690                return Ok(PickleValue::Bool(matches!(
691                    resp,
692                    QueryResponse::BlackholeResult(true)
693                )));
694            }
695        }
696    }
697
698    // Handle "unblackhole" requests
699    if let Some(hash_val) = request.get("unblackhole") {
700        if let Some(hash_bytes) = hash_val.as_bytes() {
701            if hash_bytes.len() >= 16 {
702                let mut identity_hash = [0u8; 16];
703                identity_hash.copy_from_slice(&hash_bytes[..16]);
704                let resp = send_query(
705                    event_tx,
706                    QueryRequest::UnblackholeIdentity { identity_hash },
707                )?;
708                return Ok(PickleValue::Bool(matches!(
709                    resp,
710                    QueryResponse::UnblackholeResult(true)
711                )));
712            }
713        }
714    }
715
716    // Handle "drop" requests
717    if let Some(drop_val) = request.get("drop") {
718        if let Some(path) = drop_val.as_str() {
719            return match path {
720                "path" => {
721                    let hash = extract_dest_hash(request, "destination_hash")?;
722                    let resp = send_query(event_tx, QueryRequest::DropPath { dest_hash: hash })?;
723                    if let QueryResponse::DropPath(ok) = resp {
724                        Ok(PickleValue::Bool(ok))
725                    } else {
726                        Ok(PickleValue::None)
727                    }
728                }
729                "all_via" => {
730                    let hash = extract_dest_hash(request, "destination_hash")?;
731                    let resp = send_query(
732                        event_tx,
733                        QueryRequest::DropAllVia {
734                            transport_hash: hash,
735                        },
736                    )?;
737                    if let QueryResponse::DropAllVia(n) = resp {
738                        Ok(PickleValue::Int(n as i64))
739                    } else {
740                        Ok(PickleValue::None)
741                    }
742                }
743                "announce_queues" => {
744                    let resp = send_query(event_tx, QueryRequest::DropAnnounceQueues)?;
745                    if let QueryResponse::DropAnnounceQueues = resp {
746                        Ok(PickleValue::Bool(true))
747                    } else {
748                        Ok(PickleValue::None)
749                    }
750                }
751                _ => Ok(PickleValue::None),
752            };
753        }
754    }
755
756    if let Some(hook_val) = request.get("hook").and_then(|v| v.as_str()) {
757        return handle_hook_rpc_request(hook_val, request, event_tx);
758    }
759
760    Ok(PickleValue::None)
761}
762
763fn handle_hook_rpc_request(
764    op: &str,
765    request: &PickleValue,
766    event_tx: &EventSender,
767) -> io::Result<PickleValue> {
768    match op {
769        "load" => {
770            let name = required_string(request, "name")?;
771            let attach_point = required_string(request, "attach_point")?;
772            let priority = request
773                .get("priority")
774                .and_then(|v| v.as_int())
775                .unwrap_or(0) as i32;
776            let wasm = request
777                .get("wasm")
778                .and_then(|v| v.as_bytes())
779                .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing wasm"))?
780                .to_vec();
781            let (response_tx, response_rx) = mpsc::channel();
782            event_tx
783                .send(Event::LoadHook {
784                    name,
785                    wasm_bytes: wasm,
786                    attach_point,
787                    priority,
788                    response_tx,
789                })
790                .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down"))?;
791            let response = response_rx
792                .recv_timeout(std::time::Duration::from_secs(5))
793                .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "hook load timed out"))?;
794            Ok(hook_result_to_pickle(response))
795        }
796        "unload" => {
797            let name = required_string(request, "name")?;
798            let attach_point = required_string(request, "attach_point")?;
799            let (response_tx, response_rx) = mpsc::channel();
800            event_tx
801                .send(Event::UnloadHook {
802                    name,
803                    attach_point,
804                    response_tx,
805                })
806                .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down"))?;
807            let response = response_rx
808                .recv_timeout(std::time::Duration::from_secs(5))
809                .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "hook unload timed out"))?;
810            Ok(hook_result_to_pickle(response))
811        }
812        "enable" | "disable" => {
813            let name = required_string(request, "name")?;
814            let attach_point = required_string(request, "attach_point")?;
815            let enabled = op == "enable";
816            let (response_tx, response_rx) = mpsc::channel();
817            event_tx
818                .send(Event::SetHookEnabled {
819                    name,
820                    attach_point,
821                    enabled,
822                    response_tx,
823                })
824                .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down"))?;
825            let response = response_rx
826                .recv_timeout(std::time::Duration::from_secs(5))
827                .map_err(|_| {
828                    io::Error::new(io::ErrorKind::TimedOut, "hook enable/disable timed out")
829                })?;
830            Ok(hook_result_to_pickle(response))
831        }
832        "set_priority" => {
833            let name = required_string(request, "name")?;
834            let attach_point = required_string(request, "attach_point")?;
835            let priority = request
836                .get("priority")
837                .and_then(|v| v.as_int())
838                .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing priority"))?
839                as i32;
840            let (response_tx, response_rx) = mpsc::channel();
841            event_tx
842                .send(Event::SetHookPriority {
843                    name,
844                    attach_point,
845                    priority,
846                    response_tx,
847                })
848                .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down"))?;
849            let response = response_rx
850                .recv_timeout(std::time::Duration::from_secs(5))
851                .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "hook priority timed out"))?;
852            Ok(hook_result_to_pickle(response))
853        }
854        _ => Ok(PickleValue::None),
855    }
856}
857
858/// Send a query to the driver and wait for the response.
859fn send_query(event_tx: &EventSender, request: QueryRequest) -> io::Result<QueryResponse> {
860    let (resp_tx, resp_rx) = mpsc::channel();
861    event_tx
862        .send(Event::Query(request, resp_tx))
863        .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down"))?;
864    resp_rx
865        .recv_timeout(std::time::Duration::from_secs(5))
866        .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "query timed out"))
867}
868
869/// Extract a 16-byte destination hash from a pickle dict field.
870fn extract_dest_hash(request: &PickleValue, key: &str) -> io::Result<[u8; 16]> {
871    let bytes = request
872        .get(key)
873        .and_then(|v| v.as_bytes())
874        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing destination_hash"))?;
875    if bytes.len() < 16 {
876        return Err(io::Error::new(io::ErrorKind::InvalidData, "hash too short"));
877    }
878    let mut hash = [0u8; 16];
879    hash.copy_from_slice(&bytes[..16]);
880    Ok(hash)
881}
882
883fn required_string(request: &PickleValue, key: &str) -> io::Result<String> {
884    request
885        .get(key)
886        .and_then(|v| v.as_str())
887        .map(|s| s.to_string())
888        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, format!("missing {}", key)))
889}
890
891fn hook_result_to_pickle(result: Result<(), String>) -> PickleValue {
892    match result {
893        Ok(()) => PickleValue::Dict(vec![(
894            PickleValue::String("ok".into()),
895            PickleValue::Bool(true),
896        )]),
897        Err(error) => PickleValue::Dict(vec![
898            (PickleValue::String("ok".into()), PickleValue::Bool(false)),
899            (
900                PickleValue::String("error".into()),
901                PickleValue::String(error),
902            ),
903        ]),
904    }
905}
906
907// --- Pickle response builders ---
908
909fn interface_stats_to_pickle(stats: &InterfaceStatsResponse) -> PickleValue {
910    let mut ifaces = Vec::new();
911    for iface in &stats.interfaces {
912        ifaces.push(single_iface_to_pickle(iface));
913    }
914
915    let mut dict = vec![
916        (
917            PickleValue::String("interfaces".into()),
918            PickleValue::List(ifaces),
919        ),
920        (
921            PickleValue::String("transport_enabled".into()),
922            PickleValue::Bool(stats.transport_enabled),
923        ),
924        (
925            PickleValue::String("transport_uptime".into()),
926            PickleValue::Float(stats.transport_uptime),
927        ),
928        (
929            PickleValue::String("rxb".into()),
930            PickleValue::Int(stats.total_rxb as i64),
931        ),
932        (
933            PickleValue::String("txb".into()),
934            PickleValue::Int(stats.total_txb as i64),
935        ),
936    ];
937
938    if let Some(tid) = stats.transport_id {
939        dict.push((
940            PickleValue::String("transport_id".into()),
941            PickleValue::Bytes(tid.to_vec()),
942        ));
943    } else {
944        dict.push((
945            PickleValue::String("transport_id".into()),
946            PickleValue::None,
947        ));
948    }
949
950    if let Some(pr) = stats.probe_responder {
951        dict.push((
952            PickleValue::String("probe_responder".into()),
953            PickleValue::Bytes(pr.to_vec()),
954        ));
955    } else {
956        dict.push((
957            PickleValue::String("probe_responder".into()),
958            PickleValue::None,
959        ));
960    }
961
962    if let Some(pool) = &stats.backbone_peer_pool {
963        let members = pool
964            .members
965            .iter()
966            .map(|member| {
967                let mut member_dict = vec![
968                    (
969                        PickleValue::String("name".into()),
970                        PickleValue::String(member.name.clone()),
971                    ),
972                    (
973                        PickleValue::String("remote".into()),
974                        PickleValue::String(member.remote.clone()),
975                    ),
976                    (
977                        PickleValue::String("state".into()),
978                        PickleValue::String(member.state.clone()),
979                    ),
980                    (
981                        PickleValue::String("failure_count".into()),
982                        PickleValue::Int(member.failure_count as i64),
983                    ),
984                ];
985                member_dict.push((
986                    PickleValue::String("interface_id".into()),
987                    member
988                        .interface_id
989                        .map(|id| PickleValue::Int(id as i64))
990                        .unwrap_or(PickleValue::None),
991                ));
992                member_dict.push((
993                    PickleValue::String("last_error".into()),
994                    member
995                        .last_error
996                        .as_ref()
997                        .map(|err| PickleValue::String(err.clone()))
998                        .unwrap_or(PickleValue::None),
999                ));
1000                member_dict.push((
1001                    PickleValue::String("cooldown_remaining_seconds".into()),
1002                    member
1003                        .cooldown_remaining_seconds
1004                        .map(PickleValue::Float)
1005                        .unwrap_or(PickleValue::None),
1006                ));
1007                PickleValue::Dict(member_dict)
1008            })
1009            .collect();
1010        dict.push((
1011            PickleValue::String("backbone_peer_pool".into()),
1012            PickleValue::Dict(vec![
1013                (
1014                    PickleValue::String("max_connected".into()),
1015                    PickleValue::Int(pool.max_connected as i64),
1016                ),
1017                (
1018                    PickleValue::String("active_count".into()),
1019                    PickleValue::Int(pool.active_count as i64),
1020                ),
1021                (
1022                    PickleValue::String("standby_count".into()),
1023                    PickleValue::Int(pool.standby_count as i64),
1024                ),
1025                (
1026                    PickleValue::String("cooldown_count".into()),
1027                    PickleValue::Int(pool.cooldown_count as i64),
1028                ),
1029                (
1030                    PickleValue::String("members".into()),
1031                    PickleValue::List(members),
1032                ),
1033            ]),
1034        ));
1035    } else {
1036        dict.push((
1037            PickleValue::String("backbone_peer_pool".into()),
1038            PickleValue::None,
1039        ));
1040    }
1041
1042    PickleValue::Dict(dict)
1043}
1044
1045fn single_iface_to_pickle(s: &SingleInterfaceStat) -> PickleValue {
1046    let mut dict = vec![
1047        (
1048            PickleValue::String("id".into()),
1049            PickleValue::Int(s.id as i64),
1050        ),
1051        (
1052            PickleValue::String("name".into()),
1053            PickleValue::String(s.name.clone()),
1054        ),
1055        (
1056            PickleValue::String("status".into()),
1057            PickleValue::Bool(s.status),
1058        ),
1059        (
1060            PickleValue::String("mode".into()),
1061            PickleValue::Int(s.mode as i64),
1062        ),
1063        (
1064            PickleValue::String("rxb".into()),
1065            PickleValue::Int(s.rxb as i64),
1066        ),
1067        (
1068            PickleValue::String("txb".into()),
1069            PickleValue::Int(s.txb as i64),
1070        ),
1071        (
1072            PickleValue::String("rx_packets".into()),
1073            PickleValue::Int(s.rx_packets as i64),
1074        ),
1075        (
1076            PickleValue::String("tx_packets".into()),
1077            PickleValue::Int(s.tx_packets as i64),
1078        ),
1079        (
1080            PickleValue::String("started".into()),
1081            PickleValue::Float(s.started),
1082        ),
1083        (
1084            PickleValue::String("ia_freq".into()),
1085            PickleValue::Float(s.ia_freq),
1086        ),
1087        (
1088            PickleValue::String("oa_freq".into()),
1089            PickleValue::Float(s.oa_freq),
1090        ),
1091    ];
1092
1093    match s.bitrate {
1094        Some(br) => dict.push((
1095            PickleValue::String("bitrate".into()),
1096            PickleValue::Int(br as i64),
1097        )),
1098        None => dict.push((PickleValue::String("bitrate".into()), PickleValue::None)),
1099    }
1100
1101    match s.ifac_size {
1102        Some(sz) => dict.push((
1103            PickleValue::String("ifac_size".into()),
1104            PickleValue::Int(sz as i64),
1105        )),
1106        None => dict.push((PickleValue::String("ifac_size".into()), PickleValue::None)),
1107    }
1108
1109    PickleValue::Dict(dict)
1110}
1111
1112fn path_table_to_pickle(entries: &[PathTableEntry]) -> PickleValue {
1113    let list: Vec<PickleValue> = entries
1114        .iter()
1115        .map(|e| {
1116            PickleValue::Dict(vec![
1117                (
1118                    PickleValue::String("hash".into()),
1119                    PickleValue::Bytes(e.hash.to_vec()),
1120                ),
1121                (
1122                    PickleValue::String("timestamp".into()),
1123                    PickleValue::Float(e.timestamp),
1124                ),
1125                (
1126                    PickleValue::String("via".into()),
1127                    PickleValue::Bytes(e.via.to_vec()),
1128                ),
1129                (
1130                    PickleValue::String("hops".into()),
1131                    PickleValue::Int(e.hops as i64),
1132                ),
1133                (
1134                    PickleValue::String("expires".into()),
1135                    PickleValue::Float(e.expires),
1136                ),
1137                (
1138                    PickleValue::String("interface".into()),
1139                    PickleValue::String(e.interface_name.clone()),
1140                ),
1141            ])
1142        })
1143        .collect();
1144    PickleValue::List(list)
1145}
1146
1147fn rate_table_to_pickle(entries: &[RateTableEntry]) -> PickleValue {
1148    let list: Vec<PickleValue> = entries
1149        .iter()
1150        .map(|e| {
1151            PickleValue::Dict(vec![
1152                (
1153                    PickleValue::String("hash".into()),
1154                    PickleValue::Bytes(e.hash.to_vec()),
1155                ),
1156                (
1157                    PickleValue::String("last".into()),
1158                    PickleValue::Float(e.last),
1159                ),
1160                (
1161                    PickleValue::String("rate_violations".into()),
1162                    PickleValue::Int(e.rate_violations as i64),
1163                ),
1164                (
1165                    PickleValue::String("blocked_until".into()),
1166                    PickleValue::Float(e.blocked_until),
1167                ),
1168                (
1169                    PickleValue::String("timestamps".into()),
1170                    PickleValue::List(
1171                        e.timestamps
1172                            .iter()
1173                            .map(|&t| PickleValue::Float(t))
1174                            .collect(),
1175                    ),
1176                ),
1177            ])
1178        })
1179        .collect();
1180    PickleValue::List(list)
1181}
1182
1183fn blackholed_to_pickle(entries: &[BlackholeInfo]) -> PickleValue {
1184    let list: Vec<PickleValue> = entries
1185        .iter()
1186        .map(|e| {
1187            let mut dict = vec![
1188                (
1189                    PickleValue::String("identity_hash".into()),
1190                    PickleValue::Bytes(e.identity_hash.to_vec()),
1191                ),
1192                (
1193                    PickleValue::String("created".into()),
1194                    PickleValue::Float(e.created),
1195                ),
1196                (
1197                    PickleValue::String("expires".into()),
1198                    PickleValue::Float(e.expires),
1199                ),
1200            ];
1201            if let Some(ref reason) = e.reason {
1202                dict.push((
1203                    PickleValue::String("reason".into()),
1204                    PickleValue::String(reason.clone()),
1205                ));
1206            } else {
1207                dict.push((PickleValue::String("reason".into()), PickleValue::None));
1208            }
1209            PickleValue::Dict(dict)
1210        })
1211        .collect();
1212    PickleValue::List(list)
1213}
1214
1215fn discovered_interfaces_to_pickle(
1216    interfaces: &[crate::discovery::DiscoveredInterface],
1217) -> PickleValue {
1218    let list: Vec<PickleValue> = interfaces
1219        .iter()
1220        .map(|iface| {
1221            let mut dict = vec![
1222                (
1223                    PickleValue::String("type".into()),
1224                    PickleValue::String(iface.interface_type.clone()),
1225                ),
1226                (
1227                    PickleValue::String("transport".into()),
1228                    PickleValue::Bool(iface.transport),
1229                ),
1230                (
1231                    PickleValue::String("name".into()),
1232                    PickleValue::String(iface.name.clone()),
1233                ),
1234                (
1235                    PickleValue::String("discovered".into()),
1236                    PickleValue::Float(iface.discovered),
1237                ),
1238                (
1239                    PickleValue::String("last_heard".into()),
1240                    PickleValue::Float(iface.last_heard),
1241                ),
1242                (
1243                    PickleValue::String("heard_count".into()),
1244                    PickleValue::Int(iface.heard_count as i64),
1245                ),
1246                (
1247                    PickleValue::String("status".into()),
1248                    PickleValue::String(iface.status.as_str().into()),
1249                ),
1250                (
1251                    PickleValue::String("stamp".into()),
1252                    PickleValue::Bytes(iface.stamp.clone()),
1253                ),
1254                (
1255                    PickleValue::String("value".into()),
1256                    PickleValue::Int(iface.stamp_value as i64),
1257                ),
1258                (
1259                    PickleValue::String("transport_id".into()),
1260                    PickleValue::Bytes(iface.transport_id.to_vec()),
1261                ),
1262                (
1263                    PickleValue::String("network_id".into()),
1264                    PickleValue::Bytes(iface.network_id.to_vec()),
1265                ),
1266                (
1267                    PickleValue::String("hops".into()),
1268                    PickleValue::Int(iface.hops as i64),
1269                ),
1270            ];
1271
1272            // Optional location fields
1273            if let Some(v) = iface.latitude {
1274                dict.push((
1275                    PickleValue::String("latitude".into()),
1276                    PickleValue::Float(v),
1277                ));
1278            } else {
1279                dict.push((PickleValue::String("latitude".into()), PickleValue::None));
1280            }
1281            if let Some(v) = iface.longitude {
1282                dict.push((
1283                    PickleValue::String("longitude".into()),
1284                    PickleValue::Float(v),
1285                ));
1286            } else {
1287                dict.push((PickleValue::String("longitude".into()), PickleValue::None));
1288            }
1289            if let Some(v) = iface.height {
1290                dict.push((PickleValue::String("height".into()), PickleValue::Float(v)));
1291            } else {
1292                dict.push((PickleValue::String("height".into()), PickleValue::None));
1293            }
1294
1295            // Connection info
1296            if let Some(ref v) = iface.reachable_on {
1297                dict.push((
1298                    PickleValue::String("reachable_on".into()),
1299                    PickleValue::String(v.clone()),
1300                ));
1301            } else {
1302                dict.push((
1303                    PickleValue::String("reachable_on".into()),
1304                    PickleValue::None,
1305                ));
1306            }
1307            if let Some(v) = iface.port {
1308                dict.push((
1309                    PickleValue::String("port".into()),
1310                    PickleValue::Int(v as i64),
1311                ));
1312            } else {
1313                dict.push((PickleValue::String("port".into()), PickleValue::None));
1314            }
1315
1316            // RNode/RF specific
1317            if let Some(v) = iface.frequency {
1318                dict.push((
1319                    PickleValue::String("frequency".into()),
1320                    PickleValue::Int(v as i64),
1321                ));
1322            } else {
1323                dict.push((PickleValue::String("frequency".into()), PickleValue::None));
1324            }
1325            if let Some(v) = iface.bandwidth {
1326                dict.push((
1327                    PickleValue::String("bandwidth".into()),
1328                    PickleValue::Int(v as i64),
1329                ));
1330            } else {
1331                dict.push((PickleValue::String("bandwidth".into()), PickleValue::None));
1332            }
1333            if let Some(v) = iface.spreading_factor {
1334                dict.push((PickleValue::String("sf".into()), PickleValue::Int(v as i64)));
1335            } else {
1336                dict.push((PickleValue::String("sf".into()), PickleValue::None));
1337            }
1338            if let Some(v) = iface.coding_rate {
1339                dict.push((PickleValue::String("cr".into()), PickleValue::Int(v as i64)));
1340            } else {
1341                dict.push((PickleValue::String("cr".into()), PickleValue::None));
1342            }
1343            if let Some(ref v) = iface.modulation {
1344                dict.push((
1345                    PickleValue::String("modulation".into()),
1346                    PickleValue::String(v.clone()),
1347                ));
1348            } else {
1349                dict.push((PickleValue::String("modulation".into()), PickleValue::None));
1350            }
1351            if let Some(v) = iface.channel {
1352                dict.push((
1353                    PickleValue::String("channel".into()),
1354                    PickleValue::Int(v as i64),
1355                ));
1356            } else {
1357                dict.push((PickleValue::String("channel".into()), PickleValue::None));
1358            }
1359
1360            // IFAC info
1361            if let Some(ref v) = iface.ifac_netname {
1362                dict.push((
1363                    PickleValue::String("ifac_netname".into()),
1364                    PickleValue::String(v.clone()),
1365                ));
1366            } else {
1367                dict.push((
1368                    PickleValue::String("ifac_netname".into()),
1369                    PickleValue::None,
1370                ));
1371            }
1372            if let Some(ref v) = iface.ifac_netkey {
1373                dict.push((
1374                    PickleValue::String("ifac_netkey".into()),
1375                    PickleValue::String(v.clone()),
1376                ));
1377            } else {
1378                dict.push((PickleValue::String("ifac_netkey".into()), PickleValue::None));
1379            }
1380
1381            // Config entry
1382            if let Some(ref v) = iface.config_entry {
1383                dict.push((
1384                    PickleValue::String("config_entry".into()),
1385                    PickleValue::String(v.clone()),
1386                ));
1387            } else {
1388                dict.push((
1389                    PickleValue::String("config_entry".into()),
1390                    PickleValue::None,
1391                ));
1392            }
1393
1394            dict.push((
1395                PickleValue::String("discovery_hash".into()),
1396                PickleValue::Bytes(iface.discovery_hash.to_vec()),
1397            ));
1398
1399            PickleValue::Dict(dict)
1400        })
1401        .collect();
1402    PickleValue::List(list)
1403}
1404
1405fn hooks_to_pickle(hooks: &[HookInfo]) -> PickleValue {
1406    PickleValue::List(
1407        hooks
1408            .iter()
1409            .map(|hook| {
1410                PickleValue::Dict(vec![
1411                    (
1412                        PickleValue::String("name".into()),
1413                        PickleValue::String(hook.name.clone()),
1414                    ),
1415                    (
1416                        PickleValue::String("attach_point".into()),
1417                        PickleValue::String(hook.attach_point.clone()),
1418                    ),
1419                    (
1420                        PickleValue::String("priority".into()),
1421                        PickleValue::Int(hook.priority as i64),
1422                    ),
1423                    (
1424                        PickleValue::String("enabled".into()),
1425                        PickleValue::Bool(hook.enabled),
1426                    ),
1427                    (
1428                        PickleValue::String("consecutive_traps".into()),
1429                        PickleValue::Int(hook.consecutive_traps as i64),
1430                    ),
1431                ])
1432            })
1433            .collect(),
1434    )
1435}
1436
1437fn backbone_peer_state_to_pickle(entries: &[BackbonePeerStateEntry]) -> PickleValue {
1438    PickleValue::List(
1439        entries
1440            .iter()
1441            .map(|entry| {
1442                PickleValue::Dict(vec![
1443                    (
1444                        PickleValue::String("interface".into()),
1445                        PickleValue::String(entry.interface_name.clone()),
1446                    ),
1447                    (
1448                        PickleValue::String("ip".into()),
1449                        PickleValue::String(entry.peer_ip.to_string()),
1450                    ),
1451                    (
1452                        PickleValue::String("connected_count".into()),
1453                        PickleValue::Int(entry.connected_count as i64),
1454                    ),
1455                    (
1456                        PickleValue::String("blacklisted_remaining_secs".into()),
1457                        entry
1458                            .blacklisted_remaining_secs
1459                            .map(PickleValue::Float)
1460                            .unwrap_or(PickleValue::None),
1461                    ),
1462                    (
1463                        PickleValue::String("blacklist_reason".into()),
1464                        entry
1465                            .blacklist_reason
1466                            .as_ref()
1467                            .map(|v: &String| PickleValue::String(v.clone()))
1468                            .unwrap_or(PickleValue::None),
1469                    ),
1470                    (
1471                        PickleValue::String("reject_count".into()),
1472                        PickleValue::Int(entry.reject_count as i64),
1473                    ),
1474                ])
1475            })
1476            .collect(),
1477    )
1478}
1479
1480fn backbone_interfaces_to_pickle(entries: &[BackboneInterfaceEntry]) -> PickleValue {
1481    PickleValue::List(
1482        entries
1483            .iter()
1484            .map(|entry| {
1485                PickleValue::Dict(vec![
1486                    (
1487                        PickleValue::String("id".into()),
1488                        PickleValue::Int(entry.interface_id.0 as i64),
1489                    ),
1490                    (
1491                        PickleValue::String("name".into()),
1492                        PickleValue::String(entry.interface_name.clone()),
1493                    ),
1494                ])
1495            })
1496            .collect(),
1497    )
1498}
1499
1500fn provider_bridge_stats_to_pickle(stats: &ProviderBridgeStats) -> PickleValue {
1501    PickleValue::Dict(vec![
1502        (
1503            PickleValue::String("connected".into()),
1504            PickleValue::Bool(stats.connected),
1505        ),
1506        (
1507            PickleValue::String("consumer_count".into()),
1508            PickleValue::Int(stats.consumer_count as i64),
1509        ),
1510        (
1511            PickleValue::String("queue_max_events".into()),
1512            PickleValue::Int(stats.queue_max_events as i64),
1513        ),
1514        (
1515            PickleValue::String("queue_max_bytes".into()),
1516            PickleValue::Int(stats.queue_max_bytes as i64),
1517        ),
1518        (
1519            PickleValue::String("backlog_len".into()),
1520            PickleValue::Int(stats.backlog_len as i64),
1521        ),
1522        (
1523            PickleValue::String("backlog_bytes".into()),
1524            PickleValue::Int(stats.backlog_bytes as i64),
1525        ),
1526        (
1527            PickleValue::String("backlog_dropped_pending".into()),
1528            PickleValue::Int(stats.backlog_dropped_pending as i64),
1529        ),
1530        (
1531            PickleValue::String("backlog_dropped_total".into()),
1532            PickleValue::Int(stats.backlog_dropped_total as i64),
1533        ),
1534        (
1535            PickleValue::String("total_disconnect_count".into()),
1536            PickleValue::Int(stats.total_disconnect_count as i64),
1537        ),
1538        (
1539            PickleValue::String("consumers".into()),
1540            PickleValue::List(
1541                stats
1542                    .consumers
1543                    .iter()
1544                    .map(|consumer| {
1545                        PickleValue::Dict(vec![
1546                            (
1547                                PickleValue::String("id".into()),
1548                                PickleValue::Int(consumer.id as i64),
1549                            ),
1550                            (
1551                                PickleValue::String("connected".into()),
1552                                PickleValue::Bool(consumer.connected),
1553                            ),
1554                            (
1555                                PickleValue::String("queue_len".into()),
1556                                PickleValue::Int(consumer.queue_len as i64),
1557                            ),
1558                            (
1559                                PickleValue::String("queued_bytes".into()),
1560                                PickleValue::Int(consumer.queued_bytes as i64),
1561                            ),
1562                            (
1563                                PickleValue::String("dropped_pending".into()),
1564                                PickleValue::Int(consumer.dropped_pending as i64),
1565                            ),
1566                            (
1567                                PickleValue::String("dropped_total".into()),
1568                                PickleValue::Int(consumer.dropped_total as i64),
1569                            ),
1570                            (
1571                                PickleValue::String("queue_max_events".into()),
1572                                PickleValue::Int(consumer.queue_max_events as i64),
1573                            ),
1574                            (
1575                                PickleValue::String("queue_max_bytes".into()),
1576                                PickleValue::Int(consumer.queue_max_bytes as i64),
1577                            ),
1578                        ])
1579                    })
1580                    .collect(),
1581            ),
1582        ),
1583    ])
1584}
1585
1586fn lifecycle_state_name(state: LifecycleState) -> &'static str {
1587    match state {
1588        LifecycleState::Active => "active",
1589        LifecycleState::Draining => "draining",
1590        LifecycleState::Stopping => "stopping",
1591        LifecycleState::Stopped => "stopped",
1592    }
1593}
1594
1595fn drain_status_to_pickle(status: &DrainStatus) -> PickleValue {
1596    PickleValue::Dict(vec![
1597        (
1598            PickleValue::String("state".into()),
1599            PickleValue::String(lifecycle_state_name(status.state).into()),
1600        ),
1601        (
1602            PickleValue::String("drain_age_seconds".into()),
1603            status
1604                .drain_age_seconds
1605                .map(PickleValue::Float)
1606                .unwrap_or(PickleValue::None),
1607        ),
1608        (
1609            PickleValue::String("deadline_remaining_seconds".into()),
1610            status
1611                .deadline_remaining_seconds
1612                .map(PickleValue::Float)
1613                .unwrap_or(PickleValue::None),
1614        ),
1615        (
1616            PickleValue::String("drain_complete".into()),
1617            PickleValue::Bool(status.drain_complete),
1618        ),
1619        (
1620            PickleValue::String("interface_writer_queued_frames".into()),
1621            PickleValue::Int(status.interface_writer_queued_frames as i64),
1622        ),
1623        (
1624            PickleValue::String("provider_backlog_events".into()),
1625            PickleValue::Int(status.provider_backlog_events as i64),
1626        ),
1627        (
1628            PickleValue::String("provider_consumer_queued_events".into()),
1629            PickleValue::Int(status.provider_consumer_queued_events as i64),
1630        ),
1631        (
1632            PickleValue::String("detail".into()),
1633            status
1634                .detail
1635                .as_ref()
1636                .map(|detail| PickleValue::String(detail.clone()))
1637                .unwrap_or(PickleValue::None),
1638        ),
1639    ])
1640}
1641
1642fn runtime_config_value_to_pickle(value: &RuntimeConfigValue) -> PickleValue {
1643    match value {
1644        RuntimeConfigValue::Int(v) => PickleValue::Int(*v),
1645        RuntimeConfigValue::Float(v) => PickleValue::Float(*v),
1646        RuntimeConfigValue::Bool(v) => PickleValue::Bool(*v),
1647        RuntimeConfigValue::String(v) => PickleValue::String(v.clone()),
1648        RuntimeConfigValue::Null => PickleValue::None,
1649    }
1650}
1651
1652fn runtime_config_value_from_pickle(value: &PickleValue) -> Option<RuntimeConfigValue> {
1653    match value {
1654        PickleValue::Int(v) => Some(RuntimeConfigValue::Int(*v)),
1655        PickleValue::Float(v) => Some(RuntimeConfigValue::Float(*v)),
1656        PickleValue::Bool(v) => Some(RuntimeConfigValue::Bool(*v)),
1657        PickleValue::String(v) => Some(RuntimeConfigValue::String(v.clone())),
1658        PickleValue::None => Some(RuntimeConfigValue::Null),
1659        _ => None,
1660    }
1661}
1662
1663fn runtime_config_entry_to_pickle(entry: &RuntimeConfigEntry) -> PickleValue {
1664    PickleValue::Dict(vec![
1665        (
1666            PickleValue::String("key".into()),
1667            PickleValue::String(entry.key.clone()),
1668        ),
1669        (
1670            PickleValue::String("value".into()),
1671            runtime_config_value_to_pickle(&entry.value),
1672        ),
1673        (
1674            PickleValue::String("default".into()),
1675            runtime_config_value_to_pickle(&entry.default),
1676        ),
1677        (
1678            PickleValue::String("source".into()),
1679            PickleValue::String(match entry.source {
1680                RuntimeConfigSource::Startup => "startup".into(),
1681                RuntimeConfigSource::RuntimeOverride => "runtime_override".into(),
1682            }),
1683        ),
1684        (
1685            PickleValue::String("apply_mode".into()),
1686            PickleValue::String(match entry.apply_mode {
1687                RuntimeConfigApplyMode::Immediate => "immediate".into(),
1688                RuntimeConfigApplyMode::NewConnectionsOnly => "new_connections_only".into(),
1689                RuntimeConfigApplyMode::NextReconnect => "next_reconnect".into(),
1690                RuntimeConfigApplyMode::RestartRequired => "restart_required".into(),
1691            }),
1692        ),
1693        (
1694            PickleValue::String("description".into()),
1695            entry
1696                .description
1697                .as_ref()
1698                .map(|v| PickleValue::String(v.clone()))
1699                .unwrap_or(PickleValue::None),
1700        ),
1701    ])
1702}
1703
1704fn runtime_config_list_to_pickle(entries: &[RuntimeConfigEntry]) -> PickleValue {
1705    PickleValue::List(entries.iter().map(runtime_config_entry_to_pickle).collect())
1706}
1707
1708fn runtime_config_error_to_pickle(error: &RuntimeConfigError) -> PickleValue {
1709    PickleValue::Dict(vec![
1710        (
1711            PickleValue::String("error".into()),
1712            PickleValue::String(match error.code {
1713                RuntimeConfigErrorCode::UnknownKey => "unknown_key".into(),
1714                RuntimeConfigErrorCode::InvalidType => "invalid_type".into(),
1715                RuntimeConfigErrorCode::InvalidValue => "invalid_value".into(),
1716                RuntimeConfigErrorCode::Unsupported => "unsupported".into(),
1717                RuntimeConfigErrorCode::NotFound => "not_found".into(),
1718                RuntimeConfigErrorCode::ApplyFailed => "apply_failed".into(),
1719            }),
1720        ),
1721        (
1722            PickleValue::String("message".into()),
1723            PickleValue::String(error.message.clone()),
1724        ),
1725    ])
1726}
1727
1728fn runtime_config_result_to_pickle(
1729    result: Result<RuntimeConfigEntry, RuntimeConfigError>,
1730) -> PickleValue {
1731    match result {
1732        Ok(entry) => runtime_config_entry_to_pickle(&entry),
1733        Err(error) => runtime_config_error_to_pickle(&error),
1734    }
1735}
1736
1737fn known_destination_entry_to_pickle(entry: &KnownDestinationEntry) -> PickleValue {
1738    PickleValue::Dict(vec![
1739        (
1740            PickleValue::String("dest_hash".into()),
1741            PickleValue::Bytes(entry.dest_hash.to_vec()),
1742        ),
1743        (
1744            PickleValue::String("identity_hash".into()),
1745            PickleValue::Bytes(entry.identity_hash.to_vec()),
1746        ),
1747        (
1748            PickleValue::String("public_key".into()),
1749            PickleValue::Bytes(entry.public_key.to_vec()),
1750        ),
1751        (
1752            PickleValue::String("app_data".into()),
1753            entry
1754                .app_data
1755                .as_ref()
1756                .map(|data: &Vec<u8>| PickleValue::Bytes(data.clone()))
1757                .unwrap_or(PickleValue::None),
1758        ),
1759        (
1760            PickleValue::String("hops".into()),
1761            PickleValue::Int(entry.hops as i64),
1762        ),
1763        (
1764            PickleValue::String("received_at".into()),
1765            PickleValue::Float(entry.received_at),
1766        ),
1767        (
1768            PickleValue::String("receiving_interface".into()),
1769            PickleValue::Int(entry.receiving_interface.0 as i64),
1770        ),
1771        (
1772            PickleValue::String("was_used".into()),
1773            PickleValue::Bool(entry.was_used),
1774        ),
1775        (
1776            PickleValue::String("last_used_at".into()),
1777            entry
1778                .last_used_at
1779                .map(PickleValue::Float)
1780                .unwrap_or(PickleValue::None),
1781        ),
1782        (
1783            PickleValue::String("retained".into()),
1784            PickleValue::Bool(entry.retained),
1785        ),
1786    ])
1787}
1788
1789fn known_destinations_to_pickle(entries: &[KnownDestinationEntry]) -> PickleValue {
1790    PickleValue::List(
1791        entries
1792            .iter()
1793            .map(known_destination_entry_to_pickle)
1794            .collect(),
1795    )
1796}
1797
1798fn parse_known_destination_entry(value: &PickleValue) -> io::Result<KnownDestinationEntry> {
1799    let get_bytes = |key: &str, len: usize| -> io::Result<Vec<u8>> {
1800        let value = value.get(key).ok_or_else(|| {
1801            io::Error::new(io::ErrorKind::InvalidData, format!("missing {}", key))
1802        })?;
1803        let bytes = value.as_bytes().ok_or_else(|| {
1804            io::Error::new(io::ErrorKind::InvalidData, format!("invalid {}", key))
1805        })?;
1806        if bytes.len() != len {
1807            return Err(io::Error::new(
1808                io::ErrorKind::InvalidData,
1809                format!("invalid {} length", key),
1810            ));
1811        }
1812        Ok(bytes.to_vec())
1813    };
1814    let get_int = |key: &str| -> io::Result<i64> {
1815        value
1816            .get(key)
1817            .and_then(|v| v.as_int())
1818            .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, format!("invalid {}", key)))
1819    };
1820    let get_float = |key: &str| -> io::Result<f64> {
1821        value
1822            .get(key)
1823            .and_then(|v| v.as_float())
1824            .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, format!("invalid {}", key)))
1825    };
1826    let get_bool = |key: &str| -> io::Result<bool> {
1827        value
1828            .get(key)
1829            .and_then(|v| v.as_bool())
1830            .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, format!("invalid {}", key)))
1831    };
1832
1833    let mut dest_hash = [0u8; 16];
1834    dest_hash.copy_from_slice(&get_bytes("dest_hash", 16)?);
1835    let mut identity_hash = [0u8; 16];
1836    identity_hash.copy_from_slice(&get_bytes("identity_hash", 16)?);
1837    let mut public_key = [0u8; 64];
1838    public_key.copy_from_slice(&get_bytes("public_key", 64)?);
1839    let app_data = value
1840        .get("app_data")
1841        .and_then(|v| v.as_bytes())
1842        .map(|bytes| bytes.to_vec());
1843    let last_used_at = value.get("last_used_at").and_then(|v| v.as_float());
1844
1845    Ok(KnownDestinationEntry {
1846        dest_hash,
1847        identity_hash,
1848        public_key,
1849        app_data,
1850        hops: get_int("hops")? as u8,
1851        received_at: get_float("received_at")?,
1852        receiving_interface: rns_core::transport::types::InterfaceId(
1853            get_int("receiving_interface")? as u64,
1854        ),
1855        was_used: get_bool("was_used")?,
1856        last_used_at,
1857        retained: get_bool("retained")?,
1858    })
1859}
1860
1861fn parse_known_destination_list(value: &PickleValue) -> io::Result<Vec<KnownDestinationEntry>> {
1862    let list = value
1863        .as_list()
1864        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "expected list"))?;
1865    list.iter().map(parse_known_destination_entry).collect()
1866}
1867
1868// --- RPC Client ---
1869
/// Client side of the daemon RPC protocol.
///
/// Wraps a single TCP connection; instances are created with
/// `RpcClient::connect`, which authenticates before returning.
pub struct RpcClient {
    /// Connection used for every request/response exchange.
    stream: TcpStream,
}
1874
1875impl RpcClient {
1876    /// Connect to an RPC server and authenticate.
1877    pub fn connect(addr: &RpcAddr, auth_key: &[u8; 32]) -> io::Result<Self> {
1878        let mut stream = match addr {
1879            RpcAddr::Tcp(host, port) => TcpStream::connect((host.as_str(), *port))?,
1880        };
1881
1882        stream.set_read_timeout(Some(std::time::Duration::from_secs(10)))?;
1883        stream.set_write_timeout(Some(std::time::Duration::from_secs(10)))?;
1884
1885        // Client-side authentication
1886        client_auth(&mut stream, auth_key)?;
1887
1888        Ok(RpcClient { stream })
1889    }
1890
1891    /// Send a pickle request and receive a pickle response.
1892    pub fn call(&mut self, request: &PickleValue) -> io::Result<PickleValue> {
1893        let request_bytes = pickle::encode(request);
1894        send_bytes(&mut self.stream, &request_bytes)?;
1895
1896        let response_bytes = recv_bytes(&mut self.stream)?;
1897        pickle::decode(&response_bytes)
1898            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))
1899    }
1900
1901    pub fn list_hooks(&mut self) -> io::Result<Vec<HookInfo>> {
1902        let response = self.call(&PickleValue::Dict(vec![(
1903            PickleValue::String("get".into()),
1904            PickleValue::String("hooks".into()),
1905        )]))?;
1906        parse_hook_list(&response)
1907    }
1908
1909    pub fn begin_drain(&mut self, timeout: Duration) -> io::Result<bool> {
1910        let response = self.call(&PickleValue::Dict(vec![(
1911            PickleValue::String("begin_drain".into()),
1912            PickleValue::Float(timeout.as_secs_f64()),
1913        )]))?;
1914        Ok(response.as_bool().unwrap_or(false))
1915    }
1916
1917    pub fn drain_status(&mut self) -> io::Result<Option<DrainStatus>> {
1918        let response = self.call(&PickleValue::Dict(vec![(
1919            PickleValue::String("get".into()),
1920            PickleValue::String("drain_status".into()),
1921        )]))?;
1922        parse_drain_status(&response)
1923    }
1924
1925    pub fn provider_bridge_stats(&mut self) -> io::Result<PickleValue> {
1926        self.call(&PickleValue::Dict(vec![(
1927            PickleValue::String("get".into()),
1928            PickleValue::String("provider_bridge_stats".into()),
1929        )]))
1930    }
1931
1932    pub fn load_hook(
1933        &mut self,
1934        name: &str,
1935        attach_point: &str,
1936        priority: i32,
1937        wasm: &[u8],
1938    ) -> io::Result<Result<(), String>> {
1939        let response = self.call(&PickleValue::Dict(vec![
1940            (
1941                PickleValue::String("hook".into()),
1942                PickleValue::String("load".into()),
1943            ),
1944            (
1945                PickleValue::String("name".into()),
1946                PickleValue::String(name.to_string()),
1947            ),
1948            (
1949                PickleValue::String("attach_point".into()),
1950                PickleValue::String(attach_point.to_string()),
1951            ),
1952            (
1953                PickleValue::String("priority".into()),
1954                PickleValue::Int(priority as i64),
1955            ),
1956            (
1957                PickleValue::String("wasm".into()),
1958                PickleValue::Bytes(wasm.to_vec()),
1959            ),
1960        ]))?;
1961        parse_hook_result(&response)
1962    }
1963
1964    pub fn unload_hook(
1965        &mut self,
1966        name: &str,
1967        attach_point: &str,
1968    ) -> io::Result<Result<(), String>> {
1969        let response = self.call(&PickleValue::Dict(vec![
1970            (
1971                PickleValue::String("hook".into()),
1972                PickleValue::String("unload".into()),
1973            ),
1974            (
1975                PickleValue::String("name".into()),
1976                PickleValue::String(name.to_string()),
1977            ),
1978            (
1979                PickleValue::String("attach_point".into()),
1980                PickleValue::String(attach_point.to_string()),
1981            ),
1982        ]))?;
1983        parse_hook_result(&response)
1984    }
1985
1986    pub fn set_hook_enabled(
1987        &mut self,
1988        name: &str,
1989        attach_point: &str,
1990        enabled: bool,
1991    ) -> io::Result<Result<(), String>> {
1992        let op = if enabled { "enable" } else { "disable" };
1993        let response = self.call(&PickleValue::Dict(vec![
1994            (
1995                PickleValue::String("hook".into()),
1996                PickleValue::String(op.into()),
1997            ),
1998            (
1999                PickleValue::String("name".into()),
2000                PickleValue::String(name.to_string()),
2001            ),
2002            (
2003                PickleValue::String("attach_point".into()),
2004                PickleValue::String(attach_point.to_string()),
2005            ),
2006        ]))?;
2007        parse_hook_result(&response)
2008    }
2009
2010    pub fn set_hook_priority(
2011        &mut self,
2012        name: &str,
2013        attach_point: &str,
2014        priority: i32,
2015    ) -> io::Result<Result<(), String>> {
2016        let response = self.call(&PickleValue::Dict(vec![
2017            (
2018                PickleValue::String("hook".into()),
2019                PickleValue::String("set_priority".into()),
2020            ),
2021            (
2022                PickleValue::String("name".into()),
2023                PickleValue::String(name.to_string()),
2024            ),
2025            (
2026                PickleValue::String("attach_point".into()),
2027                PickleValue::String(attach_point.to_string()),
2028            ),
2029            (
2030                PickleValue::String("priority".into()),
2031                PickleValue::Int(priority as i64),
2032            ),
2033        ]))?;
2034        parse_hook_result(&response)
2035    }
2036
2037    pub fn blacklist_backbone_peer(
2038        &mut self,
2039        interface: &str,
2040        ip: &str,
2041        duration_secs: u64,
2042        reason: Option<&str>,
2043        penalty_level: Option<u8>,
2044    ) -> io::Result<bool> {
2045        let mut request = vec![
2046            (
2047                PickleValue::String("set".into()),
2048                PickleValue::String("backbone_peer_blacklist".into()),
2049            ),
2050            (
2051                PickleValue::String("interface".into()),
2052                PickleValue::String(interface.to_string()),
2053            ),
2054            (
2055                PickleValue::String("ip".into()),
2056                PickleValue::String(ip.to_string()),
2057            ),
2058            (
2059                PickleValue::String("duration_secs".into()),
2060                PickleValue::Int(duration_secs as i64),
2061            ),
2062        ];
2063        if let Some(reason) = reason {
2064            request.push((
2065                PickleValue::String("reason".into()),
2066                PickleValue::String(reason.to_string()),
2067            ));
2068        }
2069        if let Some(level) = penalty_level {
2070            request.push((
2071                PickleValue::String("penalty_level".into()),
2072                PickleValue::Int(level as i64),
2073            ));
2074        }
2075        let response = self.call(&PickleValue::Dict(request))?;
2076        Ok(response.as_bool().unwrap_or(false))
2077    }
2078
2079    pub fn known_destinations(&mut self) -> io::Result<Vec<KnownDestinationEntry>> {
2080        let response = self.call(&PickleValue::Dict(vec![(
2081            PickleValue::String("get".into()),
2082            PickleValue::String("known_destinations".into()),
2083        )]))?;
2084        parse_known_destination_list(&response)
2085    }
2086
2087    pub fn retain_known_destination(&mut self, dest_hash: [u8; 16]) -> io::Result<bool> {
2088        let response = self.call(&PickleValue::Dict(vec![
2089            (
2090                PickleValue::String("set".into()),
2091                PickleValue::String("known_destination_retained".into()),
2092            ),
2093            (
2094                PickleValue::String("dest_hash".into()),
2095                PickleValue::Bytes(dest_hash.to_vec()),
2096            ),
2097        ]))?;
2098        Ok(response.as_bool().unwrap_or(false))
2099    }
2100
2101    pub fn unretain_known_destination(&mut self, dest_hash: [u8; 16]) -> io::Result<bool> {
2102        let response = self.call(&PickleValue::Dict(vec![
2103            (
2104                PickleValue::String("clear".into()),
2105                PickleValue::String("known_destination_retained".into()),
2106            ),
2107            (
2108                PickleValue::String("dest_hash".into()),
2109                PickleValue::Bytes(dest_hash.to_vec()),
2110            ),
2111        ]))?;
2112        Ok(response.as_bool().unwrap_or(false))
2113    }
2114
2115    pub fn mark_known_destination_used(&mut self, dest_hash: [u8; 16]) -> io::Result<bool> {
2116        let response = self.call(&PickleValue::Dict(vec![
2117            (
2118                PickleValue::String("set".into()),
2119                PickleValue::String("known_destination_used".into()),
2120            ),
2121            (
2122                PickleValue::String("dest_hash".into()),
2123                PickleValue::Bytes(dest_hash.to_vec()),
2124            ),
2125        ]))?;
2126        Ok(response.as_bool().unwrap_or(false))
2127    }
2128}
2129
2130fn parse_lifecycle_state(value: &str) -> Option<LifecycleState> {
2131    match value {
2132        "active" => Some(LifecycleState::Active),
2133        "draining" => Some(LifecycleState::Draining),
2134        "stopping" => Some(LifecycleState::Stopping),
2135        "stopped" => Some(LifecycleState::Stopped),
2136        _ => None,
2137    }
2138}
2139
2140fn parse_drain_status(value: &PickleValue) -> io::Result<Option<DrainStatus>> {
2141    if !matches!(value, PickleValue::Dict(_)) {
2142        return Ok(None);
2143    }
2144    let state = value
2145        .get("state")
2146        .and_then(|entry| entry.as_str())
2147        .and_then(parse_lifecycle_state)
2148        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing drain state"))?;
2149    let drain_age_seconds = value.get("drain_age_seconds").and_then(|entry| {
2150        entry
2151            .as_float()
2152            .or_else(|| entry.as_int().map(|v| v as f64))
2153    });
2154    let deadline_remaining_seconds = value.get("deadline_remaining_seconds").and_then(|entry| {
2155        entry
2156            .as_float()
2157            .or_else(|| entry.as_int().map(|v| v as f64))
2158    });
2159    let drain_complete = value
2160        .get("drain_complete")
2161        .and_then(|entry| entry.as_bool())
2162        .unwrap_or(false);
2163    let interface_writer_queued_frames = value
2164        .get("interface_writer_queued_frames")
2165        .and_then(|entry| entry.as_int())
2166        .unwrap_or(0)
2167        .max(0) as usize;
2168    let provider_backlog_events = value
2169        .get("provider_backlog_events")
2170        .and_then(|entry| entry.as_int())
2171        .unwrap_or(0)
2172        .max(0) as usize;
2173    let provider_consumer_queued_events = value
2174        .get("provider_consumer_queued_events")
2175        .and_then(|entry| entry.as_int())
2176        .unwrap_or(0)
2177        .max(0) as usize;
2178    let detail = value
2179        .get("detail")
2180        .and_then(|entry| entry.as_str().map(|v| v.to_string()));
2181    Ok(Some(DrainStatus {
2182        state,
2183        drain_age_seconds,
2184        deadline_remaining_seconds,
2185        drain_complete,
2186        interface_writer_queued_frames,
2187        provider_backlog_events,
2188        provider_consumer_queued_events,
2189        detail,
2190    }))
2191}
2192
2193fn parse_hook_result(response: &PickleValue) -> io::Result<Result<(), String>> {
2194    let ok = response
2195        .get("ok")
2196        .and_then(|v| v.as_bool())
2197        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "invalid hook response"))?;
2198    if ok {
2199        Ok(Ok(()))
2200    } else {
2201        Ok(Err(response
2202            .get("error")
2203            .and_then(|v| v.as_str())
2204            .unwrap_or("unknown hook error")
2205            .to_string()))
2206    }
2207}
2208
2209fn parse_hook_list(response: &PickleValue) -> io::Result<Vec<HookInfo>> {
2210    let list = response
2211        .as_list()
2212        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "invalid hooks response"))?;
2213    let mut hooks = Vec::with_capacity(list.len());
2214    for item in list {
2215        hooks.push(HookInfo {
2216            name: item
2217                .get("name")
2218                .and_then(|v| v.as_str())
2219                .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing hook name"))?
2220                .to_string(),
2221            attach_point: item
2222                .get("attach_point")
2223                .and_then(|v| v.as_str())
2224                .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing attach_point"))?
2225                .to_string(),
2226            priority: item
2227                .get("priority")
2228                .and_then(|v| v.as_int())
2229                .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing priority"))?
2230                as i32,
2231            enabled: item
2232                .get("enabled")
2233                .and_then(|v| v.as_bool())
2234                .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing enabled"))?,
2235            consecutive_traps: item
2236                .get("consecutive_traps")
2237                .and_then(|v| v.as_int())
2238                .ok_or_else(|| {
2239                    io::Error::new(io::ErrorKind::InvalidData, "missing consecutive_traps")
2240                })? as u32,
2241        });
2242    }
2243    Ok(hooks)
2244}
2245
2246/// Client-side authentication: answer the server's challenge.
2247fn client_auth(stream: &mut TcpStream, auth_key: &[u8; 32]) -> io::Result<()> {
2248    // Read challenge
2249    let challenge = recv_bytes(stream)?;
2250
2251    if !challenge.starts_with(CHALLENGE_PREFIX) {
2252        return Err(io::Error::new(
2253            io::ErrorKind::InvalidData,
2254            "expected challenge",
2255        ));
2256    }
2257
2258    let message = &challenge[CHALLENGE_PREFIX.len()..];
2259
2260    // Create HMAC response
2261    let response = create_response(auth_key, message);
2262    send_bytes(stream, &response)?;
2263
2264    // Read welcome/failure
2265    let result = recv_bytes(stream)?;
2266    if result == WELCOME {
2267        Ok(())
2268    } else {
2269        Err(io::Error::new(
2270            io::ErrorKind::PermissionDenied,
2271            "authentication failed",
2272        ))
2273    }
2274}
2275
2276/// Create an HMAC response to a challenge message.
2277fn create_response(auth_key: &[u8; 32], message: &[u8]) -> Vec<u8> {
2278    // Check if message has {sha256} prefix (modern protocol)
2279    if message.starts_with(b"{sha256}") || message.len() > 20 {
2280        // Modern protocol: use HMAC-SHA256 with {sha256} prefix
2281        let digest = hmac_sha256(auth_key, message);
2282        let mut response = Vec::with_capacity(8 + 32);
2283        response.extend_from_slice(b"{sha256}");
2284        response.extend_from_slice(&digest);
2285        response
2286    } else {
2287        // Legacy protocol: raw HMAC-MD5
2288        let digest = hmac_md5(auth_key, message);
2289        digest.to_vec()
2290    }
2291}
2292
2293/// Derive the RPC auth key from transport identity private key.
2294pub fn derive_auth_key(private_key: &[u8]) -> [u8; 32] {
2295    sha256(private_key)
2296}
2297
2298#[cfg(test)]
2299mod tests {
2300    use super::*;
2301
2302    #[test]
2303    fn send_recv_bytes_roundtrip() {
2304        let (mut c1, mut c2) = tcp_pair();
2305        let data = b"hello world";
2306        send_bytes(&mut c1, data).unwrap();
2307        let received = recv_bytes(&mut c2).unwrap();
2308        assert_eq!(&received, data);
2309    }
2310
2311    #[test]
2312    fn send_recv_empty() {
2313        let (mut c1, mut c2) = tcp_pair();
2314        send_bytes(&mut c1, b"").unwrap();
2315        let received = recv_bytes(&mut c2).unwrap();
2316        assert!(received.is_empty());
2317    }
2318
2319    #[test]
2320    fn auth_success() {
2321        let key = derive_auth_key(b"test-private-key");
2322        let (mut server, mut client) = tcp_pair();
2323
2324        let key2 = key;
2325        let t = thread::spawn(move || {
2326            client_auth(&mut client, &key2).unwrap();
2327        });
2328
2329        server_auth(&mut server, &key).unwrap();
2330        t.join().unwrap();
2331    }
2332
2333    #[test]
2334    fn auth_failure_wrong_key() {
2335        let server_key = derive_auth_key(b"server-key");
2336        let client_key = derive_auth_key(b"wrong-key");
2337        let (mut server, mut client) = tcp_pair();
2338
2339        let t = thread::spawn(move || {
2340            let result = client_auth(&mut client, &client_key);
2341            assert!(result.is_err());
2342        });
2343
2344        let result = server_auth(&mut server, &server_key);
2345        assert!(result.is_err());
2346        t.join().unwrap();
2347    }
2348
2349    #[test]
2350    fn verify_sha256_response() {
2351        let key = derive_auth_key(b"mykey");
2352        let message = b"{sha256}abcdefghijklmnopqrstuvwxyz0123456789ABCD";
2353        let response = create_response(&key, message);
2354        assert!(response.starts_with(b"{sha256}"));
2355        assert!(verify_response(&key, message, &response));
2356    }
2357
2358    #[test]
2359    fn verify_legacy_md5_response() {
2360        let key = derive_auth_key(b"mykey");
2361        // Legacy message: 20 bytes, no prefix
2362        let message = b"01234567890123456789";
2363        // Create legacy response (raw HMAC-MD5)
2364        let digest = hmac_md5(&key, message);
2365        assert!(verify_response(&key, message, &digest));
2366    }
2367
2368    #[test]
2369    fn constant_time_eq_works() {
2370        assert!(constant_time_eq(b"hello", b"hello"));
2371        assert!(!constant_time_eq(b"hello", b"world"));
2372        assert!(!constant_time_eq(b"hello", b"hell"));
2373    }
2374
2375    #[test]
2376    fn rpc_roundtrip() {
2377        let key = derive_auth_key(b"test-key");
2378        let (event_tx, event_rx) = crate::event::channel();
2379
2380        // Start server
2381        // Bind manually to get the actual port
2382        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
2383        let port = listener.local_addr().unwrap().port();
2384        listener.set_nonblocking(true).unwrap();
2385
2386        let shutdown = Arc::new(AtomicBool::new(false));
2387        let shutdown2 = shutdown.clone();
2388
2389        // Driver thread that handles queries
2390        let driver_thread = thread::spawn(move || loop {
2391            match event_rx.recv_timeout(std::time::Duration::from_secs(5)) {
2392                Ok(Event::Query(QueryRequest::LinkCount, resp_tx)) => {
2393                    let _ = resp_tx.send(QueryResponse::LinkCount(42));
2394                }
2395                Ok(Event::Query(QueryRequest::InterfaceStats, resp_tx)) => {
2396                    let _ = resp_tx.send(QueryResponse::InterfaceStats(InterfaceStatsResponse {
2397                        interfaces: vec![SingleInterfaceStat {
2398                            id: 7,
2399                            name: "TestInterface".into(),
2400                            status: true,
2401                            mode: 1,
2402                            rxb: 1000,
2403                            txb: 2000,
2404                            rx_packets: 10,
2405                            tx_packets: 20,
2406                            bitrate: Some(10_000_000),
2407                            ifac_size: None,
2408                            started: 1000.0,
2409                            ia_freq: 0.0,
2410                            oa_freq: 0.0,
2411                            interface_type: "TestInterface".into(),
2412                        }],
2413                        transport_id: None,
2414                        transport_enabled: true,
2415                        transport_uptime: 3600.0,
2416                        total_rxb: 1000,
2417                        total_txb: 2000,
2418                        probe_responder: None,
2419                        backbone_peer_pool: None,
2420                    }));
2421                }
2422                _ => break,
2423            }
2424        });
2425
2426        let key2 = key;
2427        let shutdown3 = shutdown2.clone();
2428        let server_thread = thread::spawn(move || {
2429            rpc_server_loop(listener, key2, event_tx, shutdown3);
2430        });
2431
2432        // Give server time to start
2433        thread::sleep(std::time::Duration::from_millis(50));
2434
2435        // Client: connect and query link count
2436        let server_addr = RpcAddr::Tcp("127.0.0.1".into(), port);
2437        let mut client = RpcClient::connect(&server_addr, &key).unwrap();
2438        let response = client
2439            .call(&PickleValue::Dict(vec![(
2440                PickleValue::String("get".into()),
2441                PickleValue::String("link_count".into()),
2442            )]))
2443            .unwrap();
2444        assert_eq!(response.as_int().unwrap(), 42);
2445        drop(client);
2446
2447        // Client: query interface stats
2448        let mut client2 = RpcClient::connect(&server_addr, &key).unwrap();
2449        let response2 = client2
2450            .call(&PickleValue::Dict(vec![(
2451                PickleValue::String("get".into()),
2452                PickleValue::String("interface_stats".into()),
2453            )]))
2454            .unwrap();
2455        let ifaces = response2.get("interfaces").unwrap().as_list().unwrap();
2456        assert_eq!(ifaces.len(), 1);
2457        let iface = &ifaces[0];
2458        assert_eq!(
2459            iface.get("name").unwrap().as_str().unwrap(),
2460            "TestInterface"
2461        );
2462        assert_eq!(iface.get("rxb").unwrap().as_int().unwrap(), 1000);
2463        drop(client2);
2464
2465        // Shutdown
2466        shutdown2.store(true, Ordering::Relaxed);
2467        server_thread.join().unwrap();
2468        driver_thread.join().unwrap();
2469    }
2470
2471    #[test]
2472    fn derive_auth_key_deterministic() {
2473        let key1 = derive_auth_key(b"test");
2474        let key2 = derive_auth_key(b"test");
2475        assert_eq!(key1, key2);
2476        // Different input → different key
2477        let key3 = derive_auth_key(b"other");
2478        assert_ne!(key1, key3);
2479    }
2480
2481    #[test]
2482    fn pickle_request_handling() {
2483        // Test the request → query translation without networking
2484        let (event_tx, event_rx) = crate::event::channel();
2485
2486        let driver = thread::spawn(move || {
2487            if let Ok(Event::Query(QueryRequest::DropPath { dest_hash }, resp_tx)) = event_rx.recv()
2488            {
2489                assert_eq!(dest_hash, [1u8; 16]);
2490                let _ = resp_tx.send(QueryResponse::DropPath(true));
2491            }
2492        });
2493
2494        let request = PickleValue::Dict(vec![
2495            (
2496                PickleValue::String("drop".into()),
2497                PickleValue::String("path".into()),
2498            ),
2499            (
2500                PickleValue::String("destination_hash".into()),
2501                PickleValue::Bytes(vec![1u8; 16]),
2502            ),
2503        ]);
2504
2505        let response = handle_rpc_request(&request, &event_tx).unwrap();
2506        assert_eq!(response, PickleValue::Bool(true));
2507        driver.join().unwrap();
2508    }
2509
2510    #[test]
2511    fn hook_list_request_handling() {
2512        let (event_tx, event_rx) = crate::event::channel();
2513
2514        let driver = thread::spawn(move || {
2515            if let Ok(Event::ListHooks { response_tx }) = event_rx.recv() {
2516                let _ = response_tx.send(vec![HookInfo {
2517                    name: "stats".into(),
2518                    attach_point: "PreIngress".into(),
2519                    priority: 7,
2520                    enabled: true,
2521                    consecutive_traps: 0,
2522                }]);
2523            }
2524        });
2525
2526        let request = PickleValue::Dict(vec![(
2527            PickleValue::String("get".into()),
2528            PickleValue::String("hooks".into()),
2529        )]);
2530        let response = handle_rpc_request(&request, &event_tx).unwrap();
2531        let hooks = parse_hook_list(&response).unwrap();
2532        assert_eq!(hooks.len(), 1);
2533        assert_eq!(hooks[0].name, "stats");
2534        driver.join().unwrap();
2535    }
2536
2537    #[test]
2538    fn hook_load_request_handling() {
2539        let (event_tx, event_rx) = crate::event::channel();
2540
2541        let driver = thread::spawn(move || {
2542            if let Ok(Event::LoadHook {
2543                name,
2544                wasm_bytes,
2545                attach_point,
2546                priority,
2547                response_tx,
2548            }) = event_rx.recv()
2549            {
2550                assert_eq!(name, "stats");
2551                assert_eq!(attach_point, "PreIngress");
2552                assert_eq!(priority, 11);
2553                assert_eq!(wasm_bytes, vec![1, 2, 3]);
2554                let _ = response_tx.send(Ok(()));
2555            }
2556        });
2557
2558        let request = PickleValue::Dict(vec![
2559            (
2560                PickleValue::String("hook".into()),
2561                PickleValue::String("load".into()),
2562            ),
2563            (
2564                PickleValue::String("name".into()),
2565                PickleValue::String("stats".into()),
2566            ),
2567            (
2568                PickleValue::String("attach_point".into()),
2569                PickleValue::String("PreIngress".into()),
2570            ),
2571            (PickleValue::String("priority".into()), PickleValue::Int(11)),
2572            (
2573                PickleValue::String("wasm".into()),
2574                PickleValue::Bytes(vec![1, 2, 3]),
2575            ),
2576        ]);
2577        let response = handle_rpc_request(&request, &event_tx).unwrap();
2578        assert_eq!(parse_hook_result(&response).unwrap(), Ok(()));
2579        driver.join().unwrap();
2580    }
2581
2582    #[test]
2583    fn interface_stats_pickle_format() {
2584        let stats = InterfaceStatsResponse {
2585            interfaces: vec![SingleInterfaceStat {
2586                id: 1,
2587                name: "TCP".into(),
2588                status: true,
2589                mode: 1,
2590                rxb: 100,
2591                txb: 200,
2592                rx_packets: 5,
2593                tx_packets: 10,
2594                bitrate: Some(1000000),
2595                ifac_size: Some(16),
2596                started: 1000.0,
2597                ia_freq: 0.0,
2598                oa_freq: 0.0,
2599                interface_type: "TCPClientInterface".into(),
2600            }],
2601            transport_id: Some([0xAB; 16]),
2602            transport_enabled: true,
2603            transport_uptime: 3600.0,
2604            total_rxb: 100,
2605            total_txb: 200,
2606            probe_responder: None,
2607            backbone_peer_pool: None,
2608        };
2609
2610        let pickle = interface_stats_to_pickle(&stats);
2611
2612        // Verify it round-trips through encode/decode
2613        let encoded = pickle::encode(&pickle);
2614        let decoded = pickle::decode(&encoded).unwrap();
2615        assert_eq!(
2616            decoded.get("transport_enabled").unwrap().as_bool().unwrap(),
2617            true
2618        );
2619        let ifaces = decoded.get("interfaces").unwrap().as_list().unwrap();
2620        assert_eq!(ifaces[0].get("id").unwrap().as_int().unwrap(), 1);
2621        assert_eq!(ifaces[0].get("name").unwrap().as_str().unwrap(), "TCP");
2622    }
2623
2624    #[test]
2625    fn send_probe_rpc_unknown_dest() {
2626        let (event_tx, event_rx) = crate::event::channel();
2627
2628        let driver = thread::spawn(move || {
2629            if let Ok(Event::Query(
2630                QueryRequest::SendProbe {
2631                    dest_hash,
2632                    payload_size,
2633                },
2634                resp_tx,
2635            )) = event_rx.recv()
2636            {
2637                assert_eq!(dest_hash, [0xAA; 16]);
2638                assert_eq!(payload_size, 16); // default
2639                let _ = resp_tx.send(QueryResponse::SendProbe(None));
2640            }
2641        });
2642
2643        let request = PickleValue::Dict(vec![(
2644            PickleValue::String("send_probe".into()),
2645            PickleValue::Bytes(vec![0xAA; 16]),
2646        )]);
2647
2648        let response = handle_rpc_request(&request, &event_tx).unwrap();
2649        assert_eq!(response, PickleValue::None);
2650        driver.join().unwrap();
2651    }
2652
2653    #[test]
2654    fn send_probe_rpc_with_result() {
2655        let (event_tx, event_rx) = crate::event::channel();
2656
2657        let packet_hash = [0xBB; 32];
2658        let driver = thread::spawn(move || {
2659            if let Ok(Event::Query(
2660                QueryRequest::SendProbe {
2661                    dest_hash,
2662                    payload_size,
2663                },
2664                resp_tx,
2665            )) = event_rx.recv()
2666            {
2667                assert_eq!(dest_hash, [0xCC; 16]);
2668                assert_eq!(payload_size, 32);
2669                let _ = resp_tx.send(QueryResponse::SendProbe(Some((packet_hash, 3))));
2670            }
2671        });
2672
2673        let request = PickleValue::Dict(vec![
2674            (
2675                PickleValue::String("send_probe".into()),
2676                PickleValue::Bytes(vec![0xCC; 16]),
2677            ),
2678            (PickleValue::String("size".into()), PickleValue::Int(32)),
2679        ]);
2680
2681        let response = handle_rpc_request(&request, &event_tx).unwrap();
2682        let ph = response.get("packet_hash").unwrap().as_bytes().unwrap();
2683        assert_eq!(ph, &[0xBB; 32]);
2684        assert_eq!(response.get("hops").unwrap().as_int().unwrap(), 3);
2685        driver.join().unwrap();
2686    }
2687
2688    #[test]
2689    fn send_probe_rpc_size_validation() {
2690        let (event_tx, event_rx) = crate::event::channel();
2691
2692        // Negative size should be clamped to default (16)
2693        let driver = thread::spawn(move || {
2694            if let Ok(Event::Query(QueryRequest::SendProbe { payload_size, .. }, resp_tx)) =
2695                event_rx.recv()
2696            {
2697                assert_eq!(payload_size, 16); // default, not negative
2698                let _ = resp_tx.send(QueryResponse::SendProbe(None));
2699            }
2700        });
2701
2702        let request = PickleValue::Dict(vec![
2703            (
2704                PickleValue::String("send_probe".into()),
2705                PickleValue::Bytes(vec![0xDD; 16]),
2706            ),
2707            (PickleValue::String("size".into()), PickleValue::Int(-1)),
2708        ]);
2709
2710        let response = handle_rpc_request(&request, &event_tx).unwrap();
2711        assert_eq!(response, PickleValue::None);
2712        driver.join().unwrap();
2713    }
2714
2715    #[test]
2716    fn send_probe_rpc_size_too_large() {
2717        let (event_tx, event_rx) = crate::event::channel();
2718
2719        // Size > 400 should be clamped to default (16)
2720        let driver = thread::spawn(move || {
2721            if let Ok(Event::Query(QueryRequest::SendProbe { payload_size, .. }, resp_tx)) =
2722                event_rx.recv()
2723            {
2724                assert_eq!(payload_size, 16); // default, not 999
2725                let _ = resp_tx.send(QueryResponse::SendProbe(None));
2726            }
2727        });
2728
2729        let request = PickleValue::Dict(vec![
2730            (
2731                PickleValue::String("send_probe".into()),
2732                PickleValue::Bytes(vec![0xDD; 16]),
2733            ),
2734            (PickleValue::String("size".into()), PickleValue::Int(999)),
2735        ]);
2736
2737        let response = handle_rpc_request(&request, &event_tx).unwrap();
2738        assert_eq!(response, PickleValue::None);
2739        driver.join().unwrap();
2740    }
2741
2742    #[test]
2743    fn check_proof_rpc_not_found() {
2744        let (event_tx, event_rx) = crate::event::channel();
2745
2746        let driver = thread::spawn(move || {
2747            if let Ok(Event::Query(QueryRequest::CheckProof { packet_hash }, resp_tx)) =
2748                event_rx.recv()
2749            {
2750                assert_eq!(packet_hash, [0xEE; 32]);
2751                let _ = resp_tx.send(QueryResponse::CheckProof(None));
2752            }
2753        });
2754
2755        let request = PickleValue::Dict(vec![(
2756            PickleValue::String("check_proof".into()),
2757            PickleValue::Bytes(vec![0xEE; 32]),
2758        )]);
2759
2760        let response = handle_rpc_request(&request, &event_tx).unwrap();
2761        assert_eq!(response, PickleValue::None);
2762        driver.join().unwrap();
2763    }
2764
2765    #[test]
2766    fn check_proof_rpc_found() {
2767        let (event_tx, event_rx) = crate::event::channel();
2768
2769        let driver = thread::spawn(move || {
2770            if let Ok(Event::Query(QueryRequest::CheckProof { packet_hash }, resp_tx)) =
2771                event_rx.recv()
2772            {
2773                assert_eq!(packet_hash, [0xFF; 32]);
2774                let _ = resp_tx.send(QueryResponse::CheckProof(Some(0.352)));
2775            }
2776        });
2777
2778        let request = PickleValue::Dict(vec![(
2779            PickleValue::String("check_proof".into()),
2780            PickleValue::Bytes(vec![0xFF; 32]),
2781        )]);
2782
2783        let response = handle_rpc_request(&request, &event_tx).unwrap();
2784        if let PickleValue::Float(rtt) = response {
2785            assert!((rtt - 0.352).abs() < 0.001);
2786        } else {
2787            panic!("Expected Float, got {:?}", response);
2788        }
2789        driver.join().unwrap();
2790    }
2791
2792    #[test]
2793    fn request_path_rpc() {
2794        let (event_tx, event_rx) = crate::event::channel();
2795
2796        let driver =
2797            thread::spawn(
2798                move || match event_rx.recv_timeout(std::time::Duration::from_secs(5)) {
2799                    Ok(Event::RequestPath { dest_hash }) => {
2800                        assert_eq!(dest_hash, [0x11; 16]);
2801                    }
2802                    other => panic!("Expected RequestPath event, got {:?}", other),
2803                },
2804            );
2805
2806        let request = PickleValue::Dict(vec![(
2807            PickleValue::String("request_path".into()),
2808            PickleValue::Bytes(vec![0x11; 16]),
2809        )]);
2810
2811        let response = handle_rpc_request(&request, &event_tx).unwrap();
2812        assert_eq!(response, PickleValue::Bool(true));
2813        driver.join().unwrap();
2814    }
2815
2816    #[test]
2817    fn begin_drain_rpc_emits_event() {
2818        let (event_tx, event_rx) = crate::event::channel();
2819
2820        let driver = thread::spawn(
2821            move || match event_rx.recv_timeout(Duration::from_secs(5)) {
2822                Ok(Event::BeginDrain { timeout }) => {
2823                    assert!((timeout.as_secs_f64() - 1.5).abs() < 0.001);
2824                }
2825                other => panic!("Expected BeginDrain event, got {:?}", other),
2826            },
2827        );
2828
2829        let request = PickleValue::Dict(vec![(
2830            PickleValue::String("begin_drain".into()),
2831            PickleValue::Float(1.5),
2832        )]);
2833
2834        let response = handle_rpc_request(&request, &event_tx).unwrap();
2835        assert_eq!(response, PickleValue::Bool(true));
2836        driver.join().unwrap();
2837    }
2838
2839    #[test]
2840    fn drain_status_rpc_roundtrips_fields() {
2841        let (event_tx, event_rx) = crate::event::channel();
2842
2843        let driver = thread::spawn(move || {
2844            if let Ok(Event::Query(QueryRequest::DrainStatus, resp_tx)) = event_rx.recv() {
2845                let _ = resp_tx.send(QueryResponse::DrainStatus(DrainStatus {
2846                    state: LifecycleState::Draining,
2847                    drain_age_seconds: Some(0.75),
2848                    deadline_remaining_seconds: Some(2.25),
2849                    drain_complete: false,
2850                    interface_writer_queued_frames: 3,
2851                    provider_backlog_events: 4,
2852                    provider_consumer_queued_events: 5,
2853                    detail: Some("node is draining existing work".into()),
2854                }));
2855            }
2856        });
2857
2858        let request = PickleValue::Dict(vec![(
2859            PickleValue::String("get".into()),
2860            PickleValue::String("drain_status".into()),
2861        )]);
2862
2863        let response = handle_rpc_request(&request, &event_tx).unwrap();
2864        assert_eq!(response.get("state").unwrap().as_str(), Some("draining"));
2865        assert_eq!(
2866            response.get("drain_complete").unwrap().as_bool(),
2867            Some(false)
2868        );
2869        assert_eq!(
2870            response
2871                .get("deadline_remaining_seconds")
2872                .unwrap()
2873                .as_float(),
2874            Some(2.25)
2875        );
2876        assert_eq!(
2877            response
2878                .get("interface_writer_queued_frames")
2879                .unwrap()
2880                .as_int(),
2881            Some(3)
2882        );
2883        assert_eq!(
2884            response.get("provider_backlog_events").unwrap().as_int(),
2885            Some(4)
2886        );
2887        assert_eq!(
2888            response
2889                .get("provider_consumer_queued_events")
2890                .unwrap()
2891                .as_int(),
2892            Some(5)
2893        );
2894        assert_eq!(
2895            response.get("detail").unwrap().as_str(),
2896            Some("node is draining existing work")
2897        );
2898        driver.join().unwrap();
2899    }
2900
2901    #[test]
2902    fn interface_stats_with_probe_responder() {
2903        let probe_hash = [0x42; 16];
2904        let stats = InterfaceStatsResponse {
2905            interfaces: vec![],
2906            transport_id: None,
2907            transport_enabled: true,
2908            transport_uptime: 100.0,
2909            total_rxb: 0,
2910            total_txb: 0,
2911            probe_responder: Some(probe_hash),
2912            backbone_peer_pool: None,
2913        };
2914
2915        let pickle = interface_stats_to_pickle(&stats);
2916        let encoded = pickle::encode(&pickle);
2917        let decoded = pickle::decode(&encoded).unwrap();
2918
2919        let pr = decoded.get("probe_responder").unwrap().as_bytes().unwrap();
2920        assert_eq!(pr, &probe_hash);
2921    }
2922
2923    #[test]
2924    fn runtime_config_get_and_set_rpc() {
2925        let (event_tx, event_rx) = crate::event::channel();
2926
2927        let driver = thread::spawn(move || {
2928            if let Ok(Event::Query(QueryRequest::GetRuntimeConfig { key }, resp_tx)) =
2929                event_rx.recv()
2930            {
2931                assert_eq!(key, "global.tick_interval_ms");
2932                let _ = resp_tx.send(QueryResponse::RuntimeConfigEntry(Some(
2933                    RuntimeConfigEntry {
2934                        key,
2935                        value: RuntimeConfigValue::Int(1000),
2936                        default: RuntimeConfigValue::Int(1000),
2937                        source: RuntimeConfigSource::Startup,
2938                        apply_mode: RuntimeConfigApplyMode::Immediate,
2939                        description: Some("tick".into()),
2940                    },
2941                )));
2942            } else {
2943                panic!("expected GetRuntimeConfig query");
2944            }
2945
2946            if let Ok(Event::Query(QueryRequest::SetRuntimeConfig { key, value }, resp_tx)) =
2947                event_rx.recv()
2948            {
2949                assert_eq!(key, "global.tick_interval_ms");
2950                assert_eq!(value, RuntimeConfigValue::Int(250));
2951                let _ = resp_tx.send(QueryResponse::RuntimeConfigSet(Ok(RuntimeConfigEntry {
2952                    key,
2953                    value: RuntimeConfigValue::Int(250),
2954                    default: RuntimeConfigValue::Int(1000),
2955                    source: RuntimeConfigSource::RuntimeOverride,
2956                    apply_mode: RuntimeConfigApplyMode::Immediate,
2957                    description: Some("tick".into()),
2958                })));
2959            } else {
2960                panic!("expected SetRuntimeConfig query");
2961            }
2962        });
2963
2964        let get_request = PickleValue::Dict(vec![
2965            (
2966                PickleValue::String("get".into()),
2967                PickleValue::String("runtime_config_entry".into()),
2968            ),
2969            (
2970                PickleValue::String("key".into()),
2971                PickleValue::String("global.tick_interval_ms".into()),
2972            ),
2973        ]);
2974        let get_response = handle_rpc_request(&get_request, &event_tx).unwrap();
2975        assert_eq!(
2976            get_response.get("key").and_then(|v| v.as_str()),
2977            Some("global.tick_interval_ms")
2978        );
2979
2980        let set_request = PickleValue::Dict(vec![
2981            (
2982                PickleValue::String("set".into()),
2983                PickleValue::String("runtime_config".into()),
2984            ),
2985            (
2986                PickleValue::String("key".into()),
2987                PickleValue::String("global.tick_interval_ms".into()),
2988            ),
2989            (PickleValue::String("value".into()), PickleValue::Int(250)),
2990        ]);
2991        let set_response = handle_rpc_request(&set_request, &event_tx).unwrap();
2992        assert_eq!(
2993            set_response.get("value").and_then(|v| v.as_int()),
2994            Some(250)
2995        );
2996
2997        driver.join().unwrap();
2998    }
2999
3000    // Helper: create a connected TCP pair
3001    fn tcp_pair() -> (TcpStream, TcpStream) {
3002        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
3003        let port = listener.local_addr().unwrap().port();
3004        let client = TcpStream::connect(("127.0.0.1", port)).unwrap();
3005        let (server, _) = listener.accept().unwrap();
3006        client
3007            .set_read_timeout(Some(std::time::Duration::from_secs(5)))
3008            .unwrap();
3009        server
3010            .set_read_timeout(Some(std::time::Duration::from_secs(5)))
3011            .unwrap();
3012        (server, client)
3013    }
3014}