Skip to main content

rns_net/
rpc.rs

1//! RPC server and client for cross-process daemon communication.
2//!
3//! Implements Python `multiprocessing.connection` wire protocol:
4//! - 4-byte big-endian signed i32 length prefix + payload
5//! - HMAC-SHA256 challenge-response authentication
6//! - Pickle serialization for request/response dictionaries
7//!
8//! Server translates pickle dicts into [`QueryRequest`] events, sends
9//! them through the driver event channel, and returns pickle responses.
10
11use std::io::{self, Read, Write};
12use std::net::{TcpListener, TcpStream};
13use std::sync::mpsc;
14use std::sync::{Arc, atomic::{AtomicBool, Ordering}};
15use std::thread;
16
17use rns_crypto::sha256::sha256;
18use rns_crypto::hmac::hmac_sha256;
19
20use crate::event::{
21    BlackholeInfo, Event, EventSender, QueryRequest, QueryResponse,
22    InterfaceStatsResponse, SingleInterfaceStat,
23    PathTableEntry, RateTableEntry,
24};
25use crate::md5::hmac_md5;
26use crate::pickle::{self, PickleValue};
27
28const CHALLENGE_PREFIX: &[u8] = b"#CHALLENGE#";
29const WELCOME: &[u8] = b"#WELCOME#";
30const FAILURE: &[u8] = b"#FAILURE#";
31const CHALLENGE_LEN: usize = 40;
32
33/// RPC address types.
34#[derive(Debug, Clone)]
35pub enum RpcAddr {
36    Tcp(String, u16),
37}
38
39/// RPC server that listens for incoming connections and handles queries.
40pub struct RpcServer {
41    shutdown: Arc<AtomicBool>,
42    thread: Option<thread::JoinHandle<()>>,
43}
44
45impl RpcServer {
46    /// Start the RPC server on the given address.
47    pub fn start(
48        addr: &RpcAddr,
49        auth_key: [u8; 32],
50        event_tx: EventSender,
51    ) -> io::Result<Self> {
52        let shutdown = Arc::new(AtomicBool::new(false));
53        let shutdown2 = shutdown.clone();
54
55        let listener = match addr {
56            RpcAddr::Tcp(host, port) => {
57                let l = TcpListener::bind((host.as_str(), *port))?;
58                // Non-blocking so we can check shutdown flag
59                l.set_nonblocking(true)?;
60                l
61            }
62        };
63
64        let thread = thread::Builder::new()
65            .name("rpc-server".into())
66            .spawn(move || {
67                rpc_server_loop(listener, auth_key, event_tx, shutdown2);
68            })
69            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
70
71        Ok(RpcServer {
72            shutdown,
73            thread: Some(thread),
74        })
75    }
76
77    /// Stop the RPC server.
78    pub fn stop(&mut self) {
79        self.shutdown.store(true, Ordering::Relaxed);
80        if let Some(handle) = self.thread.take() {
81            let _ = handle.join();
82        }
83    }
84}
85
86impl Drop for RpcServer {
87    fn drop(&mut self) {
88        self.stop();
89    }
90}
91
92fn rpc_server_loop(
93    listener: TcpListener,
94    auth_key: [u8; 32],
95    event_tx: EventSender,
96    shutdown: Arc<AtomicBool>,
97) {
98    loop {
99        if shutdown.load(Ordering::Relaxed) {
100            break;
101        }
102
103        match listener.accept() {
104            Ok((stream, _addr)) => {
105                // Set blocking for this connection
106                let _ = stream.set_nonblocking(false);
107                let _ = stream.set_read_timeout(Some(std::time::Duration::from_secs(10)));
108                let _ = stream.set_write_timeout(Some(std::time::Duration::from_secs(10)));
109
110                if let Err(e) = handle_connection(stream, &auth_key, &event_tx) {
111                    log::debug!("RPC connection error: {}", e);
112                }
113            }
114            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
115                // No pending connection, sleep briefly and retry
116                thread::sleep(std::time::Duration::from_millis(100));
117            }
118            Err(e) => {
119                log::error!("RPC accept error: {}", e);
120                thread::sleep(std::time::Duration::from_millis(100));
121            }
122        }
123    }
124}
125
126fn handle_connection(
127    mut stream: TcpStream,
128    auth_key: &[u8; 32],
129    event_tx: &EventSender,
130) -> io::Result<()> {
131    // Authentication: send challenge, verify response
132    server_auth(&mut stream, auth_key)?;
133
134    // Read request (pickle dict)
135    let request_bytes = recv_bytes(&mut stream)?;
136    let request = pickle::decode(&request_bytes)
137        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?;
138
139    // Translate pickle dict to query, send to driver, get response
140    let response = handle_rpc_request(&request, event_tx)?;
141
142    // Encode response and send
143    let response_bytes = pickle::encode(&response);
144    send_bytes(&mut stream, &response_bytes)?;
145
146    Ok(())
147}
148
149/// Server-side authentication: challenge-response.
150fn server_auth(stream: &mut TcpStream, auth_key: &[u8; 32]) -> io::Result<()> {
151    // Generate challenge: #CHALLENGE#{sha256}<40 random bytes>
152    let mut random_bytes = [0u8; CHALLENGE_LEN];
153    // Use /dev/urandom for randomness
154    {
155        let mut f = std::fs::File::open("/dev/urandom")?;
156        f.read_exact(&mut random_bytes)?;
157    }
158
159    let mut challenge_message = Vec::with_capacity(CHALLENGE_PREFIX.len() + 8 + CHALLENGE_LEN);
160    challenge_message.extend_from_slice(CHALLENGE_PREFIX);
161    challenge_message.extend_from_slice(b"{sha256}");
162    challenge_message.extend_from_slice(&random_bytes);
163
164    send_bytes(stream, &challenge_message)?;
165
166    // Read response (max 256 bytes)
167    let response = recv_bytes(stream)?;
168
169    // Verify response
170    // The message to HMAC is everything after #CHALLENGE# (i.e. {sha256}<random>)
171    let message = &challenge_message[CHALLENGE_PREFIX.len()..];
172
173    if verify_response(auth_key, message, &response) {
174        send_bytes(stream, WELCOME)?;
175        Ok(())
176    } else {
177        send_bytes(stream, FAILURE)?;
178        Err(io::Error::new(io::ErrorKind::PermissionDenied, "auth failed"))
179    }
180}
181
182/// Verify a client's HMAC response.
183fn verify_response(auth_key: &[u8; 32], message: &[u8], response: &[u8]) -> bool {
184    // Modern protocol: response = {sha256}<hmac-sha256 digest>
185    if response.starts_with(b"{sha256}") {
186        let digest = &response[8..];
187        let expected = hmac_sha256(auth_key, message);
188        constant_time_eq(digest, &expected)
189    }
190    // Legacy protocol: response = raw 16-byte HMAC-MD5 digest
191    else if response.len() == 16 {
192        let expected = hmac_md5(auth_key, message);
193        constant_time_eq(response, &expected)
194    }
195    // Legacy with {md5} prefix
196    else if response.starts_with(b"{md5}") {
197        let digest = &response[5..];
198        let expected = hmac_md5(auth_key, message);
199        constant_time_eq(digest, &expected)
200    } else {
201        false
202    }
203}
204
205/// Constant-time byte comparison.
206fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
207    if a.len() != b.len() {
208        return false;
209    }
210    let mut diff = 0u8;
211    for (x, y) in a.iter().zip(b.iter()) {
212        diff |= x ^ y;
213    }
214    diff == 0
215}
216
217/// Send bytes with 4-byte big-endian length prefix.
218fn send_bytes(stream: &mut TcpStream, data: &[u8]) -> io::Result<()> {
219    let len = data.len() as i32;
220    stream.write_all(&len.to_be_bytes())?;
221    stream.write_all(data)?;
222    stream.flush()
223}
224
225/// Receive bytes with 4-byte big-endian length prefix.
226fn recv_bytes(stream: &mut TcpStream) -> io::Result<Vec<u8>> {
227    let mut len_buf = [0u8; 4];
228    stream.read_exact(&mut len_buf)?;
229    let len = i32::from_be_bytes(len_buf);
230
231    if len < 0 {
232        // Extended format: 8-byte length
233        let mut len8_buf = [0u8; 8];
234        stream.read_exact(&mut len8_buf)?;
235        let len = u64::from_be_bytes(len8_buf) as usize;
236        if len > 64 * 1024 * 1024 {
237            return Err(io::Error::new(io::ErrorKind::InvalidData, "message too large"));
238        }
239        let mut buf = vec![0u8; len];
240        stream.read_exact(&mut buf)?;
241        Ok(buf)
242    } else {
243        let len = len as usize;
244        if len > 64 * 1024 * 1024 {
245            return Err(io::Error::new(io::ErrorKind::InvalidData, "message too large"));
246        }
247        let mut buf = vec![0u8; len];
248        stream.read_exact(&mut buf)?;
249        Ok(buf)
250    }
251}
252
253/// Translate a pickle request dict to a query event and get response.
254fn handle_rpc_request(
255    request: &PickleValue,
256    event_tx: &EventSender,
257) -> io::Result<PickleValue> {
258    // Handle "get" requests
259    if let Some(get_val) = request.get("get") {
260        if let Some(path) = get_val.as_str() {
261            return match path {
262                "interface_stats" => {
263                    let resp = send_query(event_tx, QueryRequest::InterfaceStats)?;
264                    if let QueryResponse::InterfaceStats(stats) = resp {
265                        Ok(interface_stats_to_pickle(&stats))
266                    } else {
267                        Ok(PickleValue::None)
268                    }
269                }
270                "path_table" => {
271                    let max_hops = request.get("max_hops").and_then(|v| {
272                        v.as_int().map(|n| n as u8)
273                    });
274                    let resp = send_query(event_tx, QueryRequest::PathTable { max_hops })?;
275                    if let QueryResponse::PathTable(entries) = resp {
276                        Ok(path_table_to_pickle(&entries))
277                    } else {
278                        Ok(PickleValue::None)
279                    }
280                }
281                "rate_table" => {
282                    let resp = send_query(event_tx, QueryRequest::RateTable)?;
283                    if let QueryResponse::RateTable(entries) = resp {
284                        Ok(rate_table_to_pickle(&entries))
285                    } else {
286                        Ok(PickleValue::None)
287                    }
288                }
289                "next_hop" => {
290                    let hash = extract_dest_hash(request, "destination_hash")?;
291                    let resp = send_query(event_tx, QueryRequest::NextHop { dest_hash: hash })?;
292                    if let QueryResponse::NextHop(Some(nh)) = resp {
293                        Ok(PickleValue::Bytes(nh.next_hop.to_vec()))
294                    } else {
295                        Ok(PickleValue::None)
296                    }
297                }
298                "next_hop_if_name" => {
299                    let hash = extract_dest_hash(request, "destination_hash")?;
300                    let resp = send_query(event_tx, QueryRequest::NextHopIfName { dest_hash: hash })?;
301                    if let QueryResponse::NextHopIfName(Some(name)) = resp {
302                        Ok(PickleValue::String(name))
303                    } else {
304                        Ok(PickleValue::None)
305                    }
306                }
307                "link_count" => {
308                    let resp = send_query(event_tx, QueryRequest::LinkCount)?;
309                    if let QueryResponse::LinkCount(n) = resp {
310                        Ok(PickleValue::Int(n as i64))
311                    } else {
312                        Ok(PickleValue::None)
313                    }
314                }
315                "transport_identity" => {
316                    let resp = send_query(event_tx, QueryRequest::TransportIdentity)?;
317                    if let QueryResponse::TransportIdentity(Some(hash)) = resp {
318                        Ok(PickleValue::Bytes(hash.to_vec()))
319                    } else {
320                        Ok(PickleValue::None)
321                    }
322                }
323                "blackholed" => {
324                    let resp = send_query(event_tx, QueryRequest::GetBlackholed)?;
325                    if let QueryResponse::Blackholed(entries) = resp {
326                        Ok(blackholed_to_pickle(&entries))
327                    } else {
328                        Ok(PickleValue::None)
329                    }
330                }
331                "discovered_interfaces" => {
332                    let only_available = request.get("only_available")
333                        .and_then(|v| v.as_bool()).unwrap_or(false);
334                    let only_transport = request.get("only_transport")
335                        .and_then(|v| v.as_bool()).unwrap_or(false);
336                    let resp = send_query(event_tx, QueryRequest::DiscoveredInterfaces {
337                        only_available,
338                        only_transport,
339                    })?;
340                    if let QueryResponse::DiscoveredInterfaces(interfaces) = resp {
341                        Ok(discovered_interfaces_to_pickle(&interfaces))
342                    } else {
343                        Ok(PickleValue::None)
344                    }
345                }
346                _ => Ok(PickleValue::None),
347            };
348        }
349    }
350
351    // Handle "request_path" -- trigger a path request to the network
352    if let Some(hash_val) = request.get("request_path") {
353        if let Some(hash_bytes) = hash_val.as_bytes() {
354            if hash_bytes.len() >= 16 {
355                let mut dest_hash = [0u8; 16];
356                dest_hash.copy_from_slice(&hash_bytes[..16]);
357                let _ = event_tx.send(crate::event::Event::RequestPath { dest_hash });
358                return Ok(PickleValue::Bool(true));
359            }
360        }
361    }
362
363    // Handle "send_probe" requests
364    if let Some(hash_val) = request.get("send_probe") {
365        if let Some(hash_bytes) = hash_val.as_bytes() {
366            if hash_bytes.len() >= 16 {
367                let mut dest_hash = [0u8; 16];
368                dest_hash.copy_from_slice(&hash_bytes[..16]);
369                let payload_size = request.get("size")
370                    .and_then(|v| v.as_int())
371                    .and_then(|n| if n > 0 && n <= 400 { Some(n as usize) } else { None })
372                    .unwrap_or(16);
373                let resp = send_query(event_tx, QueryRequest::SendProbe {
374                    dest_hash,
375                    payload_size,
376                })?;
377                if let QueryResponse::SendProbe(Some((packet_hash, hops))) = resp {
378                    return Ok(PickleValue::Dict(vec![
379                        (PickleValue::String("packet_hash".into()), PickleValue::Bytes(packet_hash.to_vec())),
380                        (PickleValue::String("hops".into()), PickleValue::Int(hops as i64)),
381                    ]));
382                } else {
383                    return Ok(PickleValue::None);
384                }
385            }
386        }
387    }
388
389    // Handle "check_proof" requests
390    if let Some(hash_val) = request.get("check_proof") {
391        if let Some(hash_bytes) = hash_val.as_bytes() {
392            if hash_bytes.len() >= 32 {
393                let mut packet_hash = [0u8; 32];
394                packet_hash.copy_from_slice(&hash_bytes[..32]);
395                let resp = send_query(event_tx, QueryRequest::CheckProof {
396                    packet_hash,
397                })?;
398                if let QueryResponse::CheckProof(Some(rtt)) = resp {
399                    return Ok(PickleValue::Float(rtt));
400                } else {
401                    return Ok(PickleValue::None);
402                }
403            }
404        }
405    }
406
407    // Handle "blackhole" requests
408    if let Some(hash_val) = request.get("blackhole") {
409        if let Some(hash_bytes) = hash_val.as_bytes() {
410            if hash_bytes.len() >= 16 {
411                let mut identity_hash = [0u8; 16];
412                identity_hash.copy_from_slice(&hash_bytes[..16]);
413                let duration_hours = request.get("duration").and_then(|v| v.as_float());
414                let reason = request.get("reason").and_then(|v| v.as_str()).map(|s| s.to_string());
415                let resp = send_query(event_tx, QueryRequest::BlackholeIdentity {
416                    identity_hash,
417                    duration_hours,
418                    reason,
419                })?;
420                return Ok(PickleValue::Bool(matches!(resp, QueryResponse::BlackholeResult(true))));
421            }
422        }
423    }
424
425    // Handle "unblackhole" requests
426    if let Some(hash_val) = request.get("unblackhole") {
427        if let Some(hash_bytes) = hash_val.as_bytes() {
428            if hash_bytes.len() >= 16 {
429                let mut identity_hash = [0u8; 16];
430                identity_hash.copy_from_slice(&hash_bytes[..16]);
431                let resp = send_query(event_tx, QueryRequest::UnblackholeIdentity {
432                    identity_hash,
433                })?;
434                return Ok(PickleValue::Bool(matches!(resp, QueryResponse::UnblackholeResult(true))));
435            }
436        }
437    }
438
439    // Handle "drop" requests
440    if let Some(drop_val) = request.get("drop") {
441        if let Some(path) = drop_val.as_str() {
442            return match path {
443                "path" => {
444                    let hash = extract_dest_hash(request, "destination_hash")?;
445                    let resp = send_query(event_tx, QueryRequest::DropPath { dest_hash: hash })?;
446                    if let QueryResponse::DropPath(ok) = resp {
447                        Ok(PickleValue::Bool(ok))
448                    } else {
449                        Ok(PickleValue::None)
450                    }
451                }
452                "all_via" => {
453                    let hash = extract_dest_hash(request, "destination_hash")?;
454                    let resp = send_query(event_tx, QueryRequest::DropAllVia { transport_hash: hash })?;
455                    if let QueryResponse::DropAllVia(n) = resp {
456                        Ok(PickleValue::Int(n as i64))
457                    } else {
458                        Ok(PickleValue::None)
459                    }
460                }
461                "announce_queues" => {
462                    let resp = send_query(event_tx, QueryRequest::DropAnnounceQueues)?;
463                    if let QueryResponse::DropAnnounceQueues = resp {
464                        Ok(PickleValue::Bool(true))
465                    } else {
466                        Ok(PickleValue::None)
467                    }
468                }
469                _ => Ok(PickleValue::None),
470            };
471        }
472    }
473
474    Ok(PickleValue::None)
475}
476
477/// Send a query to the driver and wait for the response.
478fn send_query(event_tx: &EventSender, request: QueryRequest) -> io::Result<QueryResponse> {
479    let (resp_tx, resp_rx) = mpsc::channel();
480    event_tx
481        .send(Event::Query(request, resp_tx))
482        .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down"))?;
483    resp_rx
484        .recv_timeout(std::time::Duration::from_secs(5))
485        .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "query timed out"))
486}
487
488/// Extract a 16-byte destination hash from a pickle dict field.
489fn extract_dest_hash(request: &PickleValue, key: &str) -> io::Result<[u8; 16]> {
490    let bytes = request
491        .get(key)
492        .and_then(|v| v.as_bytes())
493        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing destination_hash"))?;
494    if bytes.len() < 16 {
495        return Err(io::Error::new(io::ErrorKind::InvalidData, "hash too short"));
496    }
497    let mut hash = [0u8; 16];
498    hash.copy_from_slice(&bytes[..16]);
499    Ok(hash)
500}
501
502// --- Pickle response builders ---
503
504fn interface_stats_to_pickle(stats: &InterfaceStatsResponse) -> PickleValue {
505    let mut ifaces = Vec::new();
506    for iface in &stats.interfaces {
507        ifaces.push(single_iface_to_pickle(iface));
508    }
509
510    let mut dict = vec![
511        (
512            PickleValue::String("interfaces".into()),
513            PickleValue::List(ifaces),
514        ),
515        (
516            PickleValue::String("transport_enabled".into()),
517            PickleValue::Bool(stats.transport_enabled),
518        ),
519        (
520            PickleValue::String("transport_uptime".into()),
521            PickleValue::Float(stats.transport_uptime),
522        ),
523        (
524            PickleValue::String("rxb".into()),
525            PickleValue::Int(stats.total_rxb as i64),
526        ),
527        (
528            PickleValue::String("txb".into()),
529            PickleValue::Int(stats.total_txb as i64),
530        ),
531    ];
532
533    if let Some(tid) = stats.transport_id {
534        dict.push((
535            PickleValue::String("transport_id".into()),
536            PickleValue::Bytes(tid.to_vec()),
537        ));
538    } else {
539        dict.push((
540            PickleValue::String("transport_id".into()),
541            PickleValue::None,
542        ));
543    }
544
545    if let Some(pr) = stats.probe_responder {
546        dict.push((
547            PickleValue::String("probe_responder".into()),
548            PickleValue::Bytes(pr.to_vec()),
549        ));
550    } else {
551        dict.push((
552            PickleValue::String("probe_responder".into()),
553            PickleValue::None,
554        ));
555    }
556
557    PickleValue::Dict(dict)
558}
559
560fn single_iface_to_pickle(s: &SingleInterfaceStat) -> PickleValue {
561    let mut dict = vec![
562        (PickleValue::String("name".into()), PickleValue::String(s.name.clone())),
563        (PickleValue::String("status".into()), PickleValue::Bool(s.status)),
564        (PickleValue::String("mode".into()), PickleValue::Int(s.mode as i64)),
565        (PickleValue::String("rxb".into()), PickleValue::Int(s.rxb as i64)),
566        (PickleValue::String("txb".into()), PickleValue::Int(s.txb as i64)),
567        (PickleValue::String("rx_packets".into()), PickleValue::Int(s.rx_packets as i64)),
568        (PickleValue::String("tx_packets".into()), PickleValue::Int(s.tx_packets as i64)),
569        (PickleValue::String("started".into()), PickleValue::Float(s.started)),
570        (PickleValue::String("ia_freq".into()), PickleValue::Float(s.ia_freq)),
571        (PickleValue::String("oa_freq".into()), PickleValue::Float(s.oa_freq)),
572    ];
573
574    match s.bitrate {
575        Some(br) => dict.push((
576            PickleValue::String("bitrate".into()),
577            PickleValue::Int(br as i64),
578        )),
579        None => dict.push((
580            PickleValue::String("bitrate".into()),
581            PickleValue::None,
582        )),
583    }
584
585    match s.ifac_size {
586        Some(sz) => dict.push((
587            PickleValue::String("ifac_size".into()),
588            PickleValue::Int(sz as i64),
589        )),
590        None => dict.push((
591            PickleValue::String("ifac_size".into()),
592            PickleValue::None,
593        )),
594    }
595
596    PickleValue::Dict(dict)
597}
598
599fn path_table_to_pickle(entries: &[PathTableEntry]) -> PickleValue {
600    let list: Vec<PickleValue> = entries.iter().map(|e| {
601        PickleValue::Dict(vec![
602            (PickleValue::String("hash".into()), PickleValue::Bytes(e.hash.to_vec())),
603            (PickleValue::String("timestamp".into()), PickleValue::Float(e.timestamp)),
604            (PickleValue::String("via".into()), PickleValue::Bytes(e.via.to_vec())),
605            (PickleValue::String("hops".into()), PickleValue::Int(e.hops as i64)),
606            (PickleValue::String("expires".into()), PickleValue::Float(e.expires)),
607            (PickleValue::String("interface".into()), PickleValue::String(e.interface_name.clone())),
608        ])
609    }).collect();
610    PickleValue::List(list)
611}
612
613fn rate_table_to_pickle(entries: &[RateTableEntry]) -> PickleValue {
614    let list: Vec<PickleValue> = entries.iter().map(|e| {
615        PickleValue::Dict(vec![
616            (PickleValue::String("hash".into()), PickleValue::Bytes(e.hash.to_vec())),
617            (PickleValue::String("last".into()), PickleValue::Float(e.last)),
618            (PickleValue::String("rate_violations".into()), PickleValue::Int(e.rate_violations as i64)),
619            (PickleValue::String("blocked_until".into()), PickleValue::Float(e.blocked_until)),
620            (PickleValue::String("timestamps".into()), PickleValue::List(
621                e.timestamps.iter().map(|&t| PickleValue::Float(t)).collect()
622            )),
623        ])
624    }).collect();
625    PickleValue::List(list)
626}
627
628fn blackholed_to_pickle(entries: &[BlackholeInfo]) -> PickleValue {
629    let list: Vec<PickleValue> = entries.iter().map(|e| {
630        let mut dict = vec![
631            (PickleValue::String("identity_hash".into()), PickleValue::Bytes(e.identity_hash.to_vec())),
632            (PickleValue::String("created".into()), PickleValue::Float(e.created)),
633            (PickleValue::String("expires".into()), PickleValue::Float(e.expires)),
634        ];
635        if let Some(ref reason) = e.reason {
636            dict.push((PickleValue::String("reason".into()), PickleValue::String(reason.clone())));
637        } else {
638            dict.push((PickleValue::String("reason".into()), PickleValue::None));
639        }
640        PickleValue::Dict(dict)
641    }).collect();
642    PickleValue::List(list)
643}
644
645fn discovered_interfaces_to_pickle(interfaces: &[crate::discovery::DiscoveredInterface]) -> PickleValue {
646    let list: Vec<PickleValue> = interfaces.iter().map(|iface| {
647        let mut dict = vec![
648            (PickleValue::String("type".into()), PickleValue::String(iface.interface_type.clone())),
649            (PickleValue::String("transport".into()), PickleValue::Bool(iface.transport)),
650            (PickleValue::String("name".into()), PickleValue::String(iface.name.clone())),
651            (PickleValue::String("discovered".into()), PickleValue::Float(iface.discovered)),
652            (PickleValue::String("last_heard".into()), PickleValue::Float(iface.last_heard)),
653            (PickleValue::String("heard_count".into()), PickleValue::Int(iface.heard_count as i64)),
654            (PickleValue::String("status".into()), PickleValue::String(iface.status.as_str().into())),
655            (PickleValue::String("stamp".into()), PickleValue::Bytes(iface.stamp.clone())),
656            (PickleValue::String("value".into()), PickleValue::Int(iface.stamp_value as i64)),
657            (PickleValue::String("transport_id".into()), PickleValue::Bytes(iface.transport_id.to_vec())),
658            (PickleValue::String("network_id".into()), PickleValue::Bytes(iface.network_id.to_vec())),
659            (PickleValue::String("hops".into()), PickleValue::Int(iface.hops as i64)),
660        ];
661
662        // Optional location fields
663        if let Some(v) = iface.latitude {
664            dict.push((PickleValue::String("latitude".into()), PickleValue::Float(v)));
665        } else {
666            dict.push((PickleValue::String("latitude".into()), PickleValue::None));
667        }
668        if let Some(v) = iface.longitude {
669            dict.push((PickleValue::String("longitude".into()), PickleValue::Float(v)));
670        } else {
671            dict.push((PickleValue::String("longitude".into()), PickleValue::None));
672        }
673        if let Some(v) = iface.height {
674            dict.push((PickleValue::String("height".into()), PickleValue::Float(v)));
675        } else {
676            dict.push((PickleValue::String("height".into()), PickleValue::None));
677        }
678
679        // Connection info
680        if let Some(ref v) = iface.reachable_on {
681            dict.push((PickleValue::String("reachable_on".into()), PickleValue::String(v.clone())));
682        } else {
683            dict.push((PickleValue::String("reachable_on".into()), PickleValue::None));
684        }
685        if let Some(v) = iface.port {
686            dict.push((PickleValue::String("port".into()), PickleValue::Int(v as i64)));
687        } else {
688            dict.push((PickleValue::String("port".into()), PickleValue::None));
689        }
690
691        // RNode/RF specific
692        if let Some(v) = iface.frequency {
693            dict.push((PickleValue::String("frequency".into()), PickleValue::Int(v as i64)));
694        } else {
695            dict.push((PickleValue::String("frequency".into()), PickleValue::None));
696        }
697        if let Some(v) = iface.bandwidth {
698            dict.push((PickleValue::String("bandwidth".into()), PickleValue::Int(v as i64)));
699        } else {
700            dict.push((PickleValue::String("bandwidth".into()), PickleValue::None));
701        }
702        if let Some(v) = iface.spreading_factor {
703            dict.push((PickleValue::String("sf".into()), PickleValue::Int(v as i64)));
704        } else {
705            dict.push((PickleValue::String("sf".into()), PickleValue::None));
706        }
707        if let Some(v) = iface.coding_rate {
708            dict.push((PickleValue::String("cr".into()), PickleValue::Int(v as i64)));
709        } else {
710            dict.push((PickleValue::String("cr".into()), PickleValue::None));
711        }
712        if let Some(ref v) = iface.modulation {
713            dict.push((PickleValue::String("modulation".into()), PickleValue::String(v.clone())));
714        } else {
715            dict.push((PickleValue::String("modulation".into()), PickleValue::None));
716        }
717        if let Some(v) = iface.channel {
718            dict.push((PickleValue::String("channel".into()), PickleValue::Int(v as i64)));
719        } else {
720            dict.push((PickleValue::String("channel".into()), PickleValue::None));
721        }
722
723        // IFAC info
724        if let Some(ref v) = iface.ifac_netname {
725            dict.push((PickleValue::String("ifac_netname".into()), PickleValue::String(v.clone())));
726        } else {
727            dict.push((PickleValue::String("ifac_netname".into()), PickleValue::None));
728        }
729        if let Some(ref v) = iface.ifac_netkey {
730            dict.push((PickleValue::String("ifac_netkey".into()), PickleValue::String(v.clone())));
731        } else {
732            dict.push((PickleValue::String("ifac_netkey".into()), PickleValue::None));
733        }
734
735        // Config entry
736        if let Some(ref v) = iface.config_entry {
737            dict.push((PickleValue::String("config_entry".into()), PickleValue::String(v.clone())));
738        } else {
739            dict.push((PickleValue::String("config_entry".into()), PickleValue::None));
740        }
741
742        dict.push((PickleValue::String("discovery_hash".into()), PickleValue::Bytes(iface.discovery_hash.to_vec())));
743
744        PickleValue::Dict(dict)
745    }).collect();
746    PickleValue::List(list)
747}
748
749// --- RPC Client ---
750
751/// RPC client for connecting to a running daemon.
752pub struct RpcClient {
753    stream: TcpStream,
754}
755
756impl RpcClient {
757    /// Connect to an RPC server and authenticate.
758    pub fn connect(addr: &RpcAddr, auth_key: &[u8; 32]) -> io::Result<Self> {
759        let mut stream = match addr {
760            RpcAddr::Tcp(host, port) => {
761                TcpStream::connect((host.as_str(), *port))?
762            }
763        };
764
765        stream.set_read_timeout(Some(std::time::Duration::from_secs(10)))?;
766        stream.set_write_timeout(Some(std::time::Duration::from_secs(10)))?;
767
768        // Client-side authentication
769        client_auth(&mut stream, auth_key)?;
770
771        Ok(RpcClient { stream })
772    }
773
774    /// Send a pickle request and receive a pickle response.
775    pub fn call(&mut self, request: &PickleValue) -> io::Result<PickleValue> {
776        let request_bytes = pickle::encode(request);
777        send_bytes(&mut self.stream, &request_bytes)?;
778
779        let response_bytes = recv_bytes(&mut self.stream)?;
780        pickle::decode(&response_bytes)
781            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))
782    }
783}
784
785/// Client-side authentication: answer the server's challenge.
786fn client_auth(stream: &mut TcpStream, auth_key: &[u8; 32]) -> io::Result<()> {
787    // Read challenge
788    let challenge = recv_bytes(stream)?;
789
790    if !challenge.starts_with(CHALLENGE_PREFIX) {
791        return Err(io::Error::new(
792            io::ErrorKind::InvalidData,
793            "expected challenge",
794        ));
795    }
796
797    let message = &challenge[CHALLENGE_PREFIX.len()..];
798
799    // Create HMAC response
800    let response = create_response(auth_key, message);
801    send_bytes(stream, &response)?;
802
803    // Read welcome/failure
804    let result = recv_bytes(stream)?;
805    if result == WELCOME {
806        Ok(())
807    } else {
808        Err(io::Error::new(
809            io::ErrorKind::PermissionDenied,
810            "authentication failed",
811        ))
812    }
813}
814
815/// Create an HMAC response to a challenge message.
816fn create_response(auth_key: &[u8; 32], message: &[u8]) -> Vec<u8> {
817    // Check if message has {sha256} prefix (modern protocol)
818    if message.starts_with(b"{sha256}") || message.len() > 20 {
819        // Modern protocol: use HMAC-SHA256 with {sha256} prefix
820        let digest = hmac_sha256(auth_key, message);
821        let mut response = Vec::with_capacity(8 + 32);
822        response.extend_from_slice(b"{sha256}");
823        response.extend_from_slice(&digest);
824        response
825    } else {
826        // Legacy protocol: raw HMAC-MD5
827        let digest = hmac_md5(auth_key, message);
828        digest.to_vec()
829    }
830}
831
832/// Derive the RPC auth key from transport identity private key.
833pub fn derive_auth_key(private_key: &[u8]) -> [u8; 32] {
834    sha256(private_key)
835}
836
837#[cfg(test)]
838mod tests {
839    use super::*;
840
841    #[test]
842    fn send_recv_bytes_roundtrip() {
843        let (mut c1, mut c2) = tcp_pair();
844        let data = b"hello world";
845        send_bytes(&mut c1, data).unwrap();
846        let received = recv_bytes(&mut c2).unwrap();
847        assert_eq!(&received, data);
848    }
849
850    #[test]
851    fn send_recv_empty() {
852        let (mut c1, mut c2) = tcp_pair();
853        send_bytes(&mut c1, b"").unwrap();
854        let received = recv_bytes(&mut c2).unwrap();
855        assert!(received.is_empty());
856    }
857
858    #[test]
859    fn auth_success() {
860        let key = derive_auth_key(b"test-private-key");
861        let (mut server, mut client) = tcp_pair();
862
863        let key2 = key;
864        let t = thread::spawn(move || {
865            client_auth(&mut client, &key2).unwrap();
866        });
867
868        server_auth(&mut server, &key).unwrap();
869        t.join().unwrap();
870    }
871
872    #[test]
873    fn auth_failure_wrong_key() {
874        let server_key = derive_auth_key(b"server-key");
875        let client_key = derive_auth_key(b"wrong-key");
876        let (mut server, mut client) = tcp_pair();
877
878        let t = thread::spawn(move || {
879            let result = client_auth(&mut client, &client_key);
880            assert!(result.is_err());
881        });
882
883        let result = server_auth(&mut server, &server_key);
884        assert!(result.is_err());
885        t.join().unwrap();
886    }
887
888    #[test]
889    fn verify_sha256_response() {
890        let key = derive_auth_key(b"mykey");
891        let message = b"{sha256}abcdefghijklmnopqrstuvwxyz0123456789ABCD";
892        let response = create_response(&key, message);
893        assert!(response.starts_with(b"{sha256}"));
894        assert!(verify_response(&key, message, &response));
895    }
896
897    #[test]
898    fn verify_legacy_md5_response() {
899        let key = derive_auth_key(b"mykey");
900        // Legacy message: 20 bytes, no prefix
901        let message = b"01234567890123456789";
902        // Create legacy response (raw HMAC-MD5)
903        let digest = hmac_md5(&key, message);
904        assert!(verify_response(&key, message, &digest));
905    }
906
907    #[test]
908    fn constant_time_eq_works() {
909        assert!(constant_time_eq(b"hello", b"hello"));
910        assert!(!constant_time_eq(b"hello", b"world"));
911        assert!(!constant_time_eq(b"hello", b"hell"));
912    }
913
914    #[test]
915    fn rpc_roundtrip() {
916        let key = derive_auth_key(b"test-key");
917        let (event_tx, event_rx) = crate::event::channel();
918
919        // Start server
920        let addr = RpcAddr::Tcp("127.0.0.1".into(), 0);
921        // Bind manually to get the actual port
922        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
923        let port = listener.local_addr().unwrap().port();
924        listener.set_nonblocking(true).unwrap();
925
926        let shutdown = Arc::new(AtomicBool::new(false));
927        let shutdown2 = shutdown.clone();
928
929        // Driver thread that handles queries
930        let driver_thread = thread::spawn(move || {
931            loop {
932                match event_rx.recv_timeout(std::time::Duration::from_secs(5)) {
933                    Ok(Event::Query(QueryRequest::LinkCount, resp_tx)) => {
934                        let _ = resp_tx.send(QueryResponse::LinkCount(42));
935                    }
936                    Ok(Event::Query(QueryRequest::InterfaceStats, resp_tx)) => {
937                        let _ = resp_tx.send(QueryResponse::InterfaceStats(InterfaceStatsResponse {
938                            interfaces: vec![SingleInterfaceStat {
939                                name: "TestInterface".into(),
940                                status: true,
941                                mode: 1,
942                                rxb: 1000,
943                                txb: 2000,
944                                rx_packets: 10,
945                                tx_packets: 20,
946                                bitrate: Some(10_000_000),
947                                ifac_size: None,
948                                started: 1000.0,
949                                ia_freq: 0.0,
950                                oa_freq: 0.0,
951                                interface_type: "TestInterface".into(),
952                            }],
953                            transport_id: None,
954                            transport_enabled: true,
955                            transport_uptime: 3600.0,
956                            total_rxb: 1000,
957                            total_txb: 2000,
958                            probe_responder: None,
959                        }));
960                    }
961                    _ => break,
962                }
963            }
964        });
965
966        let key2 = key;
967        let shutdown3 = shutdown2.clone();
968        let server_thread = thread::spawn(move || {
969            rpc_server_loop(listener, key2, event_tx, shutdown3);
970        });
971
972        // Give server time to start
973        thread::sleep(std::time::Duration::from_millis(50));
974
975        // Client: connect and query link count
976        let server_addr = RpcAddr::Tcp("127.0.0.1".into(), port);
977        let mut client = RpcClient::connect(&server_addr, &key).unwrap();
978        let response = client.call(&PickleValue::Dict(vec![
979            (PickleValue::String("get".into()), PickleValue::String("link_count".into())),
980        ])).unwrap();
981        assert_eq!(response.as_int().unwrap(), 42);
982        drop(client);
983
984        // Client: query interface stats
985        let mut client2 = RpcClient::connect(&server_addr, &key).unwrap();
986        let response2 = client2.call(&PickleValue::Dict(vec![
987            (PickleValue::String("get".into()), PickleValue::String("interface_stats".into())),
988        ])).unwrap();
989        let ifaces = response2.get("interfaces").unwrap().as_list().unwrap();
990        assert_eq!(ifaces.len(), 1);
991        let iface = &ifaces[0];
992        assert_eq!(iface.get("name").unwrap().as_str().unwrap(), "TestInterface");
993        assert_eq!(iface.get("rxb").unwrap().as_int().unwrap(), 1000);
994        drop(client2);
995
996        // Shutdown
997        shutdown2.store(true, Ordering::Relaxed);
998        server_thread.join().unwrap();
999        driver_thread.join().unwrap();
1000    }
1001
1002    #[test]
1003    fn derive_auth_key_deterministic() {
1004        let key1 = derive_auth_key(b"test");
1005        let key2 = derive_auth_key(b"test");
1006        assert_eq!(key1, key2);
1007        // Different input → different key
1008        let key3 = derive_auth_key(b"other");
1009        assert_ne!(key1, key3);
1010    }
1011
1012    #[test]
1013    fn pickle_request_handling() {
1014        // Test the request → query translation without networking
1015        let (event_tx, event_rx) = crate::event::channel();
1016
1017        let driver = thread::spawn(move || {
1018            if let Ok(Event::Query(QueryRequest::DropPath { dest_hash }, resp_tx)) = event_rx.recv() {
1019                assert_eq!(dest_hash, [1u8; 16]);
1020                let _ = resp_tx.send(QueryResponse::DropPath(true));
1021            }
1022        });
1023
1024        let request = PickleValue::Dict(vec![
1025            (PickleValue::String("drop".into()), PickleValue::String("path".into())),
1026            (PickleValue::String("destination_hash".into()), PickleValue::Bytes(vec![1u8; 16])),
1027        ]);
1028
1029        let response = handle_rpc_request(&request, &event_tx).unwrap();
1030        assert_eq!(response, PickleValue::Bool(true));
1031        driver.join().unwrap();
1032    }
1033
1034    #[test]
1035    fn interface_stats_pickle_format() {
1036        let stats = InterfaceStatsResponse {
1037            interfaces: vec![SingleInterfaceStat {
1038                name: "TCP".into(),
1039                status: true,
1040                mode: 1,
1041                rxb: 100,
1042                txb: 200,
1043                rx_packets: 5,
1044                tx_packets: 10,
1045                bitrate: Some(1000000),
1046                ifac_size: Some(16),
1047                started: 1000.0,
1048                ia_freq: 0.0,
1049                oa_freq: 0.0,
1050                interface_type: "TCPClientInterface".into(),
1051            }],
1052            transport_id: Some([0xAB; 16]),
1053            transport_enabled: true,
1054            transport_uptime: 3600.0,
1055            total_rxb: 100,
1056            total_txb: 200,
1057            probe_responder: None,
1058        };
1059
1060        let pickle = interface_stats_to_pickle(&stats);
1061
1062        // Verify it round-trips through encode/decode
1063        let encoded = pickle::encode(&pickle);
1064        let decoded = pickle::decode(&encoded).unwrap();
1065        assert_eq!(decoded.get("transport_enabled").unwrap().as_bool().unwrap(), true);
1066        let ifaces = decoded.get("interfaces").unwrap().as_list().unwrap();
1067        assert_eq!(ifaces[0].get("name").unwrap().as_str().unwrap(), "TCP");
1068    }
1069
1070    #[test]
1071    fn send_probe_rpc_unknown_dest() {
1072        let (event_tx, event_rx) = crate::event::channel();
1073
1074        let driver = thread::spawn(move || {
1075            if let Ok(Event::Query(QueryRequest::SendProbe { dest_hash, payload_size }, resp_tx)) = event_rx.recv() {
1076                assert_eq!(dest_hash, [0xAA; 16]);
1077                assert_eq!(payload_size, 16); // default
1078                let _ = resp_tx.send(QueryResponse::SendProbe(None));
1079            }
1080        });
1081
1082        let request = PickleValue::Dict(vec![
1083            (PickleValue::String("send_probe".into()), PickleValue::Bytes(vec![0xAA; 16])),
1084        ]);
1085
1086        let response = handle_rpc_request(&request, &event_tx).unwrap();
1087        assert_eq!(response, PickleValue::None);
1088        driver.join().unwrap();
1089    }
1090
1091    #[test]
1092    fn send_probe_rpc_with_result() {
1093        let (event_tx, event_rx) = crate::event::channel();
1094
1095        let packet_hash = [0xBB; 32];
1096        let driver = thread::spawn(move || {
1097            if let Ok(Event::Query(QueryRequest::SendProbe { dest_hash, payload_size }, resp_tx)) = event_rx.recv() {
1098                assert_eq!(dest_hash, [0xCC; 16]);
1099                assert_eq!(payload_size, 32);
1100                let _ = resp_tx.send(QueryResponse::SendProbe(Some((packet_hash, 3))));
1101            }
1102        });
1103
1104        let request = PickleValue::Dict(vec![
1105            (PickleValue::String("send_probe".into()), PickleValue::Bytes(vec![0xCC; 16])),
1106            (PickleValue::String("size".into()), PickleValue::Int(32)),
1107        ]);
1108
1109        let response = handle_rpc_request(&request, &event_tx).unwrap();
1110        let ph = response.get("packet_hash").unwrap().as_bytes().unwrap();
1111        assert_eq!(ph, &[0xBB; 32]);
1112        assert_eq!(response.get("hops").unwrap().as_int().unwrap(), 3);
1113        driver.join().unwrap();
1114    }
1115
1116    #[test]
1117    fn send_probe_rpc_size_validation() {
1118        let (event_tx, event_rx) = crate::event::channel();
1119
1120        // Negative size should be clamped to default (16)
1121        let driver = thread::spawn(move || {
1122            if let Ok(Event::Query(QueryRequest::SendProbe { payload_size, .. }, resp_tx)) = event_rx.recv() {
1123                assert_eq!(payload_size, 16); // default, not negative
1124                let _ = resp_tx.send(QueryResponse::SendProbe(None));
1125            }
1126        });
1127
1128        let request = PickleValue::Dict(vec![
1129            (PickleValue::String("send_probe".into()), PickleValue::Bytes(vec![0xDD; 16])),
1130            (PickleValue::String("size".into()), PickleValue::Int(-1)),
1131        ]);
1132
1133        let response = handle_rpc_request(&request, &event_tx).unwrap();
1134        assert_eq!(response, PickleValue::None);
1135        driver.join().unwrap();
1136    }
1137
1138    #[test]
1139    fn send_probe_rpc_size_too_large() {
1140        let (event_tx, event_rx) = crate::event::channel();
1141
1142        // Size > 400 should be clamped to default (16)
1143        let driver = thread::spawn(move || {
1144            if let Ok(Event::Query(QueryRequest::SendProbe { payload_size, .. }, resp_tx)) = event_rx.recv() {
1145                assert_eq!(payload_size, 16); // default, not 999
1146                let _ = resp_tx.send(QueryResponse::SendProbe(None));
1147            }
1148        });
1149
1150        let request = PickleValue::Dict(vec![
1151            (PickleValue::String("send_probe".into()), PickleValue::Bytes(vec![0xDD; 16])),
1152            (PickleValue::String("size".into()), PickleValue::Int(999)),
1153        ]);
1154
1155        let response = handle_rpc_request(&request, &event_tx).unwrap();
1156        assert_eq!(response, PickleValue::None);
1157        driver.join().unwrap();
1158    }
1159
1160    #[test]
1161    fn check_proof_rpc_not_found() {
1162        let (event_tx, event_rx) = crate::event::channel();
1163
1164        let driver = thread::spawn(move || {
1165            if let Ok(Event::Query(QueryRequest::CheckProof { packet_hash }, resp_tx)) = event_rx.recv() {
1166                assert_eq!(packet_hash, [0xEE; 32]);
1167                let _ = resp_tx.send(QueryResponse::CheckProof(None));
1168            }
1169        });
1170
1171        let request = PickleValue::Dict(vec![
1172            (PickleValue::String("check_proof".into()), PickleValue::Bytes(vec![0xEE; 32])),
1173        ]);
1174
1175        let response = handle_rpc_request(&request, &event_tx).unwrap();
1176        assert_eq!(response, PickleValue::None);
1177        driver.join().unwrap();
1178    }
1179
1180    #[test]
1181    fn check_proof_rpc_found() {
1182        let (event_tx, event_rx) = crate::event::channel();
1183
1184        let driver = thread::spawn(move || {
1185            if let Ok(Event::Query(QueryRequest::CheckProof { packet_hash }, resp_tx)) = event_rx.recv() {
1186                assert_eq!(packet_hash, [0xFF; 32]);
1187                let _ = resp_tx.send(QueryResponse::CheckProof(Some(0.352)));
1188            }
1189        });
1190
1191        let request = PickleValue::Dict(vec![
1192            (PickleValue::String("check_proof".into()), PickleValue::Bytes(vec![0xFF; 32])),
1193        ]);
1194
1195        let response = handle_rpc_request(&request, &event_tx).unwrap();
1196        if let PickleValue::Float(rtt) = response {
1197            assert!((rtt - 0.352).abs() < 0.001);
1198        } else {
1199            panic!("Expected Float, got {:?}", response);
1200        }
1201        driver.join().unwrap();
1202    }
1203
1204    #[test]
1205    fn request_path_rpc() {
1206        let (event_tx, event_rx) = crate::event::channel();
1207
1208        let driver = thread::spawn(move || {
1209            match event_rx.recv_timeout(std::time::Duration::from_secs(5)) {
1210                Ok(Event::RequestPath { dest_hash }) => {
1211                    assert_eq!(dest_hash, [0x11; 16]);
1212                }
1213                other => panic!("Expected RequestPath event, got {:?}", other),
1214            }
1215        });
1216
1217        let request = PickleValue::Dict(vec![
1218            (PickleValue::String("request_path".into()), PickleValue::Bytes(vec![0x11; 16])),
1219        ]);
1220
1221        let response = handle_rpc_request(&request, &event_tx).unwrap();
1222        assert_eq!(response, PickleValue::Bool(true));
1223        driver.join().unwrap();
1224    }
1225
1226    #[test]
1227    fn interface_stats_with_probe_responder() {
1228        let probe_hash = [0x42; 16];
1229        let stats = InterfaceStatsResponse {
1230            interfaces: vec![],
1231            transport_id: None,
1232            transport_enabled: true,
1233            transport_uptime: 100.0,
1234            total_rxb: 0,
1235            total_txb: 0,
1236            probe_responder: Some(probe_hash),
1237        };
1238
1239        let pickle = interface_stats_to_pickle(&stats);
1240        let encoded = pickle::encode(&pickle);
1241        let decoded = pickle::decode(&encoded).unwrap();
1242
1243        let pr = decoded.get("probe_responder").unwrap().as_bytes().unwrap();
1244        assert_eq!(pr, &probe_hash);
1245    }
1246
1247    // Helper: create a connected TCP pair
1248    fn tcp_pair() -> (TcpStream, TcpStream) {
1249        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
1250        let port = listener.local_addr().unwrap().port();
1251        let client = TcpStream::connect(("127.0.0.1", port)).unwrap();
1252        let (server, _) = listener.accept().unwrap();
1253        client.set_read_timeout(Some(std::time::Duration::from_secs(5))).unwrap();
1254        server.set_read_timeout(Some(std::time::Duration::from_secs(5))).unwrap();
1255        (server, client)
1256    }
1257}