Skip to main content

rns_net/
rpc.rs

1//! RPC server and client for cross-process daemon communication.
2//!
3//! Implements Python `multiprocessing.connection` wire protocol:
4//! - 4-byte big-endian signed i32 length prefix + payload
5//! - HMAC-SHA256 challenge-response authentication
6//! - Pickle serialization for request/response dictionaries
7//!
8//! Server translates pickle dicts into [`QueryRequest`] events, sends
9//! them through the driver event channel, and returns pickle responses.
10
11use std::io::{self, Read, Write};
12use std::net::{IpAddr, TcpListener, TcpStream};
13use std::sync::mpsc;
14use std::sync::{
15    atomic::{AtomicBool, Ordering},
16    Arc,
17};
18use std::thread;
19use std::time::Duration;
20
21use rns_crypto::hmac::hmac_sha256;
22use rns_crypto::sha256::sha256;
23
24use crate::event::{
25    BackboneInterfaceEntry, BackbonePeerStateEntry, BlackholeInfo, DrainStatus, Event, EventSender,
26    HookInfo, InterfaceStatsResponse, KnownDestinationEntry, LifecycleState, PathTableEntry,
27    ProviderBridgeStats, QueryRequest, QueryResponse, RateTableEntry, RuntimeConfigApplyMode,
28    RuntimeConfigEntry, RuntimeConfigError, RuntimeConfigErrorCode, RuntimeConfigSource,
29    RuntimeConfigValue, SingleInterfaceStat,
30};
31use crate::md5::hmac_md5;
32use crate::pickle::{self, PickleValue};
33
// Handshake tokens used by the challenge-response authentication below.

/// Marker that starts the challenge message the server sends to a client.
const CHALLENGE_PREFIX: &[u8] = b"#CHALLENGE#";
/// Message sent to the client once its challenge response has been verified.
const WELCOME: &[u8] = b"#WELCOME#";
/// Message sent to the client when its challenge response is rejected.
const FAILURE: &[u8] = b"#FAILURE#";
/// Number of random bytes placed in each challenge.
const CHALLENGE_LEN: usize = 40;
38
/// RPC address types.
///
/// TCP is the only transport that can currently be expressed.
#[derive(Debug, Clone)]
pub enum RpcAddr {
    /// TCP endpoint given as `(host, port)`.
    Tcp(String, u16),
}
44
45/// RPC server that listens for incoming connections and handles queries.
46pub struct RpcServer {
47    shutdown: Arc<AtomicBool>,
48    thread: Option<thread::JoinHandle<()>>,
49}
50
51impl RpcServer {
52    /// Start the RPC server on the given address.
53    pub fn start(addr: &RpcAddr, auth_key: [u8; 32], event_tx: EventSender) -> io::Result<Self> {
54        let shutdown = Arc::new(AtomicBool::new(false));
55        let shutdown2 = shutdown.clone();
56
57        let listener = match addr {
58            RpcAddr::Tcp(host, port) => {
59                let l = TcpListener::bind((host.as_str(), *port))?;
60                // Non-blocking so we can check shutdown flag
61                l.set_nonblocking(true)?;
62                l
63            }
64        };
65
66        let thread = thread::Builder::new()
67            .name("rpc-server".into())
68            .spawn(move || {
69                rpc_server_loop(listener, auth_key, event_tx, shutdown2);
70            })
71            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
72
73        Ok(RpcServer {
74            shutdown,
75            thread: Some(thread),
76        })
77    }
78
79    /// Stop the RPC server.
80    pub fn stop(&mut self) {
81        self.shutdown.store(true, Ordering::Relaxed);
82        if let Some(handle) = self.thread.take() {
83            let _ = handle.join();
84        }
85    }
86}
87
88impl Drop for RpcServer {
89    fn drop(&mut self) {
90        self.stop();
91    }
92}
93
94fn rpc_server_loop(
95    listener: TcpListener,
96    auth_key: [u8; 32],
97    event_tx: EventSender,
98    shutdown: Arc<AtomicBool>,
99) {
100    loop {
101        if shutdown.load(Ordering::Relaxed) {
102            break;
103        }
104
105        match listener.accept() {
106            Ok((stream, _addr)) => {
107                // Set blocking for this connection
108                let _ = stream.set_nonblocking(false);
109                let _ = stream.set_read_timeout(Some(std::time::Duration::from_secs(10)));
110                let _ = stream.set_write_timeout(Some(std::time::Duration::from_secs(10)));
111
112                if let Err(e) = handle_connection(stream, &auth_key, &event_tx) {
113                    log::debug!("RPC connection error: {}", e);
114                }
115            }
116            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
117                // No pending connection, sleep briefly and retry
118                thread::sleep(std::time::Duration::from_millis(100));
119            }
120            Err(e) => {
121                log::error!("RPC accept error: {}", e);
122                thread::sleep(std::time::Duration::from_millis(100));
123            }
124        }
125    }
126}
127
128fn handle_connection(
129    mut stream: TcpStream,
130    auth_key: &[u8; 32],
131    event_tx: &EventSender,
132) -> io::Result<()> {
133    // Authentication: send challenge, verify response
134    server_auth(&mut stream, auth_key)?;
135
136    // Read request (pickle dict)
137    let request_bytes = recv_bytes(&mut stream)?;
138    let request = pickle::decode(&request_bytes)
139        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?;
140
141    // Translate pickle dict to query, send to driver, get response
142    let response = handle_rpc_request(&request, event_tx)?;
143
144    // Encode response and send
145    let response_bytes = pickle::encode(&response);
146    send_bytes(&mut stream, &response_bytes)?;
147
148    Ok(())
149}
150
151/// Server-side authentication: challenge-response.
152fn server_auth(stream: &mut TcpStream, auth_key: &[u8; 32]) -> io::Result<()> {
153    // Generate challenge: #CHALLENGE#{sha256}<40 random bytes>
154    let mut random_bytes = [0u8; CHALLENGE_LEN];
155    // Use /dev/urandom for randomness
156    {
157        let mut f = std::fs::File::open("/dev/urandom")?;
158        f.read_exact(&mut random_bytes)?;
159    }
160
161    let mut challenge_message = Vec::with_capacity(CHALLENGE_PREFIX.len() + 8 + CHALLENGE_LEN);
162    challenge_message.extend_from_slice(CHALLENGE_PREFIX);
163    challenge_message.extend_from_slice(b"{sha256}");
164    challenge_message.extend_from_slice(&random_bytes);
165
166    send_bytes(stream, &challenge_message)?;
167
168    // Read response (max 256 bytes)
169    let response = recv_bytes(stream)?;
170
171    // Verify response
172    // The message to HMAC is everything after #CHALLENGE# (i.e. {sha256}<random>)
173    let message = &challenge_message[CHALLENGE_PREFIX.len()..];
174
175    if verify_response(auth_key, message, &response) {
176        send_bytes(stream, WELCOME)?;
177        Ok(())
178    } else {
179        send_bytes(stream, FAILURE)?;
180        Err(io::Error::new(
181            io::ErrorKind::PermissionDenied,
182            "auth failed",
183        ))
184    }
185}
186
187/// Verify a client's HMAC response.
188fn verify_response(auth_key: &[u8; 32], message: &[u8], response: &[u8]) -> bool {
189    // Modern protocol: response = {sha256}<hmac-sha256 digest>
190    if response.starts_with(b"{sha256}") {
191        let digest = &response[8..];
192        let expected = hmac_sha256(auth_key, message);
193        constant_time_eq(digest, &expected)
194    }
195    // Legacy protocol: response = raw 16-byte HMAC-MD5 digest
196    else if response.len() == 16 {
197        let expected = hmac_md5(auth_key, message);
198        constant_time_eq(response, &expected)
199    }
200    // Legacy with {md5} prefix
201    else if response.starts_with(b"{md5}") {
202        let digest = &response[5..];
203        let expected = hmac_md5(auth_key, message);
204        constant_time_eq(digest, &expected)
205    } else {
206        false
207    }
208}
209
/// Compare two byte slices without stopping at the first differing byte.
///
/// Slices of different length are unequal. Otherwise the XOR of every byte
/// pair is OR-ed into one accumulator, and the slices are equal exactly when
/// that accumulator stays zero.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    a.len() == b.len()
        && a
            .iter()
            .zip(b)
            .fold(0u8, |acc, (lhs, rhs)| acc | (lhs ^ rhs))
            == 0
}
221
/// Send bytes with a big-endian length prefix.
///
/// Payloads up to `i32::MAX` bytes are framed with a 4-byte signed length.
/// Larger payloads use the extended form that `recv_bytes` understands: a
/// 4-byte `-1` followed by an 8-byte unsigned length. (A plain `as i32` cast
/// would wrap for such payloads and corrupt the stream.)
///
/// Accepts any writer, so it works for `TcpStream` as well as in-memory
/// buffers.
///
/// # Errors
///
/// Returns any error reported while writing or flushing `stream`.
fn send_bytes<W: Write>(stream: &mut W, data: &[u8]) -> io::Result<()> {
    match i32::try_from(data.len()) {
        Ok(len) => stream.write_all(&len.to_be_bytes())?,
        Err(_) => {
            stream.write_all(&(-1i32).to_be_bytes())?;
            // `usize` is at most 64 bits wide, so this widening is lossless.
            stream.write_all(&(data.len() as u64).to_be_bytes())?;
        }
    }
    stream.write_all(data)?;
    stream.flush()
}
229
/// Receive bytes with a big-endian length prefix.
///
/// Reads a 4-byte signed length. A negative value announces the extended
/// form, in which the real length follows as an 8-byte unsigned integer.
/// The announced length is validated as a `u64` before it is narrowed, so
/// the limit also holds on targets where `usize` is 32 bits wide.
///
/// Accepts any reader, so it works for `TcpStream` as well as in-memory
/// buffers.
///
/// # Errors
///
/// * `InvalidData` when the announced length exceeds 64 MiB.
/// * Any error from the underlying reader, including `UnexpectedEof` when
///   the stream ends before the full frame has arrived.
fn recv_bytes<R: Read>(stream: &mut R) -> io::Result<Vec<u8>> {
    const MAX_MESSAGE_LEN: u64 = 64 * 1024 * 1024;

    let mut short_header = [0u8; 4];
    stream.read_exact(&mut short_header)?;

    let announced = match u64::try_from(i32::from_be_bytes(short_header)) {
        Ok(len) => len,
        // Negative 4-byte length: the 8-byte length follows.
        Err(_) => {
            let mut long_header = [0u8; 8];
            stream.read_exact(&mut long_header)?;
            u64::from_be_bytes(long_header)
        }
    };

    if announced > MAX_MESSAGE_LEN {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "message too large",
        ));
    }

    // Lossless: `announced` is at most 64 MiB at this point.
    let mut payload = vec![0u8; announced as usize];
    stream.read_exact(&mut payload)?;
    Ok(payload)
}
263
264/// Translate a pickle request dict to a query event and get response.
265fn handle_rpc_request(request: &PickleValue, event_tx: &EventSender) -> io::Result<PickleValue> {
266    // Handle "get" requests
267    if let Some(get_val) = request.get("get") {
268        if let Some(path) = get_val.as_str() {
269            return match path {
270                "interface_stats" => {
271                    let resp = send_query(event_tx, QueryRequest::InterfaceStats)?;
272                    if let QueryResponse::InterfaceStats(stats) = resp {
273                        Ok(interface_stats_to_pickle(&stats))
274                    } else {
275                        Ok(PickleValue::None)
276                    }
277                }
278                "path_table" => {
279                    let max_hops = request
280                        .get("max_hops")
281                        .and_then(|v| v.as_int().map(|n| n as u8));
282                    let resp = send_query(event_tx, QueryRequest::PathTable { max_hops })?;
283                    if let QueryResponse::PathTable(entries) = resp {
284                        Ok(path_table_to_pickle(&entries))
285                    } else {
286                        Ok(PickleValue::None)
287                    }
288                }
289                "rate_table" => {
290                    let resp = send_query(event_tx, QueryRequest::RateTable)?;
291                    if let QueryResponse::RateTable(entries) = resp {
292                        Ok(rate_table_to_pickle(&entries))
293                    } else {
294                        Ok(PickleValue::None)
295                    }
296                }
297                "next_hop" => {
298                    let hash = extract_dest_hash(request, "destination_hash")?;
299                    let resp = send_query(event_tx, QueryRequest::NextHop { dest_hash: hash })?;
300                    if let QueryResponse::NextHop(Some(nh)) = resp {
301                        Ok(PickleValue::Bytes(nh.next_hop.to_vec()))
302                    } else {
303                        Ok(PickleValue::None)
304                    }
305                }
306                "next_hop_if_name" => {
307                    let hash = extract_dest_hash(request, "destination_hash")?;
308                    let resp =
309                        send_query(event_tx, QueryRequest::NextHopIfName { dest_hash: hash })?;
310                    if let QueryResponse::NextHopIfName(Some(name)) = resp {
311                        Ok(PickleValue::String(name))
312                    } else {
313                        Ok(PickleValue::None)
314                    }
315                }
316                "link_count" => {
317                    let resp = send_query(event_tx, QueryRequest::LinkCount)?;
318                    if let QueryResponse::LinkCount(n) = resp {
319                        Ok(PickleValue::Int(n as i64))
320                    } else {
321                        Ok(PickleValue::None)
322                    }
323                }
324                "transport_identity" => {
325                    let resp = send_query(event_tx, QueryRequest::TransportIdentity)?;
326                    if let QueryResponse::TransportIdentity(Some(hash)) = resp {
327                        Ok(PickleValue::Bytes(hash.to_vec()))
328                    } else {
329                        Ok(PickleValue::None)
330                    }
331                }
332                "blackholed" => {
333                    let resp = send_query(event_tx, QueryRequest::GetBlackholed)?;
334                    if let QueryResponse::Blackholed(entries) = resp {
335                        Ok(blackholed_to_pickle(&entries))
336                    } else {
337                        Ok(PickleValue::None)
338                    }
339                }
340                "discovered_interfaces" => {
341                    let only_available = request
342                        .get("only_available")
343                        .and_then(|v| v.as_bool())
344                        .unwrap_or(false);
345                    let only_transport = request
346                        .get("only_transport")
347                        .and_then(|v| v.as_bool())
348                        .unwrap_or(false);
349                    let resp = send_query(
350                        event_tx,
351                        QueryRequest::DiscoveredInterfaces {
352                            only_available,
353                            only_transport,
354                        },
355                    )?;
356                    if let QueryResponse::DiscoveredInterfaces(interfaces) = resp {
357                        Ok(discovered_interfaces_to_pickle(&interfaces))
358                    } else {
359                        Ok(PickleValue::None)
360                    }
361                }
362                "hooks" => {
363                    let (response_tx, response_rx) = mpsc::channel();
364                    event_tx
365                        .send(Event::ListHooks { response_tx })
366                        .map_err(|_| {
367                            io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down")
368                        })?;
369                    let hooks = response_rx
370                        .recv_timeout(std::time::Duration::from_secs(5))
371                        .map_err(|_| {
372                            io::Error::new(io::ErrorKind::TimedOut, "list hooks timed out")
373                        })?;
374                    Ok(hooks_to_pickle(&hooks))
375                }
376                "runtime_config" => {
377                    let resp = send_query(event_tx, QueryRequest::ListRuntimeConfig)?;
378                    if let QueryResponse::RuntimeConfigList(entries) = resp {
379                        Ok(runtime_config_list_to_pickle(&entries))
380                    } else {
381                        Ok(PickleValue::None)
382                    }
383                }
384                "known_destinations" => {
385                    let resp = send_query(event_tx, QueryRequest::KnownDestinations)?;
386                    if let QueryResponse::KnownDestinations(entries) = resp {
387                        Ok(known_destinations_to_pickle(&entries))
388                    } else {
389                        Ok(PickleValue::None)
390                    }
391                }
392                "runtime_config_entry" => {
393                    let key = request
394                        .get("key")
395                        .and_then(|v| v.as_str())
396                        .unwrap_or_default()
397                        .to_string();
398                    let resp = send_query(event_tx, QueryRequest::GetRuntimeConfig { key })?;
399                    if let QueryResponse::RuntimeConfigEntry(entry) = resp {
400                        Ok(entry
401                            .as_ref()
402                            .map(runtime_config_entry_to_pickle)
403                            .unwrap_or(PickleValue::None))
404                    } else {
405                        Ok(PickleValue::None)
406                    }
407                }
408                "backbone_peer_state" => {
409                    let interface_name = request
410                        .get("interface")
411                        .and_then(|v| v.as_str())
412                        .map(|s| s.to_string());
413                    let resp =
414                        send_query(event_tx, QueryRequest::BackbonePeerState { interface_name })?;
415                    if let QueryResponse::BackbonePeerState(entries) = resp {
416                        Ok(backbone_peer_state_to_pickle(&entries))
417                    } else {
418                        Ok(PickleValue::None)
419                    }
420                }
421                "backbone_interfaces" => {
422                    let resp = send_query(event_tx, QueryRequest::BackboneInterfaces)?;
423                    if let QueryResponse::BackboneInterfaces(entries) = resp {
424                        Ok(backbone_interfaces_to_pickle(&entries))
425                    } else {
426                        Ok(PickleValue::None)
427                    }
428                }
429                "provider_bridge_stats" => {
430                    let resp = send_query(event_tx, QueryRequest::ProviderBridgeStats)?;
431                    if let QueryResponse::ProviderBridgeStats(stats) = resp {
432                        Ok(stats
433                            .as_ref()
434                            .map(provider_bridge_stats_to_pickle)
435                            .unwrap_or(PickleValue::None))
436                    } else {
437                        Ok(PickleValue::None)
438                    }
439                }
440                "drain_status" => {
441                    let resp = send_query(event_tx, QueryRequest::DrainStatus)?;
442                    if let QueryResponse::DrainStatus(status) = resp {
443                        Ok(drain_status_to_pickle(&status))
444                    } else {
445                        Ok(PickleValue::None)
446                    }
447                }
448                _ => Ok(PickleValue::None),
449            };
450        }
451    }
452
453    if let Some(begin_val) = request.get("begin_drain") {
454        let timeout_secs = begin_val
455            .as_float()
456            .or_else(|| begin_val.as_int().map(|value| value as f64))
457            .unwrap_or(0.0)
458            .max(0.0);
459        let timeout = Duration::from_secs_f64(timeout_secs);
460        let _ = event_tx.send(Event::BeginDrain { timeout });
461        return Ok(PickleValue::Bool(true));
462    }
463
464    if let Some(set_val) = request.get("set").and_then(|v| v.as_str()) {
465        if set_val == "known_destination_retained" {
466            let dest_hash = extract_dest_hash(request, "dest_hash")?;
467            let resp = send_query(event_tx, QueryRequest::RetainKnownDestination { dest_hash })?;
468            return if let QueryResponse::RetainKnownDestination(ok) = resp {
469                Ok(PickleValue::Bool(ok))
470            } else {
471                Ok(PickleValue::None)
472            };
473        }
474        if set_val == "identity_retained" {
475            let identity_hash = extract_dest_hash(request, "identity_hash")?;
476            let resp = send_query(event_tx, QueryRequest::RetainIdentity { identity_hash })?;
477            return if let QueryResponse::RetainIdentity(ok) = resp {
478                Ok(PickleValue::Bool(ok))
479            } else {
480                Ok(PickleValue::None)
481            };
482        }
483        if set_val == "known_destination_used" {
484            let dest_hash = extract_dest_hash(request, "dest_hash")?;
485            let resp = send_query(
486                event_tx,
487                QueryRequest::MarkKnownDestinationUsed { dest_hash },
488            )?;
489            return if let QueryResponse::MarkKnownDestinationUsed(ok) = resp {
490                Ok(PickleValue::Bool(ok))
491            } else {
492                Ok(PickleValue::None)
493            };
494        }
495        if set_val == "runtime_config" {
496            let key = request
497                .get("key")
498                .and_then(|v| v.as_str())
499                .unwrap_or_default()
500                .to_string();
501            let Some(value) = request
502                .get("value")
503                .and_then(runtime_config_value_from_pickle)
504            else {
505                return Ok(runtime_config_error_to_pickle(&RuntimeConfigError {
506                    code: RuntimeConfigErrorCode::InvalidType,
507                    message: "runtime-config set requires a scalar value".into(),
508                }));
509            };
510            let resp = send_query(event_tx, QueryRequest::SetRuntimeConfig { key, value })?;
511            return if let QueryResponse::RuntimeConfigSet(result) = resp {
512                Ok(runtime_config_result_to_pickle(result))
513            } else {
514                Ok(PickleValue::None)
515            };
516        }
517    }
518
519    if let Some(reset_val) = request.get("reset").and_then(|v| v.as_str()) {
520        if reset_val == "runtime_config" {
521            let key = request
522                .get("key")
523                .and_then(|v| v.as_str())
524                .unwrap_or_default()
525                .to_string();
526            let resp = send_query(event_tx, QueryRequest::ResetRuntimeConfig { key })?;
527            return if let QueryResponse::RuntimeConfigReset(result) = resp {
528                Ok(runtime_config_result_to_pickle(result))
529            } else {
530                Ok(PickleValue::None)
531            };
532        }
533    }
534
535    if let Some(clear_val) = request.get("clear").and_then(|v| v.as_str()) {
536        if clear_val == "known_destination_retained" {
537            let dest_hash = extract_dest_hash(request, "dest_hash")?;
538            let resp = send_query(
539                event_tx,
540                QueryRequest::UnretainKnownDestination { dest_hash },
541            )?;
542            return if let QueryResponse::UnretainKnownDestination(ok) = resp {
543                Ok(PickleValue::Bool(ok))
544            } else {
545                Ok(PickleValue::None)
546            };
547        }
548        if clear_val == "backbone_peer_state" {
549            let interface_name = required_string(request, "interface")?;
550            let peer_ip = required_string(request, "ip")?;
551            let peer_ip = peer_ip
552                .parse()
553                .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "invalid peer IP"))?;
554            let resp = send_query(
555                event_tx,
556                QueryRequest::ClearBackbonePeerState {
557                    interface_name,
558                    peer_ip,
559                },
560            )?;
561            return if let QueryResponse::ClearBackbonePeerState(ok) = resp {
562                Ok(PickleValue::Bool(ok))
563            } else {
564                Ok(PickleValue::None)
565            };
566        }
567    }
568
569    if let Some(set_val) = request.get("set").and_then(|v| v.as_str()) {
570        if set_val == "backbone_peer_blacklist" {
571            let interface_name = required_string(request, "interface")?;
572            let peer_ip = required_string(request, "ip")?;
573            let peer_ip: IpAddr = peer_ip
574                .parse()
575                .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "invalid peer IP"))?;
576            let duration_secs = request
577                .get("duration_secs")
578                .and_then(|v| v.as_int())
579                .ok_or_else(|| {
580                    io::Error::new(io::ErrorKind::InvalidInput, "missing duration_secs")
581                })?;
582            let reason = request
583                .get("reason")
584                .and_then(|v| v.as_str())
585                .unwrap_or("sentinel blacklist")
586                .to_string();
587            let penalty_level = request
588                .get("penalty_level")
589                .and_then(|v| v.as_int())
590                .unwrap_or(0)
591                .clamp(0, u8::MAX as i64) as u8;
592            let resp = send_query(
593                event_tx,
594                QueryRequest::BlacklistBackbonePeer {
595                    interface_name,
596                    peer_ip,
597                    duration: Duration::from_secs(duration_secs as u64),
598                    reason,
599                    penalty_level,
600                },
601            )?;
602            return if let QueryResponse::BlacklistBackbonePeer(ok) = resp {
603                Ok(PickleValue::Bool(ok))
604            } else {
605                Ok(PickleValue::None)
606            };
607        }
608    }
609
610    // Handle "request_path" -- trigger a path request to the network
611    if let Some(hash_val) = request.get("request_path") {
612        if let Some(hash_bytes) = hash_val.as_bytes() {
613            if hash_bytes.len() >= 16 {
614                let mut dest_hash = [0u8; 16];
615                dest_hash.copy_from_slice(&hash_bytes[..16]);
616                let _ = event_tx.send(crate::event::Event::RequestPath { dest_hash });
617                return Ok(PickleValue::Bool(true));
618            }
619        }
620    }
621
622    // Handle "send_probe" requests
623    if let Some(hash_val) = request.get("send_probe") {
624        if let Some(hash_bytes) = hash_val.as_bytes() {
625            if hash_bytes.len() >= 16 {
626                let mut dest_hash = [0u8; 16];
627                dest_hash.copy_from_slice(&hash_bytes[..16]);
628                let payload_size = request
629                    .get("size")
630                    .and_then(|v| v.as_int())
631                    .and_then(|n| {
632                        if n > 0 && n <= 400 {
633                            Some(n as usize)
634                        } else {
635                            None
636                        }
637                    })
638                    .unwrap_or(16);
639                let resp = send_query(
640                    event_tx,
641                    QueryRequest::SendProbe {
642                        dest_hash,
643                        payload_size,
644                    },
645                )?;
646                if let QueryResponse::SendProbe(Some((packet_hash, hops))) = resp {
647                    return Ok(PickleValue::Dict(vec![
648                        (
649                            PickleValue::String("packet_hash".into()),
650                            PickleValue::Bytes(packet_hash.to_vec()),
651                        ),
652                        (
653                            PickleValue::String("hops".into()),
654                            PickleValue::Int(hops as i64),
655                        ),
656                    ]));
657                } else {
658                    return Ok(PickleValue::None);
659                }
660            }
661        }
662    }
663
664    // Handle "check_proof" requests
665    if let Some(hash_val) = request.get("check_proof") {
666        if let Some(hash_bytes) = hash_val.as_bytes() {
667            if hash_bytes.len() >= 32 {
668                let mut packet_hash = [0u8; 32];
669                packet_hash.copy_from_slice(&hash_bytes[..32]);
670                let resp = send_query(event_tx, QueryRequest::CheckProof { packet_hash })?;
671                if let QueryResponse::CheckProof(Some(rtt)) = resp {
672                    return Ok(PickleValue::Float(rtt));
673                } else {
674                    return Ok(PickleValue::None);
675                }
676            }
677        }
678    }
679
680    // Handle "blackhole" requests
681    if let Some(hash_val) = request.get("blackhole") {
682        if let Some(hash_bytes) = hash_val.as_bytes() {
683            if hash_bytes.len() >= 16 {
684                let mut identity_hash = [0u8; 16];
685                identity_hash.copy_from_slice(&hash_bytes[..16]);
686                let duration_hours = request.get("duration").and_then(|v| v.as_float());
687                let reason = request
688                    .get("reason")
689                    .and_then(|v| v.as_str())
690                    .map(|s| s.to_string());
691                let resp = send_query(
692                    event_tx,
693                    QueryRequest::BlackholeIdentity {
694                        identity_hash,
695                        duration_hours,
696                        reason,
697                    },
698                )?;
699                return Ok(PickleValue::Bool(matches!(
700                    resp,
701                    QueryResponse::BlackholeResult(true)
702                )));
703            }
704        }
705    }
706
707    // Handle "unblackhole" requests
708    if let Some(hash_val) = request.get("unblackhole") {
709        if let Some(hash_bytes) = hash_val.as_bytes() {
710            if hash_bytes.len() >= 16 {
711                let mut identity_hash = [0u8; 16];
712                identity_hash.copy_from_slice(&hash_bytes[..16]);
713                let resp = send_query(
714                    event_tx,
715                    QueryRequest::UnblackholeIdentity { identity_hash },
716                )?;
717                return Ok(PickleValue::Bool(matches!(
718                    resp,
719                    QueryResponse::UnblackholeResult(true)
720                )));
721            }
722        }
723    }
724
725    // Handle "drop" requests
726    if let Some(drop_val) = request.get("drop") {
727        if let Some(path) = drop_val.as_str() {
728            return match path {
729                "path" => {
730                    let hash = extract_dest_hash(request, "destination_hash")?;
731                    let resp = send_query(event_tx, QueryRequest::DropPath { dest_hash: hash })?;
732                    if let QueryResponse::DropPath(ok) = resp {
733                        Ok(PickleValue::Bool(ok))
734                    } else {
735                        Ok(PickleValue::None)
736                    }
737                }
738                "all_via" => {
739                    let hash = extract_dest_hash(request, "destination_hash")?;
740                    let resp = send_query(
741                        event_tx,
742                        QueryRequest::DropAllVia {
743                            transport_hash: hash,
744                        },
745                    )?;
746                    if let QueryResponse::DropAllVia(n) = resp {
747                        Ok(PickleValue::Int(n as i64))
748                    } else {
749                        Ok(PickleValue::None)
750                    }
751                }
752                "announce_queues" => {
753                    let resp = send_query(event_tx, QueryRequest::DropAnnounceQueues)?;
754                    if let QueryResponse::DropAnnounceQueues = resp {
755                        Ok(PickleValue::Bool(true))
756                    } else {
757                        Ok(PickleValue::None)
758                    }
759                }
760                _ => Ok(PickleValue::None),
761            };
762        }
763    }
764
765    if let Some(hook_val) = request.get("hook").and_then(|v| v.as_str()) {
766        return handle_hook_rpc_request(hook_val, request, event_tx);
767    }
768
769    Ok(PickleValue::None)
770}
771
772fn handle_hook_rpc_request(
773    op: &str,
774    request: &PickleValue,
775    event_tx: &EventSender,
776) -> io::Result<PickleValue> {
777    match op {
778        "load" => {
779            let name = required_string(request, "name")?;
780            let attach_point = required_string(request, "attach_point")?;
781            let priority = request
782                .get("priority")
783                .and_then(|v| v.as_int())
784                .unwrap_or(0) as i32;
785            let wasm = request
786                .get("wasm")
787                .and_then(|v| v.as_bytes())
788                .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing wasm"))?
789                .to_vec();
790            let (response_tx, response_rx) = mpsc::channel();
791            event_tx
792                .send(Event::LoadHook {
793                    name,
794                    wasm_bytes: wasm,
795                    attach_point,
796                    priority,
797                    response_tx,
798                })
799                .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down"))?;
800            let response = response_rx
801                .recv_timeout(std::time::Duration::from_secs(5))
802                .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "hook load timed out"))?;
803            Ok(hook_result_to_pickle(response))
804        }
805        "load_builtin" => {
806            let name = required_string(request, "name")?;
807            let attach_point = required_string(request, "attach_point")?;
808            let builtin_id = required_string(request, "builtin_id")?;
809            let priority = request
810                .get("priority")
811                .and_then(|v| v.as_int())
812                .unwrap_or(0) as i32;
813            let (response_tx, response_rx) = mpsc::channel();
814            event_tx
815                .send(Event::LoadBuiltinHook {
816                    name,
817                    builtin_id,
818                    attach_point,
819                    priority,
820                    response_tx,
821                })
822                .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down"))?;
823            let response = response_rx
824                .recv_timeout(std::time::Duration::from_secs(5))
825                .map_err(|_| {
826                    io::Error::new(io::ErrorKind::TimedOut, "built-in hook load timed out")
827                })?;
828            Ok(hook_result_to_pickle(response))
829        }
830        "unload" => {
831            let name = required_string(request, "name")?;
832            let attach_point = required_string(request, "attach_point")?;
833            let (response_tx, response_rx) = mpsc::channel();
834            event_tx
835                .send(Event::UnloadHook {
836                    name,
837                    attach_point,
838                    response_tx,
839                })
840                .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down"))?;
841            let response = response_rx
842                .recv_timeout(std::time::Duration::from_secs(5))
843                .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "hook unload timed out"))?;
844            Ok(hook_result_to_pickle(response))
845        }
846        "enable" | "disable" => {
847            let name = required_string(request, "name")?;
848            let attach_point = required_string(request, "attach_point")?;
849            let enabled = op == "enable";
850            let (response_tx, response_rx) = mpsc::channel();
851            event_tx
852                .send(Event::SetHookEnabled {
853                    name,
854                    attach_point,
855                    enabled,
856                    response_tx,
857                })
858                .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down"))?;
859            let response = response_rx
860                .recv_timeout(std::time::Duration::from_secs(5))
861                .map_err(|_| {
862                    io::Error::new(io::ErrorKind::TimedOut, "hook enable/disable timed out")
863                })?;
864            Ok(hook_result_to_pickle(response))
865        }
866        "set_priority" => {
867            let name = required_string(request, "name")?;
868            let attach_point = required_string(request, "attach_point")?;
869            let priority = request
870                .get("priority")
871                .and_then(|v| v.as_int())
872                .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing priority"))?
873                as i32;
874            let (response_tx, response_rx) = mpsc::channel();
875            event_tx
876                .send(Event::SetHookPriority {
877                    name,
878                    attach_point,
879                    priority,
880                    response_tx,
881                })
882                .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down"))?;
883            let response = response_rx
884                .recv_timeout(std::time::Duration::from_secs(5))
885                .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "hook priority timed out"))?;
886            Ok(hook_result_to_pickle(response))
887        }
888        _ => Ok(PickleValue::None),
889    }
890}
891
892/// Send a query to the driver and wait for the response.
893fn send_query(event_tx: &EventSender, request: QueryRequest) -> io::Result<QueryResponse> {
894    let (resp_tx, resp_rx) = mpsc::channel();
895    event_tx
896        .send(Event::Query(request, resp_tx))
897        .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down"))?;
898    resp_rx
899        .recv_timeout(std::time::Duration::from_secs(5))
900        .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "query timed out"))
901}
902
903/// Extract a 16-byte destination hash from a pickle dict field.
904fn extract_dest_hash(request: &PickleValue, key: &str) -> io::Result<[u8; 16]> {
905    let bytes = request
906        .get(key)
907        .and_then(|v| v.as_bytes())
908        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing destination_hash"))?;
909    if bytes.len() < 16 {
910        return Err(io::Error::new(io::ErrorKind::InvalidData, "hash too short"));
911    }
912    let mut hash = [0u8; 16];
913    hash.copy_from_slice(&bytes[..16]);
914    Ok(hash)
915}
916
917fn required_string(request: &PickleValue, key: &str) -> io::Result<String> {
918    request
919        .get(key)
920        .and_then(|v| v.as_str())
921        .map(|s| s.to_string())
922        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, format!("missing {}", key)))
923}
924
925fn hook_result_to_pickle(result: Result<(), String>) -> PickleValue {
926    match result {
927        Ok(()) => PickleValue::Dict(vec![(
928            PickleValue::String("ok".into()),
929            PickleValue::Bool(true),
930        )]),
931        Err(error) => PickleValue::Dict(vec![
932            (PickleValue::String("ok".into()), PickleValue::Bool(false)),
933            (
934                PickleValue::String("error".into()),
935                PickleValue::String(error),
936            ),
937        ]),
938    }
939}
940
941// --- Pickle response builders ---
942
943fn interface_stats_to_pickle(stats: &InterfaceStatsResponse) -> PickleValue {
944    let mut ifaces = Vec::new();
945    for iface in &stats.interfaces {
946        ifaces.push(single_iface_to_pickle(iface));
947    }
948
949    let mut dict = vec![
950        (
951            PickleValue::String("interfaces".into()),
952            PickleValue::List(ifaces),
953        ),
954        (
955            PickleValue::String("transport_enabled".into()),
956            PickleValue::Bool(stats.transport_enabled),
957        ),
958        (
959            PickleValue::String("transport_uptime".into()),
960            PickleValue::Float(stats.transport_uptime),
961        ),
962        (
963            PickleValue::String("rxb".into()),
964            PickleValue::Int(stats.total_rxb as i64),
965        ),
966        (
967            PickleValue::String("txb".into()),
968            PickleValue::Int(stats.total_txb as i64),
969        ),
970    ];
971
972    if let Some(tid) = stats.transport_id {
973        dict.push((
974            PickleValue::String("transport_id".into()),
975            PickleValue::Bytes(tid.to_vec()),
976        ));
977    } else {
978        dict.push((
979            PickleValue::String("transport_id".into()),
980            PickleValue::None,
981        ));
982    }
983
984    if let Some(pr) = stats.probe_responder {
985        dict.push((
986            PickleValue::String("probe_responder".into()),
987            PickleValue::Bytes(pr.to_vec()),
988        ));
989    } else {
990        dict.push((
991            PickleValue::String("probe_responder".into()),
992            PickleValue::None,
993        ));
994    }
995
996    if let Some(pool) = &stats.backbone_peer_pool {
997        let members = pool
998            .members
999            .iter()
1000            .map(|member| {
1001                let mut member_dict = vec![
1002                    (
1003                        PickleValue::String("name".into()),
1004                        PickleValue::String(member.name.clone()),
1005                    ),
1006                    (
1007                        PickleValue::String("remote".into()),
1008                        PickleValue::String(member.remote.clone()),
1009                    ),
1010                    (
1011                        PickleValue::String("source".into()),
1012                        PickleValue::String(member.source.clone()),
1013                    ),
1014                    (
1015                        PickleValue::String("priority".into()),
1016                        PickleValue::Int(member.priority as i64),
1017                    ),
1018                    (
1019                        PickleValue::String("state".into()),
1020                        PickleValue::String(member.state.clone()),
1021                    ),
1022                    (
1023                        PickleValue::String("failure_count".into()),
1024                        PickleValue::Int(member.failure_count as i64),
1025                    ),
1026                ];
1027                member_dict.push((
1028                    PickleValue::String("interface_id".into()),
1029                    member
1030                        .interface_id
1031                        .map(|id| PickleValue::Int(id as i64))
1032                        .unwrap_or(PickleValue::None),
1033                ));
1034                member_dict.push((
1035                    PickleValue::String("last_error".into()),
1036                    member
1037                        .last_error
1038                        .as_ref()
1039                        .map(|err| PickleValue::String(err.clone()))
1040                        .unwrap_or(PickleValue::None),
1041                ));
1042                member_dict.push((
1043                    PickleValue::String("cooldown_remaining_seconds".into()),
1044                    member
1045                        .cooldown_remaining_seconds
1046                        .map(PickleValue::Float)
1047                        .unwrap_or(PickleValue::None),
1048                ));
1049                PickleValue::Dict(member_dict)
1050            })
1051            .collect();
1052        dict.push((
1053            PickleValue::String("backbone_peer_pool".into()),
1054            PickleValue::Dict(vec![
1055                (
1056                    PickleValue::String("max_connected".into()),
1057                    PickleValue::Int(pool.max_connected as i64),
1058                ),
1059                (
1060                    PickleValue::String("active_count".into()),
1061                    PickleValue::Int(pool.active_count as i64),
1062                ),
1063                (
1064                    PickleValue::String("standby_count".into()),
1065                    PickleValue::Int(pool.standby_count as i64),
1066                ),
1067                (
1068                    PickleValue::String("cooldown_count".into()),
1069                    PickleValue::Int(pool.cooldown_count as i64),
1070                ),
1071                (
1072                    PickleValue::String("members".into()),
1073                    PickleValue::List(members),
1074                ),
1075            ]),
1076        ));
1077    } else {
1078        dict.push((
1079            PickleValue::String("backbone_peer_pool".into()),
1080            PickleValue::None,
1081        ));
1082    }
1083
1084    PickleValue::Dict(dict)
1085}
1086
1087fn single_iface_to_pickle(s: &SingleInterfaceStat) -> PickleValue {
1088    let mut dict = vec![
1089        (
1090            PickleValue::String("id".into()),
1091            PickleValue::Int(s.id as i64),
1092        ),
1093        (
1094            PickleValue::String("name".into()),
1095            PickleValue::String(s.name.clone()),
1096        ),
1097        (
1098            PickleValue::String("status".into()),
1099            PickleValue::Bool(s.status),
1100        ),
1101        (
1102            PickleValue::String("mode".into()),
1103            PickleValue::Int(s.mode as i64),
1104        ),
1105        (
1106            PickleValue::String("rxb".into()),
1107            PickleValue::Int(s.rxb as i64),
1108        ),
1109        (
1110            PickleValue::String("txb".into()),
1111            PickleValue::Int(s.txb as i64),
1112        ),
1113        (
1114            PickleValue::String("rx_packets".into()),
1115            PickleValue::Int(s.rx_packets as i64),
1116        ),
1117        (
1118            PickleValue::String("tx_packets".into()),
1119            PickleValue::Int(s.tx_packets as i64),
1120        ),
1121        (
1122            PickleValue::String("started".into()),
1123            PickleValue::Float(s.started),
1124        ),
1125        (
1126            PickleValue::String("ia_freq".into()),
1127            PickleValue::Float(s.ia_freq),
1128        ),
1129        (
1130            PickleValue::String("oa_freq".into()),
1131            PickleValue::Float(s.oa_freq),
1132        ),
1133        (
1134            PickleValue::String("ip_freq".into()),
1135            PickleValue::Float(s.ip_freq),
1136        ),
1137        (
1138            PickleValue::String("op_freq".into()),
1139            PickleValue::Float(s.op_freq),
1140        ),
1141        (
1142            PickleValue::String("burst_active".into()),
1143            PickleValue::Bool(s.burst_active),
1144        ),
1145        (
1146            PickleValue::String("burst_activated".into()),
1147            PickleValue::Float(s.burst_activated),
1148        ),
1149        (
1150            PickleValue::String("pr_burst_active".into()),
1151            PickleValue::Bool(s.pr_burst_active),
1152        ),
1153        (
1154            PickleValue::String("pr_burst_activated".into()),
1155            PickleValue::Float(s.pr_burst_activated),
1156        ),
1157        (
1158            PickleValue::String("clients".into()),
1159            s.clients
1160                .map(|clients| PickleValue::Int(clients as i64))
1161                .unwrap_or(PickleValue::None),
1162        ),
1163        (
1164            PickleValue::String("announce_rate_grace".into()),
1165            PickleValue::Int(s.announce_rate_grace as i64),
1166        ),
1167        (
1168            PickleValue::String("announce_rate_penalty".into()),
1169            PickleValue::Float(s.announce_rate_penalty),
1170        ),
1171    ];
1172
1173    match s.announce_rate_target {
1174        Some(target) => dict.push((
1175            PickleValue::String("announce_rate_target".into()),
1176            PickleValue::Float(target),
1177        )),
1178        None => dict.push((
1179            PickleValue::String("announce_rate_target".into()),
1180            PickleValue::None,
1181        )),
1182    }
1183
1184    match s.bitrate {
1185        Some(br) => dict.push((
1186            PickleValue::String("bitrate".into()),
1187            PickleValue::Int(br as i64),
1188        )),
1189        None => dict.push((PickleValue::String("bitrate".into()), PickleValue::None)),
1190    }
1191
1192    match s.ifac_size {
1193        Some(sz) => dict.push((
1194            PickleValue::String("ifac_size".into()),
1195            PickleValue::Int(sz as i64),
1196        )),
1197        None => dict.push((PickleValue::String("ifac_size".into()), PickleValue::None)),
1198    }
1199
1200    PickleValue::Dict(dict)
1201}
1202
1203fn path_table_to_pickle(entries: &[PathTableEntry]) -> PickleValue {
1204    let list: Vec<PickleValue> = entries
1205        .iter()
1206        .map(|e| {
1207            PickleValue::Dict(vec![
1208                (
1209                    PickleValue::String("hash".into()),
1210                    PickleValue::Bytes(e.hash.to_vec()),
1211                ),
1212                (
1213                    PickleValue::String("timestamp".into()),
1214                    PickleValue::Float(e.timestamp),
1215                ),
1216                (
1217                    PickleValue::String("via".into()),
1218                    PickleValue::Bytes(e.via.to_vec()),
1219                ),
1220                (
1221                    PickleValue::String("hops".into()),
1222                    PickleValue::Int(e.hops as i64),
1223                ),
1224                (
1225                    PickleValue::String("expires".into()),
1226                    PickleValue::Float(e.expires),
1227                ),
1228                (
1229                    PickleValue::String("interface".into()),
1230                    PickleValue::String(e.interface_name.clone()),
1231                ),
1232            ])
1233        })
1234        .collect();
1235    PickleValue::List(list)
1236}
1237
1238fn rate_table_to_pickle(entries: &[RateTableEntry]) -> PickleValue {
1239    let list: Vec<PickleValue> = entries
1240        .iter()
1241        .map(|e| {
1242            PickleValue::Dict(vec![
1243                (
1244                    PickleValue::String("hash".into()),
1245                    PickleValue::Bytes(e.hash.to_vec()),
1246                ),
1247                (
1248                    PickleValue::String("last".into()),
1249                    PickleValue::Float(e.last),
1250                ),
1251                (
1252                    PickleValue::String("rate_violations".into()),
1253                    PickleValue::Int(e.rate_violations as i64),
1254                ),
1255                (
1256                    PickleValue::String("blocked_until".into()),
1257                    PickleValue::Float(e.blocked_until),
1258                ),
1259                (
1260                    PickleValue::String("timestamps".into()),
1261                    PickleValue::List(
1262                        e.timestamps
1263                            .iter()
1264                            .map(|&t| PickleValue::Float(t))
1265                            .collect(),
1266                    ),
1267                ),
1268            ])
1269        })
1270        .collect();
1271    PickleValue::List(list)
1272}
1273
1274fn blackholed_to_pickle(entries: &[BlackholeInfo]) -> PickleValue {
1275    let list: Vec<PickleValue> = entries
1276        .iter()
1277        .map(|e| {
1278            let mut dict = vec![
1279                (
1280                    PickleValue::String("identity_hash".into()),
1281                    PickleValue::Bytes(e.identity_hash.to_vec()),
1282                ),
1283                (
1284                    PickleValue::String("created".into()),
1285                    PickleValue::Float(e.created),
1286                ),
1287                (
1288                    PickleValue::String("expires".into()),
1289                    PickleValue::Float(e.expires),
1290                ),
1291            ];
1292            if let Some(ref reason) = e.reason {
1293                dict.push((
1294                    PickleValue::String("reason".into()),
1295                    PickleValue::String(reason.clone()),
1296                ));
1297            } else {
1298                dict.push((PickleValue::String("reason".into()), PickleValue::None));
1299            }
1300            PickleValue::Dict(dict)
1301        })
1302        .collect();
1303    PickleValue::List(list)
1304}
1305
1306fn discovered_interfaces_to_pickle(
1307    interfaces: &[crate::discovery::DiscoveredInterface],
1308) -> PickleValue {
1309    let list: Vec<PickleValue> = interfaces
1310        .iter()
1311        .map(|iface| {
1312            let mut dict = vec![
1313                (
1314                    PickleValue::String("type".into()),
1315                    PickleValue::String(iface.interface_type.clone()),
1316                ),
1317                (
1318                    PickleValue::String("transport".into()),
1319                    PickleValue::Bool(iface.transport),
1320                ),
1321                (
1322                    PickleValue::String("name".into()),
1323                    PickleValue::String(iface.name.clone()),
1324                ),
1325                (
1326                    PickleValue::String("discovered".into()),
1327                    PickleValue::Float(iface.discovered),
1328                ),
1329                (
1330                    PickleValue::String("last_heard".into()),
1331                    PickleValue::Float(iface.last_heard),
1332                ),
1333                (
1334                    PickleValue::String("heard_count".into()),
1335                    PickleValue::Int(iface.heard_count as i64),
1336                ),
1337                (
1338                    PickleValue::String("status".into()),
1339                    PickleValue::String(iface.status.as_str().into()),
1340                ),
1341                (
1342                    PickleValue::String("stamp".into()),
1343                    PickleValue::Bytes(iface.stamp.clone()),
1344                ),
1345                (
1346                    PickleValue::String("value".into()),
1347                    PickleValue::Int(iface.stamp_value as i64),
1348                ),
1349                (
1350                    PickleValue::String("transport_id".into()),
1351                    PickleValue::Bytes(iface.transport_id.to_vec()),
1352                ),
1353                (
1354                    PickleValue::String("network_id".into()),
1355                    PickleValue::Bytes(iface.network_id.to_vec()),
1356                ),
1357                (
1358                    PickleValue::String("hops".into()),
1359                    PickleValue::Int(iface.hops as i64),
1360                ),
1361            ];
1362
1363            // Optional location fields
1364            if let Some(v) = iface.latitude {
1365                dict.push((
1366                    PickleValue::String("latitude".into()),
1367                    PickleValue::Float(v),
1368                ));
1369            } else {
1370                dict.push((PickleValue::String("latitude".into()), PickleValue::None));
1371            }
1372            if let Some(v) = iface.longitude {
1373                dict.push((
1374                    PickleValue::String("longitude".into()),
1375                    PickleValue::Float(v),
1376                ));
1377            } else {
1378                dict.push((PickleValue::String("longitude".into()), PickleValue::None));
1379            }
1380            if let Some(v) = iface.height {
1381                dict.push((PickleValue::String("height".into()), PickleValue::Float(v)));
1382            } else {
1383                dict.push((PickleValue::String("height".into()), PickleValue::None));
1384            }
1385
1386            // Connection info
1387            if let Some(ref v) = iface.reachable_on {
1388                dict.push((
1389                    PickleValue::String("reachable_on".into()),
1390                    PickleValue::String(v.clone()),
1391                ));
1392            } else {
1393                dict.push((
1394                    PickleValue::String("reachable_on".into()),
1395                    PickleValue::None,
1396                ));
1397            }
1398            if let Some(v) = iface.port {
1399                dict.push((
1400                    PickleValue::String("port".into()),
1401                    PickleValue::Int(v as i64),
1402                ));
1403            } else {
1404                dict.push((PickleValue::String("port".into()), PickleValue::None));
1405            }
1406
1407            // RNode/RF specific
1408            if let Some(v) = iface.frequency {
1409                dict.push((
1410                    PickleValue::String("frequency".into()),
1411                    PickleValue::Int(v as i64),
1412                ));
1413            } else {
1414                dict.push((PickleValue::String("frequency".into()), PickleValue::None));
1415            }
1416            if let Some(v) = iface.bandwidth {
1417                dict.push((
1418                    PickleValue::String("bandwidth".into()),
1419                    PickleValue::Int(v as i64),
1420                ));
1421            } else {
1422                dict.push((PickleValue::String("bandwidth".into()), PickleValue::None));
1423            }
1424            if let Some(v) = iface.spreading_factor {
1425                dict.push((PickleValue::String("sf".into()), PickleValue::Int(v as i64)));
1426            } else {
1427                dict.push((PickleValue::String("sf".into()), PickleValue::None));
1428            }
1429            if let Some(v) = iface.coding_rate {
1430                dict.push((PickleValue::String("cr".into()), PickleValue::Int(v as i64)));
1431            } else {
1432                dict.push((PickleValue::String("cr".into()), PickleValue::None));
1433            }
1434            if let Some(ref v) = iface.modulation {
1435                dict.push((
1436                    PickleValue::String("modulation".into()),
1437                    PickleValue::String(v.clone()),
1438                ));
1439            } else {
1440                dict.push((PickleValue::String("modulation".into()), PickleValue::None));
1441            }
1442            if let Some(v) = iface.channel {
1443                dict.push((
1444                    PickleValue::String("channel".into()),
1445                    PickleValue::Int(v as i64),
1446                ));
1447            } else {
1448                dict.push((PickleValue::String("channel".into()), PickleValue::None));
1449            }
1450
1451            // IFAC info
1452            if let Some(ref v) = iface.ifac_netname {
1453                dict.push((
1454                    PickleValue::String("ifac_netname".into()),
1455                    PickleValue::String(v.clone()),
1456                ));
1457            } else {
1458                dict.push((
1459                    PickleValue::String("ifac_netname".into()),
1460                    PickleValue::None,
1461                ));
1462            }
1463            if let Some(ref v) = iface.ifac_netkey {
1464                dict.push((
1465                    PickleValue::String("ifac_netkey".into()),
1466                    PickleValue::String(v.clone()),
1467                ));
1468            } else {
1469                dict.push((PickleValue::String("ifac_netkey".into()), PickleValue::None));
1470            }
1471
1472            // Config entry
1473            if let Some(ref v) = iface.config_entry {
1474                dict.push((
1475                    PickleValue::String("config_entry".into()),
1476                    PickleValue::String(v.clone()),
1477                ));
1478            } else {
1479                dict.push((
1480                    PickleValue::String("config_entry".into()),
1481                    PickleValue::None,
1482                ));
1483            }
1484
1485            dict.push((
1486                PickleValue::String("discovery_hash".into()),
1487                PickleValue::Bytes(iface.discovery_hash.to_vec()),
1488            ));
1489
1490            PickleValue::Dict(dict)
1491        })
1492        .collect();
1493    PickleValue::List(list)
1494}
1495
1496fn hooks_to_pickle(hooks: &[HookInfo]) -> PickleValue {
1497    PickleValue::List(
1498        hooks
1499            .iter()
1500            .map(|hook| {
1501                PickleValue::Dict(vec![
1502                    (
1503                        PickleValue::String("name".into()),
1504                        PickleValue::String(hook.name.clone()),
1505                    ),
1506                    (
1507                        PickleValue::String("type".into()),
1508                        PickleValue::String(hook.hook_type.clone()),
1509                    ),
1510                    (
1511                        PickleValue::String("attach_point".into()),
1512                        PickleValue::String(hook.attach_point.clone()),
1513                    ),
1514                    (
1515                        PickleValue::String("priority".into()),
1516                        PickleValue::Int(hook.priority as i64),
1517                    ),
1518                    (
1519                        PickleValue::String("enabled".into()),
1520                        PickleValue::Bool(hook.enabled),
1521                    ),
1522                    (
1523                        PickleValue::String("consecutive_traps".into()),
1524                        PickleValue::Int(hook.consecutive_traps as i64),
1525                    ),
1526                ])
1527            })
1528            .collect(),
1529    )
1530}
1531
1532fn backbone_peer_state_to_pickle(entries: &[BackbonePeerStateEntry]) -> PickleValue {
1533    PickleValue::List(
1534        entries
1535            .iter()
1536            .map(|entry| {
1537                PickleValue::Dict(vec![
1538                    (
1539                        PickleValue::String("interface".into()),
1540                        PickleValue::String(entry.interface_name.clone()),
1541                    ),
1542                    (
1543                        PickleValue::String("ip".into()),
1544                        PickleValue::String(entry.peer_ip.to_string()),
1545                    ),
1546                    (
1547                        PickleValue::String("connected_count".into()),
1548                        PickleValue::Int(entry.connected_count as i64),
1549                    ),
1550                    (
1551                        PickleValue::String("blacklisted_remaining_secs".into()),
1552                        entry
1553                            .blacklisted_remaining_secs
1554                            .map(PickleValue::Float)
1555                            .unwrap_or(PickleValue::None),
1556                    ),
1557                    (
1558                        PickleValue::String("blacklist_reason".into()),
1559                        entry
1560                            .blacklist_reason
1561                            .as_ref()
1562                            .map(|v: &String| PickleValue::String(v.clone()))
1563                            .unwrap_or(PickleValue::None),
1564                    ),
1565                    (
1566                        PickleValue::String("reject_count".into()),
1567                        PickleValue::Int(entry.reject_count as i64),
1568                    ),
1569                ])
1570            })
1571            .collect(),
1572    )
1573}
1574
1575fn backbone_interfaces_to_pickle(entries: &[BackboneInterfaceEntry]) -> PickleValue {
1576    PickleValue::List(
1577        entries
1578            .iter()
1579            .map(|entry| {
1580                PickleValue::Dict(vec![
1581                    (
1582                        PickleValue::String("id".into()),
1583                        PickleValue::Int(entry.interface_id.0 as i64),
1584                    ),
1585                    (
1586                        PickleValue::String("name".into()),
1587                        PickleValue::String(entry.interface_name.clone()),
1588                    ),
1589                ])
1590            })
1591            .collect(),
1592    )
1593}
1594
1595fn provider_bridge_stats_to_pickle(stats: &ProviderBridgeStats) -> PickleValue {
1596    PickleValue::Dict(vec![
1597        (
1598            PickleValue::String("connected".into()),
1599            PickleValue::Bool(stats.connected),
1600        ),
1601        (
1602            PickleValue::String("consumer_count".into()),
1603            PickleValue::Int(stats.consumer_count as i64),
1604        ),
1605        (
1606            PickleValue::String("queue_max_events".into()),
1607            PickleValue::Int(stats.queue_max_events as i64),
1608        ),
1609        (
1610            PickleValue::String("queue_max_bytes".into()),
1611            PickleValue::Int(stats.queue_max_bytes as i64),
1612        ),
1613        (
1614            PickleValue::String("backlog_len".into()),
1615            PickleValue::Int(stats.backlog_len as i64),
1616        ),
1617        (
1618            PickleValue::String("backlog_bytes".into()),
1619            PickleValue::Int(stats.backlog_bytes as i64),
1620        ),
1621        (
1622            PickleValue::String("backlog_dropped_pending".into()),
1623            PickleValue::Int(stats.backlog_dropped_pending as i64),
1624        ),
1625        (
1626            PickleValue::String("backlog_dropped_total".into()),
1627            PickleValue::Int(stats.backlog_dropped_total as i64),
1628        ),
1629        (
1630            PickleValue::String("total_disconnect_count".into()),
1631            PickleValue::Int(stats.total_disconnect_count as i64),
1632        ),
1633        (
1634            PickleValue::String("consumers".into()),
1635            PickleValue::List(
1636                stats
1637                    .consumers
1638                    .iter()
1639                    .map(|consumer| {
1640                        PickleValue::Dict(vec![
1641                            (
1642                                PickleValue::String("id".into()),
1643                                PickleValue::Int(consumer.id as i64),
1644                            ),
1645                            (
1646                                PickleValue::String("connected".into()),
1647                                PickleValue::Bool(consumer.connected),
1648                            ),
1649                            (
1650                                PickleValue::String("queue_len".into()),
1651                                PickleValue::Int(consumer.queue_len as i64),
1652                            ),
1653                            (
1654                                PickleValue::String("queued_bytes".into()),
1655                                PickleValue::Int(consumer.queued_bytes as i64),
1656                            ),
1657                            (
1658                                PickleValue::String("dropped_pending".into()),
1659                                PickleValue::Int(consumer.dropped_pending as i64),
1660                            ),
1661                            (
1662                                PickleValue::String("dropped_total".into()),
1663                                PickleValue::Int(consumer.dropped_total as i64),
1664                            ),
1665                            (
1666                                PickleValue::String("queue_max_events".into()),
1667                                PickleValue::Int(consumer.queue_max_events as i64),
1668                            ),
1669                            (
1670                                PickleValue::String("queue_max_bytes".into()),
1671                                PickleValue::Int(consumer.queue_max_bytes as i64),
1672                            ),
1673                        ])
1674                    })
1675                    .collect(),
1676            ),
1677        ),
1678    ])
1679}
1680
1681fn lifecycle_state_name(state: LifecycleState) -> &'static str {
1682    match state {
1683        LifecycleState::Active => "active",
1684        LifecycleState::Draining => "draining",
1685        LifecycleState::Stopping => "stopping",
1686        LifecycleState::Stopped => "stopped",
1687    }
1688}
1689
1690fn drain_status_to_pickle(status: &DrainStatus) -> PickleValue {
1691    PickleValue::Dict(vec![
1692        (
1693            PickleValue::String("state".into()),
1694            PickleValue::String(lifecycle_state_name(status.state).into()),
1695        ),
1696        (
1697            PickleValue::String("drain_age_seconds".into()),
1698            status
1699                .drain_age_seconds
1700                .map(PickleValue::Float)
1701                .unwrap_or(PickleValue::None),
1702        ),
1703        (
1704            PickleValue::String("deadline_remaining_seconds".into()),
1705            status
1706                .deadline_remaining_seconds
1707                .map(PickleValue::Float)
1708                .unwrap_or(PickleValue::None),
1709        ),
1710        (
1711            PickleValue::String("drain_complete".into()),
1712            PickleValue::Bool(status.drain_complete),
1713        ),
1714        (
1715            PickleValue::String("interface_writer_queued_frames".into()),
1716            PickleValue::Int(status.interface_writer_queued_frames as i64),
1717        ),
1718        (
1719            PickleValue::String("provider_backlog_events".into()),
1720            PickleValue::Int(status.provider_backlog_events as i64),
1721        ),
1722        (
1723            PickleValue::String("provider_consumer_queued_events".into()),
1724            PickleValue::Int(status.provider_consumer_queued_events as i64),
1725        ),
1726        (
1727            PickleValue::String("detail".into()),
1728            status
1729                .detail
1730                .as_ref()
1731                .map(|detail| PickleValue::String(detail.clone()))
1732                .unwrap_or(PickleValue::None),
1733        ),
1734    ])
1735}
1736
1737fn runtime_config_value_to_pickle(value: &RuntimeConfigValue) -> PickleValue {
1738    match value {
1739        RuntimeConfigValue::Int(v) => PickleValue::Int(*v),
1740        RuntimeConfigValue::Float(v) => PickleValue::Float(*v),
1741        RuntimeConfigValue::Bool(v) => PickleValue::Bool(*v),
1742        RuntimeConfigValue::String(v) => PickleValue::String(v.clone()),
1743        RuntimeConfigValue::Null => PickleValue::None,
1744    }
1745}
1746
1747fn runtime_config_value_from_pickle(value: &PickleValue) -> Option<RuntimeConfigValue> {
1748    match value {
1749        PickleValue::Int(v) => Some(RuntimeConfigValue::Int(*v)),
1750        PickleValue::Float(v) => Some(RuntimeConfigValue::Float(*v)),
1751        PickleValue::Bool(v) => Some(RuntimeConfigValue::Bool(*v)),
1752        PickleValue::String(v) => Some(RuntimeConfigValue::String(v.clone())),
1753        PickleValue::None => Some(RuntimeConfigValue::Null),
1754        _ => None,
1755    }
1756}
1757
1758fn runtime_config_entry_to_pickle(entry: &RuntimeConfigEntry) -> PickleValue {
1759    PickleValue::Dict(vec![
1760        (
1761            PickleValue::String("key".into()),
1762            PickleValue::String(entry.key.clone()),
1763        ),
1764        (
1765            PickleValue::String("value".into()),
1766            runtime_config_value_to_pickle(&entry.value),
1767        ),
1768        (
1769            PickleValue::String("default".into()),
1770            runtime_config_value_to_pickle(&entry.default),
1771        ),
1772        (
1773            PickleValue::String("source".into()),
1774            PickleValue::String(match entry.source {
1775                RuntimeConfigSource::Startup => "startup".into(),
1776                RuntimeConfigSource::RuntimeOverride => "runtime_override".into(),
1777            }),
1778        ),
1779        (
1780            PickleValue::String("apply_mode".into()),
1781            PickleValue::String(match entry.apply_mode {
1782                RuntimeConfigApplyMode::Immediate => "immediate".into(),
1783                RuntimeConfigApplyMode::NewConnectionsOnly => "new_connections_only".into(),
1784                RuntimeConfigApplyMode::NextReconnect => "next_reconnect".into(),
1785                RuntimeConfigApplyMode::RestartRequired => "restart_required".into(),
1786            }),
1787        ),
1788        (
1789            PickleValue::String("description".into()),
1790            entry
1791                .description
1792                .as_ref()
1793                .map(|v| PickleValue::String(v.clone()))
1794                .unwrap_or(PickleValue::None),
1795        ),
1796    ])
1797}
1798
1799fn runtime_config_list_to_pickle(entries: &[RuntimeConfigEntry]) -> PickleValue {
1800    PickleValue::List(entries.iter().map(runtime_config_entry_to_pickle).collect())
1801}
1802
1803fn runtime_config_error_to_pickle(error: &RuntimeConfigError) -> PickleValue {
1804    PickleValue::Dict(vec![
1805        (
1806            PickleValue::String("error".into()),
1807            PickleValue::String(match error.code {
1808                RuntimeConfigErrorCode::UnknownKey => "unknown_key".into(),
1809                RuntimeConfigErrorCode::InvalidType => "invalid_type".into(),
1810                RuntimeConfigErrorCode::InvalidValue => "invalid_value".into(),
1811                RuntimeConfigErrorCode::Unsupported => "unsupported".into(),
1812                RuntimeConfigErrorCode::NotFound => "not_found".into(),
1813                RuntimeConfigErrorCode::ApplyFailed => "apply_failed".into(),
1814            }),
1815        ),
1816        (
1817            PickleValue::String("message".into()),
1818            PickleValue::String(error.message.clone()),
1819        ),
1820    ])
1821}
1822
1823fn runtime_config_result_to_pickle(
1824    result: Result<RuntimeConfigEntry, RuntimeConfigError>,
1825) -> PickleValue {
1826    match result {
1827        Ok(entry) => runtime_config_entry_to_pickle(&entry),
1828        Err(error) => runtime_config_error_to_pickle(&error),
1829    }
1830}
1831
1832fn known_destination_entry_to_pickle(entry: &KnownDestinationEntry) -> PickleValue {
1833    PickleValue::Dict(vec![
1834        (
1835            PickleValue::String("dest_hash".into()),
1836            PickleValue::Bytes(entry.dest_hash.to_vec()),
1837        ),
1838        (
1839            PickleValue::String("identity_hash".into()),
1840            PickleValue::Bytes(entry.identity_hash.to_vec()),
1841        ),
1842        (
1843            PickleValue::String("public_key".into()),
1844            PickleValue::Bytes(entry.public_key.to_vec()),
1845        ),
1846        (
1847            PickleValue::String("app_data".into()),
1848            entry
1849                .app_data
1850                .as_ref()
1851                .map(|data: &Vec<u8>| PickleValue::Bytes(data.clone()))
1852                .unwrap_or(PickleValue::None),
1853        ),
1854        (
1855            PickleValue::String("hops".into()),
1856            PickleValue::Int(entry.hops as i64),
1857        ),
1858        (
1859            PickleValue::String("received_at".into()),
1860            PickleValue::Float(entry.received_at),
1861        ),
1862        (
1863            PickleValue::String("receiving_interface".into()),
1864            PickleValue::Int(entry.receiving_interface.0 as i64),
1865        ),
1866        (
1867            PickleValue::String("was_used".into()),
1868            PickleValue::Bool(entry.was_used),
1869        ),
1870        (
1871            PickleValue::String("last_used_at".into()),
1872            entry
1873                .last_used_at
1874                .map(PickleValue::Float)
1875                .unwrap_or(PickleValue::None),
1876        ),
1877        (
1878            PickleValue::String("retained".into()),
1879            PickleValue::Bool(entry.retained),
1880        ),
1881    ])
1882}
1883
1884fn known_destinations_to_pickle(entries: &[KnownDestinationEntry]) -> PickleValue {
1885    PickleValue::List(
1886        entries
1887            .iter()
1888            .map(known_destination_entry_to_pickle)
1889            .collect(),
1890    )
1891}
1892
1893fn parse_known_destination_entry(value: &PickleValue) -> io::Result<KnownDestinationEntry> {
1894    let get_bytes = |key: &str, len: usize| -> io::Result<Vec<u8>> {
1895        let value = value.get(key).ok_or_else(|| {
1896            io::Error::new(io::ErrorKind::InvalidData, format!("missing {}", key))
1897        })?;
1898        let bytes = value.as_bytes().ok_or_else(|| {
1899            io::Error::new(io::ErrorKind::InvalidData, format!("invalid {}", key))
1900        })?;
1901        if bytes.len() != len {
1902            return Err(io::Error::new(
1903                io::ErrorKind::InvalidData,
1904                format!("invalid {} length", key),
1905            ));
1906        }
1907        Ok(bytes.to_vec())
1908    };
1909    let get_int = |key: &str| -> io::Result<i64> {
1910        value
1911            .get(key)
1912            .and_then(|v| v.as_int())
1913            .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, format!("invalid {}", key)))
1914    };
1915    let get_float = |key: &str| -> io::Result<f64> {
1916        value
1917            .get(key)
1918            .and_then(|v| v.as_float())
1919            .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, format!("invalid {}", key)))
1920    };
1921    let get_bool = |key: &str| -> io::Result<bool> {
1922        value
1923            .get(key)
1924            .and_then(|v| v.as_bool())
1925            .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, format!("invalid {}", key)))
1926    };
1927
1928    let mut dest_hash = [0u8; 16];
1929    dest_hash.copy_from_slice(&get_bytes("dest_hash", 16)?);
1930    let mut identity_hash = [0u8; 16];
1931    identity_hash.copy_from_slice(&get_bytes("identity_hash", 16)?);
1932    let mut public_key = [0u8; 64];
1933    public_key.copy_from_slice(&get_bytes("public_key", 64)?);
1934    let app_data = value
1935        .get("app_data")
1936        .and_then(|v| v.as_bytes())
1937        .map(|bytes| bytes.to_vec());
1938    let last_used_at = value.get("last_used_at").and_then(|v| v.as_float());
1939
1940    Ok(KnownDestinationEntry {
1941        dest_hash,
1942        identity_hash,
1943        public_key,
1944        app_data,
1945        hops: get_int("hops")? as u8,
1946        received_at: get_float("received_at")?,
1947        receiving_interface: rns_core::transport::types::InterfaceId(
1948            get_int("receiving_interface")? as u64,
1949        ),
1950        was_used: get_bool("was_used")?,
1951        last_used_at,
1952        retained: get_bool("retained")?,
1953    })
1954}
1955
1956fn parse_known_destination_list(value: &PickleValue) -> io::Result<Vec<KnownDestinationEntry>> {
1957    let list = value
1958        .as_list()
1959        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "expected list"))?;
1960    list.iter().map(parse_known_destination_entry).collect()
1961}
1962
1963// --- RPC Client ---
1964
1965/// RPC client for connecting to a running daemon.
1966pub struct RpcClient {
1967    stream: TcpStream,
1968}
1969
1970impl RpcClient {
1971    /// Connect to an RPC server and authenticate.
1972    pub fn connect(addr: &RpcAddr, auth_key: &[u8; 32]) -> io::Result<Self> {
1973        let mut stream = match addr {
1974            RpcAddr::Tcp(host, port) => TcpStream::connect((host.as_str(), *port))?,
1975        };
1976
1977        stream.set_read_timeout(Some(std::time::Duration::from_secs(10)))?;
1978        stream.set_write_timeout(Some(std::time::Duration::from_secs(10)))?;
1979
1980        // Client-side authentication
1981        client_auth(&mut stream, auth_key)?;
1982
1983        Ok(RpcClient { stream })
1984    }
1985
1986    /// Send a pickle request and receive a pickle response.
1987    pub fn call(&mut self, request: &PickleValue) -> io::Result<PickleValue> {
1988        let request_bytes = pickle::encode(request);
1989        send_bytes(&mut self.stream, &request_bytes)?;
1990
1991        let response_bytes = recv_bytes(&mut self.stream)?;
1992        pickle::decode(&response_bytes)
1993            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))
1994    }
1995
1996    pub fn list_hooks(&mut self) -> io::Result<Vec<HookInfo>> {
1997        let response = self.call(&PickleValue::Dict(vec![(
1998            PickleValue::String("get".into()),
1999            PickleValue::String("hooks".into()),
2000        )]))?;
2001        parse_hook_list(&response)
2002    }
2003
2004    pub fn begin_drain(&mut self, timeout: Duration) -> io::Result<bool> {
2005        let response = self.call(&PickleValue::Dict(vec![(
2006            PickleValue::String("begin_drain".into()),
2007            PickleValue::Float(timeout.as_secs_f64()),
2008        )]))?;
2009        Ok(response.as_bool().unwrap_or(false))
2010    }
2011
2012    pub fn drain_status(&mut self) -> io::Result<Option<DrainStatus>> {
2013        let response = self.call(&PickleValue::Dict(vec![(
2014            PickleValue::String("get".into()),
2015            PickleValue::String("drain_status".into()),
2016        )]))?;
2017        parse_drain_status(&response)
2018    }
2019
2020    pub fn provider_bridge_stats(&mut self) -> io::Result<PickleValue> {
2021        self.call(&PickleValue::Dict(vec![(
2022            PickleValue::String("get".into()),
2023            PickleValue::String("provider_bridge_stats".into()),
2024        )]))
2025    }
2026
2027    pub fn load_hook(
2028        &mut self,
2029        name: &str,
2030        attach_point: &str,
2031        priority: i32,
2032        wasm: &[u8],
2033    ) -> io::Result<Result<(), String>> {
2034        let response = self.call(&PickleValue::Dict(vec![
2035            (
2036                PickleValue::String("hook".into()),
2037                PickleValue::String("load".into()),
2038            ),
2039            (
2040                PickleValue::String("name".into()),
2041                PickleValue::String(name.to_string()),
2042            ),
2043            (
2044                PickleValue::String("attach_point".into()),
2045                PickleValue::String(attach_point.to_string()),
2046            ),
2047            (
2048                PickleValue::String("priority".into()),
2049                PickleValue::Int(priority as i64),
2050            ),
2051            (
2052                PickleValue::String("wasm".into()),
2053                PickleValue::Bytes(wasm.to_vec()),
2054            ),
2055        ]))?;
2056        parse_hook_result(&response)
2057    }
2058
2059    pub fn load_builtin_hook(
2060        &mut self,
2061        name: &str,
2062        attach_point: &str,
2063        priority: i32,
2064        builtin_id: &str,
2065    ) -> io::Result<Result<(), String>> {
2066        let response = self.call(&PickleValue::Dict(vec![
2067            (
2068                PickleValue::String("hook".into()),
2069                PickleValue::String("load_builtin".into()),
2070            ),
2071            (
2072                PickleValue::String("name".into()),
2073                PickleValue::String(name.to_string()),
2074            ),
2075            (
2076                PickleValue::String("attach_point".into()),
2077                PickleValue::String(attach_point.to_string()),
2078            ),
2079            (
2080                PickleValue::String("priority".into()),
2081                PickleValue::Int(priority as i64),
2082            ),
2083            (
2084                PickleValue::String("builtin_id".into()),
2085                PickleValue::String(builtin_id.to_string()),
2086            ),
2087        ]))?;
2088        parse_hook_result(&response)
2089    }
2090
2091    pub fn unload_hook(
2092        &mut self,
2093        name: &str,
2094        attach_point: &str,
2095    ) -> io::Result<Result<(), String>> {
2096        let response = self.call(&PickleValue::Dict(vec![
2097            (
2098                PickleValue::String("hook".into()),
2099                PickleValue::String("unload".into()),
2100            ),
2101            (
2102                PickleValue::String("name".into()),
2103                PickleValue::String(name.to_string()),
2104            ),
2105            (
2106                PickleValue::String("attach_point".into()),
2107                PickleValue::String(attach_point.to_string()),
2108            ),
2109        ]))?;
2110        parse_hook_result(&response)
2111    }
2112
2113    pub fn set_hook_enabled(
2114        &mut self,
2115        name: &str,
2116        attach_point: &str,
2117        enabled: bool,
2118    ) -> io::Result<Result<(), String>> {
2119        let op = if enabled { "enable" } else { "disable" };
2120        let response = self.call(&PickleValue::Dict(vec![
2121            (
2122                PickleValue::String("hook".into()),
2123                PickleValue::String(op.into()),
2124            ),
2125            (
2126                PickleValue::String("name".into()),
2127                PickleValue::String(name.to_string()),
2128            ),
2129            (
2130                PickleValue::String("attach_point".into()),
2131                PickleValue::String(attach_point.to_string()),
2132            ),
2133        ]))?;
2134        parse_hook_result(&response)
2135    }
2136
2137    pub fn set_hook_priority(
2138        &mut self,
2139        name: &str,
2140        attach_point: &str,
2141        priority: i32,
2142    ) -> io::Result<Result<(), String>> {
2143        let response = self.call(&PickleValue::Dict(vec![
2144            (
2145                PickleValue::String("hook".into()),
2146                PickleValue::String("set_priority".into()),
2147            ),
2148            (
2149                PickleValue::String("name".into()),
2150                PickleValue::String(name.to_string()),
2151            ),
2152            (
2153                PickleValue::String("attach_point".into()),
2154                PickleValue::String(attach_point.to_string()),
2155            ),
2156            (
2157                PickleValue::String("priority".into()),
2158                PickleValue::Int(priority as i64),
2159            ),
2160        ]))?;
2161        parse_hook_result(&response)
2162    }
2163
2164    pub fn blacklist_backbone_peer(
2165        &mut self,
2166        interface: &str,
2167        ip: &str,
2168        duration_secs: u64,
2169        reason: Option<&str>,
2170        penalty_level: Option<u8>,
2171    ) -> io::Result<bool> {
2172        let mut request = vec![
2173            (
2174                PickleValue::String("set".into()),
2175                PickleValue::String("backbone_peer_blacklist".into()),
2176            ),
2177            (
2178                PickleValue::String("interface".into()),
2179                PickleValue::String(interface.to_string()),
2180            ),
2181            (
2182                PickleValue::String("ip".into()),
2183                PickleValue::String(ip.to_string()),
2184            ),
2185            (
2186                PickleValue::String("duration_secs".into()),
2187                PickleValue::Int(duration_secs as i64),
2188            ),
2189        ];
2190        if let Some(reason) = reason {
2191            request.push((
2192                PickleValue::String("reason".into()),
2193                PickleValue::String(reason.to_string()),
2194            ));
2195        }
2196        if let Some(level) = penalty_level {
2197            request.push((
2198                PickleValue::String("penalty_level".into()),
2199                PickleValue::Int(level as i64),
2200            ));
2201        }
2202        let response = self.call(&PickleValue::Dict(request))?;
2203        Ok(response.as_bool().unwrap_or(false))
2204    }
2205
2206    pub fn known_destinations(&mut self) -> io::Result<Vec<KnownDestinationEntry>> {
2207        let response = self.call(&PickleValue::Dict(vec![(
2208            PickleValue::String("get".into()),
2209            PickleValue::String("known_destinations".into()),
2210        )]))?;
2211        parse_known_destination_list(&response)
2212    }
2213
2214    pub fn retain_known_destination(&mut self, dest_hash: [u8; 16]) -> io::Result<bool> {
2215        let response = self.call(&PickleValue::Dict(vec![
2216            (
2217                PickleValue::String("set".into()),
2218                PickleValue::String("known_destination_retained".into()),
2219            ),
2220            (
2221                PickleValue::String("dest_hash".into()),
2222                PickleValue::Bytes(dest_hash.to_vec()),
2223            ),
2224        ]))?;
2225        Ok(response.as_bool().unwrap_or(false))
2226    }
2227
2228    pub fn retain_identity(&mut self, identity_hash: [u8; 16]) -> io::Result<bool> {
2229        let response = self.call(&PickleValue::Dict(vec![
2230            (
2231                PickleValue::String("set".into()),
2232                PickleValue::String("identity_retained".into()),
2233            ),
2234            (
2235                PickleValue::String("identity_hash".into()),
2236                PickleValue::Bytes(identity_hash.to_vec()),
2237            ),
2238        ]))?;
2239        Ok(response.as_bool().unwrap_or(false))
2240    }
2241
2242    pub fn unretain_known_destination(&mut self, dest_hash: [u8; 16]) -> io::Result<bool> {
2243        let response = self.call(&PickleValue::Dict(vec![
2244            (
2245                PickleValue::String("clear".into()),
2246                PickleValue::String("known_destination_retained".into()),
2247            ),
2248            (
2249                PickleValue::String("dest_hash".into()),
2250                PickleValue::Bytes(dest_hash.to_vec()),
2251            ),
2252        ]))?;
2253        Ok(response.as_bool().unwrap_or(false))
2254    }
2255
2256    pub fn mark_known_destination_used(&mut self, dest_hash: [u8; 16]) -> io::Result<bool> {
2257        let response = self.call(&PickleValue::Dict(vec![
2258            (
2259                PickleValue::String("set".into()),
2260                PickleValue::String("known_destination_used".into()),
2261            ),
2262            (
2263                PickleValue::String("dest_hash".into()),
2264                PickleValue::Bytes(dest_hash.to_vec()),
2265            ),
2266        ]))?;
2267        Ok(response.as_bool().unwrap_or(false))
2268    }
2269}
2270
2271fn parse_lifecycle_state(value: &str) -> Option<LifecycleState> {
2272    match value {
2273        "active" => Some(LifecycleState::Active),
2274        "draining" => Some(LifecycleState::Draining),
2275        "stopping" => Some(LifecycleState::Stopping),
2276        "stopped" => Some(LifecycleState::Stopped),
2277        _ => None,
2278    }
2279}
2280
2281fn parse_drain_status(value: &PickleValue) -> io::Result<Option<DrainStatus>> {
2282    if !matches!(value, PickleValue::Dict(_)) {
2283        return Ok(None);
2284    }
2285    let state = value
2286        .get("state")
2287        .and_then(|entry| entry.as_str())
2288        .and_then(parse_lifecycle_state)
2289        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing drain state"))?;
2290    let drain_age_seconds = value.get("drain_age_seconds").and_then(|entry| {
2291        entry
2292            .as_float()
2293            .or_else(|| entry.as_int().map(|v| v as f64))
2294    });
2295    let deadline_remaining_seconds = value.get("deadline_remaining_seconds").and_then(|entry| {
2296        entry
2297            .as_float()
2298            .or_else(|| entry.as_int().map(|v| v as f64))
2299    });
2300    let drain_complete = value
2301        .get("drain_complete")
2302        .and_then(|entry| entry.as_bool())
2303        .unwrap_or(false);
2304    let interface_writer_queued_frames = value
2305        .get("interface_writer_queued_frames")
2306        .and_then(|entry| entry.as_int())
2307        .unwrap_or(0)
2308        .max(0) as usize;
2309    let provider_backlog_events = value
2310        .get("provider_backlog_events")
2311        .and_then(|entry| entry.as_int())
2312        .unwrap_or(0)
2313        .max(0) as usize;
2314    let provider_consumer_queued_events = value
2315        .get("provider_consumer_queued_events")
2316        .and_then(|entry| entry.as_int())
2317        .unwrap_or(0)
2318        .max(0) as usize;
2319    let detail = value
2320        .get("detail")
2321        .and_then(|entry| entry.as_str().map(|v| v.to_string()));
2322    Ok(Some(DrainStatus {
2323        state,
2324        drain_age_seconds,
2325        deadline_remaining_seconds,
2326        drain_complete,
2327        interface_writer_queued_frames,
2328        provider_backlog_events,
2329        provider_consumer_queued_events,
2330        detail,
2331    }))
2332}
2333
2334fn parse_hook_result(response: &PickleValue) -> io::Result<Result<(), String>> {
2335    let ok = response
2336        .get("ok")
2337        .and_then(|v| v.as_bool())
2338        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "invalid hook response"))?;
2339    if ok {
2340        Ok(Ok(()))
2341    } else {
2342        Ok(Err(response
2343            .get("error")
2344            .and_then(|v| v.as_str())
2345            .unwrap_or("unknown hook error")
2346            .to_string()))
2347    }
2348}
2349
2350fn parse_hook_list(response: &PickleValue) -> io::Result<Vec<HookInfo>> {
2351    let list = response
2352        .as_list()
2353        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "invalid hooks response"))?;
2354    let mut hooks = Vec::with_capacity(list.len());
2355    for item in list {
2356        hooks.push(HookInfo {
2357            name: item
2358                .get("name")
2359                .and_then(|v| v.as_str())
2360                .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing hook name"))?
2361                .to_string(),
2362            hook_type: item
2363                .get("type")
2364                .and_then(|v| v.as_str())
2365                .unwrap_or(default_hook_type())
2366                .to_string(),
2367            attach_point: item
2368                .get("attach_point")
2369                .and_then(|v| v.as_str())
2370                .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing attach_point"))?
2371                .to_string(),
2372            priority: item
2373                .get("priority")
2374                .and_then(|v| v.as_int())
2375                .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing priority"))?
2376                as i32,
2377            enabled: item
2378                .get("enabled")
2379                .and_then(|v| v.as_bool())
2380                .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing enabled"))?,
2381            consecutive_traps: item
2382                .get("consecutive_traps")
2383                .and_then(|v| v.as_int())
2384                .ok_or_else(|| {
2385                    io::Error::new(io::ErrorKind::InvalidData, "missing consecutive_traps")
2386                })? as u32,
2387        });
2388    }
2389    Ok(hooks)
2390}
2391
/// Hook type assumed when a hook list entry carries no `type` field.
///
/// Yields `"native"` when the `rns-hooks-native` feature is enabled and
/// `"wasm"` in every other feature combination.
fn default_hook_type() -> &'static str {
    if cfg!(feature = "rns-hooks-native") {
        "native"
    } else {
        "wasm"
    }
}
2406
2407/// Client-side authentication: answer the server's challenge.
2408fn client_auth(stream: &mut TcpStream, auth_key: &[u8; 32]) -> io::Result<()> {
2409    // Read challenge
2410    let challenge = recv_bytes(stream)?;
2411
2412    if !challenge.starts_with(CHALLENGE_PREFIX) {
2413        return Err(io::Error::new(
2414            io::ErrorKind::InvalidData,
2415            "expected challenge",
2416        ));
2417    }
2418
2419    let message = &challenge[CHALLENGE_PREFIX.len()..];
2420
2421    // Create HMAC response
2422    let response = create_response(auth_key, message);
2423    send_bytes(stream, &response)?;
2424
2425    // Read welcome/failure
2426    let result = recv_bytes(stream)?;
2427    if result == WELCOME {
2428        Ok(())
2429    } else {
2430        Err(io::Error::new(
2431            io::ErrorKind::PermissionDenied,
2432            "authentication failed",
2433        ))
2434    }
2435}
2436
2437/// Create an HMAC response to a challenge message.
2438fn create_response(auth_key: &[u8; 32], message: &[u8]) -> Vec<u8> {
2439    // Check if message has {sha256} prefix (modern protocol)
2440    if message.starts_with(b"{sha256}") || message.len() > 20 {
2441        // Modern protocol: use HMAC-SHA256 with {sha256} prefix
2442        let digest = hmac_sha256(auth_key, message);
2443        let mut response = Vec::with_capacity(8 + 32);
2444        response.extend_from_slice(b"{sha256}");
2445        response.extend_from_slice(&digest);
2446        response
2447    } else {
2448        // Legacy protocol: raw HMAC-MD5
2449        let digest = hmac_md5(auth_key, message);
2450        digest.to_vec()
2451    }
2452}
2453
2454/// Derive the RPC auth key from transport identity private key.
2455pub fn derive_auth_key(private_key: &[u8]) -> [u8; 32] {
2456    sha256(private_key)
2457}
2458
2459#[cfg(test)]
2460mod tests {
2461    use super::*;
2462
#[test]
fn send_recv_bytes_roundtrip() {
    // A framed payload written on one end must arrive unchanged on the other.
    let payload = b"hello world";
    let (mut writer, mut reader) = tcp_pair();

    send_bytes(&mut writer, payload).unwrap();
    let echoed = recv_bytes(&mut reader).unwrap();

    assert_eq!(&echoed, payload);
}
2471
#[test]
fn send_recv_empty() {
    // A zero-length frame is legal and must decode to an empty payload.
    let (mut writer, mut reader) = tcp_pair();

    send_bytes(&mut writer, b"").unwrap();
    let echoed = recv_bytes(&mut reader).unwrap();

    assert_eq!(echoed.len(), 0);
}
2479
#[test]
fn auth_success() {
    // Both ends share one key, so the handshake must complete on both sides.
    let (mut server_end, mut client_end) = tcp_pair();
    let shared_key = derive_auth_key(b"test-private-key");

    // The key is a `Copy` array: the `move` closure gets its own copy.
    let client_thread = thread::spawn(move || {
        client_auth(&mut client_end, &shared_key).unwrap();
    });

    server_auth(&mut server_end, &shared_key).unwrap();
    client_thread.join().unwrap();
}
2493
#[test]
fn auth_failure_wrong_key() {
    // Mismatched keys: both the client and the server must report an error.
    let (mut server_end, mut client_end) = tcp_pair();
    let good_key = derive_auth_key(b"server-key");
    let bad_key = derive_auth_key(b"wrong-key");

    let client_thread = thread::spawn(move || {
        let outcome = client_auth(&mut client_end, &bad_key);
        assert!(outcome.is_err());
    });

    let outcome = server_auth(&mut server_end, &good_key);
    assert!(outcome.is_err());
    client_thread.join().unwrap();
}
2509
#[test]
fn verify_sha256_response() {
    // A `{sha256}`-tagged challenge gets a tagged reply that verifies.
    let challenge = b"{sha256}abcdefghijklmnopqrstuvwxyz0123456789ABCD";
    let auth_key = derive_auth_key(b"mykey");

    let reply = create_response(&auth_key, challenge);

    assert!(reply.starts_with(b"{sha256}"));
    assert!(verify_response(&auth_key, challenge, &reply));
}
2518
#[test]
fn verify_legacy_md5_response() {
    // Legacy challenge: exactly 20 bytes and no digest tag in front.
    let challenge = b"01234567890123456789";
    let auth_key = derive_auth_key(b"mykey");

    // A legacy reply is the raw HMAC-MD5 digest without any prefix.
    let legacy_reply = hmac_md5(&auth_key, challenge);

    assert!(verify_response(&auth_key, challenge, &legacy_reply));
}
2528
#[test]
fn constant_time_eq_works() {
    // (left, right, expected): equal, same length but different, shorter.
    let cases = [
        (&b"hello"[..], &b"hello"[..], true),
        (&b"hello"[..], &b"world"[..], false),
        (&b"hello"[..], &b"hell"[..], false),
    ];
    for (left, right, expected) in cases {
        assert_eq!(constant_time_eq(left, right), expected);
    }
}
2535
#[test]
fn rpc_roundtrip() {
    // Builds the single-entry `{"get": <what>}` request dictionary.
    let get_request = |what: &'static str| {
        PickleValue::Dict(vec![(
            PickleValue::String("get".into()),
            PickleValue::String(what.into()),
        )])
    };

    let auth_key = derive_auth_key(b"test-key");
    let (event_tx, event_rx) = crate::event::channel();

    // Bind by hand on an ephemeral port so the client knows where to connect.
    let listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let port = listener.local_addr().unwrap().port();
    listener.set_nonblocking(true).unwrap();

    let stop_flag = Arc::new(AtomicBool::new(false));

    // Stand-in driver: answers the two queries this test issues and stops on
    // anything else (including a receive timeout or a closed channel).
    let driver_thread = thread::spawn(move || {
        while let Ok(Event::Query(query, reply_tx)) =
            event_rx.recv_timeout(Duration::from_secs(5))
        {
            match query {
                QueryRequest::LinkCount => {
                    let _ = reply_tx.send(QueryResponse::LinkCount(42));
                }
                QueryRequest::InterfaceStats => {
                    let iface = SingleInterfaceStat {
                        id: 7,
                        name: "TestInterface".into(),
                        status: true,
                        mode: 1,
                        rxb: 1000,
                        txb: 2000,
                        rx_packets: 10,
                        tx_packets: 20,
                        bitrate: Some(10_000_000),
                        ifac_size: None,
                        started: 1000.0,
                        ia_freq: 0.0,
                        ip_freq: 0.0,
                        op_freq: 0.0,
                        op_samples: 0,
                        burst_active: false,
                        burst_activated: 0.0,
                        pr_burst_active: false,
                        pr_burst_activated: 0.0,
                        oa_freq: 0.0,
                        clients: Some(2),
                        announce_rate_target: Some(3600.0),
                        announce_rate_grace: 5,
                        announce_rate_penalty: 0.0,
                        interface_type: "TestInterface".into(),
                    };
                    let stats = InterfaceStatsResponse {
                        interfaces: vec![iface],
                        transport_id: None,
                        transport_enabled: true,
                        transport_uptime: 3600.0,
                        total_rxb: 1000,
                        total_txb: 2000,
                        probe_responder: None,
                        backbone_peer_pool: None,
                    };
                    let _ = reply_tx.send(QueryResponse::InterfaceStats(stats));
                }
                _ => break,
            }
        }
    });

    // The key is a `Copy` array, so the closure takes its own copy.
    let server_stop = Arc::clone(&stop_flag);
    let server_thread = thread::spawn(move || {
        rpc_server_loop(listener, auth_key, event_tx, server_stop);
    });

    // Give server time to start
    thread::sleep(Duration::from_millis(50));

    let server_addr = RpcAddr::Tcp("127.0.0.1".into(), port);

    // First connection: link count.
    let mut link_client = RpcClient::connect(&server_addr, &auth_key).unwrap();
    let link_reply = link_client.call(&get_request("link_count")).unwrap();
    assert_eq!(link_reply.as_int().unwrap(), 42);
    drop(link_client);

    // Second connection: interface statistics.
    let mut stats_client = RpcClient::connect(&server_addr, &auth_key).unwrap();
    let stats_reply = stats_client.call(&get_request("interface_stats")).unwrap();
    let ifaces = stats_reply.get("interfaces").unwrap().as_list().unwrap();
    assert_eq!(ifaces.len(), 1);
    let only_iface = &ifaces[0];
    assert_eq!(
        only_iface.get("name").unwrap().as_str().unwrap(),
        "TestInterface"
    );
    assert_eq!(only_iface.get("rxb").unwrap().as_int().unwrap(), 1000);
    drop(stats_client);

    // Ask the server loop to stop, then wait for both helper threads.
    stop_flag.store(true, Ordering::Relaxed);
    server_thread.join().unwrap();
    driver_thread.join().unwrap();
}
2642
#[test]
fn derive_auth_key_deterministic() {
    // Same input twice gives the same key.
    let first = derive_auth_key(b"test");
    let second = derive_auth_key(b"test");
    assert_eq!(first, second);

    // A different input must give a different key.
    let unrelated = derive_auth_key(b"other");
    assert_ne!(first, unrelated);
}
2652
#[test]
fn pickle_request_handling() {
    // Exercises the request -> query translation directly, without sockets.
    let text = |s: &'static str| PickleValue::String(s.into());
    let (event_tx, event_rx) = crate::event::channel();

    // Fake driver: acknowledges a single DropPath query.
    let driver = thread::spawn(move || {
        let event = event_rx.recv();
        if let Ok(Event::Query(QueryRequest::DropPath { dest_hash }, reply_tx)) = event {
            assert_eq!(dest_hash, [1u8; 16]);
            let _ = reply_tx.send(QueryResponse::DropPath(true));
        }
    });

    let request = PickleValue::Dict(vec![
        (text("drop"), text("path")),
        (text("destination_hash"), PickleValue::Bytes(vec![1u8; 16])),
    ]);

    let reply = handle_rpc_request(&request, &event_tx).unwrap();
    assert_eq!(reply, PickleValue::Bool(true));
    driver.join().unwrap();
}
2681
#[test]
fn hook_list_request_handling() {
    let (event_tx, event_rx) = crate::event::channel();

    // Fake driver: replies to one ListHooks event with a single hook.
    let driver = thread::spawn(move || {
        let event = event_rx.recv();
        if let Ok(Event::ListHooks { response_tx }) = event {
            let stats_hook = HookInfo {
                name: "stats".into(),
                hook_type: "wasm".into(),
                attach_point: "PreIngress".into(),
                priority: 7,
                enabled: true,
                consecutive_traps: 0,
            };
            let _ = response_tx.send(vec![stats_hook]);
        }
    });

    let request = PickleValue::Dict(vec![(
        PickleValue::String("get".into()),
        PickleValue::String("hooks".into()),
    )]);

    // The pickled reply must parse back into the hook the driver reported.
    let reply = handle_rpc_request(&request, &event_tx).unwrap();
    let parsed = parse_hook_list(&reply).unwrap();
    assert_eq!(parsed.len(), 1);
    assert_eq!(parsed[0].name, "stats");
    driver.join().unwrap();
}
2709
#[test]
fn hook_load_request_handling() {
    let text = |s: &'static str| PickleValue::String(s.into());
    let (event_tx, event_rx) = crate::event::channel();

    // Fake driver: checks every field of the LoadHook event, then accepts it.
    let driver = thread::spawn(move || {
        let event = event_rx.recv();
        if let Ok(Event::LoadHook {
            name,
            wasm_bytes,
            attach_point,
            priority,
            response_tx,
        }) = event
        {
            assert_eq!(name, "stats");
            assert_eq!(attach_point, "PreIngress");
            assert_eq!(priority, 11);
            assert_eq!(wasm_bytes, vec![1, 2, 3]);
            let _ = response_tx.send(Ok(()));
        }
    });

    let request = PickleValue::Dict(vec![
        (text("hook"), text("load")),
        (text("name"), text("stats")),
        (text("attach_point"), text("PreIngress")),
        (text("priority"), PickleValue::Int(11)),
        (text("wasm"), PickleValue::Bytes(vec![1, 2, 3])),
    ]);

    let reply = handle_rpc_request(&request, &event_tx).unwrap();
    assert_eq!(parse_hook_result(&reply).unwrap(), Ok(()));
    driver.join().unwrap();
}
2754
#[test]
fn interface_stats_pickle_format() {
    let tcp_iface = SingleInterfaceStat {
        id: 1,
        name: "TCP".into(),
        status: true,
        mode: 1,
        rxb: 100,
        txb: 200,
        rx_packets: 5,
        tx_packets: 10,
        bitrate: Some(1000000),
        ifac_size: Some(16),
        started: 1000.0,
        ia_freq: 1.0,
        ip_freq: 2.0,
        op_freq: 3.0,
        op_samples: 0,
        burst_active: true,
        burst_activated: 1200.0,
        pr_burst_active: true,
        pr_burst_activated: 1300.0,
        oa_freq: 4.0,
        clients: Some(3),
        announce_rate_target: Some(3600.0),
        announce_rate_grace: 5,
        announce_rate_penalty: 0.0,
        interface_type: "TCPClientInterface".into(),
    };
    let stats = InterfaceStatsResponse {
        interfaces: vec![tcp_iface],
        transport_id: Some([0xAB; 16]),
        transport_enabled: true,
        transport_uptime: 3600.0,
        total_rxb: 100,
        total_txb: 200,
        probe_responder: None,
        backbone_peer_pool: None,
    };

    // Encode and decode again so the assertions see the wire representation.
    let wire = pickle::encode(&interface_stats_to_pickle(&stats));
    let decoded = pickle::decode(&wire).unwrap();

    assert!(decoded.get("transport_enabled").unwrap().as_bool().unwrap());

    let ifaces = decoded.get("interfaces").unwrap().as_list().unwrap();
    let first = &ifaces[0];
    // Typed accessors for the first interface entry; each panics when the key
    // is missing or holds a value of a different type.
    let int_of = |key: &str| first.get(key).unwrap().as_int().unwrap();
    let float_of = |key: &str| first.get(key).unwrap().as_float().unwrap();
    let flag_of = |key: &str| first.get(key).unwrap().as_bool().unwrap();

    assert_eq!(int_of("id"), 1);
    assert_eq!(first.get("name").unwrap().as_str().unwrap(), "TCP");
    assert_eq!(float_of("ia_freq"), 1.0);
    assert_eq!(float_of("ip_freq"), 2.0);
    assert_eq!(float_of("op_freq"), 3.0);
    assert_eq!(float_of("oa_freq"), 4.0);
    assert!(flag_of("burst_active"));
    assert_eq!(float_of("burst_activated"), 1200.0);
    assert!(flag_of("pr_burst_active"));
    assert_eq!(float_of("pr_burst_activated"), 1300.0);
    assert_eq!(float_of("announce_rate_target"), 3600.0);
    assert_eq!(int_of("announce_rate_grace"), 5);
    assert_eq!(float_of("announce_rate_penalty"), 0.0);
    assert_eq!(int_of("clients"), 3);
}
2860
#[test]
fn interface_stats_pickle_includes_backbone_peer_pool_priority() {
    let member = crate::event::BackbonePeerPoolMemberStatus {
        name: "peer".into(),
        remote: "127.0.0.1:4242".into(),
        source: "configured".into(),
        priority: 77,
        state: "active".into(),
        interface_id: Some(42),
        failure_count: 0,
        last_error: None,
        cooldown_remaining_seconds: None,
    };
    let pool_status = crate::event::BackbonePeerPoolStatus {
        max_connected: 1,
        active_count: 1,
        standby_count: 0,
        cooldown_count: 0,
        members: vec![member],
    };
    let stats = InterfaceStatsResponse {
        interfaces: vec![],
        transport_id: None,
        transport_enabled: true,
        transport_uptime: 1.0,
        total_rxb: 0,
        total_txb: 0,
        probe_responder: None,
        backbone_peer_pool: Some(pool_status),
    };

    // Round-trip through the encoder so the wire form is what gets checked.
    let wire = pickle::encode(&interface_stats_to_pickle(&stats));
    let decoded = pickle::decode(&wire).unwrap();

    let members = decoded
        .get("backbone_peer_pool")
        .unwrap()
        .get("members")
        .unwrap()
        .as_list()
        .unwrap();
    assert_eq!(members[0].get("priority").unwrap().as_int().unwrap(), 77);
}
2897
#[test]
fn send_probe_rpc_unknown_dest() {
    let (event_tx, event_rx) = crate::event::channel();

    // Fake driver: the probe target is unknown, so it answers with `None`.
    let driver = thread::spawn(move || {
        let event = event_rx.recv();
        if let Ok(Event::Query(
            QueryRequest::SendProbe {
                dest_hash,
                payload_size,
            },
            reply_tx,
        )) = event
        {
            assert_eq!(dest_hash, [0xAA; 16]);
            // No "size" entry in the request, so the default applies.
            assert_eq!(payload_size, 16);
            let _ = reply_tx.send(QueryResponse::SendProbe(None));
        }
    });

    let request = PickleValue::Dict(vec![(
        PickleValue::String("send_probe".into()),
        PickleValue::Bytes(vec![0xAA; 16]),
    )]);

    let reply = handle_rpc_request(&request, &event_tx).unwrap();
    assert_eq!(reply, PickleValue::None);
    driver.join().unwrap();
}
2926
#[test]
fn send_probe_rpc_with_result() {
    let text = |s: &'static str| PickleValue::String(s.into());
    let (event_tx, event_rx) = crate::event::channel();
    let expected_hash = [0xBB; 32];

    // Fake driver: reports a sent probe with a packet hash and a hop count.
    let driver = thread::spawn(move || {
        let event = event_rx.recv();
        if let Ok(Event::Query(
            QueryRequest::SendProbe {
                dest_hash,
                payload_size,
            },
            reply_tx,
        )) = event
        {
            assert_eq!(dest_hash, [0xCC; 16]);
            assert_eq!(payload_size, 32);
            let _ = reply_tx.send(QueryResponse::SendProbe(Some((expected_hash, 3))));
        }
    });

    let request = PickleValue::Dict(vec![
        (text("send_probe"), PickleValue::Bytes(vec![0xCC; 16])),
        (text("size"), PickleValue::Int(32)),
    ]);

    let reply = handle_rpc_request(&request, &event_tx).unwrap();
    let returned_hash = reply.get("packet_hash").unwrap().as_bytes().unwrap();
    assert_eq!(returned_hash, &[0xBB; 32]);
    assert_eq!(reply.get("hops").unwrap().as_int().unwrap(), 3);
    driver.join().unwrap();
}
2961
#[test]
fn retain_identity_rpc_sends_identity_hash_query() {
    let text = |s: &'static str| PickleValue::String(s.into());
    let identity_hash = [0x44; 16];
    let (event_tx, event_rx) = crate::event::channel();

    // Fake driver: the query must carry the hash that was put in the request.
    let driver = thread::spawn(move || {
        let event = event_rx.recv();
        if let Ok(Event::Query(
            QueryRequest::RetainIdentity {
                identity_hash: received,
            },
            reply_tx,
        )) = event
        {
            assert_eq!(received, identity_hash);
            let _ = reply_tx.send(QueryResponse::RetainIdentity(true));
        }
    });

    let request = PickleValue::Dict(vec![
        (text("set"), text("identity_retained")),
        (
            text("identity_hash"),
            PickleValue::Bytes(identity_hash.to_vec()),
        ),
    ]);

    let reply = handle_rpc_request(&request, &event_tx).unwrap();
    assert_eq!(reply, PickleValue::Bool(true));
    driver.join().unwrap();
}
2995
#[test]
fn send_probe_rpc_size_validation() {
    let text = |s: &'static str| PickleValue::String(s.into());
    let (event_tx, event_rx) = crate::event::channel();

    // A negative "size" must reach the driver as the default of 16.
    let driver = thread::spawn(move || {
        let event = event_rx.recv();
        if let Ok(Event::Query(QueryRequest::SendProbe { payload_size, .. }, reply_tx)) = event {
            assert_eq!(payload_size, 16);
            let _ = reply_tx.send(QueryResponse::SendProbe(None));
        }
    });

    let request = PickleValue::Dict(vec![
        (text("send_probe"), PickleValue::Bytes(vec![0xDD; 16])),
        (text("size"), PickleValue::Int(-1)),
    ]);

    let reply = handle_rpc_request(&request, &event_tx).unwrap();
    assert_eq!(reply, PickleValue::None);
    driver.join().unwrap();
}
3022
#[test]
fn send_probe_rpc_size_too_large() {
    let text = |s: &'static str| PickleValue::String(s.into());
    let (event_tx, event_rx) = crate::event::channel();

    // A "size" of 999 is over the limit and must arrive as the default of 16.
    let driver = thread::spawn(move || {
        let event = event_rx.recv();
        if let Ok(Event::Query(QueryRequest::SendProbe { payload_size, .. }, reply_tx)) = event {
            assert_eq!(payload_size, 16);
            let _ = reply_tx.send(QueryResponse::SendProbe(None));
        }
    });

    let request = PickleValue::Dict(vec![
        (text("send_probe"), PickleValue::Bytes(vec![0xDD; 16])),
        (text("size"), PickleValue::Int(999)),
    ]);

    let reply = handle_rpc_request(&request, &event_tx).unwrap();
    assert_eq!(reply, PickleValue::None);
    driver.join().unwrap();
}
3049
#[test]
fn check_proof_rpc_not_found() {
    let (event_tx, event_rx) = crate::event::channel();

    // Fake driver: no proof is known for the requested packet hash.
    let driver = thread::spawn(move || {
        let event = event_rx.recv();
        if let Ok(Event::Query(QueryRequest::CheckProof { packet_hash }, reply_tx)) = event {
            assert_eq!(packet_hash, [0xEE; 32]);
            let _ = reply_tx.send(QueryResponse::CheckProof(None));
        }
    });

    let request = PickleValue::Dict(vec![(
        PickleValue::String("check_proof".into()),
        PickleValue::Bytes(vec![0xEE; 32]),
    )]);

    let reply = handle_rpc_request(&request, &event_tx).unwrap();
    assert_eq!(reply, PickleValue::None);
    driver.join().unwrap();
}
3072
#[test]
fn check_proof_rpc_found() {
    let (event_tx, event_rx) = crate::event::channel();

    // Fake driver: a proof exists and carries a round-trip time of 0.352.
    let driver = thread::spawn(move || {
        let event = event_rx.recv();
        if let Ok(Event::Query(QueryRequest::CheckProof { packet_hash }, reply_tx)) = event {
            assert_eq!(packet_hash, [0xFF; 32]);
            let _ = reply_tx.send(QueryResponse::CheckProof(Some(0.352)));
        }
    });

    let request = PickleValue::Dict(vec![(
        PickleValue::String("check_proof".into()),
        PickleValue::Bytes(vec![0xFF; 32]),
    )]);

    // The reply must be a float close to the value the driver reported.
    let reply = handle_rpc_request(&request, &event_tx).unwrap();
    match reply {
        PickleValue::Float(rtt) => assert!((rtt - 0.352).abs() < 0.001),
        other => panic!("Expected Float, got {:?}", other),
    }
    driver.join().unwrap();
}
3099
#[test]
fn request_path_rpc() {
    let (event_tx, event_rx) = crate::event::channel();

    // Fake driver: within five seconds a RequestPath event must arrive;
    // anything else (including a timeout) fails the test.
    let driver = thread::spawn(move || {
        let event = event_rx.recv_timeout(Duration::from_secs(5));
        match event {
            Ok(Event::RequestPath { dest_hash }) => {
                assert_eq!(dest_hash, [0x11; 16]);
            }
            other => panic!("Expected RequestPath event, got {:?}", other),
        }
    });

    let request = PickleValue::Dict(vec![(
        PickleValue::String("request_path".into()),
        PickleValue::Bytes(vec![0x11; 16]),
    )]);

    let reply = handle_rpc_request(&request, &event_tx).unwrap();
    assert_eq!(reply, PickleValue::Bool(true));
    driver.join().unwrap();
}
3123
#[test]
fn begin_drain_rpc_emits_event() {
    let (event_tx, event_rx) = crate::event::channel();

    // Fake driver: the float in the request must come through as the drain
    // timeout; anything other than a BeginDrain event fails the test.
    let driver = thread::spawn(move || {
        let event = event_rx.recv_timeout(Duration::from_secs(5));
        match event {
            Ok(Event::BeginDrain { timeout }) => {
                let drift = (timeout.as_secs_f64() - 1.5).abs();
                assert!(drift < 0.001);
            }
            other => panic!("Expected BeginDrain event, got {:?}", other),
        }
    });

    let request = PickleValue::Dict(vec![(
        PickleValue::String("begin_drain".into()),
        PickleValue::Float(1.5),
    )]);

    let reply = handle_rpc_request(&request, &event_tx).unwrap();
    assert_eq!(reply, PickleValue::Bool(true));
    driver.join().unwrap();
}
3146
#[test]
fn drain_status_rpc_roundtrips_fields() {
    let (event_tx, event_rx) = crate::event::channel();

    // Fake driver: reports a node that is in the middle of draining.
    let driver = thread::spawn(move || {
        let event = event_rx.recv();
        if let Ok(Event::Query(QueryRequest::DrainStatus, reply_tx)) = event {
            let status = DrainStatus {
                state: LifecycleState::Draining,
                drain_age_seconds: Some(0.75),
                deadline_remaining_seconds: Some(2.25),
                drain_complete: false,
                interface_writer_queued_frames: 3,
                provider_backlog_events: 4,
                provider_consumer_queued_events: 5,
                detail: Some("node is draining existing work".into()),
            };
            let _ = reply_tx.send(QueryResponse::DrainStatus(status));
        }
    });

    let request = PickleValue::Dict(vec![(
        PickleValue::String("get".into()),
        PickleValue::String("drain_status".into()),
    )]);

    let reply = handle_rpc_request(&request, &event_tx).unwrap();
    // Integer lookup helper; panics when the key is absent from the reply.
    let int_of = |key: &str| reply.get(key).unwrap().as_int();

    assert_eq!(reply.get("state").unwrap().as_str(), Some("draining"));
    assert_eq!(reply.get("drain_complete").unwrap().as_bool(), Some(false));
    assert_eq!(
        reply.get("deadline_remaining_seconds").unwrap().as_float(),
        Some(2.25)
    );
    assert_eq!(int_of("interface_writer_queued_frames"), Some(3));
    assert_eq!(int_of("provider_backlog_events"), Some(4));
    assert_eq!(int_of("provider_consumer_queued_events"), Some(5));
    assert_eq!(
        reply.get("detail").unwrap().as_str(),
        Some("node is draining existing work")
    );
    driver.join().unwrap();
}
3208
#[test]
fn interface_stats_with_probe_responder() {
    let responder_hash = [0x42; 16];
    let stats = InterfaceStatsResponse {
        interfaces: vec![],
        transport_id: None,
        transport_enabled: true,
        transport_uptime: 100.0,
        total_rxb: 0,
        total_txb: 0,
        probe_responder: Some(responder_hash),
        backbone_peer_pool: None,
    };

    // Round-trip through the encoder so the wire form is what gets checked.
    let wire = pickle::encode(&interface_stats_to_pickle(&stats));
    let decoded = pickle::decode(&wire).unwrap();

    let reported = decoded.get("probe_responder").unwrap().as_bytes().unwrap();
    assert_eq!(reported, &responder_hash);
}
3230
3231    #[test]
3232    fn runtime_config_get_and_set_rpc() {
3233        let (event_tx, event_rx) = crate::event::channel();
3234
3235        let driver = thread::spawn(move || {
3236            if let Ok(Event::Query(QueryRequest::GetRuntimeConfig { key }, resp_tx)) =
3237                event_rx.recv()
3238            {
3239                assert_eq!(key, "global.tick_interval_ms");
3240                let _ = resp_tx.send(QueryResponse::RuntimeConfigEntry(Some(
3241                    RuntimeConfigEntry {
3242                        key,
3243                        value: RuntimeConfigValue::Int(1000),
3244                        default: RuntimeConfigValue::Int(1000),
3245                        source: RuntimeConfigSource::Startup,
3246                        apply_mode: RuntimeConfigApplyMode::Immediate,
3247                        description: Some("tick".into()),
3248                    },
3249                )));
3250            } else {
3251                panic!("expected GetRuntimeConfig query");
3252            }
3253
3254            if let Ok(Event::Query(QueryRequest::SetRuntimeConfig { key, value }, resp_tx)) =
3255                event_rx.recv()
3256            {
3257                assert_eq!(key, "global.tick_interval_ms");
3258                assert_eq!(value, RuntimeConfigValue::Int(250));
3259                let _ = resp_tx.send(QueryResponse::RuntimeConfigSet(Ok(RuntimeConfigEntry {
3260                    key,
3261                    value: RuntimeConfigValue::Int(250),
3262                    default: RuntimeConfigValue::Int(1000),
3263                    source: RuntimeConfigSource::RuntimeOverride,
3264                    apply_mode: RuntimeConfigApplyMode::Immediate,
3265                    description: Some("tick".into()),
3266                })));
3267            } else {
3268                panic!("expected SetRuntimeConfig query");
3269            }
3270        });
3271
3272        let get_request = PickleValue::Dict(vec![
3273            (
3274                PickleValue::String("get".into()),
3275                PickleValue::String("runtime_config_entry".into()),
3276            ),
3277            (
3278                PickleValue::String("key".into()),
3279                PickleValue::String("global.tick_interval_ms".into()),
3280            ),
3281        ]);
3282        let get_response = handle_rpc_request(&get_request, &event_tx).unwrap();
3283        assert_eq!(
3284            get_response.get("key").and_then(|v| v.as_str()),
3285            Some("global.tick_interval_ms")
3286        );
3287
3288        let set_request = PickleValue::Dict(vec![
3289            (
3290                PickleValue::String("set".into()),
3291                PickleValue::String("runtime_config".into()),
3292            ),
3293            (
3294                PickleValue::String("key".into()),
3295                PickleValue::String("global.tick_interval_ms".into()),
3296            ),
3297            (PickleValue::String("value".into()), PickleValue::Int(250)),
3298        ]);
3299        let set_response = handle_rpc_request(&set_request, &event_tx).unwrap();
3300        assert_eq!(
3301            set_response.get("value").and_then(|v| v.as_int()),
3302            Some(250)
3303        );
3304
3305        driver.join().unwrap();
3306    }
3307
3308    // Helper: create a connected TCP pair
3309    fn tcp_pair() -> (TcpStream, TcpStream) {
3310        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
3311        let port = listener.local_addr().unwrap().port();
3312        let client = TcpStream::connect(("127.0.0.1", port)).unwrap();
3313        let (server, _) = listener.accept().unwrap();
3314        client
3315            .set_read_timeout(Some(std::time::Duration::from_secs(5)))
3316            .unwrap();
3317        server
3318            .set_read_timeout(Some(std::time::Duration::from_secs(5)))
3319            .unwrap();
3320        (server, client)
3321    }
3322}