1pub mod control;
22pub mod stats;
23
24#[cfg(test)]
25mod mock_control;
26#[cfg(test)]
27mod mock_socks5;
28
29use super::{
30 ConnectionState, DiscoveredPeer, PacketTx, ReceivedPacket, Transport, TransportAddr,
31 TransportError, TransportId, TransportState, TransportType,
32};
33use crate::config::TorConfig;
34use crate::transport::tcp::stream::read_fmp_packet;
35use control::{ControlAuth, TorControlClient, TorMonitoringInfo};
36use stats::TorStats;
37
38use futures::FutureExt;
39use socket2::TcpKeepalive;
40use std::collections::HashMap;
41use std::net::SocketAddr;
42use std::sync::Arc;
43use std::time::Duration;
44use tokio::io::AsyncWriteExt;
45use tokio::net::tcp::OwnedWriteHalf;
46use tokio::net::{TcpListener, TcpStream};
47use tokio::sync::Mutex;
48use tokio::task::JoinHandle;
49use tokio::time::Instant;
50use tokio_socks::tcp::Socks5Stream;
51use tracing::{debug, info, trace, warn};
52
53#[derive(Clone, Debug)]
59pub enum TorAddr {
60 Onion(String, u16),
62 Clearnet(SocketAddr),
64 ClearnetHostname(String, u16),
68}
69
70fn parse_tor_addr(addr: &TransportAddr) -> Result<TorAddr, TransportError> {
76 let s = addr.as_str().ok_or_else(|| {
77 TransportError::InvalidAddress("Tor address must be a valid UTF-8 string".into())
78 })?;
79
80 if s.contains(".onion:") {
81 let (host, port_str) = s.rsplit_once(':').ok_or_else(|| {
83 TransportError::InvalidAddress(format!("invalid onion address: {}", s))
84 })?;
85 let port: u16 = port_str.parse().map_err(|_| {
86 TransportError::InvalidAddress(format!("invalid port in onion address: {}", s))
87 })?;
88 Ok(TorAddr::Onion(host.to_string(), port))
89 } else if let Ok(socket_addr) = s.parse::<SocketAddr>() {
90 Ok(TorAddr::Clearnet(socket_addr))
92 } else {
93 let (host, port_str) = s.rsplit_once(':').ok_or_else(|| {
95 TransportError::InvalidAddress(format!("invalid address (expected host:port): {}", s))
96 })?;
97 let port: u16 = port_str
98 .parse()
99 .map_err(|_| TransportError::InvalidAddress(format!("invalid port: {}", s)))?;
100 if !host.contains('.') {
101 return Err(TransportError::InvalidAddress(format!(
102 "hostname must be fully qualified (contain a dot): {}",
103 host
104 )));
105 }
106 Ok(TorAddr::ClearnetHostname(host.to_string(), port))
107 }
108}
109
110struct TorConnection {
116 writer: Arc<Mutex<OwnedWriteHalf>>,
118 recv_task: JoinHandle<()>,
120 #[allow(dead_code)]
122 mtu: u16,
123 #[allow(dead_code)]
125 established_at: Instant,
126}
127
128type ConnectionPool = Arc<Mutex<HashMap<TransportAddr, TorConnection>>>;
130
131struct ConnectingEntry {
136 task: JoinHandle<Result<(TcpStream, u16), TransportError>>,
138}
139
140type ConnectingPool = Arc<Mutex<HashMap<TransportAddr, ConnectingEntry>>>;
142
143pub struct TorTransport {
154 transport_id: TransportId,
156 name: Option<String>,
158 config: TorConfig,
160 state: TransportState,
162 pool: ConnectionPool,
164 connecting: ConnectingPool,
166 packet_tx: PacketTx,
168 stats: Arc<TorStats>,
170 accept_task: Option<JoinHandle<()>>,
172 onion_address: Option<String>,
175 control_client: Option<Arc<Mutex<TorControlClient>>>,
177 cached_monitoring: Arc<std::sync::RwLock<Option<TorMonitoringInfo>>>,
179 monitoring_task: Option<JoinHandle<()>>,
181}
182
183impl TorTransport {
184 pub fn new(
186 transport_id: TransportId,
187 name: Option<String>,
188 config: TorConfig,
189 packet_tx: PacketTx,
190 ) -> Self {
191 Self {
192 transport_id,
193 name,
194 config,
195 state: TransportState::Configured,
196 pool: Arc::new(Mutex::new(HashMap::new())),
197 connecting: Arc::new(Mutex::new(HashMap::new())),
198 packet_tx,
199 stats: Arc::new(TorStats::new()),
200 accept_task: None,
201 onion_address: None,
202 control_client: None,
203 cached_monitoring: Arc::new(std::sync::RwLock::new(None)),
204 monitoring_task: None,
205 }
206 }
207
208 pub fn name(&self) -> Option<&str> {
210 self.name.as_deref()
211 }
212
213 pub fn onion_address(&self) -> Option<&str> {
215 self.onion_address.as_deref()
216 }
217
218 pub fn stats(&self) -> &Arc<TorStats> {
220 &self.stats
221 }
222
223 pub fn cached_monitoring(&self) -> Option<TorMonitoringInfo> {
225 self.cached_monitoring.read().ok()?.clone()
226 }
227
228 pub fn mode(&self) -> &str {
230 self.config.mode()
231 }
232
233 pub async fn start_async(&mut self) -> Result<(), TransportError> {
241 if !self.state.can_start() {
242 return Err(TransportError::AlreadyStarted);
243 }
244
245 self.state = TransportState::Starting;
246
247 let socks5_addr = self.config.socks5_addr().to_string();
249 validate_host_port(&socks5_addr, "socks5_addr")?;
250
251 let mode = self.config.mode().to_string();
252 match mode.as_str() {
253 "socks5" => {
254 if self.config.directory_service.is_some() {
256 return Err(TransportError::StartFailed(
257 "directory_service config requires mode 'directory', not 'socks5'".into(),
258 ));
259 }
260 self.state = TransportState::Up;
261 }
262 "control_port" => {
263 self.start_control_port_mode().await?;
264 }
265 "directory" => {
266 self.start_directory_mode().await?;
267 }
268 other => {
269 return Err(TransportError::StartFailed(format!(
270 "unsupported Tor mode '{}' (expected 'socks5', 'control_port', or 'directory')",
271 other
272 )));
273 }
274 }
275
276 if let Some(ref name) = self.name {
277 info!(
278 name = %name,
279 mode = %mode,
280 socks5_addr = %socks5_addr,
281 onion_address = ?self.onion_address,
282 mtu = self.config.mtu(),
283 "Tor transport started"
284 );
285 } else {
286 info!(
287 mode = %mode,
288 socks5_addr = %socks5_addr,
289 onion_address = ?self.onion_address,
290 mtu = self.config.mtu(),
291 "Tor transport started"
292 );
293 }
294
295 Ok(())
296 }
297
298 async fn start_control_port_mode(&mut self) -> Result<(), TransportError> {
301 let control_addr = self.config.control_addr().to_string();
302 if !control_addr.starts_with('/') && !control_addr.starts_with("./") {
304 validate_host_port(&control_addr, "control_addr")?;
305 }
306
307 let mut client = TorControlClient::connect(&control_addr)
309 .await
310 .map_err(|e| {
311 self.stats.record_control_error();
312 TransportError::StartFailed(format!("Tor control port: {}", e))
313 })?;
314
315 let auth = ControlAuth::from_config(self.config.control_auth(), self.config.cookie_path())
317 .map_err(|e| TransportError::StartFailed(format!("Tor auth config: {}", e)))?;
318
319 client.authenticate(&auth).await.map_err(|e| {
320 self.stats.record_control_error();
321 TransportError::StartFailed(format!("Tor authentication: {}", e))
322 })?;
323
324 self.control_client = Some(Arc::new(Mutex::new(client)));
326 self.state = TransportState::Up;
327 self.spawn_monitoring_task();
328
329 Ok(())
330 }
331
332 async fn start_directory_mode(&mut self) -> Result<(), TransportError> {
339 let dir_config = self.config.directory_service.clone().unwrap_or_default();
340
341 let hostname_file = dir_config.hostname_file();
343 let onion_addr = std::fs::read_to_string(hostname_file)
344 .map_err(|e| {
345 TransportError::StartFailed(format!(
346 "failed to read onion hostname from '{}': {} \
347 (ensure HiddenServiceDir is configured in torrc and Tor has started)",
348 hostname_file, e
349 ))
350 })?
351 .trim()
352 .to_string();
353
354 if onion_addr.is_empty() || !onion_addr.ends_with(".onion") {
355 return Err(TransportError::StartFailed(format!(
356 "invalid onion address in '{}': '{}'",
357 hostname_file, onion_addr
358 )));
359 }
360
361 self.onion_address = Some(onion_addr.clone());
362
363 let bind_addr = dir_config.bind_addr();
365 let listener = TcpListener::bind(bind_addr).await.map_err(|e| {
366 TransportError::StartFailed(format!(
367 "failed to bind directory-mode listener on {}: {}",
368 bind_addr, e
369 ))
370 })?;
371 let local_addr = listener
372 .local_addr()
373 .map_err(|e| TransportError::StartFailed(format!("failed to get local addr: {}", e)))?;
374
375 info!(
376 onion_address = %onion_addr,
377 local_addr = %local_addr,
378 hostname_file = %hostname_file,
379 "Directory-mode onion service active"
380 );
381
382 let transport_id = self.transport_id;
384 let packet_tx = self.packet_tx.clone();
385 let pool = self.pool.clone();
386 let mtu = self.config.mtu();
387 let max_inbound = self.config.max_inbound_connections();
388 let stats = self.stats.clone();
389
390 let accept_handle = tokio::spawn(async move {
391 tor_accept_loop(
392 listener,
393 transport_id,
394 packet_tx,
395 pool,
396 mtu,
397 max_inbound,
398 stats,
399 )
400 .await;
401 });
402
403 self.accept_task = Some(accept_handle);
404 self.state = TransportState::Up;
405
406 if self.config.control_addr.is_some() {
408 self.try_connect_control_port().await;
409 }
410
411 Ok(())
412 }
413
414 async fn try_connect_control_port(&mut self) {
417 let control_addr = self.config.control_addr().to_string();
418 if !control_addr.starts_with('/')
419 && !control_addr.starts_with("./")
420 && let Err(e) = validate_host_port(&control_addr, "control_addr")
421 {
422 warn!(
423 transport_id = %self.transport_id,
424 error = %e,
425 "Tor control port address invalid, monitoring disabled"
426 );
427 return;
428 }
429
430 let client = match TorControlClient::connect(&control_addr).await {
431 Ok(c) => c,
432 Err(e) => {
433 warn!(
434 transport_id = %self.transport_id,
435 addr = %control_addr,
436 error = %e,
437 "Tor control port connect failed, monitoring disabled"
438 );
439 return;
440 }
441 };
442
443 let auth =
444 match ControlAuth::from_config(self.config.control_auth(), self.config.cookie_path()) {
445 Ok(a) => a,
446 Err(e) => {
447 warn!(
448 transport_id = %self.transport_id,
449 error = %e,
450 "Tor control auth config error, monitoring disabled"
451 );
452 return;
453 }
454 };
455
456 let mut client = client;
457 if let Err(e) = client.authenticate(&auth).await {
458 warn!(
459 transport_id = %self.transport_id,
460 error = %e,
461 "Tor control port auth failed, monitoring disabled"
462 );
463 return;
464 }
465
466 info!(
467 transport_id = %self.transport_id,
468 addr = %control_addr,
469 "Tor control port connected (monitoring enabled)"
470 );
471
472 self.control_client = Some(Arc::new(Mutex::new(client)));
473 self.spawn_monitoring_task();
474 }
475
476 pub async fn stop_async(&mut self) -> Result<(), TransportError> {
481 if !self.state.is_operational() {
482 return Err(TransportError::NotStarted);
483 }
484
485 if let Some(task) = self.accept_task.take() {
487 task.abort();
488 let _ = task.await;
489 debug!(
490 transport_id = %self.transport_id,
491 "Onion service accept loop stopped"
492 );
493 }
494
495 if let Some(task) = self.monitoring_task.take() {
497 task.abort();
498 let _ = task.await;
499 }
500 if let Ok(mut w) = self.cached_monitoring.write() {
501 *w = None;
502 }
503
504 self.control_client = None;
505 self.onion_address = None;
506
507 let mut connecting = self.connecting.lock().await;
509 for (addr, entry) in connecting.drain() {
510 entry.task.abort();
511 debug!(
512 transport_id = %self.transport_id,
513 remote_addr = %addr,
514 "Tor connect aborted (transport stopping)"
515 );
516 }
517 drop(connecting);
518
519 let mut pool = self.pool.lock().await;
521 for (addr, conn) in pool.drain() {
522 conn.recv_task.abort();
523 let _ = conn.recv_task.await;
524 debug!(
525 transport_id = %self.transport_id,
526 remote_addr = %addr,
527 "Tor connection closed (transport stopping)"
528 );
529 }
530 drop(pool);
531
532 self.state = TransportState::Down;
533
534 info!(
535 transport_id = %self.transport_id,
536 "Tor transport stopped"
537 );
538
539 Ok(())
540 }
541
542 fn spawn_monitoring_task(&mut self) {
545 let Some(client) = self.control_client.clone() else {
546 return;
547 };
548 let cache = self.cached_monitoring.clone();
549 let stats = self.stats.clone();
550 let transport_id = self.transport_id;
551
552 let handle = tokio::spawn(async move {
553 let mut interval = tokio::time::interval(Duration::from_secs(10));
554 let mut last_bootstrap: u8 = 0;
555 let mut last_liveness = String::new();
556 let mut was_dormant = false;
557 let mut stall_warned = false;
558 let started_at = Instant::now();
559
560 loop {
561 interval.tick().await;
562 let mut guard = client.lock().await;
563 match guard.monitoring_snapshot().await {
564 Ok(info) => {
565 for &milestone in &[25u8, 50, 75, 100] {
567 if info.bootstrap >= milestone && last_bootstrap < milestone {
568 info!(
569 transport_id = %transport_id,
570 bootstrap = info.bootstrap,
571 "Tor bootstrap {}%",
572 milestone
573 );
574 }
575 }
576
577 if info.bootstrap < 100
579 && started_at.elapsed() > Duration::from_secs(60)
580 && !stall_warned
581 {
582 warn!(
583 transport_id = %transport_id,
584 bootstrap = info.bootstrap,
585 "Tor bootstrap stalled — not at 100% after 60s"
586 );
587 stall_warned = true;
588 }
589 if info.bootstrap == 100 {
590 stall_warned = false;
591 }
592
593 last_bootstrap = info.bootstrap;
594
595 if !last_liveness.is_empty() && info.network_liveness != last_liveness {
597 warn!(
598 transport_id = %transport_id,
599 from = %last_liveness,
600 to = %info.network_liveness,
601 "Tor network liveness changed"
602 );
603 }
604 last_liveness = info.network_liveness.clone();
605
606 if info.dormant && !was_dormant {
608 warn!(
609 transport_id = %transport_id,
610 "Tor daemon entered dormant mode"
611 );
612 }
613 was_dormant = info.dormant;
614
615 if let Ok(mut w) = cache.write() {
616 *w = Some(info);
617 }
618 }
619 Err(e) => {
620 stats.record_control_error();
621 warn!(
622 transport_id = %transport_id,
623 error = %e,
624 "Tor monitoring query failed"
625 );
626 }
627 }
628 }
629 });
630
631 self.monitoring_task = Some(handle);
632 }
633
634 pub async fn send_async(
641 &self,
642 addr: &TransportAddr,
643 data: &[u8],
644 ) -> Result<usize, TransportError> {
645 if !self.state.is_operational() {
646 return Err(TransportError::NotStarted);
647 }
648
649 let mtu = self.config.mtu() as usize;
651 if data.len() > mtu {
652 self.stats.record_mtu_exceeded();
653 return Err(TransportError::MtuExceeded {
654 packet_size: data.len(),
655 mtu: self.config.mtu(),
656 });
657 }
658
659 let writer = {
661 let pool = self.pool.lock().await;
662 pool.get(addr).map(|c| c.writer.clone())
663 };
664
665 let writer = match writer {
666 Some(w) => w,
667 None => {
668 self.connect(addr).await?
670 }
671 };
672
673 let mut w = writer.lock().await;
675 match w.write_all(data).await {
676 Ok(()) => {
677 self.stats.record_send(data.len());
678 trace!(
679 transport_id = %self.transport_id,
680 remote_addr = %addr,
681 bytes = data.len(),
682 "Tor packet sent"
683 );
684 Ok(data.len())
685 }
686 Err(e) => {
687 self.stats.record_send_error();
688 drop(w);
689 let mut pool = self.pool.lock().await;
691 if let Some(conn) = pool.remove(addr) {
692 conn.recv_task.abort();
693 }
694 Err(TransportError::SendFailed(format!("{}", e)))
695 }
696 }
697 }
698
699 async fn connect(
705 &self,
706 addr: &TransportAddr,
707 ) -> Result<Arc<Mutex<OwnedWriteHalf>>, TransportError> {
708 let tor_addr = parse_tor_addr(addr)?;
709 let proxy_addr = self.config.socks5_addr();
710 let timeout_ms = self.config.connect_timeout_ms();
711
712 debug!(
713 transport_id = %self.transport_id,
714 remote_addr = %addr,
715 proxy = %proxy_addr,
716 timeout_secs = timeout_ms / 1000,
717 "Connecting via Tor SOCKS5"
718 );
719
720 let isolation_key = addr.to_string();
725 let connect_start = Instant::now();
726 let socks_result = tokio::time::timeout(Duration::from_millis(timeout_ms), async {
727 match &tor_addr {
728 TorAddr::Onion(host, port) | TorAddr::ClearnetHostname(host, port) => {
729 Socks5Stream::connect_with_password(
730 proxy_addr,
731 (host.as_str(), *port),
732 "fips",
733 &isolation_key,
734 )
735 .await
736 }
737 TorAddr::Clearnet(socket_addr) => {
738 Socks5Stream::connect_with_password(
739 proxy_addr,
740 *socket_addr,
741 "fips",
742 &isolation_key,
743 )
744 .await
745 }
746 }
747 })
748 .await;
749
750 let stream = match socks_result {
751 Ok(Ok(socks_stream)) => socks_stream.into_inner(),
752 Ok(Err(e)) => {
753 self.stats.record_socks5_error();
754 warn!(
755 transport_id = %self.transport_id,
756 remote_addr = %addr,
757 error = %e,
758 elapsed_secs = connect_start.elapsed().as_secs(),
759 "Tor SOCKS5 connection failed"
760 );
761 return Err(TransportError::ConnectionRefused);
762 }
763 Err(_) => {
764 self.stats.record_connect_timeout();
765 warn!(
766 transport_id = %self.transport_id,
767 remote_addr = %addr,
768 timeout_secs = timeout_ms / 1000,
769 "Tor SOCKS5 connection timed out"
770 );
771 return Err(TransportError::Timeout);
772 }
773 };
774
775 let std_stream = stream
777 .into_std()
778 .map_err(|e| TransportError::StartFailed(format!("into_std: {}", e)))?;
779 configure_socket(&std_stream, &self.config)?;
780
781 let stream = TcpStream::from_std(std_stream)
783 .map_err(|e| TransportError::StartFailed(format!("from_std: {}", e)))?;
784
785 let (read_half, write_half) = stream.into_split();
787 let writer = Arc::new(Mutex::new(write_half));
788
789 let transport_id = self.transport_id;
790 let packet_tx = self.packet_tx.clone();
791 let pool = self.pool.clone();
792 let recv_stats = self.stats.clone();
793 let remote_addr = addr.clone();
794 let mtu = self.config.mtu();
795
796 let recv_task = tokio::spawn(async move {
797 tor_receive_loop(
798 read_half,
799 transport_id,
800 remote_addr.clone(),
801 packet_tx,
802 pool,
803 mtu,
804 recv_stats,
805 )
806 .await;
807 });
808
809 let conn = TorConnection {
810 writer: writer.clone(),
811 recv_task,
812 mtu,
813 established_at: Instant::now(),
814 };
815
816 let mut pool = self.pool.lock().await;
817 pool.insert(addr.clone(), conn);
818
819 self.stats.record_connection_established();
820
821 info!(
822 transport_id = %self.transport_id,
823 remote_addr = %addr,
824 elapsed_secs = connect_start.elapsed().as_secs(),
825 "Tor circuit established via SOCKS5"
826 );
827
828 Ok(writer)
829 }
830
831 pub async fn connect_async(&self, addr: &TransportAddr) -> Result<(), TransportError> {
840 if !self.state.is_operational() {
841 return Err(TransportError::NotStarted);
842 }
843
844 {
846 let pool = self.pool.lock().await;
847 if pool.contains_key(addr) {
848 return Ok(());
849 }
850 }
851
852 {
854 let connecting = self.connecting.lock().await;
855 if connecting.contains_key(addr) {
856 return Ok(());
857 }
858 }
859
860 let tor_addr = parse_tor_addr(addr)?;
861 let proxy_addr = self.config.socks5_addr().to_string();
862 let timeout_ms = self.config.connect_timeout_ms();
863 let transport_id = self.transport_id;
864 let remote_addr = addr.clone();
865 let config = self.config.clone();
866
867 debug!(
868 transport_id = %transport_id,
869 remote_addr = %remote_addr,
870 timeout_ms,
871 "Initiating background Tor SOCKS5 connect"
872 );
873
874 let isolation_key = addr.to_string();
876
877 let task = tokio::spawn(async move {
878 let socks_result = tokio::time::timeout(Duration::from_millis(timeout_ms), async {
881 match &tor_addr {
882 TorAddr::Onion(host, port) | TorAddr::ClearnetHostname(host, port) => {
883 Socks5Stream::connect_with_password(
884 proxy_addr.as_str(),
885 (host.as_str(), *port),
886 "fips",
887 &isolation_key,
888 )
889 .await
890 }
891 TorAddr::Clearnet(socket_addr) => {
892 Socks5Stream::connect_with_password(
893 proxy_addr.as_str(),
894 *socket_addr,
895 "fips",
896 &isolation_key,
897 )
898 .await
899 }
900 }
901 })
902 .await;
903
904 let stream = match socks_result {
905 Ok(Ok(socks_stream)) => socks_stream.into_inner(),
906 Ok(Err(e)) => {
907 debug!(
908 transport_id = %transport_id,
909 remote_addr = %remote_addr,
910 error = %e,
911 "Background Tor SOCKS5 connect failed"
912 );
913 return Err(TransportError::ConnectionRefused);
914 }
915 Err(_) => {
916 debug!(
917 transport_id = %transport_id,
918 remote_addr = %remote_addr,
919 "Background Tor SOCKS5 connect timed out"
920 );
921 return Err(TransportError::Timeout);
922 }
923 };
924
925 let std_stream = stream
927 .into_std()
928 .map_err(|e| TransportError::StartFailed(format!("into_std: {}", e)))?;
929 configure_socket(&std_stream, &config)?;
930
931 let mtu = config.mtu();
932
933 let stream = TcpStream::from_std(std_stream)
935 .map_err(|e| TransportError::StartFailed(format!("from_std: {}", e)))?;
936
937 Ok((stream, mtu))
938 });
939
940 let mut connecting = self.connecting.lock().await;
941 connecting.insert(addr.clone(), ConnectingEntry { task });
942
943 Ok(())
944 }
945
946 pub fn connection_state_sync(&self, addr: &TransportAddr) -> ConnectionState {
955 if let Ok(pool) = self.pool.try_lock() {
957 if pool.contains_key(addr) {
958 return ConnectionState::Connected;
959 }
960 } else {
961 return ConnectionState::Connecting; }
963
964 let mut connecting = match self.connecting.try_lock() {
966 Ok(c) => c,
967 Err(_) => return ConnectionState::Connecting,
968 };
969
970 let entry = match connecting.get_mut(addr) {
971 Some(e) => e,
972 None => return ConnectionState::None,
973 };
974
975 if !entry.task.is_finished() {
977 return ConnectionState::Connecting;
978 }
979
980 let addr_clone = addr.clone();
982 let task = connecting.remove(&addr_clone).unwrap().task;
983
984 match task.now_or_never() {
986 Some(Ok(Ok((stream, mtu)))) => {
987 self.promote_connection(addr, stream, mtu);
989 ConnectionState::Connected
990 }
991 Some(Ok(Err(e))) => ConnectionState::Failed(format!("{}", e)),
992 Some(Err(e)) => {
993 ConnectionState::Failed(format!("task failed: {}", e))
995 }
996 None => {
997 ConnectionState::Connecting
999 }
1000 }
1001 }
1002
1003 fn promote_connection(&self, addr: &TransportAddr, stream: TcpStream, mtu: u16) {
1008 let (read_half, write_half) = stream.into_split();
1009 let writer = Arc::new(Mutex::new(write_half));
1010
1011 let transport_id = self.transport_id;
1012 let packet_tx = self.packet_tx.clone();
1013 let pool = self.pool.clone();
1014 let recv_stats = self.stats.clone();
1015 let remote_addr = addr.clone();
1016
1017 let recv_task = tokio::spawn(async move {
1018 tor_receive_loop(
1019 read_half,
1020 transport_id,
1021 remote_addr.clone(),
1022 packet_tx,
1023 pool,
1024 mtu,
1025 recv_stats,
1026 )
1027 .await;
1028 });
1029
1030 let conn = TorConnection {
1031 writer,
1032 recv_task,
1033 mtu,
1034 established_at: Instant::now(),
1035 };
1036
1037 if let Ok(mut pool) = self.pool.try_lock() {
1040 pool.insert(addr.clone(), conn);
1041 self.stats.record_connection_established();
1042 debug!(
1043 transport_id = %self.transport_id,
1044 remote_addr = %addr,
1045 "Tor connection established (background connect)"
1046 );
1047 } else {
1048 conn.recv_task.abort();
1050 warn!(
1051 transport_id = %self.transport_id,
1052 remote_addr = %addr,
1053 "Failed to promote Tor connection (pool locked)"
1054 );
1055 }
1056 }
1057
1058 pub async fn close_connection_async(&self, addr: &TransportAddr) {
1060 let mut pool = self.pool.lock().await;
1061 if let Some(conn) = pool.remove(addr) {
1062 conn.recv_task.abort();
1063 debug!(
1064 transport_id = %self.transport_id,
1065 remote_addr = %addr,
1066 "Tor connection closed"
1067 );
1068 }
1069 }
1070}
1071
1072impl Transport for TorTransport {
1073 fn transport_id(&self) -> TransportId {
1074 self.transport_id
1075 }
1076
1077 fn transport_type(&self) -> &TransportType {
1078 &TransportType::TOR
1079 }
1080
1081 fn state(&self) -> TransportState {
1082 self.state
1083 }
1084
1085 fn mtu(&self) -> u16 {
1086 self.config.mtu()
1087 }
1088
1089 fn link_mtu(&self, _addr: &TransportAddr) -> u16 {
1090 self.config.mtu()
1091 }
1092
1093 fn start(&mut self) -> Result<(), TransportError> {
1094 Err(TransportError::NotSupported(
1095 "use start_async() for Tor transport".into(),
1096 ))
1097 }
1098
1099 fn stop(&mut self) -> Result<(), TransportError> {
1100 Err(TransportError::NotSupported(
1101 "use stop_async() for Tor transport".into(),
1102 ))
1103 }
1104
1105 fn send(&self, _addr: &TransportAddr, _data: &[u8]) -> Result<(), TransportError> {
1106 Err(TransportError::NotSupported(
1107 "use send_async() for Tor transport".into(),
1108 ))
1109 }
1110
1111 fn discover(&self) -> Result<Vec<DiscoveredPeer>, TransportError> {
1112 Ok(Vec::new())
1113 }
1114
1115 fn accept_connections(&self) -> bool {
1116 self.onion_address.is_some()
1117 }
1118}
1119
1120async fn tor_receive_loop(
1130 mut reader: tokio::net::tcp::OwnedReadHalf,
1131 transport_id: TransportId,
1132 remote_addr: TransportAddr,
1133 packet_tx: PacketTx,
1134 pool: ConnectionPool,
1135 mtu: u16,
1136 stats: Arc<TorStats>,
1137) {
1138 debug!(
1139 transport_id = %transport_id,
1140 remote_addr = %remote_addr,
1141 "Tor receive loop starting"
1142 );
1143
1144 loop {
1145 match read_fmp_packet(&mut reader, mtu).await {
1146 Ok(data) => {
1147 stats.record_recv(data.len());
1148
1149 trace!(
1150 transport_id = %transport_id,
1151 remote_addr = %remote_addr,
1152 bytes = data.len(),
1153 "Tor packet received"
1154 );
1155
1156 let packet = ReceivedPacket::new(transport_id, remote_addr.clone(), data);
1157
1158 if packet_tx.send(packet).is_err() {
1159 debug!(
1160 transport_id = %transport_id,
1161 "Packet channel closed, stopping Tor receive loop"
1162 );
1163 break;
1164 }
1165 }
1166 Err(e) => {
1167 stats.record_recv_error();
1168 debug!(
1169 transport_id = %transport_id,
1170 remote_addr = %remote_addr,
1171 error = %e,
1172 "Tor receive error, removing connection"
1173 );
1174 break;
1175 }
1176 }
1177 }
1178
1179 let mut pool_guard = pool.lock().await;
1181 pool_guard.remove(&remote_addr);
1182
1183 debug!(
1184 transport_id = %transport_id,
1185 remote_addr = %remote_addr,
1186 "Tor receive loop stopped"
1187 );
1188}
1189
1190fn configure_socket(
1198 stream: &std::net::TcpStream,
1199 _config: &TorConfig,
1200) -> Result<(), TransportError> {
1201 let socket = socket2::SockRef::from(stream);
1202
1203 socket
1205 .set_tcp_nodelay(true)
1206 .map_err(|e| TransportError::StartFailed(format!("set nodelay: {}", e)))?;
1207
1208 let keepalive_secs = 30u64;
1210 if keepalive_secs > 0 {
1211 let keepalive = TcpKeepalive::new().with_time(Duration::from_secs(keepalive_secs));
1212 socket
1213 .set_tcp_keepalive(&keepalive)
1214 .map_err(|e| TransportError::StartFailed(format!("set keepalive: {}", e)))?;
1215 }
1216
1217 Ok(())
1218}
1219
1220async fn tor_accept_loop(
1231 listener: TcpListener,
1232 transport_id: TransportId,
1233 packet_tx: PacketTx,
1234 pool: ConnectionPool,
1235 mtu: u16,
1236 max_inbound: usize,
1237 stats: Arc<TorStats>,
1238) {
1239 debug!(
1240 transport_id = %transport_id,
1241 "Onion service accept loop starting"
1242 );
1243
1244 loop {
1245 let (stream, peer_addr) = match listener.accept().await {
1246 Ok(result) => result,
1247 Err(e) => {
1248 warn!(
1249 transport_id = %transport_id,
1250 error = %e,
1251 "Onion service accept error"
1252 );
1253 continue;
1254 }
1255 };
1256
1257 let current_count = {
1259 let pool_guard = pool.lock().await;
1260 pool_guard.len()
1261 };
1262 if current_count >= max_inbound {
1263 stats.record_connection_rejected();
1264 debug!(
1265 transport_id = %transport_id,
1266 peer_addr = %peer_addr,
1267 max_inbound,
1268 "Rejecting inbound onion connection (limit reached)"
1269 );
1270 drop(stream);
1271 continue;
1272 }
1273
1274 let std_stream = match stream.into_std() {
1276 Ok(s) => s,
1277 Err(e) => {
1278 warn!(
1279 transport_id = %transport_id,
1280 error = %e,
1281 "Failed to convert accepted stream to std"
1282 );
1283 continue;
1284 }
1285 };
1286
1287 let socket = socket2::SockRef::from(&std_stream);
1288 let _ = socket.set_tcp_nodelay(true);
1289 let keepalive = TcpKeepalive::new().with_time(Duration::from_secs(30));
1290 let _ = socket.set_tcp_keepalive(&keepalive);
1291
1292 let stream = match TcpStream::from_std(std_stream) {
1293 Ok(s) => s,
1294 Err(e) => {
1295 warn!(
1296 transport_id = %transport_id,
1297 error = %e,
1298 "Failed to convert accepted stream back to tokio"
1299 );
1300 continue;
1301 }
1302 };
1303
1304 let remote_addr = TransportAddr::from_string(&peer_addr.to_string());
1305
1306 let (read_half, write_half) = stream.into_split();
1308 let writer = Arc::new(Mutex::new(write_half));
1309
1310 let recv_pool = pool.clone();
1311 let recv_stats = stats.clone();
1312 let recv_addr = remote_addr.clone();
1313 let recv_tx = packet_tx.clone();
1314
1315 let recv_task = tokio::spawn(async move {
1316 tor_receive_loop(
1317 read_half,
1318 transport_id,
1319 recv_addr,
1320 recv_tx,
1321 recv_pool,
1322 mtu,
1323 recv_stats,
1324 )
1325 .await;
1326 });
1327
1328 let conn = TorConnection {
1329 writer,
1330 recv_task,
1331 mtu,
1332 established_at: Instant::now(),
1333 };
1334
1335 {
1336 let mut pool_guard = pool.lock().await;
1337 pool_guard.insert(remote_addr.clone(), conn);
1338 }
1339
1340 stats.record_connection_accepted();
1341
1342 debug!(
1343 transport_id = %transport_id,
1344 peer_addr = %peer_addr,
1345 "Accepted inbound onion connection"
1346 );
1347 }
1348}
1349
1350fn validate_host_port(addr: &str, field_name: &str) -> Result<(), TransportError> {
1356 if addr.parse::<SocketAddr>().is_ok() {
1357 return Ok(());
1358 }
1359 let parts: Vec<&str> = addr.rsplitn(2, ':').collect();
1361 if parts.len() != 2 || parts[0].parse::<u16>().is_err() || parts[1].is_empty() {
1362 return Err(TransportError::StartFailed(format!(
1363 "invalid {} '{}': expected host:port or IP:port",
1364 field_name, addr
1365 )));
1366 }
1367 Ok(())
1368}
1369
1370#[cfg(test)]
1375mod tests {
1376 use super::*;
1377 use crate::transport::packet_channel;
1378
1379 fn make_config() -> TorConfig {
1380 TorConfig {
1381 socks5_addr: Some("127.0.0.1:19050".to_string()),
1382 ..Default::default()
1383 }
1384 }
1385
1386 #[test]
1387 fn test_parse_tor_addr_onion() {
1388 let addr = TransportAddr::from_string("abcdef1234567890.onion:2121");
1389 let tor_addr = parse_tor_addr(&addr).unwrap();
1390 match tor_addr {
1391 TorAddr::Onion(host, port) => {
1392 assert_eq!(host, "abcdef1234567890.onion");
1393 assert_eq!(port, 2121);
1394 }
1395 _ => panic!("expected Onion variant"),
1396 }
1397 }
1398
1399 #[test]
1400 fn test_parse_tor_addr_clearnet() {
1401 let addr = TransportAddr::from_string("192.168.1.1:8080");
1402 let tor_addr = parse_tor_addr(&addr).unwrap();
1403 match tor_addr {
1404 TorAddr::Clearnet(socket_addr) => {
1405 assert_eq!(
1406 socket_addr,
1407 "192.168.1.1:8080".parse::<SocketAddr>().unwrap()
1408 );
1409 }
1410 _ => panic!("expected Clearnet variant"),
1411 }
1412 }
1413
1414 #[test]
1415 fn test_parse_tor_addr_clearnet_hostname() {
1416 let addr = TransportAddr::from_string("peer1.example.com:2121");
1417 let tor_addr = parse_tor_addr(&addr).unwrap();
1418 match tor_addr {
1419 TorAddr::ClearnetHostname(host, port) => {
1420 assert_eq!(host, "peer1.example.com");
1421 assert_eq!(port, 2121);
1422 }
1423 _ => panic!("expected ClearnetHostname variant"),
1424 }
1425 }
1426
1427 #[test]
1428 fn test_parse_tor_addr_invalid() {
1429 let addr = TransportAddr::from_string("localhost:2121");
1431 assert!(parse_tor_addr(&addr).is_err());
1432
1433 let addr = TransportAddr::from_string("not-a-valid-address");
1435 assert!(parse_tor_addr(&addr).is_err());
1436
1437 let addr = TransportAddr::from_string("example.com:notaport");
1439 assert!(parse_tor_addr(&addr).is_err());
1440 }
1441
1442 #[test]
1443 fn test_config_defaults() {
1444 let config = TorConfig::default();
1445 assert_eq!(config.mode(), "socks5");
1446 assert_eq!(config.socks5_addr(), "127.0.0.1:9050");
1447 assert_eq!(config.connect_timeout_ms(), 120000);
1448 assert_eq!(config.mtu(), 1400);
1449 assert_eq!(config.advertised_port(), 443);
1450 }
1451
1452 #[test]
1453 fn test_advertised_port_override() {
1454 let config = TorConfig {
1455 advertised_port: Some(9001),
1456 ..Default::default()
1457 };
1458 assert_eq!(config.advertised_port(), 9001);
1459 }
1460
1461 #[test]
1466 fn test_advert_address_round_trips_through_parser() {
1467 let onion = "mwvj6q3pnsiaky7i6wg5s42xlfurt5uqr3qzckrlw2graa2ugcgwhiqd.onion";
1468 let cfg = TorConfig::default();
1469 let advertised = format!("{}:{}", onion, cfg.advertised_port());
1470
1471 let parsed = parse_tor_addr(&TransportAddr::from_string(&advertised)).unwrap();
1472 match parsed {
1473 TorAddr::Onion(host, port) => {
1474 assert_eq!(host, onion);
1475 assert_eq!(port, 443);
1476 }
1477 other => panic!("expected Onion variant, got {:?}", other),
1478 }
1479
1480 assert!(parse_tor_addr(&TransportAddr::from_string(onion)).is_err());
1484 }
1485
1486 #[tokio::test]
1487 async fn test_start_stop() {
1488 let (tx, _rx) = packet_channel(32);
1489 let mut transport = TorTransport::new(TransportId::new(1), None, make_config(), tx);
1490
1491 transport.start_async().await.unwrap();
1492 assert_eq!(transport.state(), TransportState::Up);
1493
1494 transport.stop_async().await.unwrap();
1495 assert_eq!(transport.state(), TransportState::Down);
1496 }
1497
1498 #[tokio::test]
1499 async fn test_double_start_fails() {
1500 let (tx, _rx) = packet_channel(32);
1501 let mut transport = TorTransport::new(TransportId::new(1), None, make_config(), tx);
1502
1503 transport.start_async().await.unwrap();
1504 assert!(transport.start_async().await.is_err());
1505 }
1506
1507 #[tokio::test]
1508 async fn test_stop_not_started_fails() {
1509 let (tx, _rx) = packet_channel(32);
1510 let mut transport = TorTransport::new(TransportId::new(1), None, make_config(), tx);
1511
1512 assert!(transport.stop_async().await.is_err());
1513 }
1514
1515 #[tokio::test]
1516 async fn test_send_not_started() {
1517 let (tx, _rx) = packet_channel(32);
1518 let transport = TorTransport::new(TransportId::new(1), None, make_config(), tx);
1519
1520 let addr = TransportAddr::from_string("127.0.0.1:2121");
1521 let result = transport.send_async(&addr, &[0u8; 10]).await;
1522 assert!(result.is_err());
1523 }
1524
1525 #[test]
1526 fn test_transport_type() {
1527 let (tx, _rx) = packet_channel(32);
1528 let transport = TorTransport::new(TransportId::new(1), None, make_config(), tx);
1529
1530 let tt = transport.transport_type();
1531 assert_eq!(tt.name, "tor");
1532 assert!(tt.connection_oriented);
1533 assert!(tt.reliable);
1534 }
1535
1536 #[test]
1537 fn test_sync_methods_return_not_supported() {
1538 let (tx, _rx) = packet_channel(32);
1539 let mut transport = TorTransport::new(TransportId::new(1), None, make_config(), tx);
1540
1541 assert!(transport.start().is_err());
1542 assert!(transport.stop().is_err());
1543 let addr = TransportAddr::from_string("127.0.0.1:2121");
1544 assert!(transport.send(&addr, &[0u8; 10]).is_err());
1545 }
1546
1547 #[test]
1548 fn test_accept_connections_false() {
1549 let (tx, _rx) = packet_channel(32);
1550 let transport = TorTransport::new(TransportId::new(1), None, make_config(), tx);
1551
1552 assert!(!transport.accept_connections());
1553 }
1554
1555 #[test]
1556 fn test_discover_returns_empty() {
1557 let (tx, _rx) = packet_channel(32);
1558 let transport = TorTransport::new(TransportId::new(1), None, make_config(), tx);
1559
1560 assert!(transport.discover().unwrap().is_empty());
1561 }
1562
1563 #[tokio::test]
1564 async fn test_invalid_socks5_addr_start_fails() {
1565 let (tx, _rx) = packet_channel(32);
1566 let config = TorConfig {
1567 socks5_addr: Some("not-a-socket-addr".to_string()),
1568 ..Default::default()
1569 };
1570 let mut transport = TorTransport::new(TransportId::new(1), None, config, tx);
1571 assert!(transport.start_async().await.is_err());
1572 }
1573
1574 #[tokio::test]
1575 async fn test_unsupported_mode_start_fails() {
1576 let (tx, _rx) = packet_channel(32);
1577 let config = TorConfig {
1578 mode: Some("embedded".to_string()),
1579 socks5_addr: Some("127.0.0.1:9050".to_string()),
1580 ..Default::default()
1581 };
1582 let mut transport = TorTransport::new(TransportId::new(1), None, config, tx);
1583 assert!(transport.start_async().await.is_err());
1584 }
1585
1586 use crate::config::TcpConfig;
1591 use crate::transport::tcp::TcpTransport;
1592 use mock_socks5::MockSocks5Server;
1593
1594 const MSG1_WIRE_SIZE: usize = 114;
1596 const MSG1_PAYLOAD_LEN: u16 = (MSG1_WIRE_SIZE - 4) as u16;
1598
1599 fn build_msg1_frame() -> Vec<u8> {
1601 let mut frame = vec![0xAA; MSG1_WIRE_SIZE];
1602 frame[0] = 0x01; frame[1] = 0x00; frame[2..4].copy_from_slice(&MSG1_PAYLOAD_LEN.to_le_bytes());
1605 frame
1606 }
1607
1608 #[tokio::test]
1609 async fn test_send_recv_via_socks5() {
1610 let (dest_tx, mut dest_rx) = packet_channel(32);
1612 let dest_config = TcpConfig {
1613 bind_addr: Some("127.0.0.1:0".to_string()),
1614 ..Default::default()
1615 };
1616 let mut dest = TcpTransport::new(TransportId::new(100), None, dest_config, dest_tx);
1617 dest.start_async().await.unwrap();
1618 let dest_addr = dest.local_addr().unwrap();
1619
1620 let mock = MockSocks5Server::new(dest_addr).await.unwrap();
1622 let proxy_addr = mock.addr();
1623 let _proxy_handle = mock.spawn();
1624
1625 let (tor_tx, _tor_rx) = packet_channel(32);
1627 let tor_config = TorConfig {
1628 socks5_addr: Some(proxy_addr.to_string()),
1629 ..Default::default()
1630 };
1631 let mut tor = TorTransport::new(TransportId::new(200), None, tor_config, tor_tx);
1632 tor.start_async().await.unwrap();
1633
1634 let frame = build_msg1_frame();
1636 let target = TransportAddr::from_string(&dest_addr.to_string());
1637 tor.send_async(&target, &frame).await.unwrap();
1638
1639 let received = tokio::time::timeout(Duration::from_secs(5), dest_rx.recv())
1641 .await
1642 .expect("timeout waiting for packet")
1643 .expect("channel closed");
1644
1645 assert_eq!(received.data, frame);
1646
1647 tor.stop_async().await.unwrap();
1649 dest.stop_async().await.unwrap();
1650 }
1651
1652 #[tokio::test]
1653 async fn test_socks5_proxy_down() {
1654 let (tx, _rx) = packet_channel(32);
1656 let config = TorConfig {
1657 socks5_addr: Some("127.0.0.1:19999".to_string()),
1658 connect_timeout_ms: Some(2000),
1659 ..Default::default()
1660 };
1661 let mut transport = TorTransport::new(TransportId::new(1), None, config, tx);
1662 transport.start_async().await.unwrap();
1663
1664 let addr = TransportAddr::from_string("192.168.1.1:2121");
1665 let result = transport.send_async(&addr, &build_msg1_frame()).await;
1666 assert!(result.is_err());
1667 }
1668
1669 #[tokio::test]
1670 async fn test_connect_timeout() {
1671 let (tx, _rx) = packet_channel(32);
1673 let config = TorConfig {
1674 socks5_addr: Some("192.0.2.1:9050".to_string()),
1676 connect_timeout_ms: Some(500),
1677 ..Default::default()
1678 };
1679 let mut transport = TorTransport::new(TransportId::new(1), None, config, tx);
1680 transport.start_async().await.unwrap();
1681
1682 let addr = TransportAddr::from_string("10.0.0.1:2121");
1683 let result = transport.send_async(&addr, &build_msg1_frame()).await;
1684 assert!(result.is_err());
1685 }
1686
1687 #[tokio::test]
1688 async fn test_close_connection() {
1689 let (dest_tx, _dest_rx) = packet_channel(32);
1691 let dest_config = TcpConfig {
1692 bind_addr: Some("127.0.0.1:0".to_string()),
1693 ..Default::default()
1694 };
1695 let mut dest = TcpTransport::new(TransportId::new(100), None, dest_config, dest_tx);
1696 dest.start_async().await.unwrap();
1697 let dest_addr = dest.local_addr().unwrap();
1698
1699 let mock = MockSocks5Server::new(dest_addr).await.unwrap();
1700 let proxy_addr = mock.addr();
1701 let _proxy_handle = mock.spawn();
1702
1703 let (tor_tx, _tor_rx) = packet_channel(32);
1704 let tor_config = TorConfig {
1705 socks5_addr: Some(proxy_addr.to_string()),
1706 ..Default::default()
1707 };
1708 let mut tor = TorTransport::new(TransportId::new(200), None, tor_config, tor_tx);
1709 tor.start_async().await.unwrap();
1710
1711 let target = TransportAddr::from_string(&dest_addr.to_string());
1713 tor.send_async(&target, &build_msg1_frame()).await.unwrap();
1714
1715 {
1717 let pool = tor.pool.lock().await;
1718 assert_eq!(pool.len(), 1);
1719 }
1720
1721 tor.close_connection_async(&target).await;
1723
1724 {
1726 let pool = tor.pool.lock().await;
1727 assert_eq!(pool.len(), 0);
1728 }
1729
1730 tor.stop_async().await.unwrap();
1731 dest.stop_async().await.unwrap();
1732 }
1733
1734 use mock_control::MockTorControlServer;
1739
1740 #[tokio::test]
1741 async fn test_control_port_start_stop() {
1742 let mock = MockTorControlServer::start().await;
1743 let (tx, _rx) = packet_channel(32);
1744
1745 let config = TorConfig {
1746 mode: Some("control_port".to_string()),
1747 socks5_addr: Some("127.0.0.1:19050".to_string()),
1748 control_addr: Some(mock.addr().to_string()),
1749 control_auth: Some("password:testpass".to_string()),
1750 ..Default::default()
1751 };
1752 let mut transport = TorTransport::new(TransportId::new(1), None, config, tx);
1753
1754 transport.start_async().await.unwrap();
1755 assert_eq!(transport.state(), TransportState::Up);
1756 assert!(transport.onion_address().is_none());
1757 assert!(!transport.accept_connections());
1758
1759 transport.stop_async().await.unwrap();
1760 }
1761
1762 #[tokio::test]
1763 async fn test_config_defaults_phase2() {
1764 let config = TorConfig::default();
1765 assert_eq!(config.control_addr(), "/run/tor/control");
1766 assert_eq!(config.control_auth(), "cookie");
1767 assert_eq!(config.cookie_path(), "/var/run/tor/control.authcookie");
1768 assert_eq!(config.max_inbound_connections(), 64);
1769 }
1770
1771 use crate::config::DirectoryServiceConfig;
1776 use tempfile::TempDir;
1777
1778 #[test]
1779 fn test_directory_service_config_defaults() {
1780 let config = DirectoryServiceConfig::default();
1781 assert_eq!(
1782 config.hostname_file(),
1783 "/var/lib/tor/fips_onion_service/hostname"
1784 );
1785 assert_eq!(config.bind_addr(), "127.0.0.1:8443");
1786 }
1787
1788 #[tokio::test]
1789 async fn test_directory_mode_start_stop() {
1790 let dir = TempDir::new().unwrap();
1791 let hostname_path = dir.path().join("hostname");
1792 std::fs::write(
1793 &hostname_path,
1794 "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa2.onion\n",
1795 )
1796 .unwrap();
1797
1798 let (tx, _rx) = packet_channel(32);
1799 let config = TorConfig {
1800 mode: Some("directory".to_string()),
1801 socks5_addr: Some("127.0.0.1:19050".to_string()),
1802 directory_service: Some(DirectoryServiceConfig {
1803 hostname_file: Some(hostname_path.to_str().unwrap().to_string()),
1804 bind_addr: Some("127.0.0.1:0".to_string()),
1805 }),
1806 ..Default::default()
1807 };
1808 let mut transport = TorTransport::new(TransportId::new(1), None, config, tx);
1809
1810 transport.start_async().await.unwrap();
1811 assert_eq!(transport.state(), TransportState::Up);
1812 assert_eq!(
1813 transport.onion_address(),
1814 Some("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa2.onion"),
1815 );
1816 assert!(transport.accept_connections());
1817
1818 transport.stop_async().await.unwrap();
1819 assert_eq!(transport.state(), TransportState::Down);
1820 }
1821
1822 #[tokio::test]
1823 async fn test_directory_mode_missing_hostname_file() {
1824 let (tx, _rx) = packet_channel(32);
1825 let config = TorConfig {
1826 mode: Some("directory".to_string()),
1827 socks5_addr: Some("127.0.0.1:19050".to_string()),
1828 directory_service: Some(DirectoryServiceConfig {
1829 hostname_file: Some("/nonexistent/hostname".to_string()),
1830 bind_addr: Some("127.0.0.1:0".to_string()),
1831 }),
1832 ..Default::default()
1833 };
1834 let mut transport = TorTransport::new(TransportId::new(1), None, config, tx);
1835
1836 let result = transport.start_async().await;
1837 assert!(result.is_err());
1838 let err = format!("{}", result.unwrap_err());
1839 assert!(err.contains("hostname"));
1840 }
1841
1842 #[tokio::test]
1843 async fn test_directory_mode_invalid_hostname() {
1844 let dir = TempDir::new().unwrap();
1845 let hostname_path = dir.path().join("hostname");
1846 std::fs::write(&hostname_path, "not-an-onion-address\n").unwrap();
1847
1848 let (tx, _rx) = packet_channel(32);
1849 let config = TorConfig {
1850 mode: Some("directory".to_string()),
1851 socks5_addr: Some("127.0.0.1:19050".to_string()),
1852 directory_service: Some(DirectoryServiceConfig {
1853 hostname_file: Some(hostname_path.to_str().unwrap().to_string()),
1854 bind_addr: Some("127.0.0.1:0".to_string()),
1855 }),
1856 ..Default::default()
1857 };
1858 let mut transport = TorTransport::new(TransportId::new(1), None, config, tx);
1859
1860 let result = transport.start_async().await;
1861 assert!(result.is_err());
1862 let err = format!("{}", result.unwrap_err());
1863 assert!(err.contains("invalid onion address"));
1864 }
1865
1866 #[tokio::test]
1867 async fn test_directory_mode_accept_inbound() {
1868 let dir = TempDir::new().unwrap();
1869 let hostname_path = dir.path().join("hostname");
1870 std::fs::write(
1871 &hostname_path,
1872 "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa2.onion\n",
1873 )
1874 .unwrap();
1875
1876 let (tx, _rx) = packet_channel(32);
1877 let config = TorConfig {
1878 mode: Some("directory".to_string()),
1879 socks5_addr: Some("127.0.0.1:19050".to_string()),
1880 directory_service: Some(DirectoryServiceConfig {
1881 hostname_file: Some(hostname_path.to_str().unwrap().to_string()),
1882 bind_addr: Some("127.0.0.1:0".to_string()),
1883 }),
1884 ..Default::default()
1885 };
1886 let mut transport = TorTransport::new(TransportId::new(1), None, config, tx);
1887 transport.start_async().await.unwrap();
1888 assert!(transport.accept_connections());
1889
1890 transport.stop_async().await.unwrap();
1891 }
1892
1893 #[tokio::test]
1894 async fn test_socks5_mode_rejects_directory_service_config() {
1895 let (tx, _rx) = packet_channel(32);
1896 let config = TorConfig {
1897 mode: Some("socks5".to_string()),
1898 socks5_addr: Some("127.0.0.1:9050".to_string()),
1899 directory_service: Some(DirectoryServiceConfig::default()),
1900 ..Default::default()
1901 };
1902 let mut transport = TorTransport::new(TransportId::new(1), None, config, tx);
1903 let result = transport.start_async().await;
1904 assert!(result.is_err());
1905 let err = format!("{}", result.unwrap_err());
1906 assert!(err.contains("directory"));
1907 }
1908}