Skip to main content

rns_net/
rpc.rs

1//! RPC server and client for cross-process daemon communication.
2//!
3//! Implements Python `multiprocessing.connection` wire protocol:
4//! - 4-byte big-endian signed i32 length prefix + payload
5//! - HMAC-SHA256 challenge-response authentication
6//! - Pickle serialization for request/response dictionaries
7//!
8//! Server translates pickle dicts into [`QueryRequest`] events, sends
9//! them through the driver event channel, and returns pickle responses.
10
11use std::io::{self, Read, Write};
12use std::net::{IpAddr, TcpListener, TcpStream};
13use std::sync::mpsc;
14use std::sync::{
15    atomic::{AtomicBool, Ordering},
16    Arc,
17};
18use std::thread;
19use std::time::Duration;
20
21use rns_crypto::hmac::hmac_sha256;
22use rns_crypto::sha256::sha256;
23
24use crate::event::{
25    BackboneInterfaceEntry, BackbonePeerStateEntry, BlackholeInfo, DrainStatus, Event, EventSender,
26    HookInfo, InterfaceStatsResponse, KnownDestinationEntry, LifecycleState, PathTableEntry,
27    ProviderBridgeStats, QueryRequest, QueryResponse, RateTableEntry, RuntimeConfigApplyMode,
28    RuntimeConfigEntry, RuntimeConfigError, RuntimeConfigErrorCode, RuntimeConfigSource,
29    RuntimeConfigValue, SingleInterfaceStat,
30};
31use crate::md5::hmac_md5;
32use crate::pickle::{self, PickleValue};
33
34const CHALLENGE_PREFIX: &[u8] = b"#CHALLENGE#";
35const WELCOME: &[u8] = b"#WELCOME#";
36const FAILURE: &[u8] = b"#FAILURE#";
37const CHALLENGE_LEN: usize = 40;
38
39/// RPC address types.
40#[derive(Debug, Clone)]
41pub enum RpcAddr {
42    Tcp(String, u16),
43}
44
45/// RPC server that listens for incoming connections and handles queries.
46pub struct RpcServer {
47    shutdown: Arc<AtomicBool>,
48    thread: Option<thread::JoinHandle<()>>,
49}
50
51impl RpcServer {
52    /// Start the RPC server on the given address.
53    pub fn start(addr: &RpcAddr, auth_key: [u8; 32], event_tx: EventSender) -> io::Result<Self> {
54        let shutdown = Arc::new(AtomicBool::new(false));
55        let shutdown2 = shutdown.clone();
56
57        let listener = match addr {
58            RpcAddr::Tcp(host, port) => {
59                let l = TcpListener::bind((host.as_str(), *port))?;
60                // Non-blocking so we can check shutdown flag
61                l.set_nonblocking(true)?;
62                l
63            }
64        };
65
66        let thread = thread::Builder::new()
67            .name("rpc-server".into())
68            .spawn(move || {
69                rpc_server_loop(listener, auth_key, event_tx, shutdown2);
70            })
71            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
72
73        Ok(RpcServer {
74            shutdown,
75            thread: Some(thread),
76        })
77    }
78
79    /// Stop the RPC server.
80    pub fn stop(&mut self) {
81        self.shutdown.store(true, Ordering::Relaxed);
82        if let Some(handle) = self.thread.take() {
83            let _ = handle.join();
84        }
85    }
86}
87
88impl Drop for RpcServer {
89    fn drop(&mut self) {
90        self.stop();
91    }
92}
93
94fn rpc_server_loop(
95    listener: TcpListener,
96    auth_key: [u8; 32],
97    event_tx: EventSender,
98    shutdown: Arc<AtomicBool>,
99) {
100    loop {
101        if shutdown.load(Ordering::Relaxed) {
102            break;
103        }
104
105        match listener.accept() {
106            Ok((stream, _addr)) => {
107                // Set blocking for this connection
108                let _ = stream.set_nonblocking(false);
109                let _ = stream.set_read_timeout(Some(std::time::Duration::from_secs(10)));
110                let _ = stream.set_write_timeout(Some(std::time::Duration::from_secs(10)));
111
112                if let Err(e) = handle_connection(stream, &auth_key, &event_tx) {
113                    log::debug!("RPC connection error: {}", e);
114                }
115            }
116            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
117                // No pending connection, sleep briefly and retry
118                thread::sleep(std::time::Duration::from_millis(100));
119            }
120            Err(e) => {
121                log::error!("RPC accept error: {}", e);
122                thread::sleep(std::time::Duration::from_millis(100));
123            }
124        }
125    }
126}
127
128fn handle_connection(
129    mut stream: TcpStream,
130    auth_key: &[u8; 32],
131    event_tx: &EventSender,
132) -> io::Result<()> {
133    // Authentication: send challenge, verify response
134    server_auth(&mut stream, auth_key)?;
135
136    // Read request (pickle dict)
137    let request_bytes = recv_bytes(&mut stream)?;
138    let request = pickle::decode(&request_bytes)
139        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?;
140
141    // Translate pickle dict to query, send to driver, get response
142    let response = handle_rpc_request(&request, event_tx)?;
143
144    // Encode response and send
145    let response_bytes = pickle::encode(&response);
146    send_bytes(&mut stream, &response_bytes)?;
147
148    Ok(())
149}
150
151/// Server-side authentication: challenge-response.
152fn server_auth(stream: &mut TcpStream, auth_key: &[u8; 32]) -> io::Result<()> {
153    // Generate challenge: #CHALLENGE#{sha256}<40 random bytes>
154    let mut random_bytes = [0u8; CHALLENGE_LEN];
155    // Use /dev/urandom for randomness
156    {
157        let mut f = std::fs::File::open("/dev/urandom")?;
158        f.read_exact(&mut random_bytes)?;
159    }
160
161    let mut challenge_message = Vec::with_capacity(CHALLENGE_PREFIX.len() + 8 + CHALLENGE_LEN);
162    challenge_message.extend_from_slice(CHALLENGE_PREFIX);
163    challenge_message.extend_from_slice(b"{sha256}");
164    challenge_message.extend_from_slice(&random_bytes);
165
166    send_bytes(stream, &challenge_message)?;
167
168    // Read response (max 256 bytes)
169    let response = recv_bytes(stream)?;
170
171    // Verify response
172    // The message to HMAC is everything after #CHALLENGE# (i.e. {sha256}<random>)
173    let message = &challenge_message[CHALLENGE_PREFIX.len()..];
174
175    if verify_response(auth_key, message, &response) {
176        send_bytes(stream, WELCOME)?;
177        Ok(())
178    } else {
179        send_bytes(stream, FAILURE)?;
180        Err(io::Error::new(
181            io::ErrorKind::PermissionDenied,
182            "auth failed",
183        ))
184    }
185}
186
187/// Verify a client's HMAC response.
188fn verify_response(auth_key: &[u8; 32], message: &[u8], response: &[u8]) -> bool {
189    // Modern protocol: response = {sha256}<hmac-sha256 digest>
190    if response.starts_with(b"{sha256}") {
191        let digest = &response[8..];
192        let expected = hmac_sha256(auth_key, message);
193        constant_time_eq(digest, &expected)
194    }
195    // Legacy protocol: response = raw 16-byte HMAC-MD5 digest
196    else if response.len() == 16 {
197        let expected = hmac_md5(auth_key, message);
198        constant_time_eq(response, &expected)
199    }
200    // Legacy with {md5} prefix
201    else if response.starts_with(b"{md5}") {
202        let digest = &response[5..];
203        let expected = hmac_md5(auth_key, message);
204        constant_time_eq(digest, &expected)
205    } else {
206        false
207    }
208}
209
210/// Constant-time byte comparison.
211fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
212    if a.len() != b.len() {
213        return false;
214    }
215    let mut diff = 0u8;
216    for (x, y) in a.iter().zip(b.iter()) {
217        diff |= x ^ y;
218    }
219    diff == 0
220}
221
222/// Send bytes with 4-byte big-endian length prefix.
223fn send_bytes(stream: &mut TcpStream, data: &[u8]) -> io::Result<()> {
224    let len = data.len() as i32;
225    stream.write_all(&len.to_be_bytes())?;
226    stream.write_all(data)?;
227    stream.flush()
228}
229
230/// Receive bytes with 4-byte big-endian length prefix.
231fn recv_bytes(stream: &mut TcpStream) -> io::Result<Vec<u8>> {
232    let mut len_buf = [0u8; 4];
233    stream.read_exact(&mut len_buf)?;
234    let len = i32::from_be_bytes(len_buf);
235
236    if len < 0 {
237        // Extended format: 8-byte length
238        let mut len8_buf = [0u8; 8];
239        stream.read_exact(&mut len8_buf)?;
240        let len = u64::from_be_bytes(len8_buf) as usize;
241        if len > 64 * 1024 * 1024 {
242            return Err(io::Error::new(
243                io::ErrorKind::InvalidData,
244                "message too large",
245            ));
246        }
247        let mut buf = vec![0u8; len];
248        stream.read_exact(&mut buf)?;
249        Ok(buf)
250    } else {
251        let len = len as usize;
252        if len > 64 * 1024 * 1024 {
253            return Err(io::Error::new(
254                io::ErrorKind::InvalidData,
255                "message too large",
256            ));
257        }
258        let mut buf = vec![0u8; len];
259        stream.read_exact(&mut buf)?;
260        Ok(buf)
261    }
262}
263
264/// Translate a pickle request dict to a query event and get response.
265fn handle_rpc_request(request: &PickleValue, event_tx: &EventSender) -> io::Result<PickleValue> {
266    // Handle "get" requests
267    if let Some(get_val) = request.get("get") {
268        if let Some(path) = get_val.as_str() {
269            return match path {
270                "interface_stats" => {
271                    let resp = send_query(event_tx, QueryRequest::InterfaceStats)?;
272                    if let QueryResponse::InterfaceStats(stats) = resp {
273                        Ok(interface_stats_to_pickle(&stats))
274                    } else {
275                        Ok(PickleValue::None)
276                    }
277                }
278                "path_table" => {
279                    let max_hops = request
280                        .get("max_hops")
281                        .and_then(|v| v.as_int().map(|n| n as u8));
282                    let resp = send_query(event_tx, QueryRequest::PathTable { max_hops })?;
283                    if let QueryResponse::PathTable(entries) = resp {
284                        Ok(path_table_to_pickle(&entries))
285                    } else {
286                        Ok(PickleValue::None)
287                    }
288                }
289                "rate_table" => {
290                    let resp = send_query(event_tx, QueryRequest::RateTable)?;
291                    if let QueryResponse::RateTable(entries) = resp {
292                        Ok(rate_table_to_pickle(&entries))
293                    } else {
294                        Ok(PickleValue::None)
295                    }
296                }
297                "next_hop" => {
298                    let hash = extract_dest_hash(request, "destination_hash")?;
299                    let resp = send_query(event_tx, QueryRequest::NextHop { dest_hash: hash })?;
300                    if let QueryResponse::NextHop(Some(nh)) = resp {
301                        Ok(PickleValue::Bytes(nh.next_hop.to_vec()))
302                    } else {
303                        Ok(PickleValue::None)
304                    }
305                }
306                "next_hop_if_name" => {
307                    let hash = extract_dest_hash(request, "destination_hash")?;
308                    let resp =
309                        send_query(event_tx, QueryRequest::NextHopIfName { dest_hash: hash })?;
310                    if let QueryResponse::NextHopIfName(Some(name)) = resp {
311                        Ok(PickleValue::String(name))
312                    } else {
313                        Ok(PickleValue::None)
314                    }
315                }
316                "link_count" => {
317                    let resp = send_query(event_tx, QueryRequest::LinkCount)?;
318                    if let QueryResponse::LinkCount(n) = resp {
319                        Ok(PickleValue::Int(n as i64))
320                    } else {
321                        Ok(PickleValue::None)
322                    }
323                }
324                "transport_identity" => {
325                    let resp = send_query(event_tx, QueryRequest::TransportIdentity)?;
326                    if let QueryResponse::TransportIdentity(Some(hash)) = resp {
327                        Ok(PickleValue::Bytes(hash.to_vec()))
328                    } else {
329                        Ok(PickleValue::None)
330                    }
331                }
332                "blackholed" => {
333                    let resp = send_query(event_tx, QueryRequest::GetBlackholed)?;
334                    if let QueryResponse::Blackholed(entries) = resp {
335                        Ok(blackholed_to_pickle(&entries))
336                    } else {
337                        Ok(PickleValue::None)
338                    }
339                }
340                "discovered_interfaces" => {
341                    let only_available = request
342                        .get("only_available")
343                        .and_then(|v| v.as_bool())
344                        .unwrap_or(false);
345                    let only_transport = request
346                        .get("only_transport")
347                        .and_then(|v| v.as_bool())
348                        .unwrap_or(false);
349                    let resp = send_query(
350                        event_tx,
351                        QueryRequest::DiscoveredInterfaces {
352                            only_available,
353                            only_transport,
354                        },
355                    )?;
356                    if let QueryResponse::DiscoveredInterfaces(interfaces) = resp {
357                        Ok(discovered_interfaces_to_pickle(&interfaces))
358                    } else {
359                        Ok(PickleValue::None)
360                    }
361                }
362                "hooks" => {
363                    let (response_tx, response_rx) = mpsc::channel();
364                    event_tx
365                        .send(Event::ListHooks { response_tx })
366                        .map_err(|_| {
367                            io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down")
368                        })?;
369                    let hooks = response_rx
370                        .recv_timeout(std::time::Duration::from_secs(5))
371                        .map_err(|_| {
372                            io::Error::new(io::ErrorKind::TimedOut, "list hooks timed out")
373                        })?;
374                    Ok(hooks_to_pickle(&hooks))
375                }
376                "runtime_config" => {
377                    let resp = send_query(event_tx, QueryRequest::ListRuntimeConfig)?;
378                    if let QueryResponse::RuntimeConfigList(entries) = resp {
379                        Ok(runtime_config_list_to_pickle(&entries))
380                    } else {
381                        Ok(PickleValue::None)
382                    }
383                }
384                "known_destinations" => {
385                    let resp = send_query(event_tx, QueryRequest::KnownDestinations)?;
386                    if let QueryResponse::KnownDestinations(entries) = resp {
387                        Ok(known_destinations_to_pickle(&entries))
388                    } else {
389                        Ok(PickleValue::None)
390                    }
391                }
392                "runtime_config_entry" => {
393                    let key = request
394                        .get("key")
395                        .and_then(|v| v.as_str())
396                        .unwrap_or_default()
397                        .to_string();
398                    let resp = send_query(event_tx, QueryRequest::GetRuntimeConfig { key })?;
399                    if let QueryResponse::RuntimeConfigEntry(entry) = resp {
400                        Ok(entry
401                            .as_ref()
402                            .map(runtime_config_entry_to_pickle)
403                            .unwrap_or(PickleValue::None))
404                    } else {
405                        Ok(PickleValue::None)
406                    }
407                }
408                "backbone_peer_state" => {
409                    let interface_name = request
410                        .get("interface")
411                        .and_then(|v| v.as_str())
412                        .map(|s| s.to_string());
413                    let resp =
414                        send_query(event_tx, QueryRequest::BackbonePeerState { interface_name })?;
415                    if let QueryResponse::BackbonePeerState(entries) = resp {
416                        Ok(backbone_peer_state_to_pickle(&entries))
417                    } else {
418                        Ok(PickleValue::None)
419                    }
420                }
421                "backbone_interfaces" => {
422                    let resp = send_query(event_tx, QueryRequest::BackboneInterfaces)?;
423                    if let QueryResponse::BackboneInterfaces(entries) = resp {
424                        Ok(backbone_interfaces_to_pickle(&entries))
425                    } else {
426                        Ok(PickleValue::None)
427                    }
428                }
429                "provider_bridge_stats" => {
430                    let resp = send_query(event_tx, QueryRequest::ProviderBridgeStats)?;
431                    if let QueryResponse::ProviderBridgeStats(stats) = resp {
432                        Ok(stats
433                            .as_ref()
434                            .map(provider_bridge_stats_to_pickle)
435                            .unwrap_or(PickleValue::None))
436                    } else {
437                        Ok(PickleValue::None)
438                    }
439                }
440                "drain_status" => {
441                    let resp = send_query(event_tx, QueryRequest::DrainStatus)?;
442                    if let QueryResponse::DrainStatus(status) = resp {
443                        Ok(drain_status_to_pickle(&status))
444                    } else {
445                        Ok(PickleValue::None)
446                    }
447                }
448                _ => Ok(PickleValue::None),
449            };
450        }
451    }
452
453    if let Some(begin_val) = request.get("begin_drain") {
454        let timeout_secs = begin_val
455            .as_float()
456            .or_else(|| begin_val.as_int().map(|value| value as f64))
457            .unwrap_or(0.0)
458            .max(0.0);
459        let timeout = Duration::from_secs_f64(timeout_secs);
460        let _ = event_tx.send(Event::BeginDrain { timeout });
461        return Ok(PickleValue::Bool(true));
462    }
463
464    if let Some(set_val) = request.get("set").and_then(|v| v.as_str()) {
465        if set_val == "known_destination_retained" {
466            let dest_hash = extract_dest_hash(request, "dest_hash")?;
467            let resp = send_query(event_tx, QueryRequest::RetainKnownDestination { dest_hash })?;
468            return if let QueryResponse::RetainKnownDestination(ok) = resp {
469                Ok(PickleValue::Bool(ok))
470            } else {
471                Ok(PickleValue::None)
472            };
473        }
474        if set_val == "known_destination_used" {
475            let dest_hash = extract_dest_hash(request, "dest_hash")?;
476            let resp = send_query(
477                event_tx,
478                QueryRequest::MarkKnownDestinationUsed { dest_hash },
479            )?;
480            return if let QueryResponse::MarkKnownDestinationUsed(ok) = resp {
481                Ok(PickleValue::Bool(ok))
482            } else {
483                Ok(PickleValue::None)
484            };
485        }
486        if set_val == "runtime_config" {
487            let key = request
488                .get("key")
489                .and_then(|v| v.as_str())
490                .unwrap_or_default()
491                .to_string();
492            let Some(value) = request
493                .get("value")
494                .and_then(runtime_config_value_from_pickle)
495            else {
496                return Ok(runtime_config_error_to_pickle(&RuntimeConfigError {
497                    code: RuntimeConfigErrorCode::InvalidType,
498                    message: "runtime-config set requires a scalar value".into(),
499                }));
500            };
501            let resp = send_query(event_tx, QueryRequest::SetRuntimeConfig { key, value })?;
502            return if let QueryResponse::RuntimeConfigSet(result) = resp {
503                Ok(runtime_config_result_to_pickle(result))
504            } else {
505                Ok(PickleValue::None)
506            };
507        }
508    }
509
510    if let Some(reset_val) = request.get("reset").and_then(|v| v.as_str()) {
511        if reset_val == "runtime_config" {
512            let key = request
513                .get("key")
514                .and_then(|v| v.as_str())
515                .unwrap_or_default()
516                .to_string();
517            let resp = send_query(event_tx, QueryRequest::ResetRuntimeConfig { key })?;
518            return if let QueryResponse::RuntimeConfigReset(result) = resp {
519                Ok(runtime_config_result_to_pickle(result))
520            } else {
521                Ok(PickleValue::None)
522            };
523        }
524    }
525
526    if let Some(clear_val) = request.get("clear").and_then(|v| v.as_str()) {
527        if clear_val == "known_destination_retained" {
528            let dest_hash = extract_dest_hash(request, "dest_hash")?;
529            let resp = send_query(
530                event_tx,
531                QueryRequest::UnretainKnownDestination { dest_hash },
532            )?;
533            return if let QueryResponse::UnretainKnownDestination(ok) = resp {
534                Ok(PickleValue::Bool(ok))
535            } else {
536                Ok(PickleValue::None)
537            };
538        }
539        if clear_val == "backbone_peer_state" {
540            let interface_name = required_string(request, "interface")?;
541            let peer_ip = required_string(request, "ip")?;
542            let peer_ip = peer_ip
543                .parse()
544                .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "invalid peer IP"))?;
545            let resp = send_query(
546                event_tx,
547                QueryRequest::ClearBackbonePeerState {
548                    interface_name,
549                    peer_ip,
550                },
551            )?;
552            return if let QueryResponse::ClearBackbonePeerState(ok) = resp {
553                Ok(PickleValue::Bool(ok))
554            } else {
555                Ok(PickleValue::None)
556            };
557        }
558    }
559
560    if let Some(set_val) = request.get("set").and_then(|v| v.as_str()) {
561        if set_val == "backbone_peer_blacklist" {
562            let interface_name = required_string(request, "interface")?;
563            let peer_ip = required_string(request, "ip")?;
564            let peer_ip: IpAddr = peer_ip
565                .parse()
566                .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "invalid peer IP"))?;
567            let duration_secs = request
568                .get("duration_secs")
569                .and_then(|v| v.as_int())
570                .ok_or_else(|| {
571                    io::Error::new(io::ErrorKind::InvalidInput, "missing duration_secs")
572                })?;
573            let reason = request
574                .get("reason")
575                .and_then(|v| v.as_str())
576                .unwrap_or("sentinel blacklist")
577                .to_string();
578            let penalty_level = request
579                .get("penalty_level")
580                .and_then(|v| v.as_int())
581                .unwrap_or(0)
582                .clamp(0, u8::MAX as i64) as u8;
583            let resp = send_query(
584                event_tx,
585                QueryRequest::BlacklistBackbonePeer {
586                    interface_name,
587                    peer_ip,
588                    duration: Duration::from_secs(duration_secs as u64),
589                    reason,
590                    penalty_level,
591                },
592            )?;
593            return if let QueryResponse::BlacklistBackbonePeer(ok) = resp {
594                Ok(PickleValue::Bool(ok))
595            } else {
596                Ok(PickleValue::None)
597            };
598        }
599    }
600
601    // Handle "request_path" -- trigger a path request to the network
602    if let Some(hash_val) = request.get("request_path") {
603        if let Some(hash_bytes) = hash_val.as_bytes() {
604            if hash_bytes.len() >= 16 {
605                let mut dest_hash = [0u8; 16];
606                dest_hash.copy_from_slice(&hash_bytes[..16]);
607                let _ = event_tx.send(crate::event::Event::RequestPath { dest_hash });
608                return Ok(PickleValue::Bool(true));
609            }
610        }
611    }
612
613    // Handle "send_probe" requests
614    if let Some(hash_val) = request.get("send_probe") {
615        if let Some(hash_bytes) = hash_val.as_bytes() {
616            if hash_bytes.len() >= 16 {
617                let mut dest_hash = [0u8; 16];
618                dest_hash.copy_from_slice(&hash_bytes[..16]);
619                let payload_size = request
620                    .get("size")
621                    .and_then(|v| v.as_int())
622                    .and_then(|n| {
623                        if n > 0 && n <= 400 {
624                            Some(n as usize)
625                        } else {
626                            None
627                        }
628                    })
629                    .unwrap_or(16);
630                let resp = send_query(
631                    event_tx,
632                    QueryRequest::SendProbe {
633                        dest_hash,
634                        payload_size,
635                    },
636                )?;
637                if let QueryResponse::SendProbe(Some((packet_hash, hops))) = resp {
638                    return Ok(PickleValue::Dict(vec![
639                        (
640                            PickleValue::String("packet_hash".into()),
641                            PickleValue::Bytes(packet_hash.to_vec()),
642                        ),
643                        (
644                            PickleValue::String("hops".into()),
645                            PickleValue::Int(hops as i64),
646                        ),
647                    ]));
648                } else {
649                    return Ok(PickleValue::None);
650                }
651            }
652        }
653    }
654
655    // Handle "check_proof" requests
656    if let Some(hash_val) = request.get("check_proof") {
657        if let Some(hash_bytes) = hash_val.as_bytes() {
658            if hash_bytes.len() >= 32 {
659                let mut packet_hash = [0u8; 32];
660                packet_hash.copy_from_slice(&hash_bytes[..32]);
661                let resp = send_query(event_tx, QueryRequest::CheckProof { packet_hash })?;
662                if let QueryResponse::CheckProof(Some(rtt)) = resp {
663                    return Ok(PickleValue::Float(rtt));
664                } else {
665                    return Ok(PickleValue::None);
666                }
667            }
668        }
669    }
670
671    // Handle "blackhole" requests
672    if let Some(hash_val) = request.get("blackhole") {
673        if let Some(hash_bytes) = hash_val.as_bytes() {
674            if hash_bytes.len() >= 16 {
675                let mut identity_hash = [0u8; 16];
676                identity_hash.copy_from_slice(&hash_bytes[..16]);
677                let duration_hours = request.get("duration").and_then(|v| v.as_float());
678                let reason = request
679                    .get("reason")
680                    .and_then(|v| v.as_str())
681                    .map(|s| s.to_string());
682                let resp = send_query(
683                    event_tx,
684                    QueryRequest::BlackholeIdentity {
685                        identity_hash,
686                        duration_hours,
687                        reason,
688                    },
689                )?;
690                return Ok(PickleValue::Bool(matches!(
691                    resp,
692                    QueryResponse::BlackholeResult(true)
693                )));
694            }
695        }
696    }
697
698    // Handle "unblackhole" requests
699    if let Some(hash_val) = request.get("unblackhole") {
700        if let Some(hash_bytes) = hash_val.as_bytes() {
701            if hash_bytes.len() >= 16 {
702                let mut identity_hash = [0u8; 16];
703                identity_hash.copy_from_slice(&hash_bytes[..16]);
704                let resp = send_query(
705                    event_tx,
706                    QueryRequest::UnblackholeIdentity { identity_hash },
707                )?;
708                return Ok(PickleValue::Bool(matches!(
709                    resp,
710                    QueryResponse::UnblackholeResult(true)
711                )));
712            }
713        }
714    }
715
716    // Handle "drop" requests
717    if let Some(drop_val) = request.get("drop") {
718        if let Some(path) = drop_val.as_str() {
719            return match path {
720                "path" => {
721                    let hash = extract_dest_hash(request, "destination_hash")?;
722                    let resp = send_query(event_tx, QueryRequest::DropPath { dest_hash: hash })?;
723                    if let QueryResponse::DropPath(ok) = resp {
724                        Ok(PickleValue::Bool(ok))
725                    } else {
726                        Ok(PickleValue::None)
727                    }
728                }
729                "all_via" => {
730                    let hash = extract_dest_hash(request, "destination_hash")?;
731                    let resp = send_query(
732                        event_tx,
733                        QueryRequest::DropAllVia {
734                            transport_hash: hash,
735                        },
736                    )?;
737                    if let QueryResponse::DropAllVia(n) = resp {
738                        Ok(PickleValue::Int(n as i64))
739                    } else {
740                        Ok(PickleValue::None)
741                    }
742                }
743                "announce_queues" => {
744                    let resp = send_query(event_tx, QueryRequest::DropAnnounceQueues)?;
745                    if let QueryResponse::DropAnnounceQueues = resp {
746                        Ok(PickleValue::Bool(true))
747                    } else {
748                        Ok(PickleValue::None)
749                    }
750                }
751                _ => Ok(PickleValue::None),
752            };
753        }
754    }
755
756    if let Some(hook_val) = request.get("hook").and_then(|v| v.as_str()) {
757        return handle_hook_rpc_request(hook_val, request, event_tx);
758    }
759
760    Ok(PickleValue::None)
761}
762
763fn handle_hook_rpc_request(
764    op: &str,
765    request: &PickleValue,
766    event_tx: &EventSender,
767) -> io::Result<PickleValue> {
768    match op {
769        "load" => {
770            let name = required_string(request, "name")?;
771            let attach_point = required_string(request, "attach_point")?;
772            let priority = request
773                .get("priority")
774                .and_then(|v| v.as_int())
775                .unwrap_or(0) as i32;
776            let wasm = request
777                .get("wasm")
778                .and_then(|v| v.as_bytes())
779                .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing wasm"))?
780                .to_vec();
781            let (response_tx, response_rx) = mpsc::channel();
782            event_tx
783                .send(Event::LoadHook {
784                    name,
785                    wasm_bytes: wasm,
786                    attach_point,
787                    priority,
788                    response_tx,
789                })
790                .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down"))?;
791            let response = response_rx
792                .recv_timeout(std::time::Duration::from_secs(5))
793                .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "hook load timed out"))?;
794            Ok(hook_result_to_pickle(response))
795        }
796        "load_builtin" => {
797            let name = required_string(request, "name")?;
798            let attach_point = required_string(request, "attach_point")?;
799            let builtin_id = required_string(request, "builtin_id")?;
800            let priority = request
801                .get("priority")
802                .and_then(|v| v.as_int())
803                .unwrap_or(0) as i32;
804            let (response_tx, response_rx) = mpsc::channel();
805            event_tx
806                .send(Event::LoadBuiltinHook {
807                    name,
808                    builtin_id,
809                    attach_point,
810                    priority,
811                    response_tx,
812                })
813                .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down"))?;
814            let response = response_rx
815                .recv_timeout(std::time::Duration::from_secs(5))
816                .map_err(|_| {
817                    io::Error::new(io::ErrorKind::TimedOut, "built-in hook load timed out")
818                })?;
819            Ok(hook_result_to_pickle(response))
820        }
821        "unload" => {
822            let name = required_string(request, "name")?;
823            let attach_point = required_string(request, "attach_point")?;
824            let (response_tx, response_rx) = mpsc::channel();
825            event_tx
826                .send(Event::UnloadHook {
827                    name,
828                    attach_point,
829                    response_tx,
830                })
831                .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down"))?;
832            let response = response_rx
833                .recv_timeout(std::time::Duration::from_secs(5))
834                .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "hook unload timed out"))?;
835            Ok(hook_result_to_pickle(response))
836        }
837        "enable" | "disable" => {
838            let name = required_string(request, "name")?;
839            let attach_point = required_string(request, "attach_point")?;
840            let enabled = op == "enable";
841            let (response_tx, response_rx) = mpsc::channel();
842            event_tx
843                .send(Event::SetHookEnabled {
844                    name,
845                    attach_point,
846                    enabled,
847                    response_tx,
848                })
849                .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down"))?;
850            let response = response_rx
851                .recv_timeout(std::time::Duration::from_secs(5))
852                .map_err(|_| {
853                    io::Error::new(io::ErrorKind::TimedOut, "hook enable/disable timed out")
854                })?;
855            Ok(hook_result_to_pickle(response))
856        }
857        "set_priority" => {
858            let name = required_string(request, "name")?;
859            let attach_point = required_string(request, "attach_point")?;
860            let priority = request
861                .get("priority")
862                .and_then(|v| v.as_int())
863                .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing priority"))?
864                as i32;
865            let (response_tx, response_rx) = mpsc::channel();
866            event_tx
867                .send(Event::SetHookPriority {
868                    name,
869                    attach_point,
870                    priority,
871                    response_tx,
872                })
873                .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down"))?;
874            let response = response_rx
875                .recv_timeout(std::time::Duration::from_secs(5))
876                .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "hook priority timed out"))?;
877            Ok(hook_result_to_pickle(response))
878        }
879        _ => Ok(PickleValue::None),
880    }
881}
882
883/// Send a query to the driver and wait for the response.
884fn send_query(event_tx: &EventSender, request: QueryRequest) -> io::Result<QueryResponse> {
885    let (resp_tx, resp_rx) = mpsc::channel();
886    event_tx
887        .send(Event::Query(request, resp_tx))
888        .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down"))?;
889    resp_rx
890        .recv_timeout(std::time::Duration::from_secs(5))
891        .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "query timed out"))
892}
893
894/// Extract a 16-byte destination hash from a pickle dict field.
895fn extract_dest_hash(request: &PickleValue, key: &str) -> io::Result<[u8; 16]> {
896    let bytes = request
897        .get(key)
898        .and_then(|v| v.as_bytes())
899        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing destination_hash"))?;
900    if bytes.len() < 16 {
901        return Err(io::Error::new(io::ErrorKind::InvalidData, "hash too short"));
902    }
903    let mut hash = [0u8; 16];
904    hash.copy_from_slice(&bytes[..16]);
905    Ok(hash)
906}
907
908fn required_string(request: &PickleValue, key: &str) -> io::Result<String> {
909    request
910        .get(key)
911        .and_then(|v| v.as_str())
912        .map(|s| s.to_string())
913        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, format!("missing {}", key)))
914}
915
916fn hook_result_to_pickle(result: Result<(), String>) -> PickleValue {
917    match result {
918        Ok(()) => PickleValue::Dict(vec![(
919            PickleValue::String("ok".into()),
920            PickleValue::Bool(true),
921        )]),
922        Err(error) => PickleValue::Dict(vec![
923            (PickleValue::String("ok".into()), PickleValue::Bool(false)),
924            (
925                PickleValue::String("error".into()),
926                PickleValue::String(error),
927            ),
928        ]),
929    }
930}
931
932// --- Pickle response builders ---
933
934fn interface_stats_to_pickle(stats: &InterfaceStatsResponse) -> PickleValue {
935    let mut ifaces = Vec::new();
936    for iface in &stats.interfaces {
937        ifaces.push(single_iface_to_pickle(iface));
938    }
939
940    let mut dict = vec![
941        (
942            PickleValue::String("interfaces".into()),
943            PickleValue::List(ifaces),
944        ),
945        (
946            PickleValue::String("transport_enabled".into()),
947            PickleValue::Bool(stats.transport_enabled),
948        ),
949        (
950            PickleValue::String("transport_uptime".into()),
951            PickleValue::Float(stats.transport_uptime),
952        ),
953        (
954            PickleValue::String("rxb".into()),
955            PickleValue::Int(stats.total_rxb as i64),
956        ),
957        (
958            PickleValue::String("txb".into()),
959            PickleValue::Int(stats.total_txb as i64),
960        ),
961    ];
962
963    if let Some(tid) = stats.transport_id {
964        dict.push((
965            PickleValue::String("transport_id".into()),
966            PickleValue::Bytes(tid.to_vec()),
967        ));
968    } else {
969        dict.push((
970            PickleValue::String("transport_id".into()),
971            PickleValue::None,
972        ));
973    }
974
975    if let Some(pr) = stats.probe_responder {
976        dict.push((
977            PickleValue::String("probe_responder".into()),
978            PickleValue::Bytes(pr.to_vec()),
979        ));
980    } else {
981        dict.push((
982            PickleValue::String("probe_responder".into()),
983            PickleValue::None,
984        ));
985    }
986
987    if let Some(pool) = &stats.backbone_peer_pool {
988        let members = pool
989            .members
990            .iter()
991            .map(|member| {
992                let mut member_dict = vec![
993                    (
994                        PickleValue::String("name".into()),
995                        PickleValue::String(member.name.clone()),
996                    ),
997                    (
998                        PickleValue::String("remote".into()),
999                        PickleValue::String(member.remote.clone()),
1000                    ),
1001                    (
1002                        PickleValue::String("state".into()),
1003                        PickleValue::String(member.state.clone()),
1004                    ),
1005                    (
1006                        PickleValue::String("failure_count".into()),
1007                        PickleValue::Int(member.failure_count as i64),
1008                    ),
1009                ];
1010                member_dict.push((
1011                    PickleValue::String("interface_id".into()),
1012                    member
1013                        .interface_id
1014                        .map(|id| PickleValue::Int(id as i64))
1015                        .unwrap_or(PickleValue::None),
1016                ));
1017                member_dict.push((
1018                    PickleValue::String("last_error".into()),
1019                    member
1020                        .last_error
1021                        .as_ref()
1022                        .map(|err| PickleValue::String(err.clone()))
1023                        .unwrap_or(PickleValue::None),
1024                ));
1025                member_dict.push((
1026                    PickleValue::String("cooldown_remaining_seconds".into()),
1027                    member
1028                        .cooldown_remaining_seconds
1029                        .map(PickleValue::Float)
1030                        .unwrap_or(PickleValue::None),
1031                ));
1032                PickleValue::Dict(member_dict)
1033            })
1034            .collect();
1035        dict.push((
1036            PickleValue::String("backbone_peer_pool".into()),
1037            PickleValue::Dict(vec![
1038                (
1039                    PickleValue::String("max_connected".into()),
1040                    PickleValue::Int(pool.max_connected as i64),
1041                ),
1042                (
1043                    PickleValue::String("active_count".into()),
1044                    PickleValue::Int(pool.active_count as i64),
1045                ),
1046                (
1047                    PickleValue::String("standby_count".into()),
1048                    PickleValue::Int(pool.standby_count as i64),
1049                ),
1050                (
1051                    PickleValue::String("cooldown_count".into()),
1052                    PickleValue::Int(pool.cooldown_count as i64),
1053                ),
1054                (
1055                    PickleValue::String("members".into()),
1056                    PickleValue::List(members),
1057                ),
1058            ]),
1059        ));
1060    } else {
1061        dict.push((
1062            PickleValue::String("backbone_peer_pool".into()),
1063            PickleValue::None,
1064        ));
1065    }
1066
1067    PickleValue::Dict(dict)
1068}
1069
1070fn single_iface_to_pickle(s: &SingleInterfaceStat) -> PickleValue {
1071    let mut dict = vec![
1072        (
1073            PickleValue::String("id".into()),
1074            PickleValue::Int(s.id as i64),
1075        ),
1076        (
1077            PickleValue::String("name".into()),
1078            PickleValue::String(s.name.clone()),
1079        ),
1080        (
1081            PickleValue::String("status".into()),
1082            PickleValue::Bool(s.status),
1083        ),
1084        (
1085            PickleValue::String("mode".into()),
1086            PickleValue::Int(s.mode as i64),
1087        ),
1088        (
1089            PickleValue::String("rxb".into()),
1090            PickleValue::Int(s.rxb as i64),
1091        ),
1092        (
1093            PickleValue::String("txb".into()),
1094            PickleValue::Int(s.txb as i64),
1095        ),
1096        (
1097            PickleValue::String("rx_packets".into()),
1098            PickleValue::Int(s.rx_packets as i64),
1099        ),
1100        (
1101            PickleValue::String("tx_packets".into()),
1102            PickleValue::Int(s.tx_packets as i64),
1103        ),
1104        (
1105            PickleValue::String("started".into()),
1106            PickleValue::Float(s.started),
1107        ),
1108        (
1109            PickleValue::String("ia_freq".into()),
1110            PickleValue::Float(s.ia_freq),
1111        ),
1112        (
1113            PickleValue::String("oa_freq".into()),
1114            PickleValue::Float(s.oa_freq),
1115        ),
1116    ];
1117
1118    match s.bitrate {
1119        Some(br) => dict.push((
1120            PickleValue::String("bitrate".into()),
1121            PickleValue::Int(br as i64),
1122        )),
1123        None => dict.push((PickleValue::String("bitrate".into()), PickleValue::None)),
1124    }
1125
1126    match s.ifac_size {
1127        Some(sz) => dict.push((
1128            PickleValue::String("ifac_size".into()),
1129            PickleValue::Int(sz as i64),
1130        )),
1131        None => dict.push((PickleValue::String("ifac_size".into()), PickleValue::None)),
1132    }
1133
1134    PickleValue::Dict(dict)
1135}
1136
1137fn path_table_to_pickle(entries: &[PathTableEntry]) -> PickleValue {
1138    let list: Vec<PickleValue> = entries
1139        .iter()
1140        .map(|e| {
1141            PickleValue::Dict(vec![
1142                (
1143                    PickleValue::String("hash".into()),
1144                    PickleValue::Bytes(e.hash.to_vec()),
1145                ),
1146                (
1147                    PickleValue::String("timestamp".into()),
1148                    PickleValue::Float(e.timestamp),
1149                ),
1150                (
1151                    PickleValue::String("via".into()),
1152                    PickleValue::Bytes(e.via.to_vec()),
1153                ),
1154                (
1155                    PickleValue::String("hops".into()),
1156                    PickleValue::Int(e.hops as i64),
1157                ),
1158                (
1159                    PickleValue::String("expires".into()),
1160                    PickleValue::Float(e.expires),
1161                ),
1162                (
1163                    PickleValue::String("interface".into()),
1164                    PickleValue::String(e.interface_name.clone()),
1165                ),
1166            ])
1167        })
1168        .collect();
1169    PickleValue::List(list)
1170}
1171
1172fn rate_table_to_pickle(entries: &[RateTableEntry]) -> PickleValue {
1173    let list: Vec<PickleValue> = entries
1174        .iter()
1175        .map(|e| {
1176            PickleValue::Dict(vec![
1177                (
1178                    PickleValue::String("hash".into()),
1179                    PickleValue::Bytes(e.hash.to_vec()),
1180                ),
1181                (
1182                    PickleValue::String("last".into()),
1183                    PickleValue::Float(e.last),
1184                ),
1185                (
1186                    PickleValue::String("rate_violations".into()),
1187                    PickleValue::Int(e.rate_violations as i64),
1188                ),
1189                (
1190                    PickleValue::String("blocked_until".into()),
1191                    PickleValue::Float(e.blocked_until),
1192                ),
1193                (
1194                    PickleValue::String("timestamps".into()),
1195                    PickleValue::List(
1196                        e.timestamps
1197                            .iter()
1198                            .map(|&t| PickleValue::Float(t))
1199                            .collect(),
1200                    ),
1201                ),
1202            ])
1203        })
1204        .collect();
1205    PickleValue::List(list)
1206}
1207
1208fn blackholed_to_pickle(entries: &[BlackholeInfo]) -> PickleValue {
1209    let list: Vec<PickleValue> = entries
1210        .iter()
1211        .map(|e| {
1212            let mut dict = vec![
1213                (
1214                    PickleValue::String("identity_hash".into()),
1215                    PickleValue::Bytes(e.identity_hash.to_vec()),
1216                ),
1217                (
1218                    PickleValue::String("created".into()),
1219                    PickleValue::Float(e.created),
1220                ),
1221                (
1222                    PickleValue::String("expires".into()),
1223                    PickleValue::Float(e.expires),
1224                ),
1225            ];
1226            if let Some(ref reason) = e.reason {
1227                dict.push((
1228                    PickleValue::String("reason".into()),
1229                    PickleValue::String(reason.clone()),
1230                ));
1231            } else {
1232                dict.push((PickleValue::String("reason".into()), PickleValue::None));
1233            }
1234            PickleValue::Dict(dict)
1235        })
1236        .collect();
1237    PickleValue::List(list)
1238}
1239
1240fn discovered_interfaces_to_pickle(
1241    interfaces: &[crate::discovery::DiscoveredInterface],
1242) -> PickleValue {
1243    let list: Vec<PickleValue> = interfaces
1244        .iter()
1245        .map(|iface| {
1246            let mut dict = vec![
1247                (
1248                    PickleValue::String("type".into()),
1249                    PickleValue::String(iface.interface_type.clone()),
1250                ),
1251                (
1252                    PickleValue::String("transport".into()),
1253                    PickleValue::Bool(iface.transport),
1254                ),
1255                (
1256                    PickleValue::String("name".into()),
1257                    PickleValue::String(iface.name.clone()),
1258                ),
1259                (
1260                    PickleValue::String("discovered".into()),
1261                    PickleValue::Float(iface.discovered),
1262                ),
1263                (
1264                    PickleValue::String("last_heard".into()),
1265                    PickleValue::Float(iface.last_heard),
1266                ),
1267                (
1268                    PickleValue::String("heard_count".into()),
1269                    PickleValue::Int(iface.heard_count as i64),
1270                ),
1271                (
1272                    PickleValue::String("status".into()),
1273                    PickleValue::String(iface.status.as_str().into()),
1274                ),
1275                (
1276                    PickleValue::String("stamp".into()),
1277                    PickleValue::Bytes(iface.stamp.clone()),
1278                ),
1279                (
1280                    PickleValue::String("value".into()),
1281                    PickleValue::Int(iface.stamp_value as i64),
1282                ),
1283                (
1284                    PickleValue::String("transport_id".into()),
1285                    PickleValue::Bytes(iface.transport_id.to_vec()),
1286                ),
1287                (
1288                    PickleValue::String("network_id".into()),
1289                    PickleValue::Bytes(iface.network_id.to_vec()),
1290                ),
1291                (
1292                    PickleValue::String("hops".into()),
1293                    PickleValue::Int(iface.hops as i64),
1294                ),
1295            ];
1296
1297            // Optional location fields
1298            if let Some(v) = iface.latitude {
1299                dict.push((
1300                    PickleValue::String("latitude".into()),
1301                    PickleValue::Float(v),
1302                ));
1303            } else {
1304                dict.push((PickleValue::String("latitude".into()), PickleValue::None));
1305            }
1306            if let Some(v) = iface.longitude {
1307                dict.push((
1308                    PickleValue::String("longitude".into()),
1309                    PickleValue::Float(v),
1310                ));
1311            } else {
1312                dict.push((PickleValue::String("longitude".into()), PickleValue::None));
1313            }
1314            if let Some(v) = iface.height {
1315                dict.push((PickleValue::String("height".into()), PickleValue::Float(v)));
1316            } else {
1317                dict.push((PickleValue::String("height".into()), PickleValue::None));
1318            }
1319
1320            // Connection info
1321            if let Some(ref v) = iface.reachable_on {
1322                dict.push((
1323                    PickleValue::String("reachable_on".into()),
1324                    PickleValue::String(v.clone()),
1325                ));
1326            } else {
1327                dict.push((
1328                    PickleValue::String("reachable_on".into()),
1329                    PickleValue::None,
1330                ));
1331            }
1332            if let Some(v) = iface.port {
1333                dict.push((
1334                    PickleValue::String("port".into()),
1335                    PickleValue::Int(v as i64),
1336                ));
1337            } else {
1338                dict.push((PickleValue::String("port".into()), PickleValue::None));
1339            }
1340
1341            // RNode/RF specific
1342            if let Some(v) = iface.frequency {
1343                dict.push((
1344                    PickleValue::String("frequency".into()),
1345                    PickleValue::Int(v as i64),
1346                ));
1347            } else {
1348                dict.push((PickleValue::String("frequency".into()), PickleValue::None));
1349            }
1350            if let Some(v) = iface.bandwidth {
1351                dict.push((
1352                    PickleValue::String("bandwidth".into()),
1353                    PickleValue::Int(v as i64),
1354                ));
1355            } else {
1356                dict.push((PickleValue::String("bandwidth".into()), PickleValue::None));
1357            }
1358            if let Some(v) = iface.spreading_factor {
1359                dict.push((PickleValue::String("sf".into()), PickleValue::Int(v as i64)));
1360            } else {
1361                dict.push((PickleValue::String("sf".into()), PickleValue::None));
1362            }
1363            if let Some(v) = iface.coding_rate {
1364                dict.push((PickleValue::String("cr".into()), PickleValue::Int(v as i64)));
1365            } else {
1366                dict.push((PickleValue::String("cr".into()), PickleValue::None));
1367            }
1368            if let Some(ref v) = iface.modulation {
1369                dict.push((
1370                    PickleValue::String("modulation".into()),
1371                    PickleValue::String(v.clone()),
1372                ));
1373            } else {
1374                dict.push((PickleValue::String("modulation".into()), PickleValue::None));
1375            }
1376            if let Some(v) = iface.channel {
1377                dict.push((
1378                    PickleValue::String("channel".into()),
1379                    PickleValue::Int(v as i64),
1380                ));
1381            } else {
1382                dict.push((PickleValue::String("channel".into()), PickleValue::None));
1383            }
1384
1385            // IFAC info
1386            if let Some(ref v) = iface.ifac_netname {
1387                dict.push((
1388                    PickleValue::String("ifac_netname".into()),
1389                    PickleValue::String(v.clone()),
1390                ));
1391            } else {
1392                dict.push((
1393                    PickleValue::String("ifac_netname".into()),
1394                    PickleValue::None,
1395                ));
1396            }
1397            if let Some(ref v) = iface.ifac_netkey {
1398                dict.push((
1399                    PickleValue::String("ifac_netkey".into()),
1400                    PickleValue::String(v.clone()),
1401                ));
1402            } else {
1403                dict.push((PickleValue::String("ifac_netkey".into()), PickleValue::None));
1404            }
1405
1406            // Config entry
1407            if let Some(ref v) = iface.config_entry {
1408                dict.push((
1409                    PickleValue::String("config_entry".into()),
1410                    PickleValue::String(v.clone()),
1411                ));
1412            } else {
1413                dict.push((
1414                    PickleValue::String("config_entry".into()),
1415                    PickleValue::None,
1416                ));
1417            }
1418
1419            dict.push((
1420                PickleValue::String("discovery_hash".into()),
1421                PickleValue::Bytes(iface.discovery_hash.to_vec()),
1422            ));
1423
1424            PickleValue::Dict(dict)
1425        })
1426        .collect();
1427    PickleValue::List(list)
1428}
1429
1430fn hooks_to_pickle(hooks: &[HookInfo]) -> PickleValue {
1431    PickleValue::List(
1432        hooks
1433            .iter()
1434            .map(|hook| {
1435                PickleValue::Dict(vec![
1436                    (
1437                        PickleValue::String("name".into()),
1438                        PickleValue::String(hook.name.clone()),
1439                    ),
1440                    (
1441                        PickleValue::String("type".into()),
1442                        PickleValue::String(hook.hook_type.clone()),
1443                    ),
1444                    (
1445                        PickleValue::String("attach_point".into()),
1446                        PickleValue::String(hook.attach_point.clone()),
1447                    ),
1448                    (
1449                        PickleValue::String("priority".into()),
1450                        PickleValue::Int(hook.priority as i64),
1451                    ),
1452                    (
1453                        PickleValue::String("enabled".into()),
1454                        PickleValue::Bool(hook.enabled),
1455                    ),
1456                    (
1457                        PickleValue::String("consecutive_traps".into()),
1458                        PickleValue::Int(hook.consecutive_traps as i64),
1459                    ),
1460                ])
1461            })
1462            .collect(),
1463    )
1464}
1465
1466fn backbone_peer_state_to_pickle(entries: &[BackbonePeerStateEntry]) -> PickleValue {
1467    PickleValue::List(
1468        entries
1469            .iter()
1470            .map(|entry| {
1471                PickleValue::Dict(vec![
1472                    (
1473                        PickleValue::String("interface".into()),
1474                        PickleValue::String(entry.interface_name.clone()),
1475                    ),
1476                    (
1477                        PickleValue::String("ip".into()),
1478                        PickleValue::String(entry.peer_ip.to_string()),
1479                    ),
1480                    (
1481                        PickleValue::String("connected_count".into()),
1482                        PickleValue::Int(entry.connected_count as i64),
1483                    ),
1484                    (
1485                        PickleValue::String("blacklisted_remaining_secs".into()),
1486                        entry
1487                            .blacklisted_remaining_secs
1488                            .map(PickleValue::Float)
1489                            .unwrap_or(PickleValue::None),
1490                    ),
1491                    (
1492                        PickleValue::String("blacklist_reason".into()),
1493                        entry
1494                            .blacklist_reason
1495                            .as_ref()
1496                            .map(|v: &String| PickleValue::String(v.clone()))
1497                            .unwrap_or(PickleValue::None),
1498                    ),
1499                    (
1500                        PickleValue::String("reject_count".into()),
1501                        PickleValue::Int(entry.reject_count as i64),
1502                    ),
1503                ])
1504            })
1505            .collect(),
1506    )
1507}
1508
1509fn backbone_interfaces_to_pickle(entries: &[BackboneInterfaceEntry]) -> PickleValue {
1510    PickleValue::List(
1511        entries
1512            .iter()
1513            .map(|entry| {
1514                PickleValue::Dict(vec![
1515                    (
1516                        PickleValue::String("id".into()),
1517                        PickleValue::Int(entry.interface_id.0 as i64),
1518                    ),
1519                    (
1520                        PickleValue::String("name".into()),
1521                        PickleValue::String(entry.interface_name.clone()),
1522                    ),
1523                ])
1524            })
1525            .collect(),
1526    )
1527}
1528
1529fn provider_bridge_stats_to_pickle(stats: &ProviderBridgeStats) -> PickleValue {
1530    PickleValue::Dict(vec![
1531        (
1532            PickleValue::String("connected".into()),
1533            PickleValue::Bool(stats.connected),
1534        ),
1535        (
1536            PickleValue::String("consumer_count".into()),
1537            PickleValue::Int(stats.consumer_count as i64),
1538        ),
1539        (
1540            PickleValue::String("queue_max_events".into()),
1541            PickleValue::Int(stats.queue_max_events as i64),
1542        ),
1543        (
1544            PickleValue::String("queue_max_bytes".into()),
1545            PickleValue::Int(stats.queue_max_bytes as i64),
1546        ),
1547        (
1548            PickleValue::String("backlog_len".into()),
1549            PickleValue::Int(stats.backlog_len as i64),
1550        ),
1551        (
1552            PickleValue::String("backlog_bytes".into()),
1553            PickleValue::Int(stats.backlog_bytes as i64),
1554        ),
1555        (
1556            PickleValue::String("backlog_dropped_pending".into()),
1557            PickleValue::Int(stats.backlog_dropped_pending as i64),
1558        ),
1559        (
1560            PickleValue::String("backlog_dropped_total".into()),
1561            PickleValue::Int(stats.backlog_dropped_total as i64),
1562        ),
1563        (
1564            PickleValue::String("total_disconnect_count".into()),
1565            PickleValue::Int(stats.total_disconnect_count as i64),
1566        ),
1567        (
1568            PickleValue::String("consumers".into()),
1569            PickleValue::List(
1570                stats
1571                    .consumers
1572                    .iter()
1573                    .map(|consumer| {
1574                        PickleValue::Dict(vec![
1575                            (
1576                                PickleValue::String("id".into()),
1577                                PickleValue::Int(consumer.id as i64),
1578                            ),
1579                            (
1580                                PickleValue::String("connected".into()),
1581                                PickleValue::Bool(consumer.connected),
1582                            ),
1583                            (
1584                                PickleValue::String("queue_len".into()),
1585                                PickleValue::Int(consumer.queue_len as i64),
1586                            ),
1587                            (
1588                                PickleValue::String("queued_bytes".into()),
1589                                PickleValue::Int(consumer.queued_bytes as i64),
1590                            ),
1591                            (
1592                                PickleValue::String("dropped_pending".into()),
1593                                PickleValue::Int(consumer.dropped_pending as i64),
1594                            ),
1595                            (
1596                                PickleValue::String("dropped_total".into()),
1597                                PickleValue::Int(consumer.dropped_total as i64),
1598                            ),
1599                            (
1600                                PickleValue::String("queue_max_events".into()),
1601                                PickleValue::Int(consumer.queue_max_events as i64),
1602                            ),
1603                            (
1604                                PickleValue::String("queue_max_bytes".into()),
1605                                PickleValue::Int(consumer.queue_max_bytes as i64),
1606                            ),
1607                        ])
1608                    })
1609                    .collect(),
1610            ),
1611        ),
1612    ])
1613}
1614
1615fn lifecycle_state_name(state: LifecycleState) -> &'static str {
1616    match state {
1617        LifecycleState::Active => "active",
1618        LifecycleState::Draining => "draining",
1619        LifecycleState::Stopping => "stopping",
1620        LifecycleState::Stopped => "stopped",
1621    }
1622}
1623
1624fn drain_status_to_pickle(status: &DrainStatus) -> PickleValue {
1625    PickleValue::Dict(vec![
1626        (
1627            PickleValue::String("state".into()),
1628            PickleValue::String(lifecycle_state_name(status.state).into()),
1629        ),
1630        (
1631            PickleValue::String("drain_age_seconds".into()),
1632            status
1633                .drain_age_seconds
1634                .map(PickleValue::Float)
1635                .unwrap_or(PickleValue::None),
1636        ),
1637        (
1638            PickleValue::String("deadline_remaining_seconds".into()),
1639            status
1640                .deadline_remaining_seconds
1641                .map(PickleValue::Float)
1642                .unwrap_or(PickleValue::None),
1643        ),
1644        (
1645            PickleValue::String("drain_complete".into()),
1646            PickleValue::Bool(status.drain_complete),
1647        ),
1648        (
1649            PickleValue::String("interface_writer_queued_frames".into()),
1650            PickleValue::Int(status.interface_writer_queued_frames as i64),
1651        ),
1652        (
1653            PickleValue::String("provider_backlog_events".into()),
1654            PickleValue::Int(status.provider_backlog_events as i64),
1655        ),
1656        (
1657            PickleValue::String("provider_consumer_queued_events".into()),
1658            PickleValue::Int(status.provider_consumer_queued_events as i64),
1659        ),
1660        (
1661            PickleValue::String("detail".into()),
1662            status
1663                .detail
1664                .as_ref()
1665                .map(|detail| PickleValue::String(detail.clone()))
1666                .unwrap_or(PickleValue::None),
1667        ),
1668    ])
1669}
1670
1671fn runtime_config_value_to_pickle(value: &RuntimeConfigValue) -> PickleValue {
1672    match value {
1673        RuntimeConfigValue::Int(v) => PickleValue::Int(*v),
1674        RuntimeConfigValue::Float(v) => PickleValue::Float(*v),
1675        RuntimeConfigValue::Bool(v) => PickleValue::Bool(*v),
1676        RuntimeConfigValue::String(v) => PickleValue::String(v.clone()),
1677        RuntimeConfigValue::Null => PickleValue::None,
1678    }
1679}
1680
1681fn runtime_config_value_from_pickle(value: &PickleValue) -> Option<RuntimeConfigValue> {
1682    match value {
1683        PickleValue::Int(v) => Some(RuntimeConfigValue::Int(*v)),
1684        PickleValue::Float(v) => Some(RuntimeConfigValue::Float(*v)),
1685        PickleValue::Bool(v) => Some(RuntimeConfigValue::Bool(*v)),
1686        PickleValue::String(v) => Some(RuntimeConfigValue::String(v.clone())),
1687        PickleValue::None => Some(RuntimeConfigValue::Null),
1688        _ => None,
1689    }
1690}
1691
1692fn runtime_config_entry_to_pickle(entry: &RuntimeConfigEntry) -> PickleValue {
1693    PickleValue::Dict(vec![
1694        (
1695            PickleValue::String("key".into()),
1696            PickleValue::String(entry.key.clone()),
1697        ),
1698        (
1699            PickleValue::String("value".into()),
1700            runtime_config_value_to_pickle(&entry.value),
1701        ),
1702        (
1703            PickleValue::String("default".into()),
1704            runtime_config_value_to_pickle(&entry.default),
1705        ),
1706        (
1707            PickleValue::String("source".into()),
1708            PickleValue::String(match entry.source {
1709                RuntimeConfigSource::Startup => "startup".into(),
1710                RuntimeConfigSource::RuntimeOverride => "runtime_override".into(),
1711            }),
1712        ),
1713        (
1714            PickleValue::String("apply_mode".into()),
1715            PickleValue::String(match entry.apply_mode {
1716                RuntimeConfigApplyMode::Immediate => "immediate".into(),
1717                RuntimeConfigApplyMode::NewConnectionsOnly => "new_connections_only".into(),
1718                RuntimeConfigApplyMode::NextReconnect => "next_reconnect".into(),
1719                RuntimeConfigApplyMode::RestartRequired => "restart_required".into(),
1720            }),
1721        ),
1722        (
1723            PickleValue::String("description".into()),
1724            entry
1725                .description
1726                .as_ref()
1727                .map(|v| PickleValue::String(v.clone()))
1728                .unwrap_or(PickleValue::None),
1729        ),
1730    ])
1731}
1732
1733fn runtime_config_list_to_pickle(entries: &[RuntimeConfigEntry]) -> PickleValue {
1734    PickleValue::List(entries.iter().map(runtime_config_entry_to_pickle).collect())
1735}
1736
1737fn runtime_config_error_to_pickle(error: &RuntimeConfigError) -> PickleValue {
1738    PickleValue::Dict(vec![
1739        (
1740            PickleValue::String("error".into()),
1741            PickleValue::String(match error.code {
1742                RuntimeConfigErrorCode::UnknownKey => "unknown_key".into(),
1743                RuntimeConfigErrorCode::InvalidType => "invalid_type".into(),
1744                RuntimeConfigErrorCode::InvalidValue => "invalid_value".into(),
1745                RuntimeConfigErrorCode::Unsupported => "unsupported".into(),
1746                RuntimeConfigErrorCode::NotFound => "not_found".into(),
1747                RuntimeConfigErrorCode::ApplyFailed => "apply_failed".into(),
1748            }),
1749        ),
1750        (
1751            PickleValue::String("message".into()),
1752            PickleValue::String(error.message.clone()),
1753        ),
1754    ])
1755}
1756
1757fn runtime_config_result_to_pickle(
1758    result: Result<RuntimeConfigEntry, RuntimeConfigError>,
1759) -> PickleValue {
1760    match result {
1761        Ok(entry) => runtime_config_entry_to_pickle(&entry),
1762        Err(error) => runtime_config_error_to_pickle(&error),
1763    }
1764}
1765
1766fn known_destination_entry_to_pickle(entry: &KnownDestinationEntry) -> PickleValue {
1767    PickleValue::Dict(vec![
1768        (
1769            PickleValue::String("dest_hash".into()),
1770            PickleValue::Bytes(entry.dest_hash.to_vec()),
1771        ),
1772        (
1773            PickleValue::String("identity_hash".into()),
1774            PickleValue::Bytes(entry.identity_hash.to_vec()),
1775        ),
1776        (
1777            PickleValue::String("public_key".into()),
1778            PickleValue::Bytes(entry.public_key.to_vec()),
1779        ),
1780        (
1781            PickleValue::String("app_data".into()),
1782            entry
1783                .app_data
1784                .as_ref()
1785                .map(|data: &Vec<u8>| PickleValue::Bytes(data.clone()))
1786                .unwrap_or(PickleValue::None),
1787        ),
1788        (
1789            PickleValue::String("hops".into()),
1790            PickleValue::Int(entry.hops as i64),
1791        ),
1792        (
1793            PickleValue::String("received_at".into()),
1794            PickleValue::Float(entry.received_at),
1795        ),
1796        (
1797            PickleValue::String("receiving_interface".into()),
1798            PickleValue::Int(entry.receiving_interface.0 as i64),
1799        ),
1800        (
1801            PickleValue::String("was_used".into()),
1802            PickleValue::Bool(entry.was_used),
1803        ),
1804        (
1805            PickleValue::String("last_used_at".into()),
1806            entry
1807                .last_used_at
1808                .map(PickleValue::Float)
1809                .unwrap_or(PickleValue::None),
1810        ),
1811        (
1812            PickleValue::String("retained".into()),
1813            PickleValue::Bool(entry.retained),
1814        ),
1815    ])
1816}
1817
1818fn known_destinations_to_pickle(entries: &[KnownDestinationEntry]) -> PickleValue {
1819    PickleValue::List(
1820        entries
1821            .iter()
1822            .map(known_destination_entry_to_pickle)
1823            .collect(),
1824    )
1825}
1826
1827fn parse_known_destination_entry(value: &PickleValue) -> io::Result<KnownDestinationEntry> {
1828    let get_bytes = |key: &str, len: usize| -> io::Result<Vec<u8>> {
1829        let value = value.get(key).ok_or_else(|| {
1830            io::Error::new(io::ErrorKind::InvalidData, format!("missing {}", key))
1831        })?;
1832        let bytes = value.as_bytes().ok_or_else(|| {
1833            io::Error::new(io::ErrorKind::InvalidData, format!("invalid {}", key))
1834        })?;
1835        if bytes.len() != len {
1836            return Err(io::Error::new(
1837                io::ErrorKind::InvalidData,
1838                format!("invalid {} length", key),
1839            ));
1840        }
1841        Ok(bytes.to_vec())
1842    };
1843    let get_int = |key: &str| -> io::Result<i64> {
1844        value
1845            .get(key)
1846            .and_then(|v| v.as_int())
1847            .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, format!("invalid {}", key)))
1848    };
1849    let get_float = |key: &str| -> io::Result<f64> {
1850        value
1851            .get(key)
1852            .and_then(|v| v.as_float())
1853            .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, format!("invalid {}", key)))
1854    };
1855    let get_bool = |key: &str| -> io::Result<bool> {
1856        value
1857            .get(key)
1858            .and_then(|v| v.as_bool())
1859            .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, format!("invalid {}", key)))
1860    };
1861
1862    let mut dest_hash = [0u8; 16];
1863    dest_hash.copy_from_slice(&get_bytes("dest_hash", 16)?);
1864    let mut identity_hash = [0u8; 16];
1865    identity_hash.copy_from_slice(&get_bytes("identity_hash", 16)?);
1866    let mut public_key = [0u8; 64];
1867    public_key.copy_from_slice(&get_bytes("public_key", 64)?);
1868    let app_data = value
1869        .get("app_data")
1870        .and_then(|v| v.as_bytes())
1871        .map(|bytes| bytes.to_vec());
1872    let last_used_at = value.get("last_used_at").and_then(|v| v.as_float());
1873
1874    Ok(KnownDestinationEntry {
1875        dest_hash,
1876        identity_hash,
1877        public_key,
1878        app_data,
1879        hops: get_int("hops")? as u8,
1880        received_at: get_float("received_at")?,
1881        receiving_interface: rns_core::transport::types::InterfaceId(
1882            get_int("receiving_interface")? as u64,
1883        ),
1884        was_used: get_bool("was_used")?,
1885        last_used_at,
1886        retained: get_bool("retained")?,
1887    })
1888}
1889
1890fn parse_known_destination_list(value: &PickleValue) -> io::Result<Vec<KnownDestinationEntry>> {
1891    let list = value
1892        .as_list()
1893        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "expected list"))?;
1894    list.iter().map(parse_known_destination_entry).collect()
1895}
1896
1897// --- RPC Client ---
1898
1899/// RPC client for connecting to a running daemon.
1900pub struct RpcClient {
1901    stream: TcpStream,
1902}
1903
1904impl RpcClient {
1905    /// Connect to an RPC server and authenticate.
1906    pub fn connect(addr: &RpcAddr, auth_key: &[u8; 32]) -> io::Result<Self> {
1907        let mut stream = match addr {
1908            RpcAddr::Tcp(host, port) => TcpStream::connect((host.as_str(), *port))?,
1909        };
1910
1911        stream.set_read_timeout(Some(std::time::Duration::from_secs(10)))?;
1912        stream.set_write_timeout(Some(std::time::Duration::from_secs(10)))?;
1913
1914        // Client-side authentication
1915        client_auth(&mut stream, auth_key)?;
1916
1917        Ok(RpcClient { stream })
1918    }
1919
1920    /// Send a pickle request and receive a pickle response.
1921    pub fn call(&mut self, request: &PickleValue) -> io::Result<PickleValue> {
1922        let request_bytes = pickle::encode(request);
1923        send_bytes(&mut self.stream, &request_bytes)?;
1924
1925        let response_bytes = recv_bytes(&mut self.stream)?;
1926        pickle::decode(&response_bytes)
1927            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))
1928    }
1929
1930    pub fn list_hooks(&mut self) -> io::Result<Vec<HookInfo>> {
1931        let response = self.call(&PickleValue::Dict(vec![(
1932            PickleValue::String("get".into()),
1933            PickleValue::String("hooks".into()),
1934        )]))?;
1935        parse_hook_list(&response)
1936    }
1937
1938    pub fn begin_drain(&mut self, timeout: Duration) -> io::Result<bool> {
1939        let response = self.call(&PickleValue::Dict(vec![(
1940            PickleValue::String("begin_drain".into()),
1941            PickleValue::Float(timeout.as_secs_f64()),
1942        )]))?;
1943        Ok(response.as_bool().unwrap_or(false))
1944    }
1945
1946    pub fn drain_status(&mut self) -> io::Result<Option<DrainStatus>> {
1947        let response = self.call(&PickleValue::Dict(vec![(
1948            PickleValue::String("get".into()),
1949            PickleValue::String("drain_status".into()),
1950        )]))?;
1951        parse_drain_status(&response)
1952    }
1953
1954    pub fn provider_bridge_stats(&mut self) -> io::Result<PickleValue> {
1955        self.call(&PickleValue::Dict(vec![(
1956            PickleValue::String("get".into()),
1957            PickleValue::String("provider_bridge_stats".into()),
1958        )]))
1959    }
1960
1961    pub fn load_hook(
1962        &mut self,
1963        name: &str,
1964        attach_point: &str,
1965        priority: i32,
1966        wasm: &[u8],
1967    ) -> io::Result<Result<(), String>> {
1968        let response = self.call(&PickleValue::Dict(vec![
1969            (
1970                PickleValue::String("hook".into()),
1971                PickleValue::String("load".into()),
1972            ),
1973            (
1974                PickleValue::String("name".into()),
1975                PickleValue::String(name.to_string()),
1976            ),
1977            (
1978                PickleValue::String("attach_point".into()),
1979                PickleValue::String(attach_point.to_string()),
1980            ),
1981            (
1982                PickleValue::String("priority".into()),
1983                PickleValue::Int(priority as i64),
1984            ),
1985            (
1986                PickleValue::String("wasm".into()),
1987                PickleValue::Bytes(wasm.to_vec()),
1988            ),
1989        ]))?;
1990        parse_hook_result(&response)
1991    }
1992
1993    pub fn load_builtin_hook(
1994        &mut self,
1995        name: &str,
1996        attach_point: &str,
1997        priority: i32,
1998        builtin_id: &str,
1999    ) -> io::Result<Result<(), String>> {
2000        let response = self.call(&PickleValue::Dict(vec![
2001            (
2002                PickleValue::String("hook".into()),
2003                PickleValue::String("load_builtin".into()),
2004            ),
2005            (
2006                PickleValue::String("name".into()),
2007                PickleValue::String(name.to_string()),
2008            ),
2009            (
2010                PickleValue::String("attach_point".into()),
2011                PickleValue::String(attach_point.to_string()),
2012            ),
2013            (
2014                PickleValue::String("priority".into()),
2015                PickleValue::Int(priority as i64),
2016            ),
2017            (
2018                PickleValue::String("builtin_id".into()),
2019                PickleValue::String(builtin_id.to_string()),
2020            ),
2021        ]))?;
2022        parse_hook_result(&response)
2023    }
2024
2025    pub fn unload_hook(
2026        &mut self,
2027        name: &str,
2028        attach_point: &str,
2029    ) -> io::Result<Result<(), String>> {
2030        let response = self.call(&PickleValue::Dict(vec![
2031            (
2032                PickleValue::String("hook".into()),
2033                PickleValue::String("unload".into()),
2034            ),
2035            (
2036                PickleValue::String("name".into()),
2037                PickleValue::String(name.to_string()),
2038            ),
2039            (
2040                PickleValue::String("attach_point".into()),
2041                PickleValue::String(attach_point.to_string()),
2042            ),
2043        ]))?;
2044        parse_hook_result(&response)
2045    }
2046
2047    pub fn set_hook_enabled(
2048        &mut self,
2049        name: &str,
2050        attach_point: &str,
2051        enabled: bool,
2052    ) -> io::Result<Result<(), String>> {
2053        let op = if enabled { "enable" } else { "disable" };
2054        let response = self.call(&PickleValue::Dict(vec![
2055            (
2056                PickleValue::String("hook".into()),
2057                PickleValue::String(op.into()),
2058            ),
2059            (
2060                PickleValue::String("name".into()),
2061                PickleValue::String(name.to_string()),
2062            ),
2063            (
2064                PickleValue::String("attach_point".into()),
2065                PickleValue::String(attach_point.to_string()),
2066            ),
2067        ]))?;
2068        parse_hook_result(&response)
2069    }
2070
2071    pub fn set_hook_priority(
2072        &mut self,
2073        name: &str,
2074        attach_point: &str,
2075        priority: i32,
2076    ) -> io::Result<Result<(), String>> {
2077        let response = self.call(&PickleValue::Dict(vec![
2078            (
2079                PickleValue::String("hook".into()),
2080                PickleValue::String("set_priority".into()),
2081            ),
2082            (
2083                PickleValue::String("name".into()),
2084                PickleValue::String(name.to_string()),
2085            ),
2086            (
2087                PickleValue::String("attach_point".into()),
2088                PickleValue::String(attach_point.to_string()),
2089            ),
2090            (
2091                PickleValue::String("priority".into()),
2092                PickleValue::Int(priority as i64),
2093            ),
2094        ]))?;
2095        parse_hook_result(&response)
2096    }
2097
2098    pub fn blacklist_backbone_peer(
2099        &mut self,
2100        interface: &str,
2101        ip: &str,
2102        duration_secs: u64,
2103        reason: Option<&str>,
2104        penalty_level: Option<u8>,
2105    ) -> io::Result<bool> {
2106        let mut request = vec![
2107            (
2108                PickleValue::String("set".into()),
2109                PickleValue::String("backbone_peer_blacklist".into()),
2110            ),
2111            (
2112                PickleValue::String("interface".into()),
2113                PickleValue::String(interface.to_string()),
2114            ),
2115            (
2116                PickleValue::String("ip".into()),
2117                PickleValue::String(ip.to_string()),
2118            ),
2119            (
2120                PickleValue::String("duration_secs".into()),
2121                PickleValue::Int(duration_secs as i64),
2122            ),
2123        ];
2124        if let Some(reason) = reason {
2125            request.push((
2126                PickleValue::String("reason".into()),
2127                PickleValue::String(reason.to_string()),
2128            ));
2129        }
2130        if let Some(level) = penalty_level {
2131            request.push((
2132                PickleValue::String("penalty_level".into()),
2133                PickleValue::Int(level as i64),
2134            ));
2135        }
2136        let response = self.call(&PickleValue::Dict(request))?;
2137        Ok(response.as_bool().unwrap_or(false))
2138    }
2139
2140    pub fn known_destinations(&mut self) -> io::Result<Vec<KnownDestinationEntry>> {
2141        let response = self.call(&PickleValue::Dict(vec![(
2142            PickleValue::String("get".into()),
2143            PickleValue::String("known_destinations".into()),
2144        )]))?;
2145        parse_known_destination_list(&response)
2146    }
2147
2148    pub fn retain_known_destination(&mut self, dest_hash: [u8; 16]) -> io::Result<bool> {
2149        let response = self.call(&PickleValue::Dict(vec![
2150            (
2151                PickleValue::String("set".into()),
2152                PickleValue::String("known_destination_retained".into()),
2153            ),
2154            (
2155                PickleValue::String("dest_hash".into()),
2156                PickleValue::Bytes(dest_hash.to_vec()),
2157            ),
2158        ]))?;
2159        Ok(response.as_bool().unwrap_or(false))
2160    }
2161
2162    pub fn unretain_known_destination(&mut self, dest_hash: [u8; 16]) -> io::Result<bool> {
2163        let response = self.call(&PickleValue::Dict(vec![
2164            (
2165                PickleValue::String("clear".into()),
2166                PickleValue::String("known_destination_retained".into()),
2167            ),
2168            (
2169                PickleValue::String("dest_hash".into()),
2170                PickleValue::Bytes(dest_hash.to_vec()),
2171            ),
2172        ]))?;
2173        Ok(response.as_bool().unwrap_or(false))
2174    }
2175
2176    pub fn mark_known_destination_used(&mut self, dest_hash: [u8; 16]) -> io::Result<bool> {
2177        let response = self.call(&PickleValue::Dict(vec![
2178            (
2179                PickleValue::String("set".into()),
2180                PickleValue::String("known_destination_used".into()),
2181            ),
2182            (
2183                PickleValue::String("dest_hash".into()),
2184                PickleValue::Bytes(dest_hash.to_vec()),
2185            ),
2186        ]))?;
2187        Ok(response.as_bool().unwrap_or(false))
2188    }
2189}
2190
2191fn parse_lifecycle_state(value: &str) -> Option<LifecycleState> {
2192    match value {
2193        "active" => Some(LifecycleState::Active),
2194        "draining" => Some(LifecycleState::Draining),
2195        "stopping" => Some(LifecycleState::Stopping),
2196        "stopped" => Some(LifecycleState::Stopped),
2197        _ => None,
2198    }
2199}
2200
2201fn parse_drain_status(value: &PickleValue) -> io::Result<Option<DrainStatus>> {
2202    if !matches!(value, PickleValue::Dict(_)) {
2203        return Ok(None);
2204    }
2205    let state = value
2206        .get("state")
2207        .and_then(|entry| entry.as_str())
2208        .and_then(parse_lifecycle_state)
2209        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing drain state"))?;
2210    let drain_age_seconds = value.get("drain_age_seconds").and_then(|entry| {
2211        entry
2212            .as_float()
2213            .or_else(|| entry.as_int().map(|v| v as f64))
2214    });
2215    let deadline_remaining_seconds = value.get("deadline_remaining_seconds").and_then(|entry| {
2216        entry
2217            .as_float()
2218            .or_else(|| entry.as_int().map(|v| v as f64))
2219    });
2220    let drain_complete = value
2221        .get("drain_complete")
2222        .and_then(|entry| entry.as_bool())
2223        .unwrap_or(false);
2224    let interface_writer_queued_frames = value
2225        .get("interface_writer_queued_frames")
2226        .and_then(|entry| entry.as_int())
2227        .unwrap_or(0)
2228        .max(0) as usize;
2229    let provider_backlog_events = value
2230        .get("provider_backlog_events")
2231        .and_then(|entry| entry.as_int())
2232        .unwrap_or(0)
2233        .max(0) as usize;
2234    let provider_consumer_queued_events = value
2235        .get("provider_consumer_queued_events")
2236        .and_then(|entry| entry.as_int())
2237        .unwrap_or(0)
2238        .max(0) as usize;
2239    let detail = value
2240        .get("detail")
2241        .and_then(|entry| entry.as_str().map(|v| v.to_string()));
2242    Ok(Some(DrainStatus {
2243        state,
2244        drain_age_seconds,
2245        deadline_remaining_seconds,
2246        drain_complete,
2247        interface_writer_queued_frames,
2248        provider_backlog_events,
2249        provider_consumer_queued_events,
2250        detail,
2251    }))
2252}
2253
2254fn parse_hook_result(response: &PickleValue) -> io::Result<Result<(), String>> {
2255    let ok = response
2256        .get("ok")
2257        .and_then(|v| v.as_bool())
2258        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "invalid hook response"))?;
2259    if ok {
2260        Ok(Ok(()))
2261    } else {
2262        Ok(Err(response
2263            .get("error")
2264            .and_then(|v| v.as_str())
2265            .unwrap_or("unknown hook error")
2266            .to_string()))
2267    }
2268}
2269
2270fn parse_hook_list(response: &PickleValue) -> io::Result<Vec<HookInfo>> {
2271    let list = response
2272        .as_list()
2273        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "invalid hooks response"))?;
2274    let mut hooks = Vec::with_capacity(list.len());
2275    for item in list {
2276        hooks.push(HookInfo {
2277            name: item
2278                .get("name")
2279                .and_then(|v| v.as_str())
2280                .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing hook name"))?
2281                .to_string(),
2282            hook_type: item
2283                .get("type")
2284                .and_then(|v| v.as_str())
2285                .unwrap_or(default_hook_type())
2286                .to_string(),
2287            attach_point: item
2288                .get("attach_point")
2289                .and_then(|v| v.as_str())
2290                .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing attach_point"))?
2291                .to_string(),
2292            priority: item
2293                .get("priority")
2294                .and_then(|v| v.as_int())
2295                .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing priority"))?
2296                as i32,
2297            enabled: item
2298                .get("enabled")
2299                .and_then(|v| v.as_bool())
2300                .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing enabled"))?,
2301            consecutive_traps: item
2302                .get("consecutive_traps")
2303                .and_then(|v| v.as_int())
2304                .ok_or_else(|| {
2305                    io::Error::new(io::ErrorKind::InvalidData, "missing consecutive_traps")
2306                })? as u32,
2307        });
2308    }
2309    Ok(hooks)
2310}
2311
2312fn default_hook_type() -> &'static str {
2313    #[cfg(feature = "rns-hooks-native")]
2314    {
2315        return "native";
2316    }
2317    #[cfg(all(not(feature = "rns-hooks-native"), feature = "rns-hooks-wasm"))]
2318    {
2319        return "wasm";
2320    }
2321    #[cfg(all(not(feature = "rns-hooks-native"), not(feature = "rns-hooks-wasm")))]
2322    {
2323        "wasm"
2324    }
2325}
2326
2327/// Client-side authentication: answer the server's challenge.
2328fn client_auth(stream: &mut TcpStream, auth_key: &[u8; 32]) -> io::Result<()> {
2329    // Read challenge
2330    let challenge = recv_bytes(stream)?;
2331
2332    if !challenge.starts_with(CHALLENGE_PREFIX) {
2333        return Err(io::Error::new(
2334            io::ErrorKind::InvalidData,
2335            "expected challenge",
2336        ));
2337    }
2338
2339    let message = &challenge[CHALLENGE_PREFIX.len()..];
2340
2341    // Create HMAC response
2342    let response = create_response(auth_key, message);
2343    send_bytes(stream, &response)?;
2344
2345    // Read welcome/failure
2346    let result = recv_bytes(stream)?;
2347    if result == WELCOME {
2348        Ok(())
2349    } else {
2350        Err(io::Error::new(
2351            io::ErrorKind::PermissionDenied,
2352            "authentication failed",
2353        ))
2354    }
2355}
2356
2357/// Create an HMAC response to a challenge message.
2358fn create_response(auth_key: &[u8; 32], message: &[u8]) -> Vec<u8> {
2359    // Check if message has {sha256} prefix (modern protocol)
2360    if message.starts_with(b"{sha256}") || message.len() > 20 {
2361        // Modern protocol: use HMAC-SHA256 with {sha256} prefix
2362        let digest = hmac_sha256(auth_key, message);
2363        let mut response = Vec::with_capacity(8 + 32);
2364        response.extend_from_slice(b"{sha256}");
2365        response.extend_from_slice(&digest);
2366        response
2367    } else {
2368        // Legacy protocol: raw HMAC-MD5
2369        let digest = hmac_md5(auth_key, message);
2370        digest.to_vec()
2371    }
2372}
2373
2374/// Derive the RPC auth key from transport identity private key.
2375pub fn derive_auth_key(private_key: &[u8]) -> [u8; 32] {
2376    sha256(private_key)
2377}
2378
2379#[cfg(test)]
2380mod tests {
2381    use super::*;
2382
2383    #[test]
2384    fn send_recv_bytes_roundtrip() {
2385        let (mut c1, mut c2) = tcp_pair();
2386        let data = b"hello world";
2387        send_bytes(&mut c1, data).unwrap();
2388        let received = recv_bytes(&mut c2).unwrap();
2389        assert_eq!(&received, data);
2390    }
2391
2392    #[test]
2393    fn send_recv_empty() {
2394        let (mut c1, mut c2) = tcp_pair();
2395        send_bytes(&mut c1, b"").unwrap();
2396        let received = recv_bytes(&mut c2).unwrap();
2397        assert!(received.is_empty());
2398    }
2399
2400    #[test]
2401    fn auth_success() {
2402        let key = derive_auth_key(b"test-private-key");
2403        let (mut server, mut client) = tcp_pair();
2404
2405        let key2 = key;
2406        let t = thread::spawn(move || {
2407            client_auth(&mut client, &key2).unwrap();
2408        });
2409
2410        server_auth(&mut server, &key).unwrap();
2411        t.join().unwrap();
2412    }
2413
2414    #[test]
2415    fn auth_failure_wrong_key() {
2416        let server_key = derive_auth_key(b"server-key");
2417        let client_key = derive_auth_key(b"wrong-key");
2418        let (mut server, mut client) = tcp_pair();
2419
2420        let t = thread::spawn(move || {
2421            let result = client_auth(&mut client, &client_key);
2422            assert!(result.is_err());
2423        });
2424
2425        let result = server_auth(&mut server, &server_key);
2426        assert!(result.is_err());
2427        t.join().unwrap();
2428    }
2429
2430    #[test]
2431    fn verify_sha256_response() {
2432        let key = derive_auth_key(b"mykey");
2433        let message = b"{sha256}abcdefghijklmnopqrstuvwxyz0123456789ABCD";
2434        let response = create_response(&key, message);
2435        assert!(response.starts_with(b"{sha256}"));
2436        assert!(verify_response(&key, message, &response));
2437    }
2438
2439    #[test]
2440    fn verify_legacy_md5_response() {
2441        let key = derive_auth_key(b"mykey");
2442        // Legacy message: 20 bytes, no prefix
2443        let message = b"01234567890123456789";
2444        // Create legacy response (raw HMAC-MD5)
2445        let digest = hmac_md5(&key, message);
2446        assert!(verify_response(&key, message, &digest));
2447    }
2448
2449    #[test]
2450    fn constant_time_eq_works() {
2451        assert!(constant_time_eq(b"hello", b"hello"));
2452        assert!(!constant_time_eq(b"hello", b"world"));
2453        assert!(!constant_time_eq(b"hello", b"hell"));
2454    }
2455
2456    #[test]
2457    fn rpc_roundtrip() {
2458        let key = derive_auth_key(b"test-key");
2459        let (event_tx, event_rx) = crate::event::channel();
2460
2461        // Start server
2462        // Bind manually to get the actual port
2463        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
2464        let port = listener.local_addr().unwrap().port();
2465        listener.set_nonblocking(true).unwrap();
2466
2467        let shutdown = Arc::new(AtomicBool::new(false));
2468        let shutdown2 = shutdown.clone();
2469
2470        // Driver thread that handles queries
2471        let driver_thread = thread::spawn(move || loop {
2472            match event_rx.recv_timeout(std::time::Duration::from_secs(5)) {
2473                Ok(Event::Query(QueryRequest::LinkCount, resp_tx)) => {
2474                    let _ = resp_tx.send(QueryResponse::LinkCount(42));
2475                }
2476                Ok(Event::Query(QueryRequest::InterfaceStats, resp_tx)) => {
2477                    let _ = resp_tx.send(QueryResponse::InterfaceStats(InterfaceStatsResponse {
2478                        interfaces: vec![SingleInterfaceStat {
2479                            id: 7,
2480                            name: "TestInterface".into(),
2481                            status: true,
2482                            mode: 1,
2483                            rxb: 1000,
2484                            txb: 2000,
2485                            rx_packets: 10,
2486                            tx_packets: 20,
2487                            bitrate: Some(10_000_000),
2488                            ifac_size: None,
2489                            started: 1000.0,
2490                            ia_freq: 0.0,
2491                            oa_freq: 0.0,
2492                            interface_type: "TestInterface".into(),
2493                        }],
2494                        transport_id: None,
2495                        transport_enabled: true,
2496                        transport_uptime: 3600.0,
2497                        total_rxb: 1000,
2498                        total_txb: 2000,
2499                        probe_responder: None,
2500                        backbone_peer_pool: None,
2501                    }));
2502                }
2503                _ => break,
2504            }
2505        });
2506
2507        let key2 = key;
2508        let shutdown3 = shutdown2.clone();
2509        let server_thread = thread::spawn(move || {
2510            rpc_server_loop(listener, key2, event_tx, shutdown3);
2511        });
2512
2513        // Give server time to start
2514        thread::sleep(std::time::Duration::from_millis(50));
2515
2516        // Client: connect and query link count
2517        let server_addr = RpcAddr::Tcp("127.0.0.1".into(), port);
2518        let mut client = RpcClient::connect(&server_addr, &key).unwrap();
2519        let response = client
2520            .call(&PickleValue::Dict(vec![(
2521                PickleValue::String("get".into()),
2522                PickleValue::String("link_count".into()),
2523            )]))
2524            .unwrap();
2525        assert_eq!(response.as_int().unwrap(), 42);
2526        drop(client);
2527
2528        // Client: query interface stats
2529        let mut client2 = RpcClient::connect(&server_addr, &key).unwrap();
2530        let response2 = client2
2531            .call(&PickleValue::Dict(vec![(
2532                PickleValue::String("get".into()),
2533                PickleValue::String("interface_stats".into()),
2534            )]))
2535            .unwrap();
2536        let ifaces = response2.get("interfaces").unwrap().as_list().unwrap();
2537        assert_eq!(ifaces.len(), 1);
2538        let iface = &ifaces[0];
2539        assert_eq!(
2540            iface.get("name").unwrap().as_str().unwrap(),
2541            "TestInterface"
2542        );
2543        assert_eq!(iface.get("rxb").unwrap().as_int().unwrap(), 1000);
2544        drop(client2);
2545
2546        // Shutdown
2547        shutdown2.store(true, Ordering::Relaxed);
2548        server_thread.join().unwrap();
2549        driver_thread.join().unwrap();
2550    }
2551
2552    #[test]
2553    fn derive_auth_key_deterministic() {
2554        let key1 = derive_auth_key(b"test");
2555        let key2 = derive_auth_key(b"test");
2556        assert_eq!(key1, key2);
2557        // Different input → different key
2558        let key3 = derive_auth_key(b"other");
2559        assert_ne!(key1, key3);
2560    }
2561
2562    #[test]
2563    fn pickle_request_handling() {
2564        // Test the request → query translation without networking
2565        let (event_tx, event_rx) = crate::event::channel();
2566
2567        let driver = thread::spawn(move || {
2568            if let Ok(Event::Query(QueryRequest::DropPath { dest_hash }, resp_tx)) = event_rx.recv()
2569            {
2570                assert_eq!(dest_hash, [1u8; 16]);
2571                let _ = resp_tx.send(QueryResponse::DropPath(true));
2572            }
2573        });
2574
2575        let request = PickleValue::Dict(vec![
2576            (
2577                PickleValue::String("drop".into()),
2578                PickleValue::String("path".into()),
2579            ),
2580            (
2581                PickleValue::String("destination_hash".into()),
2582                PickleValue::Bytes(vec![1u8; 16]),
2583            ),
2584        ]);
2585
2586        let response = handle_rpc_request(&request, &event_tx).unwrap();
2587        assert_eq!(response, PickleValue::Bool(true));
2588        driver.join().unwrap();
2589    }
2590
2591    #[test]
2592    fn hook_list_request_handling() {
2593        let (event_tx, event_rx) = crate::event::channel();
2594
2595        let driver = thread::spawn(move || {
2596            if let Ok(Event::ListHooks { response_tx }) = event_rx.recv() {
2597                let _ = response_tx.send(vec![HookInfo {
2598                    name: "stats".into(),
2599                    hook_type: "wasm".into(),
2600                    attach_point: "PreIngress".into(),
2601                    priority: 7,
2602                    enabled: true,
2603                    consecutive_traps: 0,
2604                }]);
2605            }
2606        });
2607
2608        let request = PickleValue::Dict(vec![(
2609            PickleValue::String("get".into()),
2610            PickleValue::String("hooks".into()),
2611        )]);
2612        let response = handle_rpc_request(&request, &event_tx).unwrap();
2613        let hooks = parse_hook_list(&response).unwrap();
2614        assert_eq!(hooks.len(), 1);
2615        assert_eq!(hooks[0].name, "stats");
2616        driver.join().unwrap();
2617    }
2618
2619    #[test]
2620    fn hook_load_request_handling() {
2621        let (event_tx, event_rx) = crate::event::channel();
2622
2623        let driver = thread::spawn(move || {
2624            if let Ok(Event::LoadHook {
2625                name,
2626                wasm_bytes,
2627                attach_point,
2628                priority,
2629                response_tx,
2630            }) = event_rx.recv()
2631            {
2632                assert_eq!(name, "stats");
2633                assert_eq!(attach_point, "PreIngress");
2634                assert_eq!(priority, 11);
2635                assert_eq!(wasm_bytes, vec![1, 2, 3]);
2636                let _ = response_tx.send(Ok(()));
2637            }
2638        });
2639
2640        let request = PickleValue::Dict(vec![
2641            (
2642                PickleValue::String("hook".into()),
2643                PickleValue::String("load".into()),
2644            ),
2645            (
2646                PickleValue::String("name".into()),
2647                PickleValue::String("stats".into()),
2648            ),
2649            (
2650                PickleValue::String("attach_point".into()),
2651                PickleValue::String("PreIngress".into()),
2652            ),
2653            (PickleValue::String("priority".into()), PickleValue::Int(11)),
2654            (
2655                PickleValue::String("wasm".into()),
2656                PickleValue::Bytes(vec![1, 2, 3]),
2657            ),
2658        ]);
2659        let response = handle_rpc_request(&request, &event_tx).unwrap();
2660        assert_eq!(parse_hook_result(&response).unwrap(), Ok(()));
2661        driver.join().unwrap();
2662    }
2663
2664    #[test]
2665    fn interface_stats_pickle_format() {
2666        let stats = InterfaceStatsResponse {
2667            interfaces: vec![SingleInterfaceStat {
2668                id: 1,
2669                name: "TCP".into(),
2670                status: true,
2671                mode: 1,
2672                rxb: 100,
2673                txb: 200,
2674                rx_packets: 5,
2675                tx_packets: 10,
2676                bitrate: Some(1000000),
2677                ifac_size: Some(16),
2678                started: 1000.0,
2679                ia_freq: 0.0,
2680                oa_freq: 0.0,
2681                interface_type: "TCPClientInterface".into(),
2682            }],
2683            transport_id: Some([0xAB; 16]),
2684            transport_enabled: true,
2685            transport_uptime: 3600.0,
2686            total_rxb: 100,
2687            total_txb: 200,
2688            probe_responder: None,
2689            backbone_peer_pool: None,
2690        };
2691
2692        let pickle = interface_stats_to_pickle(&stats);
2693
2694        // Verify it round-trips through encode/decode
2695        let encoded = pickle::encode(&pickle);
2696        let decoded = pickle::decode(&encoded).unwrap();
2697        assert_eq!(
2698            decoded.get("transport_enabled").unwrap().as_bool().unwrap(),
2699            true
2700        );
2701        let ifaces = decoded.get("interfaces").unwrap().as_list().unwrap();
2702        assert_eq!(ifaces[0].get("id").unwrap().as_int().unwrap(), 1);
2703        assert_eq!(ifaces[0].get("name").unwrap().as_str().unwrap(), "TCP");
2704    }
2705
2706    #[test]
2707    fn send_probe_rpc_unknown_dest() {
2708        let (event_tx, event_rx) = crate::event::channel();
2709
2710        let driver = thread::spawn(move || {
2711            if let Ok(Event::Query(
2712                QueryRequest::SendProbe {
2713                    dest_hash,
2714                    payload_size,
2715                },
2716                resp_tx,
2717            )) = event_rx.recv()
2718            {
2719                assert_eq!(dest_hash, [0xAA; 16]);
2720                assert_eq!(payload_size, 16); // default
2721                let _ = resp_tx.send(QueryResponse::SendProbe(None));
2722            }
2723        });
2724
2725        let request = PickleValue::Dict(vec![(
2726            PickleValue::String("send_probe".into()),
2727            PickleValue::Bytes(vec![0xAA; 16]),
2728        )]);
2729
2730        let response = handle_rpc_request(&request, &event_tx).unwrap();
2731        assert_eq!(response, PickleValue::None);
2732        driver.join().unwrap();
2733    }
2734
2735    #[test]
2736    fn send_probe_rpc_with_result() {
2737        let (event_tx, event_rx) = crate::event::channel();
2738
2739        let packet_hash = [0xBB; 32];
2740        let driver = thread::spawn(move || {
2741            if let Ok(Event::Query(
2742                QueryRequest::SendProbe {
2743                    dest_hash,
2744                    payload_size,
2745                },
2746                resp_tx,
2747            )) = event_rx.recv()
2748            {
2749                assert_eq!(dest_hash, [0xCC; 16]);
2750                assert_eq!(payload_size, 32);
2751                let _ = resp_tx.send(QueryResponse::SendProbe(Some((packet_hash, 3))));
2752            }
2753        });
2754
2755        let request = PickleValue::Dict(vec![
2756            (
2757                PickleValue::String("send_probe".into()),
2758                PickleValue::Bytes(vec![0xCC; 16]),
2759            ),
2760            (PickleValue::String("size".into()), PickleValue::Int(32)),
2761        ]);
2762
2763        let response = handle_rpc_request(&request, &event_tx).unwrap();
2764        let ph = response.get("packet_hash").unwrap().as_bytes().unwrap();
2765        assert_eq!(ph, &[0xBB; 32]);
2766        assert_eq!(response.get("hops").unwrap().as_int().unwrap(), 3);
2767        driver.join().unwrap();
2768    }
2769
2770    #[test]
2771    fn send_probe_rpc_size_validation() {
2772        let (event_tx, event_rx) = crate::event::channel();
2773
2774        // Negative size should be clamped to default (16)
2775        let driver = thread::spawn(move || {
2776            if let Ok(Event::Query(QueryRequest::SendProbe { payload_size, .. }, resp_tx)) =
2777                event_rx.recv()
2778            {
2779                assert_eq!(payload_size, 16); // default, not negative
2780                let _ = resp_tx.send(QueryResponse::SendProbe(None));
2781            }
2782        });
2783
2784        let request = PickleValue::Dict(vec![
2785            (
2786                PickleValue::String("send_probe".into()),
2787                PickleValue::Bytes(vec![0xDD; 16]),
2788            ),
2789            (PickleValue::String("size".into()), PickleValue::Int(-1)),
2790        ]);
2791
2792        let response = handle_rpc_request(&request, &event_tx).unwrap();
2793        assert_eq!(response, PickleValue::None);
2794        driver.join().unwrap();
2795    }
2796
2797    #[test]
2798    fn send_probe_rpc_size_too_large() {
2799        let (event_tx, event_rx) = crate::event::channel();
2800
2801        // Size > 400 should be clamped to default (16)
2802        let driver = thread::spawn(move || {
2803            if let Ok(Event::Query(QueryRequest::SendProbe { payload_size, .. }, resp_tx)) =
2804                event_rx.recv()
2805            {
2806                assert_eq!(payload_size, 16); // default, not 999
2807                let _ = resp_tx.send(QueryResponse::SendProbe(None));
2808            }
2809        });
2810
2811        let request = PickleValue::Dict(vec![
2812            (
2813                PickleValue::String("send_probe".into()),
2814                PickleValue::Bytes(vec![0xDD; 16]),
2815            ),
2816            (PickleValue::String("size".into()), PickleValue::Int(999)),
2817        ]);
2818
2819        let response = handle_rpc_request(&request, &event_tx).unwrap();
2820        assert_eq!(response, PickleValue::None);
2821        driver.join().unwrap();
2822    }
2823
2824    #[test]
2825    fn check_proof_rpc_not_found() {
2826        let (event_tx, event_rx) = crate::event::channel();
2827
2828        let driver = thread::spawn(move || {
2829            if let Ok(Event::Query(QueryRequest::CheckProof { packet_hash }, resp_tx)) =
2830                event_rx.recv()
2831            {
2832                assert_eq!(packet_hash, [0xEE; 32]);
2833                let _ = resp_tx.send(QueryResponse::CheckProof(None));
2834            }
2835        });
2836
2837        let request = PickleValue::Dict(vec![(
2838            PickleValue::String("check_proof".into()),
2839            PickleValue::Bytes(vec![0xEE; 32]),
2840        )]);
2841
2842        let response = handle_rpc_request(&request, &event_tx).unwrap();
2843        assert_eq!(response, PickleValue::None);
2844        driver.join().unwrap();
2845    }
2846
2847    #[test]
2848    fn check_proof_rpc_found() {
2849        let (event_tx, event_rx) = crate::event::channel();
2850
2851        let driver = thread::spawn(move || {
2852            if let Ok(Event::Query(QueryRequest::CheckProof { packet_hash }, resp_tx)) =
2853                event_rx.recv()
2854            {
2855                assert_eq!(packet_hash, [0xFF; 32]);
2856                let _ = resp_tx.send(QueryResponse::CheckProof(Some(0.352)));
2857            }
2858        });
2859
2860        let request = PickleValue::Dict(vec![(
2861            PickleValue::String("check_proof".into()),
2862            PickleValue::Bytes(vec![0xFF; 32]),
2863        )]);
2864
2865        let response = handle_rpc_request(&request, &event_tx).unwrap();
2866        if let PickleValue::Float(rtt) = response {
2867            assert!((rtt - 0.352).abs() < 0.001);
2868        } else {
2869            panic!("Expected Float, got {:?}", response);
2870        }
2871        driver.join().unwrap();
2872    }
2873
2874    #[test]
2875    fn request_path_rpc() {
2876        let (event_tx, event_rx) = crate::event::channel();
2877
2878        let driver =
2879            thread::spawn(
2880                move || match event_rx.recv_timeout(std::time::Duration::from_secs(5)) {
2881                    Ok(Event::RequestPath { dest_hash }) => {
2882                        assert_eq!(dest_hash, [0x11; 16]);
2883                    }
2884                    other => panic!("Expected RequestPath event, got {:?}", other),
2885                },
2886            );
2887
2888        let request = PickleValue::Dict(vec![(
2889            PickleValue::String("request_path".into()),
2890            PickleValue::Bytes(vec![0x11; 16]),
2891        )]);
2892
2893        let response = handle_rpc_request(&request, &event_tx).unwrap();
2894        assert_eq!(response, PickleValue::Bool(true));
2895        driver.join().unwrap();
2896    }
2897
2898    #[test]
2899    fn begin_drain_rpc_emits_event() {
2900        let (event_tx, event_rx) = crate::event::channel();
2901
2902        let driver = thread::spawn(
2903            move || match event_rx.recv_timeout(Duration::from_secs(5)) {
2904                Ok(Event::BeginDrain { timeout }) => {
2905                    assert!((timeout.as_secs_f64() - 1.5).abs() < 0.001);
2906                }
2907                other => panic!("Expected BeginDrain event, got {:?}", other),
2908            },
2909        );
2910
2911        let request = PickleValue::Dict(vec![(
2912            PickleValue::String("begin_drain".into()),
2913            PickleValue::Float(1.5),
2914        )]);
2915
2916        let response = handle_rpc_request(&request, &event_tx).unwrap();
2917        assert_eq!(response, PickleValue::Bool(true));
2918        driver.join().unwrap();
2919    }
2920
2921    #[test]
2922    fn drain_status_rpc_roundtrips_fields() {
2923        let (event_tx, event_rx) = crate::event::channel();
2924
2925        let driver = thread::spawn(move || {
2926            if let Ok(Event::Query(QueryRequest::DrainStatus, resp_tx)) = event_rx.recv() {
2927                let _ = resp_tx.send(QueryResponse::DrainStatus(DrainStatus {
2928                    state: LifecycleState::Draining,
2929                    drain_age_seconds: Some(0.75),
2930                    deadline_remaining_seconds: Some(2.25),
2931                    drain_complete: false,
2932                    interface_writer_queued_frames: 3,
2933                    provider_backlog_events: 4,
2934                    provider_consumer_queued_events: 5,
2935                    detail: Some("node is draining existing work".into()),
2936                }));
2937            }
2938        });
2939
2940        let request = PickleValue::Dict(vec![(
2941            PickleValue::String("get".into()),
2942            PickleValue::String("drain_status".into()),
2943        )]);
2944
2945        let response = handle_rpc_request(&request, &event_tx).unwrap();
2946        assert_eq!(response.get("state").unwrap().as_str(), Some("draining"));
2947        assert_eq!(
2948            response.get("drain_complete").unwrap().as_bool(),
2949            Some(false)
2950        );
2951        assert_eq!(
2952            response
2953                .get("deadline_remaining_seconds")
2954                .unwrap()
2955                .as_float(),
2956            Some(2.25)
2957        );
2958        assert_eq!(
2959            response
2960                .get("interface_writer_queued_frames")
2961                .unwrap()
2962                .as_int(),
2963            Some(3)
2964        );
2965        assert_eq!(
2966            response.get("provider_backlog_events").unwrap().as_int(),
2967            Some(4)
2968        );
2969        assert_eq!(
2970            response
2971                .get("provider_consumer_queued_events")
2972                .unwrap()
2973                .as_int(),
2974            Some(5)
2975        );
2976        assert_eq!(
2977            response.get("detail").unwrap().as_str(),
2978            Some("node is draining existing work")
2979        );
2980        driver.join().unwrap();
2981    }
2982
2983    #[test]
2984    fn interface_stats_with_probe_responder() {
2985        let probe_hash = [0x42; 16];
2986        let stats = InterfaceStatsResponse {
2987            interfaces: vec![],
2988            transport_id: None,
2989            transport_enabled: true,
2990            transport_uptime: 100.0,
2991            total_rxb: 0,
2992            total_txb: 0,
2993            probe_responder: Some(probe_hash),
2994            backbone_peer_pool: None,
2995        };
2996
2997        let pickle = interface_stats_to_pickle(&stats);
2998        let encoded = pickle::encode(&pickle);
2999        let decoded = pickle::decode(&encoded).unwrap();
3000
3001        let pr = decoded.get("probe_responder").unwrap().as_bytes().unwrap();
3002        assert_eq!(pr, &probe_hash);
3003    }
3004
3005    #[test]
3006    fn runtime_config_get_and_set_rpc() {
3007        let (event_tx, event_rx) = crate::event::channel();
3008
3009        let driver = thread::spawn(move || {
3010            if let Ok(Event::Query(QueryRequest::GetRuntimeConfig { key }, resp_tx)) =
3011                event_rx.recv()
3012            {
3013                assert_eq!(key, "global.tick_interval_ms");
3014                let _ = resp_tx.send(QueryResponse::RuntimeConfigEntry(Some(
3015                    RuntimeConfigEntry {
3016                        key,
3017                        value: RuntimeConfigValue::Int(1000),
3018                        default: RuntimeConfigValue::Int(1000),
3019                        source: RuntimeConfigSource::Startup,
3020                        apply_mode: RuntimeConfigApplyMode::Immediate,
3021                        description: Some("tick".into()),
3022                    },
3023                )));
3024            } else {
3025                panic!("expected GetRuntimeConfig query");
3026            }
3027
3028            if let Ok(Event::Query(QueryRequest::SetRuntimeConfig { key, value }, resp_tx)) =
3029                event_rx.recv()
3030            {
3031                assert_eq!(key, "global.tick_interval_ms");
3032                assert_eq!(value, RuntimeConfigValue::Int(250));
3033                let _ = resp_tx.send(QueryResponse::RuntimeConfigSet(Ok(RuntimeConfigEntry {
3034                    key,
3035                    value: RuntimeConfigValue::Int(250),
3036                    default: RuntimeConfigValue::Int(1000),
3037                    source: RuntimeConfigSource::RuntimeOverride,
3038                    apply_mode: RuntimeConfigApplyMode::Immediate,
3039                    description: Some("tick".into()),
3040                })));
3041            } else {
3042                panic!("expected SetRuntimeConfig query");
3043            }
3044        });
3045
3046        let get_request = PickleValue::Dict(vec![
3047            (
3048                PickleValue::String("get".into()),
3049                PickleValue::String("runtime_config_entry".into()),
3050            ),
3051            (
3052                PickleValue::String("key".into()),
3053                PickleValue::String("global.tick_interval_ms".into()),
3054            ),
3055        ]);
3056        let get_response = handle_rpc_request(&get_request, &event_tx).unwrap();
3057        assert_eq!(
3058            get_response.get("key").and_then(|v| v.as_str()),
3059            Some("global.tick_interval_ms")
3060        );
3061
3062        let set_request = PickleValue::Dict(vec![
3063            (
3064                PickleValue::String("set".into()),
3065                PickleValue::String("runtime_config".into()),
3066            ),
3067            (
3068                PickleValue::String("key".into()),
3069                PickleValue::String("global.tick_interval_ms".into()),
3070            ),
3071            (PickleValue::String("value".into()), PickleValue::Int(250)),
3072        ]);
3073        let set_response = handle_rpc_request(&set_request, &event_tx).unwrap();
3074        assert_eq!(
3075            set_response.get("value").and_then(|v| v.as_int()),
3076            Some(250)
3077        );
3078
3079        driver.join().unwrap();
3080    }
3081
3082    // Helper: create a connected TCP pair
3083    fn tcp_pair() -> (TcpStream, TcpStream) {
3084        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
3085        let port = listener.local_addr().unwrap().port();
3086        let client = TcpStream::connect(("127.0.0.1", port)).unwrap();
3087        let (server, _) = listener.accept().unwrap();
3088        client
3089            .set_read_timeout(Some(std::time::Duration::from_secs(5)))
3090            .unwrap();
3091        server
3092            .set_read_timeout(Some(std::time::Duration::from_secs(5)))
3093            .unwrap();
3094        (server, client)
3095    }
3096}