1use std::io::{self, Read, Write};
12use std::net::{TcpListener, TcpStream};
13use std::sync::mpsc;
14use std::sync::{Arc, atomic::{AtomicBool, Ordering}};
15use std::thread;
16
17use rns_crypto::sha256::sha256;
18use rns_crypto::hmac::hmac_sha256;
19
20use crate::event::{
21 BlackholeInfo, Event, EventSender, QueryRequest, QueryResponse,
22 InterfaceStatsResponse, SingleInterfaceStat,
23 PathTableEntry, RateTableEntry,
24};
25use crate::md5::hmac_md5;
26use crate::pickle::{self, PickleValue};
27
28const CHALLENGE_PREFIX: &[u8] = b"#CHALLENGE#";
29const WELCOME: &[u8] = b"#WELCOME#";
30const FAILURE: &[u8] = b"#FAILURE#";
31const CHALLENGE_LEN: usize = 40;
32
33#[derive(Debug, Clone)]
35pub enum RpcAddr {
36 Tcp(String, u16),
37}
38
39pub struct RpcServer {
41 shutdown: Arc<AtomicBool>,
42 thread: Option<thread::JoinHandle<()>>,
43}
44
45impl RpcServer {
46 pub fn start(
48 addr: &RpcAddr,
49 auth_key: [u8; 32],
50 event_tx: EventSender,
51 ) -> io::Result<Self> {
52 let shutdown = Arc::new(AtomicBool::new(false));
53 let shutdown2 = shutdown.clone();
54
55 let listener = match addr {
56 RpcAddr::Tcp(host, port) => {
57 let l = TcpListener::bind((host.as_str(), *port))?;
58 l.set_nonblocking(true)?;
60 l
61 }
62 };
63
64 let thread = thread::Builder::new()
65 .name("rpc-server".into())
66 .spawn(move || {
67 rpc_server_loop(listener, auth_key, event_tx, shutdown2);
68 })
69 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
70
71 Ok(RpcServer {
72 shutdown,
73 thread: Some(thread),
74 })
75 }
76
77 pub fn stop(&mut self) {
79 self.shutdown.store(true, Ordering::Relaxed);
80 if let Some(handle) = self.thread.take() {
81 let _ = handle.join();
82 }
83 }
84}
85
86impl Drop for RpcServer {
87 fn drop(&mut self) {
88 self.stop();
89 }
90}
91
92fn rpc_server_loop(
93 listener: TcpListener,
94 auth_key: [u8; 32],
95 event_tx: EventSender,
96 shutdown: Arc<AtomicBool>,
97) {
98 loop {
99 if shutdown.load(Ordering::Relaxed) {
100 break;
101 }
102
103 match listener.accept() {
104 Ok((stream, _addr)) => {
105 let _ = stream.set_nonblocking(false);
107 let _ = stream.set_read_timeout(Some(std::time::Duration::from_secs(10)));
108 let _ = stream.set_write_timeout(Some(std::time::Duration::from_secs(10)));
109
110 if let Err(e) = handle_connection(stream, &auth_key, &event_tx) {
111 log::debug!("RPC connection error: {}", e);
112 }
113 }
114 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
115 thread::sleep(std::time::Duration::from_millis(100));
117 }
118 Err(e) => {
119 log::error!("RPC accept error: {}", e);
120 thread::sleep(std::time::Duration::from_millis(100));
121 }
122 }
123 }
124}
125
126fn handle_connection(
127 mut stream: TcpStream,
128 auth_key: &[u8; 32],
129 event_tx: &EventSender,
130) -> io::Result<()> {
131 server_auth(&mut stream, auth_key)?;
133
134 let request_bytes = recv_bytes(&mut stream)?;
136 let request = pickle::decode(&request_bytes)
137 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?;
138
139 let response = handle_rpc_request(&request, event_tx)?;
141
142 let response_bytes = pickle::encode(&response);
144 send_bytes(&mut stream, &response_bytes)?;
145
146 Ok(())
147}
148
149fn server_auth(stream: &mut TcpStream, auth_key: &[u8; 32]) -> io::Result<()> {
151 let mut random_bytes = [0u8; CHALLENGE_LEN];
153 {
155 let mut f = std::fs::File::open("/dev/urandom")?;
156 f.read_exact(&mut random_bytes)?;
157 }
158
159 let mut challenge_message = Vec::with_capacity(CHALLENGE_PREFIX.len() + 8 + CHALLENGE_LEN);
160 challenge_message.extend_from_slice(CHALLENGE_PREFIX);
161 challenge_message.extend_from_slice(b"{sha256}");
162 challenge_message.extend_from_slice(&random_bytes);
163
164 send_bytes(stream, &challenge_message)?;
165
166 let response = recv_bytes(stream)?;
168
169 let message = &challenge_message[CHALLENGE_PREFIX.len()..];
172
173 if verify_response(auth_key, message, &response) {
174 send_bytes(stream, WELCOME)?;
175 Ok(())
176 } else {
177 send_bytes(stream, FAILURE)?;
178 Err(io::Error::new(io::ErrorKind::PermissionDenied, "auth failed"))
179 }
180}
181
182fn verify_response(auth_key: &[u8; 32], message: &[u8], response: &[u8]) -> bool {
184 if response.starts_with(b"{sha256}") {
186 let digest = &response[8..];
187 let expected = hmac_sha256(auth_key, message);
188 constant_time_eq(digest, &expected)
189 }
190 else if response.len() == 16 {
192 let expected = hmac_md5(auth_key, message);
193 constant_time_eq(response, &expected)
194 }
195 else if response.starts_with(b"{md5}") {
197 let digest = &response[5..];
198 let expected = hmac_md5(auth_key, message);
199 constant_time_eq(digest, &expected)
200 } else {
201 false
202 }
203}
204
205fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
207 if a.len() != b.len() {
208 return false;
209 }
210 let mut diff = 0u8;
211 for (x, y) in a.iter().zip(b.iter()) {
212 diff |= x ^ y;
213 }
214 diff == 0
215}
216
217fn send_bytes(stream: &mut TcpStream, data: &[u8]) -> io::Result<()> {
219 let len = data.len() as i32;
220 stream.write_all(&len.to_be_bytes())?;
221 stream.write_all(data)?;
222 stream.flush()
223}
224
225fn recv_bytes(stream: &mut TcpStream) -> io::Result<Vec<u8>> {
227 let mut len_buf = [0u8; 4];
228 stream.read_exact(&mut len_buf)?;
229 let len = i32::from_be_bytes(len_buf);
230
231 if len < 0 {
232 let mut len8_buf = [0u8; 8];
234 stream.read_exact(&mut len8_buf)?;
235 let len = u64::from_be_bytes(len8_buf) as usize;
236 if len > 64 * 1024 * 1024 {
237 return Err(io::Error::new(io::ErrorKind::InvalidData, "message too large"));
238 }
239 let mut buf = vec![0u8; len];
240 stream.read_exact(&mut buf)?;
241 Ok(buf)
242 } else {
243 let len = len as usize;
244 if len > 64 * 1024 * 1024 {
245 return Err(io::Error::new(io::ErrorKind::InvalidData, "message too large"));
246 }
247 let mut buf = vec![0u8; len];
248 stream.read_exact(&mut buf)?;
249 Ok(buf)
250 }
251}
252
253fn handle_rpc_request(
255 request: &PickleValue,
256 event_tx: &EventSender,
257) -> io::Result<PickleValue> {
258 if let Some(get_val) = request.get("get") {
260 if let Some(path) = get_val.as_str() {
261 return match path {
262 "interface_stats" => {
263 let resp = send_query(event_tx, QueryRequest::InterfaceStats)?;
264 if let QueryResponse::InterfaceStats(stats) = resp {
265 Ok(interface_stats_to_pickle(&stats))
266 } else {
267 Ok(PickleValue::None)
268 }
269 }
270 "path_table" => {
271 let max_hops = request.get("max_hops").and_then(|v| {
272 v.as_int().map(|n| n as u8)
273 });
274 let resp = send_query(event_tx, QueryRequest::PathTable { max_hops })?;
275 if let QueryResponse::PathTable(entries) = resp {
276 Ok(path_table_to_pickle(&entries))
277 } else {
278 Ok(PickleValue::None)
279 }
280 }
281 "rate_table" => {
282 let resp = send_query(event_tx, QueryRequest::RateTable)?;
283 if let QueryResponse::RateTable(entries) = resp {
284 Ok(rate_table_to_pickle(&entries))
285 } else {
286 Ok(PickleValue::None)
287 }
288 }
289 "next_hop" => {
290 let hash = extract_dest_hash(request, "destination_hash")?;
291 let resp = send_query(event_tx, QueryRequest::NextHop { dest_hash: hash })?;
292 if let QueryResponse::NextHop(Some(nh)) = resp {
293 Ok(PickleValue::Bytes(nh.next_hop.to_vec()))
294 } else {
295 Ok(PickleValue::None)
296 }
297 }
298 "next_hop_if_name" => {
299 let hash = extract_dest_hash(request, "destination_hash")?;
300 let resp = send_query(event_tx, QueryRequest::NextHopIfName { dest_hash: hash })?;
301 if let QueryResponse::NextHopIfName(Some(name)) = resp {
302 Ok(PickleValue::String(name))
303 } else {
304 Ok(PickleValue::None)
305 }
306 }
307 "link_count" => {
308 let resp = send_query(event_tx, QueryRequest::LinkCount)?;
309 if let QueryResponse::LinkCount(n) = resp {
310 Ok(PickleValue::Int(n as i64))
311 } else {
312 Ok(PickleValue::None)
313 }
314 }
315 "transport_identity" => {
316 let resp = send_query(event_tx, QueryRequest::TransportIdentity)?;
317 if let QueryResponse::TransportIdentity(Some(hash)) = resp {
318 Ok(PickleValue::Bytes(hash.to_vec()))
319 } else {
320 Ok(PickleValue::None)
321 }
322 }
323 "blackholed" => {
324 let resp = send_query(event_tx, QueryRequest::GetBlackholed)?;
325 if let QueryResponse::Blackholed(entries) = resp {
326 Ok(blackholed_to_pickle(&entries))
327 } else {
328 Ok(PickleValue::None)
329 }
330 }
331 _ => Ok(PickleValue::None),
332 };
333 }
334 }
335
336 if let Some(hash_val) = request.get("blackhole") {
338 if let Some(hash_bytes) = hash_val.as_bytes() {
339 if hash_bytes.len() >= 16 {
340 let mut identity_hash = [0u8; 16];
341 identity_hash.copy_from_slice(&hash_bytes[..16]);
342 let duration_hours = request.get("duration").and_then(|v| v.as_float());
343 let reason = request.get("reason").and_then(|v| v.as_str()).map(|s| s.to_string());
344 let resp = send_query(event_tx, QueryRequest::BlackholeIdentity {
345 identity_hash,
346 duration_hours,
347 reason,
348 })?;
349 return Ok(PickleValue::Bool(matches!(resp, QueryResponse::BlackholeResult(true))));
350 }
351 }
352 }
353
354 if let Some(hash_val) = request.get("unblackhole") {
356 if let Some(hash_bytes) = hash_val.as_bytes() {
357 if hash_bytes.len() >= 16 {
358 let mut identity_hash = [0u8; 16];
359 identity_hash.copy_from_slice(&hash_bytes[..16]);
360 let resp = send_query(event_tx, QueryRequest::UnblackholeIdentity {
361 identity_hash,
362 })?;
363 return Ok(PickleValue::Bool(matches!(resp, QueryResponse::UnblackholeResult(true))));
364 }
365 }
366 }
367
368 if let Some(drop_val) = request.get("drop") {
370 if let Some(path) = drop_val.as_str() {
371 return match path {
372 "path" => {
373 let hash = extract_dest_hash(request, "destination_hash")?;
374 let resp = send_query(event_tx, QueryRequest::DropPath { dest_hash: hash })?;
375 if let QueryResponse::DropPath(ok) = resp {
376 Ok(PickleValue::Bool(ok))
377 } else {
378 Ok(PickleValue::None)
379 }
380 }
381 "all_via" => {
382 let hash = extract_dest_hash(request, "destination_hash")?;
383 let resp = send_query(event_tx, QueryRequest::DropAllVia { transport_hash: hash })?;
384 if let QueryResponse::DropAllVia(n) = resp {
385 Ok(PickleValue::Int(n as i64))
386 } else {
387 Ok(PickleValue::None)
388 }
389 }
390 "announce_queues" => {
391 let resp = send_query(event_tx, QueryRequest::DropAnnounceQueues)?;
392 if let QueryResponse::DropAnnounceQueues = resp {
393 Ok(PickleValue::Bool(true))
394 } else {
395 Ok(PickleValue::None)
396 }
397 }
398 _ => Ok(PickleValue::None),
399 };
400 }
401 }
402
403 Ok(PickleValue::None)
404}
405
406fn send_query(event_tx: &EventSender, request: QueryRequest) -> io::Result<QueryResponse> {
408 let (resp_tx, resp_rx) = mpsc::channel();
409 event_tx
410 .send(Event::Query(request, resp_tx))
411 .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "driver shut down"))?;
412 resp_rx
413 .recv_timeout(std::time::Duration::from_secs(5))
414 .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "query timed out"))
415}
416
417fn extract_dest_hash(request: &PickleValue, key: &str) -> io::Result<[u8; 16]> {
419 let bytes = request
420 .get(key)
421 .and_then(|v| v.as_bytes())
422 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing destination_hash"))?;
423 if bytes.len() < 16 {
424 return Err(io::Error::new(io::ErrorKind::InvalidData, "hash too short"));
425 }
426 let mut hash = [0u8; 16];
427 hash.copy_from_slice(&bytes[..16]);
428 Ok(hash)
429}
430
431fn interface_stats_to_pickle(stats: &InterfaceStatsResponse) -> PickleValue {
434 let mut ifaces = Vec::new();
435 for iface in &stats.interfaces {
436 ifaces.push(single_iface_to_pickle(iface));
437 }
438
439 let mut dict = vec![
440 (
441 PickleValue::String("interfaces".into()),
442 PickleValue::List(ifaces),
443 ),
444 (
445 PickleValue::String("transport_enabled".into()),
446 PickleValue::Bool(stats.transport_enabled),
447 ),
448 (
449 PickleValue::String("transport_uptime".into()),
450 PickleValue::Float(stats.transport_uptime),
451 ),
452 (
453 PickleValue::String("rxb".into()),
454 PickleValue::Int(stats.total_rxb as i64),
455 ),
456 (
457 PickleValue::String("txb".into()),
458 PickleValue::Int(stats.total_txb as i64),
459 ),
460 ];
461
462 if let Some(tid) = stats.transport_id {
463 dict.push((
464 PickleValue::String("transport_id".into()),
465 PickleValue::Bytes(tid.to_vec()),
466 ));
467 } else {
468 dict.push((
469 PickleValue::String("transport_id".into()),
470 PickleValue::None,
471 ));
472 }
473
474 PickleValue::Dict(dict)
475}
476
477fn single_iface_to_pickle(s: &SingleInterfaceStat) -> PickleValue {
478 let mut dict = vec![
479 (PickleValue::String("name".into()), PickleValue::String(s.name.clone())),
480 (PickleValue::String("status".into()), PickleValue::Bool(s.status)),
481 (PickleValue::String("mode".into()), PickleValue::Int(s.mode as i64)),
482 (PickleValue::String("rxb".into()), PickleValue::Int(s.rxb as i64)),
483 (PickleValue::String("txb".into()), PickleValue::Int(s.txb as i64)),
484 (PickleValue::String("rx_packets".into()), PickleValue::Int(s.rx_packets as i64)),
485 (PickleValue::String("tx_packets".into()), PickleValue::Int(s.tx_packets as i64)),
486 (PickleValue::String("started".into()), PickleValue::Float(s.started)),
487 (PickleValue::String("ia_freq".into()), PickleValue::Float(s.ia_freq)),
488 (PickleValue::String("oa_freq".into()), PickleValue::Float(s.oa_freq)),
489 ];
490
491 match s.bitrate {
492 Some(br) => dict.push((
493 PickleValue::String("bitrate".into()),
494 PickleValue::Int(br as i64),
495 )),
496 None => dict.push((
497 PickleValue::String("bitrate".into()),
498 PickleValue::None,
499 )),
500 }
501
502 match s.ifac_size {
503 Some(sz) => dict.push((
504 PickleValue::String("ifac_size".into()),
505 PickleValue::Int(sz as i64),
506 )),
507 None => dict.push((
508 PickleValue::String("ifac_size".into()),
509 PickleValue::None,
510 )),
511 }
512
513 PickleValue::Dict(dict)
514}
515
516fn path_table_to_pickle(entries: &[PathTableEntry]) -> PickleValue {
517 let list: Vec<PickleValue> = entries.iter().map(|e| {
518 PickleValue::Dict(vec![
519 (PickleValue::String("hash".into()), PickleValue::Bytes(e.hash.to_vec())),
520 (PickleValue::String("timestamp".into()), PickleValue::Float(e.timestamp)),
521 (PickleValue::String("via".into()), PickleValue::Bytes(e.via.to_vec())),
522 (PickleValue::String("hops".into()), PickleValue::Int(e.hops as i64)),
523 (PickleValue::String("expires".into()), PickleValue::Float(e.expires)),
524 (PickleValue::String("interface".into()), PickleValue::String(e.interface_name.clone())),
525 ])
526 }).collect();
527 PickleValue::List(list)
528}
529
530fn rate_table_to_pickle(entries: &[RateTableEntry]) -> PickleValue {
531 let list: Vec<PickleValue> = entries.iter().map(|e| {
532 PickleValue::Dict(vec![
533 (PickleValue::String("hash".into()), PickleValue::Bytes(e.hash.to_vec())),
534 (PickleValue::String("last".into()), PickleValue::Float(e.last)),
535 (PickleValue::String("rate_violations".into()), PickleValue::Int(e.rate_violations as i64)),
536 (PickleValue::String("blocked_until".into()), PickleValue::Float(e.blocked_until)),
537 (PickleValue::String("timestamps".into()), PickleValue::List(
538 e.timestamps.iter().map(|&t| PickleValue::Float(t)).collect()
539 )),
540 ])
541 }).collect();
542 PickleValue::List(list)
543}
544
545fn blackholed_to_pickle(entries: &[BlackholeInfo]) -> PickleValue {
546 let list: Vec<PickleValue> = entries.iter().map(|e| {
547 let mut dict = vec![
548 (PickleValue::String("identity_hash".into()), PickleValue::Bytes(e.identity_hash.to_vec())),
549 (PickleValue::String("created".into()), PickleValue::Float(e.created)),
550 (PickleValue::String("expires".into()), PickleValue::Float(e.expires)),
551 ];
552 if let Some(ref reason) = e.reason {
553 dict.push((PickleValue::String("reason".into()), PickleValue::String(reason.clone())));
554 } else {
555 dict.push((PickleValue::String("reason".into()), PickleValue::None));
556 }
557 PickleValue::Dict(dict)
558 }).collect();
559 PickleValue::List(list)
560}
561
562pub struct RpcClient {
566 stream: TcpStream,
567}
568
569impl RpcClient {
570 pub fn connect(addr: &RpcAddr, auth_key: &[u8; 32]) -> io::Result<Self> {
572 let mut stream = match addr {
573 RpcAddr::Tcp(host, port) => {
574 TcpStream::connect((host.as_str(), *port))?
575 }
576 };
577
578 stream.set_read_timeout(Some(std::time::Duration::from_secs(10)))?;
579 stream.set_write_timeout(Some(std::time::Duration::from_secs(10)))?;
580
581 client_auth(&mut stream, auth_key)?;
583
584 Ok(RpcClient { stream })
585 }
586
587 pub fn call(&mut self, request: &PickleValue) -> io::Result<PickleValue> {
589 let request_bytes = pickle::encode(request);
590 send_bytes(&mut self.stream, &request_bytes)?;
591
592 let response_bytes = recv_bytes(&mut self.stream)?;
593 pickle::decode(&response_bytes)
594 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))
595 }
596}
597
598fn client_auth(stream: &mut TcpStream, auth_key: &[u8; 32]) -> io::Result<()> {
600 let challenge = recv_bytes(stream)?;
602
603 if !challenge.starts_with(CHALLENGE_PREFIX) {
604 return Err(io::Error::new(
605 io::ErrorKind::InvalidData,
606 "expected challenge",
607 ));
608 }
609
610 let message = &challenge[CHALLENGE_PREFIX.len()..];
611
612 let response = create_response(auth_key, message);
614 send_bytes(stream, &response)?;
615
616 let result = recv_bytes(stream)?;
618 if result == WELCOME {
619 Ok(())
620 } else {
621 Err(io::Error::new(
622 io::ErrorKind::PermissionDenied,
623 "authentication failed",
624 ))
625 }
626}
627
628fn create_response(auth_key: &[u8; 32], message: &[u8]) -> Vec<u8> {
630 if message.starts_with(b"{sha256}") || message.len() > 20 {
632 let digest = hmac_sha256(auth_key, message);
634 let mut response = Vec::with_capacity(8 + 32);
635 response.extend_from_slice(b"{sha256}");
636 response.extend_from_slice(&digest);
637 response
638 } else {
639 let digest = hmac_md5(auth_key, message);
641 digest.to_vec()
642 }
643}
644
645pub fn derive_auth_key(private_key: &[u8]) -> [u8; 32] {
647 sha256(private_key)
648}
649
650#[cfg(test)]
651mod tests {
652 use super::*;
653
654 #[test]
655 fn send_recv_bytes_roundtrip() {
656 let (mut c1, mut c2) = tcp_pair();
657 let data = b"hello world";
658 send_bytes(&mut c1, data).unwrap();
659 let received = recv_bytes(&mut c2).unwrap();
660 assert_eq!(&received, data);
661 }
662
663 #[test]
664 fn send_recv_empty() {
665 let (mut c1, mut c2) = tcp_pair();
666 send_bytes(&mut c1, b"").unwrap();
667 let received = recv_bytes(&mut c2).unwrap();
668 assert!(received.is_empty());
669 }
670
671 #[test]
672 fn auth_success() {
673 let key = derive_auth_key(b"test-private-key");
674 let (mut server, mut client) = tcp_pair();
675
676 let key2 = key;
677 let t = thread::spawn(move || {
678 client_auth(&mut client, &key2).unwrap();
679 });
680
681 server_auth(&mut server, &key).unwrap();
682 t.join().unwrap();
683 }
684
685 #[test]
686 fn auth_failure_wrong_key() {
687 let server_key = derive_auth_key(b"server-key");
688 let client_key = derive_auth_key(b"wrong-key");
689 let (mut server, mut client) = tcp_pair();
690
691 let t = thread::spawn(move || {
692 let result = client_auth(&mut client, &client_key);
693 assert!(result.is_err());
694 });
695
696 let result = server_auth(&mut server, &server_key);
697 assert!(result.is_err());
698 t.join().unwrap();
699 }
700
701 #[test]
702 fn verify_sha256_response() {
703 let key = derive_auth_key(b"mykey");
704 let message = b"{sha256}abcdefghijklmnopqrstuvwxyz0123456789ABCD";
705 let response = create_response(&key, message);
706 assert!(response.starts_with(b"{sha256}"));
707 assert!(verify_response(&key, message, &response));
708 }
709
710 #[test]
711 fn verify_legacy_md5_response() {
712 let key = derive_auth_key(b"mykey");
713 let message = b"01234567890123456789";
715 let digest = hmac_md5(&key, message);
717 assert!(verify_response(&key, message, &digest));
718 }
719
720 #[test]
721 fn constant_time_eq_works() {
722 assert!(constant_time_eq(b"hello", b"hello"));
723 assert!(!constant_time_eq(b"hello", b"world"));
724 assert!(!constant_time_eq(b"hello", b"hell"));
725 }
726
727 #[test]
728 fn rpc_roundtrip() {
729 let key = derive_auth_key(b"test-key");
730 let (event_tx, event_rx) = crate::event::channel();
731
732 let addr = RpcAddr::Tcp("127.0.0.1".into(), 0);
734 let listener = TcpListener::bind("127.0.0.1:0").unwrap();
736 let port = listener.local_addr().unwrap().port();
737 listener.set_nonblocking(true).unwrap();
738
739 let shutdown = Arc::new(AtomicBool::new(false));
740 let shutdown2 = shutdown.clone();
741
742 let driver_thread = thread::spawn(move || {
744 loop {
745 match event_rx.recv_timeout(std::time::Duration::from_secs(5)) {
746 Ok(Event::Query(QueryRequest::LinkCount, resp_tx)) => {
747 let _ = resp_tx.send(QueryResponse::LinkCount(42));
748 }
749 Ok(Event::Query(QueryRequest::InterfaceStats, resp_tx)) => {
750 let _ = resp_tx.send(QueryResponse::InterfaceStats(InterfaceStatsResponse {
751 interfaces: vec![SingleInterfaceStat {
752 name: "TestInterface".into(),
753 status: true,
754 mode: 1,
755 rxb: 1000,
756 txb: 2000,
757 rx_packets: 10,
758 tx_packets: 20,
759 bitrate: Some(10_000_000),
760 ifac_size: None,
761 started: 1000.0,
762 ia_freq: 0.0,
763 oa_freq: 0.0,
764 interface_type: "TestInterface".into(),
765 }],
766 transport_id: None,
767 transport_enabled: true,
768 transport_uptime: 3600.0,
769 total_rxb: 1000,
770 total_txb: 2000,
771 }));
772 }
773 _ => break,
774 }
775 }
776 });
777
778 let key2 = key;
779 let shutdown3 = shutdown2.clone();
780 let server_thread = thread::spawn(move || {
781 rpc_server_loop(listener, key2, event_tx, shutdown3);
782 });
783
784 thread::sleep(std::time::Duration::from_millis(50));
786
787 let server_addr = RpcAddr::Tcp("127.0.0.1".into(), port);
789 let mut client = RpcClient::connect(&server_addr, &key).unwrap();
790 let response = client.call(&PickleValue::Dict(vec![
791 (PickleValue::String("get".into()), PickleValue::String("link_count".into())),
792 ])).unwrap();
793 assert_eq!(response.as_int().unwrap(), 42);
794 drop(client);
795
796 let mut client2 = RpcClient::connect(&server_addr, &key).unwrap();
798 let response2 = client2.call(&PickleValue::Dict(vec![
799 (PickleValue::String("get".into()), PickleValue::String("interface_stats".into())),
800 ])).unwrap();
801 let ifaces = response2.get("interfaces").unwrap().as_list().unwrap();
802 assert_eq!(ifaces.len(), 1);
803 let iface = &ifaces[0];
804 assert_eq!(iface.get("name").unwrap().as_str().unwrap(), "TestInterface");
805 assert_eq!(iface.get("rxb").unwrap().as_int().unwrap(), 1000);
806 drop(client2);
807
808 shutdown2.store(true, Ordering::Relaxed);
810 server_thread.join().unwrap();
811 driver_thread.join().unwrap();
812 }
813
814 #[test]
815 fn derive_auth_key_deterministic() {
816 let key1 = derive_auth_key(b"test");
817 let key2 = derive_auth_key(b"test");
818 assert_eq!(key1, key2);
819 let key3 = derive_auth_key(b"other");
821 assert_ne!(key1, key3);
822 }
823
824 #[test]
825 fn pickle_request_handling() {
826 let (event_tx, event_rx) = crate::event::channel();
828
829 let driver = thread::spawn(move || {
830 if let Ok(Event::Query(QueryRequest::DropPath { dest_hash }, resp_tx)) = event_rx.recv() {
831 assert_eq!(dest_hash, [1u8; 16]);
832 let _ = resp_tx.send(QueryResponse::DropPath(true));
833 }
834 });
835
836 let request = PickleValue::Dict(vec![
837 (PickleValue::String("drop".into()), PickleValue::String("path".into())),
838 (PickleValue::String("destination_hash".into()), PickleValue::Bytes(vec![1u8; 16])),
839 ]);
840
841 let response = handle_rpc_request(&request, &event_tx).unwrap();
842 assert_eq!(response, PickleValue::Bool(true));
843 driver.join().unwrap();
844 }
845
846 #[test]
847 fn interface_stats_pickle_format() {
848 let stats = InterfaceStatsResponse {
849 interfaces: vec![SingleInterfaceStat {
850 name: "TCP".into(),
851 status: true,
852 mode: 1,
853 rxb: 100,
854 txb: 200,
855 rx_packets: 5,
856 tx_packets: 10,
857 bitrate: Some(1000000),
858 ifac_size: Some(16),
859 started: 1000.0,
860 ia_freq: 0.0,
861 oa_freq: 0.0,
862 interface_type: "TCPClientInterface".into(),
863 }],
864 transport_id: Some([0xAB; 16]),
865 transport_enabled: true,
866 transport_uptime: 3600.0,
867 total_rxb: 100,
868 total_txb: 200,
869 };
870
871 let pickle = interface_stats_to_pickle(&stats);
872
873 let encoded = pickle::encode(&pickle);
875 let decoded = pickle::decode(&encoded).unwrap();
876 assert_eq!(decoded.get("transport_enabled").unwrap().as_bool().unwrap(), true);
877 let ifaces = decoded.get("interfaces").unwrap().as_list().unwrap();
878 assert_eq!(ifaces[0].get("name").unwrap().as_str().unwrap(), "TCP");
879 }
880
881 fn tcp_pair() -> (TcpStream, TcpStream) {
883 let listener = TcpListener::bind("127.0.0.1:0").unwrap();
884 let port = listener.local_addr().unwrap().port();
885 let client = TcpStream::connect(("127.0.0.1", port)).unwrap();
886 let (server, _) = listener.accept().unwrap();
887 client.set_read_timeout(Some(std::time::Duration::from_secs(5))).unwrap();
888 server.set_read_timeout(Some(std::time::Duration::from_secs(5))).unwrap();
889 (server, client)
890 }
891}