1use std::io::{self, Read, Write};
12use std::net::{TcpListener, TcpStream};
13use std::sync::mpsc;
14use std::sync::{Arc, atomic::{AtomicBool, Ordering}};
15use std::thread;
16
17use rns_crypto::sha256::sha256;
18use rns_crypto::hmac::hmac_sha256;
19
20use crate::event::{
21 BlackholeInfo, Event, EventSender, QueryRequest, QueryResponse,
22 InterfaceStatsResponse, SingleInterfaceStat,
23 PathTableEntry, RateTableEntry,
24};
25use crate::md5::hmac_md5;
26use crate::pickle::{self, PickleValue};
27
28const CHALLENGE_PREFIX: &[u8] = b"#CHALLENGE#";
29const WELCOME: &[u8] = b"#WELCOME#";
30const FAILURE: &[u8] = b"#FAILURE#";
31const CHALLENGE_LEN: usize = 40;
32
33#[derive(Debug, Clone)]
35pub enum RpcAddr {
36 Tcp(String, u16),
37}
38
39pub struct RpcServer {
41 shutdown: Arc<AtomicBool>,
42 thread: Option<thread::JoinHandle<()>>,
43}
44
45impl RpcServer {
46 pub fn start(
48 addr: &RpcAddr,
49 auth_key: [u8; 32],
50 event_tx: EventSender,
51 ) -> io::Result<Self> {
52 let shutdown = Arc::new(AtomicBool::new(false));
53 let shutdown2 = shutdown.clone();
54
55 let listener = match addr {
56 RpcAddr::Tcp(host, port) => {
57 let l = TcpListener::bind((host.as_str(), *port))?;
58 l.set_nonblocking(true)?;
60 l
61 }
62 };
63
64 let thread = thread::Builder::new()
65 .name("rpc-server".into())
66 .spawn(move || {
67 rpc_server_loop(listener, auth_key, event_tx, shutdown2);
68 })
69 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
70
71 Ok(RpcServer {
72 shutdown,
73 thread: Some(thread),
74 })
75 }
76
77 pub fn stop(&mut self) {
79 self.shutdown.store(true, Ordering::Relaxed);
80 if let Some(handle) = self.thread.take() {
81 let _ = handle.join();
82 }
83 }
84}
85
86impl Drop for RpcServer {
87 fn drop(&mut self) {
88 self.stop();
89 }
90}
91
92fn rpc_server_loop(
93 listener: TcpListener,
94 auth_key: [u8; 32],
95 event_tx: EventSender,
96 shutdown: Arc<AtomicBool>,
97) {
98 loop {
99 if shutdown.load(Ordering::Relaxed) {
100 break;
101 }
102
103 match listener.accept() {
104 Ok((stream, _addr)) => {
105 let _ = stream.set_nonblocking(false);
107 let _ = stream.set_read_timeout(Some(std::time::Duration::from_secs(10)));
108 let _ = stream.set_write_timeout(Some(std::time::Duration::from_secs(10)));
109
110 if let Err(e) = handle_connection(stream, &auth_key, &event_tx) {
111 log::debug!("RPC connection error: {}", e);
112 }
113 }
114 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
115 thread::sleep(std::time::Duration::from_millis(100));
117 }
118 Err(e) => {
119 log::error!("RPC accept error: {}", e);
120 thread::sleep(std::time::Duration::from_millis(100));
121 }
122 }
123 }
124}
125
126fn handle_connection(
127 mut stream: TcpStream,
128 auth_key: &[u8; 32],
129 event_tx: &EventSender,
130) -> io::Result<()> {
131 server_auth(&mut stream, auth_key)?;
133
134 let request_bytes = recv_bytes(&mut stream)?;
136 let request = pickle::decode(&request_bytes)
137 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?;
138
139 let response = handle_rpc_request(&request, event_tx)?;
141
142 let response_bytes = pickle::encode(&response);
144 send_bytes(&mut stream, &response_bytes)?;
145
146 Ok(())
147}
148
149fn server_auth(stream: &mut TcpStream, auth_key: &[u8; 32]) -> io::Result<()> {
151 let mut random_bytes = [0u8; CHALLENGE_LEN];
153 {
155 let mut f = std::fs::File::open("/dev/urandom")?;
156 f.read_exact(&mut random_bytes)?;
157 }
158
159 let mut challenge_message = Vec::with_capacity(CHALLENGE_PREFIX.len() + 8 + CHALLENGE_LEN);
160 challenge_message.extend_from_slice(CHALLENGE_PREFIX);
161 challenge_message.extend_from_slice(b"{sha256}");
162 challenge_message.extend_from_slice(&random_bytes);
163
164 send_bytes(stream, &challenge_message)?;
165
166 let response = recv_bytes(stream)?;
168
169 let message = &challenge_message[CHALLENGE_PREFIX.len()..];
172
173 if verify_response(auth_key, message, &response) {
174 send_bytes(stream, WELCOME)?;
175 Ok(())
176 } else {
177 send_bytes(stream, FAILURE)?;
178 Err(io::Error::new(io::ErrorKind::PermissionDenied, "auth failed"))
179 }
180}
181
182fn verify_response(auth_key: &[u8; 32], message: &[u8], response: &[u8]) -> bool {
184 if response.starts_with(b"{sha256}") {
186 let digest = &response[8..];
187 let expected = hmac_sha256(auth_key, message);
188 constant_time_eq(digest, &expected)
189 }
190 else if response.len() == 16 {
192 let expected = hmac_md5(auth_key, message);
193 constant_time_eq(response, &expected)
194 }
195 else if response.starts_with(b"{md5}") {
197 let digest = &response[5..];
198 let expected = hmac_md5(auth_key, message);
199 constant_time_eq(digest, &expected)
200 } else {
201 false
202 }
203}
204
205fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
207 if a.len() != b.len() {
208 return false;
209 }
210 let mut diff = 0u8;
211 for (x, y) in a.iter().zip(b.iter()) {
212 diff |= x ^ y;
213 }
214 diff == 0
215}
216
217fn send_bytes(stream: &mut TcpStream, data: &[u8]) -> io::Result<()> {
219 let len = data.len() as i32;
220 stream.write_all(&len.to_be_bytes())?;
221 stream.write_all(data)?;
222 stream.flush()
223}
224
225fn recv_bytes(stream: &mut TcpStream) -> io::Result<Vec<u8>> {
227 let mut len_buf = [0u8; 4];
228 stream.read_exact(&mut len_buf)?;
229 let len = i32::from_be_bytes(len_buf);
230
231 if len < 0 {
232 let mut len8_buf = [0u8; 8];
234 stream.read_exact(&mut len8_buf)?;
235 let len = u64::from_be_bytes(len8_buf) as usize;
236 if len > 64 * 1024 * 1024 {
237 return Err(io::Error::new(io::ErrorKind::InvalidData, "message too large"));
238 }
239 let mut buf = vec![0u8; len];
240 stream.read_exact(&mut buf)?;
241 Ok(buf)
242 } else {
243 let len = len as usize;
244 if len > 64 * 1024 * 1024 {
245 return Err(io::Error::new(io::ErrorKind::InvalidData, "message too large"));
246 }
247 let mut buf = vec![0u8; len];
248 stream.read_exact(&mut buf)?;
249 Ok(buf)
250 }
251}
252
253fn handle_rpc_request(
255 request: &PickleValue,
256 event_tx: &EventSender,
257) -> io::Result<PickleValue> {
258 if let Some(get_val) = request.get("get") {
260 if let Some(path) = get_val.as_str() {
261 return match path {
262 "interface_stats" => {
263 let resp = send_query(event_tx, QueryRequest::InterfaceStats)?;
264 if let QueryResponse::InterfaceStats(stats) = resp {
265 Ok(interface_stats_to_pickle(&stats))
266 } else {
267 Ok(PickleValue::None)
268 }
269 }
270 "path_table" => {
271 let max_hops = request.get("max_hops").and_then(|v| {
272 v.as_int().map(|n| n as u8)
273 });
274 let resp = send_query(event_tx, QueryRequest::PathTable { max_hops })?;
275 if let QueryResponse::PathTable(entries) = resp {
276 Ok(path_table_to_pickle(&entries))
277 } else {
278 Ok(PickleValue::None)
279 }
280 }
281 "rate_table" => {
282 let resp = send_query(event_tx, QueryRequest::RateTable)?;
283 if let QueryResponse::RateTable(entries) = resp {
284 Ok(rate_table_to_pickle(&entries))
285 } else {
286 Ok(PickleValue::None)
287 }
288 }
289 "next_hop" => {
290 let hash = extract_dest_hash(request, "destination_hash")?;
291 let resp = send_query(event_tx, QueryRequest::NextHop { dest_hash: hash })?;
292 if let QueryResponse::NextHop(Some(nh)) = resp {
293 Ok(PickleValue::Bytes(nh.next_hop.to_vec()))
294 } else {
295 Ok(PickleValue::None)
296 }
297 }
298 "next_hop_if_name" => {
299 let hash = extract_dest_hash(request, "destination_hash")?;
300 let resp = send_query(event_tx, QueryRequest::NextHopIfName { dest_hash: hash })?;
301 if let QueryResponse::NextHopIfName(Some(name)) = resp {
302 Ok(PickleValue::String(name))
303 } else {
304 Ok(PickleValue::None)
305 }
306 }
307 "link_count" => {
308 let resp = send_query(event_tx, QueryRequest::LinkCount)?;
309 if let QueryResponse::LinkCount(n) = resp {
310 Ok(PickleValue::Int(n as i64))
311 } else {
312 Ok(PickleValue::None)
313 }
314 }
315 "transport_identity" => {
316 let resp = send_query(event_tx, QueryRequest::TransportIdentity)?;
317 if let QueryResponse::TransportIdentity(Some(hash)) = resp {
318 Ok(PickleValue::Bytes(hash.to_vec()))
319 } else {
320 Ok(PickleValue::None)
321 }
322 }
323 "blackholed" => {
324 let resp = send_query(event_tx, QueryRequest::GetBlackholed)?;
325 if let QueryResponse::Blackholed(entries) = resp {
326 Ok(blackholed_to_pickle(&entries))
327 } else {
328 Ok(PickleValue::None)
329 }
330 }
331 "discovered_interfaces" => {
332 let only_available = request.get("only_available")
333 .and_then(|v| v.as_bool()).unwrap_or(false);
334 let only_transport = request.get("only_transport")
335 .and_then(|v| v.as_bool()).unwrap_or(false);
336 let resp = send_query(event_tx, QueryRequest::DiscoveredInterfaces {
337 only_available,
338 only_transport,
339 })?;
340 if let QueryResponse::DiscoveredInterfaces(interfaces) = resp {
341 Ok(discovered_interfaces_to_pickle(&interfaces))
342 } else {
343 Ok(PickleValue::None)
344 }
345 }
346 _ => Ok(PickleValue::None),
347 };
348 }
349 }
350
351 if let Some(hash_val) = request.get("request_path") {
353 if let Some(hash_bytes) = hash_val.as_bytes() {
354 if hash_bytes.len() >= 16 {
355 let mut dest_hash = [0u8; 16];
356 dest_hash.copy_from_slice(&hash_bytes[..16]);
357 let _ = event_tx.send(crate::event::Event::RequestPath { dest_hash });
358 return Ok(PickleValue::Bool(true));
359 }
360 }
361 }
362
363 if let Some(hash_val) = request.get("send_probe") {
365 if let Some(hash_bytes) = hash_val.as_bytes() {
366 if hash_bytes.len() >= 16 {
367 let mut dest_hash = [0u8; 16];
368 dest_hash.copy_from_slice(&hash_bytes[..16]);
369 let payload_size = request.get("size")
370 .and_then(|v| v.as_int())
371 .and_then(|n| if n > 0 && n <= 400 { Some(n as usize) } else { None })
372 .unwrap_or(16);
373 let resp = send_query(event_tx, QueryRequest::SendProbe {
374 dest_hash,
375 payload_size,
376 })?;
377 if let QueryResponse::SendProbe(Some((packet_hash, hops))) = resp {
378 return Ok(PickleValue::Dict(vec![
379 (PickleValue::String("packet_hash".into()), PickleValue::Bytes(packet_hash.to_vec())),
380 (PickleValue::String("hops".into()), PickleValue::Int(hops as i64)),
381 ]));
382 } else {
383 return Ok(PickleValue::None);
384 }
385 }
386 }
387 }
388
389 if let Some(hash_val) = request.get("check_proof") {
391 if let Some(hash_bytes) = hash_val.as_bytes() {
392 if hash_bytes.len() >= 32 {
393 let mut packet_hash = [0u8; 32];
394 packet_hash.copy_from_slice(&hash_bytes[..32]);
395 let resp = send_query(event_tx, QueryRequest::CheckProof {
396 packet_hash,
397 })?;
398 if let QueryResponse::CheckProof(Some(rtt)) = resp {
399 return Ok(PickleValue::Float(rtt));
400 } else {
401 return Ok(PickleValue::None);
402 }
403 }
404 }
405 }
406
407 if let Some(hash_val) = request.get("blackhole") {
409 if let Some(hash_bytes) = hash_val.as_bytes() {
410 if hash_bytes.len() >= 16 {
411 let mut identity_hash = [0u8; 16];
412 identity_hash.copy_from_slice(&hash_bytes[..16]);
413 let duration_hours = request.get("duration").and_then(|v| v.as_float());
414 let reason = request.get("reason").and_then(|v| v.as_str()).map(|s| s.to_string());
415 let resp = send_query(event_tx, QueryRequest::BlackholeIdentity {
416 identity_hash,
417 duration_hours,
418 reason,
419 })?;
420 return Ok(PickleValue::Bool(matches!(resp, QueryResponse::BlackholeResult(true))));
421 }
422 }
423 }
424
425 if let Some(hash_val) = request.get("unblackhole") {
427 if let Some(hash_bytes) = hash_val.as_bytes() {
428 if hash_bytes.len() >= 16 {
429 let mut identity_hash = [0u8; 16];
430 identity_hash.copy_from_slice(&hash_bytes[..16]);
431 let resp = send_query(event_tx, QueryRequest::UnblackholeIdentity {
432 identity_hash,
433 })?;
434 return Ok(PickleValue::Bool(matches!(resp, QueryResponse::UnblackholeResult(true))));
435 }
436 }
437 }
438
439 if let Some(drop_val) = request.get("drop") {
441 if let Some(path) = drop_val.as_str() {
442 return match path {
443 "path" => {
444 let hash = extract_dest_hash(request, "destination_hash")?;
445 let resp = send_query(event_tx, QueryRequest::DropPath { dest_hash: hash })?;
446 if let QueryResponse::DropPath(ok) = resp {
447 Ok(PickleValue::Bool(ok))
448 } else {
449 Ok(PickleValue::None)
450 }
451 }
452 "all_via" => {
453 let hash = extract_dest_hash(request, "destination_hash")?;
454 let resp = send_query(event_tx, QueryRequest::DropAllVia { transport_hash: hash })?;
455 if let QueryResponse::DropAllVia(n) = resp {
456 Ok(PickleValue::Int(n as i64))
457 } else {
458 Ok(PickleValue::None)
459 }
460 }
461 "announce_queues" => {
462 let resp = send_query(event_tx, QueryRequest::DropAnnounceQueues)?;
463 if let QueryResponse::DropAnnounceQueues = resp {
464 Ok(PickleValue::Bool(true))
465 } else {
466 Ok(PickleValue::None)
467 }
468 }
469 _ => Ok(PickleValue::None),
470 };
471 }
472 }
473
474 Ok(PickleValue::None)
475}
476
477fn send_query(event_tx: &EventSender, request: QueryRequest) -> io::Result<QueryResponse> {
479 let (resp_tx, resp_rx) = mpsc::channel();
480 event_tx
481 .send(Event::Query(request, resp_tx))
482 .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down"))?;
483 resp_rx
484 .recv_timeout(std::time::Duration::from_secs(5))
485 .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "query timed out"))
486}
487
488fn extract_dest_hash(request: &PickleValue, key: &str) -> io::Result<[u8; 16]> {
490 let bytes = request
491 .get(key)
492 .and_then(|v| v.as_bytes())
493 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing destination_hash"))?;
494 if bytes.len() < 16 {
495 return Err(io::Error::new(io::ErrorKind::InvalidData, "hash too short"));
496 }
497 let mut hash = [0u8; 16];
498 hash.copy_from_slice(&bytes[..16]);
499 Ok(hash)
500}
501
502fn interface_stats_to_pickle(stats: &InterfaceStatsResponse) -> PickleValue {
505 let mut ifaces = Vec::new();
506 for iface in &stats.interfaces {
507 ifaces.push(single_iface_to_pickle(iface));
508 }
509
510 let mut dict = vec![
511 (
512 PickleValue::String("interfaces".into()),
513 PickleValue::List(ifaces),
514 ),
515 (
516 PickleValue::String("transport_enabled".into()),
517 PickleValue::Bool(stats.transport_enabled),
518 ),
519 (
520 PickleValue::String("transport_uptime".into()),
521 PickleValue::Float(stats.transport_uptime),
522 ),
523 (
524 PickleValue::String("rxb".into()),
525 PickleValue::Int(stats.total_rxb as i64),
526 ),
527 (
528 PickleValue::String("txb".into()),
529 PickleValue::Int(stats.total_txb as i64),
530 ),
531 ];
532
533 if let Some(tid) = stats.transport_id {
534 dict.push((
535 PickleValue::String("transport_id".into()),
536 PickleValue::Bytes(tid.to_vec()),
537 ));
538 } else {
539 dict.push((
540 PickleValue::String("transport_id".into()),
541 PickleValue::None,
542 ));
543 }
544
545 if let Some(pr) = stats.probe_responder {
546 dict.push((
547 PickleValue::String("probe_responder".into()),
548 PickleValue::Bytes(pr.to_vec()),
549 ));
550 } else {
551 dict.push((
552 PickleValue::String("probe_responder".into()),
553 PickleValue::None,
554 ));
555 }
556
557 PickleValue::Dict(dict)
558}
559
560fn single_iface_to_pickle(s: &SingleInterfaceStat) -> PickleValue {
561 let mut dict = vec![
562 (PickleValue::String("name".into()), PickleValue::String(s.name.clone())),
563 (PickleValue::String("status".into()), PickleValue::Bool(s.status)),
564 (PickleValue::String("mode".into()), PickleValue::Int(s.mode as i64)),
565 (PickleValue::String("rxb".into()), PickleValue::Int(s.rxb as i64)),
566 (PickleValue::String("txb".into()), PickleValue::Int(s.txb as i64)),
567 (PickleValue::String("rx_packets".into()), PickleValue::Int(s.rx_packets as i64)),
568 (PickleValue::String("tx_packets".into()), PickleValue::Int(s.tx_packets as i64)),
569 (PickleValue::String("started".into()), PickleValue::Float(s.started)),
570 (PickleValue::String("ia_freq".into()), PickleValue::Float(s.ia_freq)),
571 (PickleValue::String("oa_freq".into()), PickleValue::Float(s.oa_freq)),
572 ];
573
574 match s.bitrate {
575 Some(br) => dict.push((
576 PickleValue::String("bitrate".into()),
577 PickleValue::Int(br as i64),
578 )),
579 None => dict.push((
580 PickleValue::String("bitrate".into()),
581 PickleValue::None,
582 )),
583 }
584
585 match s.ifac_size {
586 Some(sz) => dict.push((
587 PickleValue::String("ifac_size".into()),
588 PickleValue::Int(sz as i64),
589 )),
590 None => dict.push((
591 PickleValue::String("ifac_size".into()),
592 PickleValue::None,
593 )),
594 }
595
596 PickleValue::Dict(dict)
597}
598
599fn path_table_to_pickle(entries: &[PathTableEntry]) -> PickleValue {
600 let list: Vec<PickleValue> = entries.iter().map(|e| {
601 PickleValue::Dict(vec![
602 (PickleValue::String("hash".into()), PickleValue::Bytes(e.hash.to_vec())),
603 (PickleValue::String("timestamp".into()), PickleValue::Float(e.timestamp)),
604 (PickleValue::String("via".into()), PickleValue::Bytes(e.via.to_vec())),
605 (PickleValue::String("hops".into()), PickleValue::Int(e.hops as i64)),
606 (PickleValue::String("expires".into()), PickleValue::Float(e.expires)),
607 (PickleValue::String("interface".into()), PickleValue::String(e.interface_name.clone())),
608 ])
609 }).collect();
610 PickleValue::List(list)
611}
612
613fn rate_table_to_pickle(entries: &[RateTableEntry]) -> PickleValue {
614 let list: Vec<PickleValue> = entries.iter().map(|e| {
615 PickleValue::Dict(vec![
616 (PickleValue::String("hash".into()), PickleValue::Bytes(e.hash.to_vec())),
617 (PickleValue::String("last".into()), PickleValue::Float(e.last)),
618 (PickleValue::String("rate_violations".into()), PickleValue::Int(e.rate_violations as i64)),
619 (PickleValue::String("blocked_until".into()), PickleValue::Float(e.blocked_until)),
620 (PickleValue::String("timestamps".into()), PickleValue::List(
621 e.timestamps.iter().map(|&t| PickleValue::Float(t)).collect()
622 )),
623 ])
624 }).collect();
625 PickleValue::List(list)
626}
627
628fn blackholed_to_pickle(entries: &[BlackholeInfo]) -> PickleValue {
629 let list: Vec<PickleValue> = entries.iter().map(|e| {
630 let mut dict = vec![
631 (PickleValue::String("identity_hash".into()), PickleValue::Bytes(e.identity_hash.to_vec())),
632 (PickleValue::String("created".into()), PickleValue::Float(e.created)),
633 (PickleValue::String("expires".into()), PickleValue::Float(e.expires)),
634 ];
635 if let Some(ref reason) = e.reason {
636 dict.push((PickleValue::String("reason".into()), PickleValue::String(reason.clone())));
637 } else {
638 dict.push((PickleValue::String("reason".into()), PickleValue::None));
639 }
640 PickleValue::Dict(dict)
641 }).collect();
642 PickleValue::List(list)
643}
644
645fn discovered_interfaces_to_pickle(interfaces: &[crate::discovery::DiscoveredInterface]) -> PickleValue {
646 let list: Vec<PickleValue> = interfaces.iter().map(|iface| {
647 let mut dict = vec![
648 (PickleValue::String("type".into()), PickleValue::String(iface.interface_type.clone())),
649 (PickleValue::String("transport".into()), PickleValue::Bool(iface.transport)),
650 (PickleValue::String("name".into()), PickleValue::String(iface.name.clone())),
651 (PickleValue::String("discovered".into()), PickleValue::Float(iface.discovered)),
652 (PickleValue::String("last_heard".into()), PickleValue::Float(iface.last_heard)),
653 (PickleValue::String("heard_count".into()), PickleValue::Int(iface.heard_count as i64)),
654 (PickleValue::String("status".into()), PickleValue::String(iface.status.as_str().into())),
655 (PickleValue::String("stamp".into()), PickleValue::Bytes(iface.stamp.clone())),
656 (PickleValue::String("value".into()), PickleValue::Int(iface.stamp_value as i64)),
657 (PickleValue::String("transport_id".into()), PickleValue::Bytes(iface.transport_id.to_vec())),
658 (PickleValue::String("network_id".into()), PickleValue::Bytes(iface.network_id.to_vec())),
659 (PickleValue::String("hops".into()), PickleValue::Int(iface.hops as i64)),
660 ];
661
662 if let Some(v) = iface.latitude {
664 dict.push((PickleValue::String("latitude".into()), PickleValue::Float(v)));
665 } else {
666 dict.push((PickleValue::String("latitude".into()), PickleValue::None));
667 }
668 if let Some(v) = iface.longitude {
669 dict.push((PickleValue::String("longitude".into()), PickleValue::Float(v)));
670 } else {
671 dict.push((PickleValue::String("longitude".into()), PickleValue::None));
672 }
673 if let Some(v) = iface.height {
674 dict.push((PickleValue::String("height".into()), PickleValue::Float(v)));
675 } else {
676 dict.push((PickleValue::String("height".into()), PickleValue::None));
677 }
678
679 if let Some(ref v) = iface.reachable_on {
681 dict.push((PickleValue::String("reachable_on".into()), PickleValue::String(v.clone())));
682 } else {
683 dict.push((PickleValue::String("reachable_on".into()), PickleValue::None));
684 }
685 if let Some(v) = iface.port {
686 dict.push((PickleValue::String("port".into()), PickleValue::Int(v as i64)));
687 } else {
688 dict.push((PickleValue::String("port".into()), PickleValue::None));
689 }
690
691 if let Some(v) = iface.frequency {
693 dict.push((PickleValue::String("frequency".into()), PickleValue::Int(v as i64)));
694 } else {
695 dict.push((PickleValue::String("frequency".into()), PickleValue::None));
696 }
697 if let Some(v) = iface.bandwidth {
698 dict.push((PickleValue::String("bandwidth".into()), PickleValue::Int(v as i64)));
699 } else {
700 dict.push((PickleValue::String("bandwidth".into()), PickleValue::None));
701 }
702 if let Some(v) = iface.spreading_factor {
703 dict.push((PickleValue::String("sf".into()), PickleValue::Int(v as i64)));
704 } else {
705 dict.push((PickleValue::String("sf".into()), PickleValue::None));
706 }
707 if let Some(v) = iface.coding_rate {
708 dict.push((PickleValue::String("cr".into()), PickleValue::Int(v as i64)));
709 } else {
710 dict.push((PickleValue::String("cr".into()), PickleValue::None));
711 }
712 if let Some(ref v) = iface.modulation {
713 dict.push((PickleValue::String("modulation".into()), PickleValue::String(v.clone())));
714 } else {
715 dict.push((PickleValue::String("modulation".into()), PickleValue::None));
716 }
717 if let Some(v) = iface.channel {
718 dict.push((PickleValue::String("channel".into()), PickleValue::Int(v as i64)));
719 } else {
720 dict.push((PickleValue::String("channel".into()), PickleValue::None));
721 }
722
723 if let Some(ref v) = iface.ifac_netname {
725 dict.push((PickleValue::String("ifac_netname".into()), PickleValue::String(v.clone())));
726 } else {
727 dict.push((PickleValue::String("ifac_netname".into()), PickleValue::None));
728 }
729 if let Some(ref v) = iface.ifac_netkey {
730 dict.push((PickleValue::String("ifac_netkey".into()), PickleValue::String(v.clone())));
731 } else {
732 dict.push((PickleValue::String("ifac_netkey".into()), PickleValue::None));
733 }
734
735 if let Some(ref v) = iface.config_entry {
737 dict.push((PickleValue::String("config_entry".into()), PickleValue::String(v.clone())));
738 } else {
739 dict.push((PickleValue::String("config_entry".into()), PickleValue::None));
740 }
741
742 dict.push((PickleValue::String("discovery_hash".into()), PickleValue::Bytes(iface.discovery_hash.to_vec())));
743
744 PickleValue::Dict(dict)
745 }).collect();
746 PickleValue::List(list)
747}
748
749pub struct RpcClient {
753 stream: TcpStream,
754}
755
756impl RpcClient {
757 pub fn connect(addr: &RpcAddr, auth_key: &[u8; 32]) -> io::Result<Self> {
759 let mut stream = match addr {
760 RpcAddr::Tcp(host, port) => {
761 TcpStream::connect((host.as_str(), *port))?
762 }
763 };
764
765 stream.set_read_timeout(Some(std::time::Duration::from_secs(10)))?;
766 stream.set_write_timeout(Some(std::time::Duration::from_secs(10)))?;
767
768 client_auth(&mut stream, auth_key)?;
770
771 Ok(RpcClient { stream })
772 }
773
774 pub fn call(&mut self, request: &PickleValue) -> io::Result<PickleValue> {
776 let request_bytes = pickle::encode(request);
777 send_bytes(&mut self.stream, &request_bytes)?;
778
779 let response_bytes = recv_bytes(&mut self.stream)?;
780 pickle::decode(&response_bytes)
781 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))
782 }
783}
784
785fn client_auth(stream: &mut TcpStream, auth_key: &[u8; 32]) -> io::Result<()> {
787 let challenge = recv_bytes(stream)?;
789
790 if !challenge.starts_with(CHALLENGE_PREFIX) {
791 return Err(io::Error::new(
792 io::ErrorKind::InvalidData,
793 "expected challenge",
794 ));
795 }
796
797 let message = &challenge[CHALLENGE_PREFIX.len()..];
798
799 let response = create_response(auth_key, message);
801 send_bytes(stream, &response)?;
802
803 let result = recv_bytes(stream)?;
805 if result == WELCOME {
806 Ok(())
807 } else {
808 Err(io::Error::new(
809 io::ErrorKind::PermissionDenied,
810 "authentication failed",
811 ))
812 }
813}
814
815fn create_response(auth_key: &[u8; 32], message: &[u8]) -> Vec<u8> {
817 if message.starts_with(b"{sha256}") || message.len() > 20 {
819 let digest = hmac_sha256(auth_key, message);
821 let mut response = Vec::with_capacity(8 + 32);
822 response.extend_from_slice(b"{sha256}");
823 response.extend_from_slice(&digest);
824 response
825 } else {
826 let digest = hmac_md5(auth_key, message);
828 digest.to_vec()
829 }
830}
831
832pub fn derive_auth_key(private_key: &[u8]) -> [u8; 32] {
834 sha256(private_key)
835}
836
837#[cfg(test)]
838mod tests {
839 use super::*;
840
841 #[test]
842 fn send_recv_bytes_roundtrip() {
843 let (mut c1, mut c2) = tcp_pair();
844 let data = b"hello world";
845 send_bytes(&mut c1, data).unwrap();
846 let received = recv_bytes(&mut c2).unwrap();
847 assert_eq!(&received, data);
848 }
849
850 #[test]
851 fn send_recv_empty() {
852 let (mut c1, mut c2) = tcp_pair();
853 send_bytes(&mut c1, b"").unwrap();
854 let received = recv_bytes(&mut c2).unwrap();
855 assert!(received.is_empty());
856 }
857
858 #[test]
859 fn auth_success() {
860 let key = derive_auth_key(b"test-private-key");
861 let (mut server, mut client) = tcp_pair();
862
863 let key2 = key;
864 let t = thread::spawn(move || {
865 client_auth(&mut client, &key2).unwrap();
866 });
867
868 server_auth(&mut server, &key).unwrap();
869 t.join().unwrap();
870 }
871
872 #[test]
873 fn auth_failure_wrong_key() {
874 let server_key = derive_auth_key(b"server-key");
875 let client_key = derive_auth_key(b"wrong-key");
876 let (mut server, mut client) = tcp_pair();
877
878 let t = thread::spawn(move || {
879 let result = client_auth(&mut client, &client_key);
880 assert!(result.is_err());
881 });
882
883 let result = server_auth(&mut server, &server_key);
884 assert!(result.is_err());
885 t.join().unwrap();
886 }
887
888 #[test]
889 fn verify_sha256_response() {
890 let key = derive_auth_key(b"mykey");
891 let message = b"{sha256}abcdefghijklmnopqrstuvwxyz0123456789ABCD";
892 let response = create_response(&key, message);
893 assert!(response.starts_with(b"{sha256}"));
894 assert!(verify_response(&key, message, &response));
895 }
896
897 #[test]
898 fn verify_legacy_md5_response() {
899 let key = derive_auth_key(b"mykey");
900 let message = b"01234567890123456789";
902 let digest = hmac_md5(&key, message);
904 assert!(verify_response(&key, message, &digest));
905 }
906
907 #[test]
908 fn constant_time_eq_works() {
909 assert!(constant_time_eq(b"hello", b"hello"));
910 assert!(!constant_time_eq(b"hello", b"world"));
911 assert!(!constant_time_eq(b"hello", b"hell"));
912 }
913
914 #[test]
915 fn rpc_roundtrip() {
916 let key = derive_auth_key(b"test-key");
917 let (event_tx, event_rx) = crate::event::channel();
918
919 let addr = RpcAddr::Tcp("127.0.0.1".into(), 0);
921 let listener = TcpListener::bind("127.0.0.1:0").unwrap();
923 let port = listener.local_addr().unwrap().port();
924 listener.set_nonblocking(true).unwrap();
925
926 let shutdown = Arc::new(AtomicBool::new(false));
927 let shutdown2 = shutdown.clone();
928
929 let driver_thread = thread::spawn(move || {
931 loop {
932 match event_rx.recv_timeout(std::time::Duration::from_secs(5)) {
933 Ok(Event::Query(QueryRequest::LinkCount, resp_tx)) => {
934 let _ = resp_tx.send(QueryResponse::LinkCount(42));
935 }
936 Ok(Event::Query(QueryRequest::InterfaceStats, resp_tx)) => {
937 let _ = resp_tx.send(QueryResponse::InterfaceStats(InterfaceStatsResponse {
938 interfaces: vec![SingleInterfaceStat {
939 name: "TestInterface".into(),
940 status: true,
941 mode: 1,
942 rxb: 1000,
943 txb: 2000,
944 rx_packets: 10,
945 tx_packets: 20,
946 bitrate: Some(10_000_000),
947 ifac_size: None,
948 started: 1000.0,
949 ia_freq: 0.0,
950 oa_freq: 0.0,
951 interface_type: "TestInterface".into(),
952 }],
953 transport_id: None,
954 transport_enabled: true,
955 transport_uptime: 3600.0,
956 total_rxb: 1000,
957 total_txb: 2000,
958 probe_responder: None,
959 }));
960 }
961 _ => break,
962 }
963 }
964 });
965
966 let key2 = key;
967 let shutdown3 = shutdown2.clone();
968 let server_thread = thread::spawn(move || {
969 rpc_server_loop(listener, key2, event_tx, shutdown3);
970 });
971
972 thread::sleep(std::time::Duration::from_millis(50));
974
975 let server_addr = RpcAddr::Tcp("127.0.0.1".into(), port);
977 let mut client = RpcClient::connect(&server_addr, &key).unwrap();
978 let response = client.call(&PickleValue::Dict(vec![
979 (PickleValue::String("get".into()), PickleValue::String("link_count".into())),
980 ])).unwrap();
981 assert_eq!(response.as_int().unwrap(), 42);
982 drop(client);
983
984 let mut client2 = RpcClient::connect(&server_addr, &key).unwrap();
986 let response2 = client2.call(&PickleValue::Dict(vec![
987 (PickleValue::String("get".into()), PickleValue::String("interface_stats".into())),
988 ])).unwrap();
989 let ifaces = response2.get("interfaces").unwrap().as_list().unwrap();
990 assert_eq!(ifaces.len(), 1);
991 let iface = &ifaces[0];
992 assert_eq!(iface.get("name").unwrap().as_str().unwrap(), "TestInterface");
993 assert_eq!(iface.get("rxb").unwrap().as_int().unwrap(), 1000);
994 drop(client2);
995
996 shutdown2.store(true, Ordering::Relaxed);
998 server_thread.join().unwrap();
999 driver_thread.join().unwrap();
1000 }
1001
1002 #[test]
1003 fn derive_auth_key_deterministic() {
1004 let key1 = derive_auth_key(b"test");
1005 let key2 = derive_auth_key(b"test");
1006 assert_eq!(key1, key2);
1007 let key3 = derive_auth_key(b"other");
1009 assert_ne!(key1, key3);
1010 }
1011
1012 #[test]
1013 fn pickle_request_handling() {
1014 let (event_tx, event_rx) = crate::event::channel();
1016
1017 let driver = thread::spawn(move || {
1018 if let Ok(Event::Query(QueryRequest::DropPath { dest_hash }, resp_tx)) = event_rx.recv() {
1019 assert_eq!(dest_hash, [1u8; 16]);
1020 let _ = resp_tx.send(QueryResponse::DropPath(true));
1021 }
1022 });
1023
1024 let request = PickleValue::Dict(vec![
1025 (PickleValue::String("drop".into()), PickleValue::String("path".into())),
1026 (PickleValue::String("destination_hash".into()), PickleValue::Bytes(vec![1u8; 16])),
1027 ]);
1028
1029 let response = handle_rpc_request(&request, &event_tx).unwrap();
1030 assert_eq!(response, PickleValue::Bool(true));
1031 driver.join().unwrap();
1032 }
1033
1034 #[test]
1035 fn interface_stats_pickle_format() {
1036 let stats = InterfaceStatsResponse {
1037 interfaces: vec![SingleInterfaceStat {
1038 name: "TCP".into(),
1039 status: true,
1040 mode: 1,
1041 rxb: 100,
1042 txb: 200,
1043 rx_packets: 5,
1044 tx_packets: 10,
1045 bitrate: Some(1000000),
1046 ifac_size: Some(16),
1047 started: 1000.0,
1048 ia_freq: 0.0,
1049 oa_freq: 0.0,
1050 interface_type: "TCPClientInterface".into(),
1051 }],
1052 transport_id: Some([0xAB; 16]),
1053 transport_enabled: true,
1054 transport_uptime: 3600.0,
1055 total_rxb: 100,
1056 total_txb: 200,
1057 probe_responder: None,
1058 };
1059
1060 let pickle = interface_stats_to_pickle(&stats);
1061
1062 let encoded = pickle::encode(&pickle);
1064 let decoded = pickle::decode(&encoded).unwrap();
1065 assert_eq!(decoded.get("transport_enabled").unwrap().as_bool().unwrap(), true);
1066 let ifaces = decoded.get("interfaces").unwrap().as_list().unwrap();
1067 assert_eq!(ifaces[0].get("name").unwrap().as_str().unwrap(), "TCP");
1068 }
1069
1070 #[test]
1071 fn send_probe_rpc_unknown_dest() {
1072 let (event_tx, event_rx) = crate::event::channel();
1073
1074 let driver = thread::spawn(move || {
1075 if let Ok(Event::Query(QueryRequest::SendProbe { dest_hash, payload_size }, resp_tx)) = event_rx.recv() {
1076 assert_eq!(dest_hash, [0xAA; 16]);
1077 assert_eq!(payload_size, 16); let _ = resp_tx.send(QueryResponse::SendProbe(None));
1079 }
1080 });
1081
1082 let request = PickleValue::Dict(vec![
1083 (PickleValue::String("send_probe".into()), PickleValue::Bytes(vec![0xAA; 16])),
1084 ]);
1085
1086 let response = handle_rpc_request(&request, &event_tx).unwrap();
1087 assert_eq!(response, PickleValue::None);
1088 driver.join().unwrap();
1089 }
1090
1091 #[test]
1092 fn send_probe_rpc_with_result() {
1093 let (event_tx, event_rx) = crate::event::channel();
1094
1095 let packet_hash = [0xBB; 32];
1096 let driver = thread::spawn(move || {
1097 if let Ok(Event::Query(QueryRequest::SendProbe { dest_hash, payload_size }, resp_tx)) = event_rx.recv() {
1098 assert_eq!(dest_hash, [0xCC; 16]);
1099 assert_eq!(payload_size, 32);
1100 let _ = resp_tx.send(QueryResponse::SendProbe(Some((packet_hash, 3))));
1101 }
1102 });
1103
1104 let request = PickleValue::Dict(vec![
1105 (PickleValue::String("send_probe".into()), PickleValue::Bytes(vec![0xCC; 16])),
1106 (PickleValue::String("size".into()), PickleValue::Int(32)),
1107 ]);
1108
1109 let response = handle_rpc_request(&request, &event_tx).unwrap();
1110 let ph = response.get("packet_hash").unwrap().as_bytes().unwrap();
1111 assert_eq!(ph, &[0xBB; 32]);
1112 assert_eq!(response.get("hops").unwrap().as_int().unwrap(), 3);
1113 driver.join().unwrap();
1114 }
1115
1116 #[test]
1117 fn send_probe_rpc_size_validation() {
1118 let (event_tx, event_rx) = crate::event::channel();
1119
1120 let driver = thread::spawn(move || {
1122 if let Ok(Event::Query(QueryRequest::SendProbe { payload_size, .. }, resp_tx)) = event_rx.recv() {
1123 assert_eq!(payload_size, 16); let _ = resp_tx.send(QueryResponse::SendProbe(None));
1125 }
1126 });
1127
1128 let request = PickleValue::Dict(vec![
1129 (PickleValue::String("send_probe".into()), PickleValue::Bytes(vec![0xDD; 16])),
1130 (PickleValue::String("size".into()), PickleValue::Int(-1)),
1131 ]);
1132
1133 let response = handle_rpc_request(&request, &event_tx).unwrap();
1134 assert_eq!(response, PickleValue::None);
1135 driver.join().unwrap();
1136 }
1137
1138 #[test]
1139 fn send_probe_rpc_size_too_large() {
1140 let (event_tx, event_rx) = crate::event::channel();
1141
1142 let driver = thread::spawn(move || {
1144 if let Ok(Event::Query(QueryRequest::SendProbe { payload_size, .. }, resp_tx)) = event_rx.recv() {
1145 assert_eq!(payload_size, 16); let _ = resp_tx.send(QueryResponse::SendProbe(None));
1147 }
1148 });
1149
1150 let request = PickleValue::Dict(vec![
1151 (PickleValue::String("send_probe".into()), PickleValue::Bytes(vec![0xDD; 16])),
1152 (PickleValue::String("size".into()), PickleValue::Int(999)),
1153 ]);
1154
1155 let response = handle_rpc_request(&request, &event_tx).unwrap();
1156 assert_eq!(response, PickleValue::None);
1157 driver.join().unwrap();
1158 }
1159
1160 #[test]
1161 fn check_proof_rpc_not_found() {
1162 let (event_tx, event_rx) = crate::event::channel();
1163
1164 let driver = thread::spawn(move || {
1165 if let Ok(Event::Query(QueryRequest::CheckProof { packet_hash }, resp_tx)) = event_rx.recv() {
1166 assert_eq!(packet_hash, [0xEE; 32]);
1167 let _ = resp_tx.send(QueryResponse::CheckProof(None));
1168 }
1169 });
1170
1171 let request = PickleValue::Dict(vec![
1172 (PickleValue::String("check_proof".into()), PickleValue::Bytes(vec![0xEE; 32])),
1173 ]);
1174
1175 let response = handle_rpc_request(&request, &event_tx).unwrap();
1176 assert_eq!(response, PickleValue::None);
1177 driver.join().unwrap();
1178 }
1179
1180 #[test]
1181 fn check_proof_rpc_found() {
1182 let (event_tx, event_rx) = crate::event::channel();
1183
1184 let driver = thread::spawn(move || {
1185 if let Ok(Event::Query(QueryRequest::CheckProof { packet_hash }, resp_tx)) = event_rx.recv() {
1186 assert_eq!(packet_hash, [0xFF; 32]);
1187 let _ = resp_tx.send(QueryResponse::CheckProof(Some(0.352)));
1188 }
1189 });
1190
1191 let request = PickleValue::Dict(vec![
1192 (PickleValue::String("check_proof".into()), PickleValue::Bytes(vec![0xFF; 32])),
1193 ]);
1194
1195 let response = handle_rpc_request(&request, &event_tx).unwrap();
1196 if let PickleValue::Float(rtt) = response {
1197 assert!((rtt - 0.352).abs() < 0.001);
1198 } else {
1199 panic!("Expected Float, got {:?}", response);
1200 }
1201 driver.join().unwrap();
1202 }
1203
1204 #[test]
1205 fn request_path_rpc() {
1206 let (event_tx, event_rx) = crate::event::channel();
1207
1208 let driver = thread::spawn(move || {
1209 match event_rx.recv_timeout(std::time::Duration::from_secs(5)) {
1210 Ok(Event::RequestPath { dest_hash }) => {
1211 assert_eq!(dest_hash, [0x11; 16]);
1212 }
1213 other => panic!("Expected RequestPath event, got {:?}", other),
1214 }
1215 });
1216
1217 let request = PickleValue::Dict(vec![
1218 (PickleValue::String("request_path".into()), PickleValue::Bytes(vec![0x11; 16])),
1219 ]);
1220
1221 let response = handle_rpc_request(&request, &event_tx).unwrap();
1222 assert_eq!(response, PickleValue::Bool(true));
1223 driver.join().unwrap();
1224 }
1225
1226 #[test]
1227 fn interface_stats_with_probe_responder() {
1228 let probe_hash = [0x42; 16];
1229 let stats = InterfaceStatsResponse {
1230 interfaces: vec![],
1231 transport_id: None,
1232 transport_enabled: true,
1233 transport_uptime: 100.0,
1234 total_rxb: 0,
1235 total_txb: 0,
1236 probe_responder: Some(probe_hash),
1237 };
1238
1239 let pickle = interface_stats_to_pickle(&stats);
1240 let encoded = pickle::encode(&pickle);
1241 let decoded = pickle::decode(&encoded).unwrap();
1242
1243 let pr = decoded.get("probe_responder").unwrap().as_bytes().unwrap();
1244 assert_eq!(pr, &probe_hash);
1245 }
1246
1247 fn tcp_pair() -> (TcpStream, TcpStream) {
1249 let listener = TcpListener::bind("127.0.0.1:0").unwrap();
1250 let port = listener.local_addr().unwrap().port();
1251 let client = TcpStream::connect(("127.0.0.1", port)).unwrap();
1252 let (server, _) = listener.accept().unwrap();
1253 client.set_read_timeout(Some(std::time::Duration::from_secs(5))).unwrap();
1254 server.set_read_timeout(Some(std::time::Duration::from_secs(5))).unwrap();
1255 (server, client)
1256 }
1257}