1pub mod control;
22pub mod stats;
23
24#[cfg(test)]
25mod mock_control;
26#[cfg(test)]
27mod mock_socks5;
28
29use super::{
30 ConnectionState, DiscoveredPeer, PacketTx, ReceivedPacket, Transport, TransportAddr,
31 TransportError, TransportId, TransportState, TransportType,
32};
33use crate::config::TorConfig;
34use crate::transport::tcp::stream::read_fmp_packet;
35use control::{ControlAuth, TorControlClient, TorMonitoringInfo};
36use stats::TorStats;
37
38use futures::FutureExt;
39use socket2::TcpKeepalive;
40use std::collections::HashMap;
41use std::net::SocketAddr;
42use std::sync::Arc;
43use std::time::Duration;
44use tokio::io::AsyncWriteExt;
45use tokio::net::tcp::OwnedWriteHalf;
46use tokio::net::{TcpListener, TcpStream};
47use tokio::sync::Mutex;
48use tokio::task::JoinHandle;
49use tokio::time::Instant;
50use tokio_socks::tcp::Socks5Stream;
51use tracing::{debug, info, trace, warn};
52
53#[derive(Clone, Debug)]
59pub enum TorAddr {
60 Onion(String, u16),
62 Clearnet(SocketAddr),
64 ClearnetHostname(String, u16),
68}
69
70fn parse_tor_addr(addr: &TransportAddr) -> Result<TorAddr, TransportError> {
76 let s = addr.as_str().ok_or_else(|| {
77 TransportError::InvalidAddress("Tor address must be a valid UTF-8 string".into())
78 })?;
79
80 if s.contains(".onion:") {
81 let (host, port_str) = s.rsplit_once(':').ok_or_else(|| {
83 TransportError::InvalidAddress(format!("invalid onion address: {}", s))
84 })?;
85 let port: u16 = port_str.parse().map_err(|_| {
86 TransportError::InvalidAddress(format!("invalid port in onion address: {}", s))
87 })?;
88 Ok(TorAddr::Onion(host.to_string(), port))
89 } else if let Ok(socket_addr) = s.parse::<SocketAddr>() {
90 Ok(TorAddr::Clearnet(socket_addr))
92 } else {
93 let (host, port_str) = s.rsplit_once(':').ok_or_else(|| {
95 TransportError::InvalidAddress(format!("invalid address (expected host:port): {}", s))
96 })?;
97 let port: u16 = port_str
98 .parse()
99 .map_err(|_| TransportError::InvalidAddress(format!("invalid port: {}", s)))?;
100 if !host.contains('.') {
101 return Err(TransportError::InvalidAddress(format!(
102 "hostname must be fully qualified (contain a dot): {}",
103 host
104 )));
105 }
106 Ok(TorAddr::ClearnetHostname(host.to_string(), port))
107 }
108}
109
110#[derive(Clone, Copy, Debug, PartialEq, Eq)]
115enum Direction {
116 Inbound,
117 Outbound,
118}
119
120struct TorConnection {
122 writer: Arc<Mutex<OwnedWriteHalf>>,
124 recv_task: JoinHandle<()>,
126 #[allow(dead_code)]
128 mtu: u16,
129 #[allow(dead_code)]
131 established_at: Instant,
132 direction: Direction,
133}
134
135type ConnectionPool = Arc<Mutex<HashMap<TransportAddr, TorConnection>>>;
137
138struct ConnectingEntry {
143 task: JoinHandle<Result<(TcpStream, u16), TransportError>>,
145}
146
147type ConnectingPool = Arc<Mutex<HashMap<TransportAddr, ConnectingEntry>>>;
149
150pub struct TorTransport {
161 transport_id: TransportId,
163 name: Option<String>,
165 config: TorConfig,
167 state: TransportState,
169 pool: ConnectionPool,
171 connecting: ConnectingPool,
173 packet_tx: PacketTx,
175 stats: Arc<TorStats>,
177 accept_task: Option<JoinHandle<()>>,
179 onion_address: Option<String>,
182 control_client: Option<Arc<Mutex<TorControlClient>>>,
184 cached_monitoring: Arc<std::sync::RwLock<Option<TorMonitoringInfo>>>,
186 monitoring_task: Option<JoinHandle<()>>,
188}
189
190impl TorTransport {
191 pub fn new(
193 transport_id: TransportId,
194 name: Option<String>,
195 config: TorConfig,
196 packet_tx: PacketTx,
197 ) -> Self {
198 Self {
199 transport_id,
200 name,
201 config,
202 state: TransportState::Configured,
203 pool: Arc::new(Mutex::new(HashMap::new())),
204 connecting: Arc::new(Mutex::new(HashMap::new())),
205 packet_tx,
206 stats: Arc::new(TorStats::new()),
207 accept_task: None,
208 onion_address: None,
209 control_client: None,
210 cached_monitoring: Arc::new(std::sync::RwLock::new(None)),
211 monitoring_task: None,
212 }
213 }
214
215 pub fn name(&self) -> Option<&str> {
217 self.name.as_deref()
218 }
219
220 pub fn onion_address(&self) -> Option<&str> {
222 self.onion_address.as_deref()
223 }
224
225 pub fn stats(&self) -> &Arc<TorStats> {
227 &self.stats
228 }
229
230 pub fn cached_monitoring(&self) -> Option<TorMonitoringInfo> {
232 self.cached_monitoring.read().ok()?.clone()
233 }
234
235 pub fn mode(&self) -> &str {
237 self.config.mode()
238 }
239
240 pub async fn start_async(&mut self) -> Result<(), TransportError> {
248 if !self.state.can_start() {
249 return Err(TransportError::AlreadyStarted);
250 }
251
252 self.state = TransportState::Starting;
253
254 let socks5_addr = self.config.socks5_addr().to_string();
256 validate_host_port(&socks5_addr, "socks5_addr")?;
257
258 let mode = self.config.mode().to_string();
259 match mode.as_str() {
260 "socks5" => {
261 if self.config.directory_service.is_some() {
263 return Err(TransportError::StartFailed(
264 "directory_service config requires mode 'directory', not 'socks5'".into(),
265 ));
266 }
267 self.state = TransportState::Up;
268 }
269 "control_port" => {
270 self.start_control_port_mode().await?;
271 }
272 "directory" => {
273 self.start_directory_mode().await?;
274 }
275 other => {
276 return Err(TransportError::StartFailed(format!(
277 "unsupported Tor mode '{}' (expected 'socks5', 'control_port', or 'directory')",
278 other
279 )));
280 }
281 }
282
283 if let Some(ref name) = self.name {
284 info!(
285 name = %name,
286 mode = %mode,
287 socks5_addr = %socks5_addr,
288 onion_address = ?self.onion_address,
289 mtu = self.config.mtu(),
290 "Tor transport started"
291 );
292 } else {
293 info!(
294 mode = %mode,
295 socks5_addr = %socks5_addr,
296 onion_address = ?self.onion_address,
297 mtu = self.config.mtu(),
298 "Tor transport started"
299 );
300 }
301
302 Ok(())
303 }
304
305 async fn start_control_port_mode(&mut self) -> Result<(), TransportError> {
308 let control_addr = self.config.control_addr().to_string();
309 if !control_addr.starts_with('/') && !control_addr.starts_with("./") {
311 validate_host_port(&control_addr, "control_addr")?;
312 }
313
314 let mut client = TorControlClient::connect(&control_addr)
316 .await
317 .map_err(|e| {
318 self.stats.record_control_error();
319 TransportError::StartFailed(format!("Tor control port: {}", e))
320 })?;
321
322 let auth = ControlAuth::from_config(self.config.control_auth(), self.config.cookie_path())
324 .map_err(|e| TransportError::StartFailed(format!("Tor auth config: {}", e)))?;
325
326 client.authenticate(&auth).await.map_err(|e| {
327 self.stats.record_control_error();
328 TransportError::StartFailed(format!("Tor authentication: {}", e))
329 })?;
330
331 self.control_client = Some(Arc::new(Mutex::new(client)));
333 self.state = TransportState::Up;
334 self.spawn_monitoring_task();
335
336 Ok(())
337 }
338
339 async fn start_directory_mode(&mut self) -> Result<(), TransportError> {
346 let dir_config = self.config.directory_service.clone().unwrap_or_default();
347
348 let hostname_file = dir_config.hostname_file();
350 let onion_addr = std::fs::read_to_string(hostname_file)
351 .map_err(|e| {
352 TransportError::StartFailed(format!(
353 "failed to read onion hostname from '{}': {} \
354 (ensure HiddenServiceDir is configured in torrc and Tor has started)",
355 hostname_file, e
356 ))
357 })?
358 .trim()
359 .to_string();
360
361 if onion_addr.is_empty() || !onion_addr.ends_with(".onion") {
362 return Err(TransportError::StartFailed(format!(
363 "invalid onion address in '{}': '{}'",
364 hostname_file, onion_addr
365 )));
366 }
367
368 self.onion_address = Some(onion_addr.clone());
369
370 let bind_addr = dir_config.bind_addr();
372 let listener = TcpListener::bind(bind_addr).await.map_err(|e| {
373 TransportError::StartFailed(format!(
374 "failed to bind directory-mode listener on {}: {}",
375 bind_addr, e
376 ))
377 })?;
378 let local_addr = listener
379 .local_addr()
380 .map_err(|e| TransportError::StartFailed(format!("failed to get local addr: {}", e)))?;
381
382 info!(
383 onion_address = %onion_addr,
384 local_addr = %local_addr,
385 hostname_file = %hostname_file,
386 "Directory-mode onion service active"
387 );
388
389 let transport_id = self.transport_id;
391 let packet_tx = self.packet_tx.clone();
392 let pool = self.pool.clone();
393 let mtu = self.config.mtu();
394 let max_inbound = self.config.max_inbound_connections();
395 let stats = self.stats.clone();
396
397 let accept_handle = tokio::spawn(async move {
398 tor_accept_loop(
399 listener,
400 transport_id,
401 packet_tx,
402 pool,
403 mtu,
404 max_inbound,
405 stats,
406 )
407 .await;
408 });
409
410 self.accept_task = Some(accept_handle);
411 self.state = TransportState::Up;
412
413 if self.config.control_addr.is_some() {
415 self.try_connect_control_port().await;
416 }
417
418 Ok(())
419 }
420
421 async fn try_connect_control_port(&mut self) {
424 let control_addr = self.config.control_addr().to_string();
425 if !control_addr.starts_with('/')
426 && !control_addr.starts_with("./")
427 && let Err(e) = validate_host_port(&control_addr, "control_addr")
428 {
429 warn!(
430 transport_id = %self.transport_id,
431 error = %e,
432 "Tor control port address invalid, monitoring disabled"
433 );
434 return;
435 }
436
437 let client = match TorControlClient::connect(&control_addr).await {
438 Ok(c) => c,
439 Err(e) => {
440 warn!(
441 transport_id = %self.transport_id,
442 addr = %control_addr,
443 error = %e,
444 "Tor control port connect failed, monitoring disabled"
445 );
446 return;
447 }
448 };
449
450 let auth =
451 match ControlAuth::from_config(self.config.control_auth(), self.config.cookie_path()) {
452 Ok(a) => a,
453 Err(e) => {
454 warn!(
455 transport_id = %self.transport_id,
456 error = %e,
457 "Tor control auth config error, monitoring disabled"
458 );
459 return;
460 }
461 };
462
463 let mut client = client;
464 if let Err(e) = client.authenticate(&auth).await {
465 warn!(
466 transport_id = %self.transport_id,
467 error = %e,
468 "Tor control port auth failed, monitoring disabled"
469 );
470 return;
471 }
472
473 info!(
474 transport_id = %self.transport_id,
475 addr = %control_addr,
476 "Tor control port connected (monitoring enabled)"
477 );
478
479 self.control_client = Some(Arc::new(Mutex::new(client)));
480 self.spawn_monitoring_task();
481 }
482
483 pub async fn stop_async(&mut self) -> Result<(), TransportError> {
488 if !self.state.is_operational() {
489 return Err(TransportError::NotStarted);
490 }
491
492 if let Some(task) = self.accept_task.take() {
494 task.abort();
495 let _ = task.await;
496 debug!(
497 transport_id = %self.transport_id,
498 "Onion service accept loop stopped"
499 );
500 }
501
502 if let Some(task) = self.monitoring_task.take() {
504 task.abort();
505 let _ = task.await;
506 }
507 if let Ok(mut w) = self.cached_monitoring.write() {
508 *w = None;
509 }
510
511 self.control_client = None;
512 self.onion_address = None;
513
514 let mut connecting = self.connecting.lock().await;
516 for (addr, entry) in connecting.drain() {
517 entry.task.abort();
518 debug!(
519 transport_id = %self.transport_id,
520 remote_addr = %addr,
521 "Tor connect aborted (transport stopping)"
522 );
523 }
524 drop(connecting);
525
526 let mut pool = self.pool.lock().await;
528 for (addr, conn) in pool.drain() {
529 conn.recv_task.abort();
530 let _ = conn.recv_task.await;
531 match conn.direction {
532 Direction::Inbound => self.stats.record_pool_inbound_removed(),
533 Direction::Outbound => self.stats.record_pool_outbound_removed(),
534 }
535 debug!(
536 transport_id = %self.transport_id,
537 remote_addr = %addr,
538 direction = ?conn.direction,
539 "Tor connection closed (transport stopping)"
540 );
541 }
542 drop(pool);
543
544 self.state = TransportState::Down;
545
546 info!(
547 transport_id = %self.transport_id,
548 "Tor transport stopped"
549 );
550
551 Ok(())
552 }
553
554 fn spawn_monitoring_task(&mut self) {
557 let Some(client) = self.control_client.clone() else {
558 return;
559 };
560 let cache = self.cached_monitoring.clone();
561 let stats = self.stats.clone();
562 let transport_id = self.transport_id;
563
564 let handle = tokio::spawn(async move {
565 let mut interval = tokio::time::interval(Duration::from_secs(10));
566 let mut last_bootstrap: u8 = 0;
567 let mut last_liveness = String::new();
568 let mut was_dormant = false;
569 let mut stall_warned = false;
570 let started_at = Instant::now();
571
572 loop {
573 interval.tick().await;
574 let mut guard = client.lock().await;
575 match guard.monitoring_snapshot().await {
576 Ok(info) => {
577 for &milestone in &[25u8, 50, 75, 100] {
579 if info.bootstrap >= milestone && last_bootstrap < milestone {
580 info!(
581 transport_id = %transport_id,
582 bootstrap = info.bootstrap,
583 "Tor bootstrap {}%",
584 milestone
585 );
586 }
587 }
588
589 if info.bootstrap < 100
591 && started_at.elapsed() > Duration::from_secs(60)
592 && !stall_warned
593 {
594 warn!(
595 transport_id = %transport_id,
596 bootstrap = info.bootstrap,
597 "Tor bootstrap stalled — not at 100% after 60s"
598 );
599 stall_warned = true;
600 }
601 if info.bootstrap == 100 {
602 stall_warned = false;
603 }
604
605 last_bootstrap = info.bootstrap;
606
607 if !last_liveness.is_empty() && info.network_liveness != last_liveness {
609 warn!(
610 transport_id = %transport_id,
611 from = %last_liveness,
612 to = %info.network_liveness,
613 "Tor network liveness changed"
614 );
615 }
616 last_liveness = info.network_liveness.clone();
617
618 if info.dormant && !was_dormant {
620 warn!(
621 transport_id = %transport_id,
622 "Tor daemon entered dormant mode"
623 );
624 }
625 was_dormant = info.dormant;
626
627 if let Ok(mut w) = cache.write() {
628 *w = Some(info);
629 }
630 }
631 Err(e) => {
632 stats.record_control_error();
633 warn!(
634 transport_id = %transport_id,
635 error = %e,
636 "Tor monitoring query failed"
637 );
638 }
639 }
640 }
641 });
642
643 self.monitoring_task = Some(handle);
644 }
645
646 pub async fn send_async(
653 &self,
654 addr: &TransportAddr,
655 data: &[u8],
656 ) -> Result<usize, TransportError> {
657 if !self.state.is_operational() {
658 return Err(TransportError::NotStarted);
659 }
660
661 let mtu = self.config.mtu() as usize;
663 if data.len() > mtu {
664 self.stats.record_mtu_exceeded();
665 return Err(TransportError::MtuExceeded {
666 packet_size: data.len(),
667 mtu: self.config.mtu(),
668 });
669 }
670
671 let writer = {
673 let pool = self.pool.lock().await;
674 pool.get(addr).map(|c| c.writer.clone())
675 };
676
677 let writer = match writer {
678 Some(w) => w,
679 None => {
680 self.connect(addr).await?
682 }
683 };
684
685 let mut w = writer.lock().await;
687 match w.write_all(data).await {
688 Ok(()) => {
689 self.stats.record_send(data.len());
690 trace!(
691 transport_id = %self.transport_id,
692 remote_addr = %addr,
693 bytes = data.len(),
694 "Tor packet sent"
695 );
696 Ok(data.len())
697 }
698 Err(e) => {
699 self.stats.record_send_error();
700 drop(w);
701 let mut pool = self.pool.lock().await;
703 if let Some(conn) = pool.remove(addr) {
704 conn.recv_task.abort();
705 match conn.direction {
706 Direction::Inbound => self.stats.record_pool_inbound_removed(),
707 Direction::Outbound => self.stats.record_pool_outbound_removed(),
708 }
709 }
710 Err(TransportError::SendFailed(format!("{}", e)))
711 }
712 }
713 }
714
715 async fn connect(
721 &self,
722 addr: &TransportAddr,
723 ) -> Result<Arc<Mutex<OwnedWriteHalf>>, TransportError> {
724 let tor_addr = parse_tor_addr(addr)?;
725 let proxy_addr = self.config.socks5_addr();
726 let timeout_ms = self.config.connect_timeout_ms();
727
728 debug!(
729 transport_id = %self.transport_id,
730 remote_addr = %addr,
731 proxy = %proxy_addr,
732 timeout_secs = timeout_ms / 1000,
733 "Connecting via Tor SOCKS5"
734 );
735
736 let isolation_key = addr.to_string();
741 let connect_start = Instant::now();
742 let socks_result = tokio::time::timeout(Duration::from_millis(timeout_ms), async {
743 match &tor_addr {
744 TorAddr::Onion(host, port) | TorAddr::ClearnetHostname(host, port) => {
745 Socks5Stream::connect_with_password(
746 proxy_addr,
747 (host.as_str(), *port),
748 "fips",
749 &isolation_key,
750 )
751 .await
752 }
753 TorAddr::Clearnet(socket_addr) => {
754 Socks5Stream::connect_with_password(
755 proxy_addr,
756 *socket_addr,
757 "fips",
758 &isolation_key,
759 )
760 .await
761 }
762 }
763 })
764 .await;
765
766 let stream = match socks_result {
767 Ok(Ok(socks_stream)) => socks_stream.into_inner(),
768 Ok(Err(e)) => {
769 self.stats.record_socks5_error();
770 warn!(
771 transport_id = %self.transport_id,
772 remote_addr = %addr,
773 error = %e,
774 elapsed_secs = connect_start.elapsed().as_secs(),
775 "Tor SOCKS5 connection failed"
776 );
777 return Err(TransportError::ConnectionRefused);
778 }
779 Err(_) => {
780 self.stats.record_connect_timeout();
781 warn!(
782 transport_id = %self.transport_id,
783 remote_addr = %addr,
784 timeout_secs = timeout_ms / 1000,
785 "Tor SOCKS5 connection timed out"
786 );
787 return Err(TransportError::Timeout);
788 }
789 };
790
791 let std_stream = stream
793 .into_std()
794 .map_err(|e| TransportError::StartFailed(format!("into_std: {}", e)))?;
795 configure_socket(&std_stream, &self.config)?;
796
797 let stream = TcpStream::from_std(std_stream)
799 .map_err(|e| TransportError::StartFailed(format!("from_std: {}", e)))?;
800
801 let (read_half, write_half) = stream.into_split();
803 let writer = Arc::new(Mutex::new(write_half));
804
805 let transport_id = self.transport_id;
806 let packet_tx = self.packet_tx.clone();
807 let pool = self.pool.clone();
808 let recv_stats = self.stats.clone();
809 let remote_addr = addr.clone();
810 let mtu = self.config.mtu();
811
812 let recv_task = tokio::spawn(async move {
813 tor_receive_loop(
814 read_half,
815 transport_id,
816 remote_addr.clone(),
817 packet_tx,
818 pool,
819 mtu,
820 recv_stats,
821 Direction::Outbound,
822 )
823 .await;
824 });
825
826 let conn = TorConnection {
827 writer: writer.clone(),
828 recv_task,
829 mtu,
830 established_at: Instant::now(),
831 direction: Direction::Outbound,
832 };
833
834 let mut pool = self.pool.lock().await;
835 pool.insert(addr.clone(), conn);
836
837 self.stats.record_connection_established();
838 self.stats.record_pool_outbound_added();
839
840 info!(
841 transport_id = %self.transport_id,
842 remote_addr = %addr,
843 elapsed_secs = connect_start.elapsed().as_secs(),
844 "Tor circuit established via SOCKS5"
845 );
846
847 Ok(writer)
848 }
849
850 pub async fn connect_async(&self, addr: &TransportAddr) -> Result<(), TransportError> {
859 if !self.state.is_operational() {
860 return Err(TransportError::NotStarted);
861 }
862
863 {
865 let pool = self.pool.lock().await;
866 if pool.contains_key(addr) {
867 return Ok(());
868 }
869 }
870
871 {
873 let connecting = self.connecting.lock().await;
874 if connecting.contains_key(addr) {
875 return Ok(());
876 }
877 }
878
879 let tor_addr = parse_tor_addr(addr)?;
880 let proxy_addr = self.config.socks5_addr().to_string();
881 let timeout_ms = self.config.connect_timeout_ms();
882 let transport_id = self.transport_id;
883 let remote_addr = addr.clone();
884 let config = self.config.clone();
885
886 debug!(
887 transport_id = %transport_id,
888 remote_addr = %remote_addr,
889 timeout_ms,
890 "Initiating background Tor SOCKS5 connect"
891 );
892
893 let isolation_key = addr.to_string();
895
896 let task = tokio::spawn(async move {
897 let socks_result = tokio::time::timeout(Duration::from_millis(timeout_ms), async {
900 match &tor_addr {
901 TorAddr::Onion(host, port) | TorAddr::ClearnetHostname(host, port) => {
902 Socks5Stream::connect_with_password(
903 proxy_addr.as_str(),
904 (host.as_str(), *port),
905 "fips",
906 &isolation_key,
907 )
908 .await
909 }
910 TorAddr::Clearnet(socket_addr) => {
911 Socks5Stream::connect_with_password(
912 proxy_addr.as_str(),
913 *socket_addr,
914 "fips",
915 &isolation_key,
916 )
917 .await
918 }
919 }
920 })
921 .await;
922
923 let stream = match socks_result {
924 Ok(Ok(socks_stream)) => socks_stream.into_inner(),
925 Ok(Err(e)) => {
926 debug!(
927 transport_id = %transport_id,
928 remote_addr = %remote_addr,
929 error = %e,
930 "Background Tor SOCKS5 connect failed"
931 );
932 return Err(TransportError::ConnectionRefused);
933 }
934 Err(_) => {
935 debug!(
936 transport_id = %transport_id,
937 remote_addr = %remote_addr,
938 "Background Tor SOCKS5 connect timed out"
939 );
940 return Err(TransportError::Timeout);
941 }
942 };
943
944 let std_stream = stream
946 .into_std()
947 .map_err(|e| TransportError::StartFailed(format!("into_std: {}", e)))?;
948 configure_socket(&std_stream, &config)?;
949
950 let mtu = config.mtu();
951
952 let stream = TcpStream::from_std(std_stream)
954 .map_err(|e| TransportError::StartFailed(format!("from_std: {}", e)))?;
955
956 Ok((stream, mtu))
957 });
958
959 let mut connecting = self.connecting.lock().await;
960 connecting.insert(addr.clone(), ConnectingEntry { task });
961
962 Ok(())
963 }
964
965 pub fn connection_state_sync(&self, addr: &TransportAddr) -> ConnectionState {
974 if let Ok(pool) = self.pool.try_lock() {
976 if pool.contains_key(addr) {
977 return ConnectionState::Connected;
978 }
979 } else {
980 return ConnectionState::Connecting; }
982
983 let mut connecting = match self.connecting.try_lock() {
985 Ok(c) => c,
986 Err(_) => return ConnectionState::Connecting,
987 };
988
989 let entry = match connecting.get_mut(addr) {
990 Some(e) => e,
991 None => return ConnectionState::None,
992 };
993
994 if !entry.task.is_finished() {
996 return ConnectionState::Connecting;
997 }
998
999 let addr_clone = addr.clone();
1001 let task = connecting.remove(&addr_clone).unwrap().task;
1002
1003 match task.now_or_never() {
1005 Some(Ok(Ok((stream, mtu)))) => {
1006 self.promote_connection(addr, stream, mtu);
1008 ConnectionState::Connected
1009 }
1010 Some(Ok(Err(e))) => ConnectionState::Failed(format!("{}", e)),
1011 Some(Err(e)) => {
1012 ConnectionState::Failed(format!("task failed: {}", e))
1014 }
1015 None => {
1016 ConnectionState::Connecting
1018 }
1019 }
1020 }
1021
1022 fn promote_connection(&self, addr: &TransportAddr, stream: TcpStream, mtu: u16) {
1027 let (read_half, write_half) = stream.into_split();
1028 let writer = Arc::new(Mutex::new(write_half));
1029
1030 let transport_id = self.transport_id;
1031 let packet_tx = self.packet_tx.clone();
1032 let pool = self.pool.clone();
1033 let recv_stats = self.stats.clone();
1034 let remote_addr = addr.clone();
1035
1036 let recv_task = tokio::spawn(async move {
1037 tor_receive_loop(
1038 read_half,
1039 transport_id,
1040 remote_addr.clone(),
1041 packet_tx,
1042 pool,
1043 mtu,
1044 recv_stats,
1045 Direction::Outbound,
1046 )
1047 .await;
1048 });
1049
1050 let conn = TorConnection {
1051 writer,
1052 recv_task,
1053 mtu,
1054 established_at: Instant::now(),
1055 direction: Direction::Outbound,
1056 };
1057
1058 if let Ok(mut pool) = self.pool.try_lock() {
1061 pool.insert(addr.clone(), conn);
1062 self.stats.record_connection_established();
1063 self.stats.record_pool_outbound_added();
1064 debug!(
1065 transport_id = %self.transport_id,
1066 remote_addr = %addr,
1067 "Tor connection established (background connect)"
1068 );
1069 } else {
1070 conn.recv_task.abort();
1072 warn!(
1073 transport_id = %self.transport_id,
1074 remote_addr = %addr,
1075 "Failed to promote Tor connection (pool locked)"
1076 );
1077 }
1078 }
1079
1080 pub async fn close_connection_async(&self, addr: &TransportAddr) {
1082 let mut pool = self.pool.lock().await;
1083 if let Some(conn) = pool.remove(addr) {
1084 conn.recv_task.abort();
1085 match conn.direction {
1086 Direction::Inbound => self.stats.record_pool_inbound_removed(),
1087 Direction::Outbound => self.stats.record_pool_outbound_removed(),
1088 }
1089 debug!(
1090 transport_id = %self.transport_id,
1091 remote_addr = %addr,
1092 direction = ?conn.direction,
1093 "Tor connection closed"
1094 );
1095 }
1096 }
1097}
1098
1099impl Transport for TorTransport {
1100 fn transport_id(&self) -> TransportId {
1101 self.transport_id
1102 }
1103
1104 fn transport_type(&self) -> &TransportType {
1105 &TransportType::TOR
1106 }
1107
1108 fn state(&self) -> TransportState {
1109 self.state
1110 }
1111
1112 fn mtu(&self) -> u16 {
1113 self.config.mtu()
1114 }
1115
1116 fn link_mtu(&self, _addr: &TransportAddr) -> u16 {
1117 self.config.mtu()
1118 }
1119
1120 fn start(&mut self) -> Result<(), TransportError> {
1121 Err(TransportError::NotSupported(
1122 "use start_async() for Tor transport".into(),
1123 ))
1124 }
1125
1126 fn stop(&mut self) -> Result<(), TransportError> {
1127 Err(TransportError::NotSupported(
1128 "use stop_async() for Tor transport".into(),
1129 ))
1130 }
1131
1132 fn send(&self, _addr: &TransportAddr, _data: &[u8]) -> Result<(), TransportError> {
1133 Err(TransportError::NotSupported(
1134 "use send_async() for Tor transport".into(),
1135 ))
1136 }
1137
1138 fn discover(&self) -> Result<Vec<DiscoveredPeer>, TransportError> {
1139 Ok(Vec::new())
1140 }
1141
1142 fn accept_connections(&self) -> bool {
1143 self.onion_address.is_some()
1144 }
1145}
1146
1147async fn tor_receive_loop(
1157 mut reader: tokio::net::tcp::OwnedReadHalf,
1158 transport_id: TransportId,
1159 remote_addr: TransportAddr,
1160 packet_tx: PacketTx,
1161 pool: ConnectionPool,
1162 mtu: u16,
1163 stats: Arc<TorStats>,
1164 direction: Direction,
1165) {
1166 debug!(
1167 transport_id = %transport_id,
1168 remote_addr = %remote_addr,
1169 "Tor receive loop starting"
1170 );
1171
1172 loop {
1173 match read_fmp_packet(&mut reader, mtu).await {
1174 Ok(data) => {
1175 stats.record_recv(data.len());
1176
1177 trace!(
1178 transport_id = %transport_id,
1179 remote_addr = %remote_addr,
1180 bytes = data.len(),
1181 "Tor packet received"
1182 );
1183
1184 let packet = ReceivedPacket::new(transport_id, remote_addr.clone(), data);
1185
1186 if packet_tx.send(packet).is_err() {
1187 debug!(
1188 transport_id = %transport_id,
1189 "Packet channel closed, stopping Tor receive loop"
1190 );
1191 break;
1192 }
1193 }
1194 Err(e) => {
1195 stats.record_recv_error();
1196 debug!(
1197 transport_id = %transport_id,
1198 remote_addr = %remote_addr,
1199 error = %e,
1200 "Tor receive error, removing connection"
1201 );
1202 break;
1203 }
1204 }
1205 }
1206
1207 let mut pool_guard = pool.lock().await;
1210 let removed = pool_guard.remove(&remote_addr).is_some();
1211 drop(pool_guard);
1212 if removed {
1213 match direction {
1214 Direction::Inbound => stats.record_pool_inbound_removed(),
1215 Direction::Outbound => stats.record_pool_outbound_removed(),
1216 }
1217 }
1218
1219 debug!(
1220 transport_id = %transport_id,
1221 remote_addr = %remote_addr,
1222 direction = ?direction,
1223 "Tor receive loop stopped"
1224 );
1225}
1226
1227fn configure_socket(
1235 stream: &std::net::TcpStream,
1236 _config: &TorConfig,
1237) -> Result<(), TransportError> {
1238 let socket = socket2::SockRef::from(stream);
1239
1240 socket
1242 .set_tcp_nodelay(true)
1243 .map_err(|e| TransportError::StartFailed(format!("set nodelay: {}", e)))?;
1244
1245 let keepalive_secs = 30u64;
1247 if keepalive_secs > 0 {
1248 let keepalive = TcpKeepalive::new().with_time(Duration::from_secs(keepalive_secs));
1249 socket
1250 .set_tcp_keepalive(&keepalive)
1251 .map_err(|e| TransportError::StartFailed(format!("set keepalive: {}", e)))?;
1252 }
1253
1254 Ok(())
1255}
1256
1257async fn tor_accept_loop(
1268 listener: TcpListener,
1269 transport_id: TransportId,
1270 packet_tx: PacketTx,
1271 pool: ConnectionPool,
1272 mtu: u16,
1273 max_inbound: usize,
1274 stats: Arc<TorStats>,
1275) {
1276 debug!(
1277 transport_id = %transport_id,
1278 "Onion service accept loop starting"
1279 );
1280
1281 loop {
1282 let (stream, peer_addr) = match listener.accept().await {
1283 Ok(result) => result,
1284 Err(e) => {
1285 warn!(
1286 transport_id = %transport_id,
1287 error = %e,
1288 "Onion service accept error"
1289 );
1290 continue;
1291 }
1292 };
1293
1294 if stats.pool_inbound_count() >= max_inbound as u64 {
1297 stats.record_connection_rejected();
1298 debug!(
1299 transport_id = %transport_id,
1300 peer_addr = %peer_addr,
1301 max_inbound,
1302 "Rejecting inbound onion connection (limit reached)"
1303 );
1304 drop(stream);
1305 continue;
1306 }
1307
1308 let std_stream = match stream.into_std() {
1310 Ok(s) => s,
1311 Err(e) => {
1312 warn!(
1313 transport_id = %transport_id,
1314 error = %e,
1315 "Failed to convert accepted stream to std"
1316 );
1317 continue;
1318 }
1319 };
1320
1321 let socket = socket2::SockRef::from(&std_stream);
1322 let _ = socket.set_tcp_nodelay(true);
1323 let keepalive = TcpKeepalive::new().with_time(Duration::from_secs(30));
1324 let _ = socket.set_tcp_keepalive(&keepalive);
1325
1326 let stream = match TcpStream::from_std(std_stream) {
1327 Ok(s) => s,
1328 Err(e) => {
1329 warn!(
1330 transport_id = %transport_id,
1331 error = %e,
1332 "Failed to convert accepted stream back to tokio"
1333 );
1334 continue;
1335 }
1336 };
1337
1338 let remote_addr = TransportAddr::from_string(&peer_addr.to_string());
1339
1340 let (read_half, write_half) = stream.into_split();
1342 let writer = Arc::new(Mutex::new(write_half));
1343
1344 let recv_pool = pool.clone();
1345 let recv_stats = stats.clone();
1346 let recv_addr = remote_addr.clone();
1347 let recv_tx = packet_tx.clone();
1348
1349 let recv_task = tokio::spawn(async move {
1350 tor_receive_loop(
1351 read_half,
1352 transport_id,
1353 recv_addr,
1354 recv_tx,
1355 recv_pool,
1356 mtu,
1357 recv_stats,
1358 Direction::Inbound,
1359 )
1360 .await;
1361 });
1362
1363 let conn = TorConnection {
1364 writer,
1365 recv_task,
1366 mtu,
1367 established_at: Instant::now(),
1368 direction: Direction::Inbound,
1369 };
1370
1371 {
1372 let mut pool_guard = pool.lock().await;
1373 pool_guard.insert(remote_addr.clone(), conn);
1374 }
1375
1376 stats.record_connection_accepted();
1377 stats.record_pool_inbound_added();
1378
1379 debug!(
1380 transport_id = %transport_id,
1381 peer_addr = %peer_addr,
1382 "Accepted inbound onion connection"
1383 );
1384 }
1385}
1386
1387fn validate_host_port(addr: &str, field_name: &str) -> Result<(), TransportError> {
1393 if addr.parse::<SocketAddr>().is_ok() {
1394 return Ok(());
1395 }
1396 let parts: Vec<&str> = addr.rsplitn(2, ':').collect();
1398 if parts.len() != 2 || parts[0].parse::<u16>().is_err() || parts[1].is_empty() {
1399 return Err(TransportError::StartFailed(format!(
1400 "invalid {} '{}': expected host:port or IP:port",
1401 field_name, addr
1402 )));
1403 }
1404 Ok(())
1405}
1406
1407#[cfg(test)]
1412mod tests {
1413 use super::*;
1414 use crate::transport::packet_channel;
1415
1416 fn make_config() -> TorConfig {
1417 TorConfig {
1418 socks5_addr: Some("127.0.0.1:19050".to_string()),
1419 ..Default::default()
1420 }
1421 }
1422
1423 #[test]
1424 fn test_parse_tor_addr_onion() {
1425 let addr = TransportAddr::from_string("abcdef1234567890.onion:2121");
1426 let tor_addr = parse_tor_addr(&addr).unwrap();
1427 match tor_addr {
1428 TorAddr::Onion(host, port) => {
1429 assert_eq!(host, "abcdef1234567890.onion");
1430 assert_eq!(port, 2121);
1431 }
1432 _ => panic!("expected Onion variant"),
1433 }
1434 }
1435
1436 #[test]
1437 fn test_parse_tor_addr_clearnet() {
1438 let addr = TransportAddr::from_string("192.168.1.1:8080");
1439 let tor_addr = parse_tor_addr(&addr).unwrap();
1440 match tor_addr {
1441 TorAddr::Clearnet(socket_addr) => {
1442 assert_eq!(
1443 socket_addr,
1444 "192.168.1.1:8080".parse::<SocketAddr>().unwrap()
1445 );
1446 }
1447 _ => panic!("expected Clearnet variant"),
1448 }
1449 }
1450
1451 #[test]
1452 fn test_parse_tor_addr_clearnet_hostname() {
1453 let addr = TransportAddr::from_string("peer1.example.com:2121");
1454 let tor_addr = parse_tor_addr(&addr).unwrap();
1455 match tor_addr {
1456 TorAddr::ClearnetHostname(host, port) => {
1457 assert_eq!(host, "peer1.example.com");
1458 assert_eq!(port, 2121);
1459 }
1460 _ => panic!("expected ClearnetHostname variant"),
1461 }
1462 }
1463
1464 #[test]
1465 fn test_parse_tor_addr_invalid() {
1466 let addr = TransportAddr::from_string("localhost:2121");
1468 assert!(parse_tor_addr(&addr).is_err());
1469
1470 let addr = TransportAddr::from_string("not-a-valid-address");
1472 assert!(parse_tor_addr(&addr).is_err());
1473
1474 let addr = TransportAddr::from_string("example.com:notaport");
1476 assert!(parse_tor_addr(&addr).is_err());
1477 }
1478
1479 #[test]
1480 fn test_config_defaults() {
1481 let config = TorConfig::default();
1482 assert_eq!(config.mode(), "socks5");
1483 assert_eq!(config.socks5_addr(), "127.0.0.1:9050");
1484 assert_eq!(config.connect_timeout_ms(), 120000);
1485 assert_eq!(config.mtu(), 1400);
1486 assert_eq!(config.advertised_port(), 443);
1487 }
1488
1489 #[test]
1490 fn test_advertised_port_override() {
1491 let config = TorConfig {
1492 advertised_port: Some(9001),
1493 ..Default::default()
1494 };
1495 assert_eq!(config.advertised_port(), 9001);
1496 }
1497
1498 #[test]
1503 fn test_advert_address_round_trips_through_parser() {
1504 let onion = "mwvj6q3pnsiaky7i6wg5s42xlfurt5uqr3qzckrlw2graa2ugcgwhiqd.onion";
1505 let cfg = TorConfig::default();
1506 let advertised = format!("{}:{}", onion, cfg.advertised_port());
1507
1508 let parsed = parse_tor_addr(&TransportAddr::from_string(&advertised)).unwrap();
1509 match parsed {
1510 TorAddr::Onion(host, port) => {
1511 assert_eq!(host, onion);
1512 assert_eq!(port, 443);
1513 }
1514 other => panic!("expected Onion variant, got {:?}", other),
1515 }
1516
1517 assert!(parse_tor_addr(&TransportAddr::from_string(onion)).is_err());
1521 }
1522
1523 #[tokio::test]
1524 async fn test_start_stop() {
1525 let (tx, _rx) = packet_channel(32);
1526 let mut transport = TorTransport::new(TransportId::new(1), None, make_config(), tx);
1527
1528 transport.start_async().await.unwrap();
1529 assert_eq!(transport.state(), TransportState::Up);
1530
1531 transport.stop_async().await.unwrap();
1532 assert_eq!(transport.state(), TransportState::Down);
1533 }
1534
1535 #[tokio::test]
1536 async fn test_double_start_fails() {
1537 let (tx, _rx) = packet_channel(32);
1538 let mut transport = TorTransport::new(TransportId::new(1), None, make_config(), tx);
1539
1540 transport.start_async().await.unwrap();
1541 assert!(transport.start_async().await.is_err());
1542 }
1543
1544 #[tokio::test]
1545 async fn test_stop_not_started_fails() {
1546 let (tx, _rx) = packet_channel(32);
1547 let mut transport = TorTransport::new(TransportId::new(1), None, make_config(), tx);
1548
1549 assert!(transport.stop_async().await.is_err());
1550 }
1551
1552 #[tokio::test]
1553 async fn test_send_not_started() {
1554 let (tx, _rx) = packet_channel(32);
1555 let transport = TorTransport::new(TransportId::new(1), None, make_config(), tx);
1556
1557 let addr = TransportAddr::from_string("127.0.0.1:2121");
1558 let result = transport.send_async(&addr, &[0u8; 10]).await;
1559 assert!(result.is_err());
1560 }
1561
1562 #[test]
1563 fn test_transport_type() {
1564 let (tx, _rx) = packet_channel(32);
1565 let transport = TorTransport::new(TransportId::new(1), None, make_config(), tx);
1566
1567 let tt = transport.transport_type();
1568 assert_eq!(tt.name, "tor");
1569 assert!(tt.connection_oriented);
1570 assert!(tt.reliable);
1571 }
1572
1573 #[test]
1574 fn test_sync_methods_return_not_supported() {
1575 let (tx, _rx) = packet_channel(32);
1576 let mut transport = TorTransport::new(TransportId::new(1), None, make_config(), tx);
1577
1578 assert!(transport.start().is_err());
1579 assert!(transport.stop().is_err());
1580 let addr = TransportAddr::from_string("127.0.0.1:2121");
1581 assert!(transport.send(&addr, &[0u8; 10]).is_err());
1582 }
1583
1584 #[test]
1585 fn test_accept_connections_false() {
1586 let (tx, _rx) = packet_channel(32);
1587 let transport = TorTransport::new(TransportId::new(1), None, make_config(), tx);
1588
1589 assert!(!transport.accept_connections());
1590 }
1591
1592 #[test]
1593 fn test_discover_returns_empty() {
1594 let (tx, _rx) = packet_channel(32);
1595 let transport = TorTransport::new(TransportId::new(1), None, make_config(), tx);
1596
1597 assert!(transport.discover().unwrap().is_empty());
1598 }
1599
1600 #[tokio::test]
1601 async fn test_invalid_socks5_addr_start_fails() {
1602 let (tx, _rx) = packet_channel(32);
1603 let config = TorConfig {
1604 socks5_addr: Some("not-a-socket-addr".to_string()),
1605 ..Default::default()
1606 };
1607 let mut transport = TorTransport::new(TransportId::new(1), None, config, tx);
1608 assert!(transport.start_async().await.is_err());
1609 }
1610
1611 #[tokio::test]
1612 async fn test_unsupported_mode_start_fails() {
1613 let (tx, _rx) = packet_channel(32);
1614 let config = TorConfig {
1615 mode: Some("embedded".to_string()),
1616 socks5_addr: Some("127.0.0.1:9050".to_string()),
1617 ..Default::default()
1618 };
1619 let mut transport = TorTransport::new(TransportId::new(1), None, config, tx);
1620 assert!(transport.start_async().await.is_err());
1621 }
1622
1623 use crate::config::TcpConfig;
1628 use crate::transport::tcp::TcpTransport;
1629 use mock_socks5::MockSocks5Server;
1630
1631 const MSG1_WIRE_SIZE: usize = 114;
1633 const MSG1_PAYLOAD_LEN: u16 = (MSG1_WIRE_SIZE - 4) as u16;
1635
1636 fn build_msg1_frame() -> Vec<u8> {
1638 let mut frame = vec![0xAA; MSG1_WIRE_SIZE];
1639 frame[0] = 0x01; frame[1] = 0x00; frame[2..4].copy_from_slice(&MSG1_PAYLOAD_LEN.to_le_bytes());
1642 frame
1643 }
1644
1645 #[tokio::test]
1646 async fn test_send_recv_via_socks5() {
1647 let (dest_tx, mut dest_rx) = packet_channel(32);
1649 let dest_config = TcpConfig {
1650 bind_addr: Some("127.0.0.1:0".to_string()),
1651 ..Default::default()
1652 };
1653 let mut dest = TcpTransport::new(TransportId::new(100), None, dest_config, dest_tx);
1654 dest.start_async().await.unwrap();
1655 let dest_addr = dest.local_addr().unwrap();
1656
1657 let mock = MockSocks5Server::new(dest_addr).await.unwrap();
1659 let proxy_addr = mock.addr();
1660 let _proxy_handle = mock.spawn();
1661
1662 let (tor_tx, _tor_rx) = packet_channel(32);
1664 let tor_config = TorConfig {
1665 socks5_addr: Some(proxy_addr.to_string()),
1666 ..Default::default()
1667 };
1668 let mut tor = TorTransport::new(TransportId::new(200), None, tor_config, tor_tx);
1669 tor.start_async().await.unwrap();
1670
1671 let frame = build_msg1_frame();
1673 let target = TransportAddr::from_string(&dest_addr.to_string());
1674 tor.send_async(&target, &frame).await.unwrap();
1675
1676 let received = tokio::time::timeout(Duration::from_secs(5), dest_rx.recv())
1678 .await
1679 .expect("timeout waiting for packet")
1680 .expect("channel closed");
1681
1682 assert_eq!(received.data, frame);
1683
1684 tor.stop_async().await.unwrap();
1686 dest.stop_async().await.unwrap();
1687 }
1688
1689 #[tokio::test]
1690 async fn test_socks5_proxy_down() {
1691 let (tx, _rx) = packet_channel(32);
1693 let config = TorConfig {
1694 socks5_addr: Some("127.0.0.1:19999".to_string()),
1695 connect_timeout_ms: Some(2000),
1696 ..Default::default()
1697 };
1698 let mut transport = TorTransport::new(TransportId::new(1), None, config, tx);
1699 transport.start_async().await.unwrap();
1700
1701 let addr = TransportAddr::from_string("192.168.1.1:2121");
1702 let result = transport.send_async(&addr, &build_msg1_frame()).await;
1703 assert!(result.is_err());
1704 }
1705
1706 #[tokio::test]
1707 async fn test_connect_timeout() {
1708 let (tx, _rx) = packet_channel(32);
1710 let config = TorConfig {
1711 socks5_addr: Some("192.0.2.1:9050".to_string()),
1713 connect_timeout_ms: Some(500),
1714 ..Default::default()
1715 };
1716 let mut transport = TorTransport::new(TransportId::new(1), None, config, tx);
1717 transport.start_async().await.unwrap();
1718
1719 let addr = TransportAddr::from_string("10.0.0.1:2121");
1720 let result = transport.send_async(&addr, &build_msg1_frame()).await;
1721 assert!(result.is_err());
1722 }
1723
1724 #[tokio::test]
1725 async fn test_close_connection() {
1726 let (dest_tx, _dest_rx) = packet_channel(32);
1728 let dest_config = TcpConfig {
1729 bind_addr: Some("127.0.0.1:0".to_string()),
1730 ..Default::default()
1731 };
1732 let mut dest = TcpTransport::new(TransportId::new(100), None, dest_config, dest_tx);
1733 dest.start_async().await.unwrap();
1734 let dest_addr = dest.local_addr().unwrap();
1735
1736 let mock = MockSocks5Server::new(dest_addr).await.unwrap();
1737 let proxy_addr = mock.addr();
1738 let _proxy_handle = mock.spawn();
1739
1740 let (tor_tx, _tor_rx) = packet_channel(32);
1741 let tor_config = TorConfig {
1742 socks5_addr: Some(proxy_addr.to_string()),
1743 ..Default::default()
1744 };
1745 let mut tor = TorTransport::new(TransportId::new(200), None, tor_config, tor_tx);
1746 tor.start_async().await.unwrap();
1747
1748 let target = TransportAddr::from_string(&dest_addr.to_string());
1750 tor.send_async(&target, &build_msg1_frame()).await.unwrap();
1751
1752 {
1754 let pool = tor.pool.lock().await;
1755 assert_eq!(pool.len(), 1);
1756 }
1757 assert_eq!(tor.stats.snapshot().pool_outbound, 1);
1758 assert_eq!(tor.stats.snapshot().pool_inbound, 0);
1759
1760 tor.close_connection_async(&target).await;
1762
1763 {
1765 let pool = tor.pool.lock().await;
1766 assert_eq!(pool.len(), 0);
1767 }
1768 assert_eq!(tor.stats.snapshot().pool_outbound, 0);
1769
1770 tor.stop_async().await.unwrap();
1771 dest.stop_async().await.unwrap();
1772 }
1773
1774 use mock_control::MockTorControlServer;
1779
1780 #[tokio::test]
1781 async fn test_control_port_start_stop() {
1782 let mock = MockTorControlServer::start().await;
1783 let (tx, _rx) = packet_channel(32);
1784
1785 let config = TorConfig {
1786 mode: Some("control_port".to_string()),
1787 socks5_addr: Some("127.0.0.1:19050".to_string()),
1788 control_addr: Some(mock.addr().to_string()),
1789 control_auth: Some("password:testpass".to_string()),
1790 ..Default::default()
1791 };
1792 let mut transport = TorTransport::new(TransportId::new(1), None, config, tx);
1793
1794 transport.start_async().await.unwrap();
1795 assert_eq!(transport.state(), TransportState::Up);
1796 assert!(transport.onion_address().is_none());
1797 assert!(!transport.accept_connections());
1798
1799 transport.stop_async().await.unwrap();
1800 }
1801
1802 #[tokio::test]
1803 async fn test_config_defaults_phase2() {
1804 let config = TorConfig::default();
1805 assert_eq!(config.control_addr(), "/run/tor/control");
1806 assert_eq!(config.control_auth(), "cookie");
1807 assert_eq!(config.cookie_path(), "/var/run/tor/control.authcookie");
1808 assert_eq!(config.max_inbound_connections(), 64);
1809 }
1810
1811 use crate::config::DirectoryServiceConfig;
1816 use tempfile::TempDir;
1817
1818 #[test]
1819 fn test_directory_service_config_defaults() {
1820 let config = DirectoryServiceConfig::default();
1821 assert_eq!(
1822 config.hostname_file(),
1823 "/var/lib/tor/fips_onion_service/hostname"
1824 );
1825 assert_eq!(config.bind_addr(), "127.0.0.1:8443");
1826 }
1827
1828 #[tokio::test]
1829 async fn test_directory_mode_start_stop() {
1830 let dir = TempDir::new().unwrap();
1831 let hostname_path = dir.path().join("hostname");
1832 std::fs::write(
1833 &hostname_path,
1834 "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa2.onion\n",
1835 )
1836 .unwrap();
1837
1838 let (tx, _rx) = packet_channel(32);
1839 let config = TorConfig {
1840 mode: Some("directory".to_string()),
1841 socks5_addr: Some("127.0.0.1:19050".to_string()),
1842 directory_service: Some(DirectoryServiceConfig {
1843 hostname_file: Some(hostname_path.to_str().unwrap().to_string()),
1844 bind_addr: Some("127.0.0.1:0".to_string()),
1845 }),
1846 ..Default::default()
1847 };
1848 let mut transport = TorTransport::new(TransportId::new(1), None, config, tx);
1849
1850 transport.start_async().await.unwrap();
1851 assert_eq!(transport.state(), TransportState::Up);
1852 assert_eq!(
1853 transport.onion_address(),
1854 Some("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa2.onion"),
1855 );
1856 assert!(transport.accept_connections());
1857
1858 transport.stop_async().await.unwrap();
1859 assert_eq!(transport.state(), TransportState::Down);
1860 }
1861
1862 #[tokio::test]
1863 async fn test_directory_mode_missing_hostname_file() {
1864 let (tx, _rx) = packet_channel(32);
1865 let config = TorConfig {
1866 mode: Some("directory".to_string()),
1867 socks5_addr: Some("127.0.0.1:19050".to_string()),
1868 directory_service: Some(DirectoryServiceConfig {
1869 hostname_file: Some("/nonexistent/hostname".to_string()),
1870 bind_addr: Some("127.0.0.1:0".to_string()),
1871 }),
1872 ..Default::default()
1873 };
1874 let mut transport = TorTransport::new(TransportId::new(1), None, config, tx);
1875
1876 let result = transport.start_async().await;
1877 assert!(result.is_err());
1878 let err = format!("{}", result.unwrap_err());
1879 assert!(err.contains("hostname"));
1880 }
1881
1882 #[tokio::test]
1883 async fn test_directory_mode_invalid_hostname() {
1884 let dir = TempDir::new().unwrap();
1885 let hostname_path = dir.path().join("hostname");
1886 std::fs::write(&hostname_path, "not-an-onion-address\n").unwrap();
1887
1888 let (tx, _rx) = packet_channel(32);
1889 let config = TorConfig {
1890 mode: Some("directory".to_string()),
1891 socks5_addr: Some("127.0.0.1:19050".to_string()),
1892 directory_service: Some(DirectoryServiceConfig {
1893 hostname_file: Some(hostname_path.to_str().unwrap().to_string()),
1894 bind_addr: Some("127.0.0.1:0".to_string()),
1895 }),
1896 ..Default::default()
1897 };
1898 let mut transport = TorTransport::new(TransportId::new(1), None, config, tx);
1899
1900 let result = transport.start_async().await;
1901 assert!(result.is_err());
1902 let err = format!("{}", result.unwrap_err());
1903 assert!(err.contains("invalid onion address"));
1904 }
1905
1906 #[tokio::test]
1907 async fn test_directory_mode_accept_inbound() {
1908 let dir = TempDir::new().unwrap();
1909 let hostname_path = dir.path().join("hostname");
1910 std::fs::write(
1911 &hostname_path,
1912 "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa2.onion\n",
1913 )
1914 .unwrap();
1915
1916 let (tx, _rx) = packet_channel(32);
1917 let config = TorConfig {
1918 mode: Some("directory".to_string()),
1919 socks5_addr: Some("127.0.0.1:19050".to_string()),
1920 directory_service: Some(DirectoryServiceConfig {
1921 hostname_file: Some(hostname_path.to_str().unwrap().to_string()),
1922 bind_addr: Some("127.0.0.1:0".to_string()),
1923 }),
1924 ..Default::default()
1925 };
1926 let mut transport = TorTransport::new(TransportId::new(1), None, config, tx);
1927 transport.start_async().await.unwrap();
1928 assert!(transport.accept_connections());
1929
1930 transport.stop_async().await.unwrap();
1931 }
1932
1933 #[tokio::test]
1934 async fn test_socks5_mode_rejects_directory_service_config() {
1935 let (tx, _rx) = packet_channel(32);
1936 let config = TorConfig {
1937 mode: Some("socks5".to_string()),
1938 socks5_addr: Some("127.0.0.1:9050".to_string()),
1939 directory_service: Some(DirectoryServiceConfig::default()),
1940 ..Default::default()
1941 };
1942 let mut transport = TorTransport::new(TransportId::new(1), None, config, tx);
1943 let result = transport.start_async().await;
1944 assert!(result.is_err());
1945 let err = format!("{}", result.unwrap_err());
1946 assert!(err.contains("directory"));
1947 }
1948}