Skip to main content

fips_core/transport/tor/
mod.rs

1//! Tor Transport Implementation
2//!
3//! Provides Tor-based transport for FIPS peer communication. Supports
4//! three modes:
5//!
6//! - **socks5**: Outbound-only connections through a Tor SOCKS5 proxy
7//!   to both clearnet peers and .onion hidden services.
8//! - **control_port**: Outbound via SOCKS5 plus control port connection
9//!   for Tor daemon monitoring (bootstrap status, traffic stats, network liveness).
10//! - **directory**: Inbound via a Tor-managed `HiddenServiceDir` onion
11//!   service, outbound via SOCKS5. No control port needed; enables
12//!   Tor's `Sandbox 1` mode. Reads `.onion` address from hostname file.
13//!
14//! ## Architecture
15//!
16//! Like TCP, each peer has its own connection. The transport reuses FMP
17//! stream framing from `tcp::stream` and follows the same connection pool
18//! pattern as the TCP transport. Inbound connections arrive via a local
19//! TCP listener that the Tor daemon forwards onion service traffic to.
20
21pub mod control;
22pub mod stats;
23
24#[cfg(test)]
25mod mock_control;
26#[cfg(test)]
27mod mock_socks5;
28
29use super::{
30    ConnectionState, DiscoveredPeer, PacketTx, ReceivedPacket, Transport, TransportAddr,
31    TransportError, TransportId, TransportState, TransportType,
32};
33use crate::config::TorConfig;
34use crate::transport::tcp::stream::read_fmp_packet;
35use control::{ControlAuth, TorControlClient, TorMonitoringInfo};
36use stats::TorStats;
37
38use futures::FutureExt;
39use socket2::TcpKeepalive;
40use std::collections::HashMap;
41use std::net::SocketAddr;
42use std::sync::Arc;
43use std::time::Duration;
44use tokio::io::AsyncWriteExt;
45use tokio::net::tcp::OwnedWriteHalf;
46use tokio::net::{TcpListener, TcpStream};
47use tokio::sync::Mutex;
48use tokio::task::JoinHandle;
49use tokio::time::Instant;
50use tokio_socks::tcp::Socks5Stream;
51use tracing::{debug, info, trace, warn};
52
53// ============================================================================
54// Tor Address Types
55// ============================================================================
56
/// Tor-specific address type for SOCKS5 CONNECT.
///
/// Produced by `parse_tor_addr`. The variant decides whether the SOCKS5
/// request carries a hostname (resolved by Tor) or a numeric socket
/// address.
#[derive(Clone, Debug)]
pub enum TorAddr {
    /// .onion hidden service address (hostname, port).
    Onion(String, u16),
    /// Clearnet address routed through Tor (IP, port).
    Clearnet(SocketAddr),
    /// Clearnet hostname routed through Tor (hostname, port).
    /// Passed as hostname to SOCKS5 so Tor resolves it — avoids local
    /// DNS leaks and is compatible with SafeSocks 1.
    ClearnetHostname(String, u16),
}
69
70/// Parse a TransportAddr string into a TorAddr.
71///
72/// If the address contains ".onion:", parse as an onion address.
73/// If it parses as a numeric IP:port, use Clearnet.
74/// Otherwise, treat as a clearnet hostname:port for Tor-side DNS resolution.
75fn parse_tor_addr(addr: &TransportAddr) -> Result<TorAddr, TransportError> {
76    let s = addr.as_str().ok_or_else(|| {
77        TransportError::InvalidAddress("Tor address must be a valid UTF-8 string".into())
78    })?;
79
80    if s.contains(".onion:") {
81        // Parse "hostname.onion:port"
82        let (host, port_str) = s.rsplit_once(':').ok_or_else(|| {
83            TransportError::InvalidAddress(format!("invalid onion address: {}", s))
84        })?;
85        let port: u16 = port_str.parse().map_err(|_| {
86            TransportError::InvalidAddress(format!("invalid port in onion address: {}", s))
87        })?;
88        Ok(TorAddr::Onion(host.to_string(), port))
89    } else if let Ok(socket_addr) = s.parse::<SocketAddr>() {
90        // Numeric IP:port
91        Ok(TorAddr::Clearnet(socket_addr))
92    } else {
93        // Hostname:port — pass through SOCKS5 for Tor-side DNS resolution
94        let (host, port_str) = s.rsplit_once(':').ok_or_else(|| {
95            TransportError::InvalidAddress(format!("invalid address (expected host:port): {}", s))
96        })?;
97        let port: u16 = port_str
98            .parse()
99            .map_err(|_| TransportError::InvalidAddress(format!("invalid port: {}", s)))?;
100        if !host.contains('.') {
101            return Err(TransportError::InvalidAddress(format!(
102                "hostname must be fully qualified (contain a dot): {}",
103                host
104            )));
105        }
106        Ok(TorAddr::ClearnetHostname(host.to_string(), port))
107    }
108}
109
110// ============================================================================
111// Connection Pool
112// ============================================================================
113
/// Which side initiated a pooled connection.
///
/// Read when a connection leaves the pool to choose between the inbound
/// and outbound pool counters.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Direction {
    /// Connection opened by the remote peer (presumably accepted by the
    /// onion-service accept loop, which is not shown here — confirm).
    Inbound,
    /// Connection this transport opened through the SOCKS5 proxy.
    Outbound,
}
119
/// State for a single Tor connection to a peer.
///
/// Stored in the connection pool, keyed by the peer's `TransportAddr`.
struct TorConnection {
    /// Write half of the split stream. Shared behind `Arc<Mutex<..>>` so a
    /// sender can clone the handle and write without holding the pool lock.
    writer: Arc<Mutex<OwnedWriteHalf>>,
    /// Receive task for this connection. Aborted when the connection is
    /// removed from the pool on send failure or transport stop.
    recv_task: JoinHandle<()>,
    /// MTU for this connection.
    // Not read anywhere in the code visible here, hence the lint allowance.
    #[allow(dead_code)]
    mtu: u16,
    /// When the connection was established.
    // Not read anywhere in the code visible here, hence the lint allowance.
    #[allow(dead_code)]
    established_at: Instant,
    /// Whether the connection is inbound or outbound; selects which pool
    /// counter is decremented on removal.
    direction: Direction,
}
134
/// Shared connection pool.
///
/// Maps a peer address to its established connection. The async mutex
/// lets the pool be locked from tasks that hold the guard across `.await`.
type ConnectionPool = Arc<Mutex<HashMap<TransportAddr, TorConnection>>>;
137
/// A pending background connection attempt.
///
/// Holds the JoinHandle for a spawned SOCKS5 connect task. The task
/// produces a configured `TcpStream` and MTU on success, or a
/// `TransportError` on failure. The handle is aborted if the transport
/// stops while the attempt is still pending.
struct ConnectingEntry {
    /// Background task performing SOCKS5 connect + socket configuration.
    task: JoinHandle<Result<(TcpStream, u16), TransportError>>,
}
146
/// Map of addresses with background connection attempts in progress.
///
/// Keyed by the destination address; each value owns the connect task's
/// handle.
type ConnectingPool = Arc<Mutex<HashMap<TransportAddr, ConnectingEntry>>>;
149
150// ============================================================================
151// Tor Transport
152// ============================================================================
153
/// Tor transport for FIPS.
///
/// Provides connection-oriented, reliable byte stream delivery over Tor.
///
/// - `socks5` mode: outbound-only through a SOCKS5 proxy.
/// - `control_port` mode: outbound through SOCKS5, plus an authenticated
///   control-port connection used for daemon monitoring.
/// - `directory` mode: outbound through SOCKS5, plus inbound connections
///   on a local listener that serves a Tor-managed onion service; a
///   control-port connection for monitoring is attempted only when a
///   control address is configured.
pub struct TorTransport {
    /// Unique transport identifier.
    transport_id: TransportId,
    /// Optional instance name (for named instances in config).
    name: Option<String>,
    /// Configuration.
    config: TorConfig,
    /// Current state.
    state: TransportState,
    /// Connection pool: addr -> per-connection state.
    pool: ConnectionPool,
    /// Pending connection attempts: addr -> background connect task.
    connecting: ConnectingPool,
    /// Channel for delivering received packets to Node.
    packet_tx: PacketTx,
    /// Transport statistics.
    stats: Arc<TorStats>,
    /// Accept loop task handle (active when onion service is running).
    accept_task: Option<JoinHandle<()>>,
    /// Onion service hostname (e.g., "abcdef...xyz.onion").
    /// Set in directory mode from the Tor-managed hostname file.
    onion_address: Option<String>,
    /// Control port client (monitoring queries).
    control_client: Option<Arc<Mutex<TorControlClient>>>,
    /// Cached Tor daemon monitoring info, updated by background task.
    /// Cleared when the transport stops.
    cached_monitoring: Arc<std::sync::RwLock<Option<TorMonitoringInfo>>>,
    /// Background monitoring task handle.
    monitoring_task: Option<JoinHandle<()>>,
}
189
190impl TorTransport {
191    /// Create a new Tor transport.
192    pub fn new(
193        transport_id: TransportId,
194        name: Option<String>,
195        config: TorConfig,
196        packet_tx: PacketTx,
197    ) -> Self {
198        Self {
199            transport_id,
200            name,
201            config,
202            state: TransportState::Configured,
203            pool: Arc::new(Mutex::new(HashMap::new())),
204            connecting: Arc::new(Mutex::new(HashMap::new())),
205            packet_tx,
206            stats: Arc::new(TorStats::new()),
207            accept_task: None,
208            onion_address: None,
209            control_client: None,
210            cached_monitoring: Arc::new(std::sync::RwLock::new(None)),
211            monitoring_task: None,
212        }
213    }
214
    /// Get the instance name (if configured as a named instance).
    ///
    /// Returns `None` for an unnamed instance.
    pub fn name(&self) -> Option<&str> {
        self.name.as_deref()
    }
219
    /// Get the onion service address (if active).
    ///
    /// Set while directory mode is running; reset to `None` when the
    /// transport stops.
    pub fn onion_address(&self) -> Option<&str> {
        self.onion_address.as_deref()
    }
224
    /// Get the transport statistics.
    ///
    /// Returns the shared handle; clone the `Arc` to keep it beyond the
    /// borrow of `self`.
    pub fn stats(&self) -> &Arc<TorStats> {
        &self.stats
    }
229
    /// Get the cached Tor daemon monitoring info (if available).
    ///
    /// Returns a clone of the latest snapshot stored by the monitoring
    /// task. Returns `None` when nothing has been cached yet, and also
    /// when the lock is poisoned (the poison error is discarded).
    pub fn cached_monitoring(&self) -> Option<TorMonitoringInfo> {
        self.cached_monitoring.read().ok()?.clone()
    }
234
    /// Get the Tor transport mode.
    ///
    /// Delegates to the configuration. `start_async` accepts `"socks5"`,
    /// `"control_port"` and `"directory"`.
    pub fn mode(&self) -> &str {
        self.config.mode()
    }
239
240    /// Start the transport asynchronously.
241    ///
242    /// In `socks5` mode: validates config and transitions to Up.
243    /// In `control_port` mode: also connects to the Tor control port
244    /// and authenticates for monitoring.
245    /// In `directory` mode: reads .onion address from hostname file,
246    /// binds a listener, and spawns an accept loop for inbound.
247    pub async fn start_async(&mut self) -> Result<(), TransportError> {
248        if !self.state.can_start() {
249            return Err(TransportError::AlreadyStarted);
250        }
251
252        self.state = TransportState::Starting;
253
254        // Validate SOCKS5 address format (all modes need it for outbound)
255        let socks5_addr = self.config.socks5_addr().to_string();
256        validate_host_port(&socks5_addr, "socks5_addr")?;
257
258        let mode = self.config.mode().to_string();
259        match mode.as_str() {
260            "socks5" => {
261                // Reject inbound service configs in socks5 mode
262                if self.config.directory_service.is_some() {
263                    return Err(TransportError::StartFailed(
264                        "directory_service config requires mode 'directory', not 'socks5'".into(),
265                    ));
266                }
267                self.state = TransportState::Up;
268            }
269            "control_port" => {
270                self.start_control_port_mode().await?;
271            }
272            "directory" => {
273                self.start_directory_mode().await?;
274            }
275            other => {
276                return Err(TransportError::StartFailed(format!(
277                    "unsupported Tor mode '{}' (expected 'socks5', 'control_port', or 'directory')",
278                    other
279                )));
280            }
281        }
282
283        if let Some(ref name) = self.name {
284            info!(
285                name = %name,
286                mode = %mode,
287                socks5_addr = %socks5_addr,
288                onion_address = ?self.onion_address,
289                mtu = self.config.mtu(),
290                "Tor transport started"
291            );
292        } else {
293            info!(
294                mode = %mode,
295                socks5_addr = %socks5_addr,
296                onion_address = ?self.onion_address,
297                mtu = self.config.mtu(),
298                "Tor transport started"
299            );
300        }
301
302        Ok(())
303    }
304
305    /// Start control_port mode: connect to control port and authenticate
306    /// for monitoring queries.
307    async fn start_control_port_mode(&mut self) -> Result<(), TransportError> {
308        let control_addr = self.config.control_addr().to_string();
309        // Unix socket paths start with / or ./ — skip host:port validation
310        if !control_addr.starts_with('/') && !control_addr.starts_with("./") {
311            validate_host_port(&control_addr, "control_addr")?;
312        }
313
314        // Connect to Tor control port
315        let mut client = TorControlClient::connect(&control_addr)
316            .await
317            .map_err(|e| {
318                self.stats.record_control_error();
319                TransportError::StartFailed(format!("Tor control port: {}", e))
320            })?;
321
322        // Authenticate
323        let auth = ControlAuth::from_config(self.config.control_auth(), self.config.cookie_path())
324            .map_err(|e| TransportError::StartFailed(format!("Tor auth config: {}", e)))?;
325
326        client.authenticate(&auth).await.map_err(|e| {
327            self.stats.record_control_error();
328            TransportError::StartFailed(format!("Tor authentication: {}", e))
329        })?;
330
331        // Store control client (used for monitoring queries)
332        self.control_client = Some(Arc::new(Mutex::new(client)));
333        self.state = TransportState::Up;
334        self.spawn_monitoring_task();
335
336        Ok(())
337    }
338
339    /// Start directory mode: read .onion address from Tor-managed hostname
340    /// file, bind a local listener, and spawn the accept loop.
341    ///
342    /// In directory mode, Tor manages the onion service via `HiddenServiceDir`
343    /// in torrc. No control port connection is needed. This enables Tor's
344    /// `Sandbox 1` mode (strongest single hardening option).
345    async fn start_directory_mode(&mut self) -> Result<(), TransportError> {
346        let dir_config = self.config.directory_service.clone().unwrap_or_default();
347
348        // Read .onion address from Tor-managed hostname file
349        let hostname_file = dir_config.hostname_file();
350        let onion_addr = std::fs::read_to_string(hostname_file)
351            .map_err(|e| {
352                TransportError::StartFailed(format!(
353                    "failed to read onion hostname from '{}': {} \
354                     (ensure HiddenServiceDir is configured in torrc and Tor has started)",
355                    hostname_file, e
356                ))
357            })?
358            .trim()
359            .to_string();
360
361        if onion_addr.is_empty() || !onion_addr.ends_with(".onion") {
362            return Err(TransportError::StartFailed(format!(
363                "invalid onion address in '{}': '{}'",
364                hostname_file, onion_addr
365            )));
366        }
367
368        self.onion_address = Some(onion_addr.clone());
369
370        // Bind local listener (must match HiddenServicePort target in torrc)
371        let bind_addr = dir_config.bind_addr();
372        let listener = TcpListener::bind(bind_addr).await.map_err(|e| {
373            TransportError::StartFailed(format!(
374                "failed to bind directory-mode listener on {}: {}",
375                bind_addr, e
376            ))
377        })?;
378        let local_addr = listener
379            .local_addr()
380            .map_err(|e| TransportError::StartFailed(format!("failed to get local addr: {}", e)))?;
381
382        info!(
383            onion_address = %onion_addr,
384            local_addr = %local_addr,
385            hostname_file = %hostname_file,
386            "Directory-mode onion service active"
387        );
388
389        // Spawn accept loop (same as control_port mode)
390        let transport_id = self.transport_id;
391        let packet_tx = self.packet_tx.clone();
392        let pool = self.pool.clone();
393        let mtu = self.config.mtu();
394        let max_inbound = self.config.max_inbound_connections();
395        let stats = self.stats.clone();
396
397        let accept_handle = tokio::spawn(async move {
398            tor_accept_loop(
399                listener,
400                transport_id,
401                packet_tx,
402                pool,
403                mtu,
404                max_inbound,
405                stats,
406            )
407            .await;
408        });
409
410        self.accept_task = Some(accept_handle);
411        self.state = TransportState::Up;
412
413        // Optionally connect to control port for monitoring (non-fatal)
414        if self.config.control_addr.is_some() {
415            self.try_connect_control_port().await;
416        }
417
418        Ok(())
419    }
420
421    /// Attempt to connect to the Tor control port for monitoring.
422    /// Non-fatal: logs a warning on failure and continues without monitoring.
423    async fn try_connect_control_port(&mut self) {
424        let control_addr = self.config.control_addr().to_string();
425        if !control_addr.starts_with('/')
426            && !control_addr.starts_with("./")
427            && let Err(e) = validate_host_port(&control_addr, "control_addr")
428        {
429            warn!(
430                transport_id = %self.transport_id,
431                error = %e,
432                "Tor control port address invalid, monitoring disabled"
433            );
434            return;
435        }
436
437        let client = match TorControlClient::connect(&control_addr).await {
438            Ok(c) => c,
439            Err(e) => {
440                warn!(
441                    transport_id = %self.transport_id,
442                    addr = %control_addr,
443                    error = %e,
444                    "Tor control port connect failed, monitoring disabled"
445                );
446                return;
447            }
448        };
449
450        let auth =
451            match ControlAuth::from_config(self.config.control_auth(), self.config.cookie_path()) {
452                Ok(a) => a,
453                Err(e) => {
454                    warn!(
455                        transport_id = %self.transport_id,
456                        error = %e,
457                        "Tor control auth config error, monitoring disabled"
458                    );
459                    return;
460                }
461            };
462
463        let mut client = client;
464        if let Err(e) = client.authenticate(&auth).await {
465            warn!(
466                transport_id = %self.transport_id,
467                error = %e,
468                "Tor control port auth failed, monitoring disabled"
469            );
470            return;
471        }
472
473        info!(
474            transport_id = %self.transport_id,
475            addr = %control_addr,
476            "Tor control port connected (monitoring enabled)"
477        );
478
479        self.control_client = Some(Arc::new(Mutex::new(client)));
480        self.spawn_monitoring_task();
481    }
482
483    /// Stop the transport asynchronously.
484    ///
485    /// Aborts the accept loop (if running), closes all connections,
486    /// and transitions to Down.
487    pub async fn stop_async(&mut self) -> Result<(), TransportError> {
488        if !self.state.is_operational() {
489            return Err(TransportError::NotStarted);
490        }
491
492        // Abort accept loop (if running)
493        if let Some(task) = self.accept_task.take() {
494            task.abort();
495            let _ = task.await;
496            debug!(
497                transport_id = %self.transport_id,
498                "Onion service accept loop stopped"
499            );
500        }
501
502        // Abort monitoring task (if running)
503        if let Some(task) = self.monitoring_task.take() {
504            task.abort();
505            let _ = task.await;
506        }
507        if let Ok(mut w) = self.cached_monitoring.write() {
508            *w = None;
509        }
510
511        self.control_client = None;
512        self.onion_address = None;
513
514        // Abort pending connection attempts
515        let mut connecting = self.connecting.lock().await;
516        for (addr, entry) in connecting.drain() {
517            entry.task.abort();
518            debug!(
519                transport_id = %self.transport_id,
520                remote_addr = %addr,
521                "Tor connect aborted (transport stopping)"
522            );
523        }
524        drop(connecting);
525
526        // Close all connections
527        let mut pool = self.pool.lock().await;
528        for (addr, conn) in pool.drain() {
529            conn.recv_task.abort();
530            let _ = conn.recv_task.await;
531            match conn.direction {
532                Direction::Inbound => self.stats.record_pool_inbound_removed(),
533                Direction::Outbound => self.stats.record_pool_outbound_removed(),
534            }
535            debug!(
536                transport_id = %self.transport_id,
537                remote_addr = %addr,
538                direction = ?conn.direction,
539                "Tor connection closed (transport stopping)"
540            );
541        }
542        drop(pool);
543
544        self.state = TransportState::Down;
545
546        info!(
547            transport_id = %self.transport_id,
548            "Tor transport stopped"
549        );
550
551        Ok(())
552    }
553
554    /// Spawn a background task that periodically queries the Tor control
555    /// port for daemon status and caches the result.
556    fn spawn_monitoring_task(&mut self) {
557        let Some(client) = self.control_client.clone() else {
558            return;
559        };
560        let cache = self.cached_monitoring.clone();
561        let stats = self.stats.clone();
562        let transport_id = self.transport_id;
563
564        let handle = tokio::spawn(async move {
565            let mut interval = tokio::time::interval(Duration::from_secs(10));
566            let mut last_bootstrap: u8 = 0;
567            let mut last_liveness = String::new();
568            let mut was_dormant = false;
569            let mut stall_warned = false;
570            let started_at = Instant::now();
571
572            loop {
573                interval.tick().await;
574                let mut guard = client.lock().await;
575                match guard.monitoring_snapshot().await {
576                    Ok(info) => {
577                        // Log bootstrap milestones
578                        for &milestone in &[25u8, 50, 75, 100] {
579                            if info.bootstrap >= milestone && last_bootstrap < milestone {
580                                info!(
581                                    transport_id = %transport_id,
582                                    bootstrap = info.bootstrap,
583                                    "Tor bootstrap {}%",
584                                    milestone
585                                );
586                            }
587                        }
588
589                        // Bootstrap stall warning
590                        if info.bootstrap < 100
591                            && started_at.elapsed() > Duration::from_secs(60)
592                            && !stall_warned
593                        {
594                            warn!(
595                                transport_id = %transport_id,
596                                bootstrap = info.bootstrap,
597                                "Tor bootstrap stalled — not at 100% after 60s"
598                            );
599                            stall_warned = true;
600                        }
601                        if info.bootstrap == 100 {
602                            stall_warned = false;
603                        }
604
605                        last_bootstrap = info.bootstrap;
606
607                        // Network liveness transitions
608                        if !last_liveness.is_empty() && info.network_liveness != last_liveness {
609                            warn!(
610                                transport_id = %transport_id,
611                                from = %last_liveness,
612                                to = %info.network_liveness,
613                                "Tor network liveness changed"
614                            );
615                        }
616                        last_liveness = info.network_liveness.clone();
617
618                        // Dormant mode entry
619                        if info.dormant && !was_dormant {
620                            warn!(
621                                transport_id = %transport_id,
622                                "Tor daemon entered dormant mode"
623                            );
624                        }
625                        was_dormant = info.dormant;
626
627                        if let Ok(mut w) = cache.write() {
628                            *w = Some(info);
629                        }
630                    }
631                    Err(e) => {
632                        stats.record_control_error();
633                        warn!(
634                            transport_id = %transport_id,
635                            error = %e,
636                            "Tor monitoring query failed"
637                        );
638                    }
639                }
640            }
641        });
642
643        self.monitoring_task = Some(handle);
644    }
645
646    /// Send a packet asynchronously.
647    ///
648    /// If no connection exists to the given address, performs connect-on-send:
649    /// establishes a new connection through the SOCKS5 proxy, configures
650    /// socket options, splits the stream, spawns a receive task, and stores
651    /// the connection in the pool.
652    pub async fn send_async(
653        &self,
654        addr: &TransportAddr,
655        data: &[u8],
656    ) -> Result<usize, TransportError> {
657        if !self.state.is_operational() {
658            return Err(TransportError::NotStarted);
659        }
660
661        // Pre-send MTU check
662        let mtu = self.config.mtu() as usize;
663        if data.len() > mtu {
664            self.stats.record_mtu_exceeded();
665            return Err(TransportError::MtuExceeded {
666                packet_size: data.len(),
667                mtu: self.config.mtu(),
668            });
669        }
670
671        // Get or create connection
672        let writer = {
673            let pool = self.pool.lock().await;
674            pool.get(addr).map(|c| c.writer.clone())
675        };
676
677        let writer = match writer {
678            Some(w) => w,
679            None => {
680                // Connect-on-send
681                self.connect(addr).await?
682            }
683        };
684
685        // Write packet directly (no framing transformation needed)
686        let mut w = writer.lock().await;
687        match w.write_all(data).await {
688            Ok(()) => {
689                self.stats.record_send(data.len());
690                trace!(
691                    transport_id = %self.transport_id,
692                    remote_addr = %addr,
693                    bytes = data.len(),
694                    "Tor packet sent"
695                );
696                Ok(data.len())
697            }
698            Err(e) => {
699                self.stats.record_send_error();
700                drop(w);
701                // Remove failed connection from pool
702                let mut pool = self.pool.lock().await;
703                if let Some(conn) = pool.remove(addr) {
704                    conn.recv_task.abort();
705                    match conn.direction {
706                        Direction::Inbound => self.stats.record_pool_inbound_removed(),
707                        Direction::Outbound => self.stats.record_pool_outbound_removed(),
708                    }
709                }
710                Err(TransportError::SendFailed(format!("{}", e)))
711            }
712        }
713    }
714
715    /// Establish a new connection through the SOCKS5 proxy.
716    ///
717    /// Performs SOCKS5 CONNECT to the target via the proxy, configures
718    /// socket options, splits the stream, spawns a receive task, and
719    /// stores in the pool.
720    async fn connect(
721        &self,
722        addr: &TransportAddr,
723    ) -> Result<Arc<Mutex<OwnedWriteHalf>>, TransportError> {
724        let tor_addr = parse_tor_addr(addr)?;
725        let proxy_addr = self.config.socks5_addr();
726        let timeout_ms = self.config.connect_timeout_ms();
727
728        debug!(
729            transport_id = %self.transport_id,
730            remote_addr = %addr,
731            proxy = %proxy_addr,
732            timeout_secs = timeout_ms / 1000,
733            "Connecting via Tor SOCKS5"
734        );
735
736        // SOCKS5 CONNECT through proxy with timeout.
737        // Uses username/password auth for stream isolation: each destination
738        // gets its own Tor circuit via IsolateSOCKSAuth. The credentials are
739        // not verified by Tor — they serve purely as circuit isolation keys.
740        let isolation_key = addr.to_string();
741        let connect_start = Instant::now();
742        let socks_result = tokio::time::timeout(Duration::from_millis(timeout_ms), async {
743            match &tor_addr {
744                TorAddr::Onion(host, port) | TorAddr::ClearnetHostname(host, port) => {
745                    Socks5Stream::connect_with_password(
746                        proxy_addr,
747                        (host.as_str(), *port),
748                        "fips",
749                        &isolation_key,
750                    )
751                    .await
752                }
753                TorAddr::Clearnet(socket_addr) => {
754                    Socks5Stream::connect_with_password(
755                        proxy_addr,
756                        *socket_addr,
757                        "fips",
758                        &isolation_key,
759                    )
760                    .await
761                }
762            }
763        })
764        .await;
765
766        let stream = match socks_result {
767            Ok(Ok(socks_stream)) => socks_stream.into_inner(),
768            Ok(Err(e)) => {
769                self.stats.record_socks5_error();
770                warn!(
771                    transport_id = %self.transport_id,
772                    remote_addr = %addr,
773                    error = %e,
774                    elapsed_secs = connect_start.elapsed().as_secs(),
775                    "Tor SOCKS5 connection failed"
776                );
777                return Err(TransportError::ConnectionRefused);
778            }
779            Err(_) => {
780                self.stats.record_connect_timeout();
781                warn!(
782                    transport_id = %self.transport_id,
783                    remote_addr = %addr,
784                    timeout_secs = timeout_ms / 1000,
785                    "Tor SOCKS5 connection timed out"
786                );
787                return Err(TransportError::Timeout);
788            }
789        };
790
791        // Configure socket options via socket2
792        let std_stream = stream
793            .into_std()
794            .map_err(|e| TransportError::StartFailed(format!("into_std: {}", e)))?;
795        configure_socket(&std_stream, &self.config)?;
796
797        // Convert back to tokio
798        let stream = TcpStream::from_std(std_stream)
799            .map_err(|e| TransportError::StartFailed(format!("from_std: {}", e)))?;
800
801        // Split and spawn receive task
802        let (read_half, write_half) = stream.into_split();
803        let writer = Arc::new(Mutex::new(write_half));
804
805        let transport_id = self.transport_id;
806        let packet_tx = self.packet_tx.clone();
807        let pool = self.pool.clone();
808        let recv_stats = self.stats.clone();
809        let remote_addr = addr.clone();
810        let mtu = self.config.mtu();
811
812        let recv_task = tokio::spawn(async move {
813            tor_receive_loop(
814                read_half,
815                transport_id,
816                remote_addr.clone(),
817                packet_tx,
818                pool,
819                mtu,
820                recv_stats,
821                Direction::Outbound,
822            )
823            .await;
824        });
825
826        let conn = TorConnection {
827            writer: writer.clone(),
828            recv_task,
829            mtu,
830            established_at: Instant::now(),
831            direction: Direction::Outbound,
832        };
833
834        let mut pool = self.pool.lock().await;
835        pool.insert(addr.clone(), conn);
836
837        self.stats.record_connection_established();
838        self.stats.record_pool_outbound_added();
839
840        info!(
841            transport_id = %self.transport_id,
842            remote_addr = %addr,
843            elapsed_secs = connect_start.elapsed().as_secs(),
844            "Tor circuit established via SOCKS5"
845        );
846
847        Ok(writer)
848    }
849
850    /// Initiate a non-blocking connection to a remote address.
851    ///
852    /// Spawns a background task that performs SOCKS5 connect with timeout,
853    /// configures socket options, and returns the configured stream. The
854    /// connection becomes available for `send_async()` once the task
855    /// completes successfully.
856    ///
857    /// Poll `connection_state_sync()` to check progress.
858    pub async fn connect_async(&self, addr: &TransportAddr) -> Result<(), TransportError> {
859        if !self.state.is_operational() {
860            return Err(TransportError::NotStarted);
861        }
862
863        // Already established?
864        {
865            let pool = self.pool.lock().await;
866            if pool.contains_key(addr) {
867                return Ok(());
868            }
869        }
870
871        // Already connecting?
872        {
873            let connecting = self.connecting.lock().await;
874            if connecting.contains_key(addr) {
875                return Ok(());
876            }
877        }
878
879        let tor_addr = parse_tor_addr(addr)?;
880        let proxy_addr = self.config.socks5_addr().to_string();
881        let timeout_ms = self.config.connect_timeout_ms();
882        let transport_id = self.transport_id;
883        let remote_addr = addr.clone();
884        let config = self.config.clone();
885
886        debug!(
887            transport_id = %transport_id,
888            remote_addr = %remote_addr,
889            timeout_ms,
890            "Initiating background Tor SOCKS5 connect"
891        );
892
893        // Stream isolation key for this destination
894        let isolation_key = addr.to_string();
895
896        let task = tokio::spawn(async move {
897            // SOCKS5 CONNECT through proxy with timeout.
898            // Uses username/password auth for stream isolation (see connect()).
899            let socks_result = tokio::time::timeout(Duration::from_millis(timeout_ms), async {
900                match &tor_addr {
901                    TorAddr::Onion(host, port) | TorAddr::ClearnetHostname(host, port) => {
902                        Socks5Stream::connect_with_password(
903                            proxy_addr.as_str(),
904                            (host.as_str(), *port),
905                            "fips",
906                            &isolation_key,
907                        )
908                        .await
909                    }
910                    TorAddr::Clearnet(socket_addr) => {
911                        Socks5Stream::connect_with_password(
912                            proxy_addr.as_str(),
913                            *socket_addr,
914                            "fips",
915                            &isolation_key,
916                        )
917                        .await
918                    }
919                }
920            })
921            .await;
922
923            let stream = match socks_result {
924                Ok(Ok(socks_stream)) => socks_stream.into_inner(),
925                Ok(Err(e)) => {
926                    debug!(
927                        transport_id = %transport_id,
928                        remote_addr = %remote_addr,
929                        error = %e,
930                        "Background Tor SOCKS5 connect failed"
931                    );
932                    return Err(TransportError::ConnectionRefused);
933                }
934                Err(_) => {
935                    debug!(
936                        transport_id = %transport_id,
937                        remote_addr = %remote_addr,
938                        "Background Tor SOCKS5 connect timed out"
939                    );
940                    return Err(TransportError::Timeout);
941                }
942            };
943
944            // Configure socket options via socket2
945            let std_stream = stream
946                .into_std()
947                .map_err(|e| TransportError::StartFailed(format!("into_std: {}", e)))?;
948            configure_socket(&std_stream, &config)?;
949
950            let mtu = config.mtu();
951
952            // Convert back to tokio
953            let stream = TcpStream::from_std(std_stream)
954                .map_err(|e| TransportError::StartFailed(format!("from_std: {}", e)))?;
955
956            Ok((stream, mtu))
957        });
958
959        let mut connecting = self.connecting.lock().await;
960        connecting.insert(addr.clone(), ConnectingEntry { task });
961
962        Ok(())
963    }
964
965    /// Query the state of a connection to a remote address.
966    ///
967    /// Checks both established and connecting pools. If a background
968    /// connect task has completed, promotes it to the established pool
969    /// (spawning a receive loop) or reports the failure.
970    ///
971    /// This method is synchronous but uses `try_lock` internally.
972    /// Returns `ConnectionState::Connecting` if locks can't be acquired.
973    pub fn connection_state_sync(&self, addr: &TransportAddr) -> ConnectionState {
974        // Check established pool first
975        if let Ok(pool) = self.pool.try_lock() {
976            if pool.contains_key(addr) {
977                return ConnectionState::Connected;
978            }
979        } else {
980            return ConnectionState::Connecting; // can't tell, assume still going
981        }
982
983        // Check connecting pool
984        let mut connecting = match self.connecting.try_lock() {
985            Ok(c) => c,
986            Err(_) => return ConnectionState::Connecting,
987        };
988
989        let entry = match connecting.get_mut(addr) {
990            Some(e) => e,
991            None => return ConnectionState::None,
992        };
993
994        // Check if the background task has completed
995        if !entry.task.is_finished() {
996            return ConnectionState::Connecting;
997        }
998
999        // Task is done — take the result and remove from connecting pool.
1000        let addr_clone = addr.clone();
1001        let task = connecting.remove(&addr_clone).unwrap().task;
1002
1003        // Since the task is finished, we can safely poll it with now_or_never.
1004        match task.now_or_never() {
1005            Some(Ok(Ok((stream, mtu)))) => {
1006                // Promote to established pool
1007                self.promote_connection(addr, stream, mtu);
1008                ConnectionState::Connected
1009            }
1010            Some(Ok(Err(e))) => ConnectionState::Failed(format!("{}", e)),
1011            Some(Err(e)) => {
1012                // JoinError (panic or cancel)
1013                ConnectionState::Failed(format!("task failed: {}", e))
1014            }
1015            None => {
1016                // Shouldn't happen since is_finished() was true
1017                ConnectionState::Connecting
1018            }
1019        }
1020    }
1021
1022    /// Promote a completed background connection to the established pool.
1023    ///
1024    /// Splits the stream, spawns a receive loop, and inserts into the pool.
1025    /// Called from `connection_state_sync()` when a background task completes.
1026    fn promote_connection(&self, addr: &TransportAddr, stream: TcpStream, mtu: u16) {
1027        let (read_half, write_half) = stream.into_split();
1028        let writer = Arc::new(Mutex::new(write_half));
1029
1030        let transport_id = self.transport_id;
1031        let packet_tx = self.packet_tx.clone();
1032        let pool = self.pool.clone();
1033        let recv_stats = self.stats.clone();
1034        let remote_addr = addr.clone();
1035
1036        let recv_task = tokio::spawn(async move {
1037            tor_receive_loop(
1038                read_half,
1039                transport_id,
1040                remote_addr.clone(),
1041                packet_tx,
1042                pool,
1043                mtu,
1044                recv_stats,
1045                Direction::Outbound,
1046            )
1047            .await;
1048        });
1049
1050        let conn = TorConnection {
1051            writer,
1052            recv_task,
1053            mtu,
1054            established_at: Instant::now(),
1055            direction: Direction::Outbound,
1056        };
1057
1058        // Use try_lock since we're in a sync context and the pool
1059        // should be available (connection_state_sync already checked it)
1060        if let Ok(mut pool) = self.pool.try_lock() {
1061            pool.insert(addr.clone(), conn);
1062            self.stats.record_connection_established();
1063            self.stats.record_pool_outbound_added();
1064            debug!(
1065                transport_id = %self.transport_id,
1066                remote_addr = %addr,
1067                "Tor connection established (background connect)"
1068            );
1069        } else {
1070            // Pool locked — abort the recv task, connection will be retried
1071            conn.recv_task.abort();
1072            warn!(
1073                transport_id = %self.transport_id,
1074                remote_addr = %addr,
1075                "Failed to promote Tor connection (pool locked)"
1076            );
1077        }
1078    }
1079
1080    /// Close a specific connection asynchronously.
1081    pub async fn close_connection_async(&self, addr: &TransportAddr) {
1082        let mut pool = self.pool.lock().await;
1083        if let Some(conn) = pool.remove(addr) {
1084            conn.recv_task.abort();
1085            match conn.direction {
1086                Direction::Inbound => self.stats.record_pool_inbound_removed(),
1087                Direction::Outbound => self.stats.record_pool_outbound_removed(),
1088            }
1089            debug!(
1090                transport_id = %self.transport_id,
1091                remote_addr = %addr,
1092                direction = ?conn.direction,
1093                "Tor connection closed"
1094            );
1095        }
1096    }
1097}
1098
1099impl Transport for TorTransport {
1100    fn transport_id(&self) -> TransportId {
1101        self.transport_id
1102    }
1103
1104    fn transport_type(&self) -> &TransportType {
1105        &TransportType::TOR
1106    }
1107
1108    fn state(&self) -> TransportState {
1109        self.state
1110    }
1111
1112    fn mtu(&self) -> u16 {
1113        self.config.mtu()
1114    }
1115
1116    fn link_mtu(&self, _addr: &TransportAddr) -> u16 {
1117        self.config.mtu()
1118    }
1119
1120    fn start(&mut self) -> Result<(), TransportError> {
1121        Err(TransportError::NotSupported(
1122            "use start_async() for Tor transport".into(),
1123        ))
1124    }
1125
1126    fn stop(&mut self) -> Result<(), TransportError> {
1127        Err(TransportError::NotSupported(
1128            "use stop_async() for Tor transport".into(),
1129        ))
1130    }
1131
1132    fn send(&self, _addr: &TransportAddr, _data: &[u8]) -> Result<(), TransportError> {
1133        Err(TransportError::NotSupported(
1134            "use send_async() for Tor transport".into(),
1135        ))
1136    }
1137
1138    fn discover(&self) -> Result<Vec<DiscoveredPeer>, TransportError> {
1139        Ok(Vec::new())
1140    }
1141
1142    fn accept_connections(&self) -> bool {
1143        self.onion_address.is_some()
1144    }
1145}
1146
1147// ============================================================================
1148// Receive Loop (per-connection)
1149// ============================================================================
1150
1151/// Per-connection Tor receive loop.
1152///
1153/// Reads complete FMP packets using the stream reader, delivers them to
1154/// the node via the packet channel. On error or EOF, removes the
1155/// connection from the pool and exits.
1156async fn tor_receive_loop(
1157    mut reader: tokio::net::tcp::OwnedReadHalf,
1158    transport_id: TransportId,
1159    remote_addr: TransportAddr,
1160    packet_tx: PacketTx,
1161    pool: ConnectionPool,
1162    mtu: u16,
1163    stats: Arc<TorStats>,
1164    direction: Direction,
1165) {
1166    debug!(
1167        transport_id = %transport_id,
1168        remote_addr = %remote_addr,
1169        "Tor receive loop starting"
1170    );
1171
1172    loop {
1173        match read_fmp_packet(&mut reader, mtu).await {
1174            Ok(data) => {
1175                stats.record_recv(data.len());
1176
1177                trace!(
1178                    transport_id = %transport_id,
1179                    remote_addr = %remote_addr,
1180                    bytes = data.len(),
1181                    "Tor packet received"
1182                );
1183
1184                let packet = ReceivedPacket::new(transport_id, remote_addr.clone(), data);
1185
1186                if packet_tx.send(packet).is_err() {
1187                    debug!(
1188                        transport_id = %transport_id,
1189                        "Packet channel closed, stopping Tor receive loop"
1190                    );
1191                    break;
1192                }
1193            }
1194            Err(e) => {
1195                stats.record_recv_error();
1196                debug!(
1197                    transport_id = %transport_id,
1198                    remote_addr = %remote_addr,
1199                    error = %e,
1200                    "Tor receive error, removing connection"
1201                );
1202                break;
1203            }
1204        }
1205    }
1206
1207    // Clean up: remove ourselves from the pool and decrement the matching
1208    // direction counter only if this task owned the removed entry.
1209    let mut pool_guard = pool.lock().await;
1210    let removed = pool_guard.remove(&remote_addr).is_some();
1211    drop(pool_guard);
1212    if removed {
1213        match direction {
1214            Direction::Inbound => stats.record_pool_inbound_removed(),
1215            Direction::Outbound => stats.record_pool_outbound_removed(),
1216        }
1217    }
1218
1219    debug!(
1220        transport_id = %transport_id,
1221        remote_addr = %remote_addr,
1222        direction = ?direction,
1223        "Tor receive loop stopped"
1224    );
1225}
1226
1227// ============================================================================
1228// Socket Configuration
1229// ============================================================================
1230
1231/// Configure socket options on a SOCKS5-connected stream.
1232///
1233/// Sets TCP_NODELAY and keepalive on the underlying TCP connection.
1234fn configure_socket(
1235    stream: &std::net::TcpStream,
1236    _config: &TorConfig,
1237) -> Result<(), TransportError> {
1238    let socket = socket2::SockRef::from(stream);
1239
1240    // TCP_NODELAY — always enable for FIPS (latency-sensitive protocol messages)
1241    socket
1242        .set_tcp_nodelay(true)
1243        .map_err(|e| TransportError::StartFailed(format!("set nodelay: {}", e)))?;
1244
1245    // TCP keepalive (30s default, matching TCP transport)
1246    let keepalive_secs = 30u64;
1247    if keepalive_secs > 0 {
1248        let keepalive = TcpKeepalive::new().with_time(Duration::from_secs(keepalive_secs));
1249        socket
1250            .set_tcp_keepalive(&keepalive)
1251            .map_err(|e| TransportError::StartFailed(format!("set keepalive: {}", e)))?;
1252    }
1253
1254    Ok(())
1255}
1256
1257// ============================================================================
1258// Accept Loop (onion service inbound)
1259// ============================================================================
1260
1261/// Accept loop for inbound onion service connections.
1262///
1263/// Mirrors the TCP transport's accept loop. Tor forwards inbound
1264/// connections to a local TCP listener; we accept them, configure
1265/// socket options, split the stream, and spawn a per-connection
1266/// receive task.
1267async fn tor_accept_loop(
1268    listener: TcpListener,
1269    transport_id: TransportId,
1270    packet_tx: PacketTx,
1271    pool: ConnectionPool,
1272    mtu: u16,
1273    max_inbound: usize,
1274    stats: Arc<TorStats>,
1275) {
1276    debug!(
1277        transport_id = %transport_id,
1278        "Onion service accept loop starting"
1279    );
1280
1281    loop {
1282        let (stream, peer_addr) = match listener.accept().await {
1283            Ok(result) => result,
1284            Err(e) => {
1285                warn!(
1286                    transport_id = %transport_id,
1287                    error = %e,
1288                    "Onion service accept error"
1289                );
1290                continue;
1291            }
1292        };
1293
1294        // Check inbound connection limit. Outbound SOCKS5-connect entries
1295        // share the pool but do not consume onion-service inbound budget.
1296        if stats.pool_inbound_count() >= max_inbound as u64 {
1297            stats.record_connection_rejected();
1298            debug!(
1299                transport_id = %transport_id,
1300                peer_addr = %peer_addr,
1301                max_inbound,
1302                "Rejecting inbound onion connection (limit reached)"
1303            );
1304            drop(stream);
1305            continue;
1306        }
1307
1308        // Configure socket options on the accepted stream
1309        let std_stream = match stream.into_std() {
1310            Ok(s) => s,
1311            Err(e) => {
1312                warn!(
1313                    transport_id = %transport_id,
1314                    error = %e,
1315                    "Failed to convert accepted stream to std"
1316                );
1317                continue;
1318            }
1319        };
1320
1321        let socket = socket2::SockRef::from(&std_stream);
1322        let _ = socket.set_tcp_nodelay(true);
1323        let keepalive = TcpKeepalive::new().with_time(Duration::from_secs(30));
1324        let _ = socket.set_tcp_keepalive(&keepalive);
1325
1326        let stream = match TcpStream::from_std(std_stream) {
1327            Ok(s) => s,
1328            Err(e) => {
1329                warn!(
1330                    transport_id = %transport_id,
1331                    error = %e,
1332                    "Failed to convert accepted stream back to tokio"
1333                );
1334                continue;
1335            }
1336        };
1337
1338        let remote_addr = TransportAddr::from_string(&peer_addr.to_string());
1339
1340        // Split stream and spawn receive task
1341        let (read_half, write_half) = stream.into_split();
1342        let writer = Arc::new(Mutex::new(write_half));
1343
1344        let recv_pool = pool.clone();
1345        let recv_stats = stats.clone();
1346        let recv_addr = remote_addr.clone();
1347        let recv_tx = packet_tx.clone();
1348
1349        let recv_task = tokio::spawn(async move {
1350            tor_receive_loop(
1351                read_half,
1352                transport_id,
1353                recv_addr,
1354                recv_tx,
1355                recv_pool,
1356                mtu,
1357                recv_stats,
1358                Direction::Inbound,
1359            )
1360            .await;
1361        });
1362
1363        let conn = TorConnection {
1364            writer,
1365            recv_task,
1366            mtu,
1367            established_at: Instant::now(),
1368            direction: Direction::Inbound,
1369        };
1370
1371        {
1372            let mut pool_guard = pool.lock().await;
1373            pool_guard.insert(remote_addr.clone(), conn);
1374        }
1375
1376        stats.record_connection_accepted();
1377        stats.record_pool_inbound_added();
1378
1379        debug!(
1380            transport_id = %transport_id,
1381            peer_addr = %peer_addr,
1382            "Accepted inbound onion connection"
1383        );
1384    }
1385}
1386
1387// ============================================================================
1388// Helpers
1389// ============================================================================
1390
1391/// Validate that a string is in host:port format.
1392fn validate_host_port(addr: &str, field_name: &str) -> Result<(), TransportError> {
1393    if addr.parse::<SocketAddr>().is_ok() {
1394        return Ok(());
1395    }
1396    // Not a raw IP:port — check it's at least host:port format
1397    let parts: Vec<&str> = addr.rsplitn(2, ':').collect();
1398    if parts.len() != 2 || parts[0].parse::<u16>().is_err() || parts[1].is_empty() {
1399        return Err(TransportError::StartFailed(format!(
1400            "invalid {} '{}': expected host:port or IP:port",
1401            field_name, addr
1402        )));
1403    }
1404    Ok(())
1405}
1406
1407// ============================================================================
1408// Tests
1409// ============================================================================
1410
1411#[cfg(test)]
1412mod tests {
1413    use super::*;
1414    use crate::transport::packet_channel;
1415
1416    fn make_config() -> TorConfig {
1417        TorConfig {
1418            socks5_addr: Some("127.0.0.1:19050".to_string()),
1419            ..Default::default()
1420        }
1421    }
1422
1423    #[test]
1424    fn test_parse_tor_addr_onion() {
1425        let addr = TransportAddr::from_string("abcdef1234567890.onion:2121");
1426        let tor_addr = parse_tor_addr(&addr).unwrap();
1427        match tor_addr {
1428            TorAddr::Onion(host, port) => {
1429                assert_eq!(host, "abcdef1234567890.onion");
1430                assert_eq!(port, 2121);
1431            }
1432            _ => panic!("expected Onion variant"),
1433        }
1434    }
1435
1436    #[test]
1437    fn test_parse_tor_addr_clearnet() {
1438        let addr = TransportAddr::from_string("192.168.1.1:8080");
1439        let tor_addr = parse_tor_addr(&addr).unwrap();
1440        match tor_addr {
1441            TorAddr::Clearnet(socket_addr) => {
1442                assert_eq!(
1443                    socket_addr,
1444                    "192.168.1.1:8080".parse::<SocketAddr>().unwrap()
1445                );
1446            }
1447            _ => panic!("expected Clearnet variant"),
1448        }
1449    }
1450
1451    #[test]
1452    fn test_parse_tor_addr_clearnet_hostname() {
1453        let addr = TransportAddr::from_string("peer1.example.com:2121");
1454        let tor_addr = parse_tor_addr(&addr).unwrap();
1455        match tor_addr {
1456            TorAddr::ClearnetHostname(host, port) => {
1457                assert_eq!(host, "peer1.example.com");
1458                assert_eq!(port, 2121);
1459            }
1460            _ => panic!("expected ClearnetHostname variant"),
1461        }
1462    }
1463
1464    #[test]
1465    fn test_parse_tor_addr_invalid() {
1466        // Bare name without a dot — not a valid hostname
1467        let addr = TransportAddr::from_string("localhost:2121");
1468        assert!(parse_tor_addr(&addr).is_err());
1469
1470        // No port
1471        let addr = TransportAddr::from_string("not-a-valid-address");
1472        assert!(parse_tor_addr(&addr).is_err());
1473
1474        // Invalid port
1475        let addr = TransportAddr::from_string("example.com:notaport");
1476        assert!(parse_tor_addr(&addr).is_err());
1477    }
1478
1479    #[test]
1480    fn test_config_defaults() {
1481        let config = TorConfig::default();
1482        assert_eq!(config.mode(), "socks5");
1483        assert_eq!(config.socks5_addr(), "127.0.0.1:9050");
1484        assert_eq!(config.connect_timeout_ms(), 120000);
1485        assert_eq!(config.mtu(), 1400);
1486        assert_eq!(config.advertised_port(), 443);
1487    }
1488
1489    #[test]
1490    fn test_advertised_port_override() {
1491        let config = TorConfig {
1492            advertised_port: Some(9001),
1493            ..Default::default()
1494        };
1495        assert_eq!(config.advertised_port(), 9001);
1496    }
1497
1498    /// Pins the publisher/parser contract for Tor overlay adverts.
1499    /// `build_overlay_advert` formats Tor endpoints as `<onion>:<port>`;
1500    /// `parse_tor_addr` must accept that exact form back. A bare onion
1501    /// (no port) was the production bug — assert it does not parse.
1502    #[test]
1503    fn test_advert_address_round_trips_through_parser() {
1504        let onion = "mwvj6q3pnsiaky7i6wg5s42xlfurt5uqr3qzckrlw2graa2ugcgwhiqd.onion";
1505        let cfg = TorConfig::default();
1506        let advertised = format!("{}:{}", onion, cfg.advertised_port());
1507
1508        let parsed = parse_tor_addr(&TransportAddr::from_string(&advertised)).unwrap();
1509        match parsed {
1510            TorAddr::Onion(host, port) => {
1511                assert_eq!(host, onion);
1512                assert_eq!(port, 443);
1513            }
1514            other => panic!("expected Onion variant, got {:?}", other),
1515        }
1516
1517        // Sanity-check the inverse: the bare-onion form (the bug) must
1518        // not parse, so any future regression in the publisher will be
1519        // caught by the round-trip test above.
1520        assert!(parse_tor_addr(&TransportAddr::from_string(onion)).is_err());
1521    }
1522
1523    #[tokio::test]
1524    async fn test_start_stop() {
1525        let (tx, _rx) = packet_channel(32);
1526        let mut transport = TorTransport::new(TransportId::new(1), None, make_config(), tx);
1527
1528        transport.start_async().await.unwrap();
1529        assert_eq!(transport.state(), TransportState::Up);
1530
1531        transport.stop_async().await.unwrap();
1532        assert_eq!(transport.state(), TransportState::Down);
1533    }
1534
1535    #[tokio::test]
1536    async fn test_double_start_fails() {
1537        let (tx, _rx) = packet_channel(32);
1538        let mut transport = TorTransport::new(TransportId::new(1), None, make_config(), tx);
1539
1540        transport.start_async().await.unwrap();
1541        assert!(transport.start_async().await.is_err());
1542    }
1543
1544    #[tokio::test]
1545    async fn test_stop_not_started_fails() {
1546        let (tx, _rx) = packet_channel(32);
1547        let mut transport = TorTransport::new(TransportId::new(1), None, make_config(), tx);
1548
1549        assert!(transport.stop_async().await.is_err());
1550    }
1551
1552    #[tokio::test]
1553    async fn test_send_not_started() {
1554        let (tx, _rx) = packet_channel(32);
1555        let transport = TorTransport::new(TransportId::new(1), None, make_config(), tx);
1556
1557        let addr = TransportAddr::from_string("127.0.0.1:2121");
1558        let result = transport.send_async(&addr, &[0u8; 10]).await;
1559        assert!(result.is_err());
1560    }
1561
1562    #[test]
1563    fn test_transport_type() {
1564        let (tx, _rx) = packet_channel(32);
1565        let transport = TorTransport::new(TransportId::new(1), None, make_config(), tx);
1566
1567        let tt = transport.transport_type();
1568        assert_eq!(tt.name, "tor");
1569        assert!(tt.connection_oriented);
1570        assert!(tt.reliable);
1571    }
1572
1573    #[test]
1574    fn test_sync_methods_return_not_supported() {
1575        let (tx, _rx) = packet_channel(32);
1576        let mut transport = TorTransport::new(TransportId::new(1), None, make_config(), tx);
1577
1578        assert!(transport.start().is_err());
1579        assert!(transport.stop().is_err());
1580        let addr = TransportAddr::from_string("127.0.0.1:2121");
1581        assert!(transport.send(&addr, &[0u8; 10]).is_err());
1582    }
1583
1584    #[test]
1585    fn test_accept_connections_false() {
1586        let (tx, _rx) = packet_channel(32);
1587        let transport = TorTransport::new(TransportId::new(1), None, make_config(), tx);
1588
1589        assert!(!transport.accept_connections());
1590    }
1591
1592    #[test]
1593    fn test_discover_returns_empty() {
1594        let (tx, _rx) = packet_channel(32);
1595        let transport = TorTransport::new(TransportId::new(1), None, make_config(), tx);
1596
1597        assert!(transport.discover().unwrap().is_empty());
1598    }
1599
1600    #[tokio::test]
1601    async fn test_invalid_socks5_addr_start_fails() {
1602        let (tx, _rx) = packet_channel(32);
1603        let config = TorConfig {
1604            socks5_addr: Some("not-a-socket-addr".to_string()),
1605            ..Default::default()
1606        };
1607        let mut transport = TorTransport::new(TransportId::new(1), None, config, tx);
1608        assert!(transport.start_async().await.is_err());
1609    }
1610
1611    #[tokio::test]
1612    async fn test_unsupported_mode_start_fails() {
1613        let (tx, _rx) = packet_channel(32);
1614        let config = TorConfig {
1615            mode: Some("embedded".to_string()),
1616            socks5_addr: Some("127.0.0.1:9050".to_string()),
1617            ..Default::default()
1618        };
1619        let mut transport = TorTransport::new(TransportId::new(1), None, config, tx);
1620        assert!(transport.start_async().await.is_err());
1621    }
1622
1623    // ========================================================================
1624    // Integration tests using MockSocks5Server
1625    // ========================================================================
1626
1627    use crate::config::TcpConfig;
1628    use crate::transport::tcp::TcpTransport;
1629    use mock_socks5::MockSocks5Server;
1630
1631    /// msg1 wire size: 4 prefix + 4 sender_idx + 106 noise_msg1 = 114 bytes.
1632    const MSG1_WIRE_SIZE: usize = 114;
1633    /// msg1 payload_len: sender_idx(4) + noise_msg1(106) = 110.
1634    const MSG1_PAYLOAD_LEN: u16 = (MSG1_WIRE_SIZE - 4) as u16;
1635
1636    /// Build a msg1 frame (114 bytes) for testing.
1637    fn build_msg1_frame() -> Vec<u8> {
1638        let mut frame = vec![0xAA; MSG1_WIRE_SIZE];
1639        frame[0] = 0x01; // ver=0, phase=1
1640        frame[1] = 0x00; // flags
1641        frame[2..4].copy_from_slice(&MSG1_PAYLOAD_LEN.to_le_bytes());
1642        frame
1643    }
1644
1645    #[tokio::test]
1646    async fn test_send_recv_via_socks5() {
1647        // Set up a TCP transport as the "destination" with a listener
1648        let (dest_tx, mut dest_rx) = packet_channel(32);
1649        let dest_config = TcpConfig {
1650            bind_addr: Some("127.0.0.1:0".to_string()),
1651            ..Default::default()
1652        };
1653        let mut dest = TcpTransport::new(TransportId::new(100), None, dest_config, dest_tx);
1654        dest.start_async().await.unwrap();
1655        let dest_addr = dest.local_addr().unwrap();
1656
1657        // Set up the mock SOCKS5 proxy pointing at the destination
1658        let mock = MockSocks5Server::new(dest_addr).await.unwrap();
1659        let proxy_addr = mock.addr();
1660        let _proxy_handle = mock.spawn();
1661
1662        // Set up the Tor transport pointing at the mock proxy
1663        let (tor_tx, _tor_rx) = packet_channel(32);
1664        let tor_config = TorConfig {
1665            socks5_addr: Some(proxy_addr.to_string()),
1666            ..Default::default()
1667        };
1668        let mut tor = TorTransport::new(TransportId::new(200), None, tor_config, tor_tx);
1669        tor.start_async().await.unwrap();
1670
1671        // Send a valid FMP frame (msg1) through the Tor transport
1672        let frame = build_msg1_frame();
1673        let target = TransportAddr::from_string(&dest_addr.to_string());
1674        tor.send_async(&target, &frame).await.unwrap();
1675
1676        // Receive it on the destination
1677        let received = tokio::time::timeout(Duration::from_secs(5), dest_rx.recv())
1678            .await
1679            .expect("timeout waiting for packet")
1680            .expect("channel closed");
1681
1682        assert_eq!(received.data, frame);
1683
1684        // Clean up
1685        tor.stop_async().await.unwrap();
1686        dest.stop_async().await.unwrap();
1687    }
1688
1689    #[tokio::test]
1690    async fn test_socks5_proxy_down() {
1691        // No SOCKS5 server running on this port
1692        let (tx, _rx) = packet_channel(32);
1693        let config = TorConfig {
1694            socks5_addr: Some("127.0.0.1:19999".to_string()),
1695            connect_timeout_ms: Some(2000),
1696            ..Default::default()
1697        };
1698        let mut transport = TorTransport::new(TransportId::new(1), None, config, tx);
1699        transport.start_async().await.unwrap();
1700
1701        let addr = TransportAddr::from_string("192.168.1.1:2121");
1702        let result = transport.send_async(&addr, &build_msg1_frame()).await;
1703        assert!(result.is_err());
1704    }
1705
1706    #[tokio::test]
1707    async fn test_connect_timeout() {
1708        // Use a non-routable address as the SOCKS5 proxy to trigger timeout
1709        let (tx, _rx) = packet_channel(32);
1710        let config = TorConfig {
1711            // 192.0.2.1 is TEST-NET, should be non-routable and timeout
1712            socks5_addr: Some("192.0.2.1:9050".to_string()),
1713            connect_timeout_ms: Some(500),
1714            ..Default::default()
1715        };
1716        let mut transport = TorTransport::new(TransportId::new(1), None, config, tx);
1717        transport.start_async().await.unwrap();
1718
1719        let addr = TransportAddr::from_string("10.0.0.1:2121");
1720        let result = transport.send_async(&addr, &build_msg1_frame()).await;
1721        assert!(result.is_err());
1722    }
1723
1724    #[tokio::test]
1725    async fn test_close_connection() {
1726        // Set up destination + mock proxy
1727        let (dest_tx, _dest_rx) = packet_channel(32);
1728        let dest_config = TcpConfig {
1729            bind_addr: Some("127.0.0.1:0".to_string()),
1730            ..Default::default()
1731        };
1732        let mut dest = TcpTransport::new(TransportId::new(100), None, dest_config, dest_tx);
1733        dest.start_async().await.unwrap();
1734        let dest_addr = dest.local_addr().unwrap();
1735
1736        let mock = MockSocks5Server::new(dest_addr).await.unwrap();
1737        let proxy_addr = mock.addr();
1738        let _proxy_handle = mock.spawn();
1739
1740        let (tor_tx, _tor_rx) = packet_channel(32);
1741        let tor_config = TorConfig {
1742            socks5_addr: Some(proxy_addr.to_string()),
1743            ..Default::default()
1744        };
1745        let mut tor = TorTransport::new(TransportId::new(200), None, tor_config, tor_tx);
1746        tor.start_async().await.unwrap();
1747
1748        // Send to establish a connection
1749        let target = TransportAddr::from_string(&dest_addr.to_string());
1750        tor.send_async(&target, &build_msg1_frame()).await.unwrap();
1751
1752        // Verify pool has the connection
1753        {
1754            let pool = tor.pool.lock().await;
1755            assert_eq!(pool.len(), 1);
1756        }
1757        assert_eq!(tor.stats.snapshot().pool_outbound, 1);
1758        assert_eq!(tor.stats.snapshot().pool_inbound, 0);
1759
1760        // Close the connection
1761        tor.close_connection_async(&target).await;
1762
1763        // Verify pool is empty
1764        {
1765            let pool = tor.pool.lock().await;
1766            assert_eq!(pool.len(), 0);
1767        }
1768        assert_eq!(tor.stats.snapshot().pool_outbound, 0);
1769
1770        tor.stop_async().await.unwrap();
1771        dest.stop_async().await.unwrap();
1772    }
1773
1774    // ========================================================================
1775    // Control port mode tests
1776    // ========================================================================
1777
1778    use mock_control::MockTorControlServer;
1779
1780    #[tokio::test]
1781    async fn test_control_port_start_stop() {
1782        let mock = MockTorControlServer::start().await;
1783        let (tx, _rx) = packet_channel(32);
1784
1785        let config = TorConfig {
1786            mode: Some("control_port".to_string()),
1787            socks5_addr: Some("127.0.0.1:19050".to_string()),
1788            control_addr: Some(mock.addr().to_string()),
1789            control_auth: Some("password:testpass".to_string()),
1790            ..Default::default()
1791        };
1792        let mut transport = TorTransport::new(TransportId::new(1), None, config, tx);
1793
1794        transport.start_async().await.unwrap();
1795        assert_eq!(transport.state(), TransportState::Up);
1796        assert!(transport.onion_address().is_none());
1797        assert!(!transport.accept_connections());
1798
1799        transport.stop_async().await.unwrap();
1800    }
1801
1802    #[tokio::test]
1803    async fn test_config_defaults_phase2() {
1804        let config = TorConfig::default();
1805        assert_eq!(config.control_addr(), "/run/tor/control");
1806        assert_eq!(config.control_auth(), "cookie");
1807        assert_eq!(config.cookie_path(), "/var/run/tor/control.authcookie");
1808        assert_eq!(config.max_inbound_connections(), 64);
1809    }
1810
1811    // ========================================================================
1812    // Directory mode tests
1813    // ========================================================================
1814
1815    use crate::config::DirectoryServiceConfig;
1816    use tempfile::TempDir;
1817
1818    #[test]
1819    fn test_directory_service_config_defaults() {
1820        let config = DirectoryServiceConfig::default();
1821        assert_eq!(
1822            config.hostname_file(),
1823            "/var/lib/tor/fips_onion_service/hostname"
1824        );
1825        assert_eq!(config.bind_addr(), "127.0.0.1:8443");
1826    }
1827
1828    #[tokio::test]
1829    async fn test_directory_mode_start_stop() {
1830        let dir = TempDir::new().unwrap();
1831        let hostname_path = dir.path().join("hostname");
1832        std::fs::write(
1833            &hostname_path,
1834            "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa2.onion\n",
1835        )
1836        .unwrap();
1837
1838        let (tx, _rx) = packet_channel(32);
1839        let config = TorConfig {
1840            mode: Some("directory".to_string()),
1841            socks5_addr: Some("127.0.0.1:19050".to_string()),
1842            directory_service: Some(DirectoryServiceConfig {
1843                hostname_file: Some(hostname_path.to_str().unwrap().to_string()),
1844                bind_addr: Some("127.0.0.1:0".to_string()),
1845            }),
1846            ..Default::default()
1847        };
1848        let mut transport = TorTransport::new(TransportId::new(1), None, config, tx);
1849
1850        transport.start_async().await.unwrap();
1851        assert_eq!(transport.state(), TransportState::Up);
1852        assert_eq!(
1853            transport.onion_address(),
1854            Some("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa2.onion"),
1855        );
1856        assert!(transport.accept_connections());
1857
1858        transport.stop_async().await.unwrap();
1859        assert_eq!(transport.state(), TransportState::Down);
1860    }
1861
1862    #[tokio::test]
1863    async fn test_directory_mode_missing_hostname_file() {
1864        let (tx, _rx) = packet_channel(32);
1865        let config = TorConfig {
1866            mode: Some("directory".to_string()),
1867            socks5_addr: Some("127.0.0.1:19050".to_string()),
1868            directory_service: Some(DirectoryServiceConfig {
1869                hostname_file: Some("/nonexistent/hostname".to_string()),
1870                bind_addr: Some("127.0.0.1:0".to_string()),
1871            }),
1872            ..Default::default()
1873        };
1874        let mut transport = TorTransport::new(TransportId::new(1), None, config, tx);
1875
1876        let result = transport.start_async().await;
1877        assert!(result.is_err());
1878        let err = format!("{}", result.unwrap_err());
1879        assert!(err.contains("hostname"));
1880    }
1881
1882    #[tokio::test]
1883    async fn test_directory_mode_invalid_hostname() {
1884        let dir = TempDir::new().unwrap();
1885        let hostname_path = dir.path().join("hostname");
1886        std::fs::write(&hostname_path, "not-an-onion-address\n").unwrap();
1887
1888        let (tx, _rx) = packet_channel(32);
1889        let config = TorConfig {
1890            mode: Some("directory".to_string()),
1891            socks5_addr: Some("127.0.0.1:19050".to_string()),
1892            directory_service: Some(DirectoryServiceConfig {
1893                hostname_file: Some(hostname_path.to_str().unwrap().to_string()),
1894                bind_addr: Some("127.0.0.1:0".to_string()),
1895            }),
1896            ..Default::default()
1897        };
1898        let mut transport = TorTransport::new(TransportId::new(1), None, config, tx);
1899
1900        let result = transport.start_async().await;
1901        assert!(result.is_err());
1902        let err = format!("{}", result.unwrap_err());
1903        assert!(err.contains("invalid onion address"));
1904    }
1905
1906    #[tokio::test]
1907    async fn test_directory_mode_accept_inbound() {
1908        let dir = TempDir::new().unwrap();
1909        let hostname_path = dir.path().join("hostname");
1910        std::fs::write(
1911            &hostname_path,
1912            "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa2.onion\n",
1913        )
1914        .unwrap();
1915
1916        let (tx, _rx) = packet_channel(32);
1917        let config = TorConfig {
1918            mode: Some("directory".to_string()),
1919            socks5_addr: Some("127.0.0.1:19050".to_string()),
1920            directory_service: Some(DirectoryServiceConfig {
1921                hostname_file: Some(hostname_path.to_str().unwrap().to_string()),
1922                bind_addr: Some("127.0.0.1:0".to_string()),
1923            }),
1924            ..Default::default()
1925        };
1926        let mut transport = TorTransport::new(TransportId::new(1), None, config, tx);
1927        transport.start_async().await.unwrap();
1928        assert!(transport.accept_connections());
1929
1930        transport.stop_async().await.unwrap();
1931    }
1932
1933    #[tokio::test]
1934    async fn test_socks5_mode_rejects_directory_service_config() {
1935        let (tx, _rx) = packet_channel(32);
1936        let config = TorConfig {
1937            mode: Some("socks5".to_string()),
1938            socks5_addr: Some("127.0.0.1:9050".to_string()),
1939            directory_service: Some(DirectoryServiceConfig::default()),
1940            ..Default::default()
1941        };
1942        let mut transport = TorTransport::new(TransportId::new(1), None, config, tx);
1943        let result = transport.start_async().await;
1944        assert!(result.is_err());
1945        let err = format!("{}", result.unwrap_err());
1946        assert!(err.contains("directory"));
1947    }
1948}