Skip to main content

rns_net/
rpc.rs

//! RPC server and client for cross-process daemon communication.
//!
//! Implements the Python `multiprocessing.connection` wire protocol:
//! - 4-byte big-endian signed `i32` length prefix followed by the payload
//! - HMAC-SHA256 challenge-response authentication
//! - Pickle serialization for request/response dictionaries
//!
//! The server translates pickle dicts into [`QueryRequest`] events, sends
//! them through the driver event channel, and returns pickle responses.
10
11use std::io::{self, Read, Write};
12use std::net::{IpAddr, TcpListener, TcpStream};
13use std::sync::mpsc;
14use std::sync::{
15    atomic::{AtomicBool, Ordering},
16    Arc,
17};
18use std::thread;
19use std::time::Duration;
20
21use rns_crypto::hmac::hmac_sha256;
22use rns_crypto::sha256::sha256;
23
24use crate::event::{
25    BackboneInterfaceEntry, BackbonePeerStateEntry, BlackholeInfo, DrainStatus, Event, EventSender,
26    HookInfo, InterfaceStatsResponse, LifecycleState, PathTableEntry, ProviderBridgeStats,
27    QueryRequest, QueryResponse, RateTableEntry, RuntimeConfigApplyMode, RuntimeConfigEntry,
28    RuntimeConfigError, RuntimeConfigErrorCode, RuntimeConfigSource, RuntimeConfigValue,
29    SingleInterfaceStat,
30};
31use crate::md5::hmac_md5;
32use crate::pickle::{self, PickleValue};
33
/// Prefix of the challenge message the server sends at the start of authentication.
const CHALLENGE_PREFIX: &[u8] = b"#CHALLENGE#";
/// Message sent to the client once its challenge response has been verified.
const WELCOME: &[u8] = b"#WELCOME#";
/// Message sent to the client when its challenge response is rejected.
const FAILURE: &[u8] = b"#FAILURE#";
/// Number of random bytes placed in each authentication challenge.
const CHALLENGE_LEN: usize = 40;
38
/// RPC address types.
///
/// Passed to [`RpcServer::start`] to select where the server listens.
/// Addresses can be compared and hashed so callers can deduplicate them or
/// use them as map keys.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum RpcAddr {
    /// TCP endpoint given as `(host, port)`.
    Tcp(String, u16),
}
44
45/// RPC server that listens for incoming connections and handles queries.
46pub struct RpcServer {
47    shutdown: Arc<AtomicBool>,
48    thread: Option<thread::JoinHandle<()>>,
49}
50
51impl RpcServer {
52    /// Start the RPC server on the given address.
53    pub fn start(addr: &RpcAddr, auth_key: [u8; 32], event_tx: EventSender) -> io::Result<Self> {
54        let shutdown = Arc::new(AtomicBool::new(false));
55        let shutdown2 = shutdown.clone();
56
57        let listener = match addr {
58            RpcAddr::Tcp(host, port) => {
59                let l = TcpListener::bind((host.as_str(), *port))?;
60                // Non-blocking so we can check shutdown flag
61                l.set_nonblocking(true)?;
62                l
63            }
64        };
65
66        let thread = thread::Builder::new()
67            .name("rpc-server".into())
68            .spawn(move || {
69                rpc_server_loop(listener, auth_key, event_tx, shutdown2);
70            })
71            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
72
73        Ok(RpcServer {
74            shutdown,
75            thread: Some(thread),
76        })
77    }
78
79    /// Stop the RPC server.
80    pub fn stop(&mut self) {
81        self.shutdown.store(true, Ordering::Relaxed);
82        if let Some(handle) = self.thread.take() {
83            let _ = handle.join();
84        }
85    }
86}
87
88impl Drop for RpcServer {
89    fn drop(&mut self) {
90        self.stop();
91    }
92}
93
94fn rpc_server_loop(
95    listener: TcpListener,
96    auth_key: [u8; 32],
97    event_tx: EventSender,
98    shutdown: Arc<AtomicBool>,
99) {
100    loop {
101        if shutdown.load(Ordering::Relaxed) {
102            break;
103        }
104
105        match listener.accept() {
106            Ok((stream, _addr)) => {
107                // Set blocking for this connection
108                let _ = stream.set_nonblocking(false);
109                let _ = stream.set_read_timeout(Some(std::time::Duration::from_secs(10)));
110                let _ = stream.set_write_timeout(Some(std::time::Duration::from_secs(10)));
111
112                if let Err(e) = handle_connection(stream, &auth_key, &event_tx) {
113                    log::debug!("RPC connection error: {}", e);
114                }
115            }
116            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
117                // No pending connection, sleep briefly and retry
118                thread::sleep(std::time::Duration::from_millis(100));
119            }
120            Err(e) => {
121                log::error!("RPC accept error: {}", e);
122                thread::sleep(std::time::Duration::from_millis(100));
123            }
124        }
125    }
126}
127
128fn handle_connection(
129    mut stream: TcpStream,
130    auth_key: &[u8; 32],
131    event_tx: &EventSender,
132) -> io::Result<()> {
133    // Authentication: send challenge, verify response
134    server_auth(&mut stream, auth_key)?;
135
136    // Read request (pickle dict)
137    let request_bytes = recv_bytes(&mut stream)?;
138    let request = pickle::decode(&request_bytes)
139        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?;
140
141    // Translate pickle dict to query, send to driver, get response
142    let response = handle_rpc_request(&request, event_tx)?;
143
144    // Encode response and send
145    let response_bytes = pickle::encode(&response);
146    send_bytes(&mut stream, &response_bytes)?;
147
148    Ok(())
149}
150
151/// Server-side authentication: challenge-response.
152fn server_auth(stream: &mut TcpStream, auth_key: &[u8; 32]) -> io::Result<()> {
153    // Generate challenge: #CHALLENGE#{sha256}<40 random bytes>
154    let mut random_bytes = [0u8; CHALLENGE_LEN];
155    // Use /dev/urandom for randomness
156    {
157        let mut f = std::fs::File::open("/dev/urandom")?;
158        f.read_exact(&mut random_bytes)?;
159    }
160
161    let mut challenge_message = Vec::with_capacity(CHALLENGE_PREFIX.len() + 8 + CHALLENGE_LEN);
162    challenge_message.extend_from_slice(CHALLENGE_PREFIX);
163    challenge_message.extend_from_slice(b"{sha256}");
164    challenge_message.extend_from_slice(&random_bytes);
165
166    send_bytes(stream, &challenge_message)?;
167
168    // Read response (max 256 bytes)
169    let response = recv_bytes(stream)?;
170
171    // Verify response
172    // The message to HMAC is everything after #CHALLENGE# (i.e. {sha256}<random>)
173    let message = &challenge_message[CHALLENGE_PREFIX.len()..];
174
175    if verify_response(auth_key, message, &response) {
176        send_bytes(stream, WELCOME)?;
177        Ok(())
178    } else {
179        send_bytes(stream, FAILURE)?;
180        Err(io::Error::new(
181            io::ErrorKind::PermissionDenied,
182            "auth failed",
183        ))
184    }
185}
186
187/// Verify a client's HMAC response.
188fn verify_response(auth_key: &[u8; 32], message: &[u8], response: &[u8]) -> bool {
189    // Modern protocol: response = {sha256}<hmac-sha256 digest>
190    if response.starts_with(b"{sha256}") {
191        let digest = &response[8..];
192        let expected = hmac_sha256(auth_key, message);
193        constant_time_eq(digest, &expected)
194    }
195    // Legacy protocol: response = raw 16-byte HMAC-MD5 digest
196    else if response.len() == 16 {
197        let expected = hmac_md5(auth_key, message);
198        constant_time_eq(response, &expected)
199    }
200    // Legacy with {md5} prefix
201    else if response.starts_with(b"{md5}") {
202        let digest = &response[5..];
203        let expected = hmac_md5(auth_key, message);
204        constant_time_eq(digest, &expected)
205    } else {
206        false
207    }
208}
209
/// Constant-time byte comparison.
///
/// Slices of different length compare unequal immediately; for equal lengths
/// every byte pair is visited and the differences are OR-ed together, so the
/// loop does not exit early on the first mismatch.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    if a.len() != b.len() {
        return false;
    }
    let accumulated = a
        .iter()
        .zip(b.iter())
        .fold(0u8, |acc, (lhs, rhs)| acc | (lhs ^ rhs));
    accumulated == 0
}
221
/// Send bytes with 4-byte big-endian length prefix.
///
/// Payloads that fit in a signed 32-bit length are framed as
/// `<i32 length><payload>`. Larger payloads use the extended framing that
/// the receiving side of this protocol understands: a `-1` marker in the
/// 4-byte slot followed by an 8-byte big-endian unsigned length. Previously
/// the length was cast with `as i32`, which silently produced a wrong
/// (possibly negative) header for such payloads.
///
/// The stream is flushed before returning.
fn send_bytes(stream: &mut TcpStream, data: &[u8]) -> io::Result<()> {
    if data.len() > i32::MAX as usize {
        // Extended format: -1 marker, then the real length as u64.
        stream.write_all(&(-1i32).to_be_bytes())?;
        stream.write_all(&(data.len() as u64).to_be_bytes())?;
    } else {
        // Guarded above, so this cast cannot truncate.
        stream.write_all(&(data.len() as i32).to_be_bytes())?;
    }
    stream.write_all(data)?;
    stream.flush()
}
229
/// Receive bytes with 4-byte big-endian length prefix.
///
/// A non-negative prefix is the payload length. A negative prefix selects the
/// extended format, in which the real length follows as an 8-byte big-endian
/// unsigned integer.
///
/// The declared length is checked against a 64 MiB limit *before* it is
/// narrowed to `usize`, so a huge 64-bit length cannot wrap around into a
/// small value on 32-bit targets and slip past the limit.
///
/// # Errors
///
/// `InvalidData` ("message too large") when the declared length exceeds
/// 64 MiB, or any I/O error raised while reading from the stream.
fn recv_bytes(stream: &mut TcpStream) -> io::Result<Vec<u8>> {
    const MAX_MESSAGE_LEN: u64 = 64 * 1024 * 1024;

    let mut len_buf = [0u8; 4];
    stream.read_exact(&mut len_buf)?;
    let short_len = i32::from_be_bytes(len_buf);

    let declared_len = if short_len < 0 {
        // Extended format: 8-byte length
        let mut len8_buf = [0u8; 8];
        stream.read_exact(&mut len8_buf)?;
        u64::from_be_bytes(len8_buf)
    } else {
        short_len as u64
    };

    if declared_len > MAX_MESSAGE_LEN {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "message too large",
        ));
    }

    // At most 64 MiB here, so the narrowing cast is lossless on every target.
    let mut buf = vec![0u8; declared_len as usize];
    stream.read_exact(&mut buf)?;
    Ok(buf)
}
263
264/// Translate a pickle request dict to a query event and get response.
265fn handle_rpc_request(request: &PickleValue, event_tx: &EventSender) -> io::Result<PickleValue> {
266    // Handle "get" requests
267    if let Some(get_val) = request.get("get") {
268        if let Some(path) = get_val.as_str() {
269            return match path {
270                "interface_stats" => {
271                    let resp = send_query(event_tx, QueryRequest::InterfaceStats)?;
272                    if let QueryResponse::InterfaceStats(stats) = resp {
273                        Ok(interface_stats_to_pickle(&stats))
274                    } else {
275                        Ok(PickleValue::None)
276                    }
277                }
278                "path_table" => {
279                    let max_hops = request
280                        .get("max_hops")
281                        .and_then(|v| v.as_int().map(|n| n as u8));
282                    let resp = send_query(event_tx, QueryRequest::PathTable { max_hops })?;
283                    if let QueryResponse::PathTable(entries) = resp {
284                        Ok(path_table_to_pickle(&entries))
285                    } else {
286                        Ok(PickleValue::None)
287                    }
288                }
289                "rate_table" => {
290                    let resp = send_query(event_tx, QueryRequest::RateTable)?;
291                    if let QueryResponse::RateTable(entries) = resp {
292                        Ok(rate_table_to_pickle(&entries))
293                    } else {
294                        Ok(PickleValue::None)
295                    }
296                }
297                "next_hop" => {
298                    let hash = extract_dest_hash(request, "destination_hash")?;
299                    let resp = send_query(event_tx, QueryRequest::NextHop { dest_hash: hash })?;
300                    if let QueryResponse::NextHop(Some(nh)) = resp {
301                        Ok(PickleValue::Bytes(nh.next_hop.to_vec()))
302                    } else {
303                        Ok(PickleValue::None)
304                    }
305                }
306                "next_hop_if_name" => {
307                    let hash = extract_dest_hash(request, "destination_hash")?;
308                    let resp =
309                        send_query(event_tx, QueryRequest::NextHopIfName { dest_hash: hash })?;
310                    if let QueryResponse::NextHopIfName(Some(name)) = resp {
311                        Ok(PickleValue::String(name))
312                    } else {
313                        Ok(PickleValue::None)
314                    }
315                }
316                "link_count" => {
317                    let resp = send_query(event_tx, QueryRequest::LinkCount)?;
318                    if let QueryResponse::LinkCount(n) = resp {
319                        Ok(PickleValue::Int(n as i64))
320                    } else {
321                        Ok(PickleValue::None)
322                    }
323                }
324                "transport_identity" => {
325                    let resp = send_query(event_tx, QueryRequest::TransportIdentity)?;
326                    if let QueryResponse::TransportIdentity(Some(hash)) = resp {
327                        Ok(PickleValue::Bytes(hash.to_vec()))
328                    } else {
329                        Ok(PickleValue::None)
330                    }
331                }
332                "blackholed" => {
333                    let resp = send_query(event_tx, QueryRequest::GetBlackholed)?;
334                    if let QueryResponse::Blackholed(entries) = resp {
335                        Ok(blackholed_to_pickle(&entries))
336                    } else {
337                        Ok(PickleValue::None)
338                    }
339                }
340                "discovered_interfaces" => {
341                    let only_available = request
342                        .get("only_available")
343                        .and_then(|v| v.as_bool())
344                        .unwrap_or(false);
345                    let only_transport = request
346                        .get("only_transport")
347                        .and_then(|v| v.as_bool())
348                        .unwrap_or(false);
349                    let resp = send_query(
350                        event_tx,
351                        QueryRequest::DiscoveredInterfaces {
352                            only_available,
353                            only_transport,
354                        },
355                    )?;
356                    if let QueryResponse::DiscoveredInterfaces(interfaces) = resp {
357                        Ok(discovered_interfaces_to_pickle(&interfaces))
358                    } else {
359                        Ok(PickleValue::None)
360                    }
361                }
362                "hooks" => {
363                    let (response_tx, response_rx) = mpsc::channel();
364                    event_tx
365                        .send(Event::ListHooks { response_tx })
366                        .map_err(|_| {
367                            io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down")
368                        })?;
369                    let hooks = response_rx
370                        .recv_timeout(std::time::Duration::from_secs(5))
371                        .map_err(|_| {
372                            io::Error::new(io::ErrorKind::TimedOut, "list hooks timed out")
373                        })?;
374                    Ok(hooks_to_pickle(&hooks))
375                }
376                "runtime_config" => {
377                    let resp = send_query(event_tx, QueryRequest::ListRuntimeConfig)?;
378                    if let QueryResponse::RuntimeConfigList(entries) = resp {
379                        Ok(runtime_config_list_to_pickle(&entries))
380                    } else {
381                        Ok(PickleValue::None)
382                    }
383                }
384                "runtime_config_entry" => {
385                    let key = request
386                        .get("key")
387                        .and_then(|v| v.as_str())
388                        .unwrap_or_default()
389                        .to_string();
390                    let resp = send_query(event_tx, QueryRequest::GetRuntimeConfig { key })?;
391                    if let QueryResponse::RuntimeConfigEntry(entry) = resp {
392                        Ok(entry
393                            .as_ref()
394                            .map(runtime_config_entry_to_pickle)
395                            .unwrap_or(PickleValue::None))
396                    } else {
397                        Ok(PickleValue::None)
398                    }
399                }
400                "backbone_peer_state" => {
401                    let interface_name = request
402                        .get("interface")
403                        .and_then(|v| v.as_str())
404                        .map(|s| s.to_string());
405                    let resp =
406                        send_query(event_tx, QueryRequest::BackbonePeerState { interface_name })?;
407                    if let QueryResponse::BackbonePeerState(entries) = resp {
408                        Ok(backbone_peer_state_to_pickle(&entries))
409                    } else {
410                        Ok(PickleValue::None)
411                    }
412                }
413                "backbone_interfaces" => {
414                    let resp = send_query(event_tx, QueryRequest::BackboneInterfaces)?;
415                    if let QueryResponse::BackboneInterfaces(entries) = resp {
416                        Ok(backbone_interfaces_to_pickle(&entries))
417                    } else {
418                        Ok(PickleValue::None)
419                    }
420                }
421                "provider_bridge_stats" => {
422                    let resp = send_query(event_tx, QueryRequest::ProviderBridgeStats)?;
423                    if let QueryResponse::ProviderBridgeStats(stats) = resp {
424                        Ok(stats
425                            .as_ref()
426                            .map(provider_bridge_stats_to_pickle)
427                            .unwrap_or(PickleValue::None))
428                    } else {
429                        Ok(PickleValue::None)
430                    }
431                }
432                "drain_status" => {
433                    let resp = send_query(event_tx, QueryRequest::DrainStatus)?;
434                    if let QueryResponse::DrainStatus(status) = resp {
435                        Ok(drain_status_to_pickle(&status))
436                    } else {
437                        Ok(PickleValue::None)
438                    }
439                }
440                _ => Ok(PickleValue::None),
441            };
442        }
443    }
444
445    if let Some(begin_val) = request.get("begin_drain") {
446        let timeout_secs = begin_val
447            .as_float()
448            .or_else(|| begin_val.as_int().map(|value| value as f64))
449            .unwrap_or(0.0)
450            .max(0.0);
451        let timeout = Duration::from_secs_f64(timeout_secs);
452        let _ = event_tx.send(Event::BeginDrain { timeout });
453        return Ok(PickleValue::Bool(true));
454    }
455
456    if let Some(set_val) = request.get("set").and_then(|v| v.as_str()) {
457        if set_val == "runtime_config" {
458            let key = request
459                .get("key")
460                .and_then(|v| v.as_str())
461                .unwrap_or_default()
462                .to_string();
463            let Some(value) = request
464                .get("value")
465                .and_then(runtime_config_value_from_pickle)
466            else {
467                return Ok(runtime_config_error_to_pickle(&RuntimeConfigError {
468                    code: RuntimeConfigErrorCode::InvalidType,
469                    message: "runtime-config set requires a scalar value".into(),
470                }));
471            };
472            let resp = send_query(event_tx, QueryRequest::SetRuntimeConfig { key, value })?;
473            return if let QueryResponse::RuntimeConfigSet(result) = resp {
474                Ok(runtime_config_result_to_pickle(result))
475            } else {
476                Ok(PickleValue::None)
477            };
478        }
479    }
480
481    if let Some(reset_val) = request.get("reset").and_then(|v| v.as_str()) {
482        if reset_val == "runtime_config" {
483            let key = request
484                .get("key")
485                .and_then(|v| v.as_str())
486                .unwrap_or_default()
487                .to_string();
488            let resp = send_query(event_tx, QueryRequest::ResetRuntimeConfig { key })?;
489            return if let QueryResponse::RuntimeConfigReset(result) = resp {
490                Ok(runtime_config_result_to_pickle(result))
491            } else {
492                Ok(PickleValue::None)
493            };
494        }
495    }
496
497    if let Some(clear_val) = request.get("clear").and_then(|v| v.as_str()) {
498        if clear_val == "backbone_peer_state" {
499            let interface_name = required_string(request, "interface")?;
500            let peer_ip = required_string(request, "ip")?;
501            let peer_ip = peer_ip
502                .parse()
503                .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "invalid peer IP"))?;
504            let resp = send_query(
505                event_tx,
506                QueryRequest::ClearBackbonePeerState {
507                    interface_name,
508                    peer_ip,
509                },
510            )?;
511            return if let QueryResponse::ClearBackbonePeerState(ok) = resp {
512                Ok(PickleValue::Bool(ok))
513            } else {
514                Ok(PickleValue::None)
515            };
516        }
517    }
518
519    if let Some(set_val) = request.get("set").and_then(|v| v.as_str()) {
520        if set_val == "backbone_peer_blacklist" {
521            let interface_name = required_string(request, "interface")?;
522            let peer_ip = required_string(request, "ip")?;
523            let peer_ip: IpAddr = peer_ip
524                .parse()
525                .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "invalid peer IP"))?;
526            let duration_secs = request
527                .get("duration_secs")
528                .and_then(|v| v.as_int())
529                .ok_or_else(|| {
530                    io::Error::new(io::ErrorKind::InvalidInput, "missing duration_secs")
531                })?;
532            let reason = request
533                .get("reason")
534                .and_then(|v| v.as_str())
535                .unwrap_or("sentinel blacklist")
536                .to_string();
537            let penalty_level = request
538                .get("penalty_level")
539                .and_then(|v| v.as_int())
540                .unwrap_or(0)
541                .clamp(0, u8::MAX as i64) as u8;
542            let resp = send_query(
543                event_tx,
544                QueryRequest::BlacklistBackbonePeer {
545                    interface_name,
546                    peer_ip,
547                    duration: Duration::from_secs(duration_secs as u64),
548                    reason,
549                    penalty_level,
550                },
551            )?;
552            return if let QueryResponse::BlacklistBackbonePeer(ok) = resp {
553                Ok(PickleValue::Bool(ok))
554            } else {
555                Ok(PickleValue::None)
556            };
557        }
558    }
559
560    // Handle "request_path" -- trigger a path request to the network
561    if let Some(hash_val) = request.get("request_path") {
562        if let Some(hash_bytes) = hash_val.as_bytes() {
563            if hash_bytes.len() >= 16 {
564                let mut dest_hash = [0u8; 16];
565                dest_hash.copy_from_slice(&hash_bytes[..16]);
566                let _ = event_tx.send(crate::event::Event::RequestPath { dest_hash });
567                return Ok(PickleValue::Bool(true));
568            }
569        }
570    }
571
572    // Handle "send_probe" requests
573    if let Some(hash_val) = request.get("send_probe") {
574        if let Some(hash_bytes) = hash_val.as_bytes() {
575            if hash_bytes.len() >= 16 {
576                let mut dest_hash = [0u8; 16];
577                dest_hash.copy_from_slice(&hash_bytes[..16]);
578                let payload_size = request
579                    .get("size")
580                    .and_then(|v| v.as_int())
581                    .and_then(|n| {
582                        if n > 0 && n <= 400 {
583                            Some(n as usize)
584                        } else {
585                            None
586                        }
587                    })
588                    .unwrap_or(16);
589                let resp = send_query(
590                    event_tx,
591                    QueryRequest::SendProbe {
592                        dest_hash,
593                        payload_size,
594                    },
595                )?;
596                if let QueryResponse::SendProbe(Some((packet_hash, hops))) = resp {
597                    return Ok(PickleValue::Dict(vec![
598                        (
599                            PickleValue::String("packet_hash".into()),
600                            PickleValue::Bytes(packet_hash.to_vec()),
601                        ),
602                        (
603                            PickleValue::String("hops".into()),
604                            PickleValue::Int(hops as i64),
605                        ),
606                    ]));
607                } else {
608                    return Ok(PickleValue::None);
609                }
610            }
611        }
612    }
613
614    // Handle "check_proof" requests
615    if let Some(hash_val) = request.get("check_proof") {
616        if let Some(hash_bytes) = hash_val.as_bytes() {
617            if hash_bytes.len() >= 32 {
618                let mut packet_hash = [0u8; 32];
619                packet_hash.copy_from_slice(&hash_bytes[..32]);
620                let resp = send_query(event_tx, QueryRequest::CheckProof { packet_hash })?;
621                if let QueryResponse::CheckProof(Some(rtt)) = resp {
622                    return Ok(PickleValue::Float(rtt));
623                } else {
624                    return Ok(PickleValue::None);
625                }
626            }
627        }
628    }
629
630    // Handle "blackhole" requests
631    if let Some(hash_val) = request.get("blackhole") {
632        if let Some(hash_bytes) = hash_val.as_bytes() {
633            if hash_bytes.len() >= 16 {
634                let mut identity_hash = [0u8; 16];
635                identity_hash.copy_from_slice(&hash_bytes[..16]);
636                let duration_hours = request.get("duration").and_then(|v| v.as_float());
637                let reason = request
638                    .get("reason")
639                    .and_then(|v| v.as_str())
640                    .map(|s| s.to_string());
641                let resp = send_query(
642                    event_tx,
643                    QueryRequest::BlackholeIdentity {
644                        identity_hash,
645                        duration_hours,
646                        reason,
647                    },
648                )?;
649                return Ok(PickleValue::Bool(matches!(
650                    resp,
651                    QueryResponse::BlackholeResult(true)
652                )));
653            }
654        }
655    }
656
657    // Handle "unblackhole" requests
658    if let Some(hash_val) = request.get("unblackhole") {
659        if let Some(hash_bytes) = hash_val.as_bytes() {
660            if hash_bytes.len() >= 16 {
661                let mut identity_hash = [0u8; 16];
662                identity_hash.copy_from_slice(&hash_bytes[..16]);
663                let resp = send_query(
664                    event_tx,
665                    QueryRequest::UnblackholeIdentity { identity_hash },
666                )?;
667                return Ok(PickleValue::Bool(matches!(
668                    resp,
669                    QueryResponse::UnblackholeResult(true)
670                )));
671            }
672        }
673    }
674
675    // Handle "drop" requests
676    if let Some(drop_val) = request.get("drop") {
677        if let Some(path) = drop_val.as_str() {
678            return match path {
679                "path" => {
680                    let hash = extract_dest_hash(request, "destination_hash")?;
681                    let resp = send_query(event_tx, QueryRequest::DropPath { dest_hash: hash })?;
682                    if let QueryResponse::DropPath(ok) = resp {
683                        Ok(PickleValue::Bool(ok))
684                    } else {
685                        Ok(PickleValue::None)
686                    }
687                }
688                "all_via" => {
689                    let hash = extract_dest_hash(request, "destination_hash")?;
690                    let resp = send_query(
691                        event_tx,
692                        QueryRequest::DropAllVia {
693                            transport_hash: hash,
694                        },
695                    )?;
696                    if let QueryResponse::DropAllVia(n) = resp {
697                        Ok(PickleValue::Int(n as i64))
698                    } else {
699                        Ok(PickleValue::None)
700                    }
701                }
702                "announce_queues" => {
703                    let resp = send_query(event_tx, QueryRequest::DropAnnounceQueues)?;
704                    if let QueryResponse::DropAnnounceQueues = resp {
705                        Ok(PickleValue::Bool(true))
706                    } else {
707                        Ok(PickleValue::None)
708                    }
709                }
710                _ => Ok(PickleValue::None),
711            };
712        }
713    }
714
715    if let Some(hook_val) = request.get("hook").and_then(|v| v.as_str()) {
716        return handle_hook_rpc_request(hook_val, request, event_tx);
717    }
718
719    Ok(PickleValue::None)
720}
721
722fn handle_hook_rpc_request(
723    op: &str,
724    request: &PickleValue,
725    event_tx: &EventSender,
726) -> io::Result<PickleValue> {
727    match op {
728        "load" => {
729            let name = required_string(request, "name")?;
730            let attach_point = required_string(request, "attach_point")?;
731            let priority = request
732                .get("priority")
733                .and_then(|v| v.as_int())
734                .unwrap_or(0) as i32;
735            let wasm = request
736                .get("wasm")
737                .and_then(|v| v.as_bytes())
738                .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing wasm"))?
739                .to_vec();
740            let (response_tx, response_rx) = mpsc::channel();
741            event_tx
742                .send(Event::LoadHook {
743                    name,
744                    wasm_bytes: wasm,
745                    attach_point,
746                    priority,
747                    response_tx,
748                })
749                .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down"))?;
750            let response = response_rx
751                .recv_timeout(std::time::Duration::from_secs(5))
752                .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "hook load timed out"))?;
753            Ok(hook_result_to_pickle(response))
754        }
755        "unload" => {
756            let name = required_string(request, "name")?;
757            let attach_point = required_string(request, "attach_point")?;
758            let (response_tx, response_rx) = mpsc::channel();
759            event_tx
760                .send(Event::UnloadHook {
761                    name,
762                    attach_point,
763                    response_tx,
764                })
765                .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down"))?;
766            let response = response_rx
767                .recv_timeout(std::time::Duration::from_secs(5))
768                .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "hook unload timed out"))?;
769            Ok(hook_result_to_pickle(response))
770        }
771        "enable" | "disable" => {
772            let name = required_string(request, "name")?;
773            let attach_point = required_string(request, "attach_point")?;
774            let enabled = op == "enable";
775            let (response_tx, response_rx) = mpsc::channel();
776            event_tx
777                .send(Event::SetHookEnabled {
778                    name,
779                    attach_point,
780                    enabled,
781                    response_tx,
782                })
783                .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down"))?;
784            let response = response_rx
785                .recv_timeout(std::time::Duration::from_secs(5))
786                .map_err(|_| {
787                    io::Error::new(io::ErrorKind::TimedOut, "hook enable/disable timed out")
788                })?;
789            Ok(hook_result_to_pickle(response))
790        }
791        "set_priority" => {
792            let name = required_string(request, "name")?;
793            let attach_point = required_string(request, "attach_point")?;
794            let priority = request
795                .get("priority")
796                .and_then(|v| v.as_int())
797                .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing priority"))?
798                as i32;
799            let (response_tx, response_rx) = mpsc::channel();
800            event_tx
801                .send(Event::SetHookPriority {
802                    name,
803                    attach_point,
804                    priority,
805                    response_tx,
806                })
807                .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down"))?;
808            let response = response_rx
809                .recv_timeout(std::time::Duration::from_secs(5))
810                .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "hook priority timed out"))?;
811            Ok(hook_result_to_pickle(response))
812        }
813        _ => Ok(PickleValue::None),
814    }
815}
816
817/// Send a query to the driver and wait for the response.
818fn send_query(event_tx: &EventSender, request: QueryRequest) -> io::Result<QueryResponse> {
819    let (resp_tx, resp_rx) = mpsc::channel();
820    event_tx
821        .send(Event::Query(request, resp_tx))
822        .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down"))?;
823    resp_rx
824        .recv_timeout(std::time::Duration::from_secs(5))
825        .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "query timed out"))
826}
827
828/// Extract a 16-byte destination hash from a pickle dict field.
829fn extract_dest_hash(request: &PickleValue, key: &str) -> io::Result<[u8; 16]> {
830    let bytes = request
831        .get(key)
832        .and_then(|v| v.as_bytes())
833        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing destination_hash"))?;
834    if bytes.len() < 16 {
835        return Err(io::Error::new(io::ErrorKind::InvalidData, "hash too short"));
836    }
837    let mut hash = [0u8; 16];
838    hash.copy_from_slice(&bytes[..16]);
839    Ok(hash)
840}
841
842fn required_string(request: &PickleValue, key: &str) -> io::Result<String> {
843    request
844        .get(key)
845        .and_then(|v| v.as_str())
846        .map(|s| s.to_string())
847        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, format!("missing {}", key)))
848}
849
850fn hook_result_to_pickle(result: Result<(), String>) -> PickleValue {
851    match result {
852        Ok(()) => PickleValue::Dict(vec![(
853            PickleValue::String("ok".into()),
854            PickleValue::Bool(true),
855        )]),
856        Err(error) => PickleValue::Dict(vec![
857            (PickleValue::String("ok".into()), PickleValue::Bool(false)),
858            (
859                PickleValue::String("error".into()),
860                PickleValue::String(error),
861            ),
862        ]),
863    }
864}
865
866// --- Pickle response builders ---
867
868fn interface_stats_to_pickle(stats: &InterfaceStatsResponse) -> PickleValue {
869    let mut ifaces = Vec::new();
870    for iface in &stats.interfaces {
871        ifaces.push(single_iface_to_pickle(iface));
872    }
873
874    let mut dict = vec![
875        (
876            PickleValue::String("interfaces".into()),
877            PickleValue::List(ifaces),
878        ),
879        (
880            PickleValue::String("transport_enabled".into()),
881            PickleValue::Bool(stats.transport_enabled),
882        ),
883        (
884            PickleValue::String("transport_uptime".into()),
885            PickleValue::Float(stats.transport_uptime),
886        ),
887        (
888            PickleValue::String("rxb".into()),
889            PickleValue::Int(stats.total_rxb as i64),
890        ),
891        (
892            PickleValue::String("txb".into()),
893            PickleValue::Int(stats.total_txb as i64),
894        ),
895    ];
896
897    if let Some(tid) = stats.transport_id {
898        dict.push((
899            PickleValue::String("transport_id".into()),
900            PickleValue::Bytes(tid.to_vec()),
901        ));
902    } else {
903        dict.push((
904            PickleValue::String("transport_id".into()),
905            PickleValue::None,
906        ));
907    }
908
909    if let Some(pr) = stats.probe_responder {
910        dict.push((
911            PickleValue::String("probe_responder".into()),
912            PickleValue::Bytes(pr.to_vec()),
913        ));
914    } else {
915        dict.push((
916            PickleValue::String("probe_responder".into()),
917            PickleValue::None,
918        ));
919    }
920
921    PickleValue::Dict(dict)
922}
923
924fn single_iface_to_pickle(s: &SingleInterfaceStat) -> PickleValue {
925    let mut dict = vec![
926        (
927            PickleValue::String("id".into()),
928            PickleValue::Int(s.id as i64),
929        ),
930        (
931            PickleValue::String("name".into()),
932            PickleValue::String(s.name.clone()),
933        ),
934        (
935            PickleValue::String("status".into()),
936            PickleValue::Bool(s.status),
937        ),
938        (
939            PickleValue::String("mode".into()),
940            PickleValue::Int(s.mode as i64),
941        ),
942        (
943            PickleValue::String("rxb".into()),
944            PickleValue::Int(s.rxb as i64),
945        ),
946        (
947            PickleValue::String("txb".into()),
948            PickleValue::Int(s.txb as i64),
949        ),
950        (
951            PickleValue::String("rx_packets".into()),
952            PickleValue::Int(s.rx_packets as i64),
953        ),
954        (
955            PickleValue::String("tx_packets".into()),
956            PickleValue::Int(s.tx_packets as i64),
957        ),
958        (
959            PickleValue::String("started".into()),
960            PickleValue::Float(s.started),
961        ),
962        (
963            PickleValue::String("ia_freq".into()),
964            PickleValue::Float(s.ia_freq),
965        ),
966        (
967            PickleValue::String("oa_freq".into()),
968            PickleValue::Float(s.oa_freq),
969        ),
970    ];
971
972    match s.bitrate {
973        Some(br) => dict.push((
974            PickleValue::String("bitrate".into()),
975            PickleValue::Int(br as i64),
976        )),
977        None => dict.push((PickleValue::String("bitrate".into()), PickleValue::None)),
978    }
979
980    match s.ifac_size {
981        Some(sz) => dict.push((
982            PickleValue::String("ifac_size".into()),
983            PickleValue::Int(sz as i64),
984        )),
985        None => dict.push((PickleValue::String("ifac_size".into()), PickleValue::None)),
986    }
987
988    PickleValue::Dict(dict)
989}
990
991fn path_table_to_pickle(entries: &[PathTableEntry]) -> PickleValue {
992    let list: Vec<PickleValue> = entries
993        .iter()
994        .map(|e| {
995            PickleValue::Dict(vec![
996                (
997                    PickleValue::String("hash".into()),
998                    PickleValue::Bytes(e.hash.to_vec()),
999                ),
1000                (
1001                    PickleValue::String("timestamp".into()),
1002                    PickleValue::Float(e.timestamp),
1003                ),
1004                (
1005                    PickleValue::String("via".into()),
1006                    PickleValue::Bytes(e.via.to_vec()),
1007                ),
1008                (
1009                    PickleValue::String("hops".into()),
1010                    PickleValue::Int(e.hops as i64),
1011                ),
1012                (
1013                    PickleValue::String("expires".into()),
1014                    PickleValue::Float(e.expires),
1015                ),
1016                (
1017                    PickleValue::String("interface".into()),
1018                    PickleValue::String(e.interface_name.clone()),
1019                ),
1020            ])
1021        })
1022        .collect();
1023    PickleValue::List(list)
1024}
1025
1026fn rate_table_to_pickle(entries: &[RateTableEntry]) -> PickleValue {
1027    let list: Vec<PickleValue> = entries
1028        .iter()
1029        .map(|e| {
1030            PickleValue::Dict(vec![
1031                (
1032                    PickleValue::String("hash".into()),
1033                    PickleValue::Bytes(e.hash.to_vec()),
1034                ),
1035                (
1036                    PickleValue::String("last".into()),
1037                    PickleValue::Float(e.last),
1038                ),
1039                (
1040                    PickleValue::String("rate_violations".into()),
1041                    PickleValue::Int(e.rate_violations as i64),
1042                ),
1043                (
1044                    PickleValue::String("blocked_until".into()),
1045                    PickleValue::Float(e.blocked_until),
1046                ),
1047                (
1048                    PickleValue::String("timestamps".into()),
1049                    PickleValue::List(
1050                        e.timestamps
1051                            .iter()
1052                            .map(|&t| PickleValue::Float(t))
1053                            .collect(),
1054                    ),
1055                ),
1056            ])
1057        })
1058        .collect();
1059    PickleValue::List(list)
1060}
1061
1062fn blackholed_to_pickle(entries: &[BlackholeInfo]) -> PickleValue {
1063    let list: Vec<PickleValue> = entries
1064        .iter()
1065        .map(|e| {
1066            let mut dict = vec![
1067                (
1068                    PickleValue::String("identity_hash".into()),
1069                    PickleValue::Bytes(e.identity_hash.to_vec()),
1070                ),
1071                (
1072                    PickleValue::String("created".into()),
1073                    PickleValue::Float(e.created),
1074                ),
1075                (
1076                    PickleValue::String("expires".into()),
1077                    PickleValue::Float(e.expires),
1078                ),
1079            ];
1080            if let Some(ref reason) = e.reason {
1081                dict.push((
1082                    PickleValue::String("reason".into()),
1083                    PickleValue::String(reason.clone()),
1084                ));
1085            } else {
1086                dict.push((PickleValue::String("reason".into()), PickleValue::None));
1087            }
1088            PickleValue::Dict(dict)
1089        })
1090        .collect();
1091    PickleValue::List(list)
1092}
1093
1094fn discovered_interfaces_to_pickle(
1095    interfaces: &[crate::discovery::DiscoveredInterface],
1096) -> PickleValue {
1097    let list: Vec<PickleValue> = interfaces
1098        .iter()
1099        .map(|iface| {
1100            let mut dict = vec![
1101                (
1102                    PickleValue::String("type".into()),
1103                    PickleValue::String(iface.interface_type.clone()),
1104                ),
1105                (
1106                    PickleValue::String("transport".into()),
1107                    PickleValue::Bool(iface.transport),
1108                ),
1109                (
1110                    PickleValue::String("name".into()),
1111                    PickleValue::String(iface.name.clone()),
1112                ),
1113                (
1114                    PickleValue::String("discovered".into()),
1115                    PickleValue::Float(iface.discovered),
1116                ),
1117                (
1118                    PickleValue::String("last_heard".into()),
1119                    PickleValue::Float(iface.last_heard),
1120                ),
1121                (
1122                    PickleValue::String("heard_count".into()),
1123                    PickleValue::Int(iface.heard_count as i64),
1124                ),
1125                (
1126                    PickleValue::String("status".into()),
1127                    PickleValue::String(iface.status.as_str().into()),
1128                ),
1129                (
1130                    PickleValue::String("stamp".into()),
1131                    PickleValue::Bytes(iface.stamp.clone()),
1132                ),
1133                (
1134                    PickleValue::String("value".into()),
1135                    PickleValue::Int(iface.stamp_value as i64),
1136                ),
1137                (
1138                    PickleValue::String("transport_id".into()),
1139                    PickleValue::Bytes(iface.transport_id.to_vec()),
1140                ),
1141                (
1142                    PickleValue::String("network_id".into()),
1143                    PickleValue::Bytes(iface.network_id.to_vec()),
1144                ),
1145                (
1146                    PickleValue::String("hops".into()),
1147                    PickleValue::Int(iface.hops as i64),
1148                ),
1149            ];
1150
1151            // Optional location fields
1152            if let Some(v) = iface.latitude {
1153                dict.push((
1154                    PickleValue::String("latitude".into()),
1155                    PickleValue::Float(v),
1156                ));
1157            } else {
1158                dict.push((PickleValue::String("latitude".into()), PickleValue::None));
1159            }
1160            if let Some(v) = iface.longitude {
1161                dict.push((
1162                    PickleValue::String("longitude".into()),
1163                    PickleValue::Float(v),
1164                ));
1165            } else {
1166                dict.push((PickleValue::String("longitude".into()), PickleValue::None));
1167            }
1168            if let Some(v) = iface.height {
1169                dict.push((PickleValue::String("height".into()), PickleValue::Float(v)));
1170            } else {
1171                dict.push((PickleValue::String("height".into()), PickleValue::None));
1172            }
1173
1174            // Connection info
1175            if let Some(ref v) = iface.reachable_on {
1176                dict.push((
1177                    PickleValue::String("reachable_on".into()),
1178                    PickleValue::String(v.clone()),
1179                ));
1180            } else {
1181                dict.push((
1182                    PickleValue::String("reachable_on".into()),
1183                    PickleValue::None,
1184                ));
1185            }
1186            if let Some(v) = iface.port {
1187                dict.push((
1188                    PickleValue::String("port".into()),
1189                    PickleValue::Int(v as i64),
1190                ));
1191            } else {
1192                dict.push((PickleValue::String("port".into()), PickleValue::None));
1193            }
1194
1195            // RNode/RF specific
1196            if let Some(v) = iface.frequency {
1197                dict.push((
1198                    PickleValue::String("frequency".into()),
1199                    PickleValue::Int(v as i64),
1200                ));
1201            } else {
1202                dict.push((PickleValue::String("frequency".into()), PickleValue::None));
1203            }
1204            if let Some(v) = iface.bandwidth {
1205                dict.push((
1206                    PickleValue::String("bandwidth".into()),
1207                    PickleValue::Int(v as i64),
1208                ));
1209            } else {
1210                dict.push((PickleValue::String("bandwidth".into()), PickleValue::None));
1211            }
1212            if let Some(v) = iface.spreading_factor {
1213                dict.push((PickleValue::String("sf".into()), PickleValue::Int(v as i64)));
1214            } else {
1215                dict.push((PickleValue::String("sf".into()), PickleValue::None));
1216            }
1217            if let Some(v) = iface.coding_rate {
1218                dict.push((PickleValue::String("cr".into()), PickleValue::Int(v as i64)));
1219            } else {
1220                dict.push((PickleValue::String("cr".into()), PickleValue::None));
1221            }
1222            if let Some(ref v) = iface.modulation {
1223                dict.push((
1224                    PickleValue::String("modulation".into()),
1225                    PickleValue::String(v.clone()),
1226                ));
1227            } else {
1228                dict.push((PickleValue::String("modulation".into()), PickleValue::None));
1229            }
1230            if let Some(v) = iface.channel {
1231                dict.push((
1232                    PickleValue::String("channel".into()),
1233                    PickleValue::Int(v as i64),
1234                ));
1235            } else {
1236                dict.push((PickleValue::String("channel".into()), PickleValue::None));
1237            }
1238
1239            // IFAC info
1240            if let Some(ref v) = iface.ifac_netname {
1241                dict.push((
1242                    PickleValue::String("ifac_netname".into()),
1243                    PickleValue::String(v.clone()),
1244                ));
1245            } else {
1246                dict.push((
1247                    PickleValue::String("ifac_netname".into()),
1248                    PickleValue::None,
1249                ));
1250            }
1251            if let Some(ref v) = iface.ifac_netkey {
1252                dict.push((
1253                    PickleValue::String("ifac_netkey".into()),
1254                    PickleValue::String(v.clone()),
1255                ));
1256            } else {
1257                dict.push((PickleValue::String("ifac_netkey".into()), PickleValue::None));
1258            }
1259
1260            // Config entry
1261            if let Some(ref v) = iface.config_entry {
1262                dict.push((
1263                    PickleValue::String("config_entry".into()),
1264                    PickleValue::String(v.clone()),
1265                ));
1266            } else {
1267                dict.push((
1268                    PickleValue::String("config_entry".into()),
1269                    PickleValue::None,
1270                ));
1271            }
1272
1273            dict.push((
1274                PickleValue::String("discovery_hash".into()),
1275                PickleValue::Bytes(iface.discovery_hash.to_vec()),
1276            ));
1277
1278            PickleValue::Dict(dict)
1279        })
1280        .collect();
1281    PickleValue::List(list)
1282}
1283
1284fn hooks_to_pickle(hooks: &[HookInfo]) -> PickleValue {
1285    PickleValue::List(
1286        hooks
1287            .iter()
1288            .map(|hook| {
1289                PickleValue::Dict(vec![
1290                    (
1291                        PickleValue::String("name".into()),
1292                        PickleValue::String(hook.name.clone()),
1293                    ),
1294                    (
1295                        PickleValue::String("attach_point".into()),
1296                        PickleValue::String(hook.attach_point.clone()),
1297                    ),
1298                    (
1299                        PickleValue::String("priority".into()),
1300                        PickleValue::Int(hook.priority as i64),
1301                    ),
1302                    (
1303                        PickleValue::String("enabled".into()),
1304                        PickleValue::Bool(hook.enabled),
1305                    ),
1306                    (
1307                        PickleValue::String("consecutive_traps".into()),
1308                        PickleValue::Int(hook.consecutive_traps as i64),
1309                    ),
1310                ])
1311            })
1312            .collect(),
1313    )
1314}
1315
1316fn backbone_peer_state_to_pickle(entries: &[BackbonePeerStateEntry]) -> PickleValue {
1317    PickleValue::List(
1318        entries
1319            .iter()
1320            .map(|entry| {
1321                PickleValue::Dict(vec![
1322                    (
1323                        PickleValue::String("interface".into()),
1324                        PickleValue::String(entry.interface_name.clone()),
1325                    ),
1326                    (
1327                        PickleValue::String("ip".into()),
1328                        PickleValue::String(entry.peer_ip.to_string()),
1329                    ),
1330                    (
1331                        PickleValue::String("connected_count".into()),
1332                        PickleValue::Int(entry.connected_count as i64),
1333                    ),
1334                    (
1335                        PickleValue::String("blacklisted_remaining_secs".into()),
1336                        entry
1337                            .blacklisted_remaining_secs
1338                            .map(PickleValue::Float)
1339                            .unwrap_or(PickleValue::None),
1340                    ),
1341                    (
1342                        PickleValue::String("blacklist_reason".into()),
1343                        entry
1344                            .blacklist_reason
1345                            .as_ref()
1346                            .map(|v: &String| PickleValue::String(v.clone()))
1347                            .unwrap_or(PickleValue::None),
1348                    ),
1349                    (
1350                        PickleValue::String("reject_count".into()),
1351                        PickleValue::Int(entry.reject_count as i64),
1352                    ),
1353                ])
1354            })
1355            .collect(),
1356    )
1357}
1358
1359fn backbone_interfaces_to_pickle(entries: &[BackboneInterfaceEntry]) -> PickleValue {
1360    PickleValue::List(
1361        entries
1362            .iter()
1363            .map(|entry| {
1364                PickleValue::Dict(vec![
1365                    (
1366                        PickleValue::String("id".into()),
1367                        PickleValue::Int(entry.interface_id.0 as i64),
1368                    ),
1369                    (
1370                        PickleValue::String("name".into()),
1371                        PickleValue::String(entry.interface_name.clone()),
1372                    ),
1373                ])
1374            })
1375            .collect(),
1376    )
1377}
1378
1379fn provider_bridge_stats_to_pickle(stats: &ProviderBridgeStats) -> PickleValue {
1380    PickleValue::Dict(vec![
1381        (
1382            PickleValue::String("connected".into()),
1383            PickleValue::Bool(stats.connected),
1384        ),
1385        (
1386            PickleValue::String("consumer_count".into()),
1387            PickleValue::Int(stats.consumer_count as i64),
1388        ),
1389        (
1390            PickleValue::String("queue_max_events".into()),
1391            PickleValue::Int(stats.queue_max_events as i64),
1392        ),
1393        (
1394            PickleValue::String("queue_max_bytes".into()),
1395            PickleValue::Int(stats.queue_max_bytes as i64),
1396        ),
1397        (
1398            PickleValue::String("backlog_len".into()),
1399            PickleValue::Int(stats.backlog_len as i64),
1400        ),
1401        (
1402            PickleValue::String("backlog_bytes".into()),
1403            PickleValue::Int(stats.backlog_bytes as i64),
1404        ),
1405        (
1406            PickleValue::String("backlog_dropped_pending".into()),
1407            PickleValue::Int(stats.backlog_dropped_pending as i64),
1408        ),
1409        (
1410            PickleValue::String("backlog_dropped_total".into()),
1411            PickleValue::Int(stats.backlog_dropped_total as i64),
1412        ),
1413        (
1414            PickleValue::String("total_disconnect_count".into()),
1415            PickleValue::Int(stats.total_disconnect_count as i64),
1416        ),
1417        (
1418            PickleValue::String("consumers".into()),
1419            PickleValue::List(
1420                stats
1421                    .consumers
1422                    .iter()
1423                    .map(|consumer| {
1424                        PickleValue::Dict(vec![
1425                            (
1426                                PickleValue::String("id".into()),
1427                                PickleValue::Int(consumer.id as i64),
1428                            ),
1429                            (
1430                                PickleValue::String("connected".into()),
1431                                PickleValue::Bool(consumer.connected),
1432                            ),
1433                            (
1434                                PickleValue::String("queue_len".into()),
1435                                PickleValue::Int(consumer.queue_len as i64),
1436                            ),
1437                            (
1438                                PickleValue::String("queued_bytes".into()),
1439                                PickleValue::Int(consumer.queued_bytes as i64),
1440                            ),
1441                            (
1442                                PickleValue::String("dropped_pending".into()),
1443                                PickleValue::Int(consumer.dropped_pending as i64),
1444                            ),
1445                            (
1446                                PickleValue::String("dropped_total".into()),
1447                                PickleValue::Int(consumer.dropped_total as i64),
1448                            ),
1449                            (
1450                                PickleValue::String("queue_max_events".into()),
1451                                PickleValue::Int(consumer.queue_max_events as i64),
1452                            ),
1453                            (
1454                                PickleValue::String("queue_max_bytes".into()),
1455                                PickleValue::Int(consumer.queue_max_bytes as i64),
1456                            ),
1457                        ])
1458                    })
1459                    .collect(),
1460            ),
1461        ),
1462    ])
1463}
1464
1465fn lifecycle_state_name(state: LifecycleState) -> &'static str {
1466    match state {
1467        LifecycleState::Active => "active",
1468        LifecycleState::Draining => "draining",
1469        LifecycleState::Stopping => "stopping",
1470        LifecycleState::Stopped => "stopped",
1471    }
1472}
1473
1474fn drain_status_to_pickle(status: &DrainStatus) -> PickleValue {
1475    PickleValue::Dict(vec![
1476        (
1477            PickleValue::String("state".into()),
1478            PickleValue::String(lifecycle_state_name(status.state).into()),
1479        ),
1480        (
1481            PickleValue::String("drain_age_seconds".into()),
1482            status
1483                .drain_age_seconds
1484                .map(PickleValue::Float)
1485                .unwrap_or(PickleValue::None),
1486        ),
1487        (
1488            PickleValue::String("deadline_remaining_seconds".into()),
1489            status
1490                .deadline_remaining_seconds
1491                .map(PickleValue::Float)
1492                .unwrap_or(PickleValue::None),
1493        ),
1494        (
1495            PickleValue::String("drain_complete".into()),
1496            PickleValue::Bool(status.drain_complete),
1497        ),
1498        (
1499            PickleValue::String("interface_writer_queued_frames".into()),
1500            PickleValue::Int(status.interface_writer_queued_frames as i64),
1501        ),
1502        (
1503            PickleValue::String("provider_backlog_events".into()),
1504            PickleValue::Int(status.provider_backlog_events as i64),
1505        ),
1506        (
1507            PickleValue::String("provider_consumer_queued_events".into()),
1508            PickleValue::Int(status.provider_consumer_queued_events as i64),
1509        ),
1510        (
1511            PickleValue::String("detail".into()),
1512            status
1513                .detail
1514                .as_ref()
1515                .map(|detail| PickleValue::String(detail.clone()))
1516                .unwrap_or(PickleValue::None),
1517        ),
1518    ])
1519}
1520
1521fn runtime_config_value_to_pickle(value: &RuntimeConfigValue) -> PickleValue {
1522    match value {
1523        RuntimeConfigValue::Int(v) => PickleValue::Int(*v),
1524        RuntimeConfigValue::Float(v) => PickleValue::Float(*v),
1525        RuntimeConfigValue::Bool(v) => PickleValue::Bool(*v),
1526        RuntimeConfigValue::String(v) => PickleValue::String(v.clone()),
1527        RuntimeConfigValue::Null => PickleValue::None,
1528    }
1529}
1530
1531fn runtime_config_value_from_pickle(value: &PickleValue) -> Option<RuntimeConfigValue> {
1532    match value {
1533        PickleValue::Int(v) => Some(RuntimeConfigValue::Int(*v)),
1534        PickleValue::Float(v) => Some(RuntimeConfigValue::Float(*v)),
1535        PickleValue::Bool(v) => Some(RuntimeConfigValue::Bool(*v)),
1536        PickleValue::String(v) => Some(RuntimeConfigValue::String(v.clone())),
1537        PickleValue::None => Some(RuntimeConfigValue::Null),
1538        _ => None,
1539    }
1540}
1541
1542fn runtime_config_entry_to_pickle(entry: &RuntimeConfigEntry) -> PickleValue {
1543    PickleValue::Dict(vec![
1544        (
1545            PickleValue::String("key".into()),
1546            PickleValue::String(entry.key.clone()),
1547        ),
1548        (
1549            PickleValue::String("value".into()),
1550            runtime_config_value_to_pickle(&entry.value),
1551        ),
1552        (
1553            PickleValue::String("default".into()),
1554            runtime_config_value_to_pickle(&entry.default),
1555        ),
1556        (
1557            PickleValue::String("source".into()),
1558            PickleValue::String(match entry.source {
1559                RuntimeConfigSource::Startup => "startup".into(),
1560                RuntimeConfigSource::RuntimeOverride => "runtime_override".into(),
1561            }),
1562        ),
1563        (
1564            PickleValue::String("apply_mode".into()),
1565            PickleValue::String(match entry.apply_mode {
1566                RuntimeConfigApplyMode::Immediate => "immediate".into(),
1567                RuntimeConfigApplyMode::NewConnectionsOnly => "new_connections_only".into(),
1568                RuntimeConfigApplyMode::NextReconnect => "next_reconnect".into(),
1569                RuntimeConfigApplyMode::RestartRequired => "restart_required".into(),
1570            }),
1571        ),
1572        (
1573            PickleValue::String("description".into()),
1574            entry
1575                .description
1576                .as_ref()
1577                .map(|v| PickleValue::String(v.clone()))
1578                .unwrap_or(PickleValue::None),
1579        ),
1580    ])
1581}
1582
1583fn runtime_config_list_to_pickle(entries: &[RuntimeConfigEntry]) -> PickleValue {
1584    PickleValue::List(entries.iter().map(runtime_config_entry_to_pickle).collect())
1585}
1586
1587fn runtime_config_error_to_pickle(error: &RuntimeConfigError) -> PickleValue {
1588    PickleValue::Dict(vec![
1589        (
1590            PickleValue::String("error".into()),
1591            PickleValue::String(match error.code {
1592                RuntimeConfigErrorCode::UnknownKey => "unknown_key".into(),
1593                RuntimeConfigErrorCode::InvalidType => "invalid_type".into(),
1594                RuntimeConfigErrorCode::InvalidValue => "invalid_value".into(),
1595                RuntimeConfigErrorCode::Unsupported => "unsupported".into(),
1596                RuntimeConfigErrorCode::NotFound => "not_found".into(),
1597                RuntimeConfigErrorCode::ApplyFailed => "apply_failed".into(),
1598            }),
1599        ),
1600        (
1601            PickleValue::String("message".into()),
1602            PickleValue::String(error.message.clone()),
1603        ),
1604    ])
1605}
1606
1607fn runtime_config_result_to_pickle(
1608    result: Result<RuntimeConfigEntry, RuntimeConfigError>,
1609) -> PickleValue {
1610    match result {
1611        Ok(entry) => runtime_config_entry_to_pickle(&entry),
1612        Err(error) => runtime_config_error_to_pickle(&error),
1613    }
1614}
1615
1616// --- RPC Client ---
1617
1618/// RPC client for connecting to a running daemon.
1619pub struct RpcClient {
1620    stream: TcpStream,
1621}
1622
1623impl RpcClient {
1624    /// Connect to an RPC server and authenticate.
1625    pub fn connect(addr: &RpcAddr, auth_key: &[u8; 32]) -> io::Result<Self> {
1626        let mut stream = match addr {
1627            RpcAddr::Tcp(host, port) => TcpStream::connect((host.as_str(), *port))?,
1628        };
1629
1630        stream.set_read_timeout(Some(std::time::Duration::from_secs(10)))?;
1631        stream.set_write_timeout(Some(std::time::Duration::from_secs(10)))?;
1632
1633        // Client-side authentication
1634        client_auth(&mut stream, auth_key)?;
1635
1636        Ok(RpcClient { stream })
1637    }
1638
1639    /// Send a pickle request and receive a pickle response.
1640    pub fn call(&mut self, request: &PickleValue) -> io::Result<PickleValue> {
1641        let request_bytes = pickle::encode(request);
1642        send_bytes(&mut self.stream, &request_bytes)?;
1643
1644        let response_bytes = recv_bytes(&mut self.stream)?;
1645        pickle::decode(&response_bytes)
1646            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))
1647    }
1648
1649    pub fn list_hooks(&mut self) -> io::Result<Vec<HookInfo>> {
1650        let response = self.call(&PickleValue::Dict(vec![(
1651            PickleValue::String("get".into()),
1652            PickleValue::String("hooks".into()),
1653        )]))?;
1654        parse_hook_list(&response)
1655    }
1656
1657    pub fn begin_drain(&mut self, timeout: Duration) -> io::Result<bool> {
1658        let response = self.call(&PickleValue::Dict(vec![(
1659            PickleValue::String("begin_drain".into()),
1660            PickleValue::Float(timeout.as_secs_f64()),
1661        )]))?;
1662        Ok(response.as_bool().unwrap_or(false))
1663    }
1664
1665    pub fn drain_status(&mut self) -> io::Result<Option<DrainStatus>> {
1666        let response = self.call(&PickleValue::Dict(vec![(
1667            PickleValue::String("get".into()),
1668            PickleValue::String("drain_status".into()),
1669        )]))?;
1670        parse_drain_status(&response)
1671    }
1672
1673    pub fn provider_bridge_stats(&mut self) -> io::Result<PickleValue> {
1674        self.call(&PickleValue::Dict(vec![(
1675            PickleValue::String("get".into()),
1676            PickleValue::String("provider_bridge_stats".into()),
1677        )]))
1678    }
1679
1680    pub fn load_hook(
1681        &mut self,
1682        name: &str,
1683        attach_point: &str,
1684        priority: i32,
1685        wasm: &[u8],
1686    ) -> io::Result<Result<(), String>> {
1687        let response = self.call(&PickleValue::Dict(vec![
1688            (
1689                PickleValue::String("hook".into()),
1690                PickleValue::String("load".into()),
1691            ),
1692            (
1693                PickleValue::String("name".into()),
1694                PickleValue::String(name.to_string()),
1695            ),
1696            (
1697                PickleValue::String("attach_point".into()),
1698                PickleValue::String(attach_point.to_string()),
1699            ),
1700            (
1701                PickleValue::String("priority".into()),
1702                PickleValue::Int(priority as i64),
1703            ),
1704            (
1705                PickleValue::String("wasm".into()),
1706                PickleValue::Bytes(wasm.to_vec()),
1707            ),
1708        ]))?;
1709        parse_hook_result(&response)
1710    }
1711
1712    pub fn unload_hook(
1713        &mut self,
1714        name: &str,
1715        attach_point: &str,
1716    ) -> io::Result<Result<(), String>> {
1717        let response = self.call(&PickleValue::Dict(vec![
1718            (
1719                PickleValue::String("hook".into()),
1720                PickleValue::String("unload".into()),
1721            ),
1722            (
1723                PickleValue::String("name".into()),
1724                PickleValue::String(name.to_string()),
1725            ),
1726            (
1727                PickleValue::String("attach_point".into()),
1728                PickleValue::String(attach_point.to_string()),
1729            ),
1730        ]))?;
1731        parse_hook_result(&response)
1732    }
1733
1734    pub fn set_hook_enabled(
1735        &mut self,
1736        name: &str,
1737        attach_point: &str,
1738        enabled: bool,
1739    ) -> io::Result<Result<(), String>> {
1740        let op = if enabled { "enable" } else { "disable" };
1741        let response = self.call(&PickleValue::Dict(vec![
1742            (
1743                PickleValue::String("hook".into()),
1744                PickleValue::String(op.into()),
1745            ),
1746            (
1747                PickleValue::String("name".into()),
1748                PickleValue::String(name.to_string()),
1749            ),
1750            (
1751                PickleValue::String("attach_point".into()),
1752                PickleValue::String(attach_point.to_string()),
1753            ),
1754        ]))?;
1755        parse_hook_result(&response)
1756    }
1757
1758    pub fn set_hook_priority(
1759        &mut self,
1760        name: &str,
1761        attach_point: &str,
1762        priority: i32,
1763    ) -> io::Result<Result<(), String>> {
1764        let response = self.call(&PickleValue::Dict(vec![
1765            (
1766                PickleValue::String("hook".into()),
1767                PickleValue::String("set_priority".into()),
1768            ),
1769            (
1770                PickleValue::String("name".into()),
1771                PickleValue::String(name.to_string()),
1772            ),
1773            (
1774                PickleValue::String("attach_point".into()),
1775                PickleValue::String(attach_point.to_string()),
1776            ),
1777            (
1778                PickleValue::String("priority".into()),
1779                PickleValue::Int(priority as i64),
1780            ),
1781        ]))?;
1782        parse_hook_result(&response)
1783    }
1784
1785    pub fn blacklist_backbone_peer(
1786        &mut self,
1787        interface: &str,
1788        ip: &str,
1789        duration_secs: u64,
1790        reason: Option<&str>,
1791        penalty_level: Option<u8>,
1792    ) -> io::Result<bool> {
1793        let mut request = vec![
1794            (
1795                PickleValue::String("set".into()),
1796                PickleValue::String("backbone_peer_blacklist".into()),
1797            ),
1798            (
1799                PickleValue::String("interface".into()),
1800                PickleValue::String(interface.to_string()),
1801            ),
1802            (
1803                PickleValue::String("ip".into()),
1804                PickleValue::String(ip.to_string()),
1805            ),
1806            (
1807                PickleValue::String("duration_secs".into()),
1808                PickleValue::Int(duration_secs as i64),
1809            ),
1810        ];
1811        if let Some(reason) = reason {
1812            request.push((
1813                PickleValue::String("reason".into()),
1814                PickleValue::String(reason.to_string()),
1815            ));
1816        }
1817        if let Some(level) = penalty_level {
1818            request.push((
1819                PickleValue::String("penalty_level".into()),
1820                PickleValue::Int(level as i64),
1821            ));
1822        }
1823        let response = self.call(&PickleValue::Dict(request))?;
1824        Ok(response.as_bool().unwrap_or(false))
1825    }
1826}
1827
1828fn parse_lifecycle_state(value: &str) -> Option<LifecycleState> {
1829    match value {
1830        "active" => Some(LifecycleState::Active),
1831        "draining" => Some(LifecycleState::Draining),
1832        "stopping" => Some(LifecycleState::Stopping),
1833        "stopped" => Some(LifecycleState::Stopped),
1834        _ => None,
1835    }
1836}
1837
1838fn parse_drain_status(value: &PickleValue) -> io::Result<Option<DrainStatus>> {
1839    if !matches!(value, PickleValue::Dict(_)) {
1840        return Ok(None);
1841    }
1842    let state = value
1843        .get("state")
1844        .and_then(|entry| entry.as_str())
1845        .and_then(parse_lifecycle_state)
1846        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing drain state"))?;
1847    let drain_age_seconds = value
1848        .get("drain_age_seconds")
1849        .and_then(|entry| entry.as_float().or_else(|| entry.as_int().map(|v| v as f64)));
1850    let deadline_remaining_seconds = value.get("deadline_remaining_seconds").and_then(|entry| {
1851        entry
1852            .as_float()
1853            .or_else(|| entry.as_int().map(|v| v as f64))
1854    });
1855    let drain_complete = value
1856        .get("drain_complete")
1857        .and_then(|entry| entry.as_bool())
1858        .unwrap_or(false);
1859    let interface_writer_queued_frames = value
1860        .get("interface_writer_queued_frames")
1861        .and_then(|entry| entry.as_int())
1862        .unwrap_or(0)
1863        .max(0) as usize;
1864    let provider_backlog_events = value
1865        .get("provider_backlog_events")
1866        .and_then(|entry| entry.as_int())
1867        .unwrap_or(0)
1868        .max(0) as usize;
1869    let provider_consumer_queued_events = value
1870        .get("provider_consumer_queued_events")
1871        .and_then(|entry| entry.as_int())
1872        .unwrap_or(0)
1873        .max(0) as usize;
1874    let detail = value
1875        .get("detail")
1876        .and_then(|entry| entry.as_str().map(|v| v.to_string()));
1877    Ok(Some(DrainStatus {
1878        state,
1879        drain_age_seconds,
1880        deadline_remaining_seconds,
1881        drain_complete,
1882        interface_writer_queued_frames,
1883        provider_backlog_events,
1884        provider_consumer_queued_events,
1885        detail,
1886    }))
1887}
1888
1889fn parse_hook_result(response: &PickleValue) -> io::Result<Result<(), String>> {
1890    let ok = response
1891        .get("ok")
1892        .and_then(|v| v.as_bool())
1893        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "invalid hook response"))?;
1894    if ok {
1895        Ok(Ok(()))
1896    } else {
1897        Ok(Err(response
1898            .get("error")
1899            .and_then(|v| v.as_str())
1900            .unwrap_or("unknown hook error")
1901            .to_string()))
1902    }
1903}
1904
1905fn parse_hook_list(response: &PickleValue) -> io::Result<Vec<HookInfo>> {
1906    let list = response
1907        .as_list()
1908        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "invalid hooks response"))?;
1909    let mut hooks = Vec::with_capacity(list.len());
1910    for item in list {
1911        hooks.push(HookInfo {
1912            name: item
1913                .get("name")
1914                .and_then(|v| v.as_str())
1915                .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing hook name"))?
1916                .to_string(),
1917            attach_point: item
1918                .get("attach_point")
1919                .and_then(|v| v.as_str())
1920                .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing attach_point"))?
1921                .to_string(),
1922            priority: item
1923                .get("priority")
1924                .and_then(|v| v.as_int())
1925                .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing priority"))?
1926                as i32,
1927            enabled: item
1928                .get("enabled")
1929                .and_then(|v| v.as_bool())
1930                .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing enabled"))?,
1931            consecutive_traps: item
1932                .get("consecutive_traps")
1933                .and_then(|v| v.as_int())
1934                .ok_or_else(|| {
1935                    io::Error::new(io::ErrorKind::InvalidData, "missing consecutive_traps")
1936                })? as u32,
1937        });
1938    }
1939    Ok(hooks)
1940}
1941
1942/// Client-side authentication: answer the server's challenge.
1943fn client_auth(stream: &mut TcpStream, auth_key: &[u8; 32]) -> io::Result<()> {
1944    // Read challenge
1945    let challenge = recv_bytes(stream)?;
1946
1947    if !challenge.starts_with(CHALLENGE_PREFIX) {
1948        return Err(io::Error::new(
1949            io::ErrorKind::InvalidData,
1950            "expected challenge",
1951        ));
1952    }
1953
1954    let message = &challenge[CHALLENGE_PREFIX.len()..];
1955
1956    // Create HMAC response
1957    let response = create_response(auth_key, message);
1958    send_bytes(stream, &response)?;
1959
1960    // Read welcome/failure
1961    let result = recv_bytes(stream)?;
1962    if result == WELCOME {
1963        Ok(())
1964    } else {
1965        Err(io::Error::new(
1966            io::ErrorKind::PermissionDenied,
1967            "authentication failed",
1968        ))
1969    }
1970}
1971
1972/// Create an HMAC response to a challenge message.
1973fn create_response(auth_key: &[u8; 32], message: &[u8]) -> Vec<u8> {
1974    // Check if message has {sha256} prefix (modern protocol)
1975    if message.starts_with(b"{sha256}") || message.len() > 20 {
1976        // Modern protocol: use HMAC-SHA256 with {sha256} prefix
1977        let digest = hmac_sha256(auth_key, message);
1978        let mut response = Vec::with_capacity(8 + 32);
1979        response.extend_from_slice(b"{sha256}");
1980        response.extend_from_slice(&digest);
1981        response
1982    } else {
1983        // Legacy protocol: raw HMAC-MD5
1984        let digest = hmac_md5(auth_key, message);
1985        digest.to_vec()
1986    }
1987}
1988
1989/// Derive the RPC auth key from transport identity private key.
1990pub fn derive_auth_key(private_key: &[u8]) -> [u8; 32] {
1991    sha256(private_key)
1992}
1993
1994#[cfg(test)]
1995mod tests {
1996    use super::*;
1997
1998    #[test]
1999    fn send_recv_bytes_roundtrip() {
2000        let (mut c1, mut c2) = tcp_pair();
2001        let data = b"hello world";
2002        send_bytes(&mut c1, data).unwrap();
2003        let received = recv_bytes(&mut c2).unwrap();
2004        assert_eq!(&received, data);
2005    }
2006
2007    #[test]
2008    fn send_recv_empty() {
2009        let (mut c1, mut c2) = tcp_pair();
2010        send_bytes(&mut c1, b"").unwrap();
2011        let received = recv_bytes(&mut c2).unwrap();
2012        assert!(received.is_empty());
2013    }
2014
2015    #[test]
2016    fn auth_success() {
2017        let key = derive_auth_key(b"test-private-key");
2018        let (mut server, mut client) = tcp_pair();
2019
2020        let key2 = key;
2021        let t = thread::spawn(move || {
2022            client_auth(&mut client, &key2).unwrap();
2023        });
2024
2025        server_auth(&mut server, &key).unwrap();
2026        t.join().unwrap();
2027    }
2028
2029    #[test]
2030    fn auth_failure_wrong_key() {
2031        let server_key = derive_auth_key(b"server-key");
2032        let client_key = derive_auth_key(b"wrong-key");
2033        let (mut server, mut client) = tcp_pair();
2034
2035        let t = thread::spawn(move || {
2036            let result = client_auth(&mut client, &client_key);
2037            assert!(result.is_err());
2038        });
2039
2040        let result = server_auth(&mut server, &server_key);
2041        assert!(result.is_err());
2042        t.join().unwrap();
2043    }
2044
2045    #[test]
2046    fn verify_sha256_response() {
2047        let key = derive_auth_key(b"mykey");
2048        let message = b"{sha256}abcdefghijklmnopqrstuvwxyz0123456789ABCD";
2049        let response = create_response(&key, message);
2050        assert!(response.starts_with(b"{sha256}"));
2051        assert!(verify_response(&key, message, &response));
2052    }
2053
2054    #[test]
2055    fn verify_legacy_md5_response() {
2056        let key = derive_auth_key(b"mykey");
2057        // Legacy message: 20 bytes, no prefix
2058        let message = b"01234567890123456789";
2059        // Create legacy response (raw HMAC-MD5)
2060        let digest = hmac_md5(&key, message);
2061        assert!(verify_response(&key, message, &digest));
2062    }
2063
2064    #[test]
2065    fn constant_time_eq_works() {
2066        assert!(constant_time_eq(b"hello", b"hello"));
2067        assert!(!constant_time_eq(b"hello", b"world"));
2068        assert!(!constant_time_eq(b"hello", b"hell"));
2069    }
2070
2071    #[test]
2072    fn rpc_roundtrip() {
2073        let key = derive_auth_key(b"test-key");
2074        let (event_tx, event_rx) = crate::event::channel();
2075
2076        // Start server
2077        // Bind manually to get the actual port
2078        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
2079        let port = listener.local_addr().unwrap().port();
2080        listener.set_nonblocking(true).unwrap();
2081
2082        let shutdown = Arc::new(AtomicBool::new(false));
2083        let shutdown2 = shutdown.clone();
2084
2085        // Driver thread that handles queries
2086        let driver_thread = thread::spawn(move || loop {
2087            match event_rx.recv_timeout(std::time::Duration::from_secs(5)) {
2088                Ok(Event::Query(QueryRequest::LinkCount, resp_tx)) => {
2089                    let _ = resp_tx.send(QueryResponse::LinkCount(42));
2090                }
2091                Ok(Event::Query(QueryRequest::InterfaceStats, resp_tx)) => {
2092                    let _ = resp_tx.send(QueryResponse::InterfaceStats(InterfaceStatsResponse {
2093                        interfaces: vec![SingleInterfaceStat {
2094                            id: 7,
2095                            name: "TestInterface".into(),
2096                            status: true,
2097                            mode: 1,
2098                            rxb: 1000,
2099                            txb: 2000,
2100                            rx_packets: 10,
2101                            tx_packets: 20,
2102                            bitrate: Some(10_000_000),
2103                            ifac_size: None,
2104                            started: 1000.0,
2105                            ia_freq: 0.0,
2106                            oa_freq: 0.0,
2107                            interface_type: "TestInterface".into(),
2108                        }],
2109                        transport_id: None,
2110                        transport_enabled: true,
2111                        transport_uptime: 3600.0,
2112                        total_rxb: 1000,
2113                        total_txb: 2000,
2114                        probe_responder: None,
2115                    }));
2116                }
2117                _ => break,
2118            }
2119        });
2120
2121        let key2 = key;
2122        let shutdown3 = shutdown2.clone();
2123        let server_thread = thread::spawn(move || {
2124            rpc_server_loop(listener, key2, event_tx, shutdown3);
2125        });
2126
2127        // Give server time to start
2128        thread::sleep(std::time::Duration::from_millis(50));
2129
2130        // Client: connect and query link count
2131        let server_addr = RpcAddr::Tcp("127.0.0.1".into(), port);
2132        let mut client = RpcClient::connect(&server_addr, &key).unwrap();
2133        let response = client
2134            .call(&PickleValue::Dict(vec![(
2135                PickleValue::String("get".into()),
2136                PickleValue::String("link_count".into()),
2137            )]))
2138            .unwrap();
2139        assert_eq!(response.as_int().unwrap(), 42);
2140        drop(client);
2141
2142        // Client: query interface stats
2143        let mut client2 = RpcClient::connect(&server_addr, &key).unwrap();
2144        let response2 = client2
2145            .call(&PickleValue::Dict(vec![(
2146                PickleValue::String("get".into()),
2147                PickleValue::String("interface_stats".into()),
2148            )]))
2149            .unwrap();
2150        let ifaces = response2.get("interfaces").unwrap().as_list().unwrap();
2151        assert_eq!(ifaces.len(), 1);
2152        let iface = &ifaces[0];
2153        assert_eq!(
2154            iface.get("name").unwrap().as_str().unwrap(),
2155            "TestInterface"
2156        );
2157        assert_eq!(iface.get("rxb").unwrap().as_int().unwrap(), 1000);
2158        drop(client2);
2159
2160        // Shutdown
2161        shutdown2.store(true, Ordering::Relaxed);
2162        server_thread.join().unwrap();
2163        driver_thread.join().unwrap();
2164    }
2165
2166    #[test]
2167    fn derive_auth_key_deterministic() {
2168        let key1 = derive_auth_key(b"test");
2169        let key2 = derive_auth_key(b"test");
2170        assert_eq!(key1, key2);
2171        // Different input → different key
2172        let key3 = derive_auth_key(b"other");
2173        assert_ne!(key1, key3);
2174    }
2175
2176    #[test]
2177    fn pickle_request_handling() {
2178        // Test the request → query translation without networking
2179        let (event_tx, event_rx) = crate::event::channel();
2180
2181        let driver = thread::spawn(move || {
2182            if let Ok(Event::Query(QueryRequest::DropPath { dest_hash }, resp_tx)) = event_rx.recv()
2183            {
2184                assert_eq!(dest_hash, [1u8; 16]);
2185                let _ = resp_tx.send(QueryResponse::DropPath(true));
2186            }
2187        });
2188
2189        let request = PickleValue::Dict(vec![
2190            (
2191                PickleValue::String("drop".into()),
2192                PickleValue::String("path".into()),
2193            ),
2194            (
2195                PickleValue::String("destination_hash".into()),
2196                PickleValue::Bytes(vec![1u8; 16]),
2197            ),
2198        ]);
2199
2200        let response = handle_rpc_request(&request, &event_tx).unwrap();
2201        assert_eq!(response, PickleValue::Bool(true));
2202        driver.join().unwrap();
2203    }
2204
2205    #[test]
2206    fn hook_list_request_handling() {
2207        let (event_tx, event_rx) = crate::event::channel();
2208
2209        let driver = thread::spawn(move || {
2210            if let Ok(Event::ListHooks { response_tx }) = event_rx.recv() {
2211                let _ = response_tx.send(vec![HookInfo {
2212                    name: "stats".into(),
2213                    attach_point: "PreIngress".into(),
2214                    priority: 7,
2215                    enabled: true,
2216                    consecutive_traps: 0,
2217                }]);
2218            }
2219        });
2220
2221        let request = PickleValue::Dict(vec![(
2222            PickleValue::String("get".into()),
2223            PickleValue::String("hooks".into()),
2224        )]);
2225        let response = handle_rpc_request(&request, &event_tx).unwrap();
2226        let hooks = parse_hook_list(&response).unwrap();
2227        assert_eq!(hooks.len(), 1);
2228        assert_eq!(hooks[0].name, "stats");
2229        driver.join().unwrap();
2230    }
2231
2232    #[test]
2233    fn hook_load_request_handling() {
2234        let (event_tx, event_rx) = crate::event::channel();
2235
2236        let driver = thread::spawn(move || {
2237            if let Ok(Event::LoadHook {
2238                name,
2239                wasm_bytes,
2240                attach_point,
2241                priority,
2242                response_tx,
2243            }) = event_rx.recv()
2244            {
2245                assert_eq!(name, "stats");
2246                assert_eq!(attach_point, "PreIngress");
2247                assert_eq!(priority, 11);
2248                assert_eq!(wasm_bytes, vec![1, 2, 3]);
2249                let _ = response_tx.send(Ok(()));
2250            }
2251        });
2252
2253        let request = PickleValue::Dict(vec![
2254            (
2255                PickleValue::String("hook".into()),
2256                PickleValue::String("load".into()),
2257            ),
2258            (
2259                PickleValue::String("name".into()),
2260                PickleValue::String("stats".into()),
2261            ),
2262            (
2263                PickleValue::String("attach_point".into()),
2264                PickleValue::String("PreIngress".into()),
2265            ),
2266            (PickleValue::String("priority".into()), PickleValue::Int(11)),
2267            (
2268                PickleValue::String("wasm".into()),
2269                PickleValue::Bytes(vec![1, 2, 3]),
2270            ),
2271        ]);
2272        let response = handle_rpc_request(&request, &event_tx).unwrap();
2273        assert_eq!(parse_hook_result(&response).unwrap(), Ok(()));
2274        driver.join().unwrap();
2275    }
2276
2277    #[test]
2278    fn interface_stats_pickle_format() {
2279        let stats = InterfaceStatsResponse {
2280            interfaces: vec![SingleInterfaceStat {
2281                id: 1,
2282                name: "TCP".into(),
2283                status: true,
2284                mode: 1,
2285                rxb: 100,
2286                txb: 200,
2287                rx_packets: 5,
2288                tx_packets: 10,
2289                bitrate: Some(1000000),
2290                ifac_size: Some(16),
2291                started: 1000.0,
2292                ia_freq: 0.0,
2293                oa_freq: 0.0,
2294                interface_type: "TCPClientInterface".into(),
2295            }],
2296            transport_id: Some([0xAB; 16]),
2297            transport_enabled: true,
2298            transport_uptime: 3600.0,
2299            total_rxb: 100,
2300            total_txb: 200,
2301            probe_responder: None,
2302        };
2303
2304        let pickle = interface_stats_to_pickle(&stats);
2305
2306        // Verify it round-trips through encode/decode
2307        let encoded = pickle::encode(&pickle);
2308        let decoded = pickle::decode(&encoded).unwrap();
2309        assert_eq!(
2310            decoded.get("transport_enabled").unwrap().as_bool().unwrap(),
2311            true
2312        );
2313        let ifaces = decoded.get("interfaces").unwrap().as_list().unwrap();
2314        assert_eq!(ifaces[0].get("id").unwrap().as_int().unwrap(), 1);
2315        assert_eq!(ifaces[0].get("name").unwrap().as_str().unwrap(), "TCP");
2316    }
2317
2318    #[test]
2319    fn send_probe_rpc_unknown_dest() {
2320        let (event_tx, event_rx) = crate::event::channel();
2321
2322        let driver = thread::spawn(move || {
2323            if let Ok(Event::Query(
2324                QueryRequest::SendProbe {
2325                    dest_hash,
2326                    payload_size,
2327                },
2328                resp_tx,
2329            )) = event_rx.recv()
2330            {
2331                assert_eq!(dest_hash, [0xAA; 16]);
2332                assert_eq!(payload_size, 16); // default
2333                let _ = resp_tx.send(QueryResponse::SendProbe(None));
2334            }
2335        });
2336
2337        let request = PickleValue::Dict(vec![(
2338            PickleValue::String("send_probe".into()),
2339            PickleValue::Bytes(vec![0xAA; 16]),
2340        )]);
2341
2342        let response = handle_rpc_request(&request, &event_tx).unwrap();
2343        assert_eq!(response, PickleValue::None);
2344        driver.join().unwrap();
2345    }
2346
2347    #[test]
2348    fn send_probe_rpc_with_result() {
2349        let (event_tx, event_rx) = crate::event::channel();
2350
2351        let packet_hash = [0xBB; 32];
2352        let driver = thread::spawn(move || {
2353            if let Ok(Event::Query(
2354                QueryRequest::SendProbe {
2355                    dest_hash,
2356                    payload_size,
2357                },
2358                resp_tx,
2359            )) = event_rx.recv()
2360            {
2361                assert_eq!(dest_hash, [0xCC; 16]);
2362                assert_eq!(payload_size, 32);
2363                let _ = resp_tx.send(QueryResponse::SendProbe(Some((packet_hash, 3))));
2364            }
2365        });
2366
2367        let request = PickleValue::Dict(vec![
2368            (
2369                PickleValue::String("send_probe".into()),
2370                PickleValue::Bytes(vec![0xCC; 16]),
2371            ),
2372            (PickleValue::String("size".into()), PickleValue::Int(32)),
2373        ]);
2374
2375        let response = handle_rpc_request(&request, &event_tx).unwrap();
2376        let ph = response.get("packet_hash").unwrap().as_bytes().unwrap();
2377        assert_eq!(ph, &[0xBB; 32]);
2378        assert_eq!(response.get("hops").unwrap().as_int().unwrap(), 3);
2379        driver.join().unwrap();
2380    }
2381
2382    #[test]
2383    fn send_probe_rpc_size_validation() {
2384        let (event_tx, event_rx) = crate::event::channel();
2385
2386        // Negative size should be clamped to default (16)
2387        let driver = thread::spawn(move || {
2388            if let Ok(Event::Query(QueryRequest::SendProbe { payload_size, .. }, resp_tx)) =
2389                event_rx.recv()
2390            {
2391                assert_eq!(payload_size, 16); // default, not negative
2392                let _ = resp_tx.send(QueryResponse::SendProbe(None));
2393            }
2394        });
2395
2396        let request = PickleValue::Dict(vec![
2397            (
2398                PickleValue::String("send_probe".into()),
2399                PickleValue::Bytes(vec![0xDD; 16]),
2400            ),
2401            (PickleValue::String("size".into()), PickleValue::Int(-1)),
2402        ]);
2403
2404        let response = handle_rpc_request(&request, &event_tx).unwrap();
2405        assert_eq!(response, PickleValue::None);
2406        driver.join().unwrap();
2407    }
2408
2409    #[test]
2410    fn send_probe_rpc_size_too_large() {
2411        let (event_tx, event_rx) = crate::event::channel();
2412
2413        // Size > 400 should be clamped to default (16)
2414        let driver = thread::spawn(move || {
2415            if let Ok(Event::Query(QueryRequest::SendProbe { payload_size, .. }, resp_tx)) =
2416                event_rx.recv()
2417            {
2418                assert_eq!(payload_size, 16); // default, not 999
2419                let _ = resp_tx.send(QueryResponse::SendProbe(None));
2420            }
2421        });
2422
2423        let request = PickleValue::Dict(vec![
2424            (
2425                PickleValue::String("send_probe".into()),
2426                PickleValue::Bytes(vec![0xDD; 16]),
2427            ),
2428            (PickleValue::String("size".into()), PickleValue::Int(999)),
2429        ]);
2430
2431        let response = handle_rpc_request(&request, &event_tx).unwrap();
2432        assert_eq!(response, PickleValue::None);
2433        driver.join().unwrap();
2434    }
2435
2436    #[test]
2437    fn check_proof_rpc_not_found() {
2438        let (event_tx, event_rx) = crate::event::channel();
2439
2440        let driver = thread::spawn(move || {
2441            if let Ok(Event::Query(QueryRequest::CheckProof { packet_hash }, resp_tx)) =
2442                event_rx.recv()
2443            {
2444                assert_eq!(packet_hash, [0xEE; 32]);
2445                let _ = resp_tx.send(QueryResponse::CheckProof(None));
2446            }
2447        });
2448
2449        let request = PickleValue::Dict(vec![(
2450            PickleValue::String("check_proof".into()),
2451            PickleValue::Bytes(vec![0xEE; 32]),
2452        )]);
2453
2454        let response = handle_rpc_request(&request, &event_tx).unwrap();
2455        assert_eq!(response, PickleValue::None);
2456        driver.join().unwrap();
2457    }
2458
2459    #[test]
2460    fn check_proof_rpc_found() {
2461        let (event_tx, event_rx) = crate::event::channel();
2462
2463        let driver = thread::spawn(move || {
2464            if let Ok(Event::Query(QueryRequest::CheckProof { packet_hash }, resp_tx)) =
2465                event_rx.recv()
2466            {
2467                assert_eq!(packet_hash, [0xFF; 32]);
2468                let _ = resp_tx.send(QueryResponse::CheckProof(Some(0.352)));
2469            }
2470        });
2471
2472        let request = PickleValue::Dict(vec![(
2473            PickleValue::String("check_proof".into()),
2474            PickleValue::Bytes(vec![0xFF; 32]),
2475        )]);
2476
2477        let response = handle_rpc_request(&request, &event_tx).unwrap();
2478        if let PickleValue::Float(rtt) = response {
2479            assert!((rtt - 0.352).abs() < 0.001);
2480        } else {
2481            panic!("Expected Float, got {:?}", response);
2482        }
2483        driver.join().unwrap();
2484    }
2485
2486    #[test]
2487    fn request_path_rpc() {
2488        let (event_tx, event_rx) = crate::event::channel();
2489
2490        let driver =
2491            thread::spawn(
2492                move || match event_rx.recv_timeout(std::time::Duration::from_secs(5)) {
2493                    Ok(Event::RequestPath { dest_hash }) => {
2494                        assert_eq!(dest_hash, [0x11; 16]);
2495                    }
2496                    other => panic!("Expected RequestPath event, got {:?}", other),
2497                },
2498            );
2499
2500        let request = PickleValue::Dict(vec![(
2501            PickleValue::String("request_path".into()),
2502            PickleValue::Bytes(vec![0x11; 16]),
2503        )]);
2504
2505        let response = handle_rpc_request(&request, &event_tx).unwrap();
2506        assert_eq!(response, PickleValue::Bool(true));
2507        driver.join().unwrap();
2508    }
2509
2510    #[test]
2511    fn begin_drain_rpc_emits_event() {
2512        let (event_tx, event_rx) = crate::event::channel();
2513
2514        let driver = thread::spawn(move || match event_rx.recv_timeout(Duration::from_secs(5)) {
2515            Ok(Event::BeginDrain { timeout }) => {
2516                assert!((timeout.as_secs_f64() - 1.5).abs() < 0.001);
2517            }
2518            other => panic!("Expected BeginDrain event, got {:?}", other),
2519        });
2520
2521        let request = PickleValue::Dict(vec![(
2522            PickleValue::String("begin_drain".into()),
2523            PickleValue::Float(1.5),
2524        )]);
2525
2526        let response = handle_rpc_request(&request, &event_tx).unwrap();
2527        assert_eq!(response, PickleValue::Bool(true));
2528        driver.join().unwrap();
2529    }
2530
2531    #[test]
2532    fn drain_status_rpc_roundtrips_fields() {
2533        let (event_tx, event_rx) = crate::event::channel();
2534
2535        let driver = thread::spawn(move || {
2536            if let Ok(Event::Query(QueryRequest::DrainStatus, resp_tx)) = event_rx.recv() {
2537                let _ = resp_tx.send(QueryResponse::DrainStatus(DrainStatus {
2538                    state: LifecycleState::Draining,
2539                    drain_age_seconds: Some(0.75),
2540                    deadline_remaining_seconds: Some(2.25),
2541                    drain_complete: false,
2542                    interface_writer_queued_frames: 3,
2543                    provider_backlog_events: 4,
2544                    provider_consumer_queued_events: 5,
2545                    detail: Some("node is draining existing work".into()),
2546                }));
2547            }
2548        });
2549
2550        let request = PickleValue::Dict(vec![(
2551            PickleValue::String("get".into()),
2552            PickleValue::String("drain_status".into()),
2553        )]);
2554
2555        let response = handle_rpc_request(&request, &event_tx).unwrap();
2556        assert_eq!(response.get("state").unwrap().as_str(), Some("draining"));
2557        assert_eq!(
2558            response.get("drain_complete").unwrap().as_bool(),
2559            Some(false)
2560        );
2561        assert_eq!(
2562            response
2563                .get("deadline_remaining_seconds")
2564                .unwrap()
2565                .as_float(),
2566            Some(2.25)
2567        );
2568        assert_eq!(
2569            response
2570                .get("interface_writer_queued_frames")
2571                .unwrap()
2572                .as_int(),
2573            Some(3)
2574        );
2575        assert_eq!(
2576            response
2577                .get("provider_backlog_events")
2578                .unwrap()
2579                .as_int(),
2580            Some(4)
2581        );
2582        assert_eq!(
2583            response
2584                .get("provider_consumer_queued_events")
2585                .unwrap()
2586                .as_int(),
2587            Some(5)
2588        );
2589        assert_eq!(
2590            response.get("detail").unwrap().as_str(),
2591            Some("node is draining existing work")
2592        );
2593        driver.join().unwrap();
2594    }
2595
2596    #[test]
2597    fn interface_stats_with_probe_responder() {
2598        let probe_hash = [0x42; 16];
2599        let stats = InterfaceStatsResponse {
2600            interfaces: vec![],
2601            transport_id: None,
2602            transport_enabled: true,
2603            transport_uptime: 100.0,
2604            total_rxb: 0,
2605            total_txb: 0,
2606            probe_responder: Some(probe_hash),
2607        };
2608
2609        let pickle = interface_stats_to_pickle(&stats);
2610        let encoded = pickle::encode(&pickle);
2611        let decoded = pickle::decode(&encoded).unwrap();
2612
2613        let pr = decoded.get("probe_responder").unwrap().as_bytes().unwrap();
2614        assert_eq!(pr, &probe_hash);
2615    }
2616
2617    #[test]
2618    fn runtime_config_get_and_set_rpc() {
2619        let (event_tx, event_rx) = crate::event::channel();
2620
2621        let driver = thread::spawn(move || {
2622            if let Ok(Event::Query(QueryRequest::GetRuntimeConfig { key }, resp_tx)) =
2623                event_rx.recv()
2624            {
2625                assert_eq!(key, "global.tick_interval_ms");
2626                let _ = resp_tx.send(QueryResponse::RuntimeConfigEntry(Some(
2627                    RuntimeConfigEntry {
2628                        key,
2629                        value: RuntimeConfigValue::Int(1000),
2630                        default: RuntimeConfigValue::Int(1000),
2631                        source: RuntimeConfigSource::Startup,
2632                        apply_mode: RuntimeConfigApplyMode::Immediate,
2633                        description: Some("tick".into()),
2634                    },
2635                )));
2636            } else {
2637                panic!("expected GetRuntimeConfig query");
2638            }
2639
2640            if let Ok(Event::Query(QueryRequest::SetRuntimeConfig { key, value }, resp_tx)) =
2641                event_rx.recv()
2642            {
2643                assert_eq!(key, "global.tick_interval_ms");
2644                assert_eq!(value, RuntimeConfigValue::Int(250));
2645                let _ = resp_tx.send(QueryResponse::RuntimeConfigSet(Ok(RuntimeConfigEntry {
2646                    key,
2647                    value: RuntimeConfigValue::Int(250),
2648                    default: RuntimeConfigValue::Int(1000),
2649                    source: RuntimeConfigSource::RuntimeOverride,
2650                    apply_mode: RuntimeConfigApplyMode::Immediate,
2651                    description: Some("tick".into()),
2652                })));
2653            } else {
2654                panic!("expected SetRuntimeConfig query");
2655            }
2656        });
2657
2658        let get_request = PickleValue::Dict(vec![
2659            (
2660                PickleValue::String("get".into()),
2661                PickleValue::String("runtime_config_entry".into()),
2662            ),
2663            (
2664                PickleValue::String("key".into()),
2665                PickleValue::String("global.tick_interval_ms".into()),
2666            ),
2667        ]);
2668        let get_response = handle_rpc_request(&get_request, &event_tx).unwrap();
2669        assert_eq!(
2670            get_response.get("key").and_then(|v| v.as_str()),
2671            Some("global.tick_interval_ms")
2672        );
2673
2674        let set_request = PickleValue::Dict(vec![
2675            (
2676                PickleValue::String("set".into()),
2677                PickleValue::String("runtime_config".into()),
2678            ),
2679            (
2680                PickleValue::String("key".into()),
2681                PickleValue::String("global.tick_interval_ms".into()),
2682            ),
2683            (PickleValue::String("value".into()), PickleValue::Int(250)),
2684        ]);
2685        let set_response = handle_rpc_request(&set_request, &event_tx).unwrap();
2686        assert_eq!(
2687            set_response.get("value").and_then(|v| v.as_int()),
2688            Some(250)
2689        );
2690
2691        driver.join().unwrap();
2692    }
2693
2694    // Helper: create a connected TCP pair
2695    fn tcp_pair() -> (TcpStream, TcpStream) {
2696        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
2697        let port = listener.local_addr().unwrap().port();
2698        let client = TcpStream::connect(("127.0.0.1", port)).unwrap();
2699        let (server, _) = listener.accept().unwrap();
2700        client
2701            .set_read_timeout(Some(std::time::Duration::from_secs(5)))
2702            .unwrap();
2703        server
2704            .set_read_timeout(Some(std::time::Duration::from_secs(5)))
2705            .unwrap();
2706        (server, client)
2707    }
2708}