Skip to main content

rns_net/
rpc.rs

1//! RPC server and client for cross-process daemon communication.
2//!
3//! Implements Python `multiprocessing.connection` wire protocol:
4//! - 4-byte big-endian signed i32 length prefix + payload
5//! - HMAC-SHA256 challenge-response authentication
6//! - msgpack serialization for request/response dictionaries
7//!
8//! Server translates RPC dicts into [`QueryRequest`] events, sends
9//! them through the driver event channel, and returns RPC responses.
10
11use std::io::{self, Read, Write};
12use std::net::{IpAddr, TcpListener, TcpStream};
13use std::sync::mpsc;
14use std::sync::{
15    atomic::{AtomicBool, Ordering},
16    Arc,
17};
18use std::thread;
19use std::time::Duration;
20
21use rns_core::msgpack::{self, Value as MsgpackValue};
22use rns_crypto::hmac::hmac_sha256;
23use rns_crypto::sha256::sha256;
24
25use crate::event::{
26    BackboneInterfaceEntry, BackbonePeerStateEntry, BlackholeInfo, DrainStatus, Event, EventSender,
27    HookInfo, InterfaceStatsResponse, KnownDestinationEntry, LifecycleState, PathTableEntry,
28    ProviderBridgeStats, QueryRequest, QueryResponse, RateTableEntry, RuntimeConfigApplyMode,
29    RuntimeConfigEntry, RuntimeConfigError, RuntimeConfigErrorCode, RuntimeConfigSource,
30    RuntimeConfigValue, SingleInterfaceStat,
31};
32use crate::md5::hmac_md5;
33use crate::pickle::{self, PickleValue};
34
35const CHALLENGE_PREFIX: &[u8] = b"#CHALLENGE#";
36const WELCOME: &[u8] = b"#WELCOME#";
37const FAILURE: &[u8] = b"#FAILURE#";
38const CHALLENGE_LEN: usize = 40;
39
40#[derive(Debug, Clone, Copy, PartialEq, Eq)]
41enum RpcCodec {
42    Msgpack,
43    Pickle,
44}
45
46/// RPC address types.
47#[derive(Debug, Clone)]
48pub enum RpcAddr {
49    Tcp(String, u16),
50}
51
52/// RPC server that listens for incoming connections and handles queries.
53pub struct RpcServer {
54    shutdown: Arc<AtomicBool>,
55    thread: Option<thread::JoinHandle<()>>,
56}
57
58impl RpcServer {
59    /// Start the RPC server on the given address.
60    pub fn start(addr: &RpcAddr, auth_key: [u8; 32], event_tx: EventSender) -> io::Result<Self> {
61        let shutdown = Arc::new(AtomicBool::new(false));
62        let shutdown2 = shutdown.clone();
63
64        let listener = match addr {
65            RpcAddr::Tcp(host, port) => {
66                let l = TcpListener::bind((host.as_str(), *port))?;
67                // Non-blocking so we can check shutdown flag
68                l.set_nonblocking(true)?;
69                l
70            }
71        };
72
73        let thread = thread::Builder::new()
74            .name("rpc-server".into())
75            .spawn(move || {
76                rpc_server_loop(listener, auth_key, event_tx, shutdown2);
77            })
78            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
79
80        Ok(RpcServer {
81            shutdown,
82            thread: Some(thread),
83        })
84    }
85
86    /// Stop the RPC server.
87    pub fn stop(&mut self) {
88        self.shutdown.store(true, Ordering::Relaxed);
89        if let Some(handle) = self.thread.take() {
90            let _ = handle.join();
91        }
92    }
93}
94
95impl Drop for RpcServer {
96    fn drop(&mut self) {
97        self.stop();
98    }
99}
100
101fn rpc_server_loop(
102    listener: TcpListener,
103    auth_key: [u8; 32],
104    event_tx: EventSender,
105    shutdown: Arc<AtomicBool>,
106) {
107    loop {
108        if shutdown.load(Ordering::Relaxed) {
109            break;
110        }
111
112        match listener.accept() {
113            Ok((stream, _addr)) => {
114                // Set blocking for this connection
115                let _ = stream.set_nonblocking(false);
116                let _ = stream.set_read_timeout(Some(std::time::Duration::from_secs(10)));
117                let _ = stream.set_write_timeout(Some(std::time::Duration::from_secs(10)));
118
119                if let Err(e) = handle_connection(stream, &auth_key, &event_tx) {
120                    log::debug!("RPC connection error: {}", e);
121                }
122            }
123            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
124                // No pending connection, sleep briefly and retry
125                thread::sleep(std::time::Duration::from_millis(100));
126            }
127            Err(e) => {
128                log::error!("RPC accept error: {}", e);
129                thread::sleep(std::time::Duration::from_millis(100));
130            }
131        }
132    }
133}
134
135fn handle_connection(
136    mut stream: TcpStream,
137    auth_key: &[u8; 32],
138    event_tx: &EventSender,
139) -> io::Result<()> {
140    // Authentication: send challenge, verify response
141    server_auth(&mut stream, auth_key)?;
142
143    // Read request.
144    let request_bytes = recv_bytes(&mut stream)?;
145    let (request, codec) = decode_rpc_payload(&request_bytes)?;
146
147    // Translate request dict to query, send to driver, get response.
148    let response = handle_rpc_request(&request, event_tx)?;
149
150    // Encode response and send
151    let response_bytes = encode_rpc_payload(&response, codec);
152    send_bytes(&mut stream, &response_bytes)?;
153
154    Ok(())
155}
156
157fn decode_rpc_payload(data: &[u8]) -> io::Result<(PickleValue, RpcCodec)> {
158    if let Ok(value) = msgpack::unpack_exact(data) {
159        return Ok((msgpack_to_pickle(&value), RpcCodec::Msgpack));
160    }
161    pickle::decode(data)
162        .map(|value| (value, RpcCodec::Pickle))
163        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))
164}
165
166fn encode_rpc_payload(value: &PickleValue, codec: RpcCodec) -> Vec<u8> {
167    match codec {
168        RpcCodec::Msgpack => msgpack::pack(&pickle_to_msgpack(value)),
169        RpcCodec::Pickle => pickle::encode(value),
170    }
171}
172
173fn pickle_to_msgpack(value: &PickleValue) -> MsgpackValue {
174    match value {
175        PickleValue::None => MsgpackValue::Nil,
176        PickleValue::Bool(v) => MsgpackValue::Bool(*v),
177        PickleValue::Int(v) if *v >= 0 => MsgpackValue::UInt(*v as u64),
178        PickleValue::Int(v) => MsgpackValue::Int(*v),
179        PickleValue::Float(v) => MsgpackValue::Float(*v),
180        PickleValue::String(v) => MsgpackValue::Str(v.clone()),
181        PickleValue::Bytes(v) => MsgpackValue::Bin(v.clone()),
182        PickleValue::List(values) => {
183            MsgpackValue::Array(values.iter().map(pickle_to_msgpack).collect())
184        }
185        PickleValue::Dict(values) => MsgpackValue::Map(
186            values
187                .iter()
188                .map(|(key, value)| (pickle_to_msgpack(key), pickle_to_msgpack(value)))
189                .collect(),
190        ),
191    }
192}
193
194fn msgpack_to_pickle(value: &MsgpackValue) -> PickleValue {
195    match value {
196        MsgpackValue::Nil => PickleValue::None,
197        MsgpackValue::Bool(v) => PickleValue::Bool(*v),
198        MsgpackValue::UInt(v) if *v <= i64::MAX as u64 => PickleValue::Int(*v as i64),
199        MsgpackValue::UInt(v) => PickleValue::Float(*v as f64),
200        MsgpackValue::Int(v) => PickleValue::Int(*v),
201        MsgpackValue::Float(v) => PickleValue::Float(*v),
202        MsgpackValue::Bin(v) => PickleValue::Bytes(v.clone()),
203        MsgpackValue::Str(v) => PickleValue::String(v.clone()),
204        MsgpackValue::Array(values) => {
205            PickleValue::List(values.iter().map(msgpack_to_pickle).collect())
206        }
207        MsgpackValue::Map(values) => PickleValue::Dict(
208            values
209                .iter()
210                .map(|(key, value)| (msgpack_to_pickle(key), msgpack_to_pickle(value)))
211                .collect(),
212        ),
213    }
214}
215
216/// Server-side authentication: challenge-response.
217fn server_auth(stream: &mut TcpStream, auth_key: &[u8; 32]) -> io::Result<()> {
218    // Generate challenge: #CHALLENGE#{sha256}<40 random bytes>
219    let mut random_bytes = [0u8; CHALLENGE_LEN];
220    // Use /dev/urandom for randomness
221    {
222        let mut f = std::fs::File::open("/dev/urandom")?;
223        f.read_exact(&mut random_bytes)?;
224    }
225
226    let mut challenge_message = Vec::with_capacity(CHALLENGE_PREFIX.len() + 8 + CHALLENGE_LEN);
227    challenge_message.extend_from_slice(CHALLENGE_PREFIX);
228    challenge_message.extend_from_slice(b"{sha256}");
229    challenge_message.extend_from_slice(&random_bytes);
230
231    send_bytes(stream, &challenge_message)?;
232
233    // Read response (max 256 bytes)
234    let response = recv_bytes(stream)?;
235
236    // Verify response
237    // The message to HMAC is everything after #CHALLENGE# (i.e. {sha256}<random>)
238    let message = &challenge_message[CHALLENGE_PREFIX.len()..];
239
240    if verify_response(auth_key, message, &response) {
241        send_bytes(stream, WELCOME)?;
242        Ok(())
243    } else {
244        send_bytes(stream, FAILURE)?;
245        Err(io::Error::new(
246            io::ErrorKind::PermissionDenied,
247            "auth failed",
248        ))
249    }
250}
251
252/// Verify a client's HMAC response.
253fn verify_response(auth_key: &[u8; 32], message: &[u8], response: &[u8]) -> bool {
254    // Modern protocol: response = {sha256}<hmac-sha256 digest>
255    if response.starts_with(b"{sha256}") {
256        let digest = &response[8..];
257        let expected = hmac_sha256(auth_key, message);
258        constant_time_eq(digest, &expected)
259    }
260    // Legacy protocol: response = raw 16-byte HMAC-MD5 digest
261    else if response.len() == 16 {
262        let expected = hmac_md5(auth_key, message);
263        constant_time_eq(response, &expected)
264    }
265    // Legacy with {md5} prefix
266    else if response.starts_with(b"{md5}") {
267        let digest = &response[5..];
268        let expected = hmac_md5(auth_key, message);
269        constant_time_eq(digest, &expected)
270    } else {
271        false
272    }
273}
274
275/// Constant-time byte comparison.
276fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
277    if a.len() != b.len() {
278        return false;
279    }
280    let mut diff = 0u8;
281    for (x, y) in a.iter().zip(b.iter()) {
282        diff |= x ^ y;
283    }
284    diff == 0
285}
286
287/// Send bytes with 4-byte big-endian length prefix.
288fn send_bytes(stream: &mut TcpStream, data: &[u8]) -> io::Result<()> {
289    let len = data.len() as i32;
290    stream.write_all(&len.to_be_bytes())?;
291    stream.write_all(data)?;
292    stream.flush()
293}
294
295/// Receive bytes with 4-byte big-endian length prefix.
296fn recv_bytes(stream: &mut TcpStream) -> io::Result<Vec<u8>> {
297    let mut len_buf = [0u8; 4];
298    stream.read_exact(&mut len_buf)?;
299    let len = i32::from_be_bytes(len_buf);
300
301    if len < 0 {
302        // Extended format: 8-byte length
303        let mut len8_buf = [0u8; 8];
304        stream.read_exact(&mut len8_buf)?;
305        let len = u64::from_be_bytes(len8_buf) as usize;
306        if len > 64 * 1024 * 1024 {
307            return Err(io::Error::new(
308                io::ErrorKind::InvalidData,
309                "message too large",
310            ));
311        }
312        let mut buf = vec![0u8; len];
313        stream.read_exact(&mut buf)?;
314        Ok(buf)
315    } else {
316        let len = len as usize;
317        if len > 64 * 1024 * 1024 {
318            return Err(io::Error::new(
319                io::ErrorKind::InvalidData,
320                "message too large",
321            ));
322        }
323        let mut buf = vec![0u8; len];
324        stream.read_exact(&mut buf)?;
325        Ok(buf)
326    }
327}
328
329/// Translate an RPC request dict to a query event and get response.
330fn handle_rpc_request(request: &PickleValue, event_tx: &EventSender) -> io::Result<PickleValue> {
331    // Handle "get" requests
332    if let Some(get_val) = request.get("get") {
333        if let Some(path) = get_val.as_str() {
334            return match path {
335                "interface_stats" => {
336                    let resp = send_query(event_tx, QueryRequest::InterfaceStats)?;
337                    if let QueryResponse::InterfaceStats(stats) = resp {
338                        Ok(interface_stats_to_pickle(&stats))
339                    } else {
340                        Ok(PickleValue::None)
341                    }
342                }
343                "path_table" => {
344                    let max_hops = request
345                        .get("max_hops")
346                        .and_then(|v| v.as_int().map(|n| n as u8));
347                    let resp = send_query(event_tx, QueryRequest::PathTable { max_hops })?;
348                    if let QueryResponse::PathTable(entries) = resp {
349                        Ok(path_table_to_pickle(&entries))
350                    } else {
351                        Ok(PickleValue::None)
352                    }
353                }
354                "rate_table" => {
355                    let resp = send_query(event_tx, QueryRequest::RateTable)?;
356                    if let QueryResponse::RateTable(entries) = resp {
357                        Ok(rate_table_to_pickle(&entries))
358                    } else {
359                        Ok(PickleValue::None)
360                    }
361                }
362                "next_hop" => {
363                    let hash = extract_dest_hash(request, "destination_hash")?;
364                    let resp = send_query(event_tx, QueryRequest::NextHop { dest_hash: hash })?;
365                    if let QueryResponse::NextHop(Some(nh)) = resp {
366                        Ok(PickleValue::Bytes(nh.next_hop.to_vec()))
367                    } else {
368                        Ok(PickleValue::None)
369                    }
370                }
371                "next_hop_if_name" => {
372                    let hash = extract_dest_hash(request, "destination_hash")?;
373                    let resp =
374                        send_query(event_tx, QueryRequest::NextHopIfName { dest_hash: hash })?;
375                    if let QueryResponse::NextHopIfName(Some(name)) = resp {
376                        Ok(PickleValue::String(name))
377                    } else {
378                        Ok(PickleValue::None)
379                    }
380                }
381                "link_count" => {
382                    let resp = send_query(event_tx, QueryRequest::LinkCount)?;
383                    if let QueryResponse::LinkCount(n) = resp {
384                        Ok(PickleValue::Int(n as i64))
385                    } else {
386                        Ok(PickleValue::None)
387                    }
388                }
389                "transport_identity" => {
390                    let resp = send_query(event_tx, QueryRequest::TransportIdentity)?;
391                    if let QueryResponse::TransportIdentity(Some(hash)) = resp {
392                        Ok(PickleValue::Bytes(hash.to_vec()))
393                    } else {
394                        Ok(PickleValue::None)
395                    }
396                }
397                "blackholed" => {
398                    let resp = send_query(event_tx, QueryRequest::GetBlackholed)?;
399                    if let QueryResponse::Blackholed(entries) = resp {
400                        Ok(blackholed_to_pickle(&entries))
401                    } else {
402                        Ok(PickleValue::None)
403                    }
404                }
405                "discovered_interfaces" => {
406                    let only_available = request
407                        .get("only_available")
408                        .and_then(|v| v.as_bool())
409                        .unwrap_or(false);
410                    let only_transport = request
411                        .get("only_transport")
412                        .and_then(|v| v.as_bool())
413                        .unwrap_or(false);
414                    let resp = send_query(
415                        event_tx,
416                        QueryRequest::DiscoveredInterfaces {
417                            only_available,
418                            only_transport,
419                        },
420                    )?;
421                    if let QueryResponse::DiscoveredInterfaces(interfaces) = resp {
422                        Ok(discovered_interfaces_to_pickle(&interfaces))
423                    } else {
424                        Ok(PickleValue::None)
425                    }
426                }
427                "hooks" => {
428                    let (response_tx, response_rx) = mpsc::channel();
429                    event_tx
430                        .send(Event::ListHooks { response_tx })
431                        .map_err(|_| {
432                            io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down")
433                        })?;
434                    let hooks = response_rx
435                        .recv_timeout(std::time::Duration::from_secs(5))
436                        .map_err(|_| {
437                            io::Error::new(io::ErrorKind::TimedOut, "list hooks timed out")
438                        })?;
439                    Ok(hooks_to_pickle(&hooks))
440                }
441                "runtime_config" => {
442                    let resp = send_query(event_tx, QueryRequest::ListRuntimeConfig)?;
443                    if let QueryResponse::RuntimeConfigList(entries) = resp {
444                        Ok(runtime_config_list_to_pickle(&entries))
445                    } else {
446                        Ok(PickleValue::None)
447                    }
448                }
449                "known_destinations" => {
450                    let resp = send_query(event_tx, QueryRequest::KnownDestinations)?;
451                    if let QueryResponse::KnownDestinations(entries) = resp {
452                        Ok(known_destinations_to_pickle(&entries))
453                    } else {
454                        Ok(PickleValue::None)
455                    }
456                }
457                "runtime_config_entry" => {
458                    let key = request
459                        .get("key")
460                        .and_then(|v| v.as_str())
461                        .unwrap_or_default()
462                        .to_string();
463                    let resp = send_query(event_tx, QueryRequest::GetRuntimeConfig { key })?;
464                    if let QueryResponse::RuntimeConfigEntry(entry) = resp {
465                        Ok(entry
466                            .as_ref()
467                            .map(runtime_config_entry_to_pickle)
468                            .unwrap_or(PickleValue::None))
469                    } else {
470                        Ok(PickleValue::None)
471                    }
472                }
473                "backbone_peer_state" => {
474                    let interface_name = request
475                        .get("interface")
476                        .and_then(|v| v.as_str())
477                        .map(|s| s.to_string());
478                    let resp =
479                        send_query(event_tx, QueryRequest::BackbonePeerState { interface_name })?;
480                    if let QueryResponse::BackbonePeerState(entries) = resp {
481                        Ok(backbone_peer_state_to_pickle(&entries))
482                    } else {
483                        Ok(PickleValue::None)
484                    }
485                }
486                "backbone_interfaces" => {
487                    let resp = send_query(event_tx, QueryRequest::BackboneInterfaces)?;
488                    if let QueryResponse::BackboneInterfaces(entries) = resp {
489                        Ok(backbone_interfaces_to_pickle(&entries))
490                    } else {
491                        Ok(PickleValue::None)
492                    }
493                }
494                "provider_bridge_stats" => {
495                    let resp = send_query(event_tx, QueryRequest::ProviderBridgeStats)?;
496                    if let QueryResponse::ProviderBridgeStats(stats) = resp {
497                        Ok(stats
498                            .as_ref()
499                            .map(provider_bridge_stats_to_pickle)
500                            .unwrap_or(PickleValue::None))
501                    } else {
502                        Ok(PickleValue::None)
503                    }
504                }
505                "drain_status" => {
506                    let resp = send_query(event_tx, QueryRequest::DrainStatus)?;
507                    if let QueryResponse::DrainStatus(status) = resp {
508                        Ok(drain_status_to_pickle(&status))
509                    } else {
510                        Ok(PickleValue::None)
511                    }
512                }
513                _ => Ok(PickleValue::None),
514            };
515        }
516    }
517
518    if let Some(begin_val) = request.get("begin_drain") {
519        let timeout_secs = begin_val
520            .as_float()
521            .or_else(|| begin_val.as_int().map(|value| value as f64))
522            .unwrap_or(0.0)
523            .max(0.0);
524        let timeout = Duration::from_secs_f64(timeout_secs);
525        let _ = event_tx.send(Event::BeginDrain { timeout });
526        return Ok(PickleValue::Bool(true));
527    }
528
529    if let Some(set_val) = request.get("set").and_then(|v| v.as_str()) {
530        if set_val == "known_destination_retained" {
531            let dest_hash = extract_dest_hash(request, "dest_hash")?;
532            let resp = send_query(event_tx, QueryRequest::RetainKnownDestination { dest_hash })?;
533            return if let QueryResponse::RetainKnownDestination(ok) = resp {
534                Ok(PickleValue::Bool(ok))
535            } else {
536                Ok(PickleValue::None)
537            };
538        }
539        if set_val == "identity_retained" {
540            let identity_hash = extract_dest_hash(request, "identity_hash")?;
541            let resp = send_query(event_tx, QueryRequest::RetainIdentity { identity_hash })?;
542            return if let QueryResponse::RetainIdentity(ok) = resp {
543                Ok(PickleValue::Bool(ok))
544            } else {
545                Ok(PickleValue::None)
546            };
547        }
548        if set_val == "known_destination_used" {
549            let dest_hash = extract_dest_hash(request, "dest_hash")?;
550            let resp = send_query(
551                event_tx,
552                QueryRequest::MarkKnownDestinationUsed { dest_hash },
553            )?;
554            return if let QueryResponse::MarkKnownDestinationUsed(ok) = resp {
555                Ok(PickleValue::Bool(ok))
556            } else {
557                Ok(PickleValue::None)
558            };
559        }
560        if set_val == "runtime_config" {
561            let key = request
562                .get("key")
563                .and_then(|v| v.as_str())
564                .unwrap_or_default()
565                .to_string();
566            let Some(value) = request
567                .get("value")
568                .and_then(runtime_config_value_from_pickle)
569            else {
570                return Ok(runtime_config_error_to_pickle(&RuntimeConfigError {
571                    code: RuntimeConfigErrorCode::InvalidType,
572                    message: "runtime-config set requires a scalar value".into(),
573                }));
574            };
575            let resp = send_query(event_tx, QueryRequest::SetRuntimeConfig { key, value })?;
576            return if let QueryResponse::RuntimeConfigSet(result) = resp {
577                Ok(runtime_config_result_to_pickle(result))
578            } else {
579                Ok(PickleValue::None)
580            };
581        }
582    }
583
584    if let Some(reset_val) = request.get("reset").and_then(|v| v.as_str()) {
585        if reset_val == "runtime_config" {
586            let key = request
587                .get("key")
588                .and_then(|v| v.as_str())
589                .unwrap_or_default()
590                .to_string();
591            let resp = send_query(event_tx, QueryRequest::ResetRuntimeConfig { key })?;
592            return if let QueryResponse::RuntimeConfigReset(result) = resp {
593                Ok(runtime_config_result_to_pickle(result))
594            } else {
595                Ok(PickleValue::None)
596            };
597        }
598    }
599
600    if let Some(clear_val) = request.get("clear").and_then(|v| v.as_str()) {
601        if clear_val == "known_destination_retained" {
602            let dest_hash = extract_dest_hash(request, "dest_hash")?;
603            let resp = send_query(
604                event_tx,
605                QueryRequest::UnretainKnownDestination { dest_hash },
606            )?;
607            return if let QueryResponse::UnretainKnownDestination(ok) = resp {
608                Ok(PickleValue::Bool(ok))
609            } else {
610                Ok(PickleValue::None)
611            };
612        }
613        if clear_val == "backbone_peer_state" {
614            let interface_name = required_string(request, "interface")?;
615            let peer_ip = required_string(request, "ip")?;
616            let peer_ip = peer_ip
617                .parse()
618                .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "invalid peer IP"))?;
619            let resp = send_query(
620                event_tx,
621                QueryRequest::ClearBackbonePeerState {
622                    interface_name,
623                    peer_ip,
624                },
625            )?;
626            return if let QueryResponse::ClearBackbonePeerState(ok) = resp {
627                Ok(PickleValue::Bool(ok))
628            } else {
629                Ok(PickleValue::None)
630            };
631        }
632    }
633
634    if let Some(set_val) = request.get("set").and_then(|v| v.as_str()) {
635        if set_val == "backbone_peer_blacklist" {
636            let interface_name = required_string(request, "interface")?;
637            let peer_ip = required_string(request, "ip")?;
638            let peer_ip: IpAddr = peer_ip
639                .parse()
640                .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "invalid peer IP"))?;
641            let duration_secs = request
642                .get("duration_secs")
643                .and_then(|v| v.as_int())
644                .ok_or_else(|| {
645                    io::Error::new(io::ErrorKind::InvalidInput, "missing duration_secs")
646                })?;
647            let reason = request
648                .get("reason")
649                .and_then(|v| v.as_str())
650                .unwrap_or("sentinel blacklist")
651                .to_string();
652            let penalty_level = request
653                .get("penalty_level")
654                .and_then(|v| v.as_int())
655                .unwrap_or(0)
656                .clamp(0, u8::MAX as i64) as u8;
657            let resp = send_query(
658                event_tx,
659                QueryRequest::BlacklistBackbonePeer {
660                    interface_name,
661                    peer_ip,
662                    duration: Duration::from_secs(duration_secs as u64),
663                    reason,
664                    penalty_level,
665                },
666            )?;
667            return if let QueryResponse::BlacklistBackbonePeer(ok) = resp {
668                Ok(PickleValue::Bool(ok))
669            } else {
670                Ok(PickleValue::None)
671            };
672        }
673    }
674
675    // Handle "request_path" -- trigger a path request to the network
676    if let Some(hash_val) = request.get("request_path") {
677        if let Some(hash_bytes) = hash_val.as_bytes() {
678            if hash_bytes.len() >= 16 {
679                let mut dest_hash = [0u8; 16];
680                dest_hash.copy_from_slice(&hash_bytes[..16]);
681                let _ = event_tx.send(crate::event::Event::RequestPath { dest_hash });
682                return Ok(PickleValue::Bool(true));
683            }
684        }
685    }
686
687    // Handle "send_probe" requests
688    if let Some(hash_val) = request.get("send_probe") {
689        if let Some(hash_bytes) = hash_val.as_bytes() {
690            if hash_bytes.len() >= 16 {
691                let mut dest_hash = [0u8; 16];
692                dest_hash.copy_from_slice(&hash_bytes[..16]);
693                let payload_size = request
694                    .get("size")
695                    .and_then(|v| v.as_int())
696                    .and_then(|n| {
697                        if n > 0 && n <= 400 {
698                            Some(n as usize)
699                        } else {
700                            None
701                        }
702                    })
703                    .unwrap_or(16);
704                let resp = send_query(
705                    event_tx,
706                    QueryRequest::SendProbe {
707                        dest_hash,
708                        payload_size,
709                    },
710                )?;
711                if let QueryResponse::SendProbe(Some((packet_hash, hops))) = resp {
712                    return Ok(PickleValue::Dict(vec![
713                        (
714                            PickleValue::String("packet_hash".into()),
715                            PickleValue::Bytes(packet_hash.to_vec()),
716                        ),
717                        (
718                            PickleValue::String("hops".into()),
719                            PickleValue::Int(hops as i64),
720                        ),
721                    ]));
722                } else {
723                    return Ok(PickleValue::None);
724                }
725            }
726        }
727    }
728
729    // Handle "check_proof" requests
730    if let Some(hash_val) = request.get("check_proof") {
731        if let Some(hash_bytes) = hash_val.as_bytes() {
732            if hash_bytes.len() >= 32 {
733                let mut packet_hash = [0u8; 32];
734                packet_hash.copy_from_slice(&hash_bytes[..32]);
735                let resp = send_query(event_tx, QueryRequest::CheckProof { packet_hash })?;
736                if let QueryResponse::CheckProof(Some(rtt)) = resp {
737                    return Ok(PickleValue::Float(rtt));
738                } else {
739                    return Ok(PickleValue::None);
740                }
741            }
742        }
743    }
744
745    // Handle "blackhole" requests
746    if let Some(hash_val) = request.get("blackhole") {
747        if let Some(hash_bytes) = hash_val.as_bytes() {
748            if hash_bytes.len() >= 16 {
749                let mut identity_hash = [0u8; 16];
750                identity_hash.copy_from_slice(&hash_bytes[..16]);
751                let duration_hours = request.get("duration").and_then(|v| v.as_float());
752                let reason = request
753                    .get("reason")
754                    .and_then(|v| v.as_str())
755                    .map(|s| s.to_string());
756                let resp = send_query(
757                    event_tx,
758                    QueryRequest::BlackholeIdentity {
759                        identity_hash,
760                        duration_hours,
761                        reason,
762                    },
763                )?;
764                return Ok(PickleValue::Bool(matches!(
765                    resp,
766                    QueryResponse::BlackholeResult(true)
767                )));
768            }
769        }
770    }
771
772    // Handle "unblackhole" requests
773    if let Some(hash_val) = request.get("unblackhole") {
774        if let Some(hash_bytes) = hash_val.as_bytes() {
775            if hash_bytes.len() >= 16 {
776                let mut identity_hash = [0u8; 16];
777                identity_hash.copy_from_slice(&hash_bytes[..16]);
778                let resp = send_query(
779                    event_tx,
780                    QueryRequest::UnblackholeIdentity { identity_hash },
781                )?;
782                return Ok(PickleValue::Bool(matches!(
783                    resp,
784                    QueryResponse::UnblackholeResult(true)
785                )));
786            }
787        }
788    }
789
790    // Handle "drop" requests
791    if let Some(drop_val) = request.get("drop") {
792        if let Some(path) = drop_val.as_str() {
793            return match path {
794                "path" => {
795                    let hash = extract_dest_hash(request, "destination_hash")?;
796                    let resp = send_query(event_tx, QueryRequest::DropPath { dest_hash: hash })?;
797                    if let QueryResponse::DropPath(ok) = resp {
798                        Ok(PickleValue::Bool(ok))
799                    } else {
800                        Ok(PickleValue::None)
801                    }
802                }
803                "all_via" => {
804                    let hash = extract_dest_hash(request, "destination_hash")?;
805                    let resp = send_query(
806                        event_tx,
807                        QueryRequest::DropAllVia {
808                            transport_hash: hash,
809                        },
810                    )?;
811                    if let QueryResponse::DropAllVia(n) = resp {
812                        Ok(PickleValue::Int(n as i64))
813                    } else {
814                        Ok(PickleValue::None)
815                    }
816                }
817                "announce_queues" => {
818                    let resp = send_query(event_tx, QueryRequest::DropAnnounceQueues)?;
819                    if let QueryResponse::DropAnnounceQueues = resp {
820                        Ok(PickleValue::Bool(true))
821                    } else {
822                        Ok(PickleValue::None)
823                    }
824                }
825                _ => Ok(PickleValue::None),
826            };
827        }
828    }
829
830    if let Some(hook_val) = request.get("hook").and_then(|v| v.as_str()) {
831        return handle_hook_rpc_request(hook_val, request, event_tx);
832    }
833
834    Ok(PickleValue::None)
835}
836
837fn handle_hook_rpc_request(
838    op: &str,
839    request: &PickleValue,
840    event_tx: &EventSender,
841) -> io::Result<PickleValue> {
842    match op {
843        "load" => {
844            let name = required_string(request, "name")?;
845            let attach_point = required_string(request, "attach_point")?;
846            let priority = request
847                .get("priority")
848                .and_then(|v| v.as_int())
849                .unwrap_or(0) as i32;
850            let wasm = request
851                .get("wasm")
852                .and_then(|v| v.as_bytes())
853                .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing wasm"))?
854                .to_vec();
855            let (response_tx, response_rx) = mpsc::channel();
856            event_tx
857                .send(Event::LoadHook {
858                    name,
859                    wasm_bytes: wasm,
860                    attach_point,
861                    priority,
862                    response_tx,
863                })
864                .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down"))?;
865            let response = response_rx
866                .recv_timeout(std::time::Duration::from_secs(5))
867                .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "hook load timed out"))?;
868            Ok(hook_result_to_pickle(response))
869        }
870        "load_builtin" => {
871            let name = required_string(request, "name")?;
872            let attach_point = required_string(request, "attach_point")?;
873            let builtin_id = required_string(request, "builtin_id")?;
874            let priority = request
875                .get("priority")
876                .and_then(|v| v.as_int())
877                .unwrap_or(0) as i32;
878            let (response_tx, response_rx) = mpsc::channel();
879            event_tx
880                .send(Event::LoadBuiltinHook {
881                    name,
882                    builtin_id,
883                    attach_point,
884                    priority,
885                    response_tx,
886                })
887                .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down"))?;
888            let response = response_rx
889                .recv_timeout(std::time::Duration::from_secs(5))
890                .map_err(|_| {
891                    io::Error::new(io::ErrorKind::TimedOut, "built-in hook load timed out")
892                })?;
893            Ok(hook_result_to_pickle(response))
894        }
895        "unload" => {
896            let name = required_string(request, "name")?;
897            let attach_point = required_string(request, "attach_point")?;
898            let (response_tx, response_rx) = mpsc::channel();
899            event_tx
900                .send(Event::UnloadHook {
901                    name,
902                    attach_point,
903                    response_tx,
904                })
905                .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down"))?;
906            let response = response_rx
907                .recv_timeout(std::time::Duration::from_secs(5))
908                .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "hook unload timed out"))?;
909            Ok(hook_result_to_pickle(response))
910        }
911        "enable" | "disable" => {
912            let name = required_string(request, "name")?;
913            let attach_point = required_string(request, "attach_point")?;
914            let enabled = op == "enable";
915            let (response_tx, response_rx) = mpsc::channel();
916            event_tx
917                .send(Event::SetHookEnabled {
918                    name,
919                    attach_point,
920                    enabled,
921                    response_tx,
922                })
923                .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down"))?;
924            let response = response_rx
925                .recv_timeout(std::time::Duration::from_secs(5))
926                .map_err(|_| {
927                    io::Error::new(io::ErrorKind::TimedOut, "hook enable/disable timed out")
928                })?;
929            Ok(hook_result_to_pickle(response))
930        }
931        "set_priority" => {
932            let name = required_string(request, "name")?;
933            let attach_point = required_string(request, "attach_point")?;
934            let priority = request
935                .get("priority")
936                .and_then(|v| v.as_int())
937                .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing priority"))?
938                as i32;
939            let (response_tx, response_rx) = mpsc::channel();
940            event_tx
941                .send(Event::SetHookPriority {
942                    name,
943                    attach_point,
944                    priority,
945                    response_tx,
946                })
947                .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down"))?;
948            let response = response_rx
949                .recv_timeout(std::time::Duration::from_secs(5))
950                .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "hook priority timed out"))?;
951            Ok(hook_result_to_pickle(response))
952        }
953        _ => Ok(PickleValue::None),
954    }
955}
956
957/// Send a query to the driver and wait for the response.
958fn send_query(event_tx: &EventSender, request: QueryRequest) -> io::Result<QueryResponse> {
959    let (resp_tx, resp_rx) = mpsc::channel();
960    event_tx
961        .send(Event::Query(request, resp_tx))
962        .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down"))?;
963    resp_rx
964        .recv_timeout(std::time::Duration::from_secs(5))
965        .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "query timed out"))
966}
967
968/// Extract a 16-byte destination hash from an RPC dict field.
969fn extract_dest_hash(request: &PickleValue, key: &str) -> io::Result<[u8; 16]> {
970    let bytes = request
971        .get(key)
972        .and_then(|v| v.as_bytes())
973        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing destination_hash"))?;
974    if bytes.len() < 16 {
975        return Err(io::Error::new(io::ErrorKind::InvalidData, "hash too short"));
976    }
977    let mut hash = [0u8; 16];
978    hash.copy_from_slice(&bytes[..16]);
979    Ok(hash)
980}
981
982fn required_string(request: &PickleValue, key: &str) -> io::Result<String> {
983    request
984        .get(key)
985        .and_then(|v| v.as_str())
986        .map(|s| s.to_string())
987        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, format!("missing {}", key)))
988}
989
990fn hook_result_to_pickle(result: Result<(), String>) -> PickleValue {
991    match result {
992        Ok(()) => PickleValue::Dict(vec![(
993            PickleValue::String("ok".into()),
994            PickleValue::Bool(true),
995        )]),
996        Err(error) => PickleValue::Dict(vec![
997            (PickleValue::String("ok".into()), PickleValue::Bool(false)),
998            (
999                PickleValue::String("error".into()),
1000                PickleValue::String(error),
1001            ),
1002        ]),
1003    }
1004}
1005
1006// --- Pickle response builders ---
1007
1008fn interface_stats_to_pickle(stats: &InterfaceStatsResponse) -> PickleValue {
1009    let mut ifaces = Vec::new();
1010    for iface in &stats.interfaces {
1011        ifaces.push(single_iface_to_pickle(iface));
1012    }
1013
1014    let mut dict = vec![
1015        (
1016            PickleValue::String("interfaces".into()),
1017            PickleValue::List(ifaces),
1018        ),
1019        (
1020            PickleValue::String("transport_enabled".into()),
1021            PickleValue::Bool(stats.transport_enabled),
1022        ),
1023        (
1024            PickleValue::String("transport_uptime".into()),
1025            PickleValue::Float(stats.transport_uptime),
1026        ),
1027        (
1028            PickleValue::String("rxb".into()),
1029            PickleValue::Int(stats.total_rxb as i64),
1030        ),
1031        (
1032            PickleValue::String("txb".into()),
1033            PickleValue::Int(stats.total_txb as i64),
1034        ),
1035    ];
1036
1037    if let Some(tid) = stats.transport_id {
1038        dict.push((
1039            PickleValue::String("transport_id".into()),
1040            PickleValue::Bytes(tid.to_vec()),
1041        ));
1042    } else {
1043        dict.push((
1044            PickleValue::String("transport_id".into()),
1045            PickleValue::None,
1046        ));
1047    }
1048
1049    if let Some(pr) = stats.probe_responder {
1050        dict.push((
1051            PickleValue::String("probe_responder".into()),
1052            PickleValue::Bytes(pr.to_vec()),
1053        ));
1054    } else {
1055        dict.push((
1056            PickleValue::String("probe_responder".into()),
1057            PickleValue::None,
1058        ));
1059    }
1060
1061    if let Some(pool) = &stats.backbone_peer_pool {
1062        let members = pool
1063            .members
1064            .iter()
1065            .map(|member| {
1066                let mut member_dict = vec![
1067                    (
1068                        PickleValue::String("name".into()),
1069                        PickleValue::String(member.name.clone()),
1070                    ),
1071                    (
1072                        PickleValue::String("remote".into()),
1073                        PickleValue::String(member.remote.clone()),
1074                    ),
1075                    (
1076                        PickleValue::String("source".into()),
1077                        PickleValue::String(member.source.clone()),
1078                    ),
1079                    (
1080                        PickleValue::String("priority".into()),
1081                        PickleValue::Int(member.priority as i64),
1082                    ),
1083                    (
1084                        PickleValue::String("state".into()),
1085                        PickleValue::String(member.state.clone()),
1086                    ),
1087                    (
1088                        PickleValue::String("failure_count".into()),
1089                        PickleValue::Int(member.failure_count as i64),
1090                    ),
1091                ];
1092                member_dict.push((
1093                    PickleValue::String("interface_id".into()),
1094                    member
1095                        .interface_id
1096                        .map(|id| PickleValue::Int(id as i64))
1097                        .unwrap_or(PickleValue::None),
1098                ));
1099                member_dict.push((
1100                    PickleValue::String("last_error".into()),
1101                    member
1102                        .last_error
1103                        .as_ref()
1104                        .map(|err| PickleValue::String(err.clone()))
1105                        .unwrap_or(PickleValue::None),
1106                ));
1107                member_dict.push((
1108                    PickleValue::String("cooldown_remaining_seconds".into()),
1109                    member
1110                        .cooldown_remaining_seconds
1111                        .map(PickleValue::Float)
1112                        .unwrap_or(PickleValue::None),
1113                ));
1114                PickleValue::Dict(member_dict)
1115            })
1116            .collect();
1117        dict.push((
1118            PickleValue::String("backbone_peer_pool".into()),
1119            PickleValue::Dict(vec![
1120                (
1121                    PickleValue::String("max_connected".into()),
1122                    PickleValue::Int(pool.max_connected as i64),
1123                ),
1124                (
1125                    PickleValue::String("active_count".into()),
1126                    PickleValue::Int(pool.active_count as i64),
1127                ),
1128                (
1129                    PickleValue::String("standby_count".into()),
1130                    PickleValue::Int(pool.standby_count as i64),
1131                ),
1132                (
1133                    PickleValue::String("cooldown_count".into()),
1134                    PickleValue::Int(pool.cooldown_count as i64),
1135                ),
1136                (
1137                    PickleValue::String("members".into()),
1138                    PickleValue::List(members),
1139                ),
1140            ]),
1141        ));
1142    } else {
1143        dict.push((
1144            PickleValue::String("backbone_peer_pool".into()),
1145            PickleValue::None,
1146        ));
1147    }
1148
1149    PickleValue::Dict(dict)
1150}
1151
1152fn single_iface_to_pickle(s: &SingleInterfaceStat) -> PickleValue {
1153    let mut dict = vec![
1154        (
1155            PickleValue::String("id".into()),
1156            PickleValue::Int(s.id as i64),
1157        ),
1158        (
1159            PickleValue::String("name".into()),
1160            PickleValue::String(s.name.clone()),
1161        ),
1162        (
1163            PickleValue::String("status".into()),
1164            PickleValue::Bool(s.status),
1165        ),
1166        (
1167            PickleValue::String("mode".into()),
1168            PickleValue::Int(s.mode as i64),
1169        ),
1170        (
1171            PickleValue::String("rxb".into()),
1172            PickleValue::Int(s.rxb as i64),
1173        ),
1174        (
1175            PickleValue::String("txb".into()),
1176            PickleValue::Int(s.txb as i64),
1177        ),
1178        (
1179            PickleValue::String("rx_packets".into()),
1180            PickleValue::Int(s.rx_packets as i64),
1181        ),
1182        (
1183            PickleValue::String("tx_packets".into()),
1184            PickleValue::Int(s.tx_packets as i64),
1185        ),
1186        (
1187            PickleValue::String("started".into()),
1188            PickleValue::Float(s.started),
1189        ),
1190        (
1191            PickleValue::String("ia_freq".into()),
1192            PickleValue::Float(s.ia_freq),
1193        ),
1194        (
1195            PickleValue::String("oa_freq".into()),
1196            PickleValue::Float(s.oa_freq),
1197        ),
1198        (
1199            PickleValue::String("ip_freq".into()),
1200            PickleValue::Float(s.ip_freq),
1201        ),
1202        (
1203            PickleValue::String("op_freq".into()),
1204            PickleValue::Float(s.op_freq),
1205        ),
1206        (
1207            PickleValue::String("burst_active".into()),
1208            PickleValue::Bool(s.burst_active),
1209        ),
1210        (
1211            PickleValue::String("burst_activated".into()),
1212            PickleValue::Float(s.burst_activated),
1213        ),
1214        (
1215            PickleValue::String("pr_burst_active".into()),
1216            PickleValue::Bool(s.pr_burst_active),
1217        ),
1218        (
1219            PickleValue::String("pr_burst_activated".into()),
1220            PickleValue::Float(s.pr_burst_activated),
1221        ),
1222        (
1223            PickleValue::String("clients".into()),
1224            s.clients
1225                .map(|clients| PickleValue::Int(clients as i64))
1226                .unwrap_or(PickleValue::None),
1227        ),
1228        (
1229            PickleValue::String("announce_rate_grace".into()),
1230            PickleValue::Int(s.announce_rate_grace as i64),
1231        ),
1232        (
1233            PickleValue::String("announce_rate_penalty".into()),
1234            PickleValue::Float(s.announce_rate_penalty),
1235        ),
1236    ];
1237
1238    match s.announce_rate_target {
1239        Some(target) => dict.push((
1240            PickleValue::String("announce_rate_target".into()),
1241            PickleValue::Float(target),
1242        )),
1243        None => dict.push((
1244            PickleValue::String("announce_rate_target".into()),
1245            PickleValue::None,
1246        )),
1247    }
1248
1249    match s.bitrate {
1250        Some(br) => dict.push((
1251            PickleValue::String("bitrate".into()),
1252            PickleValue::Int(br as i64),
1253        )),
1254        None => dict.push((PickleValue::String("bitrate".into()), PickleValue::None)),
1255    }
1256
1257    match s.ifac_size {
1258        Some(sz) => dict.push((
1259            PickleValue::String("ifac_size".into()),
1260            PickleValue::Int(sz as i64),
1261        )),
1262        None => dict.push((PickleValue::String("ifac_size".into()), PickleValue::None)),
1263    }
1264
1265    PickleValue::Dict(dict)
1266}
1267
1268fn path_table_to_pickle(entries: &[PathTableEntry]) -> PickleValue {
1269    let list: Vec<PickleValue> = entries
1270        .iter()
1271        .map(|e| {
1272            PickleValue::Dict(vec![
1273                (
1274                    PickleValue::String("hash".into()),
1275                    PickleValue::Bytes(e.hash.to_vec()),
1276                ),
1277                (
1278                    PickleValue::String("timestamp".into()),
1279                    PickleValue::Float(e.timestamp),
1280                ),
1281                (
1282                    PickleValue::String("via".into()),
1283                    PickleValue::Bytes(e.via.to_vec()),
1284                ),
1285                (
1286                    PickleValue::String("hops".into()),
1287                    PickleValue::Int(e.hops as i64),
1288                ),
1289                (
1290                    PickleValue::String("expires".into()),
1291                    PickleValue::Float(e.expires),
1292                ),
1293                (
1294                    PickleValue::String("interface".into()),
1295                    PickleValue::String(e.interface_name.clone()),
1296                ),
1297            ])
1298        })
1299        .collect();
1300    PickleValue::List(list)
1301}
1302
1303fn rate_table_to_pickle(entries: &[RateTableEntry]) -> PickleValue {
1304    let list: Vec<PickleValue> = entries
1305        .iter()
1306        .map(|e| {
1307            PickleValue::Dict(vec![
1308                (
1309                    PickleValue::String("hash".into()),
1310                    PickleValue::Bytes(e.hash.to_vec()),
1311                ),
1312                (
1313                    PickleValue::String("last".into()),
1314                    PickleValue::Float(e.last),
1315                ),
1316                (
1317                    PickleValue::String("rate_violations".into()),
1318                    PickleValue::Int(e.rate_violations as i64),
1319                ),
1320                (
1321                    PickleValue::String("blocked_until".into()),
1322                    PickleValue::Float(e.blocked_until),
1323                ),
1324                (
1325                    PickleValue::String("timestamps".into()),
1326                    PickleValue::List(
1327                        e.timestamps
1328                            .iter()
1329                            .map(|&t| PickleValue::Float(t))
1330                            .collect(),
1331                    ),
1332                ),
1333            ])
1334        })
1335        .collect();
1336    PickleValue::List(list)
1337}
1338
1339fn blackholed_to_pickle(entries: &[BlackholeInfo]) -> PickleValue {
1340    let list: Vec<PickleValue> = entries
1341        .iter()
1342        .map(|e| {
1343            let mut dict = vec![
1344                (
1345                    PickleValue::String("identity_hash".into()),
1346                    PickleValue::Bytes(e.identity_hash.to_vec()),
1347                ),
1348                (
1349                    PickleValue::String("created".into()),
1350                    PickleValue::Float(e.created),
1351                ),
1352                (
1353                    PickleValue::String("expires".into()),
1354                    PickleValue::Float(e.expires),
1355                ),
1356            ];
1357            if let Some(ref reason) = e.reason {
1358                dict.push((
1359                    PickleValue::String("reason".into()),
1360                    PickleValue::String(reason.clone()),
1361                ));
1362            } else {
1363                dict.push((PickleValue::String("reason".into()), PickleValue::None));
1364            }
1365            PickleValue::Dict(dict)
1366        })
1367        .collect();
1368    PickleValue::List(list)
1369}
1370
1371fn discovered_interfaces_to_pickle(
1372    interfaces: &[crate::discovery::DiscoveredInterface],
1373) -> PickleValue {
1374    let list: Vec<PickleValue> = interfaces
1375        .iter()
1376        .map(|iface| {
1377            let mut dict = vec![
1378                (
1379                    PickleValue::String("type".into()),
1380                    PickleValue::String(iface.interface_type.clone()),
1381                ),
1382                (
1383                    PickleValue::String("transport".into()),
1384                    PickleValue::Bool(iface.transport),
1385                ),
1386                (
1387                    PickleValue::String("name".into()),
1388                    PickleValue::String(iface.name.clone()),
1389                ),
1390                (
1391                    PickleValue::String("discovered".into()),
1392                    PickleValue::Float(iface.discovered),
1393                ),
1394                (
1395                    PickleValue::String("last_heard".into()),
1396                    PickleValue::Float(iface.last_heard),
1397                ),
1398                (
1399                    PickleValue::String("heard_count".into()),
1400                    PickleValue::Int(iface.heard_count as i64),
1401                ),
1402                (
1403                    PickleValue::String("status".into()),
1404                    PickleValue::String(iface.status.as_str().into()),
1405                ),
1406                (
1407                    PickleValue::String("stamp".into()),
1408                    PickleValue::Bytes(iface.stamp.clone()),
1409                ),
1410                (
1411                    PickleValue::String("value".into()),
1412                    PickleValue::Int(iface.stamp_value as i64),
1413                ),
1414                (
1415                    PickleValue::String("transport_id".into()),
1416                    PickleValue::Bytes(iface.transport_id.to_vec()),
1417                ),
1418                (
1419                    PickleValue::String("network_id".into()),
1420                    PickleValue::Bytes(iface.network_id.to_vec()),
1421                ),
1422                (
1423                    PickleValue::String("hops".into()),
1424                    PickleValue::Int(iface.hops as i64),
1425                ),
1426            ];
1427
1428            // Optional location fields
1429            if let Some(v) = iface.latitude {
1430                dict.push((
1431                    PickleValue::String("latitude".into()),
1432                    PickleValue::Float(v),
1433                ));
1434            } else {
1435                dict.push((PickleValue::String("latitude".into()), PickleValue::None));
1436            }
1437            if let Some(v) = iface.longitude {
1438                dict.push((
1439                    PickleValue::String("longitude".into()),
1440                    PickleValue::Float(v),
1441                ));
1442            } else {
1443                dict.push((PickleValue::String("longitude".into()), PickleValue::None));
1444            }
1445            if let Some(v) = iface.height {
1446                dict.push((PickleValue::String("height".into()), PickleValue::Float(v)));
1447            } else {
1448                dict.push((PickleValue::String("height".into()), PickleValue::None));
1449            }
1450
1451            // Connection info
1452            if let Some(ref v) = iface.reachable_on {
1453                dict.push((
1454                    PickleValue::String("reachable_on".into()),
1455                    PickleValue::String(v.clone()),
1456                ));
1457            } else {
1458                dict.push((
1459                    PickleValue::String("reachable_on".into()),
1460                    PickleValue::None,
1461                ));
1462            }
1463            if let Some(v) = iface.port {
1464                dict.push((
1465                    PickleValue::String("port".into()),
1466                    PickleValue::Int(v as i64),
1467                ));
1468            } else {
1469                dict.push((PickleValue::String("port".into()), PickleValue::None));
1470            }
1471
1472            // RNode/RF specific
1473            if let Some(v) = iface.frequency {
1474                dict.push((
1475                    PickleValue::String("frequency".into()),
1476                    PickleValue::Int(v as i64),
1477                ));
1478            } else {
1479                dict.push((PickleValue::String("frequency".into()), PickleValue::None));
1480            }
1481            if let Some(v) = iface.bandwidth {
1482                dict.push((
1483                    PickleValue::String("bandwidth".into()),
1484                    PickleValue::Int(v as i64),
1485                ));
1486            } else {
1487                dict.push((PickleValue::String("bandwidth".into()), PickleValue::None));
1488            }
1489            if let Some(v) = iface.spreading_factor {
1490                dict.push((PickleValue::String("sf".into()), PickleValue::Int(v as i64)));
1491            } else {
1492                dict.push((PickleValue::String("sf".into()), PickleValue::None));
1493            }
1494            if let Some(v) = iface.coding_rate {
1495                dict.push((PickleValue::String("cr".into()), PickleValue::Int(v as i64)));
1496            } else {
1497                dict.push((PickleValue::String("cr".into()), PickleValue::None));
1498            }
1499            if let Some(ref v) = iface.modulation {
1500                dict.push((
1501                    PickleValue::String("modulation".into()),
1502                    PickleValue::String(v.clone()),
1503                ));
1504            } else {
1505                dict.push((PickleValue::String("modulation".into()), PickleValue::None));
1506            }
1507            if let Some(v) = iface.channel {
1508                dict.push((
1509                    PickleValue::String("channel".into()),
1510                    PickleValue::Int(v as i64),
1511                ));
1512            } else {
1513                dict.push((PickleValue::String("channel".into()), PickleValue::None));
1514            }
1515
1516            // IFAC info
1517            if let Some(ref v) = iface.ifac_netname {
1518                dict.push((
1519                    PickleValue::String("ifac_netname".into()),
1520                    PickleValue::String(v.clone()),
1521                ));
1522            } else {
1523                dict.push((
1524                    PickleValue::String("ifac_netname".into()),
1525                    PickleValue::None,
1526                ));
1527            }
1528            if let Some(ref v) = iface.ifac_netkey {
1529                dict.push((
1530                    PickleValue::String("ifac_netkey".into()),
1531                    PickleValue::String(v.clone()),
1532                ));
1533            } else {
1534                dict.push((PickleValue::String("ifac_netkey".into()), PickleValue::None));
1535            }
1536
1537            // Config entry
1538            if let Some(ref v) = iface.config_entry {
1539                dict.push((
1540                    PickleValue::String("config_entry".into()),
1541                    PickleValue::String(v.clone()),
1542                ));
1543            } else {
1544                dict.push((
1545                    PickleValue::String("config_entry".into()),
1546                    PickleValue::None,
1547                ));
1548            }
1549
1550            dict.push((
1551                PickleValue::String("discovery_hash".into()),
1552                PickleValue::Bytes(iface.discovery_hash.to_vec()),
1553            ));
1554
1555            PickleValue::Dict(dict)
1556        })
1557        .collect();
1558    PickleValue::List(list)
1559}
1560
1561fn hooks_to_pickle(hooks: &[HookInfo]) -> PickleValue {
1562    PickleValue::List(
1563        hooks
1564            .iter()
1565            .map(|hook| {
1566                PickleValue::Dict(vec![
1567                    (
1568                        PickleValue::String("name".into()),
1569                        PickleValue::String(hook.name.clone()),
1570                    ),
1571                    (
1572                        PickleValue::String("type".into()),
1573                        PickleValue::String(hook.hook_type.clone()),
1574                    ),
1575                    (
1576                        PickleValue::String("attach_point".into()),
1577                        PickleValue::String(hook.attach_point.clone()),
1578                    ),
1579                    (
1580                        PickleValue::String("priority".into()),
1581                        PickleValue::Int(hook.priority as i64),
1582                    ),
1583                    (
1584                        PickleValue::String("enabled".into()),
1585                        PickleValue::Bool(hook.enabled),
1586                    ),
1587                    (
1588                        PickleValue::String("consecutive_traps".into()),
1589                        PickleValue::Int(hook.consecutive_traps as i64),
1590                    ),
1591                ])
1592            })
1593            .collect(),
1594    )
1595}
1596
1597fn backbone_peer_state_to_pickle(entries: &[BackbonePeerStateEntry]) -> PickleValue {
1598    PickleValue::List(
1599        entries
1600            .iter()
1601            .map(|entry| {
1602                PickleValue::Dict(vec![
1603                    (
1604                        PickleValue::String("interface".into()),
1605                        PickleValue::String(entry.interface_name.clone()),
1606                    ),
1607                    (
1608                        PickleValue::String("ip".into()),
1609                        PickleValue::String(entry.peer_ip.to_string()),
1610                    ),
1611                    (
1612                        PickleValue::String("connected_count".into()),
1613                        PickleValue::Int(entry.connected_count as i64),
1614                    ),
1615                    (
1616                        PickleValue::String("blacklisted_remaining_secs".into()),
1617                        entry
1618                            .blacklisted_remaining_secs
1619                            .map(PickleValue::Float)
1620                            .unwrap_or(PickleValue::None),
1621                    ),
1622                    (
1623                        PickleValue::String("blacklist_reason".into()),
1624                        entry
1625                            .blacklist_reason
1626                            .as_ref()
1627                            .map(|v: &String| PickleValue::String(v.clone()))
1628                            .unwrap_or(PickleValue::None),
1629                    ),
1630                    (
1631                        PickleValue::String("reject_count".into()),
1632                        PickleValue::Int(entry.reject_count as i64),
1633                    ),
1634                ])
1635            })
1636            .collect(),
1637    )
1638}
1639
1640fn backbone_interfaces_to_pickle(entries: &[BackboneInterfaceEntry]) -> PickleValue {
1641    PickleValue::List(
1642        entries
1643            .iter()
1644            .map(|entry| {
1645                PickleValue::Dict(vec![
1646                    (
1647                        PickleValue::String("id".into()),
1648                        PickleValue::Int(entry.interface_id.0 as i64),
1649                    ),
1650                    (
1651                        PickleValue::String("name".into()),
1652                        PickleValue::String(entry.interface_name.clone()),
1653                    ),
1654                ])
1655            })
1656            .collect(),
1657    )
1658}
1659
1660fn provider_bridge_stats_to_pickle(stats: &ProviderBridgeStats) -> PickleValue {
1661    PickleValue::Dict(vec![
1662        (
1663            PickleValue::String("connected".into()),
1664            PickleValue::Bool(stats.connected),
1665        ),
1666        (
1667            PickleValue::String("consumer_count".into()),
1668            PickleValue::Int(stats.consumer_count as i64),
1669        ),
1670        (
1671            PickleValue::String("queue_max_events".into()),
1672            PickleValue::Int(stats.queue_max_events as i64),
1673        ),
1674        (
1675            PickleValue::String("queue_max_bytes".into()),
1676            PickleValue::Int(stats.queue_max_bytes as i64),
1677        ),
1678        (
1679            PickleValue::String("backlog_len".into()),
1680            PickleValue::Int(stats.backlog_len as i64),
1681        ),
1682        (
1683            PickleValue::String("backlog_bytes".into()),
1684            PickleValue::Int(stats.backlog_bytes as i64),
1685        ),
1686        (
1687            PickleValue::String("backlog_dropped_pending".into()),
1688            PickleValue::Int(stats.backlog_dropped_pending as i64),
1689        ),
1690        (
1691            PickleValue::String("backlog_dropped_total".into()),
1692            PickleValue::Int(stats.backlog_dropped_total as i64),
1693        ),
1694        (
1695            PickleValue::String("total_disconnect_count".into()),
1696            PickleValue::Int(stats.total_disconnect_count as i64),
1697        ),
1698        (
1699            PickleValue::String("consumers".into()),
1700            PickleValue::List(
1701                stats
1702                    .consumers
1703                    .iter()
1704                    .map(|consumer| {
1705                        PickleValue::Dict(vec![
1706                            (
1707                                PickleValue::String("id".into()),
1708                                PickleValue::Int(consumer.id as i64),
1709                            ),
1710                            (
1711                                PickleValue::String("connected".into()),
1712                                PickleValue::Bool(consumer.connected),
1713                            ),
1714                            (
1715                                PickleValue::String("queue_len".into()),
1716                                PickleValue::Int(consumer.queue_len as i64),
1717                            ),
1718                            (
1719                                PickleValue::String("queued_bytes".into()),
1720                                PickleValue::Int(consumer.queued_bytes as i64),
1721                            ),
1722                            (
1723                                PickleValue::String("dropped_pending".into()),
1724                                PickleValue::Int(consumer.dropped_pending as i64),
1725                            ),
1726                            (
1727                                PickleValue::String("dropped_total".into()),
1728                                PickleValue::Int(consumer.dropped_total as i64),
1729                            ),
1730                            (
1731                                PickleValue::String("queue_max_events".into()),
1732                                PickleValue::Int(consumer.queue_max_events as i64),
1733                            ),
1734                            (
1735                                PickleValue::String("queue_max_bytes".into()),
1736                                PickleValue::Int(consumer.queue_max_bytes as i64),
1737                            ),
1738                        ])
1739                    })
1740                    .collect(),
1741            ),
1742        ),
1743    ])
1744}
1745
1746fn lifecycle_state_name(state: LifecycleState) -> &'static str {
1747    match state {
1748        LifecycleState::Active => "active",
1749        LifecycleState::Draining => "draining",
1750        LifecycleState::Stopping => "stopping",
1751        LifecycleState::Stopped => "stopped",
1752    }
1753}
1754
1755fn drain_status_to_pickle(status: &DrainStatus) -> PickleValue {
1756    PickleValue::Dict(vec![
1757        (
1758            PickleValue::String("state".into()),
1759            PickleValue::String(lifecycle_state_name(status.state).into()),
1760        ),
1761        (
1762            PickleValue::String("drain_age_seconds".into()),
1763            status
1764                .drain_age_seconds
1765                .map(PickleValue::Float)
1766                .unwrap_or(PickleValue::None),
1767        ),
1768        (
1769            PickleValue::String("deadline_remaining_seconds".into()),
1770            status
1771                .deadline_remaining_seconds
1772                .map(PickleValue::Float)
1773                .unwrap_or(PickleValue::None),
1774        ),
1775        (
1776            PickleValue::String("drain_complete".into()),
1777            PickleValue::Bool(status.drain_complete),
1778        ),
1779        (
1780            PickleValue::String("interface_writer_queued_frames".into()),
1781            PickleValue::Int(status.interface_writer_queued_frames as i64),
1782        ),
1783        (
1784            PickleValue::String("provider_backlog_events".into()),
1785            PickleValue::Int(status.provider_backlog_events as i64),
1786        ),
1787        (
1788            PickleValue::String("provider_consumer_queued_events".into()),
1789            PickleValue::Int(status.provider_consumer_queued_events as i64),
1790        ),
1791        (
1792            PickleValue::String("detail".into()),
1793            status
1794                .detail
1795                .as_ref()
1796                .map(|detail| PickleValue::String(detail.clone()))
1797                .unwrap_or(PickleValue::None),
1798        ),
1799    ])
1800}
1801
1802fn runtime_config_value_to_pickle(value: &RuntimeConfigValue) -> PickleValue {
1803    match value {
1804        RuntimeConfigValue::Int(v) => PickleValue::Int(*v),
1805        RuntimeConfigValue::Float(v) => PickleValue::Float(*v),
1806        RuntimeConfigValue::Bool(v) => PickleValue::Bool(*v),
1807        RuntimeConfigValue::String(v) => PickleValue::String(v.clone()),
1808        RuntimeConfigValue::Null => PickleValue::None,
1809    }
1810}
1811
1812fn runtime_config_value_from_pickle(value: &PickleValue) -> Option<RuntimeConfigValue> {
1813    match value {
1814        PickleValue::Int(v) => Some(RuntimeConfigValue::Int(*v)),
1815        PickleValue::Float(v) => Some(RuntimeConfigValue::Float(*v)),
1816        PickleValue::Bool(v) => Some(RuntimeConfigValue::Bool(*v)),
1817        PickleValue::String(v) => Some(RuntimeConfigValue::String(v.clone())),
1818        PickleValue::None => Some(RuntimeConfigValue::Null),
1819        _ => None,
1820    }
1821}
1822
1823fn runtime_config_entry_to_pickle(entry: &RuntimeConfigEntry) -> PickleValue {
1824    PickleValue::Dict(vec![
1825        (
1826            PickleValue::String("key".into()),
1827            PickleValue::String(entry.key.clone()),
1828        ),
1829        (
1830            PickleValue::String("value".into()),
1831            runtime_config_value_to_pickle(&entry.value),
1832        ),
1833        (
1834            PickleValue::String("default".into()),
1835            runtime_config_value_to_pickle(&entry.default),
1836        ),
1837        (
1838            PickleValue::String("source".into()),
1839            PickleValue::String(match entry.source {
1840                RuntimeConfigSource::Startup => "startup".into(),
1841                RuntimeConfigSource::RuntimeOverride => "runtime_override".into(),
1842            }),
1843        ),
1844        (
1845            PickleValue::String("apply_mode".into()),
1846            PickleValue::String(match entry.apply_mode {
1847                RuntimeConfigApplyMode::Immediate => "immediate".into(),
1848                RuntimeConfigApplyMode::NewConnectionsOnly => "new_connections_only".into(),
1849                RuntimeConfigApplyMode::NextReconnect => "next_reconnect".into(),
1850                RuntimeConfigApplyMode::RestartRequired => "restart_required".into(),
1851            }),
1852        ),
1853        (
1854            PickleValue::String("description".into()),
1855            entry
1856                .description
1857                .as_ref()
1858                .map(|v| PickleValue::String(v.clone()))
1859                .unwrap_or(PickleValue::None),
1860        ),
1861    ])
1862}
1863
1864fn runtime_config_list_to_pickle(entries: &[RuntimeConfigEntry]) -> PickleValue {
1865    PickleValue::List(entries.iter().map(runtime_config_entry_to_pickle).collect())
1866}
1867
1868fn runtime_config_error_to_pickle(error: &RuntimeConfigError) -> PickleValue {
1869    PickleValue::Dict(vec![
1870        (
1871            PickleValue::String("error".into()),
1872            PickleValue::String(match error.code {
1873                RuntimeConfigErrorCode::UnknownKey => "unknown_key".into(),
1874                RuntimeConfigErrorCode::InvalidType => "invalid_type".into(),
1875                RuntimeConfigErrorCode::InvalidValue => "invalid_value".into(),
1876                RuntimeConfigErrorCode::Unsupported => "unsupported".into(),
1877                RuntimeConfigErrorCode::NotFound => "not_found".into(),
1878                RuntimeConfigErrorCode::ApplyFailed => "apply_failed".into(),
1879            }),
1880        ),
1881        (
1882            PickleValue::String("message".into()),
1883            PickleValue::String(error.message.clone()),
1884        ),
1885    ])
1886}
1887
1888fn runtime_config_result_to_pickle(
1889    result: Result<RuntimeConfigEntry, RuntimeConfigError>,
1890) -> PickleValue {
1891    match result {
1892        Ok(entry) => runtime_config_entry_to_pickle(&entry),
1893        Err(error) => runtime_config_error_to_pickle(&error),
1894    }
1895}
1896
1897fn known_destination_entry_to_pickle(entry: &KnownDestinationEntry) -> PickleValue {
1898    PickleValue::Dict(vec![
1899        (
1900            PickleValue::String("dest_hash".into()),
1901            PickleValue::Bytes(entry.dest_hash.to_vec()),
1902        ),
1903        (
1904            PickleValue::String("identity_hash".into()),
1905            PickleValue::Bytes(entry.identity_hash.to_vec()),
1906        ),
1907        (
1908            PickleValue::String("public_key".into()),
1909            PickleValue::Bytes(entry.public_key.to_vec()),
1910        ),
1911        (
1912            PickleValue::String("app_data".into()),
1913            entry
1914                .app_data
1915                .as_ref()
1916                .map(|data: &Vec<u8>| PickleValue::Bytes(data.clone()))
1917                .unwrap_or(PickleValue::None),
1918        ),
1919        (
1920            PickleValue::String("hops".into()),
1921            PickleValue::Int(entry.hops as i64),
1922        ),
1923        (
1924            PickleValue::String("received_at".into()),
1925            PickleValue::Float(entry.received_at),
1926        ),
1927        (
1928            PickleValue::String("receiving_interface".into()),
1929            PickleValue::Int(entry.receiving_interface.0 as i64),
1930        ),
1931        (
1932            PickleValue::String("was_used".into()),
1933            PickleValue::Bool(entry.was_used),
1934        ),
1935        (
1936            PickleValue::String("last_used_at".into()),
1937            entry
1938                .last_used_at
1939                .map(PickleValue::Float)
1940                .unwrap_or(PickleValue::None),
1941        ),
1942        (
1943            PickleValue::String("retained".into()),
1944            PickleValue::Bool(entry.retained),
1945        ),
1946    ])
1947}
1948
1949fn known_destinations_to_pickle(entries: &[KnownDestinationEntry]) -> PickleValue {
1950    PickleValue::List(
1951        entries
1952            .iter()
1953            .map(known_destination_entry_to_pickle)
1954            .collect(),
1955    )
1956}
1957
1958fn parse_known_destination_entry(value: &PickleValue) -> io::Result<KnownDestinationEntry> {
1959    let get_bytes = |key: &str, len: usize| -> io::Result<Vec<u8>> {
1960        let value = value.get(key).ok_or_else(|| {
1961            io::Error::new(io::ErrorKind::InvalidData, format!("missing {}", key))
1962        })?;
1963        let bytes = value.as_bytes().ok_or_else(|| {
1964            io::Error::new(io::ErrorKind::InvalidData, format!("invalid {}", key))
1965        })?;
1966        if bytes.len() != len {
1967            return Err(io::Error::new(
1968                io::ErrorKind::InvalidData,
1969                format!("invalid {} length", key),
1970            ));
1971        }
1972        Ok(bytes.to_vec())
1973    };
1974    let get_int = |key: &str| -> io::Result<i64> {
1975        value
1976            .get(key)
1977            .and_then(|v| v.as_int())
1978            .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, format!("invalid {}", key)))
1979    };
1980    let get_float = |key: &str| -> io::Result<f64> {
1981        value
1982            .get(key)
1983            .and_then(|v| v.as_float())
1984            .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, format!("invalid {}", key)))
1985    };
1986    let get_bool = |key: &str| -> io::Result<bool> {
1987        value
1988            .get(key)
1989            .and_then(|v| v.as_bool())
1990            .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, format!("invalid {}", key)))
1991    };
1992
1993    let mut dest_hash = [0u8; 16];
1994    dest_hash.copy_from_slice(&get_bytes("dest_hash", 16)?);
1995    let mut identity_hash = [0u8; 16];
1996    identity_hash.copy_from_slice(&get_bytes("identity_hash", 16)?);
1997    let mut public_key = [0u8; 64];
1998    public_key.copy_from_slice(&get_bytes("public_key", 64)?);
1999    let app_data = value
2000        .get("app_data")
2001        .and_then(|v| v.as_bytes())
2002        .map(|bytes| bytes.to_vec());
2003    let last_used_at = value.get("last_used_at").and_then(|v| v.as_float());
2004
2005    Ok(KnownDestinationEntry {
2006        dest_hash,
2007        identity_hash,
2008        public_key,
2009        app_data,
2010        hops: get_int("hops")? as u8,
2011        received_at: get_float("received_at")?,
2012        receiving_interface: rns_core::transport::types::InterfaceId(
2013            get_int("receiving_interface")? as u64,
2014        ),
2015        was_used: get_bool("was_used")?,
2016        last_used_at,
2017        retained: get_bool("retained")?,
2018    })
2019}
2020
2021fn parse_known_destination_list(value: &PickleValue) -> io::Result<Vec<KnownDestinationEntry>> {
2022    let list = value
2023        .as_list()
2024        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "expected list"))?;
2025    list.iter().map(parse_known_destination_entry).collect()
2026}
2027
2028// --- RPC Client ---
2029
2030/// RPC client for connecting to a running daemon.
2031pub struct RpcClient {
2032    stream: TcpStream,
2033}
2034
2035impl RpcClient {
2036    /// Connect to an RPC server and authenticate.
2037    pub fn connect(addr: &RpcAddr, auth_key: &[u8; 32]) -> io::Result<Self> {
2038        let mut stream = match addr {
2039            RpcAddr::Tcp(host, port) => TcpStream::connect((host.as_str(), *port))?,
2040        };
2041
2042        stream.set_read_timeout(Some(std::time::Duration::from_secs(10)))?;
2043        stream.set_write_timeout(Some(std::time::Duration::from_secs(10)))?;
2044
2045        // Client-side authentication
2046        client_auth(&mut stream, auth_key)?;
2047
2048        Ok(RpcClient { stream })
2049    }
2050
2051    /// Send an RPC request and receive a response.
2052    pub fn call(&mut self, request: &PickleValue) -> io::Result<PickleValue> {
2053        let request_bytes = encode_rpc_payload(request, RpcCodec::Msgpack);
2054        send_bytes(&mut self.stream, &request_bytes)?;
2055
2056        let response_bytes = recv_bytes(&mut self.stream)?;
2057        decode_rpc_payload(&response_bytes).map(|(value, _)| value)
2058    }
2059
2060    pub fn list_hooks(&mut self) -> io::Result<Vec<HookInfo>> {
2061        let response = self.call(&PickleValue::Dict(vec![(
2062            PickleValue::String("get".into()),
2063            PickleValue::String("hooks".into()),
2064        )]))?;
2065        parse_hook_list(&response)
2066    }
2067
2068    pub fn begin_drain(&mut self, timeout: Duration) -> io::Result<bool> {
2069        let response = self.call(&PickleValue::Dict(vec![(
2070            PickleValue::String("begin_drain".into()),
2071            PickleValue::Float(timeout.as_secs_f64()),
2072        )]))?;
2073        Ok(response.as_bool().unwrap_or(false))
2074    }
2075
2076    pub fn drain_status(&mut self) -> io::Result<Option<DrainStatus>> {
2077        let response = self.call(&PickleValue::Dict(vec![(
2078            PickleValue::String("get".into()),
2079            PickleValue::String("drain_status".into()),
2080        )]))?;
2081        parse_drain_status(&response)
2082    }
2083
2084    pub fn provider_bridge_stats(&mut self) -> io::Result<PickleValue> {
2085        self.call(&PickleValue::Dict(vec![(
2086            PickleValue::String("get".into()),
2087            PickleValue::String("provider_bridge_stats".into()),
2088        )]))
2089    }
2090
2091    pub fn load_hook(
2092        &mut self,
2093        name: &str,
2094        attach_point: &str,
2095        priority: i32,
2096        wasm: &[u8],
2097    ) -> io::Result<Result<(), String>> {
2098        let response = self.call(&PickleValue::Dict(vec![
2099            (
2100                PickleValue::String("hook".into()),
2101                PickleValue::String("load".into()),
2102            ),
2103            (
2104                PickleValue::String("name".into()),
2105                PickleValue::String(name.to_string()),
2106            ),
2107            (
2108                PickleValue::String("attach_point".into()),
2109                PickleValue::String(attach_point.to_string()),
2110            ),
2111            (
2112                PickleValue::String("priority".into()),
2113                PickleValue::Int(priority as i64),
2114            ),
2115            (
2116                PickleValue::String("wasm".into()),
2117                PickleValue::Bytes(wasm.to_vec()),
2118            ),
2119        ]))?;
2120        parse_hook_result(&response)
2121    }
2122
2123    pub fn load_builtin_hook(
2124        &mut self,
2125        name: &str,
2126        attach_point: &str,
2127        priority: i32,
2128        builtin_id: &str,
2129    ) -> io::Result<Result<(), String>> {
2130        let response = self.call(&PickleValue::Dict(vec![
2131            (
2132                PickleValue::String("hook".into()),
2133                PickleValue::String("load_builtin".into()),
2134            ),
2135            (
2136                PickleValue::String("name".into()),
2137                PickleValue::String(name.to_string()),
2138            ),
2139            (
2140                PickleValue::String("attach_point".into()),
2141                PickleValue::String(attach_point.to_string()),
2142            ),
2143            (
2144                PickleValue::String("priority".into()),
2145                PickleValue::Int(priority as i64),
2146            ),
2147            (
2148                PickleValue::String("builtin_id".into()),
2149                PickleValue::String(builtin_id.to_string()),
2150            ),
2151        ]))?;
2152        parse_hook_result(&response)
2153    }
2154
2155    pub fn unload_hook(
2156        &mut self,
2157        name: &str,
2158        attach_point: &str,
2159    ) -> io::Result<Result<(), String>> {
2160        let response = self.call(&PickleValue::Dict(vec![
2161            (
2162                PickleValue::String("hook".into()),
2163                PickleValue::String("unload".into()),
2164            ),
2165            (
2166                PickleValue::String("name".into()),
2167                PickleValue::String(name.to_string()),
2168            ),
2169            (
2170                PickleValue::String("attach_point".into()),
2171                PickleValue::String(attach_point.to_string()),
2172            ),
2173        ]))?;
2174        parse_hook_result(&response)
2175    }
2176
2177    pub fn set_hook_enabled(
2178        &mut self,
2179        name: &str,
2180        attach_point: &str,
2181        enabled: bool,
2182    ) -> io::Result<Result<(), String>> {
2183        let op = if enabled { "enable" } else { "disable" };
2184        let response = self.call(&PickleValue::Dict(vec![
2185            (
2186                PickleValue::String("hook".into()),
2187                PickleValue::String(op.into()),
2188            ),
2189            (
2190                PickleValue::String("name".into()),
2191                PickleValue::String(name.to_string()),
2192            ),
2193            (
2194                PickleValue::String("attach_point".into()),
2195                PickleValue::String(attach_point.to_string()),
2196            ),
2197        ]))?;
2198        parse_hook_result(&response)
2199    }
2200
2201    pub fn set_hook_priority(
2202        &mut self,
2203        name: &str,
2204        attach_point: &str,
2205        priority: i32,
2206    ) -> io::Result<Result<(), String>> {
2207        let response = self.call(&PickleValue::Dict(vec![
2208            (
2209                PickleValue::String("hook".into()),
2210                PickleValue::String("set_priority".into()),
2211            ),
2212            (
2213                PickleValue::String("name".into()),
2214                PickleValue::String(name.to_string()),
2215            ),
2216            (
2217                PickleValue::String("attach_point".into()),
2218                PickleValue::String(attach_point.to_string()),
2219            ),
2220            (
2221                PickleValue::String("priority".into()),
2222                PickleValue::Int(priority as i64),
2223            ),
2224        ]))?;
2225        parse_hook_result(&response)
2226    }
2227
2228    pub fn blacklist_backbone_peer(
2229        &mut self,
2230        interface: &str,
2231        ip: &str,
2232        duration_secs: u64,
2233        reason: Option<&str>,
2234        penalty_level: Option<u8>,
2235    ) -> io::Result<bool> {
2236        let mut request = vec![
2237            (
2238                PickleValue::String("set".into()),
2239                PickleValue::String("backbone_peer_blacklist".into()),
2240            ),
2241            (
2242                PickleValue::String("interface".into()),
2243                PickleValue::String(interface.to_string()),
2244            ),
2245            (
2246                PickleValue::String("ip".into()),
2247                PickleValue::String(ip.to_string()),
2248            ),
2249            (
2250                PickleValue::String("duration_secs".into()),
2251                PickleValue::Int(duration_secs as i64),
2252            ),
2253        ];
2254        if let Some(reason) = reason {
2255            request.push((
2256                PickleValue::String("reason".into()),
2257                PickleValue::String(reason.to_string()),
2258            ));
2259        }
2260        if let Some(level) = penalty_level {
2261            request.push((
2262                PickleValue::String("penalty_level".into()),
2263                PickleValue::Int(level as i64),
2264            ));
2265        }
2266        let response = self.call(&PickleValue::Dict(request))?;
2267        Ok(response.as_bool().unwrap_or(false))
2268    }
2269
2270    pub fn known_destinations(&mut self) -> io::Result<Vec<KnownDestinationEntry>> {
2271        let response = self.call(&PickleValue::Dict(vec![(
2272            PickleValue::String("get".into()),
2273            PickleValue::String("known_destinations".into()),
2274        )]))?;
2275        parse_known_destination_list(&response)
2276    }
2277
2278    pub fn retain_known_destination(&mut self, dest_hash: [u8; 16]) -> io::Result<bool> {
2279        let response = self.call(&PickleValue::Dict(vec![
2280            (
2281                PickleValue::String("set".into()),
2282                PickleValue::String("known_destination_retained".into()),
2283            ),
2284            (
2285                PickleValue::String("dest_hash".into()),
2286                PickleValue::Bytes(dest_hash.to_vec()),
2287            ),
2288        ]))?;
2289        Ok(response.as_bool().unwrap_or(false))
2290    }
2291
2292    pub fn retain_identity(&mut self, identity_hash: [u8; 16]) -> io::Result<bool> {
2293        let response = self.call(&PickleValue::Dict(vec![
2294            (
2295                PickleValue::String("set".into()),
2296                PickleValue::String("identity_retained".into()),
2297            ),
2298            (
2299                PickleValue::String("identity_hash".into()),
2300                PickleValue::Bytes(identity_hash.to_vec()),
2301            ),
2302        ]))?;
2303        Ok(response.as_bool().unwrap_or(false))
2304    }
2305
2306    pub fn unretain_known_destination(&mut self, dest_hash: [u8; 16]) -> io::Result<bool> {
2307        let response = self.call(&PickleValue::Dict(vec![
2308            (
2309                PickleValue::String("clear".into()),
2310                PickleValue::String("known_destination_retained".into()),
2311            ),
2312            (
2313                PickleValue::String("dest_hash".into()),
2314                PickleValue::Bytes(dest_hash.to_vec()),
2315            ),
2316        ]))?;
2317        Ok(response.as_bool().unwrap_or(false))
2318    }
2319
2320    pub fn mark_known_destination_used(&mut self, dest_hash: [u8; 16]) -> io::Result<bool> {
2321        let response = self.call(&PickleValue::Dict(vec![
2322            (
2323                PickleValue::String("set".into()),
2324                PickleValue::String("known_destination_used".into()),
2325            ),
2326            (
2327                PickleValue::String("dest_hash".into()),
2328                PickleValue::Bytes(dest_hash.to_vec()),
2329            ),
2330        ]))?;
2331        Ok(response.as_bool().unwrap_or(false))
2332    }
2333}
2334
2335fn parse_lifecycle_state(value: &str) -> Option<LifecycleState> {
2336    match value {
2337        "active" => Some(LifecycleState::Active),
2338        "draining" => Some(LifecycleState::Draining),
2339        "stopping" => Some(LifecycleState::Stopping),
2340        "stopped" => Some(LifecycleState::Stopped),
2341        _ => None,
2342    }
2343}
2344
2345fn parse_drain_status(value: &PickleValue) -> io::Result<Option<DrainStatus>> {
2346    if !matches!(value, PickleValue::Dict(_)) {
2347        return Ok(None);
2348    }
2349    let state = value
2350        .get("state")
2351        .and_then(|entry| entry.as_str())
2352        .and_then(parse_lifecycle_state)
2353        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing drain state"))?;
2354    let drain_age_seconds = value.get("drain_age_seconds").and_then(|entry| {
2355        entry
2356            .as_float()
2357            .or_else(|| entry.as_int().map(|v| v as f64))
2358    });
2359    let deadline_remaining_seconds = value.get("deadline_remaining_seconds").and_then(|entry| {
2360        entry
2361            .as_float()
2362            .or_else(|| entry.as_int().map(|v| v as f64))
2363    });
2364    let drain_complete = value
2365        .get("drain_complete")
2366        .and_then(|entry| entry.as_bool())
2367        .unwrap_or(false);
2368    let interface_writer_queued_frames = value
2369        .get("interface_writer_queued_frames")
2370        .and_then(|entry| entry.as_int())
2371        .unwrap_or(0)
2372        .max(0) as usize;
2373    let provider_backlog_events = value
2374        .get("provider_backlog_events")
2375        .and_then(|entry| entry.as_int())
2376        .unwrap_or(0)
2377        .max(0) as usize;
2378    let provider_consumer_queued_events = value
2379        .get("provider_consumer_queued_events")
2380        .and_then(|entry| entry.as_int())
2381        .unwrap_or(0)
2382        .max(0) as usize;
2383    let detail = value
2384        .get("detail")
2385        .and_then(|entry| entry.as_str().map(|v| v.to_string()));
2386    Ok(Some(DrainStatus {
2387        state,
2388        drain_age_seconds,
2389        deadline_remaining_seconds,
2390        drain_complete,
2391        interface_writer_queued_frames,
2392        provider_backlog_events,
2393        provider_consumer_queued_events,
2394        detail,
2395    }))
2396}
2397
2398fn parse_hook_result(response: &PickleValue) -> io::Result<Result<(), String>> {
2399    let ok = response
2400        .get("ok")
2401        .and_then(|v| v.as_bool())
2402        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "invalid hook response"))?;
2403    if ok {
2404        Ok(Ok(()))
2405    } else {
2406        Ok(Err(response
2407            .get("error")
2408            .and_then(|v| v.as_str())
2409            .unwrap_or("unknown hook error")
2410            .to_string()))
2411    }
2412}
2413
2414fn parse_hook_list(response: &PickleValue) -> io::Result<Vec<HookInfo>> {
2415    let list = response
2416        .as_list()
2417        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "invalid hooks response"))?;
2418    let mut hooks = Vec::with_capacity(list.len());
2419    for item in list {
2420        hooks.push(HookInfo {
2421            name: item
2422                .get("name")
2423                .and_then(|v| v.as_str())
2424                .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing hook name"))?
2425                .to_string(),
2426            hook_type: item
2427                .get("type")
2428                .and_then(|v| v.as_str())
2429                .unwrap_or(default_hook_type())
2430                .to_string(),
2431            attach_point: item
2432                .get("attach_point")
2433                .and_then(|v| v.as_str())
2434                .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing attach_point"))?
2435                .to_string(),
2436            priority: item
2437                .get("priority")
2438                .and_then(|v| v.as_int())
2439                .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing priority"))?
2440                as i32,
2441            enabled: item
2442                .get("enabled")
2443                .and_then(|v| v.as_bool())
2444                .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing enabled"))?,
2445            consecutive_traps: item
2446                .get("consecutive_traps")
2447                .and_then(|v| v.as_int())
2448                .ok_or_else(|| {
2449                    io::Error::new(io::ErrorKind::InvalidData, "missing consecutive_traps")
2450                })? as u32,
2451        });
2452    }
2453    Ok(hooks)
2454}
2455
2456fn default_hook_type() -> &'static str {
2457    #[cfg(feature = "rns-hooks-native")]
2458    {
2459        return "native";
2460    }
2461    #[cfg(all(not(feature = "rns-hooks-native"), feature = "rns-hooks-wasm"))]
2462    {
2463        return "wasm";
2464    }
2465    #[cfg(all(not(feature = "rns-hooks-native"), not(feature = "rns-hooks-wasm")))]
2466    {
2467        "wasm"
2468    }
2469}
2470
2471/// Client-side authentication: answer the server's challenge.
2472fn client_auth(stream: &mut TcpStream, auth_key: &[u8; 32]) -> io::Result<()> {
2473    // Read challenge
2474    let challenge = recv_bytes(stream)?;
2475
2476    if !challenge.starts_with(CHALLENGE_PREFIX) {
2477        return Err(io::Error::new(
2478            io::ErrorKind::InvalidData,
2479            "expected challenge",
2480        ));
2481    }
2482
2483    let message = &challenge[CHALLENGE_PREFIX.len()..];
2484
2485    // Create HMAC response
2486    let response = create_response(auth_key, message);
2487    send_bytes(stream, &response)?;
2488
2489    // Read welcome/failure
2490    let result = recv_bytes(stream)?;
2491    if result == WELCOME {
2492        Ok(())
2493    } else {
2494        Err(io::Error::new(
2495            io::ErrorKind::PermissionDenied,
2496            "authentication failed",
2497        ))
2498    }
2499}
2500
2501/// Create an HMAC response to a challenge message.
2502fn create_response(auth_key: &[u8; 32], message: &[u8]) -> Vec<u8> {
2503    // Check if message has {sha256} prefix (modern protocol)
2504    if message.starts_with(b"{sha256}") || message.len() > 20 {
2505        // Modern protocol: use HMAC-SHA256 with {sha256} prefix
2506        let digest = hmac_sha256(auth_key, message);
2507        let mut response = Vec::with_capacity(8 + 32);
2508        response.extend_from_slice(b"{sha256}");
2509        response.extend_from_slice(&digest);
2510        response
2511    } else {
2512        // Legacy protocol: raw HMAC-MD5
2513        let digest = hmac_md5(auth_key, message);
2514        digest.to_vec()
2515    }
2516}
2517
2518/// Derive the RPC auth key from transport identity private key.
2519pub fn derive_auth_key(private_key: &[u8]) -> [u8; 32] {
2520    sha256(private_key)
2521}
2522
2523#[cfg(test)]
2524mod tests {
2525    use super::*;
2526
2527    #[test]
2528    fn send_recv_bytes_roundtrip() {
2529        let (mut c1, mut c2) = tcp_pair();
2530        let data = b"hello world";
2531        send_bytes(&mut c1, data).unwrap();
2532        let received = recv_bytes(&mut c2).unwrap();
2533        assert_eq!(&received, data);
2534    }
2535
2536    #[test]
2537    fn send_recv_empty() {
2538        let (mut c1, mut c2) = tcp_pair();
2539        send_bytes(&mut c1, b"").unwrap();
2540        let received = recv_bytes(&mut c2).unwrap();
2541        assert!(received.is_empty());
2542    }
2543
2544    #[test]
2545    fn auth_success() {
2546        let key = derive_auth_key(b"test-private-key");
2547        let (mut server, mut client) = tcp_pair();
2548
2549        let key2 = key;
2550        let t = thread::spawn(move || {
2551            client_auth(&mut client, &key2).unwrap();
2552        });
2553
2554        server_auth(&mut server, &key).unwrap();
2555        t.join().unwrap();
2556    }
2557
2558    #[test]
2559    fn auth_failure_wrong_key() {
2560        let server_key = derive_auth_key(b"server-key");
2561        let client_key = derive_auth_key(b"wrong-key");
2562        let (mut server, mut client) = tcp_pair();
2563
2564        let t = thread::spawn(move || {
2565            let result = client_auth(&mut client, &client_key);
2566            assert!(result.is_err());
2567        });
2568
2569        let result = server_auth(&mut server, &server_key);
2570        assert!(result.is_err());
2571        t.join().unwrap();
2572    }
2573
2574    #[test]
2575    fn verify_sha256_response() {
2576        let key = derive_auth_key(b"mykey");
2577        let message = b"{sha256}abcdefghijklmnopqrstuvwxyz0123456789ABCD";
2578        let response = create_response(&key, message);
2579        assert!(response.starts_with(b"{sha256}"));
2580        assert!(verify_response(&key, message, &response));
2581    }
2582
2583    #[test]
2584    fn verify_legacy_md5_response() {
2585        let key = derive_auth_key(b"mykey");
2586        // Legacy message: 20 bytes, no prefix
2587        let message = b"01234567890123456789";
2588        // Create legacy response (raw HMAC-MD5)
2589        let digest = hmac_md5(&key, message);
2590        assert!(verify_response(&key, message, &digest));
2591    }
2592
2593    #[test]
2594    fn constant_time_eq_works() {
2595        assert!(constant_time_eq(b"hello", b"hello"));
2596        assert!(!constant_time_eq(b"hello", b"world"));
2597        assert!(!constant_time_eq(b"hello", b"hell"));
2598    }
2599
2600    #[test]
2601    fn rpc_payload_accepts_msgpack_and_legacy_pickle() {
2602        let request = PickleValue::Dict(vec![
2603            (
2604                PickleValue::String("get".into()),
2605                PickleValue::String("link_count".into()),
2606            ),
2607            (PickleValue::String("max_hops".into()), PickleValue::Int(3)),
2608        ]);
2609
2610        let msgpack_bytes = encode_rpc_payload(&request, RpcCodec::Msgpack);
2611        let (decoded_msgpack, msgpack_codec) = decode_rpc_payload(&msgpack_bytes).unwrap();
2612        assert_eq!(decoded_msgpack, request);
2613        assert_eq!(msgpack_codec, RpcCodec::Msgpack);
2614
2615        let pickle_bytes = encode_rpc_payload(&request, RpcCodec::Pickle);
2616        let (decoded_pickle, pickle_codec) = decode_rpc_payload(&pickle_bytes).unwrap();
2617        assert_eq!(decoded_pickle, request);
2618        assert_eq!(pickle_codec, RpcCodec::Pickle);
2619    }
2620
2621    #[test]
2622    fn rpc_roundtrip() {
2623        let key = derive_auth_key(b"test-key");
2624        let (event_tx, event_rx) = crate::event::channel();
2625
2626        // Start server
2627        // Bind manually to get the actual port
2628        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
2629        let port = listener.local_addr().unwrap().port();
2630        listener.set_nonblocking(true).unwrap();
2631
2632        let shutdown = Arc::new(AtomicBool::new(false));
2633        let shutdown2 = shutdown.clone();
2634
2635        // Driver thread that handles queries
2636        let driver_thread = thread::spawn(move || loop {
2637            match event_rx.recv_timeout(std::time::Duration::from_secs(5)) {
2638                Ok(Event::Query(QueryRequest::LinkCount, resp_tx)) => {
2639                    let _ = resp_tx.send(QueryResponse::LinkCount(42));
2640                }
2641                Ok(Event::Query(QueryRequest::InterfaceStats, resp_tx)) => {
2642                    let _ = resp_tx.send(QueryResponse::InterfaceStats(InterfaceStatsResponse {
2643                        interfaces: vec![SingleInterfaceStat {
2644                            id: 7,
2645                            name: "TestInterface".into(),
2646                            status: true,
2647                            mode: 1,
2648                            rxb: 1000,
2649                            txb: 2000,
2650                            rx_packets: 10,
2651                            tx_packets: 20,
2652                            bitrate: Some(10_000_000),
2653                            ifac_size: None,
2654                            started: 1000.0,
2655                            ia_freq: 0.0,
2656                            ip_freq: 0.0,
2657                            op_freq: 0.0,
2658                            op_samples: 0,
2659                            burst_active: false,
2660                            burst_activated: 0.0,
2661                            pr_burst_active: false,
2662                            pr_burst_activated: 0.0,
2663                            oa_freq: 0.0,
2664                            clients: Some(2),
2665                            announce_rate_target: Some(3600.0),
2666                            announce_rate_grace: 5,
2667                            announce_rate_penalty: 0.0,
2668                            interface_type: "TestInterface".into(),
2669                        }],
2670                        transport_id: None,
2671                        transport_enabled: true,
2672                        transport_uptime: 3600.0,
2673                        total_rxb: 1000,
2674                        total_txb: 2000,
2675                        probe_responder: None,
2676                        backbone_peer_pool: None,
2677                    }));
2678                }
2679                _ => break,
2680            }
2681        });
2682
2683        let key2 = key;
2684        let shutdown3 = shutdown2.clone();
2685        let server_thread = thread::spawn(move || {
2686            rpc_server_loop(listener, key2, event_tx, shutdown3);
2687        });
2688
2689        // Give server time to start
2690        thread::sleep(std::time::Duration::from_millis(50));
2691
2692        // Client: connect and query link count
2693        let server_addr = RpcAddr::Tcp("127.0.0.1".into(), port);
2694        let mut client = RpcClient::connect(&server_addr, &key).unwrap();
2695        let response = client
2696            .call(&PickleValue::Dict(vec![(
2697                PickleValue::String("get".into()),
2698                PickleValue::String("link_count".into()),
2699            )]))
2700            .unwrap();
2701        assert_eq!(response.as_int().unwrap(), 42);
2702        drop(client);
2703
2704        // Client: query interface stats
2705        let mut client2 = RpcClient::connect(&server_addr, &key).unwrap();
2706        let response2 = client2
2707            .call(&PickleValue::Dict(vec![(
2708                PickleValue::String("get".into()),
2709                PickleValue::String("interface_stats".into()),
2710            )]))
2711            .unwrap();
2712        let ifaces = response2.get("interfaces").unwrap().as_list().unwrap();
2713        assert_eq!(ifaces.len(), 1);
2714        let iface = &ifaces[0];
2715        assert_eq!(
2716            iface.get("name").unwrap().as_str().unwrap(),
2717            "TestInterface"
2718        );
2719        assert_eq!(iface.get("rxb").unwrap().as_int().unwrap(), 1000);
2720        drop(client2);
2721
2722        // Shutdown
2723        shutdown2.store(true, Ordering::Relaxed);
2724        server_thread.join().unwrap();
2725        driver_thread.join().unwrap();
2726    }
2727
2728    #[test]
2729    fn derive_auth_key_deterministic() {
2730        let key1 = derive_auth_key(b"test");
2731        let key2 = derive_auth_key(b"test");
2732        assert_eq!(key1, key2);
2733        // Different input → different key
2734        let key3 = derive_auth_key(b"other");
2735        assert_ne!(key1, key3);
2736    }
2737
2738    #[test]
2739    fn pickle_request_handling() {
2740        // Test the request → query translation without networking
2741        let (event_tx, event_rx) = crate::event::channel();
2742
2743        let driver = thread::spawn(move || {
2744            if let Ok(Event::Query(QueryRequest::DropPath { dest_hash }, resp_tx)) = event_rx.recv()
2745            {
2746                assert_eq!(dest_hash, [1u8; 16]);
2747                let _ = resp_tx.send(QueryResponse::DropPath(true));
2748            }
2749        });
2750
2751        let request = PickleValue::Dict(vec![
2752            (
2753                PickleValue::String("drop".into()),
2754                PickleValue::String("path".into()),
2755            ),
2756            (
2757                PickleValue::String("destination_hash".into()),
2758                PickleValue::Bytes(vec![1u8; 16]),
2759            ),
2760        ]);
2761
2762        let response = handle_rpc_request(&request, &event_tx).unwrap();
2763        assert_eq!(response, PickleValue::Bool(true));
2764        driver.join().unwrap();
2765    }
2766
2767    #[test]
2768    fn hook_list_request_handling() {
2769        let (event_tx, event_rx) = crate::event::channel();
2770
2771        let driver = thread::spawn(move || {
2772            if let Ok(Event::ListHooks { response_tx }) = event_rx.recv() {
2773                let _ = response_tx.send(vec![HookInfo {
2774                    name: "stats".into(),
2775                    hook_type: "wasm".into(),
2776                    attach_point: "PreIngress".into(),
2777                    priority: 7,
2778                    enabled: true,
2779                    consecutive_traps: 0,
2780                }]);
2781            }
2782        });
2783
2784        let request = PickleValue::Dict(vec![(
2785            PickleValue::String("get".into()),
2786            PickleValue::String("hooks".into()),
2787        )]);
2788        let response = handle_rpc_request(&request, &event_tx).unwrap();
2789        let hooks = parse_hook_list(&response).unwrap();
2790        assert_eq!(hooks.len(), 1);
2791        assert_eq!(hooks[0].name, "stats");
2792        driver.join().unwrap();
2793    }
2794
2795    #[test]
2796    fn hook_load_request_handling() {
2797        let (event_tx, event_rx) = crate::event::channel();
2798
2799        let driver = thread::spawn(move || {
2800            if let Ok(Event::LoadHook {
2801                name,
2802                wasm_bytes,
2803                attach_point,
2804                priority,
2805                response_tx,
2806            }) = event_rx.recv()
2807            {
2808                assert_eq!(name, "stats");
2809                assert_eq!(attach_point, "PreIngress");
2810                assert_eq!(priority, 11);
2811                assert_eq!(wasm_bytes, vec![1, 2, 3]);
2812                let _ = response_tx.send(Ok(()));
2813            }
2814        });
2815
2816        let request = PickleValue::Dict(vec![
2817            (
2818                PickleValue::String("hook".into()),
2819                PickleValue::String("load".into()),
2820            ),
2821            (
2822                PickleValue::String("name".into()),
2823                PickleValue::String("stats".into()),
2824            ),
2825            (
2826                PickleValue::String("attach_point".into()),
2827                PickleValue::String("PreIngress".into()),
2828            ),
2829            (PickleValue::String("priority".into()), PickleValue::Int(11)),
2830            (
2831                PickleValue::String("wasm".into()),
2832                PickleValue::Bytes(vec![1, 2, 3]),
2833            ),
2834        ]);
2835        let response = handle_rpc_request(&request, &event_tx).unwrap();
2836        assert_eq!(parse_hook_result(&response).unwrap(), Ok(()));
2837        driver.join().unwrap();
2838    }
2839
2840    #[test]
2841    fn interface_stats_pickle_format() {
2842        let stats = InterfaceStatsResponse {
2843            interfaces: vec![SingleInterfaceStat {
2844                id: 1,
2845                name: "TCP".into(),
2846                status: true,
2847                mode: 1,
2848                rxb: 100,
2849                txb: 200,
2850                rx_packets: 5,
2851                tx_packets: 10,
2852                bitrate: Some(1000000),
2853                ifac_size: Some(16),
2854                started: 1000.0,
2855                ia_freq: 1.0,
2856                ip_freq: 2.0,
2857                op_freq: 3.0,
2858                op_samples: 0,
2859                burst_active: true,
2860                burst_activated: 1200.0,
2861                pr_burst_active: true,
2862                pr_burst_activated: 1300.0,
2863                oa_freq: 4.0,
2864                clients: Some(3),
2865                announce_rate_target: Some(3600.0),
2866                announce_rate_grace: 5,
2867                announce_rate_penalty: 0.0,
2868                interface_type: "TCPClientInterface".into(),
2869            }],
2870            transport_id: Some([0xAB; 16]),
2871            transport_enabled: true,
2872            transport_uptime: 3600.0,
2873            total_rxb: 100,
2874            total_txb: 200,
2875            probe_responder: None,
2876            backbone_peer_pool: None,
2877        };
2878
2879        let pickle = interface_stats_to_pickle(&stats);
2880
2881        // Verify it round-trips through encode/decode
2882        let encoded = pickle::encode(&pickle);
2883        let decoded = pickle::decode(&encoded).unwrap();
2884        assert_eq!(
2885            decoded.get("transport_enabled").unwrap().as_bool().unwrap(),
2886            true
2887        );
2888        let ifaces = decoded.get("interfaces").unwrap().as_list().unwrap();
2889        assert_eq!(ifaces[0].get("id").unwrap().as_int().unwrap(), 1);
2890        assert_eq!(ifaces[0].get("name").unwrap().as_str().unwrap(), "TCP");
2891        assert_eq!(ifaces[0].get("ia_freq").unwrap().as_float().unwrap(), 1.0);
2892        assert_eq!(ifaces[0].get("ip_freq").unwrap().as_float().unwrap(), 2.0);
2893        assert_eq!(ifaces[0].get("op_freq").unwrap().as_float().unwrap(), 3.0);
2894        assert_eq!(ifaces[0].get("oa_freq").unwrap().as_float().unwrap(), 4.0);
2895        assert_eq!(
2896            ifaces[0].get("burst_active").unwrap().as_bool().unwrap(),
2897            true
2898        );
2899        assert_eq!(
2900            ifaces[0]
2901                .get("burst_activated")
2902                .unwrap()
2903                .as_float()
2904                .unwrap(),
2905            1200.0
2906        );
2907        assert_eq!(
2908            ifaces[0].get("pr_burst_active").unwrap().as_bool().unwrap(),
2909            true
2910        );
2911        assert_eq!(
2912            ifaces[0]
2913                .get("pr_burst_activated")
2914                .unwrap()
2915                .as_float()
2916                .unwrap(),
2917            1300.0
2918        );
2919        assert_eq!(
2920            ifaces[0]
2921                .get("announce_rate_target")
2922                .unwrap()
2923                .as_float()
2924                .unwrap(),
2925            3600.0
2926        );
2927        assert_eq!(
2928            ifaces[0]
2929                .get("announce_rate_grace")
2930                .unwrap()
2931                .as_int()
2932                .unwrap(),
2933            5
2934        );
2935        assert_eq!(
2936            ifaces[0]
2937                .get("announce_rate_penalty")
2938                .unwrap()
2939                .as_float()
2940                .unwrap(),
2941            0.0
2942        );
2943        assert_eq!(ifaces[0].get("clients").unwrap().as_int().unwrap(), 3);
2944    }
2945
2946    #[test]
2947    fn interface_stats_pickle_includes_backbone_peer_pool_priority() {
2948        let stats = InterfaceStatsResponse {
2949            interfaces: vec![],
2950            transport_id: None,
2951            transport_enabled: true,
2952            transport_uptime: 1.0,
2953            total_rxb: 0,
2954            total_txb: 0,
2955            probe_responder: None,
2956            backbone_peer_pool: Some(crate::event::BackbonePeerPoolStatus {
2957                max_connected: 1,
2958                active_count: 1,
2959                standby_count: 0,
2960                cooldown_count: 0,
2961                members: vec![crate::event::BackbonePeerPoolMemberStatus {
2962                    name: "peer".into(),
2963                    remote: "127.0.0.1:4242".into(),
2964                    source: "configured".into(),
2965                    priority: 77,
2966                    state: "active".into(),
2967                    interface_id: Some(42),
2968                    failure_count: 0,
2969                    last_error: None,
2970                    cooldown_remaining_seconds: None,
2971                }],
2972            }),
2973        };
2974
2975        let pickle = interface_stats_to_pickle(&stats);
2976        let encoded = pickle::encode(&pickle);
2977        let decoded = pickle::decode(&encoded).unwrap();
2978        let pool = decoded.get("backbone_peer_pool").unwrap();
2979        let members = pool.get("members").unwrap().as_list().unwrap();
2980        assert_eq!(members[0].get("priority").unwrap().as_int().unwrap(), 77);
2981    }
2982
2983    #[test]
2984    fn send_probe_rpc_unknown_dest() {
2985        let (event_tx, event_rx) = crate::event::channel();
2986
2987        let driver = thread::spawn(move || {
2988            if let Ok(Event::Query(
2989                QueryRequest::SendProbe {
2990                    dest_hash,
2991                    payload_size,
2992                },
2993                resp_tx,
2994            )) = event_rx.recv()
2995            {
2996                assert_eq!(dest_hash, [0xAA; 16]);
2997                assert_eq!(payload_size, 16); // default
2998                let _ = resp_tx.send(QueryResponse::SendProbe(None));
2999            }
3000        });
3001
3002        let request = PickleValue::Dict(vec![(
3003            PickleValue::String("send_probe".into()),
3004            PickleValue::Bytes(vec![0xAA; 16]),
3005        )]);
3006
3007        let response = handle_rpc_request(&request, &event_tx).unwrap();
3008        assert_eq!(response, PickleValue::None);
3009        driver.join().unwrap();
3010    }
3011
3012    #[test]
3013    fn send_probe_rpc_with_result() {
3014        let (event_tx, event_rx) = crate::event::channel();
3015
3016        let packet_hash = [0xBB; 32];
3017        let driver = thread::spawn(move || {
3018            if let Ok(Event::Query(
3019                QueryRequest::SendProbe {
3020                    dest_hash,
3021                    payload_size,
3022                },
3023                resp_tx,
3024            )) = event_rx.recv()
3025            {
3026                assert_eq!(dest_hash, [0xCC; 16]);
3027                assert_eq!(payload_size, 32);
3028                let _ = resp_tx.send(QueryResponse::SendProbe(Some((packet_hash, 3))));
3029            }
3030        });
3031
3032        let request = PickleValue::Dict(vec![
3033            (
3034                PickleValue::String("send_probe".into()),
3035                PickleValue::Bytes(vec![0xCC; 16]),
3036            ),
3037            (PickleValue::String("size".into()), PickleValue::Int(32)),
3038        ]);
3039
3040        let response = handle_rpc_request(&request, &event_tx).unwrap();
3041        let ph = response.get("packet_hash").unwrap().as_bytes().unwrap();
3042        assert_eq!(ph, &[0xBB; 32]);
3043        assert_eq!(response.get("hops").unwrap().as_int().unwrap(), 3);
3044        driver.join().unwrap();
3045    }
3046
3047    #[test]
3048    fn retain_identity_rpc_sends_identity_hash_query() {
3049        let (event_tx, event_rx) = crate::event::channel();
3050        let identity_hash = [0x44; 16];
3051
3052        let driver = thread::spawn(move || {
3053            if let Ok(Event::Query(
3054                QueryRequest::RetainIdentity {
3055                    identity_hash: hash,
3056                },
3057                resp_tx,
3058            )) = event_rx.recv()
3059            {
3060                assert_eq!(hash, identity_hash);
3061                let _ = resp_tx.send(QueryResponse::RetainIdentity(true));
3062            }
3063        });
3064
3065        let request = PickleValue::Dict(vec![
3066            (
3067                PickleValue::String("set".into()),
3068                PickleValue::String("identity_retained".into()),
3069            ),
3070            (
3071                PickleValue::String("identity_hash".into()),
3072                PickleValue::Bytes(identity_hash.to_vec()),
3073            ),
3074        ]);
3075
3076        let response = handle_rpc_request(&request, &event_tx).unwrap();
3077        assert_eq!(response, PickleValue::Bool(true));
3078        driver.join().unwrap();
3079    }
3080
3081    #[test]
3082    fn send_probe_rpc_size_validation() {
3083        let (event_tx, event_rx) = crate::event::channel();
3084
3085        // Negative size should be clamped to default (16)
3086        let driver = thread::spawn(move || {
3087            if let Ok(Event::Query(QueryRequest::SendProbe { payload_size, .. }, resp_tx)) =
3088                event_rx.recv()
3089            {
3090                assert_eq!(payload_size, 16); // default, not negative
3091                let _ = resp_tx.send(QueryResponse::SendProbe(None));
3092            }
3093        });
3094
3095        let request = PickleValue::Dict(vec![
3096            (
3097                PickleValue::String("send_probe".into()),
3098                PickleValue::Bytes(vec![0xDD; 16]),
3099            ),
3100            (PickleValue::String("size".into()), PickleValue::Int(-1)),
3101        ]);
3102
3103        let response = handle_rpc_request(&request, &event_tx).unwrap();
3104        assert_eq!(response, PickleValue::None);
3105        driver.join().unwrap();
3106    }
3107
3108    #[test]
3109    fn send_probe_rpc_size_too_large() {
3110        let (event_tx, event_rx) = crate::event::channel();
3111
3112        // Size > 400 should be clamped to default (16)
3113        let driver = thread::spawn(move || {
3114            if let Ok(Event::Query(QueryRequest::SendProbe { payload_size, .. }, resp_tx)) =
3115                event_rx.recv()
3116            {
3117                assert_eq!(payload_size, 16); // default, not 999
3118                let _ = resp_tx.send(QueryResponse::SendProbe(None));
3119            }
3120        });
3121
3122        let request = PickleValue::Dict(vec![
3123            (
3124                PickleValue::String("send_probe".into()),
3125                PickleValue::Bytes(vec![0xDD; 16]),
3126            ),
3127            (PickleValue::String("size".into()), PickleValue::Int(999)),
3128        ]);
3129
3130        let response = handle_rpc_request(&request, &event_tx).unwrap();
3131        assert_eq!(response, PickleValue::None);
3132        driver.join().unwrap();
3133    }
3134
3135    #[test]
3136    fn check_proof_rpc_not_found() {
3137        let (event_tx, event_rx) = crate::event::channel();
3138
3139        let driver = thread::spawn(move || {
3140            if let Ok(Event::Query(QueryRequest::CheckProof { packet_hash }, resp_tx)) =
3141                event_rx.recv()
3142            {
3143                assert_eq!(packet_hash, [0xEE; 32]);
3144                let _ = resp_tx.send(QueryResponse::CheckProof(None));
3145            }
3146        });
3147
3148        let request = PickleValue::Dict(vec![(
3149            PickleValue::String("check_proof".into()),
3150            PickleValue::Bytes(vec![0xEE; 32]),
3151        )]);
3152
3153        let response = handle_rpc_request(&request, &event_tx).unwrap();
3154        assert_eq!(response, PickleValue::None);
3155        driver.join().unwrap();
3156    }
3157
3158    #[test]
3159    fn check_proof_rpc_found() {
3160        let (event_tx, event_rx) = crate::event::channel();
3161
3162        let driver = thread::spawn(move || {
3163            if let Ok(Event::Query(QueryRequest::CheckProof { packet_hash }, resp_tx)) =
3164                event_rx.recv()
3165            {
3166                assert_eq!(packet_hash, [0xFF; 32]);
3167                let _ = resp_tx.send(QueryResponse::CheckProof(Some(0.352)));
3168            }
3169        });
3170
3171        let request = PickleValue::Dict(vec![(
3172            PickleValue::String("check_proof".into()),
3173            PickleValue::Bytes(vec![0xFF; 32]),
3174        )]);
3175
3176        let response = handle_rpc_request(&request, &event_tx).unwrap();
3177        if let PickleValue::Float(rtt) = response {
3178            assert!((rtt - 0.352).abs() < 0.001);
3179        } else {
3180            panic!("Expected Float, got {:?}", response);
3181        }
3182        driver.join().unwrap();
3183    }
3184
3185    #[test]
3186    fn request_path_rpc() {
3187        let (event_tx, event_rx) = crate::event::channel();
3188
3189        let driver =
3190            thread::spawn(
3191                move || match event_rx.recv_timeout(std::time::Duration::from_secs(5)) {
3192                    Ok(Event::RequestPath { dest_hash }) => {
3193                        assert_eq!(dest_hash, [0x11; 16]);
3194                    }
3195                    other => panic!("Expected RequestPath event, got {:?}", other),
3196                },
3197            );
3198
3199        let request = PickleValue::Dict(vec![(
3200            PickleValue::String("request_path".into()),
3201            PickleValue::Bytes(vec![0x11; 16]),
3202        )]);
3203
3204        let response = handle_rpc_request(&request, &event_tx).unwrap();
3205        assert_eq!(response, PickleValue::Bool(true));
3206        driver.join().unwrap();
3207    }
3208
3209    #[test]
3210    fn begin_drain_rpc_emits_event() {
3211        let (event_tx, event_rx) = crate::event::channel();
3212
3213        let driver = thread::spawn(
3214            move || match event_rx.recv_timeout(Duration::from_secs(5)) {
3215                Ok(Event::BeginDrain { timeout }) => {
3216                    assert!((timeout.as_secs_f64() - 1.5).abs() < 0.001);
3217                }
3218                other => panic!("Expected BeginDrain event, got {:?}", other),
3219            },
3220        );
3221
3222        let request = PickleValue::Dict(vec![(
3223            PickleValue::String("begin_drain".into()),
3224            PickleValue::Float(1.5),
3225        )]);
3226
3227        let response = handle_rpc_request(&request, &event_tx).unwrap();
3228        assert_eq!(response, PickleValue::Bool(true));
3229        driver.join().unwrap();
3230    }
3231
3232    #[test]
3233    fn drain_status_rpc_roundtrips_fields() {
3234        let (event_tx, event_rx) = crate::event::channel();
3235
3236        let driver = thread::spawn(move || {
3237            if let Ok(Event::Query(QueryRequest::DrainStatus, resp_tx)) = event_rx.recv() {
3238                let _ = resp_tx.send(QueryResponse::DrainStatus(DrainStatus {
3239                    state: LifecycleState::Draining,
3240                    drain_age_seconds: Some(0.75),
3241                    deadline_remaining_seconds: Some(2.25),
3242                    drain_complete: false,
3243                    interface_writer_queued_frames: 3,
3244                    provider_backlog_events: 4,
3245                    provider_consumer_queued_events: 5,
3246                    detail: Some("node is draining existing work".into()),
3247                }));
3248            }
3249        });
3250
3251        let request = PickleValue::Dict(vec![(
3252            PickleValue::String("get".into()),
3253            PickleValue::String("drain_status".into()),
3254        )]);
3255
3256        let response = handle_rpc_request(&request, &event_tx).unwrap();
3257        assert_eq!(response.get("state").unwrap().as_str(), Some("draining"));
3258        assert_eq!(
3259            response.get("drain_complete").unwrap().as_bool(),
3260            Some(false)
3261        );
3262        assert_eq!(
3263            response
3264                .get("deadline_remaining_seconds")
3265                .unwrap()
3266                .as_float(),
3267            Some(2.25)
3268        );
3269        assert_eq!(
3270            response
3271                .get("interface_writer_queued_frames")
3272                .unwrap()
3273                .as_int(),
3274            Some(3)
3275        );
3276        assert_eq!(
3277            response.get("provider_backlog_events").unwrap().as_int(),
3278            Some(4)
3279        );
3280        assert_eq!(
3281            response
3282                .get("provider_consumer_queued_events")
3283                .unwrap()
3284                .as_int(),
3285            Some(5)
3286        );
3287        assert_eq!(
3288            response.get("detail").unwrap().as_str(),
3289            Some("node is draining existing work")
3290        );
3291        driver.join().unwrap();
3292    }
3293
3294    #[test]
3295    fn interface_stats_with_probe_responder() {
3296        let probe_hash = [0x42; 16];
3297        let stats = InterfaceStatsResponse {
3298            interfaces: vec![],
3299            transport_id: None,
3300            transport_enabled: true,
3301            transport_uptime: 100.0,
3302            total_rxb: 0,
3303            total_txb: 0,
3304            probe_responder: Some(probe_hash),
3305            backbone_peer_pool: None,
3306        };
3307
3308        let pickle = interface_stats_to_pickle(&stats);
3309        let encoded = pickle::encode(&pickle);
3310        let decoded = pickle::decode(&encoded).unwrap();
3311
3312        let pr = decoded.get("probe_responder").unwrap().as_bytes().unwrap();
3313        assert_eq!(pr, &probe_hash);
3314    }
3315
3316    #[test]
3317    fn runtime_config_get_and_set_rpc() {
3318        let (event_tx, event_rx) = crate::event::channel();
3319
3320        let driver = thread::spawn(move || {
3321            if let Ok(Event::Query(QueryRequest::GetRuntimeConfig { key }, resp_tx)) =
3322                event_rx.recv()
3323            {
3324                assert_eq!(key, "global.tick_interval_ms");
3325                let _ = resp_tx.send(QueryResponse::RuntimeConfigEntry(Some(
3326                    RuntimeConfigEntry {
3327                        key,
3328                        value: RuntimeConfigValue::Int(1000),
3329                        default: RuntimeConfigValue::Int(1000),
3330                        source: RuntimeConfigSource::Startup,
3331                        apply_mode: RuntimeConfigApplyMode::Immediate,
3332                        description: Some("tick".into()),
3333                    },
3334                )));
3335            } else {
3336                panic!("expected GetRuntimeConfig query");
3337            }
3338
3339            if let Ok(Event::Query(QueryRequest::SetRuntimeConfig { key, value }, resp_tx)) =
3340                event_rx.recv()
3341            {
3342                assert_eq!(key, "global.tick_interval_ms");
3343                assert_eq!(value, RuntimeConfigValue::Int(250));
3344                let _ = resp_tx.send(QueryResponse::RuntimeConfigSet(Ok(RuntimeConfigEntry {
3345                    key,
3346                    value: RuntimeConfigValue::Int(250),
3347                    default: RuntimeConfigValue::Int(1000),
3348                    source: RuntimeConfigSource::RuntimeOverride,
3349                    apply_mode: RuntimeConfigApplyMode::Immediate,
3350                    description: Some("tick".into()),
3351                })));
3352            } else {
3353                panic!("expected SetRuntimeConfig query");
3354            }
3355        });
3356
3357        let get_request = PickleValue::Dict(vec![
3358            (
3359                PickleValue::String("get".into()),
3360                PickleValue::String("runtime_config_entry".into()),
3361            ),
3362            (
3363                PickleValue::String("key".into()),
3364                PickleValue::String("global.tick_interval_ms".into()),
3365            ),
3366        ]);
3367        let get_response = handle_rpc_request(&get_request, &event_tx).unwrap();
3368        assert_eq!(
3369            get_response.get("key").and_then(|v| v.as_str()),
3370            Some("global.tick_interval_ms")
3371        );
3372
3373        let set_request = PickleValue::Dict(vec![
3374            (
3375                PickleValue::String("set".into()),
3376                PickleValue::String("runtime_config".into()),
3377            ),
3378            (
3379                PickleValue::String("key".into()),
3380                PickleValue::String("global.tick_interval_ms".into()),
3381            ),
3382            (PickleValue::String("value".into()), PickleValue::Int(250)),
3383        ]);
3384        let set_response = handle_rpc_request(&set_request, &event_tx).unwrap();
3385        assert_eq!(
3386            set_response.get("value").and_then(|v| v.as_int()),
3387            Some(250)
3388        );
3389
3390        driver.join().unwrap();
3391    }
3392
3393    // Helper: create a connected TCP pair
3394    fn tcp_pair() -> (TcpStream, TcpStream) {
3395        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
3396        let port = listener.local_addr().unwrap().port();
3397        let client = TcpStream::connect(("127.0.0.1", port)).unwrap();
3398        let (server, _) = listener.accept().unwrap();
3399        client
3400            .set_read_timeout(Some(std::time::Duration::from_secs(5)))
3401            .unwrap();
3402        server
3403            .set_read_timeout(Some(std::time::Duration::from_secs(5)))
3404            .unwrap();
3405        (server, client)
3406    }
3407}