Skip to main content

fips_core/transport/tor/
mod.rs

1//! Tor Transport Implementation
2//!
3//! Provides Tor-based transport for FIPS peer communication. Supports
4//! three modes:
5//!
6//! - **socks5**: Outbound-only connections through a Tor SOCKS5 proxy
7//!   to both clearnet peers and .onion hidden services.
8//! - **control_port**: Outbound via SOCKS5 plus control port connection
9//!   for Tor daemon monitoring (bootstrap status, traffic stats, network liveness).
10//! - **directory**: Inbound via a Tor-managed `HiddenServiceDir` onion
11//!   service, outbound via SOCKS5. No control port needed; enables
12//!   Tor's `Sandbox 1` mode. Reads `.onion` address from hostname file.
13//!
14//! ## Architecture
15//!
16//! Like TCP, each peer has its own connection. The transport reuses FMP
17//! stream framing from `tcp::stream` and follows the same connection pool
18//! pattern as the TCP transport. Inbound connections arrive via a local
19//! TCP listener that the Tor daemon forwards onion service traffic to.
20
21pub mod control;
22pub mod stats;
23
24#[cfg(test)]
25mod mock_control;
26#[cfg(test)]
27mod mock_socks5;
28
29use super::{
30    ConnectionState, DiscoveredPeer, PacketTx, ReceivedPacket, Transport, TransportAddr,
31    TransportError, TransportId, TransportState, TransportType,
32};
33use crate::config::TorConfig;
34use crate::transport::tcp::stream::read_fmp_packet;
35use control::{ControlAuth, TorControlClient, TorMonitoringInfo};
36use stats::TorStats;
37
38use futures::FutureExt;
39use socket2::TcpKeepalive;
40use std::collections::HashMap;
41use std::net::SocketAddr;
42use std::sync::Arc;
43use std::time::Duration;
44use tokio::io::AsyncWriteExt;
45use tokio::net::tcp::OwnedWriteHalf;
46use tokio::net::{TcpListener, TcpStream};
47use tokio::sync::Mutex;
48use tokio::task::JoinHandle;
49use tokio::time::Instant;
50use tokio_socks::tcp::Socks5Stream;
51use tracing::{debug, info, trace, warn};
52
53// ============================================================================
54// Tor Address Types
55// ============================================================================
56
/// Tor-specific destination address used as the target of a SOCKS5 CONNECT.
///
/// All payloads are plain standard-library types, so the enum also derives
/// equality and hashing; this lets callers and tests compare parsed
/// addresses directly or use them as map/set keys.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum TorAddr {
    /// `.onion` hidden service address (hostname, port).
    Onion(String, u16),
    /// Clearnet address routed through Tor (numeric IP, port).
    Clearnet(SocketAddr),
    /// Clearnet hostname routed through Tor (hostname, port).
    ///
    /// Passed as a hostname to SOCKS5 so Tor resolves it — avoids local
    /// DNS leaks and is compatible with `SafeSocks 1`.
    ClearnetHostname(String, u16),
}
69
70/// Parse a TransportAddr string into a TorAddr.
71///
72/// If the address contains ".onion:", parse as an onion address.
73/// If it parses as a numeric IP:port, use Clearnet.
74/// Otherwise, treat as a clearnet hostname:port for Tor-side DNS resolution.
75fn parse_tor_addr(addr: &TransportAddr) -> Result<TorAddr, TransportError> {
76    let s = addr.as_str().ok_or_else(|| {
77        TransportError::InvalidAddress("Tor address must be a valid UTF-8 string".into())
78    })?;
79
80    if s.contains(".onion:") {
81        // Parse "hostname.onion:port"
82        let (host, port_str) = s.rsplit_once(':').ok_or_else(|| {
83            TransportError::InvalidAddress(format!("invalid onion address: {}", s))
84        })?;
85        let port: u16 = port_str.parse().map_err(|_| {
86            TransportError::InvalidAddress(format!("invalid port in onion address: {}", s))
87        })?;
88        Ok(TorAddr::Onion(host.to_string(), port))
89    } else if let Ok(socket_addr) = s.parse::<SocketAddr>() {
90        // Numeric IP:port
91        Ok(TorAddr::Clearnet(socket_addr))
92    } else {
93        // Hostname:port — pass through SOCKS5 for Tor-side DNS resolution
94        let (host, port_str) = s.rsplit_once(':').ok_or_else(|| {
95            TransportError::InvalidAddress(format!("invalid address (expected host:port): {}", s))
96        })?;
97        let port: u16 = port_str
98            .parse()
99            .map_err(|_| TransportError::InvalidAddress(format!("invalid port: {}", s)))?;
100        if !host.contains('.') {
101            return Err(TransportError::InvalidAddress(format!(
102                "hostname must be fully qualified (contain a dot): {}",
103                host
104            )));
105        }
106        Ok(TorAddr::ClearnetHostname(host.to_string(), port))
107    }
108}
109
110// ============================================================================
111// Connection Pool
112// ============================================================================
113
/// State for a single Tor connection to a peer.
///
/// One of these is stored per remote address in the connection pool.
struct TorConnection {
    /// Write half of the split stream. Wrapped in `Arc<Mutex<..>>` so a
    /// sender can clone the handle out of the pool and lock it for a write.
    writer: Arc<Mutex<OwnedWriteHalf>>,
    /// Receive task for this connection. Aborted when the connection is
    /// removed from the pool on a send failure or transport stop.
    recv_task: JoinHandle<()>,
    /// MTU for this connection.
    // NOTE(review): not read in the part of the file reviewed here, hence
    // the `dead_code` allowance — confirm whether it is still needed.
    #[allow(dead_code)]
    mtu: u16,
    /// When the connection was established.
    // NOTE(review): same as `mtu` — stored but not read in the visible code.
    #[allow(dead_code)]
    established_at: Instant,
}
127
/// Shared connection pool: remote address -> established [`TorConnection`].
///
/// Reference-counted and guarded by an async mutex so the transport and its
/// spawned tasks can each hold a handle to the same map.
type ConnectionPool = Arc<Mutex<HashMap<TransportAddr, TorConnection>>>;
130
/// A pending background connection attempt.
///
/// Holds the `JoinHandle` for a spawned SOCKS5 connect task. The task
/// resolves to a configured `TcpStream` plus its MTU on success, or a
/// `TransportError` on failure. Aborting the handle cancels the attempt.
struct ConnectingEntry {
    /// Background task performing SOCKS5 connect + socket configuration.
    task: JoinHandle<Result<(TcpStream, u16), TransportError>>,
}
139
/// Map of addresses with background connection attempts in progress:
/// remote address -> [`ConnectingEntry`], shared behind an async mutex.
type ConnectingPool = Arc<Mutex<HashMap<TransportAddr, ConnectingEntry>>>;
142
143// ============================================================================
144// Tor Transport
145// ============================================================================
146
/// Tor transport for FIPS.
///
/// Provides connection-oriented, reliable byte stream delivery over Tor.
/// Outbound connections always go through the configured SOCKS5 proxy.
/// In `control_port` mode the transport additionally connects to the Tor
/// control port for monitoring. In `directory` mode it also accepts
/// inbound connections on a local listener and reads its `.onion` address
/// from the Tor-managed hostname file.
pub struct TorTransport {
    /// Unique transport identifier.
    transport_id: TransportId,
    /// Optional instance name (for named instances in config).
    name: Option<String>,
    /// Configuration.
    config: TorConfig,
    /// Current state.
    state: TransportState,
    /// Connection pool: addr -> per-connection state.
    pool: ConnectionPool,
    /// Pending connection attempts: addr -> background connect task.
    connecting: ConnectingPool,
    /// Channel for delivering received packets to Node.
    packet_tx: PacketTx,
    /// Transport statistics.
    stats: Arc<TorStats>,
    /// Accept loop task handle (active when onion service is running).
    accept_task: Option<JoinHandle<()>>,
    /// Onion service hostname (e.g., "abcdef...xyz.onion").
    /// Set in directory mode from the Tor-managed hostname file.
    onion_address: Option<String>,
    /// Control port client (monitoring queries).
    control_client: Option<Arc<Mutex<TorControlClient>>>,
    /// Cached Tor daemon monitoring info, updated by background task.
    /// Uses a synchronous `RwLock` so it can be read from non-async getters.
    cached_monitoring: Arc<std::sync::RwLock<Option<TorMonitoringInfo>>>,
    /// Background monitoring task handle.
    monitoring_task: Option<JoinHandle<()>>,
}
182
183impl TorTransport {
184    /// Create a new Tor transport.
185    pub fn new(
186        transport_id: TransportId,
187        name: Option<String>,
188        config: TorConfig,
189        packet_tx: PacketTx,
190    ) -> Self {
191        Self {
192            transport_id,
193            name,
194            config,
195            state: TransportState::Configured,
196            pool: Arc::new(Mutex::new(HashMap::new())),
197            connecting: Arc::new(Mutex::new(HashMap::new())),
198            packet_tx,
199            stats: Arc::new(TorStats::new()),
200            accept_task: None,
201            onion_address: None,
202            control_client: None,
203            cached_monitoring: Arc::new(std::sync::RwLock::new(None)),
204            monitoring_task: None,
205        }
206    }
207
    /// Get the instance name (if configured as a named instance).
    ///
    /// Returns `None` for an unnamed instance; otherwise a borrow of the
    /// stored name.
    pub fn name(&self) -> Option<&str> {
        self.name.as_deref()
    }
212
    /// Get the onion service address (if active).
    ///
    /// Returns `None` until an address has been recorded; the address is
    /// cleared again when the transport is stopped.
    pub fn onion_address(&self) -> Option<&str> {
        self.onion_address.as_deref()
    }
217
    /// Get the transport statistics.
    ///
    /// Returns a reference to the shared `Arc`, so callers that need to keep
    /// the handle can clone it.
    pub fn stats(&self) -> &Arc<TorStats> {
        &self.stats
    }
222
223    /// Get the cached Tor daemon monitoring info (if available).
224    pub fn cached_monitoring(&self) -> Option<TorMonitoringInfo> {
225        self.cached_monitoring.read().ok()?.clone()
226    }
227
    /// Get the Tor transport mode.
    ///
    /// Delegates to the configuration. `start_async` accepts the values
    /// `"socks5"`, `"control_port"`, and `"directory"`.
    pub fn mode(&self) -> &str {
        self.config.mode()
    }
232
233    /// Start the transport asynchronously.
234    ///
235    /// In `socks5` mode: validates config and transitions to Up.
236    /// In `control_port` mode: also connects to the Tor control port
237    /// and authenticates for monitoring.
238    /// In `directory` mode: reads .onion address from hostname file,
239    /// binds a listener, and spawns an accept loop for inbound.
240    pub async fn start_async(&mut self) -> Result<(), TransportError> {
241        if !self.state.can_start() {
242            return Err(TransportError::AlreadyStarted);
243        }
244
245        self.state = TransportState::Starting;
246
247        // Validate SOCKS5 address format (all modes need it for outbound)
248        let socks5_addr = self.config.socks5_addr().to_string();
249        validate_host_port(&socks5_addr, "socks5_addr")?;
250
251        let mode = self.config.mode().to_string();
252        match mode.as_str() {
253            "socks5" => {
254                // Reject inbound service configs in socks5 mode
255                if self.config.directory_service.is_some() {
256                    return Err(TransportError::StartFailed(
257                        "directory_service config requires mode 'directory', not 'socks5'".into(),
258                    ));
259                }
260                self.state = TransportState::Up;
261            }
262            "control_port" => {
263                self.start_control_port_mode().await?;
264            }
265            "directory" => {
266                self.start_directory_mode().await?;
267            }
268            other => {
269                return Err(TransportError::StartFailed(format!(
270                    "unsupported Tor mode '{}' (expected 'socks5', 'control_port', or 'directory')",
271                    other
272                )));
273            }
274        }
275
276        if let Some(ref name) = self.name {
277            info!(
278                name = %name,
279                mode = %mode,
280                socks5_addr = %socks5_addr,
281                onion_address = ?self.onion_address,
282                mtu = self.config.mtu(),
283                "Tor transport started"
284            );
285        } else {
286            info!(
287                mode = %mode,
288                socks5_addr = %socks5_addr,
289                onion_address = ?self.onion_address,
290                mtu = self.config.mtu(),
291                "Tor transport started"
292            );
293        }
294
295        Ok(())
296    }
297
298    /// Start control_port mode: connect to control port and authenticate
299    /// for monitoring queries.
300    async fn start_control_port_mode(&mut self) -> Result<(), TransportError> {
301        let control_addr = self.config.control_addr().to_string();
302        // Unix socket paths start with / or ./ — skip host:port validation
303        if !control_addr.starts_with('/') && !control_addr.starts_with("./") {
304            validate_host_port(&control_addr, "control_addr")?;
305        }
306
307        // Connect to Tor control port
308        let mut client = TorControlClient::connect(&control_addr)
309            .await
310            .map_err(|e| {
311                self.stats.record_control_error();
312                TransportError::StartFailed(format!("Tor control port: {}", e))
313            })?;
314
315        // Authenticate
316        let auth = ControlAuth::from_config(self.config.control_auth(), self.config.cookie_path())
317            .map_err(|e| TransportError::StartFailed(format!("Tor auth config: {}", e)))?;
318
319        client.authenticate(&auth).await.map_err(|e| {
320            self.stats.record_control_error();
321            TransportError::StartFailed(format!("Tor authentication: {}", e))
322        })?;
323
324        // Store control client (used for monitoring queries)
325        self.control_client = Some(Arc::new(Mutex::new(client)));
326        self.state = TransportState::Up;
327        self.spawn_monitoring_task();
328
329        Ok(())
330    }
331
332    /// Start directory mode: read .onion address from Tor-managed hostname
333    /// file, bind a local listener, and spawn the accept loop.
334    ///
335    /// In directory mode, Tor manages the onion service via `HiddenServiceDir`
336    /// in torrc. No control port connection is needed. This enables Tor's
337    /// `Sandbox 1` mode (strongest single hardening option).
338    async fn start_directory_mode(&mut self) -> Result<(), TransportError> {
339        let dir_config = self.config.directory_service.clone().unwrap_or_default();
340
341        // Read .onion address from Tor-managed hostname file
342        let hostname_file = dir_config.hostname_file();
343        let onion_addr = std::fs::read_to_string(hostname_file)
344            .map_err(|e| {
345                TransportError::StartFailed(format!(
346                    "failed to read onion hostname from '{}': {} \
347                     (ensure HiddenServiceDir is configured in torrc and Tor has started)",
348                    hostname_file, e
349                ))
350            })?
351            .trim()
352            .to_string();
353
354        if onion_addr.is_empty() || !onion_addr.ends_with(".onion") {
355            return Err(TransportError::StartFailed(format!(
356                "invalid onion address in '{}': '{}'",
357                hostname_file, onion_addr
358            )));
359        }
360
361        self.onion_address = Some(onion_addr.clone());
362
363        // Bind local listener (must match HiddenServicePort target in torrc)
364        let bind_addr = dir_config.bind_addr();
365        let listener = TcpListener::bind(bind_addr).await.map_err(|e| {
366            TransportError::StartFailed(format!(
367                "failed to bind directory-mode listener on {}: {}",
368                bind_addr, e
369            ))
370        })?;
371        let local_addr = listener
372            .local_addr()
373            .map_err(|e| TransportError::StartFailed(format!("failed to get local addr: {}", e)))?;
374
375        info!(
376            onion_address = %onion_addr,
377            local_addr = %local_addr,
378            hostname_file = %hostname_file,
379            "Directory-mode onion service active"
380        );
381
382        // Spawn accept loop (same as control_port mode)
383        let transport_id = self.transport_id;
384        let packet_tx = self.packet_tx.clone();
385        let pool = self.pool.clone();
386        let mtu = self.config.mtu();
387        let max_inbound = self.config.max_inbound_connections();
388        let stats = self.stats.clone();
389
390        let accept_handle = tokio::spawn(async move {
391            tor_accept_loop(
392                listener,
393                transport_id,
394                packet_tx,
395                pool,
396                mtu,
397                max_inbound,
398                stats,
399            )
400            .await;
401        });
402
403        self.accept_task = Some(accept_handle);
404        self.state = TransportState::Up;
405
406        // Optionally connect to control port for monitoring (non-fatal)
407        if self.config.control_addr.is_some() {
408            self.try_connect_control_port().await;
409        }
410
411        Ok(())
412    }
413
414    /// Attempt to connect to the Tor control port for monitoring.
415    /// Non-fatal: logs a warning on failure and continues without monitoring.
416    async fn try_connect_control_port(&mut self) {
417        let control_addr = self.config.control_addr().to_string();
418        if !control_addr.starts_with('/')
419            && !control_addr.starts_with("./")
420            && let Err(e) = validate_host_port(&control_addr, "control_addr")
421        {
422            warn!(
423                transport_id = %self.transport_id,
424                error = %e,
425                "Tor control port address invalid, monitoring disabled"
426            );
427            return;
428        }
429
430        let client = match TorControlClient::connect(&control_addr).await {
431            Ok(c) => c,
432            Err(e) => {
433                warn!(
434                    transport_id = %self.transport_id,
435                    addr = %control_addr,
436                    error = %e,
437                    "Tor control port connect failed, monitoring disabled"
438                );
439                return;
440            }
441        };
442
443        let auth =
444            match ControlAuth::from_config(self.config.control_auth(), self.config.cookie_path()) {
445                Ok(a) => a,
446                Err(e) => {
447                    warn!(
448                        transport_id = %self.transport_id,
449                        error = %e,
450                        "Tor control auth config error, monitoring disabled"
451                    );
452                    return;
453                }
454            };
455
456        let mut client = client;
457        if let Err(e) = client.authenticate(&auth).await {
458            warn!(
459                transport_id = %self.transport_id,
460                error = %e,
461                "Tor control port auth failed, monitoring disabled"
462            );
463            return;
464        }
465
466        info!(
467            transport_id = %self.transport_id,
468            addr = %control_addr,
469            "Tor control port connected (monitoring enabled)"
470        );
471
472        self.control_client = Some(Arc::new(Mutex::new(client)));
473        self.spawn_monitoring_task();
474    }
475
476    /// Stop the transport asynchronously.
477    ///
478    /// Aborts the accept loop (if running), closes all connections,
479    /// and transitions to Down.
480    pub async fn stop_async(&mut self) -> Result<(), TransportError> {
481        if !self.state.is_operational() {
482            return Err(TransportError::NotStarted);
483        }
484
485        // Abort accept loop (if running)
486        if let Some(task) = self.accept_task.take() {
487            task.abort();
488            let _ = task.await;
489            debug!(
490                transport_id = %self.transport_id,
491                "Onion service accept loop stopped"
492            );
493        }
494
495        // Abort monitoring task (if running)
496        if let Some(task) = self.monitoring_task.take() {
497            task.abort();
498            let _ = task.await;
499        }
500        if let Ok(mut w) = self.cached_monitoring.write() {
501            *w = None;
502        }
503
504        self.control_client = None;
505        self.onion_address = None;
506
507        // Abort pending connection attempts
508        let mut connecting = self.connecting.lock().await;
509        for (addr, entry) in connecting.drain() {
510            entry.task.abort();
511            debug!(
512                transport_id = %self.transport_id,
513                remote_addr = %addr,
514                "Tor connect aborted (transport stopping)"
515            );
516        }
517        drop(connecting);
518
519        // Close all connections
520        let mut pool = self.pool.lock().await;
521        for (addr, conn) in pool.drain() {
522            conn.recv_task.abort();
523            let _ = conn.recv_task.await;
524            debug!(
525                transport_id = %self.transport_id,
526                remote_addr = %addr,
527                "Tor connection closed (transport stopping)"
528            );
529        }
530        drop(pool);
531
532        self.state = TransportState::Down;
533
534        info!(
535            transport_id = %self.transport_id,
536            "Tor transport stopped"
537        );
538
539        Ok(())
540    }
541
    /// Spawn a background task that periodically queries the Tor control
    /// port for daemon status and caches the result.
    ///
    /// Does nothing when no control client is present. Otherwise the task
    /// ticks on a 10-second interval, takes a monitoring snapshot through
    /// the shared control client, logs notable transitions (bootstrap
    /// milestones, bootstrap stall, liveness changes, dormant-mode entry),
    /// and stores the snapshot in `cached_monitoring`. Failed queries are
    /// counted as control errors and logged; the loop keeps running.
    /// The task handle is stored in `monitoring_task`.
    fn spawn_monitoring_task(&mut self) {
        let Some(client) = self.control_client.clone() else {
            return;
        };
        let cache = self.cached_monitoring.clone();
        let stats = self.stats.clone();
        let transport_id = self.transport_id;

        let handle = tokio::spawn(async move {
            let mut interval = tokio::time::interval(Duration::from_secs(10));
            // State carried across polls so only transitions are logged.
            let mut last_bootstrap: u8 = 0;
            let mut last_liveness = String::new();
            let mut was_dormant = false;
            let mut stall_warned = false;
            let started_at = Instant::now();

            loop {
                interval.tick().await;
                // The client lock is held for the rest of this iteration.
                let mut guard = client.lock().await;
                match guard.monitoring_snapshot().await {
                    Ok(info) => {
                        // Log bootstrap milestones
                        // Every milestone crossed since the previous poll is
                        // logged, so a large jump emits several lines at once.
                        for &milestone in &[25u8, 50, 75, 100] {
                            if info.bootstrap >= milestone && last_bootstrap < milestone {
                                info!(
                                    transport_id = %transport_id,
                                    bootstrap = info.bootstrap,
                                    "Tor bootstrap {}%",
                                    milestone
                                );
                            }
                        }

                        // Bootstrap stall warning
                        // Warns once when bootstrap is below 100% more than
                        // 60s after this task started.
                        if info.bootstrap < 100
                            && started_at.elapsed() > Duration::from_secs(60)
                            && !stall_warned
                        {
                            warn!(
                                transport_id = %transport_id,
                                bootstrap = info.bootstrap,
                                "Tor bootstrap stalled — not at 100% after 60s"
                            );
                            stall_warned = true;
                        }
                        // Reaching 100% re-arms the warning for a later drop.
                        if info.bootstrap == 100 {
                            stall_warned = false;
                        }

                        last_bootstrap = info.bootstrap;

                        // Network liveness transitions
                        // The first observation (empty `last_liveness`) is
                        // recorded without a warning.
                        if !last_liveness.is_empty() && info.network_liveness != last_liveness {
                            warn!(
                                transport_id = %transport_id,
                                from = %last_liveness,
                                to = %info.network_liveness,
                                "Tor network liveness changed"
                            );
                        }
                        last_liveness = info.network_liveness.clone();

                        // Dormant mode entry
                        if info.dormant && !was_dormant {
                            warn!(
                                transport_id = %transport_id,
                                "Tor daemon entered dormant mode"
                            );
                        }
                        was_dormant = info.dormant;

                        // Publish the snapshot; skipped if the lock is poisoned.
                        if let Ok(mut w) = cache.write() {
                            *w = Some(info);
                        }
                    }
                    Err(e) => {
                        stats.record_control_error();
                        warn!(
                            transport_id = %transport_id,
                            error = %e,
                            "Tor monitoring query failed"
                        );
                    }
                }
            }
        });

        self.monitoring_task = Some(handle);
    }
633
634    /// Send a packet asynchronously.
635    ///
636    /// If no connection exists to the given address, performs connect-on-send:
637    /// establishes a new connection through the SOCKS5 proxy, configures
638    /// socket options, splits the stream, spawns a receive task, and stores
639    /// the connection in the pool.
640    pub async fn send_async(
641        &self,
642        addr: &TransportAddr,
643        data: &[u8],
644    ) -> Result<usize, TransportError> {
645        if !self.state.is_operational() {
646            return Err(TransportError::NotStarted);
647        }
648
649        // Pre-send MTU check
650        let mtu = self.config.mtu() as usize;
651        if data.len() > mtu {
652            self.stats.record_mtu_exceeded();
653            return Err(TransportError::MtuExceeded {
654                packet_size: data.len(),
655                mtu: self.config.mtu(),
656            });
657        }
658
659        // Get or create connection
660        let writer = {
661            let pool = self.pool.lock().await;
662            pool.get(addr).map(|c| c.writer.clone())
663        };
664
665        let writer = match writer {
666            Some(w) => w,
667            None => {
668                // Connect-on-send
669                self.connect(addr).await?
670            }
671        };
672
673        // Write packet directly (no framing transformation needed)
674        let mut w = writer.lock().await;
675        match w.write_all(data).await {
676            Ok(()) => {
677                self.stats.record_send(data.len());
678                trace!(
679                    transport_id = %self.transport_id,
680                    remote_addr = %addr,
681                    bytes = data.len(),
682                    "Tor packet sent"
683                );
684                Ok(data.len())
685            }
686            Err(e) => {
687                self.stats.record_send_error();
688                drop(w);
689                // Remove failed connection from pool
690                let mut pool = self.pool.lock().await;
691                if let Some(conn) = pool.remove(addr) {
692                    conn.recv_task.abort();
693                }
694                Err(TransportError::SendFailed(format!("{}", e)))
695            }
696        }
697    }
698
699    /// Establish a new connection through the SOCKS5 proxy.
700    ///
701    /// Performs SOCKS5 CONNECT to the target via the proxy, configures
702    /// socket options, splits the stream, spawns a receive task, and
703    /// stores in the pool.
704    async fn connect(
705        &self,
706        addr: &TransportAddr,
707    ) -> Result<Arc<Mutex<OwnedWriteHalf>>, TransportError> {
708        let tor_addr = parse_tor_addr(addr)?;
709        let proxy_addr = self.config.socks5_addr();
710        let timeout_ms = self.config.connect_timeout_ms();
711
712        debug!(
713            transport_id = %self.transport_id,
714            remote_addr = %addr,
715            proxy = %proxy_addr,
716            timeout_secs = timeout_ms / 1000,
717            "Connecting via Tor SOCKS5"
718        );
719
720        // SOCKS5 CONNECT through proxy with timeout.
721        // Uses username/password auth for stream isolation: each destination
722        // gets its own Tor circuit via IsolateSOCKSAuth. The credentials are
723        // not verified by Tor — they serve purely as circuit isolation keys.
724        let isolation_key = addr.to_string();
725        let connect_start = Instant::now();
726        let socks_result = tokio::time::timeout(Duration::from_millis(timeout_ms), async {
727            match &tor_addr {
728                TorAddr::Onion(host, port) | TorAddr::ClearnetHostname(host, port) => {
729                    Socks5Stream::connect_with_password(
730                        proxy_addr,
731                        (host.as_str(), *port),
732                        "fips",
733                        &isolation_key,
734                    )
735                    .await
736                }
737                TorAddr::Clearnet(socket_addr) => {
738                    Socks5Stream::connect_with_password(
739                        proxy_addr,
740                        *socket_addr,
741                        "fips",
742                        &isolation_key,
743                    )
744                    .await
745                }
746            }
747        })
748        .await;
749
750        let stream = match socks_result {
751            Ok(Ok(socks_stream)) => socks_stream.into_inner(),
752            Ok(Err(e)) => {
753                self.stats.record_socks5_error();
754                warn!(
755                    transport_id = %self.transport_id,
756                    remote_addr = %addr,
757                    error = %e,
758                    elapsed_secs = connect_start.elapsed().as_secs(),
759                    "Tor SOCKS5 connection failed"
760                );
761                return Err(TransportError::ConnectionRefused);
762            }
763            Err(_) => {
764                self.stats.record_connect_timeout();
765                warn!(
766                    transport_id = %self.transport_id,
767                    remote_addr = %addr,
768                    timeout_secs = timeout_ms / 1000,
769                    "Tor SOCKS5 connection timed out"
770                );
771                return Err(TransportError::Timeout);
772            }
773        };
774
775        // Configure socket options via socket2
776        let std_stream = stream
777            .into_std()
778            .map_err(|e| TransportError::StartFailed(format!("into_std: {}", e)))?;
779        configure_socket(&std_stream, &self.config)?;
780
781        // Convert back to tokio
782        let stream = TcpStream::from_std(std_stream)
783            .map_err(|e| TransportError::StartFailed(format!("from_std: {}", e)))?;
784
785        // Split and spawn receive task
786        let (read_half, write_half) = stream.into_split();
787        let writer = Arc::new(Mutex::new(write_half));
788
789        let transport_id = self.transport_id;
790        let packet_tx = self.packet_tx.clone();
791        let pool = self.pool.clone();
792        let recv_stats = self.stats.clone();
793        let remote_addr = addr.clone();
794        let mtu = self.config.mtu();
795
796        let recv_task = tokio::spawn(async move {
797            tor_receive_loop(
798                read_half,
799                transport_id,
800                remote_addr.clone(),
801                packet_tx,
802                pool,
803                mtu,
804                recv_stats,
805            )
806            .await;
807        });
808
809        let conn = TorConnection {
810            writer: writer.clone(),
811            recv_task,
812            mtu,
813            established_at: Instant::now(),
814        };
815
816        let mut pool = self.pool.lock().await;
817        pool.insert(addr.clone(), conn);
818
819        self.stats.record_connection_established();
820
821        info!(
822            transport_id = %self.transport_id,
823            remote_addr = %addr,
824            elapsed_secs = connect_start.elapsed().as_secs(),
825            "Tor circuit established via SOCKS5"
826        );
827
828        Ok(writer)
829    }
830
831    /// Initiate a non-blocking connection to a remote address.
832    ///
833    /// Spawns a background task that performs SOCKS5 connect with timeout,
834    /// configures socket options, and returns the configured stream. The
835    /// connection becomes available for `send_async()` once the task
836    /// completes successfully.
837    ///
838    /// Poll `connection_state_sync()` to check progress.
839    pub async fn connect_async(&self, addr: &TransportAddr) -> Result<(), TransportError> {
840        if !self.state.is_operational() {
841            return Err(TransportError::NotStarted);
842        }
843
844        // Already established?
845        {
846            let pool = self.pool.lock().await;
847            if pool.contains_key(addr) {
848                return Ok(());
849            }
850        }
851
852        // Already connecting?
853        {
854            let connecting = self.connecting.lock().await;
855            if connecting.contains_key(addr) {
856                return Ok(());
857            }
858        }
859
860        let tor_addr = parse_tor_addr(addr)?;
861        let proxy_addr = self.config.socks5_addr().to_string();
862        let timeout_ms = self.config.connect_timeout_ms();
863        let transport_id = self.transport_id;
864        let remote_addr = addr.clone();
865        let config = self.config.clone();
866
867        debug!(
868            transport_id = %transport_id,
869            remote_addr = %remote_addr,
870            timeout_ms,
871            "Initiating background Tor SOCKS5 connect"
872        );
873
874        // Stream isolation key for this destination
875        let isolation_key = addr.to_string();
876
877        let task = tokio::spawn(async move {
878            // SOCKS5 CONNECT through proxy with timeout.
879            // Uses username/password auth for stream isolation (see connect()).
880            let socks_result = tokio::time::timeout(Duration::from_millis(timeout_ms), async {
881                match &tor_addr {
882                    TorAddr::Onion(host, port) | TorAddr::ClearnetHostname(host, port) => {
883                        Socks5Stream::connect_with_password(
884                            proxy_addr.as_str(),
885                            (host.as_str(), *port),
886                            "fips",
887                            &isolation_key,
888                        )
889                        .await
890                    }
891                    TorAddr::Clearnet(socket_addr) => {
892                        Socks5Stream::connect_with_password(
893                            proxy_addr.as_str(),
894                            *socket_addr,
895                            "fips",
896                            &isolation_key,
897                        )
898                        .await
899                    }
900                }
901            })
902            .await;
903
904            let stream = match socks_result {
905                Ok(Ok(socks_stream)) => socks_stream.into_inner(),
906                Ok(Err(e)) => {
907                    debug!(
908                        transport_id = %transport_id,
909                        remote_addr = %remote_addr,
910                        error = %e,
911                        "Background Tor SOCKS5 connect failed"
912                    );
913                    return Err(TransportError::ConnectionRefused);
914                }
915                Err(_) => {
916                    debug!(
917                        transport_id = %transport_id,
918                        remote_addr = %remote_addr,
919                        "Background Tor SOCKS5 connect timed out"
920                    );
921                    return Err(TransportError::Timeout);
922                }
923            };
924
925            // Configure socket options via socket2
926            let std_stream = stream
927                .into_std()
928                .map_err(|e| TransportError::StartFailed(format!("into_std: {}", e)))?;
929            configure_socket(&std_stream, &config)?;
930
931            let mtu = config.mtu();
932
933            // Convert back to tokio
934            let stream = TcpStream::from_std(std_stream)
935                .map_err(|e| TransportError::StartFailed(format!("from_std: {}", e)))?;
936
937            Ok((stream, mtu))
938        });
939
940        let mut connecting = self.connecting.lock().await;
941        connecting.insert(addr.clone(), ConnectingEntry { task });
942
943        Ok(())
944    }
945
946    /// Query the state of a connection to a remote address.
947    ///
948    /// Checks both established and connecting pools. If a background
949    /// connect task has completed, promotes it to the established pool
950    /// (spawning a receive loop) or reports the failure.
951    ///
952    /// This method is synchronous but uses `try_lock` internally.
953    /// Returns `ConnectionState::Connecting` if locks can't be acquired.
954    pub fn connection_state_sync(&self, addr: &TransportAddr) -> ConnectionState {
955        // Check established pool first
956        if let Ok(pool) = self.pool.try_lock() {
957            if pool.contains_key(addr) {
958                return ConnectionState::Connected;
959            }
960        } else {
961            return ConnectionState::Connecting; // can't tell, assume still going
962        }
963
964        // Check connecting pool
965        let mut connecting = match self.connecting.try_lock() {
966            Ok(c) => c,
967            Err(_) => return ConnectionState::Connecting,
968        };
969
970        let entry = match connecting.get_mut(addr) {
971            Some(e) => e,
972            None => return ConnectionState::None,
973        };
974
975        // Check if the background task has completed
976        if !entry.task.is_finished() {
977            return ConnectionState::Connecting;
978        }
979
980        // Task is done — take the result and remove from connecting pool.
981        let addr_clone = addr.clone();
982        let task = connecting.remove(&addr_clone).unwrap().task;
983
984        // Since the task is finished, we can safely poll it with now_or_never.
985        match task.now_or_never() {
986            Some(Ok(Ok((stream, mtu)))) => {
987                // Promote to established pool
988                self.promote_connection(addr, stream, mtu);
989                ConnectionState::Connected
990            }
991            Some(Ok(Err(e))) => ConnectionState::Failed(format!("{}", e)),
992            Some(Err(e)) => {
993                // JoinError (panic or cancel)
994                ConnectionState::Failed(format!("task failed: {}", e))
995            }
996            None => {
997                // Shouldn't happen since is_finished() was true
998                ConnectionState::Connecting
999            }
1000        }
1001    }
1002
1003    /// Promote a completed background connection to the established pool.
1004    ///
1005    /// Splits the stream, spawns a receive loop, and inserts into the pool.
1006    /// Called from `connection_state_sync()` when a background task completes.
1007    fn promote_connection(&self, addr: &TransportAddr, stream: TcpStream, mtu: u16) {
1008        let (read_half, write_half) = stream.into_split();
1009        let writer = Arc::new(Mutex::new(write_half));
1010
1011        let transport_id = self.transport_id;
1012        let packet_tx = self.packet_tx.clone();
1013        let pool = self.pool.clone();
1014        let recv_stats = self.stats.clone();
1015        let remote_addr = addr.clone();
1016
1017        let recv_task = tokio::spawn(async move {
1018            tor_receive_loop(
1019                read_half,
1020                transport_id,
1021                remote_addr.clone(),
1022                packet_tx,
1023                pool,
1024                mtu,
1025                recv_stats,
1026            )
1027            .await;
1028        });
1029
1030        let conn = TorConnection {
1031            writer,
1032            recv_task,
1033            mtu,
1034            established_at: Instant::now(),
1035        };
1036
1037        // Use try_lock since we're in a sync context and the pool
1038        // should be available (connection_state_sync already checked it)
1039        if let Ok(mut pool) = self.pool.try_lock() {
1040            pool.insert(addr.clone(), conn);
1041            self.stats.record_connection_established();
1042            debug!(
1043                transport_id = %self.transport_id,
1044                remote_addr = %addr,
1045                "Tor connection established (background connect)"
1046            );
1047        } else {
1048            // Pool locked — abort the recv task, connection will be retried
1049            conn.recv_task.abort();
1050            warn!(
1051                transport_id = %self.transport_id,
1052                remote_addr = %addr,
1053                "Failed to promote Tor connection (pool locked)"
1054            );
1055        }
1056    }
1057
1058    /// Close a specific connection asynchronously.
1059    pub async fn close_connection_async(&self, addr: &TransportAddr) {
1060        let mut pool = self.pool.lock().await;
1061        if let Some(conn) = pool.remove(addr) {
1062            conn.recv_task.abort();
1063            debug!(
1064                transport_id = %self.transport_id,
1065                remote_addr = %addr,
1066                "Tor connection closed"
1067            );
1068        }
1069    }
1070}
1071
1072impl Transport for TorTransport {
1073    fn transport_id(&self) -> TransportId {
1074        self.transport_id
1075    }
1076
1077    fn transport_type(&self) -> &TransportType {
1078        &TransportType::TOR
1079    }
1080
1081    fn state(&self) -> TransportState {
1082        self.state
1083    }
1084
1085    fn mtu(&self) -> u16 {
1086        self.config.mtu()
1087    }
1088
1089    fn link_mtu(&self, _addr: &TransportAddr) -> u16 {
1090        self.config.mtu()
1091    }
1092
1093    fn start(&mut self) -> Result<(), TransportError> {
1094        Err(TransportError::NotSupported(
1095            "use start_async() for Tor transport".into(),
1096        ))
1097    }
1098
1099    fn stop(&mut self) -> Result<(), TransportError> {
1100        Err(TransportError::NotSupported(
1101            "use stop_async() for Tor transport".into(),
1102        ))
1103    }
1104
1105    fn send(&self, _addr: &TransportAddr, _data: &[u8]) -> Result<(), TransportError> {
1106        Err(TransportError::NotSupported(
1107            "use send_async() for Tor transport".into(),
1108        ))
1109    }
1110
1111    fn discover(&self) -> Result<Vec<DiscoveredPeer>, TransportError> {
1112        Ok(Vec::new())
1113    }
1114
1115    fn accept_connections(&self) -> bool {
1116        self.onion_address.is_some()
1117    }
1118}
1119
1120// ============================================================================
1121// Receive Loop (per-connection)
1122// ============================================================================
1123
1124/// Per-connection Tor receive loop.
1125///
1126/// Reads complete FMP packets using the stream reader, delivers them to
1127/// the node via the packet channel. On error or EOF, removes the
1128/// connection from the pool and exits.
1129async fn tor_receive_loop(
1130    mut reader: tokio::net::tcp::OwnedReadHalf,
1131    transport_id: TransportId,
1132    remote_addr: TransportAddr,
1133    packet_tx: PacketTx,
1134    pool: ConnectionPool,
1135    mtu: u16,
1136    stats: Arc<TorStats>,
1137) {
1138    debug!(
1139        transport_id = %transport_id,
1140        remote_addr = %remote_addr,
1141        "Tor receive loop starting"
1142    );
1143
1144    loop {
1145        match read_fmp_packet(&mut reader, mtu).await {
1146            Ok(data) => {
1147                stats.record_recv(data.len());
1148
1149                trace!(
1150                    transport_id = %transport_id,
1151                    remote_addr = %remote_addr,
1152                    bytes = data.len(),
1153                    "Tor packet received"
1154                );
1155
1156                let packet = ReceivedPacket::new(transport_id, remote_addr.clone(), data);
1157
1158                if packet_tx.send(packet).is_err() {
1159                    debug!(
1160                        transport_id = %transport_id,
1161                        "Packet channel closed, stopping Tor receive loop"
1162                    );
1163                    break;
1164                }
1165            }
1166            Err(e) => {
1167                stats.record_recv_error();
1168                debug!(
1169                    transport_id = %transport_id,
1170                    remote_addr = %remote_addr,
1171                    error = %e,
1172                    "Tor receive error, removing connection"
1173                );
1174                break;
1175            }
1176        }
1177    }
1178
1179    // Clean up: remove ourselves from the pool
1180    let mut pool_guard = pool.lock().await;
1181    pool_guard.remove(&remote_addr);
1182
1183    debug!(
1184        transport_id = %transport_id,
1185        remote_addr = %remote_addr,
1186        "Tor receive loop stopped"
1187    );
1188}
1189
1190// ============================================================================
1191// Socket Configuration
1192// ============================================================================
1193
1194/// Configure socket options on a SOCKS5-connected stream.
1195///
1196/// Sets TCP_NODELAY and keepalive on the underlying TCP connection.
1197fn configure_socket(
1198    stream: &std::net::TcpStream,
1199    _config: &TorConfig,
1200) -> Result<(), TransportError> {
1201    let socket = socket2::SockRef::from(stream);
1202
1203    // TCP_NODELAY — always enable for FIPS (latency-sensitive protocol messages)
1204    socket
1205        .set_tcp_nodelay(true)
1206        .map_err(|e| TransportError::StartFailed(format!("set nodelay: {}", e)))?;
1207
1208    // TCP keepalive (30s default, matching TCP transport)
1209    let keepalive_secs = 30u64;
1210    if keepalive_secs > 0 {
1211        let keepalive = TcpKeepalive::new().with_time(Duration::from_secs(keepalive_secs));
1212        socket
1213            .set_tcp_keepalive(&keepalive)
1214            .map_err(|e| TransportError::StartFailed(format!("set keepalive: {}", e)))?;
1215    }
1216
1217    Ok(())
1218}
1219
1220// ============================================================================
1221// Accept Loop (onion service inbound)
1222// ============================================================================
1223
1224/// Accept loop for inbound onion service connections.
1225///
1226/// Mirrors the TCP transport's accept loop. Tor forwards inbound
1227/// connections to a local TCP listener; we accept them, configure
1228/// socket options, split the stream, and spawn a per-connection
1229/// receive task.
1230async fn tor_accept_loop(
1231    listener: TcpListener,
1232    transport_id: TransportId,
1233    packet_tx: PacketTx,
1234    pool: ConnectionPool,
1235    mtu: u16,
1236    max_inbound: usize,
1237    stats: Arc<TorStats>,
1238) {
1239    debug!(
1240        transport_id = %transport_id,
1241        "Onion service accept loop starting"
1242    );
1243
1244    loop {
1245        let (stream, peer_addr) = match listener.accept().await {
1246            Ok(result) => result,
1247            Err(e) => {
1248                warn!(
1249                    transport_id = %transport_id,
1250                    error = %e,
1251                    "Onion service accept error"
1252                );
1253                continue;
1254            }
1255        };
1256
1257        // Check inbound connection limit
1258        let current_count = {
1259            let pool_guard = pool.lock().await;
1260            pool_guard.len()
1261        };
1262        if current_count >= max_inbound {
1263            stats.record_connection_rejected();
1264            debug!(
1265                transport_id = %transport_id,
1266                peer_addr = %peer_addr,
1267                max_inbound,
1268                "Rejecting inbound onion connection (limit reached)"
1269            );
1270            drop(stream);
1271            continue;
1272        }
1273
1274        // Configure socket options on the accepted stream
1275        let std_stream = match stream.into_std() {
1276            Ok(s) => s,
1277            Err(e) => {
1278                warn!(
1279                    transport_id = %transport_id,
1280                    error = %e,
1281                    "Failed to convert accepted stream to std"
1282                );
1283                continue;
1284            }
1285        };
1286
1287        let socket = socket2::SockRef::from(&std_stream);
1288        let _ = socket.set_tcp_nodelay(true);
1289        let keepalive = TcpKeepalive::new().with_time(Duration::from_secs(30));
1290        let _ = socket.set_tcp_keepalive(&keepalive);
1291
1292        let stream = match TcpStream::from_std(std_stream) {
1293            Ok(s) => s,
1294            Err(e) => {
1295                warn!(
1296                    transport_id = %transport_id,
1297                    error = %e,
1298                    "Failed to convert accepted stream back to tokio"
1299                );
1300                continue;
1301            }
1302        };
1303
1304        let remote_addr = TransportAddr::from_string(&peer_addr.to_string());
1305
1306        // Split stream and spawn receive task
1307        let (read_half, write_half) = stream.into_split();
1308        let writer = Arc::new(Mutex::new(write_half));
1309
1310        let recv_pool = pool.clone();
1311        let recv_stats = stats.clone();
1312        let recv_addr = remote_addr.clone();
1313        let recv_tx = packet_tx.clone();
1314
1315        let recv_task = tokio::spawn(async move {
1316            tor_receive_loop(
1317                read_half,
1318                transport_id,
1319                recv_addr,
1320                recv_tx,
1321                recv_pool,
1322                mtu,
1323                recv_stats,
1324            )
1325            .await;
1326        });
1327
1328        let conn = TorConnection {
1329            writer,
1330            recv_task,
1331            mtu,
1332            established_at: Instant::now(),
1333        };
1334
1335        {
1336            let mut pool_guard = pool.lock().await;
1337            pool_guard.insert(remote_addr.clone(), conn);
1338        }
1339
1340        stats.record_connection_accepted();
1341
1342        debug!(
1343            transport_id = %transport_id,
1344            peer_addr = %peer_addr,
1345            "Accepted inbound onion connection"
1346        );
1347    }
1348}
1349
1350// ============================================================================
1351// Helpers
1352// ============================================================================
1353
1354/// Validate that a string is in host:port format.
1355fn validate_host_port(addr: &str, field_name: &str) -> Result<(), TransportError> {
1356    if addr.parse::<SocketAddr>().is_ok() {
1357        return Ok(());
1358    }
1359    // Not a raw IP:port — check it's at least host:port format
1360    let parts: Vec<&str> = addr.rsplitn(2, ':').collect();
1361    if parts.len() != 2 || parts[0].parse::<u16>().is_err() || parts[1].is_empty() {
1362        return Err(TransportError::StartFailed(format!(
1363            "invalid {} '{}': expected host:port or IP:port",
1364            field_name, addr
1365        )));
1366    }
1367    Ok(())
1368}
1369
1370// ============================================================================
1371// Tests
1372// ============================================================================
1373
1374#[cfg(test)]
1375mod tests {
1376    use super::*;
1377    use crate::transport::packet_channel;
1378
1379    fn make_config() -> TorConfig {
1380        TorConfig {
1381            socks5_addr: Some("127.0.0.1:19050".to_string()),
1382            ..Default::default()
1383        }
1384    }
1385
1386    #[test]
1387    fn test_parse_tor_addr_onion() {
1388        let addr = TransportAddr::from_string("abcdef1234567890.onion:2121");
1389        let tor_addr = parse_tor_addr(&addr).unwrap();
1390        match tor_addr {
1391            TorAddr::Onion(host, port) => {
1392                assert_eq!(host, "abcdef1234567890.onion");
1393                assert_eq!(port, 2121);
1394            }
1395            _ => panic!("expected Onion variant"),
1396        }
1397    }
1398
1399    #[test]
1400    fn test_parse_tor_addr_clearnet() {
1401        let addr = TransportAddr::from_string("192.168.1.1:8080");
1402        let tor_addr = parse_tor_addr(&addr).unwrap();
1403        match tor_addr {
1404            TorAddr::Clearnet(socket_addr) => {
1405                assert_eq!(
1406                    socket_addr,
1407                    "192.168.1.1:8080".parse::<SocketAddr>().unwrap()
1408                );
1409            }
1410            _ => panic!("expected Clearnet variant"),
1411        }
1412    }
1413
1414    #[test]
1415    fn test_parse_tor_addr_clearnet_hostname() {
1416        let addr = TransportAddr::from_string("peer1.example.com:2121");
1417        let tor_addr = parse_tor_addr(&addr).unwrap();
1418        match tor_addr {
1419            TorAddr::ClearnetHostname(host, port) => {
1420                assert_eq!(host, "peer1.example.com");
1421                assert_eq!(port, 2121);
1422            }
1423            _ => panic!("expected ClearnetHostname variant"),
1424        }
1425    }
1426
1427    #[test]
1428    fn test_parse_tor_addr_invalid() {
1429        // Bare name without a dot — not a valid hostname
1430        let addr = TransportAddr::from_string("localhost:2121");
1431        assert!(parse_tor_addr(&addr).is_err());
1432
1433        // No port
1434        let addr = TransportAddr::from_string("not-a-valid-address");
1435        assert!(parse_tor_addr(&addr).is_err());
1436
1437        // Invalid port
1438        let addr = TransportAddr::from_string("example.com:notaport");
1439        assert!(parse_tor_addr(&addr).is_err());
1440    }
1441
1442    #[test]
1443    fn test_config_defaults() {
1444        let config = TorConfig::default();
1445        assert_eq!(config.mode(), "socks5");
1446        assert_eq!(config.socks5_addr(), "127.0.0.1:9050");
1447        assert_eq!(config.connect_timeout_ms(), 120000);
1448        assert_eq!(config.mtu(), 1400);
1449        assert_eq!(config.advertised_port(), 443);
1450    }
1451
1452    #[test]
1453    fn test_advertised_port_override() {
1454        let config = TorConfig {
1455            advertised_port: Some(9001),
1456            ..Default::default()
1457        };
1458        assert_eq!(config.advertised_port(), 9001);
1459    }
1460
1461    /// Pins the publisher/parser contract for Tor overlay adverts.
1462    /// `build_overlay_advert` formats Tor endpoints as `<onion>:<port>`;
1463    /// `parse_tor_addr` must accept that exact form back. A bare onion
1464    /// (no port) was the production bug — assert it does not parse.
1465    #[test]
1466    fn test_advert_address_round_trips_through_parser() {
1467        let onion = "mwvj6q3pnsiaky7i6wg5s42xlfurt5uqr3qzckrlw2graa2ugcgwhiqd.onion";
1468        let cfg = TorConfig::default();
1469        let advertised = format!("{}:{}", onion, cfg.advertised_port());
1470
1471        let parsed = parse_tor_addr(&TransportAddr::from_string(&advertised)).unwrap();
1472        match parsed {
1473            TorAddr::Onion(host, port) => {
1474                assert_eq!(host, onion);
1475                assert_eq!(port, 443);
1476            }
1477            other => panic!("expected Onion variant, got {:?}", other),
1478        }
1479
1480        // Sanity-check the inverse: the bare-onion form (the bug) must
1481        // not parse, so any future regression in the publisher will be
1482        // caught by the round-trip test above.
1483        assert!(parse_tor_addr(&TransportAddr::from_string(onion)).is_err());
1484    }
1485
1486    #[tokio::test]
1487    async fn test_start_stop() {
1488        let (tx, _rx) = packet_channel(32);
1489        let mut transport = TorTransport::new(TransportId::new(1), None, make_config(), tx);
1490
1491        transport.start_async().await.unwrap();
1492        assert_eq!(transport.state(), TransportState::Up);
1493
1494        transport.stop_async().await.unwrap();
1495        assert_eq!(transport.state(), TransportState::Down);
1496    }
1497
1498    #[tokio::test]
1499    async fn test_double_start_fails() {
1500        let (tx, _rx) = packet_channel(32);
1501        let mut transport = TorTransport::new(TransportId::new(1), None, make_config(), tx);
1502
1503        transport.start_async().await.unwrap();
1504        assert!(transport.start_async().await.is_err());
1505    }
1506
1507    #[tokio::test]
1508    async fn test_stop_not_started_fails() {
1509        let (tx, _rx) = packet_channel(32);
1510        let mut transport = TorTransport::new(TransportId::new(1), None, make_config(), tx);
1511
1512        assert!(transport.stop_async().await.is_err());
1513    }
1514
1515    #[tokio::test]
1516    async fn test_send_not_started() {
1517        let (tx, _rx) = packet_channel(32);
1518        let transport = TorTransport::new(TransportId::new(1), None, make_config(), tx);
1519
1520        let addr = TransportAddr::from_string("127.0.0.1:2121");
1521        let result = transport.send_async(&addr, &[0u8; 10]).await;
1522        assert!(result.is_err());
1523    }
1524
1525    #[test]
1526    fn test_transport_type() {
1527        let (tx, _rx) = packet_channel(32);
1528        let transport = TorTransport::new(TransportId::new(1), None, make_config(), tx);
1529
1530        let tt = transport.transport_type();
1531        assert_eq!(tt.name, "tor");
1532        assert!(tt.connection_oriented);
1533        assert!(tt.reliable);
1534    }
1535
1536    #[test]
1537    fn test_sync_methods_return_not_supported() {
1538        let (tx, _rx) = packet_channel(32);
1539        let mut transport = TorTransport::new(TransportId::new(1), None, make_config(), tx);
1540
1541        assert!(transport.start().is_err());
1542        assert!(transport.stop().is_err());
1543        let addr = TransportAddr::from_string("127.0.0.1:2121");
1544        assert!(transport.send(&addr, &[0u8; 10]).is_err());
1545    }
1546
1547    #[test]
1548    fn test_accept_connections_false() {
1549        let (tx, _rx) = packet_channel(32);
1550        let transport = TorTransport::new(TransportId::new(1), None, make_config(), tx);
1551
1552        assert!(!transport.accept_connections());
1553    }
1554
1555    #[test]
1556    fn test_discover_returns_empty() {
1557        let (tx, _rx) = packet_channel(32);
1558        let transport = TorTransport::new(TransportId::new(1), None, make_config(), tx);
1559
1560        assert!(transport.discover().unwrap().is_empty());
1561    }
1562
1563    #[tokio::test]
1564    async fn test_invalid_socks5_addr_start_fails() {
1565        let (tx, _rx) = packet_channel(32);
1566        let config = TorConfig {
1567            socks5_addr: Some("not-a-socket-addr".to_string()),
1568            ..Default::default()
1569        };
1570        let mut transport = TorTransport::new(TransportId::new(1), None, config, tx);
1571        assert!(transport.start_async().await.is_err());
1572    }
1573
1574    #[tokio::test]
1575    async fn test_unsupported_mode_start_fails() {
1576        let (tx, _rx) = packet_channel(32);
1577        let config = TorConfig {
1578            mode: Some("embedded".to_string()),
1579            socks5_addr: Some("127.0.0.1:9050".to_string()),
1580            ..Default::default()
1581        };
1582        let mut transport = TorTransport::new(TransportId::new(1), None, config, tx);
1583        assert!(transport.start_async().await.is_err());
1584    }
1585
1586    // ========================================================================
1587    // Integration tests using MockSocks5Server
1588    // ========================================================================
1589
1590    use crate::config::TcpConfig;
1591    use crate::transport::tcp::TcpTransport;
1592    use mock_socks5::MockSocks5Server;
1593
1594    /// msg1 wire size: 4 prefix + 4 sender_idx + 106 noise_msg1 = 114 bytes.
1595    const MSG1_WIRE_SIZE: usize = 114;
1596    /// msg1 payload_len: sender_idx(4) + noise_msg1(106) = 110.
1597    const MSG1_PAYLOAD_LEN: u16 = (MSG1_WIRE_SIZE - 4) as u16;
1598
1599    /// Build a msg1 frame (114 bytes) for testing.
1600    fn build_msg1_frame() -> Vec<u8> {
1601        let mut frame = vec![0xAA; MSG1_WIRE_SIZE];
1602        frame[0] = 0x01; // ver=0, phase=1
1603        frame[1] = 0x00; // flags
1604        frame[2..4].copy_from_slice(&MSG1_PAYLOAD_LEN.to_le_bytes());
1605        frame
1606    }
1607
1608    #[tokio::test]
1609    async fn test_send_recv_via_socks5() {
1610        // Set up a TCP transport as the "destination" with a listener
1611        let (dest_tx, mut dest_rx) = packet_channel(32);
1612        let dest_config = TcpConfig {
1613            bind_addr: Some("127.0.0.1:0".to_string()),
1614            ..Default::default()
1615        };
1616        let mut dest = TcpTransport::new(TransportId::new(100), None, dest_config, dest_tx);
1617        dest.start_async().await.unwrap();
1618        let dest_addr = dest.local_addr().unwrap();
1619
1620        // Set up the mock SOCKS5 proxy pointing at the destination
1621        let mock = MockSocks5Server::new(dest_addr).await.unwrap();
1622        let proxy_addr = mock.addr();
1623        let _proxy_handle = mock.spawn();
1624
1625        // Set up the Tor transport pointing at the mock proxy
1626        let (tor_tx, _tor_rx) = packet_channel(32);
1627        let tor_config = TorConfig {
1628            socks5_addr: Some(proxy_addr.to_string()),
1629            ..Default::default()
1630        };
1631        let mut tor = TorTransport::new(TransportId::new(200), None, tor_config, tor_tx);
1632        tor.start_async().await.unwrap();
1633
1634        // Send a valid FMP frame (msg1) through the Tor transport
1635        let frame = build_msg1_frame();
1636        let target = TransportAddr::from_string(&dest_addr.to_string());
1637        tor.send_async(&target, &frame).await.unwrap();
1638
1639        // Receive it on the destination
1640        let received = tokio::time::timeout(Duration::from_secs(5), dest_rx.recv())
1641            .await
1642            .expect("timeout waiting for packet")
1643            .expect("channel closed");
1644
1645        assert_eq!(received.data, frame);
1646
1647        // Clean up
1648        tor.stop_async().await.unwrap();
1649        dest.stop_async().await.unwrap();
1650    }
1651
1652    #[tokio::test]
1653    async fn test_socks5_proxy_down() {
1654        // No SOCKS5 server running on this port
1655        let (tx, _rx) = packet_channel(32);
1656        let config = TorConfig {
1657            socks5_addr: Some("127.0.0.1:19999".to_string()),
1658            connect_timeout_ms: Some(2000),
1659            ..Default::default()
1660        };
1661        let mut transport = TorTransport::new(TransportId::new(1), None, config, tx);
1662        transport.start_async().await.unwrap();
1663
1664        let addr = TransportAddr::from_string("192.168.1.1:2121");
1665        let result = transport.send_async(&addr, &build_msg1_frame()).await;
1666        assert!(result.is_err());
1667    }
1668
1669    #[tokio::test]
1670    async fn test_connect_timeout() {
1671        // Use a non-routable address as the SOCKS5 proxy to trigger timeout
1672        let (tx, _rx) = packet_channel(32);
1673        let config = TorConfig {
1674            // 192.0.2.1 is TEST-NET, should be non-routable and timeout
1675            socks5_addr: Some("192.0.2.1:9050".to_string()),
1676            connect_timeout_ms: Some(500),
1677            ..Default::default()
1678        };
1679        let mut transport = TorTransport::new(TransportId::new(1), None, config, tx);
1680        transport.start_async().await.unwrap();
1681
1682        let addr = TransportAddr::from_string("10.0.0.1:2121");
1683        let result = transport.send_async(&addr, &build_msg1_frame()).await;
1684        assert!(result.is_err());
1685    }
1686
1687    #[tokio::test]
1688    async fn test_close_connection() {
1689        // Set up destination + mock proxy
1690        let (dest_tx, _dest_rx) = packet_channel(32);
1691        let dest_config = TcpConfig {
1692            bind_addr: Some("127.0.0.1:0".to_string()),
1693            ..Default::default()
1694        };
1695        let mut dest = TcpTransport::new(TransportId::new(100), None, dest_config, dest_tx);
1696        dest.start_async().await.unwrap();
1697        let dest_addr = dest.local_addr().unwrap();
1698
1699        let mock = MockSocks5Server::new(dest_addr).await.unwrap();
1700        let proxy_addr = mock.addr();
1701        let _proxy_handle = mock.spawn();
1702
1703        let (tor_tx, _tor_rx) = packet_channel(32);
1704        let tor_config = TorConfig {
1705            socks5_addr: Some(proxy_addr.to_string()),
1706            ..Default::default()
1707        };
1708        let mut tor = TorTransport::new(TransportId::new(200), None, tor_config, tor_tx);
1709        tor.start_async().await.unwrap();
1710
1711        // Send to establish a connection
1712        let target = TransportAddr::from_string(&dest_addr.to_string());
1713        tor.send_async(&target, &build_msg1_frame()).await.unwrap();
1714
1715        // Verify pool has the connection
1716        {
1717            let pool = tor.pool.lock().await;
1718            assert_eq!(pool.len(), 1);
1719        }
1720
1721        // Close the connection
1722        tor.close_connection_async(&target).await;
1723
1724        // Verify pool is empty
1725        {
1726            let pool = tor.pool.lock().await;
1727            assert_eq!(pool.len(), 0);
1728        }
1729
1730        tor.stop_async().await.unwrap();
1731        dest.stop_async().await.unwrap();
1732    }
1733
1734    // ========================================================================
1735    // Control port mode tests
1736    // ========================================================================
1737
1738    use mock_control::MockTorControlServer;
1739
1740    #[tokio::test]
1741    async fn test_control_port_start_stop() {
1742        let mock = MockTorControlServer::start().await;
1743        let (tx, _rx) = packet_channel(32);
1744
1745        let config = TorConfig {
1746            mode: Some("control_port".to_string()),
1747            socks5_addr: Some("127.0.0.1:19050".to_string()),
1748            control_addr: Some(mock.addr().to_string()),
1749            control_auth: Some("password:testpass".to_string()),
1750            ..Default::default()
1751        };
1752        let mut transport = TorTransport::new(TransportId::new(1), None, config, tx);
1753
1754        transport.start_async().await.unwrap();
1755        assert_eq!(transport.state(), TransportState::Up);
1756        assert!(transport.onion_address().is_none());
1757        assert!(!transport.accept_connections());
1758
1759        transport.stop_async().await.unwrap();
1760    }
1761
1762    #[tokio::test]
1763    async fn test_config_defaults_phase2() {
1764        let config = TorConfig::default();
1765        assert_eq!(config.control_addr(), "/run/tor/control");
1766        assert_eq!(config.control_auth(), "cookie");
1767        assert_eq!(config.cookie_path(), "/var/run/tor/control.authcookie");
1768        assert_eq!(config.max_inbound_connections(), 64);
1769    }
1770
1771    // ========================================================================
1772    // Directory mode tests
1773    // ========================================================================
1774
1775    use crate::config::DirectoryServiceConfig;
1776    use tempfile::TempDir;
1777
1778    #[test]
1779    fn test_directory_service_config_defaults() {
1780        let config = DirectoryServiceConfig::default();
1781        assert_eq!(
1782            config.hostname_file(),
1783            "/var/lib/tor/fips_onion_service/hostname"
1784        );
1785        assert_eq!(config.bind_addr(), "127.0.0.1:8443");
1786    }
1787
1788    #[tokio::test]
1789    async fn test_directory_mode_start_stop() {
1790        let dir = TempDir::new().unwrap();
1791        let hostname_path = dir.path().join("hostname");
1792        std::fs::write(
1793            &hostname_path,
1794            "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa2.onion\n",
1795        )
1796        .unwrap();
1797
1798        let (tx, _rx) = packet_channel(32);
1799        let config = TorConfig {
1800            mode: Some("directory".to_string()),
1801            socks5_addr: Some("127.0.0.1:19050".to_string()),
1802            directory_service: Some(DirectoryServiceConfig {
1803                hostname_file: Some(hostname_path.to_str().unwrap().to_string()),
1804                bind_addr: Some("127.0.0.1:0".to_string()),
1805            }),
1806            ..Default::default()
1807        };
1808        let mut transport = TorTransport::new(TransportId::new(1), None, config, tx);
1809
1810        transport.start_async().await.unwrap();
1811        assert_eq!(transport.state(), TransportState::Up);
1812        assert_eq!(
1813            transport.onion_address(),
1814            Some("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa2.onion"),
1815        );
1816        assert!(transport.accept_connections());
1817
1818        transport.stop_async().await.unwrap();
1819        assert_eq!(transport.state(), TransportState::Down);
1820    }
1821
1822    #[tokio::test]
1823    async fn test_directory_mode_missing_hostname_file() {
1824        let (tx, _rx) = packet_channel(32);
1825        let config = TorConfig {
1826            mode: Some("directory".to_string()),
1827            socks5_addr: Some("127.0.0.1:19050".to_string()),
1828            directory_service: Some(DirectoryServiceConfig {
1829                hostname_file: Some("/nonexistent/hostname".to_string()),
1830                bind_addr: Some("127.0.0.1:0".to_string()),
1831            }),
1832            ..Default::default()
1833        };
1834        let mut transport = TorTransport::new(TransportId::new(1), None, config, tx);
1835
1836        let result = transport.start_async().await;
1837        assert!(result.is_err());
1838        let err = format!("{}", result.unwrap_err());
1839        assert!(err.contains("hostname"));
1840    }
1841
1842    #[tokio::test]
1843    async fn test_directory_mode_invalid_hostname() {
1844        let dir = TempDir::new().unwrap();
1845        let hostname_path = dir.path().join("hostname");
1846        std::fs::write(&hostname_path, "not-an-onion-address\n").unwrap();
1847
1848        let (tx, _rx) = packet_channel(32);
1849        let config = TorConfig {
1850            mode: Some("directory".to_string()),
1851            socks5_addr: Some("127.0.0.1:19050".to_string()),
1852            directory_service: Some(DirectoryServiceConfig {
1853                hostname_file: Some(hostname_path.to_str().unwrap().to_string()),
1854                bind_addr: Some("127.0.0.1:0".to_string()),
1855            }),
1856            ..Default::default()
1857        };
1858        let mut transport = TorTransport::new(TransportId::new(1), None, config, tx);
1859
1860        let result = transport.start_async().await;
1861        assert!(result.is_err());
1862        let err = format!("{}", result.unwrap_err());
1863        assert!(err.contains("invalid onion address"));
1864    }
1865
1866    #[tokio::test]
1867    async fn test_directory_mode_accept_inbound() {
1868        let dir = TempDir::new().unwrap();
1869        let hostname_path = dir.path().join("hostname");
1870        std::fs::write(
1871            &hostname_path,
1872            "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa2.onion\n",
1873        )
1874        .unwrap();
1875
1876        let (tx, _rx) = packet_channel(32);
1877        let config = TorConfig {
1878            mode: Some("directory".to_string()),
1879            socks5_addr: Some("127.0.0.1:19050".to_string()),
1880            directory_service: Some(DirectoryServiceConfig {
1881                hostname_file: Some(hostname_path.to_str().unwrap().to_string()),
1882                bind_addr: Some("127.0.0.1:0".to_string()),
1883            }),
1884            ..Default::default()
1885        };
1886        let mut transport = TorTransport::new(TransportId::new(1), None, config, tx);
1887        transport.start_async().await.unwrap();
1888        assert!(transport.accept_connections());
1889
1890        transport.stop_async().await.unwrap();
1891    }
1892
1893    #[tokio::test]
1894    async fn test_socks5_mode_rejects_directory_service_config() {
1895        let (tx, _rx) = packet_channel(32);
1896        let config = TorConfig {
1897            mode: Some("socks5".to_string()),
1898            socks5_addr: Some("127.0.0.1:9050".to_string()),
1899            directory_service: Some(DirectoryServiceConfig::default()),
1900            ..Default::default()
1901        };
1902        let mut transport = TorTransport::new(TransportId::new(1), None, config, tx);
1903        let result = transport.start_async().await;
1904        assert!(result.is_err());
1905        let err = format!("{}", result.unwrap_err());
1906        assert!(err.contains("directory"));
1907    }
1908}