1use std::io::{self, Read, Write};
12use std::net::{TcpListener, TcpStream};
13use std::sync::mpsc;
14use std::sync::{
15 atomic::{AtomicBool, Ordering},
16 Arc,
17};
18use std::thread;
19
20use rns_crypto::hmac::hmac_sha256;
21use rns_crypto::sha256::sha256;
22
23use crate::event::{
24 BlackholeInfo, Event, EventSender, InterfaceStatsResponse, PathTableEntry, QueryRequest,
25 QueryResponse, RateTableEntry, SingleInterfaceStat,
26};
27use crate::md5::hmac_md5;
28use crate::pickle::{self, PickleValue};
29
// Wire-level markers used by the authentication handshake in this file.

/// Marker the server puts at the start of every challenge message; the client
/// rejects a first message that does not start with it.
const CHALLENGE_PREFIX: &[u8] = b"#CHALLENGE#";
/// Message the server sends after a response was verified successfully.
const WELCOME: &[u8] = b"#WELCOME#";
/// Message the server sends when the response could not be verified.
const FAILURE: &[u8] = b"#FAILURE#";
/// Number of random bytes placed in each challenge.
const CHALLENGE_LEN: usize = 40;
34
/// Address of an RPC endpoint.
#[derive(Debug, Clone)]
pub enum RpcAddr {
    /// TCP endpoint given as `(host, port)`; the pair is passed as-is to
    /// `TcpListener::bind` / `TcpStream::connect`.
    Tcp(String, u16),
}
40
/// Handle to a running RPC server thread.
pub struct RpcServer {
    /// Flag polled by the server loop; setting it to `true` asks the loop to exit.
    shutdown: Arc<AtomicBool>,
    /// Join handle of the server thread; `None` once the thread has been joined.
    thread: Option<thread::JoinHandle<()>>,
}
46
47impl RpcServer {
48 pub fn start(addr: &RpcAddr, auth_key: [u8; 32], event_tx: EventSender) -> io::Result<Self> {
50 let shutdown = Arc::new(AtomicBool::new(false));
51 let shutdown2 = shutdown.clone();
52
53 let listener = match addr {
54 RpcAddr::Tcp(host, port) => {
55 let l = TcpListener::bind((host.as_str(), *port))?;
56 l.set_nonblocking(true)?;
58 l
59 }
60 };
61
62 let thread = thread::Builder::new()
63 .name("rpc-server".into())
64 .spawn(move || {
65 rpc_server_loop(listener, auth_key, event_tx, shutdown2);
66 })
67 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
68
69 Ok(RpcServer {
70 shutdown,
71 thread: Some(thread),
72 })
73 }
74
75 pub fn stop(&mut self) {
77 self.shutdown.store(true, Ordering::Relaxed);
78 if let Some(handle) = self.thread.take() {
79 let _ = handle.join();
80 }
81 }
82}
83
84impl Drop for RpcServer {
85 fn drop(&mut self) {
86 self.stop();
87 }
88}
89
90fn rpc_server_loop(
91 listener: TcpListener,
92 auth_key: [u8; 32],
93 event_tx: EventSender,
94 shutdown: Arc<AtomicBool>,
95) {
96 loop {
97 if shutdown.load(Ordering::Relaxed) {
98 break;
99 }
100
101 match listener.accept() {
102 Ok((stream, _addr)) => {
103 let _ = stream.set_nonblocking(false);
105 let _ = stream.set_read_timeout(Some(std::time::Duration::from_secs(10)));
106 let _ = stream.set_write_timeout(Some(std::time::Duration::from_secs(10)));
107
108 if let Err(e) = handle_connection(stream, &auth_key, &event_tx) {
109 log::debug!("RPC connection error: {}", e);
110 }
111 }
112 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
113 thread::sleep(std::time::Duration::from_millis(100));
115 }
116 Err(e) => {
117 log::error!("RPC accept error: {}", e);
118 thread::sleep(std::time::Duration::from_millis(100));
119 }
120 }
121 }
122}
123
124fn handle_connection(
125 mut stream: TcpStream,
126 auth_key: &[u8; 32],
127 event_tx: &EventSender,
128) -> io::Result<()> {
129 server_auth(&mut stream, auth_key)?;
131
132 let request_bytes = recv_bytes(&mut stream)?;
134 let request = pickle::decode(&request_bytes)
135 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?;
136
137 let response = handle_rpc_request(&request, event_tx)?;
139
140 let response_bytes = pickle::encode(&response);
142 send_bytes(&mut stream, &response_bytes)?;
143
144 Ok(())
145}
146
147fn server_auth(stream: &mut TcpStream, auth_key: &[u8; 32]) -> io::Result<()> {
149 let mut random_bytes = [0u8; CHALLENGE_LEN];
151 {
153 let mut f = std::fs::File::open("/dev/urandom")?;
154 f.read_exact(&mut random_bytes)?;
155 }
156
157 let mut challenge_message = Vec::with_capacity(CHALLENGE_PREFIX.len() + 8 + CHALLENGE_LEN);
158 challenge_message.extend_from_slice(CHALLENGE_PREFIX);
159 challenge_message.extend_from_slice(b"{sha256}");
160 challenge_message.extend_from_slice(&random_bytes);
161
162 send_bytes(stream, &challenge_message)?;
163
164 let response = recv_bytes(stream)?;
166
167 let message = &challenge_message[CHALLENGE_PREFIX.len()..];
170
171 if verify_response(auth_key, message, &response) {
172 send_bytes(stream, WELCOME)?;
173 Ok(())
174 } else {
175 send_bytes(stream, FAILURE)?;
176 Err(io::Error::new(
177 io::ErrorKind::PermissionDenied,
178 "auth failed",
179 ))
180 }
181}
182
183fn verify_response(auth_key: &[u8; 32], message: &[u8], response: &[u8]) -> bool {
185 if response.starts_with(b"{sha256}") {
187 let digest = &response[8..];
188 let expected = hmac_sha256(auth_key, message);
189 constant_time_eq(digest, &expected)
190 }
191 else if response.len() == 16 {
193 let expected = hmac_md5(auth_key, message);
194 constant_time_eq(response, &expected)
195 }
196 else if response.starts_with(b"{md5}") {
198 let digest = &response[5..];
199 let expected = hmac_md5(auth_key, message);
200 constant_time_eq(digest, &expected)
201 } else {
202 false
203 }
204}
205
/// Compares two byte slices without stopping at the first differing byte.
///
/// Slices of different length are unequal (that check returns immediately);
/// for equal lengths every byte pair is folded into a single difference mask.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    a.len() == b.len()
        && a.iter()
            .zip(b)
            .fold(0u8, |mask, (&x, &y)| mask | (x ^ y))
            == 0
}
217
/// Writes one length-prefixed message to `stream` and flushes it.
///
/// Frame layout: a big-endian `i32` length followed by the payload. Payloads longer
/// than `i32::MAX` bytes use the extended form that `recv_bytes` understands: a
/// negative header (`-1`) followed by the length as a big-endian `u64`. Previously
/// such a length was truncated by an `as i32` cast, producing a corrupt header.
/// (The framing appears to mirror Python's `multiprocessing.connection` — verify
/// against the peer implementation.)
///
/// `stream` may be any writer; `TcpStream` callers are unaffected.
///
/// # Errors
///
/// Returns any error raised while writing to or flushing `stream`.
fn send_bytes<W: Write>(stream: &mut W, data: &[u8]) -> io::Result<()> {
    let len = data.len();
    if len > i32::MAX as usize {
        stream.write_all(&(-1i32).to_be_bytes())?;
        // usize -> u64 never loses bits on supported targets.
        stream.write_all(&(len as u64).to_be_bytes())?;
    } else {
        // Guarded above: `len` fits in an i32.
        stream.write_all(&(len as i32).to_be_bytes())?;
    }
    stream.write_all(data)?;
    stream.flush()
}
225
/// Reads one length-prefixed message from `stream`.
///
/// The header is a big-endian `i32`. A negative header announces the extended form,
/// in which the real length follows as a big-endian `u64`. Messages larger than
/// 64 MiB are rejected before any payload buffer is allocated.
///
/// The size limit is checked on the `u64` value, before converting to `usize`, so a
/// huge announced length cannot wrap around to a small one on 32-bit targets.
///
/// `stream` may be any reader; `TcpStream` callers are unaffected.
///
/// # Errors
///
/// * `InvalidData` ("message too large") if the announced length exceeds 64 MiB.
/// * Any error from `read_exact`, e.g. `UnexpectedEof` on a truncated message.
fn recv_bytes<R: Read>(stream: &mut R) -> io::Result<Vec<u8>> {
    const MAX_MESSAGE_LEN: u64 = 64 * 1024 * 1024;

    let mut header = [0u8; 4];
    stream.read_exact(&mut header)?;
    let short_len = i32::from_be_bytes(header);

    let announced: u64 = if short_len < 0 {
        let mut extended = [0u8; 8];
        stream.read_exact(&mut extended)?;
        u64::from_be_bytes(extended)
    } else {
        // Non-negative i32 always fits in u64.
        short_len as u64
    };

    if announced > MAX_MESSAGE_LEN {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "message too large",
        ));
    }

    // Bounded by MAX_MESSAGE_LEN, so the cast cannot truncate.
    let mut buf = vec![0u8; announced as usize];
    stream.read_exact(&mut buf)?;
    Ok(buf)
}
259
260fn handle_rpc_request(request: &PickleValue, event_tx: &EventSender) -> io::Result<PickleValue> {
262 if let Some(get_val) = request.get("get") {
264 if let Some(path) = get_val.as_str() {
265 return match path {
266 "interface_stats" => {
267 let resp = send_query(event_tx, QueryRequest::InterfaceStats)?;
268 if let QueryResponse::InterfaceStats(stats) = resp {
269 Ok(interface_stats_to_pickle(&stats))
270 } else {
271 Ok(PickleValue::None)
272 }
273 }
274 "path_table" => {
275 let max_hops = request
276 .get("max_hops")
277 .and_then(|v| v.as_int().map(|n| n as u8));
278 let resp = send_query(event_tx, QueryRequest::PathTable { max_hops })?;
279 if let QueryResponse::PathTable(entries) = resp {
280 Ok(path_table_to_pickle(&entries))
281 } else {
282 Ok(PickleValue::None)
283 }
284 }
285 "rate_table" => {
286 let resp = send_query(event_tx, QueryRequest::RateTable)?;
287 if let QueryResponse::RateTable(entries) = resp {
288 Ok(rate_table_to_pickle(&entries))
289 } else {
290 Ok(PickleValue::None)
291 }
292 }
293 "next_hop" => {
294 let hash = extract_dest_hash(request, "destination_hash")?;
295 let resp = send_query(event_tx, QueryRequest::NextHop { dest_hash: hash })?;
296 if let QueryResponse::NextHop(Some(nh)) = resp {
297 Ok(PickleValue::Bytes(nh.next_hop.to_vec()))
298 } else {
299 Ok(PickleValue::None)
300 }
301 }
302 "next_hop_if_name" => {
303 let hash = extract_dest_hash(request, "destination_hash")?;
304 let resp =
305 send_query(event_tx, QueryRequest::NextHopIfName { dest_hash: hash })?;
306 if let QueryResponse::NextHopIfName(Some(name)) = resp {
307 Ok(PickleValue::String(name))
308 } else {
309 Ok(PickleValue::None)
310 }
311 }
312 "link_count" => {
313 let resp = send_query(event_tx, QueryRequest::LinkCount)?;
314 if let QueryResponse::LinkCount(n) = resp {
315 Ok(PickleValue::Int(n as i64))
316 } else {
317 Ok(PickleValue::None)
318 }
319 }
320 "transport_identity" => {
321 let resp = send_query(event_tx, QueryRequest::TransportIdentity)?;
322 if let QueryResponse::TransportIdentity(Some(hash)) = resp {
323 Ok(PickleValue::Bytes(hash.to_vec()))
324 } else {
325 Ok(PickleValue::None)
326 }
327 }
328 "blackholed" => {
329 let resp = send_query(event_tx, QueryRequest::GetBlackholed)?;
330 if let QueryResponse::Blackholed(entries) = resp {
331 Ok(blackholed_to_pickle(&entries))
332 } else {
333 Ok(PickleValue::None)
334 }
335 }
336 "discovered_interfaces" => {
337 let only_available = request
338 .get("only_available")
339 .and_then(|v| v.as_bool())
340 .unwrap_or(false);
341 let only_transport = request
342 .get("only_transport")
343 .and_then(|v| v.as_bool())
344 .unwrap_or(false);
345 let resp = send_query(
346 event_tx,
347 QueryRequest::DiscoveredInterfaces {
348 only_available,
349 only_transport,
350 },
351 )?;
352 if let QueryResponse::DiscoveredInterfaces(interfaces) = resp {
353 Ok(discovered_interfaces_to_pickle(&interfaces))
354 } else {
355 Ok(PickleValue::None)
356 }
357 }
358 _ => Ok(PickleValue::None),
359 };
360 }
361 }
362
363 if let Some(hash_val) = request.get("request_path") {
365 if let Some(hash_bytes) = hash_val.as_bytes() {
366 if hash_bytes.len() >= 16 {
367 let mut dest_hash = [0u8; 16];
368 dest_hash.copy_from_slice(&hash_bytes[..16]);
369 let _ = event_tx.send(crate::event::Event::RequestPath { dest_hash });
370 return Ok(PickleValue::Bool(true));
371 }
372 }
373 }
374
375 if let Some(hash_val) = request.get("send_probe") {
377 if let Some(hash_bytes) = hash_val.as_bytes() {
378 if hash_bytes.len() >= 16 {
379 let mut dest_hash = [0u8; 16];
380 dest_hash.copy_from_slice(&hash_bytes[..16]);
381 let payload_size = request
382 .get("size")
383 .and_then(|v| v.as_int())
384 .and_then(|n| {
385 if n > 0 && n <= 400 {
386 Some(n as usize)
387 } else {
388 None
389 }
390 })
391 .unwrap_or(16);
392 let resp = send_query(
393 event_tx,
394 QueryRequest::SendProbe {
395 dest_hash,
396 payload_size,
397 },
398 )?;
399 if let QueryResponse::SendProbe(Some((packet_hash, hops))) = resp {
400 return Ok(PickleValue::Dict(vec![
401 (
402 PickleValue::String("packet_hash".into()),
403 PickleValue::Bytes(packet_hash.to_vec()),
404 ),
405 (
406 PickleValue::String("hops".into()),
407 PickleValue::Int(hops as i64),
408 ),
409 ]));
410 } else {
411 return Ok(PickleValue::None);
412 }
413 }
414 }
415 }
416
417 if let Some(hash_val) = request.get("check_proof") {
419 if let Some(hash_bytes) = hash_val.as_bytes() {
420 if hash_bytes.len() >= 32 {
421 let mut packet_hash = [0u8; 32];
422 packet_hash.copy_from_slice(&hash_bytes[..32]);
423 let resp = send_query(event_tx, QueryRequest::CheckProof { packet_hash })?;
424 if let QueryResponse::CheckProof(Some(rtt)) = resp {
425 return Ok(PickleValue::Float(rtt));
426 } else {
427 return Ok(PickleValue::None);
428 }
429 }
430 }
431 }
432
433 if let Some(hash_val) = request.get("blackhole") {
435 if let Some(hash_bytes) = hash_val.as_bytes() {
436 if hash_bytes.len() >= 16 {
437 let mut identity_hash = [0u8; 16];
438 identity_hash.copy_from_slice(&hash_bytes[..16]);
439 let duration_hours = request.get("duration").and_then(|v| v.as_float());
440 let reason = request
441 .get("reason")
442 .and_then(|v| v.as_str())
443 .map(|s| s.to_string());
444 let resp = send_query(
445 event_tx,
446 QueryRequest::BlackholeIdentity {
447 identity_hash,
448 duration_hours,
449 reason,
450 },
451 )?;
452 return Ok(PickleValue::Bool(matches!(
453 resp,
454 QueryResponse::BlackholeResult(true)
455 )));
456 }
457 }
458 }
459
460 if let Some(hash_val) = request.get("unblackhole") {
462 if let Some(hash_bytes) = hash_val.as_bytes() {
463 if hash_bytes.len() >= 16 {
464 let mut identity_hash = [0u8; 16];
465 identity_hash.copy_from_slice(&hash_bytes[..16]);
466 let resp = send_query(
467 event_tx,
468 QueryRequest::UnblackholeIdentity { identity_hash },
469 )?;
470 return Ok(PickleValue::Bool(matches!(
471 resp,
472 QueryResponse::UnblackholeResult(true)
473 )));
474 }
475 }
476 }
477
478 if let Some(drop_val) = request.get("drop") {
480 if let Some(path) = drop_val.as_str() {
481 return match path {
482 "path" => {
483 let hash = extract_dest_hash(request, "destination_hash")?;
484 let resp = send_query(event_tx, QueryRequest::DropPath { dest_hash: hash })?;
485 if let QueryResponse::DropPath(ok) = resp {
486 Ok(PickleValue::Bool(ok))
487 } else {
488 Ok(PickleValue::None)
489 }
490 }
491 "all_via" => {
492 let hash = extract_dest_hash(request, "destination_hash")?;
493 let resp = send_query(
494 event_tx,
495 QueryRequest::DropAllVia {
496 transport_hash: hash,
497 },
498 )?;
499 if let QueryResponse::DropAllVia(n) = resp {
500 Ok(PickleValue::Int(n as i64))
501 } else {
502 Ok(PickleValue::None)
503 }
504 }
505 "announce_queues" => {
506 let resp = send_query(event_tx, QueryRequest::DropAnnounceQueues)?;
507 if let QueryResponse::DropAnnounceQueues = resp {
508 Ok(PickleValue::Bool(true))
509 } else {
510 Ok(PickleValue::None)
511 }
512 }
513 _ => Ok(PickleValue::None),
514 };
515 }
516 }
517
518 Ok(PickleValue::None)
519}
520
521fn send_query(event_tx: &EventSender, request: QueryRequest) -> io::Result<QueryResponse> {
523 let (resp_tx, resp_rx) = mpsc::channel();
524 event_tx
525 .send(Event::Query(request, resp_tx))
526 .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down"))?;
527 resp_rx
528 .recv_timeout(std::time::Duration::from_secs(5))
529 .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "query timed out"))
530}
531
532fn extract_dest_hash(request: &PickleValue, key: &str) -> io::Result<[u8; 16]> {
534 let bytes = request
535 .get(key)
536 .and_then(|v| v.as_bytes())
537 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing destination_hash"))?;
538 if bytes.len() < 16 {
539 return Err(io::Error::new(io::ErrorKind::InvalidData, "hash too short"));
540 }
541 let mut hash = [0u8; 16];
542 hash.copy_from_slice(&bytes[..16]);
543 Ok(hash)
544}
545
546fn interface_stats_to_pickle(stats: &InterfaceStatsResponse) -> PickleValue {
549 let mut ifaces = Vec::new();
550 for iface in &stats.interfaces {
551 ifaces.push(single_iface_to_pickle(iface));
552 }
553
554 let mut dict = vec![
555 (
556 PickleValue::String("interfaces".into()),
557 PickleValue::List(ifaces),
558 ),
559 (
560 PickleValue::String("transport_enabled".into()),
561 PickleValue::Bool(stats.transport_enabled),
562 ),
563 (
564 PickleValue::String("transport_uptime".into()),
565 PickleValue::Float(stats.transport_uptime),
566 ),
567 (
568 PickleValue::String("rxb".into()),
569 PickleValue::Int(stats.total_rxb as i64),
570 ),
571 (
572 PickleValue::String("txb".into()),
573 PickleValue::Int(stats.total_txb as i64),
574 ),
575 ];
576
577 if let Some(tid) = stats.transport_id {
578 dict.push((
579 PickleValue::String("transport_id".into()),
580 PickleValue::Bytes(tid.to_vec()),
581 ));
582 } else {
583 dict.push((
584 PickleValue::String("transport_id".into()),
585 PickleValue::None,
586 ));
587 }
588
589 if let Some(pr) = stats.probe_responder {
590 dict.push((
591 PickleValue::String("probe_responder".into()),
592 PickleValue::Bytes(pr.to_vec()),
593 ));
594 } else {
595 dict.push((
596 PickleValue::String("probe_responder".into()),
597 PickleValue::None,
598 ));
599 }
600
601 PickleValue::Dict(dict)
602}
603
604fn single_iface_to_pickle(s: &SingleInterfaceStat) -> PickleValue {
605 let mut dict = vec![
606 (
607 PickleValue::String("name".into()),
608 PickleValue::String(s.name.clone()),
609 ),
610 (
611 PickleValue::String("status".into()),
612 PickleValue::Bool(s.status),
613 ),
614 (
615 PickleValue::String("mode".into()),
616 PickleValue::Int(s.mode as i64),
617 ),
618 (
619 PickleValue::String("rxb".into()),
620 PickleValue::Int(s.rxb as i64),
621 ),
622 (
623 PickleValue::String("txb".into()),
624 PickleValue::Int(s.txb as i64),
625 ),
626 (
627 PickleValue::String("rx_packets".into()),
628 PickleValue::Int(s.rx_packets as i64),
629 ),
630 (
631 PickleValue::String("tx_packets".into()),
632 PickleValue::Int(s.tx_packets as i64),
633 ),
634 (
635 PickleValue::String("started".into()),
636 PickleValue::Float(s.started),
637 ),
638 (
639 PickleValue::String("ia_freq".into()),
640 PickleValue::Float(s.ia_freq),
641 ),
642 (
643 PickleValue::String("oa_freq".into()),
644 PickleValue::Float(s.oa_freq),
645 ),
646 ];
647
648 match s.bitrate {
649 Some(br) => dict.push((
650 PickleValue::String("bitrate".into()),
651 PickleValue::Int(br as i64),
652 )),
653 None => dict.push((PickleValue::String("bitrate".into()), PickleValue::None)),
654 }
655
656 match s.ifac_size {
657 Some(sz) => dict.push((
658 PickleValue::String("ifac_size".into()),
659 PickleValue::Int(sz as i64),
660 )),
661 None => dict.push((PickleValue::String("ifac_size".into()), PickleValue::None)),
662 }
663
664 PickleValue::Dict(dict)
665}
666
667fn path_table_to_pickle(entries: &[PathTableEntry]) -> PickleValue {
668 let list: Vec<PickleValue> = entries
669 .iter()
670 .map(|e| {
671 PickleValue::Dict(vec![
672 (
673 PickleValue::String("hash".into()),
674 PickleValue::Bytes(e.hash.to_vec()),
675 ),
676 (
677 PickleValue::String("timestamp".into()),
678 PickleValue::Float(e.timestamp),
679 ),
680 (
681 PickleValue::String("via".into()),
682 PickleValue::Bytes(e.via.to_vec()),
683 ),
684 (
685 PickleValue::String("hops".into()),
686 PickleValue::Int(e.hops as i64),
687 ),
688 (
689 PickleValue::String("expires".into()),
690 PickleValue::Float(e.expires),
691 ),
692 (
693 PickleValue::String("interface".into()),
694 PickleValue::String(e.interface_name.clone()),
695 ),
696 ])
697 })
698 .collect();
699 PickleValue::List(list)
700}
701
702fn rate_table_to_pickle(entries: &[RateTableEntry]) -> PickleValue {
703 let list: Vec<PickleValue> = entries
704 .iter()
705 .map(|e| {
706 PickleValue::Dict(vec![
707 (
708 PickleValue::String("hash".into()),
709 PickleValue::Bytes(e.hash.to_vec()),
710 ),
711 (
712 PickleValue::String("last".into()),
713 PickleValue::Float(e.last),
714 ),
715 (
716 PickleValue::String("rate_violations".into()),
717 PickleValue::Int(e.rate_violations as i64),
718 ),
719 (
720 PickleValue::String("blocked_until".into()),
721 PickleValue::Float(e.blocked_until),
722 ),
723 (
724 PickleValue::String("timestamps".into()),
725 PickleValue::List(
726 e.timestamps
727 .iter()
728 .map(|&t| PickleValue::Float(t))
729 .collect(),
730 ),
731 ),
732 ])
733 })
734 .collect();
735 PickleValue::List(list)
736}
737
738fn blackholed_to_pickle(entries: &[BlackholeInfo]) -> PickleValue {
739 let list: Vec<PickleValue> = entries
740 .iter()
741 .map(|e| {
742 let mut dict = vec![
743 (
744 PickleValue::String("identity_hash".into()),
745 PickleValue::Bytes(e.identity_hash.to_vec()),
746 ),
747 (
748 PickleValue::String("created".into()),
749 PickleValue::Float(e.created),
750 ),
751 (
752 PickleValue::String("expires".into()),
753 PickleValue::Float(e.expires),
754 ),
755 ];
756 if let Some(ref reason) = e.reason {
757 dict.push((
758 PickleValue::String("reason".into()),
759 PickleValue::String(reason.clone()),
760 ));
761 } else {
762 dict.push((PickleValue::String("reason".into()), PickleValue::None));
763 }
764 PickleValue::Dict(dict)
765 })
766 .collect();
767 PickleValue::List(list)
768}
769
770fn discovered_interfaces_to_pickle(
771 interfaces: &[crate::discovery::DiscoveredInterface],
772) -> PickleValue {
773 let list: Vec<PickleValue> = interfaces
774 .iter()
775 .map(|iface| {
776 let mut dict = vec![
777 (
778 PickleValue::String("type".into()),
779 PickleValue::String(iface.interface_type.clone()),
780 ),
781 (
782 PickleValue::String("transport".into()),
783 PickleValue::Bool(iface.transport),
784 ),
785 (
786 PickleValue::String("name".into()),
787 PickleValue::String(iface.name.clone()),
788 ),
789 (
790 PickleValue::String("discovered".into()),
791 PickleValue::Float(iface.discovered),
792 ),
793 (
794 PickleValue::String("last_heard".into()),
795 PickleValue::Float(iface.last_heard),
796 ),
797 (
798 PickleValue::String("heard_count".into()),
799 PickleValue::Int(iface.heard_count as i64),
800 ),
801 (
802 PickleValue::String("status".into()),
803 PickleValue::String(iface.status.as_str().into()),
804 ),
805 (
806 PickleValue::String("stamp".into()),
807 PickleValue::Bytes(iface.stamp.clone()),
808 ),
809 (
810 PickleValue::String("value".into()),
811 PickleValue::Int(iface.stamp_value as i64),
812 ),
813 (
814 PickleValue::String("transport_id".into()),
815 PickleValue::Bytes(iface.transport_id.to_vec()),
816 ),
817 (
818 PickleValue::String("network_id".into()),
819 PickleValue::Bytes(iface.network_id.to_vec()),
820 ),
821 (
822 PickleValue::String("hops".into()),
823 PickleValue::Int(iface.hops as i64),
824 ),
825 ];
826
827 if let Some(v) = iface.latitude {
829 dict.push((
830 PickleValue::String("latitude".into()),
831 PickleValue::Float(v),
832 ));
833 } else {
834 dict.push((PickleValue::String("latitude".into()), PickleValue::None));
835 }
836 if let Some(v) = iface.longitude {
837 dict.push((
838 PickleValue::String("longitude".into()),
839 PickleValue::Float(v),
840 ));
841 } else {
842 dict.push((PickleValue::String("longitude".into()), PickleValue::None));
843 }
844 if let Some(v) = iface.height {
845 dict.push((PickleValue::String("height".into()), PickleValue::Float(v)));
846 } else {
847 dict.push((PickleValue::String("height".into()), PickleValue::None));
848 }
849
850 if let Some(ref v) = iface.reachable_on {
852 dict.push((
853 PickleValue::String("reachable_on".into()),
854 PickleValue::String(v.clone()),
855 ));
856 } else {
857 dict.push((
858 PickleValue::String("reachable_on".into()),
859 PickleValue::None,
860 ));
861 }
862 if let Some(v) = iface.port {
863 dict.push((
864 PickleValue::String("port".into()),
865 PickleValue::Int(v as i64),
866 ));
867 } else {
868 dict.push((PickleValue::String("port".into()), PickleValue::None));
869 }
870
871 if let Some(v) = iface.frequency {
873 dict.push((
874 PickleValue::String("frequency".into()),
875 PickleValue::Int(v as i64),
876 ));
877 } else {
878 dict.push((PickleValue::String("frequency".into()), PickleValue::None));
879 }
880 if let Some(v) = iface.bandwidth {
881 dict.push((
882 PickleValue::String("bandwidth".into()),
883 PickleValue::Int(v as i64),
884 ));
885 } else {
886 dict.push((PickleValue::String("bandwidth".into()), PickleValue::None));
887 }
888 if let Some(v) = iface.spreading_factor {
889 dict.push((PickleValue::String("sf".into()), PickleValue::Int(v as i64)));
890 } else {
891 dict.push((PickleValue::String("sf".into()), PickleValue::None));
892 }
893 if let Some(v) = iface.coding_rate {
894 dict.push((PickleValue::String("cr".into()), PickleValue::Int(v as i64)));
895 } else {
896 dict.push((PickleValue::String("cr".into()), PickleValue::None));
897 }
898 if let Some(ref v) = iface.modulation {
899 dict.push((
900 PickleValue::String("modulation".into()),
901 PickleValue::String(v.clone()),
902 ));
903 } else {
904 dict.push((PickleValue::String("modulation".into()), PickleValue::None));
905 }
906 if let Some(v) = iface.channel {
907 dict.push((
908 PickleValue::String("channel".into()),
909 PickleValue::Int(v as i64),
910 ));
911 } else {
912 dict.push((PickleValue::String("channel".into()), PickleValue::None));
913 }
914
915 if let Some(ref v) = iface.ifac_netname {
917 dict.push((
918 PickleValue::String("ifac_netname".into()),
919 PickleValue::String(v.clone()),
920 ));
921 } else {
922 dict.push((
923 PickleValue::String("ifac_netname".into()),
924 PickleValue::None,
925 ));
926 }
927 if let Some(ref v) = iface.ifac_netkey {
928 dict.push((
929 PickleValue::String("ifac_netkey".into()),
930 PickleValue::String(v.clone()),
931 ));
932 } else {
933 dict.push((PickleValue::String("ifac_netkey".into()), PickleValue::None));
934 }
935
936 if let Some(ref v) = iface.config_entry {
938 dict.push((
939 PickleValue::String("config_entry".into()),
940 PickleValue::String(v.clone()),
941 ));
942 } else {
943 dict.push((
944 PickleValue::String("config_entry".into()),
945 PickleValue::None,
946 ));
947 }
948
949 dict.push((
950 PickleValue::String("discovery_hash".into()),
951 PickleValue::Bytes(iface.discovery_hash.to_vec()),
952 ));
953
954 PickleValue::Dict(dict)
955 })
956 .collect();
957 PickleValue::List(list)
958}
959
/// Client side of an RPC connection.
///
/// Instances are created by `RpcClient::connect`, which completes the
/// authentication handshake before returning.
pub struct RpcClient {
    /// Connected stream used for the request/response exchange.
    stream: TcpStream,
}
966
967impl RpcClient {
968 pub fn connect(addr: &RpcAddr, auth_key: &[u8; 32]) -> io::Result<Self> {
970 let mut stream = match addr {
971 RpcAddr::Tcp(host, port) => TcpStream::connect((host.as_str(), *port))?,
972 };
973
974 stream.set_read_timeout(Some(std::time::Duration::from_secs(10)))?;
975 stream.set_write_timeout(Some(std::time::Duration::from_secs(10)))?;
976
977 client_auth(&mut stream, auth_key)?;
979
980 Ok(RpcClient { stream })
981 }
982
983 pub fn call(&mut self, request: &PickleValue) -> io::Result<PickleValue> {
985 let request_bytes = pickle::encode(request);
986 send_bytes(&mut self.stream, &request_bytes)?;
987
988 let response_bytes = recv_bytes(&mut self.stream)?;
989 pickle::decode(&response_bytes)
990 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))
991 }
992}
993
994fn client_auth(stream: &mut TcpStream, auth_key: &[u8; 32]) -> io::Result<()> {
996 let challenge = recv_bytes(stream)?;
998
999 if !challenge.starts_with(CHALLENGE_PREFIX) {
1000 return Err(io::Error::new(
1001 io::ErrorKind::InvalidData,
1002 "expected challenge",
1003 ));
1004 }
1005
1006 let message = &challenge[CHALLENGE_PREFIX.len()..];
1007
1008 let response = create_response(auth_key, message);
1010 send_bytes(stream, &response)?;
1011
1012 let result = recv_bytes(stream)?;
1014 if result == WELCOME {
1015 Ok(())
1016 } else {
1017 Err(io::Error::new(
1018 io::ErrorKind::PermissionDenied,
1019 "authentication failed",
1020 ))
1021 }
1022}
1023
1024fn create_response(auth_key: &[u8; 32], message: &[u8]) -> Vec<u8> {
1026 if message.starts_with(b"{sha256}") || message.len() > 20 {
1028 let digest = hmac_sha256(auth_key, message);
1030 let mut response = Vec::with_capacity(8 + 32);
1031 response.extend_from_slice(b"{sha256}");
1032 response.extend_from_slice(&digest);
1033 response
1034 } else {
1035 let digest = hmac_md5(auth_key, message);
1037 digest.to_vec()
1038 }
1039}
1040
/// Derives the 32-byte RPC authentication key from `private_key`.
///
/// The key is the `sha256` digest of the given bytes, so equal inputs always
/// produce equal keys.
pub fn derive_auth_key(private_key: &[u8]) -> [u8; 32] {
    sha256(private_key)
}
1045
1046#[cfg(test)]
1047mod tests {
1048 use super::*;
1049
1050 #[test]
1051 fn send_recv_bytes_roundtrip() {
1052 let (mut c1, mut c2) = tcp_pair();
1053 let data = b"hello world";
1054 send_bytes(&mut c1, data).unwrap();
1055 let received = recv_bytes(&mut c2).unwrap();
1056 assert_eq!(&received, data);
1057 }
1058
1059 #[test]
1060 fn send_recv_empty() {
1061 let (mut c1, mut c2) = tcp_pair();
1062 send_bytes(&mut c1, b"").unwrap();
1063 let received = recv_bytes(&mut c2).unwrap();
1064 assert!(received.is_empty());
1065 }
1066
1067 #[test]
1068 fn auth_success() {
1069 let key = derive_auth_key(b"test-private-key");
1070 let (mut server, mut client) = tcp_pair();
1071
1072 let key2 = key;
1073 let t = thread::spawn(move || {
1074 client_auth(&mut client, &key2).unwrap();
1075 });
1076
1077 server_auth(&mut server, &key).unwrap();
1078 t.join().unwrap();
1079 }
1080
1081 #[test]
1082 fn auth_failure_wrong_key() {
1083 let server_key = derive_auth_key(b"server-key");
1084 let client_key = derive_auth_key(b"wrong-key");
1085 let (mut server, mut client) = tcp_pair();
1086
1087 let t = thread::spawn(move || {
1088 let result = client_auth(&mut client, &client_key);
1089 assert!(result.is_err());
1090 });
1091
1092 let result = server_auth(&mut server, &server_key);
1093 assert!(result.is_err());
1094 t.join().unwrap();
1095 }
1096
1097 #[test]
1098 fn verify_sha256_response() {
1099 let key = derive_auth_key(b"mykey");
1100 let message = b"{sha256}abcdefghijklmnopqrstuvwxyz0123456789ABCD";
1101 let response = create_response(&key, message);
1102 assert!(response.starts_with(b"{sha256}"));
1103 assert!(verify_response(&key, message, &response));
1104 }
1105
1106 #[test]
1107 fn verify_legacy_md5_response() {
1108 let key = derive_auth_key(b"mykey");
1109 let message = b"01234567890123456789";
1111 let digest = hmac_md5(&key, message);
1113 assert!(verify_response(&key, message, &digest));
1114 }
1115
1116 #[test]
1117 fn constant_time_eq_works() {
1118 assert!(constant_time_eq(b"hello", b"hello"));
1119 assert!(!constant_time_eq(b"hello", b"world"));
1120 assert!(!constant_time_eq(b"hello", b"hell"));
1121 }
1122
1123 #[test]
1124 fn rpc_roundtrip() {
1125 let key = derive_auth_key(b"test-key");
1126 let (event_tx, event_rx) = crate::event::channel();
1127
1128 let addr = RpcAddr::Tcp("127.0.0.1".into(), 0);
1130 let listener = TcpListener::bind("127.0.0.1:0").unwrap();
1132 let port = listener.local_addr().unwrap().port();
1133 listener.set_nonblocking(true).unwrap();
1134
1135 let shutdown = Arc::new(AtomicBool::new(false));
1136 let shutdown2 = shutdown.clone();
1137
1138 let driver_thread = thread::spawn(move || loop {
1140 match event_rx.recv_timeout(std::time::Duration::from_secs(5)) {
1141 Ok(Event::Query(QueryRequest::LinkCount, resp_tx)) => {
1142 let _ = resp_tx.send(QueryResponse::LinkCount(42));
1143 }
1144 Ok(Event::Query(QueryRequest::InterfaceStats, resp_tx)) => {
1145 let _ = resp_tx.send(QueryResponse::InterfaceStats(InterfaceStatsResponse {
1146 interfaces: vec![SingleInterfaceStat {
1147 name: "TestInterface".into(),
1148 status: true,
1149 mode: 1,
1150 rxb: 1000,
1151 txb: 2000,
1152 rx_packets: 10,
1153 tx_packets: 20,
1154 bitrate: Some(10_000_000),
1155 ifac_size: None,
1156 started: 1000.0,
1157 ia_freq: 0.0,
1158 oa_freq: 0.0,
1159 interface_type: "TestInterface".into(),
1160 }],
1161 transport_id: None,
1162 transport_enabled: true,
1163 transport_uptime: 3600.0,
1164 total_rxb: 1000,
1165 total_txb: 2000,
1166 probe_responder: None,
1167 }));
1168 }
1169 _ => break,
1170 }
1171 });
1172
1173 let key2 = key;
1174 let shutdown3 = shutdown2.clone();
1175 let server_thread = thread::spawn(move || {
1176 rpc_server_loop(listener, key2, event_tx, shutdown3);
1177 });
1178
1179 thread::sleep(std::time::Duration::from_millis(50));
1181
1182 let server_addr = RpcAddr::Tcp("127.0.0.1".into(), port);
1184 let mut client = RpcClient::connect(&server_addr, &key).unwrap();
1185 let response = client
1186 .call(&PickleValue::Dict(vec![(
1187 PickleValue::String("get".into()),
1188 PickleValue::String("link_count".into()),
1189 )]))
1190 .unwrap();
1191 assert_eq!(response.as_int().unwrap(), 42);
1192 drop(client);
1193
1194 let mut client2 = RpcClient::connect(&server_addr, &key).unwrap();
1196 let response2 = client2
1197 .call(&PickleValue::Dict(vec![(
1198 PickleValue::String("get".into()),
1199 PickleValue::String("interface_stats".into()),
1200 )]))
1201 .unwrap();
1202 let ifaces = response2.get("interfaces").unwrap().as_list().unwrap();
1203 assert_eq!(ifaces.len(), 1);
1204 let iface = &ifaces[0];
1205 assert_eq!(
1206 iface.get("name").unwrap().as_str().unwrap(),
1207 "TestInterface"
1208 );
1209 assert_eq!(iface.get("rxb").unwrap().as_int().unwrap(), 1000);
1210 drop(client2);
1211
1212 shutdown2.store(true, Ordering::Relaxed);
1214 server_thread.join().unwrap();
1215 driver_thread.join().unwrap();
1216 }
1217
1218 #[test]
1219 fn derive_auth_key_deterministic() {
1220 let key1 = derive_auth_key(b"test");
1221 let key2 = derive_auth_key(b"test");
1222 assert_eq!(key1, key2);
1223 let key3 = derive_auth_key(b"other");
1225 assert_ne!(key1, key3);
1226 }
1227
/// A `{"drop": "path", "destination_hash": <16 bytes>}` request must reach the
/// driver as a `QueryRequest::DropPath` carrying the same hash, and the
/// driver's `DropPath(true)` answer must come back as `PickleValue::Bool(true)`.
#[test]
fn pickle_request_handling() {
    let (event_tx, event_rx) = crate::event::channel();

    // Mock driver: answer exactly one DropPath query. Anything else (wrong
    // event, timeout, closed channel) fails the test loudly instead of being
    // silently ignored or blocking `join` forever.
    let driver = thread::spawn(move || {
        match event_rx.recv_timeout(std::time::Duration::from_secs(5)) {
            Ok(Event::Query(QueryRequest::DropPath { dest_hash }, resp_tx)) => {
                assert_eq!(dest_hash, [1u8; 16]);
                let _ = resp_tx.send(QueryResponse::DropPath(true));
            }
            other => panic!("Expected DropPath query, got {:?}", other),
        }
    });

    let request = PickleValue::Dict(vec![
        (
            PickleValue::String("drop".into()),
            PickleValue::String("path".into()),
        ),
        (
            PickleValue::String("destination_hash".into()),
            PickleValue::Bytes(vec![1u8; 16]),
        ),
    ]);

    let response = handle_rpc_request(&request, &event_tx).unwrap();
    assert_eq!(response, PickleValue::Bool(true));
    driver.join().unwrap();
}
1256
/// Converts an `InterfaceStatsResponse` with a single interface to a pickle
/// value, round-trips it through `pickle::encode` / `pickle::decode`, and
/// checks that `transport_enabled` and the interface name survive.
#[test]
fn interface_stats_pickle_format() {
    let stats = InterfaceStatsResponse {
        interfaces: vec![SingleInterfaceStat {
            name: "TCP".into(),
            status: true,
            mode: 1,
            rxb: 100,
            txb: 200,
            rx_packets: 5,
            tx_packets: 10,
            bitrate: Some(1000000),
            ifac_size: Some(16),
            started: 1000.0,
            ia_freq: 0.0,
            oa_freq: 0.0,
            interface_type: "TCPClientInterface".into(),
        }],
        transport_id: Some([0xAB; 16]),
        transport_enabled: true,
        transport_uptime: 3600.0,
        total_rxb: 100,
        total_txb: 200,
        probe_responder: None,
    };

    let pickle = interface_stats_to_pickle(&stats);

    // Round-trip through the wire encoding so the assertions check what a
    // decoder would actually see, not just the in-memory value.
    let encoded = pickle::encode(&pickle);
    let decoded = pickle::decode(&encoded).unwrap();
    assert!(decoded.get("transport_enabled").unwrap().as_bool().unwrap());

    let ifaces = decoded.get("interfaces").unwrap().as_list().unwrap();
    // Check the length first so a missing entry is reported as an assertion
    // failure rather than an index-out-of-bounds panic.
    assert_eq!(ifaces.len(), 1);
    assert_eq!(ifaces[0].get("name").unwrap().as_str().unwrap(), "TCP");
}
1295
/// A `send_probe` request without a `size` entry must reach the driver as a
/// `QueryRequest::SendProbe` with a payload size of 16; when the driver
/// answers `SendProbe(None)` the RPC response must be `PickleValue::None`.
#[test]
fn send_probe_rpc_unknown_dest() {
    let (event_tx, event_rx) = crate::event::channel();

    // Mock driver: accept exactly one SendProbe query. Because the expected
    // RPC response is `None`, the driver must fail loudly on any other event
    // so a misrouted request cannot slip through unnoticed.
    let driver = thread::spawn(move || {
        match event_rx.recv_timeout(std::time::Duration::from_secs(5)) {
            Ok(Event::Query(
                QueryRequest::SendProbe {
                    dest_hash,
                    payload_size,
                },
                resp_tx,
            )) => {
                assert_eq!(dest_hash, [0xAA; 16]);
                // No "size" key was sent, so the test expects the value 16.
                assert_eq!(payload_size, 16);
                let _ = resp_tx.send(QueryResponse::SendProbe(None));
            }
            other => panic!("Expected SendProbe query, got {:?}", other),
        }
    });

    let request = PickleValue::Dict(vec![(
        PickleValue::String("send_probe".into()),
        PickleValue::Bytes(vec![0xAA; 16]),
    )]);

    let response = handle_rpc_request(&request, &event_tx).unwrap();
    assert_eq!(response, PickleValue::None);
    driver.join().unwrap();
}
1324
/// A `send_probe` request with an explicit `size` of 32 must be forwarded with
/// that payload size, and a `SendProbe(Some((hash, hops)))` answer must be
/// returned as a dict exposing `packet_hash` and `hops`.
#[test]
fn send_probe_rpc_with_result() {
    let (event_tx, event_rx) = crate::event::channel();

    let packet_hash = [0xBB; 32];
    // Mock driver: answer exactly one SendProbe query; any other outcome
    // (wrong event, timeout, closed channel) fails the test with a clear message.
    let driver = thread::spawn(move || {
        match event_rx.recv_timeout(std::time::Duration::from_secs(5)) {
            Ok(Event::Query(
                QueryRequest::SendProbe {
                    dest_hash,
                    payload_size,
                },
                resp_tx,
            )) => {
                assert_eq!(dest_hash, [0xCC; 16]);
                assert_eq!(payload_size, 32);
                let _ = resp_tx.send(QueryResponse::SendProbe(Some((packet_hash, 3))));
            }
            other => panic!("Expected SendProbe query, got {:?}", other),
        }
    });

    let request = PickleValue::Dict(vec![
        (
            PickleValue::String("send_probe".into()),
            PickleValue::Bytes(vec![0xCC; 16]),
        ),
        (PickleValue::String("size".into()), PickleValue::Int(32)),
    ]);

    let response = handle_rpc_request(&request, &event_tx).unwrap();
    let ph = response.get("packet_hash").unwrap().as_bytes().unwrap();
    assert_eq!(ph, &[0xBB; 32]);
    assert_eq!(response.get("hops").unwrap().as_int().unwrap(), 3);
    driver.join().unwrap();
}
1359
/// A `send_probe` request with a negative `size` (-1) must still be forwarded
/// to the driver, with the payload size replaced by 16.
#[test]
fn send_probe_rpc_size_validation() {
    let (event_tx, event_rx) = crate::event::channel();

    // Mock driver: accept exactly one SendProbe query. Because the expected
    // RPC response is `None`, the driver must fail loudly on any other event
    // so the payload-size check cannot be skipped unnoticed.
    let driver = thread::spawn(move || {
        match event_rx.recv_timeout(std::time::Duration::from_secs(5)) {
            Ok(Event::Query(QueryRequest::SendProbe { payload_size, .. }, resp_tx)) => {
                // The invalid size -1 is expected to arrive as 16.
                assert_eq!(payload_size, 16);
                let _ = resp_tx.send(QueryResponse::SendProbe(None));
            }
            other => panic!("Expected SendProbe query, got {:?}", other),
        }
    });

    let request = PickleValue::Dict(vec![
        (
            PickleValue::String("send_probe".into()),
            PickleValue::Bytes(vec![0xDD; 16]),
        ),
        (PickleValue::String("size".into()), PickleValue::Int(-1)),
    ]);

    let response = handle_rpc_request(&request, &event_tx).unwrap();
    assert_eq!(response, PickleValue::None);
    driver.join().unwrap();
}
1386
/// A `send_probe` request with an oversized `size` (999) must still be
/// forwarded to the driver, with the payload size replaced by 16.
#[test]
fn send_probe_rpc_size_too_large() {
    let (event_tx, event_rx) = crate::event::channel();

    // Mock driver: accept exactly one SendProbe query. Because the expected
    // RPC response is `None`, the driver must fail loudly on any other event
    // so the payload-size check cannot be skipped unnoticed.
    let driver = thread::spawn(move || {
        match event_rx.recv_timeout(std::time::Duration::from_secs(5)) {
            Ok(Event::Query(QueryRequest::SendProbe { payload_size, .. }, resp_tx)) => {
                // The out-of-range size 999 is expected to arrive as 16.
                assert_eq!(payload_size, 16);
                let _ = resp_tx.send(QueryResponse::SendProbe(None));
            }
            other => panic!("Expected SendProbe query, got {:?}", other),
        }
    });

    let request = PickleValue::Dict(vec![
        (
            PickleValue::String("send_probe".into()),
            PickleValue::Bytes(vec![0xDD; 16]),
        ),
        (PickleValue::String("size".into()), PickleValue::Int(999)),
    ]);

    let response = handle_rpc_request(&request, &event_tx).unwrap();
    assert_eq!(response, PickleValue::None);
    driver.join().unwrap();
}
1413
/// A `check_proof` request must reach the driver as `QueryRequest::CheckProof`
/// with the same 32-byte hash; a `CheckProof(None)` answer must be returned as
/// `PickleValue::None`.
#[test]
fn check_proof_rpc_not_found() {
    let (event_tx, event_rx) = crate::event::channel();

    // Mock driver: accept exactly one CheckProof query. Because the expected
    // RPC response is `None`, the driver must fail loudly on any other event
    // so a misrouted request cannot slip through unnoticed.
    let driver = thread::spawn(move || {
        match event_rx.recv_timeout(std::time::Duration::from_secs(5)) {
            Ok(Event::Query(QueryRequest::CheckProof { packet_hash }, resp_tx)) => {
                assert_eq!(packet_hash, [0xEE; 32]);
                let _ = resp_tx.send(QueryResponse::CheckProof(None));
            }
            other => panic!("Expected CheckProof query, got {:?}", other),
        }
    });

    let request = PickleValue::Dict(vec![(
        PickleValue::String("check_proof".into()),
        PickleValue::Bytes(vec![0xEE; 32]),
    )]);

    let response = handle_rpc_request(&request, &event_tx).unwrap();
    assert_eq!(response, PickleValue::None);
    driver.join().unwrap();
}
1436
/// A `check_proof` request answered with `CheckProof(Some(0.352))` must be
/// returned to the caller as a `PickleValue::Float` of (approximately) 0.352.
#[test]
fn check_proof_rpc_found() {
    let (event_tx, event_rx) = crate::event::channel();

    // Mock driver: answer exactly one CheckProof query; any other outcome
    // (wrong event, timeout, closed channel) fails the test with a clear message.
    let driver = thread::spawn(move || {
        match event_rx.recv_timeout(std::time::Duration::from_secs(5)) {
            Ok(Event::Query(QueryRequest::CheckProof { packet_hash }, resp_tx)) => {
                assert_eq!(packet_hash, [0xFF; 32]);
                let _ = resp_tx.send(QueryResponse::CheckProof(Some(0.352)));
            }
            other => panic!("Expected CheckProof query, got {:?}", other),
        }
    });

    let request = PickleValue::Dict(vec![(
        PickleValue::String("check_proof".into()),
        PickleValue::Bytes(vec![0xFF; 32]),
    )]);

    let response = handle_rpc_request(&request, &event_tx).unwrap();
    match response {
        // Compare with a tolerance: the value has passed through a float conversion.
        PickleValue::Float(rtt) => assert!((rtt - 0.352).abs() < 0.001),
        other => panic!("Expected Float, got {:?}", other),
    }
    driver.join().unwrap();
}
1463
/// A `request_path` request must surface as an `Event::RequestPath` carrying
/// the requested destination hash, and the RPC must answer `Bool(true)`.
#[test]
fn request_path_rpc() {
    let (event_tx, event_rx) = crate::event::channel();

    // Mock driver: wait up to five seconds for the single expected event.
    let driver = thread::spawn(move || {
        let wait = std::time::Duration::from_secs(5);
        let outcome = event_rx.recv_timeout(wait);
        match outcome {
            Ok(Event::RequestPath { dest_hash }) => assert_eq!(dest_hash, [0x11; 16]),
            other => panic!("Expected RequestPath event, got {:?}", other),
        }
    });

    let key = PickleValue::String("request_path".into());
    let target = PickleValue::Bytes(vec![0x11; 16]);
    let rpc_call = PickleValue::Dict(vec![(key, target)]);

    let reply = handle_rpc_request(&rpc_call, &event_tx).unwrap();
    assert_eq!(reply, PickleValue::Bool(true));
    driver.join().unwrap();
}
1487
/// When `probe_responder` is set, the pickled interface stats must expose it
/// under the `probe_responder` key as bytes, surviving an encode/decode
/// round-trip.
#[test]
fn interface_stats_with_probe_responder() {
    let probe_hash = [0x42; 16];

    // Minimal stats: no interfaces, only the probe responder populated.
    let stats = InterfaceStatsResponse {
        interfaces: vec![],
        transport_id: None,
        transport_enabled: true,
        transport_uptime: 100.0,
        total_rxb: 0,
        total_txb: 0,
        probe_responder: Some(probe_hash),
    };

    let wire_bytes = pickle::encode(&interface_stats_to_pickle(&stats));
    let round_tripped = pickle::decode(&wire_bytes).unwrap();

    let responder = round_tripped
        .get("probe_responder")
        .unwrap()
        .as_bytes()
        .unwrap();
    assert_eq!(responder, &probe_hash);
}
1508
/// Builds a connected loopback TCP pair and returns it as `(server, client)`.
///
/// Binds an ephemeral port on 127.0.0.1, connects to it, accepts the
/// connection, and gives both ends a five-second read timeout. Panics if any
/// socket operation fails.
fn tcp_pair() -> (TcpStream, TcpStream) {
    let read_timeout = Some(std::time::Duration::from_secs(5));

    // Port 0 lets the OS pick a free port; read it back to connect.
    let acceptor = TcpListener::bind("127.0.0.1:0").unwrap();
    let bound_port = acceptor.local_addr().unwrap().port();

    let client = TcpStream::connect(("127.0.0.1", bound_port)).unwrap();
    let (server, _peer) = acceptor.accept().unwrap();

    for stream in [&client, &server] {
        stream.set_read_timeout(read_timeout).unwrap();
    }

    (server, client)
}
1523}