Skip to main content

rns_net/
rpc.rs

//! RPC server and client for cross-process daemon communication.
//!
//! Implements the wire protocol of Python's `multiprocessing.connection`:
//! - every message is a 4-byte big-endian signed `i32` length prefix followed by the
//!   payload (a negative prefix announces an 8-byte unsigned length instead)
//! - HMAC challenge-response authentication (HMAC-SHA256; legacy HMAC-MD5 replies are
//!   also accepted)
//! - pickle serialization for request/response dictionaries
//!
//! The server translates pickle dicts into [`QueryRequest`] events, sends
//! them through the driver event channel, and returns pickle responses.
10
11use std::io::{self, Read, Write};
12use std::net::{TcpListener, TcpStream};
13use std::sync::mpsc;
14use std::sync::{
15    atomic::{AtomicBool, Ordering},
16    Arc,
17};
18use std::thread;
19
20use rns_crypto::hmac::hmac_sha256;
21use rns_crypto::sha256::sha256;
22
23use crate::event::{
24    BlackholeInfo, Event, EventSender, InterfaceStatsResponse, PathTableEntry, QueryRequest,
25    QueryResponse, RateTableEntry, SingleInterfaceStat,
26};
27use crate::md5::hmac_md5;
28use crate::pickle::{self, PickleValue};
29
/// Marker that opens every challenge message sent by the server.
const CHALLENGE_PREFIX: &[u8] = b"#CHALLENGE#";
/// Number of random bytes carried by each challenge.
const CHALLENGE_LEN: usize = 40;
/// Reply sent to a client whose challenge response verified.
const WELCOME: &[u8] = b"#WELCOME#";
/// Reply sent to a client whose challenge response did not verify.
const FAILURE: &[u8] = b"#FAILURE#";
34
/// Address on which the RPC endpoint is reachable.
#[derive(Debug, Clone)]
pub enum RpcAddr {
    /// TCP endpoint, given as a host string and a port number.
    Tcp(String, u16),
}
40
/// Handle to a running RPC server that accepts connections and answers queries.
///
/// Owns the flag used to ask the server thread to exit and the join handle of that thread.
pub struct RpcServer {
    /// Set to `true` by `stop` to request that the server thread exit.
    shutdown: Arc<AtomicBool>,
    /// Join handle of the server thread; left as `None` once `stop` has joined it.
    thread: Option<thread::JoinHandle<()>>,
}
46
47impl RpcServer {
48    /// Start the RPC server on the given address.
49    pub fn start(addr: &RpcAddr, auth_key: [u8; 32], event_tx: EventSender) -> io::Result<Self> {
50        let shutdown = Arc::new(AtomicBool::new(false));
51        let shutdown2 = shutdown.clone();
52
53        let listener = match addr {
54            RpcAddr::Tcp(host, port) => {
55                let l = TcpListener::bind((host.as_str(), *port))?;
56                // Non-blocking so we can check shutdown flag
57                l.set_nonblocking(true)?;
58                l
59            }
60        };
61
62        let thread = thread::Builder::new()
63            .name("rpc-server".into())
64            .spawn(move || {
65                rpc_server_loop(listener, auth_key, event_tx, shutdown2);
66            })
67            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
68
69        Ok(RpcServer {
70            shutdown,
71            thread: Some(thread),
72        })
73    }
74
75    /// Stop the RPC server.
76    pub fn stop(&mut self) {
77        self.shutdown.store(true, Ordering::Relaxed);
78        if let Some(handle) = self.thread.take() {
79            let _ = handle.join();
80        }
81    }
82}
83
84impl Drop for RpcServer {
85    fn drop(&mut self) {
86        self.stop();
87    }
88}
89
90fn rpc_server_loop(
91    listener: TcpListener,
92    auth_key: [u8; 32],
93    event_tx: EventSender,
94    shutdown: Arc<AtomicBool>,
95) {
96    loop {
97        if shutdown.load(Ordering::Relaxed) {
98            break;
99        }
100
101        match listener.accept() {
102            Ok((stream, _addr)) => {
103                // Set blocking for this connection
104                let _ = stream.set_nonblocking(false);
105                let _ = stream.set_read_timeout(Some(std::time::Duration::from_secs(10)));
106                let _ = stream.set_write_timeout(Some(std::time::Duration::from_secs(10)));
107
108                if let Err(e) = handle_connection(stream, &auth_key, &event_tx) {
109                    log::debug!("RPC connection error: {}", e);
110                }
111            }
112            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
113                // No pending connection, sleep briefly and retry
114                thread::sleep(std::time::Duration::from_millis(100));
115            }
116            Err(e) => {
117                log::error!("RPC accept error: {}", e);
118                thread::sleep(std::time::Duration::from_millis(100));
119            }
120        }
121    }
122}
123
124fn handle_connection(
125    mut stream: TcpStream,
126    auth_key: &[u8; 32],
127    event_tx: &EventSender,
128) -> io::Result<()> {
129    // Authentication: send challenge, verify response
130    server_auth(&mut stream, auth_key)?;
131
132    // Read request (pickle dict)
133    let request_bytes = recv_bytes(&mut stream)?;
134    let request = pickle::decode(&request_bytes)
135        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?;
136
137    // Translate pickle dict to query, send to driver, get response
138    let response = handle_rpc_request(&request, event_tx)?;
139
140    // Encode response and send
141    let response_bytes = pickle::encode(&response);
142    send_bytes(&mut stream, &response_bytes)?;
143
144    Ok(())
145}
146
147/// Server-side authentication: challenge-response.
148fn server_auth(stream: &mut TcpStream, auth_key: &[u8; 32]) -> io::Result<()> {
149    // Generate challenge: #CHALLENGE#{sha256}<40 random bytes>
150    let mut random_bytes = [0u8; CHALLENGE_LEN];
151    // Use /dev/urandom for randomness
152    {
153        let mut f = std::fs::File::open("/dev/urandom")?;
154        f.read_exact(&mut random_bytes)?;
155    }
156
157    let mut challenge_message = Vec::with_capacity(CHALLENGE_PREFIX.len() + 8 + CHALLENGE_LEN);
158    challenge_message.extend_from_slice(CHALLENGE_PREFIX);
159    challenge_message.extend_from_slice(b"{sha256}");
160    challenge_message.extend_from_slice(&random_bytes);
161
162    send_bytes(stream, &challenge_message)?;
163
164    // Read response (max 256 bytes)
165    let response = recv_bytes(stream)?;
166
167    // Verify response
168    // The message to HMAC is everything after #CHALLENGE# (i.e. {sha256}<random>)
169    let message = &challenge_message[CHALLENGE_PREFIX.len()..];
170
171    if verify_response(auth_key, message, &response) {
172        send_bytes(stream, WELCOME)?;
173        Ok(())
174    } else {
175        send_bytes(stream, FAILURE)?;
176        Err(io::Error::new(
177            io::ErrorKind::PermissionDenied,
178            "auth failed",
179        ))
180    }
181}
182
183/// Verify a client's HMAC response.
184fn verify_response(auth_key: &[u8; 32], message: &[u8], response: &[u8]) -> bool {
185    // Modern protocol: response = {sha256}<hmac-sha256 digest>
186    if response.starts_with(b"{sha256}") {
187        let digest = &response[8..];
188        let expected = hmac_sha256(auth_key, message);
189        constant_time_eq(digest, &expected)
190    }
191    // Legacy protocol: response = raw 16-byte HMAC-MD5 digest
192    else if response.len() == 16 {
193        let expected = hmac_md5(auth_key, message);
194        constant_time_eq(response, &expected)
195    }
196    // Legacy with {md5} prefix
197    else if response.starts_with(b"{md5}") {
198        let digest = &response[5..];
199        let expected = hmac_md5(auth_key, message);
200        constant_time_eq(digest, &expected)
201    } else {
202        false
203    }
204}
205
/// Compare two byte slices without stopping at the first differing byte.
///
/// Slices of different lengths are unequal and are rejected immediately. For equal lengths
/// every byte pair is XOR-ed and OR-ed into one accumulator, so the amount of work does not
/// depend on where the slices differ.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    a.len() == b.len()
        && a.iter()
            .zip(b)
            .fold(0u8, |acc, (x, y)| acc | (x ^ y))
            == 0
}
217
/// Send `data` as one length-prefixed message and flush the stream.
///
/// Payloads of up to `i32::MAX` bytes are prefixed with their length as a 4-byte big-endian
/// signed integer. Larger payloads use the extended form that `recv_bytes` understands: a
/// 4-byte `-1` followed by the length as an 8-byte big-endian unsigned integer. A plain
/// `as i32` cast would silently write a wrong (possibly negative) length for such payloads.
///
/// # Errors
///
/// Returns the underlying I/O error if writing or flushing fails.
fn send_bytes(stream: &mut TcpStream, data: &[u8]) -> io::Result<()> {
    if data.len() <= i32::MAX as usize {
        // Cannot truncate: the length was just checked to fit in an `i32`.
        stream.write_all(&(data.len() as i32).to_be_bytes())?;
    } else {
        stream.write_all(&(-1i32).to_be_bytes())?;
        stream.write_all(&(data.len() as u64).to_be_bytes())?;
    }
    stream.write_all(data)?;
    stream.flush()
}
225
/// Receive one length-prefixed message.
///
/// Reads a 4-byte big-endian signed length. A negative value announces the extended form,
/// in which the real length follows as an 8-byte big-endian unsigned integer. The payload is
/// then read in full.
///
/// The 64 MiB limit is checked on the 64-bit length *before* it is narrowed to `usize`, so a
/// huge extended length cannot wrap around and slip under the limit on 32-bit targets.
///
/// # Errors
///
/// Returns `InvalidData` if the announced length exceeds 64 MiB, and the underlying I/O
/// error if the stream ends or fails before the whole message has been read.
fn recv_bytes(stream: &mut TcpStream) -> io::Result<Vec<u8>> {
    /// Upper bound on the size of a single message.
    const MAX_MESSAGE_LEN: u64 = 64 * 1024 * 1024;

    let mut len_buf = [0u8; 4];
    stream.read_exact(&mut len_buf)?;
    let short_len = i32::from_be_bytes(len_buf);

    let len: u64 = if short_len < 0 {
        // Extended format: 8-byte length
        let mut len8_buf = [0u8; 8];
        stream.read_exact(&mut len8_buf)?;
        u64::from_be_bytes(len8_buf)
    } else {
        short_len as u64
    };

    if len > MAX_MESSAGE_LEN {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "message too large",
        ));
    }

    // Safe to narrow: `len` is at most 64 MiB here.
    let mut buf = vec![0u8; len as usize];
    stream.read_exact(&mut buf)?;
    Ok(buf)
}
259
260/// Translate a pickle request dict to a query event and get response.
261fn handle_rpc_request(request: &PickleValue, event_tx: &EventSender) -> io::Result<PickleValue> {
262    // Handle "get" requests
263    if let Some(get_val) = request.get("get") {
264        if let Some(path) = get_val.as_str() {
265            return match path {
266                "interface_stats" => {
267                    let resp = send_query(event_tx, QueryRequest::InterfaceStats)?;
268                    if let QueryResponse::InterfaceStats(stats) = resp {
269                        Ok(interface_stats_to_pickle(&stats))
270                    } else {
271                        Ok(PickleValue::None)
272                    }
273                }
274                "path_table" => {
275                    let max_hops = request
276                        .get("max_hops")
277                        .and_then(|v| v.as_int().map(|n| n as u8));
278                    let resp = send_query(event_tx, QueryRequest::PathTable { max_hops })?;
279                    if let QueryResponse::PathTable(entries) = resp {
280                        Ok(path_table_to_pickle(&entries))
281                    } else {
282                        Ok(PickleValue::None)
283                    }
284                }
285                "rate_table" => {
286                    let resp = send_query(event_tx, QueryRequest::RateTable)?;
287                    if let QueryResponse::RateTable(entries) = resp {
288                        Ok(rate_table_to_pickle(&entries))
289                    } else {
290                        Ok(PickleValue::None)
291                    }
292                }
293                "next_hop" => {
294                    let hash = extract_dest_hash(request, "destination_hash")?;
295                    let resp = send_query(event_tx, QueryRequest::NextHop { dest_hash: hash })?;
296                    if let QueryResponse::NextHop(Some(nh)) = resp {
297                        Ok(PickleValue::Bytes(nh.next_hop.to_vec()))
298                    } else {
299                        Ok(PickleValue::None)
300                    }
301                }
302                "next_hop_if_name" => {
303                    let hash = extract_dest_hash(request, "destination_hash")?;
304                    let resp =
305                        send_query(event_tx, QueryRequest::NextHopIfName { dest_hash: hash })?;
306                    if let QueryResponse::NextHopIfName(Some(name)) = resp {
307                        Ok(PickleValue::String(name))
308                    } else {
309                        Ok(PickleValue::None)
310                    }
311                }
312                "link_count" => {
313                    let resp = send_query(event_tx, QueryRequest::LinkCount)?;
314                    if let QueryResponse::LinkCount(n) = resp {
315                        Ok(PickleValue::Int(n as i64))
316                    } else {
317                        Ok(PickleValue::None)
318                    }
319                }
320                "transport_identity" => {
321                    let resp = send_query(event_tx, QueryRequest::TransportIdentity)?;
322                    if let QueryResponse::TransportIdentity(Some(hash)) = resp {
323                        Ok(PickleValue::Bytes(hash.to_vec()))
324                    } else {
325                        Ok(PickleValue::None)
326                    }
327                }
328                "blackholed" => {
329                    let resp = send_query(event_tx, QueryRequest::GetBlackholed)?;
330                    if let QueryResponse::Blackholed(entries) = resp {
331                        Ok(blackholed_to_pickle(&entries))
332                    } else {
333                        Ok(PickleValue::None)
334                    }
335                }
336                "discovered_interfaces" => {
337                    let only_available = request
338                        .get("only_available")
339                        .and_then(|v| v.as_bool())
340                        .unwrap_or(false);
341                    let only_transport = request
342                        .get("only_transport")
343                        .and_then(|v| v.as_bool())
344                        .unwrap_or(false);
345                    let resp = send_query(
346                        event_tx,
347                        QueryRequest::DiscoveredInterfaces {
348                            only_available,
349                            only_transport,
350                        },
351                    )?;
352                    if let QueryResponse::DiscoveredInterfaces(interfaces) = resp {
353                        Ok(discovered_interfaces_to_pickle(&interfaces))
354                    } else {
355                        Ok(PickleValue::None)
356                    }
357                }
358                _ => Ok(PickleValue::None),
359            };
360        }
361    }
362
363    // Handle "request_path" -- trigger a path request to the network
364    if let Some(hash_val) = request.get("request_path") {
365        if let Some(hash_bytes) = hash_val.as_bytes() {
366            if hash_bytes.len() >= 16 {
367                let mut dest_hash = [0u8; 16];
368                dest_hash.copy_from_slice(&hash_bytes[..16]);
369                let _ = event_tx.send(crate::event::Event::RequestPath { dest_hash });
370                return Ok(PickleValue::Bool(true));
371            }
372        }
373    }
374
375    // Handle "send_probe" requests
376    if let Some(hash_val) = request.get("send_probe") {
377        if let Some(hash_bytes) = hash_val.as_bytes() {
378            if hash_bytes.len() >= 16 {
379                let mut dest_hash = [0u8; 16];
380                dest_hash.copy_from_slice(&hash_bytes[..16]);
381                let payload_size = request
382                    .get("size")
383                    .and_then(|v| v.as_int())
384                    .and_then(|n| {
385                        if n > 0 && n <= 400 {
386                            Some(n as usize)
387                        } else {
388                            None
389                        }
390                    })
391                    .unwrap_or(16);
392                let resp = send_query(
393                    event_tx,
394                    QueryRequest::SendProbe {
395                        dest_hash,
396                        payload_size,
397                    },
398                )?;
399                if let QueryResponse::SendProbe(Some((packet_hash, hops))) = resp {
400                    return Ok(PickleValue::Dict(vec![
401                        (
402                            PickleValue::String("packet_hash".into()),
403                            PickleValue::Bytes(packet_hash.to_vec()),
404                        ),
405                        (
406                            PickleValue::String("hops".into()),
407                            PickleValue::Int(hops as i64),
408                        ),
409                    ]));
410                } else {
411                    return Ok(PickleValue::None);
412                }
413            }
414        }
415    }
416
417    // Handle "check_proof" requests
418    if let Some(hash_val) = request.get("check_proof") {
419        if let Some(hash_bytes) = hash_val.as_bytes() {
420            if hash_bytes.len() >= 32 {
421                let mut packet_hash = [0u8; 32];
422                packet_hash.copy_from_slice(&hash_bytes[..32]);
423                let resp = send_query(event_tx, QueryRequest::CheckProof { packet_hash })?;
424                if let QueryResponse::CheckProof(Some(rtt)) = resp {
425                    return Ok(PickleValue::Float(rtt));
426                } else {
427                    return Ok(PickleValue::None);
428                }
429            }
430        }
431    }
432
433    // Handle "blackhole" requests
434    if let Some(hash_val) = request.get("blackhole") {
435        if let Some(hash_bytes) = hash_val.as_bytes() {
436            if hash_bytes.len() >= 16 {
437                let mut identity_hash = [0u8; 16];
438                identity_hash.copy_from_slice(&hash_bytes[..16]);
439                let duration_hours = request.get("duration").and_then(|v| v.as_float());
440                let reason = request
441                    .get("reason")
442                    .and_then(|v| v.as_str())
443                    .map(|s| s.to_string());
444                let resp = send_query(
445                    event_tx,
446                    QueryRequest::BlackholeIdentity {
447                        identity_hash,
448                        duration_hours,
449                        reason,
450                    },
451                )?;
452                return Ok(PickleValue::Bool(matches!(
453                    resp,
454                    QueryResponse::BlackholeResult(true)
455                )));
456            }
457        }
458    }
459
460    // Handle "unblackhole" requests
461    if let Some(hash_val) = request.get("unblackhole") {
462        if let Some(hash_bytes) = hash_val.as_bytes() {
463            if hash_bytes.len() >= 16 {
464                let mut identity_hash = [0u8; 16];
465                identity_hash.copy_from_slice(&hash_bytes[..16]);
466                let resp = send_query(
467                    event_tx,
468                    QueryRequest::UnblackholeIdentity { identity_hash },
469                )?;
470                return Ok(PickleValue::Bool(matches!(
471                    resp,
472                    QueryResponse::UnblackholeResult(true)
473                )));
474            }
475        }
476    }
477
478    // Handle "drop" requests
479    if let Some(drop_val) = request.get("drop") {
480        if let Some(path) = drop_val.as_str() {
481            return match path {
482                "path" => {
483                    let hash = extract_dest_hash(request, "destination_hash")?;
484                    let resp = send_query(event_tx, QueryRequest::DropPath { dest_hash: hash })?;
485                    if let QueryResponse::DropPath(ok) = resp {
486                        Ok(PickleValue::Bool(ok))
487                    } else {
488                        Ok(PickleValue::None)
489                    }
490                }
491                "all_via" => {
492                    let hash = extract_dest_hash(request, "destination_hash")?;
493                    let resp = send_query(
494                        event_tx,
495                        QueryRequest::DropAllVia {
496                            transport_hash: hash,
497                        },
498                    )?;
499                    if let QueryResponse::DropAllVia(n) = resp {
500                        Ok(PickleValue::Int(n as i64))
501                    } else {
502                        Ok(PickleValue::None)
503                    }
504                }
505                "announce_queues" => {
506                    let resp = send_query(event_tx, QueryRequest::DropAnnounceQueues)?;
507                    if let QueryResponse::DropAnnounceQueues = resp {
508                        Ok(PickleValue::Bool(true))
509                    } else {
510                        Ok(PickleValue::None)
511                    }
512                }
513                _ => Ok(PickleValue::None),
514            };
515        }
516    }
517
518    Ok(PickleValue::None)
519}
520
521/// Send a query to the driver and wait for the response.
522fn send_query(event_tx: &EventSender, request: QueryRequest) -> io::Result<QueryResponse> {
523    let (resp_tx, resp_rx) = mpsc::channel();
524    event_tx
525        .send(Event::Query(request, resp_tx))
526        .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down"))?;
527    resp_rx
528        .recv_timeout(std::time::Duration::from_secs(5))
529        .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "query timed out"))
530}
531
532/// Extract a 16-byte destination hash from a pickle dict field.
533fn extract_dest_hash(request: &PickleValue, key: &str) -> io::Result<[u8; 16]> {
534    let bytes = request
535        .get(key)
536        .and_then(|v| v.as_bytes())
537        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing destination_hash"))?;
538    if bytes.len() < 16 {
539        return Err(io::Error::new(io::ErrorKind::InvalidData, "hash too short"));
540    }
541    let mut hash = [0u8; 16];
542    hash.copy_from_slice(&bytes[..16]);
543    Ok(hash)
544}
545
546// --- Pickle response builders ---
547
548fn interface_stats_to_pickle(stats: &InterfaceStatsResponse) -> PickleValue {
549    let mut ifaces = Vec::new();
550    for iface in &stats.interfaces {
551        ifaces.push(single_iface_to_pickle(iface));
552    }
553
554    let mut dict = vec![
555        (
556            PickleValue::String("interfaces".into()),
557            PickleValue::List(ifaces),
558        ),
559        (
560            PickleValue::String("transport_enabled".into()),
561            PickleValue::Bool(stats.transport_enabled),
562        ),
563        (
564            PickleValue::String("transport_uptime".into()),
565            PickleValue::Float(stats.transport_uptime),
566        ),
567        (
568            PickleValue::String("rxb".into()),
569            PickleValue::Int(stats.total_rxb as i64),
570        ),
571        (
572            PickleValue::String("txb".into()),
573            PickleValue::Int(stats.total_txb as i64),
574        ),
575    ];
576
577    if let Some(tid) = stats.transport_id {
578        dict.push((
579            PickleValue::String("transport_id".into()),
580            PickleValue::Bytes(tid.to_vec()),
581        ));
582    } else {
583        dict.push((
584            PickleValue::String("transport_id".into()),
585            PickleValue::None,
586        ));
587    }
588
589    if let Some(pr) = stats.probe_responder {
590        dict.push((
591            PickleValue::String("probe_responder".into()),
592            PickleValue::Bytes(pr.to_vec()),
593        ));
594    } else {
595        dict.push((
596            PickleValue::String("probe_responder".into()),
597            PickleValue::None,
598        ));
599    }
600
601    PickleValue::Dict(dict)
602}
603
604fn single_iface_to_pickle(s: &SingleInterfaceStat) -> PickleValue {
605    let mut dict = vec![
606        (
607            PickleValue::String("name".into()),
608            PickleValue::String(s.name.clone()),
609        ),
610        (
611            PickleValue::String("status".into()),
612            PickleValue::Bool(s.status),
613        ),
614        (
615            PickleValue::String("mode".into()),
616            PickleValue::Int(s.mode as i64),
617        ),
618        (
619            PickleValue::String("rxb".into()),
620            PickleValue::Int(s.rxb as i64),
621        ),
622        (
623            PickleValue::String("txb".into()),
624            PickleValue::Int(s.txb as i64),
625        ),
626        (
627            PickleValue::String("rx_packets".into()),
628            PickleValue::Int(s.rx_packets as i64),
629        ),
630        (
631            PickleValue::String("tx_packets".into()),
632            PickleValue::Int(s.tx_packets as i64),
633        ),
634        (
635            PickleValue::String("started".into()),
636            PickleValue::Float(s.started),
637        ),
638        (
639            PickleValue::String("ia_freq".into()),
640            PickleValue::Float(s.ia_freq),
641        ),
642        (
643            PickleValue::String("oa_freq".into()),
644            PickleValue::Float(s.oa_freq),
645        ),
646    ];
647
648    match s.bitrate {
649        Some(br) => dict.push((
650            PickleValue::String("bitrate".into()),
651            PickleValue::Int(br as i64),
652        )),
653        None => dict.push((PickleValue::String("bitrate".into()), PickleValue::None)),
654    }
655
656    match s.ifac_size {
657        Some(sz) => dict.push((
658            PickleValue::String("ifac_size".into()),
659            PickleValue::Int(sz as i64),
660        )),
661        None => dict.push((PickleValue::String("ifac_size".into()), PickleValue::None)),
662    }
663
664    PickleValue::Dict(dict)
665}
666
667fn path_table_to_pickle(entries: &[PathTableEntry]) -> PickleValue {
668    let list: Vec<PickleValue> = entries
669        .iter()
670        .map(|e| {
671            PickleValue::Dict(vec![
672                (
673                    PickleValue::String("hash".into()),
674                    PickleValue::Bytes(e.hash.to_vec()),
675                ),
676                (
677                    PickleValue::String("timestamp".into()),
678                    PickleValue::Float(e.timestamp),
679                ),
680                (
681                    PickleValue::String("via".into()),
682                    PickleValue::Bytes(e.via.to_vec()),
683                ),
684                (
685                    PickleValue::String("hops".into()),
686                    PickleValue::Int(e.hops as i64),
687                ),
688                (
689                    PickleValue::String("expires".into()),
690                    PickleValue::Float(e.expires),
691                ),
692                (
693                    PickleValue::String("interface".into()),
694                    PickleValue::String(e.interface_name.clone()),
695                ),
696            ])
697        })
698        .collect();
699    PickleValue::List(list)
700}
701
702fn rate_table_to_pickle(entries: &[RateTableEntry]) -> PickleValue {
703    let list: Vec<PickleValue> = entries
704        .iter()
705        .map(|e| {
706            PickleValue::Dict(vec![
707                (
708                    PickleValue::String("hash".into()),
709                    PickleValue::Bytes(e.hash.to_vec()),
710                ),
711                (
712                    PickleValue::String("last".into()),
713                    PickleValue::Float(e.last),
714                ),
715                (
716                    PickleValue::String("rate_violations".into()),
717                    PickleValue::Int(e.rate_violations as i64),
718                ),
719                (
720                    PickleValue::String("blocked_until".into()),
721                    PickleValue::Float(e.blocked_until),
722                ),
723                (
724                    PickleValue::String("timestamps".into()),
725                    PickleValue::List(
726                        e.timestamps
727                            .iter()
728                            .map(|&t| PickleValue::Float(t))
729                            .collect(),
730                    ),
731                ),
732            ])
733        })
734        .collect();
735    PickleValue::List(list)
736}
737
738fn blackholed_to_pickle(entries: &[BlackholeInfo]) -> PickleValue {
739    let list: Vec<PickleValue> = entries
740        .iter()
741        .map(|e| {
742            let mut dict = vec![
743                (
744                    PickleValue::String("identity_hash".into()),
745                    PickleValue::Bytes(e.identity_hash.to_vec()),
746                ),
747                (
748                    PickleValue::String("created".into()),
749                    PickleValue::Float(e.created),
750                ),
751                (
752                    PickleValue::String("expires".into()),
753                    PickleValue::Float(e.expires),
754                ),
755            ];
756            if let Some(ref reason) = e.reason {
757                dict.push((
758                    PickleValue::String("reason".into()),
759                    PickleValue::String(reason.clone()),
760                ));
761            } else {
762                dict.push((PickleValue::String("reason".into()), PickleValue::None));
763            }
764            PickleValue::Dict(dict)
765        })
766        .collect();
767    PickleValue::List(list)
768}
769
770fn discovered_interfaces_to_pickle(
771    interfaces: &[crate::discovery::DiscoveredInterface],
772) -> PickleValue {
773    let list: Vec<PickleValue> = interfaces
774        .iter()
775        .map(|iface| {
776            let mut dict = vec![
777                (
778                    PickleValue::String("type".into()),
779                    PickleValue::String(iface.interface_type.clone()),
780                ),
781                (
782                    PickleValue::String("transport".into()),
783                    PickleValue::Bool(iface.transport),
784                ),
785                (
786                    PickleValue::String("name".into()),
787                    PickleValue::String(iface.name.clone()),
788                ),
789                (
790                    PickleValue::String("discovered".into()),
791                    PickleValue::Float(iface.discovered),
792                ),
793                (
794                    PickleValue::String("last_heard".into()),
795                    PickleValue::Float(iface.last_heard),
796                ),
797                (
798                    PickleValue::String("heard_count".into()),
799                    PickleValue::Int(iface.heard_count as i64),
800                ),
801                (
802                    PickleValue::String("status".into()),
803                    PickleValue::String(iface.status.as_str().into()),
804                ),
805                (
806                    PickleValue::String("stamp".into()),
807                    PickleValue::Bytes(iface.stamp.clone()),
808                ),
809                (
810                    PickleValue::String("value".into()),
811                    PickleValue::Int(iface.stamp_value as i64),
812                ),
813                (
814                    PickleValue::String("transport_id".into()),
815                    PickleValue::Bytes(iface.transport_id.to_vec()),
816                ),
817                (
818                    PickleValue::String("network_id".into()),
819                    PickleValue::Bytes(iface.network_id.to_vec()),
820                ),
821                (
822                    PickleValue::String("hops".into()),
823                    PickleValue::Int(iface.hops as i64),
824                ),
825            ];
826
827            // Optional location fields
828            if let Some(v) = iface.latitude {
829                dict.push((
830                    PickleValue::String("latitude".into()),
831                    PickleValue::Float(v),
832                ));
833            } else {
834                dict.push((PickleValue::String("latitude".into()), PickleValue::None));
835            }
836            if let Some(v) = iface.longitude {
837                dict.push((
838                    PickleValue::String("longitude".into()),
839                    PickleValue::Float(v),
840                ));
841            } else {
842                dict.push((PickleValue::String("longitude".into()), PickleValue::None));
843            }
844            if let Some(v) = iface.height {
845                dict.push((PickleValue::String("height".into()), PickleValue::Float(v)));
846            } else {
847                dict.push((PickleValue::String("height".into()), PickleValue::None));
848            }
849
850            // Connection info
851            if let Some(ref v) = iface.reachable_on {
852                dict.push((
853                    PickleValue::String("reachable_on".into()),
854                    PickleValue::String(v.clone()),
855                ));
856            } else {
857                dict.push((
858                    PickleValue::String("reachable_on".into()),
859                    PickleValue::None,
860                ));
861            }
862            if let Some(v) = iface.port {
863                dict.push((
864                    PickleValue::String("port".into()),
865                    PickleValue::Int(v as i64),
866                ));
867            } else {
868                dict.push((PickleValue::String("port".into()), PickleValue::None));
869            }
870
871            // RNode/RF specific
872            if let Some(v) = iface.frequency {
873                dict.push((
874                    PickleValue::String("frequency".into()),
875                    PickleValue::Int(v as i64),
876                ));
877            } else {
878                dict.push((PickleValue::String("frequency".into()), PickleValue::None));
879            }
880            if let Some(v) = iface.bandwidth {
881                dict.push((
882                    PickleValue::String("bandwidth".into()),
883                    PickleValue::Int(v as i64),
884                ));
885            } else {
886                dict.push((PickleValue::String("bandwidth".into()), PickleValue::None));
887            }
888            if let Some(v) = iface.spreading_factor {
889                dict.push((PickleValue::String("sf".into()), PickleValue::Int(v as i64)));
890            } else {
891                dict.push((PickleValue::String("sf".into()), PickleValue::None));
892            }
893            if let Some(v) = iface.coding_rate {
894                dict.push((PickleValue::String("cr".into()), PickleValue::Int(v as i64)));
895            } else {
896                dict.push((PickleValue::String("cr".into()), PickleValue::None));
897            }
898            if let Some(ref v) = iface.modulation {
899                dict.push((
900                    PickleValue::String("modulation".into()),
901                    PickleValue::String(v.clone()),
902                ));
903            } else {
904                dict.push((PickleValue::String("modulation".into()), PickleValue::None));
905            }
906            if let Some(v) = iface.channel {
907                dict.push((
908                    PickleValue::String("channel".into()),
909                    PickleValue::Int(v as i64),
910                ));
911            } else {
912                dict.push((PickleValue::String("channel".into()), PickleValue::None));
913            }
914
915            // IFAC info
916            if let Some(ref v) = iface.ifac_netname {
917                dict.push((
918                    PickleValue::String("ifac_netname".into()),
919                    PickleValue::String(v.clone()),
920                ));
921            } else {
922                dict.push((
923                    PickleValue::String("ifac_netname".into()),
924                    PickleValue::None,
925                ));
926            }
927            if let Some(ref v) = iface.ifac_netkey {
928                dict.push((
929                    PickleValue::String("ifac_netkey".into()),
930                    PickleValue::String(v.clone()),
931                ));
932            } else {
933                dict.push((PickleValue::String("ifac_netkey".into()), PickleValue::None));
934            }
935
936            // Config entry
937            if let Some(ref v) = iface.config_entry {
938                dict.push((
939                    PickleValue::String("config_entry".into()),
940                    PickleValue::String(v.clone()),
941                ));
942            } else {
943                dict.push((
944                    PickleValue::String("config_entry".into()),
945                    PickleValue::None,
946                ));
947            }
948
949            dict.push((
950                PickleValue::String("discovery_hash".into()),
951                PickleValue::Bytes(iface.discovery_hash.to_vec()),
952            ));
953
954            PickleValue::Dict(dict)
955        })
956        .collect();
957    PickleValue::List(list)
958}
959
960// --- RPC Client ---
961
/// RPC client for connecting to a running daemon.
///
/// Owns the TCP stream used for the session. Instances are created with
/// [`RpcClient::connect`], which authenticates before returning, so every
/// live `RpcClient` wraps a stream that has already passed the handshake.
#[derive(Debug)]
pub struct RpcClient {
    /// Authenticated connection to the RPC server.
    stream: TcpStream,
}
966
967impl RpcClient {
968    /// Connect to an RPC server and authenticate.
969    pub fn connect(addr: &RpcAddr, auth_key: &[u8; 32]) -> io::Result<Self> {
970        let mut stream = match addr {
971            RpcAddr::Tcp(host, port) => TcpStream::connect((host.as_str(), *port))?,
972        };
973
974        stream.set_read_timeout(Some(std::time::Duration::from_secs(10)))?;
975        stream.set_write_timeout(Some(std::time::Duration::from_secs(10)))?;
976
977        // Client-side authentication
978        client_auth(&mut stream, auth_key)?;
979
980        Ok(RpcClient { stream })
981    }
982
983    /// Send a pickle request and receive a pickle response.
984    pub fn call(&mut self, request: &PickleValue) -> io::Result<PickleValue> {
985        let request_bytes = pickle::encode(request);
986        send_bytes(&mut self.stream, &request_bytes)?;
987
988        let response_bytes = recv_bytes(&mut self.stream)?;
989        pickle::decode(&response_bytes)
990            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))
991    }
992}
993
994/// Client-side authentication: answer the server's challenge.
995fn client_auth(stream: &mut TcpStream, auth_key: &[u8; 32]) -> io::Result<()> {
996    // Read challenge
997    let challenge = recv_bytes(stream)?;
998
999    if !challenge.starts_with(CHALLENGE_PREFIX) {
1000        return Err(io::Error::new(
1001            io::ErrorKind::InvalidData,
1002            "expected challenge",
1003        ));
1004    }
1005
1006    let message = &challenge[CHALLENGE_PREFIX.len()..];
1007
1008    // Create HMAC response
1009    let response = create_response(auth_key, message);
1010    send_bytes(stream, &response)?;
1011
1012    // Read welcome/failure
1013    let result = recv_bytes(stream)?;
1014    if result == WELCOME {
1015        Ok(())
1016    } else {
1017        Err(io::Error::new(
1018            io::ErrorKind::PermissionDenied,
1019            "authentication failed",
1020        ))
1021    }
1022}
1023
1024/// Create an HMAC response to a challenge message.
1025fn create_response(auth_key: &[u8; 32], message: &[u8]) -> Vec<u8> {
1026    // Check if message has {sha256} prefix (modern protocol)
1027    if message.starts_with(b"{sha256}") || message.len() > 20 {
1028        // Modern protocol: use HMAC-SHA256 with {sha256} prefix
1029        let digest = hmac_sha256(auth_key, message);
1030        let mut response = Vec::with_capacity(8 + 32);
1031        response.extend_from_slice(b"{sha256}");
1032        response.extend_from_slice(&digest);
1033        response
1034    } else {
1035        // Legacy protocol: raw HMAC-MD5
1036        let digest = hmac_md5(auth_key, message);
1037        digest.to_vec()
1038    }
1039}
1040
/// Derive the RPC auth key from transport identity private key.
///
/// The key is the SHA-256 digest of `private_key`, so the same input bytes
/// always yield the same 32-byte key.
pub fn derive_auth_key(private_key: &[u8]) -> [u8; 32] {
    sha256(private_key)
}
1045
#[cfg(test)]
mod tests {
    use super::*;

    /// A framed message written by `send_bytes` is read back unchanged.
    #[test]
    fn send_recv_bytes_roundtrip() {
        let (mut c1, mut c2) = tcp_pair();
        let data = b"hello world";
        send_bytes(&mut c1, data).unwrap();
        let received = recv_bytes(&mut c2).unwrap();
        assert_eq!(&received, data);
    }

    /// A zero-length payload is a valid frame and round-trips as empty.
    #[test]
    fn send_recv_empty() {
        let (mut c1, mut c2) = tcp_pair();
        send_bytes(&mut c1, b"").unwrap();
        let received = recv_bytes(&mut c2).unwrap();
        assert!(received.is_empty());
    }

    /// Client and server complete the handshake when they share a key.
    #[test]
    fn auth_success() {
        let key = derive_auth_key(b"test-private-key");
        let (mut server, mut client) = tcp_pair();

        let key2 = key;
        let t = thread::spawn(move || {
            client_auth(&mut client, &key2).unwrap();
        });

        server_auth(&mut server, &key).unwrap();
        t.join().unwrap();
    }

    /// Both sides report an error when the client uses a different key.
    #[test]
    fn auth_failure_wrong_key() {
        let server_key = derive_auth_key(b"server-key");
        let client_key = derive_auth_key(b"wrong-key");
        let (mut server, mut client) = tcp_pair();

        let t = thread::spawn(move || {
            let result = client_auth(&mut client, &client_key);
            assert!(result.is_err());
        });

        let result = server_auth(&mut server, &server_key);
        assert!(result.is_err());
        t.join().unwrap();
    }

    /// A `{sha256}`-tagged challenge yields a tagged response that verifies.
    #[test]
    fn verify_sha256_response() {
        let key = derive_auth_key(b"mykey");
        let message = b"{sha256}abcdefghijklmnopqrstuvwxyz0123456789ABCD";
        let response = create_response(&key, message);
        assert!(response.starts_with(b"{sha256}"));
        assert!(verify_response(&key, message, &response));
    }

    /// A raw HMAC-MD5 digest is accepted for a 20-byte untagged challenge.
    #[test]
    fn verify_legacy_md5_response() {
        let key = derive_auth_key(b"mykey");
        // Legacy message: 20 bytes, no prefix
        let message = b"01234567890123456789";
        // Create legacy response (raw HMAC-MD5)
        let digest = hmac_md5(&key, message);
        assert!(verify_response(&key, message, &digest));
    }

    /// Equal slices compare equal; differing content or length does not.
    #[test]
    fn constant_time_eq_works() {
        assert!(constant_time_eq(b"hello", b"hello"));
        assert!(!constant_time_eq(b"hello", b"world"));
        assert!(!constant_time_eq(b"hello", b"hell"));
    }

    /// End-to-end: a real client talks to `rpc_server_loop` backed by a stub driver.
    #[test]
    fn rpc_roundtrip() {
        let key = derive_auth_key(b"test-key");
        let (event_tx, event_rx) = crate::event::channel();

        // Bind to port 0 ourselves so the OS-assigned port is known to the client.
        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
        let port = listener.local_addr().unwrap().port();
        listener.set_nonblocking(true).unwrap();

        let shutdown = Arc::new(AtomicBool::new(false));
        let server_shutdown = Arc::clone(&shutdown);

        // Driver thread that handles queries
        let driver_thread = thread::spawn(move || loop {
            match event_rx.recv_timeout(std::time::Duration::from_secs(5)) {
                Ok(Event::Query(QueryRequest::LinkCount, resp_tx)) => {
                    let _ = resp_tx.send(QueryResponse::LinkCount(42));
                }
                Ok(Event::Query(QueryRequest::InterfaceStats, resp_tx)) => {
                    let _ = resp_tx.send(QueryResponse::InterfaceStats(InterfaceStatsResponse {
                        interfaces: vec![SingleInterfaceStat {
                            name: "TestInterface".into(),
                            status: true,
                            mode: 1,
                            rxb: 1000,
                            txb: 2000,
                            rx_packets: 10,
                            tx_packets: 20,
                            bitrate: Some(10_000_000),
                            ifac_size: None,
                            started: 1000.0,
                            ia_freq: 0.0,
                            oa_freq: 0.0,
                            interface_type: "TestInterface".into(),
                        }],
                        transport_id: None,
                        transport_enabled: true,
                        transport_uptime: 3600.0,
                        total_rxb: 1000,
                        total_txb: 2000,
                        probe_responder: None,
                    }));
                }
                // Any other event, a timeout, or a closed channel ends the driver.
                _ => break,
            }
        });

        let key2 = key;
        let server_thread = thread::spawn(move || {
            rpc_server_loop(listener, key2, event_tx, server_shutdown);
        });

        // Give server time to start
        thread::sleep(std::time::Duration::from_millis(50));

        // Client: connect and query link count
        let server_addr = RpcAddr::Tcp("127.0.0.1".into(), port);
        let mut client = RpcClient::connect(&server_addr, &key).unwrap();
        let response = client
            .call(&PickleValue::Dict(vec![(
                PickleValue::String("get".into()),
                PickleValue::String("link_count".into()),
            )]))
            .unwrap();
        assert_eq!(response.as_int().unwrap(), 42);
        drop(client);

        // Client: query interface stats
        let mut client2 = RpcClient::connect(&server_addr, &key).unwrap();
        let response2 = client2
            .call(&PickleValue::Dict(vec![(
                PickleValue::String("get".into()),
                PickleValue::String("interface_stats".into()),
            )]))
            .unwrap();
        let ifaces = response2.get("interfaces").unwrap().as_list().unwrap();
        assert_eq!(ifaces.len(), 1);
        let iface = &ifaces[0];
        assert_eq!(
            iface.get("name").unwrap().as_str().unwrap(),
            "TestInterface"
        );
        assert_eq!(iface.get("rxb").unwrap().as_int().unwrap(), 1000);
        drop(client2);

        // Shutdown
        shutdown.store(true, Ordering::Relaxed);
        server_thread.join().unwrap();
        driver_thread.join().unwrap();
    }

    /// The derived key depends only on the input bytes.
    #[test]
    fn derive_auth_key_deterministic() {
        let key1 = derive_auth_key(b"test");
        let key2 = derive_auth_key(b"test");
        assert_eq!(key1, key2);
        // Different input → different key
        let key3 = derive_auth_key(b"other");
        assert_ne!(key1, key3);
    }

    /// A "drop path" request dict becomes a `DropPath` query carrying the hash.
    #[test]
    fn pickle_request_handling() {
        // Test the request → query translation without networking
        let (event_tx, event_rx) = crate::event::channel();

        let driver = thread::spawn(move || {
            if let Ok(Event::Query(QueryRequest::DropPath { dest_hash }, resp_tx)) = event_rx.recv()
            {
                assert_eq!(dest_hash, [1u8; 16]);
                let _ = resp_tx.send(QueryResponse::DropPath(true));
            }
        });

        let request = PickleValue::Dict(vec![
            (
                PickleValue::String("drop".into()),
                PickleValue::String("path".into()),
            ),
            (
                PickleValue::String("destination_hash".into()),
                PickleValue::Bytes(vec![1u8; 16]),
            ),
        ]);

        let response = handle_rpc_request(&request, &event_tx).unwrap();
        assert_eq!(response, PickleValue::Bool(true));
        driver.join().unwrap();
    }

    /// Interface stats survive a pickle encode/decode round trip.
    #[test]
    fn interface_stats_pickle_format() {
        let stats = InterfaceStatsResponse {
            interfaces: vec![SingleInterfaceStat {
                name: "TCP".into(),
                status: true,
                mode: 1,
                rxb: 100,
                txb: 200,
                rx_packets: 5,
                tx_packets: 10,
                bitrate: Some(1000000),
                ifac_size: Some(16),
                started: 1000.0,
                ia_freq: 0.0,
                oa_freq: 0.0,
                interface_type: "TCPClientInterface".into(),
            }],
            transport_id: Some([0xAB; 16]),
            transport_enabled: true,
            transport_uptime: 3600.0,
            total_rxb: 100,
            total_txb: 200,
            probe_responder: None,
        };

        let pickle = interface_stats_to_pickle(&stats);

        // Verify it round-trips through encode/decode
        let encoded = pickle::encode(&pickle);
        let decoded = pickle::decode(&encoded).unwrap();
        assert!(decoded.get("transport_enabled").unwrap().as_bool().unwrap());
        let ifaces = decoded.get("interfaces").unwrap().as_list().unwrap();
        assert_eq!(ifaces[0].get("name").unwrap().as_str().unwrap(), "TCP");
    }

    /// A probe with no `size` uses the default payload size; a `None` result maps to pickle `None`.
    #[test]
    fn send_probe_rpc_unknown_dest() {
        let (event_tx, event_rx) = crate::event::channel();

        let driver = thread::spawn(move || {
            if let Ok(Event::Query(
                QueryRequest::SendProbe {
                    dest_hash,
                    payload_size,
                },
                resp_tx,
            )) = event_rx.recv()
            {
                assert_eq!(dest_hash, [0xAA; 16]);
                assert_eq!(payload_size, 16); // default
                let _ = resp_tx.send(QueryResponse::SendProbe(None));
            }
        });

        let request = PickleValue::Dict(vec![(
            PickleValue::String("send_probe".into()),
            PickleValue::Bytes(vec![0xAA; 16]),
        )]);

        let response = handle_rpc_request(&request, &event_tx).unwrap();
        assert_eq!(response, PickleValue::None);
        driver.join().unwrap();
    }

    /// A successful probe is reported as a dict with `packet_hash` and `hops`.
    #[test]
    fn send_probe_rpc_with_result() {
        let (event_tx, event_rx) = crate::event::channel();

        let packet_hash = [0xBB; 32];
        let driver = thread::spawn(move || {
            if let Ok(Event::Query(
                QueryRequest::SendProbe {
                    dest_hash,
                    payload_size,
                },
                resp_tx,
            )) = event_rx.recv()
            {
                assert_eq!(dest_hash, [0xCC; 16]);
                assert_eq!(payload_size, 32);
                let _ = resp_tx.send(QueryResponse::SendProbe(Some((packet_hash, 3))));
            }
        });

        let request = PickleValue::Dict(vec![
            (
                PickleValue::String("send_probe".into()),
                PickleValue::Bytes(vec![0xCC; 16]),
            ),
            (PickleValue::String("size".into()), PickleValue::Int(32)),
        ]);

        let response = handle_rpc_request(&request, &event_tx).unwrap();
        let ph = response.get("packet_hash").unwrap().as_bytes().unwrap();
        assert_eq!(ph, &[0xBB; 32]);
        assert_eq!(response.get("hops").unwrap().as_int().unwrap(), 3);
        driver.join().unwrap();
    }

    /// A negative probe size is replaced by the default.
    #[test]
    fn send_probe_rpc_size_validation() {
        let (event_tx, event_rx) = crate::event::channel();

        // Negative size should fall back to the default (16)
        let driver = thread::spawn(move || {
            if let Ok(Event::Query(QueryRequest::SendProbe { payload_size, .. }, resp_tx)) =
                event_rx.recv()
            {
                assert_eq!(payload_size, 16); // default, not negative
                let _ = resp_tx.send(QueryResponse::SendProbe(None));
            }
        });

        let request = PickleValue::Dict(vec![
            (
                PickleValue::String("send_probe".into()),
                PickleValue::Bytes(vec![0xDD; 16]),
            ),
            (PickleValue::String("size".into()), PickleValue::Int(-1)),
        ]);

        let response = handle_rpc_request(&request, &event_tx).unwrap();
        assert_eq!(response, PickleValue::None);
        driver.join().unwrap();
    }

    /// An oversized probe size is replaced by the default.
    #[test]
    fn send_probe_rpc_size_too_large() {
        let (event_tx, event_rx) = crate::event::channel();

        // Size > 400 should fall back to the default (16)
        let driver = thread::spawn(move || {
            if let Ok(Event::Query(QueryRequest::SendProbe { payload_size, .. }, resp_tx)) =
                event_rx.recv()
            {
                assert_eq!(payload_size, 16); // default, not 999
                let _ = resp_tx.send(QueryResponse::SendProbe(None));
            }
        });

        let request = PickleValue::Dict(vec![
            (
                PickleValue::String("send_probe".into()),
                PickleValue::Bytes(vec![0xDD; 16]),
            ),
            (PickleValue::String("size".into()), PickleValue::Int(999)),
        ]);

        let response = handle_rpc_request(&request, &event_tx).unwrap();
        assert_eq!(response, PickleValue::None);
        driver.join().unwrap();
    }

    /// A proof lookup that finds nothing is reported as pickle `None`.
    #[test]
    fn check_proof_rpc_not_found() {
        let (event_tx, event_rx) = crate::event::channel();

        let driver = thread::spawn(move || {
            if let Ok(Event::Query(QueryRequest::CheckProof { packet_hash }, resp_tx)) =
                event_rx.recv()
            {
                assert_eq!(packet_hash, [0xEE; 32]);
                let _ = resp_tx.send(QueryResponse::CheckProof(None));
            }
        });

        let request = PickleValue::Dict(vec![(
            PickleValue::String("check_proof".into()),
            PickleValue::Bytes(vec![0xEE; 32]),
        )]);

        let response = handle_rpc_request(&request, &event_tx).unwrap();
        assert_eq!(response, PickleValue::None);
        driver.join().unwrap();
    }

    /// A found proof is reported as a pickle float holding the driver's value.
    #[test]
    fn check_proof_rpc_found() {
        let (event_tx, event_rx) = crate::event::channel();

        let driver = thread::spawn(move || {
            if let Ok(Event::Query(QueryRequest::CheckProof { packet_hash }, resp_tx)) =
                event_rx.recv()
            {
                assert_eq!(packet_hash, [0xFF; 32]);
                let _ = resp_tx.send(QueryResponse::CheckProof(Some(0.352)));
            }
        });

        let request = PickleValue::Dict(vec![(
            PickleValue::String("check_proof".into()),
            PickleValue::Bytes(vec![0xFF; 32]),
        )]);

        let response = handle_rpc_request(&request, &event_tx).unwrap();
        if let PickleValue::Float(rtt) = response {
            assert!((rtt - 0.352).abs() < 0.001);
        } else {
            panic!("Expected Float, got {:?}", response);
        }
        driver.join().unwrap();
    }

    /// A `request_path` dict emits a `RequestPath` event and answers `true`.
    #[test]
    fn request_path_rpc() {
        let (event_tx, event_rx) = crate::event::channel();

        let driver =
            thread::spawn(
                move || match event_rx.recv_timeout(std::time::Duration::from_secs(5)) {
                    Ok(Event::RequestPath { dest_hash }) => {
                        assert_eq!(dest_hash, [0x11; 16]);
                    }
                    other => panic!("Expected RequestPath event, got {:?}", other),
                },
            );

        let request = PickleValue::Dict(vec![(
            PickleValue::String("request_path".into()),
            PickleValue::Bytes(vec![0x11; 16]),
        )]);

        let response = handle_rpc_request(&request, &event_tx).unwrap();
        assert_eq!(response, PickleValue::Bool(true));
        driver.join().unwrap();
    }

    /// The probe responder hash is serialized as bytes under `probe_responder`.
    #[test]
    fn interface_stats_with_probe_responder() {
        let probe_hash = [0x42; 16];
        let stats = InterfaceStatsResponse {
            interfaces: vec![],
            transport_id: None,
            transport_enabled: true,
            transport_uptime: 100.0,
            total_rxb: 0,
            total_txb: 0,
            probe_responder: Some(probe_hash),
        };

        let pickle = interface_stats_to_pickle(&stats);
        let encoded = pickle::encode(&pickle);
        let decoded = pickle::decode(&encoded).unwrap();

        let pr = decoded.get("probe_responder").unwrap().as_bytes().unwrap();
        assert_eq!(pr, &probe_hash);
    }

    /// Helper: create a connected loopback TCP pair, returned as `(server, client)`.
    ///
    /// Both ends get a 5-second read timeout so a test that stalls fails
    /// instead of hanging.
    fn tcp_pair() -> (TcpStream, TcpStream) {
        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
        let port = listener.local_addr().unwrap().port();
        let client = TcpStream::connect(("127.0.0.1", port)).unwrap();
        let (server, _) = listener.accept().unwrap();
        client
            .set_read_timeout(Some(std::time::Duration::from_secs(5)))
            .unwrap();
        server
            .set_read_timeout(Some(std::time::Duration::from_secs(5)))
            .unwrap();
        (server, client)
    }
}