Skip to main content

lyrebird/
lib.rs

1//! # Lyrebird - Pluggable Transport Proxy Applications
2//!
3//! This crate provides a PT-manager loop usable as a library
4//! (`lyrebird::run()`) or as a standalone binary that implements the
5//! Tor pluggable-transport spec on top of the [`ptrs`] interface.
6//!
7//! **Stability**: the public API is unstable and subject to change
8//! without notice; do not rely on this crate for security-critical
9//! applications. The server-side path is incomplete and gated behind
10//! the `experimental-server` cargo feature — it must not be enabled in
11//! production.
12//!
13//! ## Lyrebird Pluggable Transport Bridge
14//!
//! `lyrebird` provides an executable program designed to manage the calling
16//! interface used by the Tor libraries when launching pluggable transports (see `pt-spec.txt`).
17//!
18//! `... [tor_client] <---> [pt_client] <====> [pt_bridge] <---> [tor_orport] ...`
19//!
20//! Usage info:
21//!
22//! ```txt
23//! Tunnel Tor SOCKS5 traffic through pluggable transport connections
24//!
25//! Usage: lyrebird [OPTIONS]
26//!
27//! Options:
28//!       --enable-logging         Log to {TOR_PT_STATE_LOCATION}/obfs4proxy.log
29//!       --log-level <LOG_LEVEL>  Log Level (ERROR/WARN/INFO/DEBUG/TRACE) [default: ERROR]
30//!       --unsafe-logging         Disable the address scrubber on logging
31//!   -h, --help                   Print help
32//!   -V, --version                Print version
33//! ```
34//!
35//! ### Installation
36//!
37//! To install:
38//!
39//! `cargo install lyrebird`
40//!
41//! This installs in the configured Rust location (i.e. `$HOME/.cargo/bin`). You may
42//! wish to copy `./lyrebird` to a permanent location (e.g. `/usr/local/bin`).
43//!
44//! Client side torrc configuration:
45//! ```text
46//! ClientTransportPlugin obfs4 exec /usr/local/bin/lyrebird
47//! ```
48//!
49//! Bridge side torrc configuration:
50//! ```text
51//! # Act as a bridge relay.
52//! BridgeRelay 1
53//!
54//! # Enable the Extended ORPort
55//! ExtORPort auto
56//!
57//! # Use lyrebird to provide the obfs4 protocol.
58//! ServerTransportPlugin obfs4 exec /usr/local/bin/lyrebird
59//!
60//! # (Optional) Listen on the specified address/port for obfs4 connections as
61//! # opposed to picking a port automatically.
62//! #ServerTransportListenAddr obfs4 0.0.0.0:443
63//! ```
64//!
65//! ### Tips and tricks
66//!
67//!  * On modern Linux systems it is possible to have lyrebird bind to reserved
//!    ports (<1024) even when not running as root by granting the
69//!    `CAP_NET_BIND_SERVICE` capability with setcap:
70//!
71//!    `# setcap 'cap_net_bind_service=+ep' /usr/local/bin/lyrebird`
72//!
73//!  * The autogenerated obfs4 bridge parameters are placed in
74//!    `DataDir/pt_state/obfs4_state.json`.  To ease deployment, the client side
75//!    bridge line is written to `DataDir/pt_state/obfs4_bridgeline.txt`.
76//!
77// Follow-up work tracked in the issue tracker, not in rustdoc:
78//   - tunnel-level metrics (failures, byte counters).
79//   - rate-limiting for the bidirectional copy.
80//   - replace `tokio::io::copy_bidirectional` with an interactive copy
81//     that flushes only when the reader stalls.
82
83#![allow(unused, dead_code)]
84#![deny(missing_docs)]
85
86use obfs4::{ClientBuilder, Obfs4PT};
87use ptrs::{error, info, warn};
88use ptrs::{
89    ClientBuilder as _, ClientTransport, PluggableTransport, ServerBuilder, ServerTransport,
90};
91
92use anyhow::{anyhow, Context, Result};
93use clap::Parser;
94use fast_socks5::{
95    server::{DenyAuthentication, SimpleUserPassword},
96    util::target_addr::TargetAddr,
97    AuthenticationMethod,
98};
99use safelog::sensitive;
100use tokio::task::JoinSet;
101use tokio::{
102    io::{copy_bidirectional, AsyncRead, AsyncWrite, AsyncWriteExt},
103    net::{TcpListener, TcpStream},
104    sync::oneshot,
105};
106use tokio_util::sync::CancellationToken;
107use tracing::Level;
108use tracing_subscriber::{filter::LevelFilter, prelude::*};
109
110use std::{env, net::SocketAddr, pin::Pin, str::FromStr, sync::Arc};
111
/// Maximum number of concurrently handled client connections.
/// Bounds memory/task growth under connection floods (DoS mitigation).
///
/// The limit is enforced by a semaphore created inside each client accept
/// loop, so it applies per listener rather than to the process as a whole.
const MAX_CONCURRENT_CONNS: usize = 1024;

/// Client Socks address to listen on.
///
/// Port `0` leaves the port choice to the OS; the address actually bound is
/// read back from the listener and handed to `pt_proto::print_cmethod`.
const CLIENT_SOCKS_ADDR: &str = "127.0.0.1:0";
118
/// Error defined to denote a failure to get the bridge line.
///
/// Used by the client connection handlers when the SOCKS5 request carries no
/// target address, i.e. there is no bridge address to dial.
#[derive(Debug, thiserror::Error)]
#[error("Error while obtaining bridge line data")]
struct BridgeLineParseError;
123
// Command-line options of the lyrebird binary, parsed with clap's derive API.
//
// NOTE: the `///` comments on the fields below are picked up by clap as the
// per-option `--help` text, so they are user-facing strings; edit them with
// the same care as any other program output.
#[derive(Parser)]
#[command(author, version, long_about = None, about="Tunnel Tor SOCKS5 traffic through pluggable transport connections")]
struct Args {
    /// Log to {TOR_PT_STATE_LOCATION}/obfs4proxy.log
    #[arg(long, default_value_t = false)]
    enable_logging: bool,

    /// Log Level (ERROR/WARN/INFO/DEBUG/TRACE)
    #[arg(long, default_value_t=String::from("ERROR"))]
    log_level: String,

    /// Disable the address scrubber on logging
    #[arg(long, default_value_t = false)]
    unsafe_logging: bool,
}
139
140/// initialize the logging receiver(s) for things to be logged into using the
141/// tracing / tracing_subscriber libraries
142// TODO: GeoIP. Json for file log writer.
143fn init_logging_recvr(
144    enable: bool,
145    should_scrub: bool,
146    level_str: &str,
147    statedir: &str,
148) -> Result<()> {
149    if should_scrub {
150        safelog::enforce_safe_logging();
151    } else {
152        safelog::disable_safe_logging();
153    }
154
155    // The PT protocol owns stdout for the parent ↔ PT control channel.
156    // Logs go to stderr; the file layer below (when enabled) writes
157    // separately into the state directory.
158    //
159    // Compact (single-line) layout matches the parent process's
160    // tracing-subscriber default — we share a terminal with it when
161    // launched via the busybox dispatch, so consistent formatting
162    // matters more than the pretty multi-line output. Per-target
163    // filter mutes the chatty `fast_socks5` connection log; arti is
164    // the one bashing through dozens of PT connections during
165    // bootstrap and the per-conn lines drown the relevant signal.
166    let console_filter =
167        tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| {
168            tracing_subscriber::EnvFilter::new(
169                "info,fast_socks5=warn,obfs4::sessions=warn,lyrebird::handshake=info",
170            )
171        });
172    let console_layer = tracing_subscriber::fmt::layer()
173        .with_writer(std::io::stderr)
174        .with_filter(console_filter);
175
176    let log_layers = if enable {
177        let level = Level::from_str(level_str)?;
178
179        let file = std::fs::File::create(format!("{statedir}/obfs4proxy.log"))?;
180
181        let state_dir_layer = tracing_subscriber::fmt::layer()
182            .with_writer(file)
183            .with_filter(LevelFilter::from_level(level));
184
185        console_layer.and_then(state_dir_layer).boxed()
186    } else {
187        console_layer.boxed()
188    };
189
190    tracing_subscriber::registry().with(log_layers).init();
191
192    Ok(())
193}
194
195/// Resolve a `fast_socks5::util::TargetAddr` to a concrete `SocketAddr`.
196/// `Ip(_)` variants pass through; `Domain` variants always fail because
197/// the PT spec forbids the transport from doing DNS — that's the calling
198/// Tor client's responsibility.
199pub fn resolve_target_addr(addr: &TargetAddr) -> Result<SocketAddr> {
200    match addr {
201        TargetAddr::Ip(sa) => Ok(*sa),
202        TargetAddr::Domain(_, _) => {
203            // this will always fail because ptrs does not do dns lookups.
204            ptrs::resolve_addr(format!("{addr}")).context("domain resolution failed")
205        }
206    }
207}
208
209/// Run the lyrebird pluggable transport. Expects the standard
210/// `TOR_PT_*` environment variables set by the parent process (arti /
211/// tor) and speaks the PT-managed-transport protocol on stdin/stdout.
212///
213/// # Cancel safety
214///
215/// This function is **not cancel-safe**. It manages long-lived server
216/// state and open connections. Dropping the future will abandon all
217/// active tunnels without graceful shutdown.
218pub async fn run() -> Result<()> {
219    let args = Args::parse();
220
221    // Make state directory
222    let statedir = ptrs::make_state_dir()?;
223
224    // launch tracing subscriber with filter level
225    init_logging_recvr(
226        args.enable_logging,
227        !args.unsafe_logging,
228        &args.log_level,
229        &statedir,
230    )?;
231
232    let cancel_token = tokio_util::sync::CancellationToken::new();
233
234    // launch runners
235    let mut exit_rx = if ptrs::is_client()? {
236        // running as CLIENT
237        client_setup(&statedir, cancel_token.clone()).await?
238    } else {
239        // running as SERVER
240        //
241        // Server-side is gated behind `experimental-server` because the PT
242        // handshake is not yet wired and the ExtORPort dial is missing.
243        // Without the feature we refuse to start rather than silently
244        // exposing an unauthenticated proxy.
245        #[cfg(feature = "experimental-server")]
246        {
247            server_setup(&statedir, cancel_token.clone()).await?
248        }
249        #[cfg(not(feature = "experimental-server"))]
250        {
251            let _ = &statedir;
252            let _ = &cancel_token;
253            return Err(anyhow!(
254                "lyrebird server-side is not implemented; rebuild with \
255                 `--features experimental-server` for development use only"
256            ));
257        }
258    };
259
260    info!("accepting connections");
261
262    // At this point, the pt config protocol is finished, and incoming
263    // connections will be processed.  Wait till the parent dies
264    // (immediate exit), a SIGTERM / Ctrl+Break is received (immediate
265    // exit), or a SIGINT / Ctrl+C is received.
266    tokio::select! {
267        _ = &mut exit_rx => {
268            info!("proxy closed");
269            return Ok(())
270        }
271        sig = shutdown_signal() => {
272            if sig.is_terminate() {
273                info!("proxy terminated");
274                return Ok(())
275            }
276            info!("received interrupt, shutting down");
277            cancel_token.cancel();
278        }
279    }
280
281    // Ok, it was the first interrupt, close all listeners, and wait till
282    // the parent dies, all current connections are closed, or either
283    // a second interrupt/terminate is received.
284    tokio::select! {
285        _ = exit_rx => {}
286        _ = shutdown_signal() => {}
287    }
288
289    Ok(())
290}
291
/// Kind of shutdown request delivered by the operating system.
///
/// The platform-specific `shutdown_signal` helpers map onto it as follows:
/// on Unix `SIGTERM` → `Terminate` and `SIGINT` → `Interrupt`; on Windows
/// `Ctrl+Break` → `Terminate` and `Ctrl+C` → `Interrupt`.
#[derive(Clone, Copy)]
enum Shutdown {
    Interrupt,
    Terminate,
}

impl Shutdown {
    /// `true` only for the `Terminate` variant.
    fn is_terminate(self) -> bool {
        match self {
            Shutdown::Terminate => true,
            Shutdown::Interrupt => false,
        }
    }
}
306
307#[cfg(unix)]
308async fn shutdown_signal() -> Shutdown {
309    use tokio::signal::unix::{signal, SignalKind};
310    let mut sigint = signal(SignalKind::interrupt()).expect("install SIGINT handler");
311    let mut sigterm = signal(SignalKind::terminate()).expect("install SIGTERM handler");
312    tokio::select! {
313        _ = sigterm.recv() => Shutdown::Terminate,
314        _ = sigint.recv() => Shutdown::Interrupt,
315    }
316}
317
/// Wait for the next `Ctrl+Break` or `Ctrl+C` and report which one arrived.
///
/// # Panics
///
/// Panics if either console handler cannot be installed.
#[cfg(not(unix))]
async fn shutdown_signal() -> Shutdown {
    use tokio::signal::windows::{ctrl_break, ctrl_c};

    let mut interrupt_stream = ctrl_c().expect("install Ctrl+C handler");
    let mut terminate_stream = ctrl_break().expect("install Ctrl+Break handler");

    tokio::select! {
        _ = terminate_stream.recv() => Shutdown::Terminate,
        _ = interrupt_stream.recv() => Shutdown::Interrupt,
    }
}
328
329// ================================================================ //
330//                            Client                                //
331// ================================================================ //
332
333async fn client_setup(
334    statedir: &str,
335    cancel_token: CancellationToken,
336) -> Result<oneshot::Receiver<bool>> {
337    let obfs4_name = Obfs4PT::name();
338    let webtunnel_name = webtunnel::WEBTUNNEL_NAME.to_string();
339    let client_pt_info = ptrs::ClientInfo::new()?;
340    // TOR_PT_PROXY is optional. The downstream code never reads this
341    // value (see `client_handle_connection`'s `_proxy_uri`), so we
342    // substitute a placeholder URL when tor did not supply one.
343    let proxy_uri = client_pt_info
344        .uri
345        .unwrap_or_else(|| url::Url::parse("data:,").expect("placeholder url"));
346    let (tx, rx) = oneshot::channel::<bool>();
347
348    // PT spec §3.3.3: announce our protocol version to the parent.
349    pt_proto::print_version();
350
351    let mut listeners: Vec<
352        std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send>>,
353    > = Vec::new();
354
355    for name in client_pt_info.methods {
356        info!(name);
357
358        if name == obfs4_name {
359            let builder = Obfs4PT::client_builder();
360            let listener = tokio::net::TcpListener::bind(CLIENT_SOCKS_ADDR).await?;
361            let local_addr = listener.local_addr()?;
362            pt_proto::print_cmethod(&name, "socks5", local_addr);
363            listeners.push(Box::pin(client_accept_loop(
364                listener,
365                builder,
366                proxy_uri.clone(),
367                cancel_token.clone(),
368            )));
369        } else if name == webtunnel_name {
370            let builder = webtunnel::WebTunnelBuilder::default();
371            let listener = tokio::net::TcpListener::bind(CLIENT_SOCKS_ADDR).await?;
372            let local_addr = listener.local_addr()?;
373            pt_proto::print_cmethod(&name, "socks5", local_addr);
374            listeners.push(Box::pin(client_accept_loop(
375                listener,
376                builder,
377                proxy_uri.clone(),
378                cancel_token.clone(),
379            )));
380        } else {
381            pt_proto::print_cmethod_error(&name, "no such transport is supported");
382            warn!("no such transport is supported");
383            continue;
384        }
385    }
386
387    // PT spec §3.3.3: end of method announcements.
388    pt_proto::print_cmethods_done();
389
390    // spawn a task that runs and monitors the progress of the listeners.
391    tokio::spawn(async move {
392        let total_len = listeners.len();
393        let mut running = total_len;
394
395        // launch all listener futures
396        let mut pt_set = JoinSet::new();
397        for fut in listeners {
398            pt_set.spawn(fut);
399        }
400
401        // if any of the listeners exit, handle it
402        while let Some(res) = pt_set.join_next().await {
403            running -= 1;
404            if let Err(e) = res {
405                warn!("listener failed: {e}");
406            }
407            info!("{running}/{total_len} listeners running");
408        }
409
410        // if all listeners exit then we can send the tx signal.
411        // Best-effort: the receiver may already be dropped if the
412        // parent select! moved on (e.g. signal-driven shutdown).
413        let _ = tx.send(true);
414    });
415
416    Ok(rx)
417}
418
419async fn client_accept_loop<C>(
420    listener: TcpListener,
421    builder: impl ptrs::ClientBuilder<TcpStream, ClientPT = C> + Send + 'static,
422    proxy_uri: url::Url,
423    cancel_token: CancellationToken,
424) -> Result<()>
425where
426    // the provided client builder should build the C ClientTransport.
427    C: ptrs::ClientTransport<TcpStream, std::io::Error> + Send + 'static,
428{
429    let pt_name = C::method_name();
430    let sem = Arc::new(tokio::sync::Semaphore::new(MAX_CONCURRENT_CONNS));
431    loop {
432        tokio::select! {
433            _ = cancel_token.cancelled() => {
434                info!("{pt_name} received shutdown signal");
435                break; // exit the loop so graceful shutdown actually stops accepting
436            }
437            res = listener.accept() => {
438                let (conn, client_addr) = match res {
439                    Err(e) => {
440                        error!("failed to accept tcp connection {e}");
441                        break;
442                    }
443                    Ok(c) => c,
444                };
445                // Acquire a concurrency permit before spawning, bounding
446                // the number of in-flight connection tasks (DoS mitigation).
447                let permit = match Arc::clone(&sem).acquire_owned().await {
448                    Ok(p) => p,
449                    Err(_) => break, // semaphore closed — shutting down
450                };
451                let builder_clone = builder.clone();
452                let proxy_clone = proxy_uri.clone();
453                tokio::spawn(async move {
454                    let _permit = permit; // held for the task's lifetime, freed on drop
455                    let _ = client_handle_connection(conn, builder_clone, proxy_clone, client_addr).await;
456                });
457            }
458        }
459    }
460
461    Ok(())
462}
463
464/// This function assumes that the provided connection / socket manages reconstruction
465/// and reliability before passing to this layer.
466async fn client_handle_connection<In, C, B>(
467    conn: In,
468    mut builder: B,
469    _proxy_uri: url::Url,
470    client_addr: SocketAddr,
471) -> Result<()>
472where
473    // the provided T must be usable as a connection in an async context
474    In: AsyncRead + AsyncWrite + Send + Unpin,
475    // the provided client builder should build the C ClientTransport.
476    C: ptrs::ClientTransport<TcpStream, std::io::Error> + Send,
477    B: ptrs::ClientBuilder<TcpStream, ClientPT = C>,
478{
479    // PT-spec §3.5 requires the parent to negotiate USERNAME/PASSWORD
480    // (auth method 0x02) and pack the per-bridge PT arguments into the
481    // UNAME/PASSWD fields. We accept any creds (and also accept clients
482    // that legitimately want NO_AUTH) — see `PtArgsAuth` below.
483    let mut config =
484        fast_socks5::server::Config::<fast_socks5::server::DenyAuthentication>::default()
485            .with_authentication(PtArgsAuth);
486    config.set_allow_no_auth(true);
487    // Critical: we are a pluggable transport, not a plain TCP proxy. With
488    // the default `execute_command = true`, `upgrade_to_socks5()` would
489    // itself open a *plain* TCP connection to the bridge, send the SOCKS
490    // reply, and proxy the parent against that — bypassing obfs4 entirely
491    // and consuming the parent socket. We must only *read* the request
492    // here and do the obfs4 dial + reply ourselves below.
493    config.set_execute_command(false);
494    let socks5_conn = fast_socks5::server::Socks5Socket::new(conn, Arc::new(config));
495
496    let mut socks5_conn = socks5_conn.upgrade_to_socks5().await?;
497    let creds = socks5_conn.take_credentials();
498
499    let target_addr = socks5_conn
500        .target_addr()
501        .ok_or(BridgeLineParseError)
502        .context("missing remote address in request")?;
503
504    // Reconstruct the PT-spec argument string from SOCKS5 user/pass and
505    // hand it to the transport's `options(&Args)` so the obfs4
506    // ClientBuilder picks up `cert=...` / `iat-mode=...` etc.
507    let arg_string = arg_string_from_creds(creds);
508    let args = if arg_string.is_empty() {
509        ptrs::args::Args::default()
510    } else {
511        ptrs::args::Args::from_str(&arg_string).context("parsing PT arg string")?
512    };
513    <B as ptrs::ClientBuilder<TcpStream>>::options(&mut builder, &args)
514        .map_err(|e| anyhow::anyhow!("applying PT args to builder: {e}"))?;
515
516    let remote_addr = resolve_target_addr(target_addr).context("no remote address")?;
517
518    // The outgoing OR-port dial. Boxing it as a `FutureResult` is the seam
519    // exercised by tests: `establish_pt_conn` only ever sees a pinned
520    // future yielding the underlying socket, so an in-memory
521    // `tokio::io::duplex()` half can stand in for the real `TcpStream`.
522    let remote: Pin<ptrs::FutureResult<TcpStream, std::io::Error>> =
523        Box::pin(tokio::net::TcpStream::connect(remote_addr));
524
525    // build the pluggable transport client and then dial, completing the
526    // connection and handshake when the future is await-ed.
527    let pt_client = builder.build();
528    let mut pt_conn = establish_pt_conn(pt_client, remote, client_addr).await?;
529
530    // The obfs4 tunnel to the bridge is up, so report CONNECT success to
531    // the PT parent (arti/tor). Because we disabled `execute_command`,
532    // fast-socks5 has NOT sent any reply — we must. This is the canonical
533    // SOCKS5 success frame: VER=5, REP=0 (succeeded), RSV=0, ATYP=1
534    // (IPv4), BND.ADDR=0.0.0.0, BND.PORT=0. The parent ignores BND.*.
535    let mut parent = socks5_conn.into_inner();
536    parent
537        .write_all(&[0x05, 0x00, 0x00, 0x01, 0, 0, 0, 0, 0, 0])
538        .await
539        .context("writing SOCKS5 success reply to PT parent")?;
540    parent
541        .flush()
542        .await
543        .context("flushing SOCKS5 success reply")?;
544
545    if let Err(e) = copy_bidirectional(&mut parent, &mut pt_conn).await {
546        warn!(
547            addres = sensitive(client_addr).to_string(),
548            "tunnel closed with error: {e:#?}"
549        );
550    }
551    Ok(())
552}
553
554/// Drive a built pluggable-transport client to completion against a dial
555/// future, returning the wrapped tunnel stream.
556///
557/// This is the outgoing-connection seam carved out of
558/// [`client_handle_connection`]. The dial is supplied as an already-pinned
559/// [`ptrs::FutureResult`] (the same shape `establish` consumes) rather than
560/// being created inline with `TcpStream::connect`, so tests can inject an
561/// in-memory `tokio::io::duplex()` half — or a future that fails — and
562/// observe the handshake behaviour without touching the network.
563///
564/// On handshake failure the client address is logged (scrubbed) and a typed
565/// error is returned; the function never panics on a dial or handshake
566/// error reachable from the network.
567///
568/// # Cancel safety
569///
570/// cancel-safe: NO — `establish` runs the obfs4/webtunnel handshake, which
571/// writes to the underlying socket. Dropping this future mid-handshake can
572/// leave the peer expecting bytes that were never sent. The caller
573/// (`client_handle_connection`) runs inside a dedicated `tokio::spawn` task,
574/// so it is not composed under a `select!`/`timeout` cancellation point.
575pub(crate) async fn establish_pt_conn<In2, C2>(
576    pt_client: C2,
577    dial: Pin<ptrs::FutureResult<In2, std::io::Error>>,
578    client_addr: SocketAddr,
579) -> Result<C2::OutRW>
580where
581    // The dialed socket must be usable as an async connection. These are the
582    // same bounds the transport traits require of their `InRW` parameter.
583    In2: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
584    C2: ptrs::ClientTransport<In2, std::io::Error> + Send,
585{
586    match ptrs::ClientTransport::<In2, std::io::Error>::establish(pt_client, dial).await {
587        Err(e) => {
588            warn!(
589                address = sensitive(client_addr).to_string(),
590                "handshake failed: {e:#?}"
591            );
592            Err(obfs4::Error::from(e.to_string())).context("handshake failed")
593        }
594        Ok(c) => Ok(c),
595    }
596}
597
598/// This function assumes that the provided connection / socket manages reconstruction
599/// and reliability before passing to this layer.
600///
601/// proxy_uri (currently unused) is meant to indicate that the outgoing connection
602/// should be made through _another_ proxy based on the proxy uri. This is relatively
603/// easy in golang, but I am not sure how easy it will be here. I believe this is
604/// a rather uncommon option so it is left unimplemented for now.
605async fn client_handle_connection_clientpt<In, C>(
606    conn: In,
607    pt_client: C,
608    _proxy_uri: url::Url,
609    client_addr: SocketAddr,
610) -> Result<()>
611where
612    // the provided T must be usable as a connection in an async context
613    In: AsyncRead + AsyncWrite + Send + Unpin,
614    // the provided B must implement the Client Builder interface for T
615    C: ptrs::ClientTransport<TcpStream, std::io::Error>,
616{
617    let mut config: fast_socks5::server::Config<SimpleUserPassword> =
618        fast_socks5::server::Config::default();
619    // config.set_skip_auth(true);
620    let mut socks5_conn = fast_socks5::server::Socks5Socket::new(conn, Arc::new(config));
621
622    // let mut socks5_conn = socks5_conn.upgrade_to_socks5().await?;
623    let target_addr = socks5_conn
624        .target_addr()
625        .ok_or(BridgeLineParseError)
626        .context("missing remote address in request")?;
627
628    // TODO: get args from the socks request username:Password if it exists.
629    // This seems non-trivial to match against the golang obfs4 implementation.
630    // Maybe implement my own thing that implements the `Authenticate` trait?
631    // Maybe work with the tor_socksproto package?
632    //
633    // Pluggable transports use the username/password field to pass
634    // per-connection arguments.  The fields contain ASCII strings that
635    // are combined and then parsed into key/value pairs.
636    // argStr := string(uname)
637    // if !(plen == 1 && passwd[0] == 0x00) {
638    let args: Option<ptrs::args::Args> = match socks5_conn.auth() {
639        AuthenticationMethod::Password { username, password } => {
640            if username.is_empty() {
641                socks5_conn.flush().await?;
642                socks5_conn.shutdown().await?;
643                return Err(anyhow!("username with 0 length"));
644            }
645            if password.is_empty() {
646                socks5_conn.flush().await?;
647                socks5_conn.shutdown().await?;
648                return Err(anyhow!("password with 0 length"));
649            }
650
651            let mut arg_string = username.clone();
652            // tor will set the password to 'NUL', if the field doesn't contain any
653            // actual argument data.
654            if !(password.len() == 1 && password.as_bytes().first().copied() == Some(0x00)) {
655                arg_string.push_str(password);
656            }
657
658            match ptrs::args::Args::from_str(&arg_string) {
659                Ok(a) => Some(a),
660                Err(e) => {
661                    return Err(anyhow!(
662                        "failed to parse provided args \"{arg_string}\": {e}"
663                    ))
664                }
665            }
666        }
667        AuthenticationMethod::None => None,
668        _ => return Err(anyhow!("negotiated unsupported authentication method")),
669    };
670
671    let remote_addr = resolve_target_addr(target_addr).context("no remote address")?;
672
673    let remote = tokio::net::TcpStream::connect(remote_addr);
674
675    // build the pluggable transport client and then dial, completing the
676    // connection and handshake when the `wrap(..)` is await-ed.
677    let mut pt_conn = match pt_client.establish(Box::pin(remote)).await {
678        Ok(c) => c,
679        Err(e) => {
680            warn!(
681                address = sensitive(client_addr).to_string(),
682                "handshake failed: {e:#?}"
683            );
684            return Err(obfs4::Error::from(e.to_string())).context("handshake failed");
685        }
686    };
687
688    if let Err(e) = copy_bidirectional(&mut socks5_conn.into_inner(), &mut pt_conn).await {
689        warn!(
690            addres = sensitive(client_addr).to_string(),
691            "tunnel closed with error: {e:#?}"
692        );
693    }
694
695    Ok(())
696}
697
698// ================================================================ //
699//                            Server                                //
700// ================================================================ //
701//
702// All server-side plumbing is gated behind the `experimental-server`
703// cargo feature. The PT handshake and ExtORPort dial are not wired
704// (see `server_handle_connection`), so compiling this in by default
705// would risk operators standing up an unauthenticated proxy.
706
/// Start one listener per obfs4 bind address requested by the parent
/// (experimental, see the section comment above).
///
/// Bind addresses for any other transport are skipped with a warning. The
/// obfs4 server is built with `statedir` as its state-file location and the
/// options supplied for the bind address.
///
/// Returns a receiver that fires once every listener task has exited.
///
/// # Errors
///
/// Fails if the server PT environment cannot be read, the server cannot be
/// configured, or a listener cannot be bound.
#[cfg(feature = "experimental-server")]
async fn server_setup(
    statedir: &str,
    cancel_token: CancellationToken,
) -> Result<oneshot::Receiver<bool>> {
    let obfs4_name = Obfs4PT::name();

    let server_info = ptrs::ServerInfo::new()?;
    let (tx, rx) = oneshot::channel::<bool>();

    let mut listeners = Vec::new();

    for bind_addr in server_info.bind_addrs {
        info!(bind_addr.method_name);
        if bind_addr.method_name != obfs4_name {
            // Name the offending method so the log line is actionable.
            warn!(
                "no such transport is supported: {}",
                bind_addr.method_name
            );
            continue;
        }

        let mut builder = Obfs4PT::server_builder();
        let server = builder
            .statefile_location(statedir)?
            .options(&bind_addr.options)?
            .build();

        let listener = tokio::net::TcpListener::bind(bind_addr.addr).await?;
        listeners.push(server_listen_loop::<TcpStream, _>(
            listener,
            server,
            cancel_token.clone(),
        ));
    }

    // spawn a task that runs and monitors the progress of the listeners.
    tokio::spawn(async move {
        let total_len = listeners.len();
        let mut running = total_len;

        // launch all listener futures
        let mut pt_set = JoinSet::new();
        for fut in listeners {
            pt_set.spawn(fut);
        }

        // if any of the listeners exit, handle it
        while let Some(res) = pt_set.join_next().await {
            running -= 1;
            // `res` is nested: the outer layer is the task join result
            // (panic / abort), the inner one is the listener's own result.
            // Surface both instead of only the join error.
            match res {
                Ok(Ok(())) => {}
                Ok(Err(e)) => warn!("listener failed: {e:#}"),
                Err(e) => warn!("listener failed: {e}"),
            }
            info!("{running}/{total_len} listeners running");
        }

        // if all listeners exit then we can send the tx signal.
        // Best-effort: the receiver may already be dropped if the
        // parent select! moved on (e.g. signal-driven shutdown).
        let _ = tx.send(true);
    });

    Ok(rx)
}
768
/// Accept TCP connections on `listener` until `cancel_token` fires or an
/// accept call fails, spawning `server_handle_connection` for each one
/// (experimental).
///
/// The transport server is shared between the connection tasks through an
/// `Arc`. Always returns `Ok(())`.
#[cfg(feature = "experimental-server")]
async fn server_listen_loop<In, S>(
    listener: TcpListener,
    server: S,
    cancel_token: CancellationToken,
) -> Result<()>
where
    // the provided In must be usable as a connection in an async context
    In: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
    // The provided S must be usable as a Pluggable Transport Server.
    S: ptrs::ServerTransport<In> + Send + Sync + ptrs::ServerTransport<TcpStream> + 'static,
    <S as ptrs::ServerTransport<In>>::OutErr: 'static,
{
    let method_name = <S as ServerTransport<In>>::method_name();
    let shared_server = Arc::new(server);

    loop {
        // Wait for either a shutdown request or the next inbound connection.
        let accepted = tokio::select! {
            _ = cancel_token.cancelled() => {
                info!("{method_name} received shutdown signal - closing listener");
                break;
            }
            res = listener.accept() => res,
        };

        match accepted {
            Ok((conn, client_addr)) => {
                tokio::spawn(server_handle_connection(
                    conn,
                    Arc::clone(&shared_server),
                    client_addr,
                ));
            }
            Err(e) => {
                error!("{method_name} closing listener - failed to accept tcp connection {e}");
                break;
            }
        }
    }

    Ok(())
}
809
/// Stub for the server-side handling of a single accepted connection.
///
/// The arguments are accepted and immediately discarded, after which the
/// function panics through `unimplemented!`; it never produces a value.
///
/// # Panics
///
/// Always. The server-side PT handshake has not been written yet.
#[cfg(feature = "experimental-server")]
async fn server_handle_connection<In, S>(
    mut conn: In,
    server: Arc<S>,
    client_addr: SocketAddr,
) -> Result<()>
where
    // the provided In must be usable as a connection in an async context
    In: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
    // The provided S must be usable as a Pluggable Transport Server.
    S: ptrs::ServerTransport<In> + Send + Sync + ptrs::ServerTransport<TcpStream>,
    <S as ptrs::ServerTransport<In>>::OutErr: 'static,
{
    // Touch every argument so the stub builds without unused warnings.
    let _ = &mut conn;
    drop(server);
    let _ = client_addr;

    // Still missing before this can do real work:
    //   1) server.reveal(conn) to complete the PT handshake;
    //   2) a real ExtORPort dial to the parent ORPort.
    // An earlier version had neither and instead always dialed a hardcoded
    // 127.0.0.1:8000, which would have exposed an unauthenticated TCP proxy
    // on any host running that build.
    unimplemented!(
        "lyrebird server-side PT handshake is not implemented; \
         do not enable experimental-server in production"
    );
}
835
836// ================================================================ //
837//        PT-spec helpers (auth, args, parent-channel messages)     //
838// ================================================================ //
839
/// Custom `fast_socks5` authenticator. Per pt-spec §3.5 the parent
/// MUST negotiate USERNAME/PASSWORD when it wants to pass per-bridge PT
/// args; we accept any (or no) credentials and just propagate them so
/// `client_handle_connection` can grab them via `take_credentials()`.
///
/// This is a unit struct holding no data. Its `Authentication` impl does
/// no credential checking: the SOCKS5 user/pass fields are used purely as
/// a carrier for PT arguments, not as a security boundary.
#[derive(Clone, Copy, Default)]
struct PtArgsAuth;
846
847#[async_trait::async_trait]
848impl fast_socks5::server::Authentication for PtArgsAuth {
849    type Item = (String, String);
850
851    async fn authenticate(&self, credentials: Option<(String, String)>) -> Option<Self::Item> {
852        // Propagate creds untouched. `None` (NO_AUTH path) is also fine —
853        // we let it through as empty strings so the same plumbing applies.
854        Some(credentials.unwrap_or_default())
855    }
856}
857
/// Rebuild the PT-spec argument string that the parent packed into the
/// SOCKS5 USERNAME/PASSWORD fields (same rule as Go lyrebird,
/// `rfc1929.go:88-100`).
///
/// PT spec §3.5 requires this split for arg strings longer than 255 bytes:
/// the parent fills `username` (at most 255 bytes) first and puts whatever
/// is left into `password`.
///
/// Rules applied here:
///
/// * `None` (client used NO_AUTH, so there are no args) gives an empty
///   string.
/// * A password that is exactly the single byte `0x00` is the "no password"
///   marker; the username alone is the whole arg string.
/// * Any other password is appended to the username byte-for-byte, with no
///   separator in between.
pub fn arg_string_from_creds(creds: Option<(String, String)>) -> String {
    creds
        .map(|(mut args, passwd)| {
            // Only a lone NUL means "nothing to append"; everything else,
            // including an empty password, is concatenated as-is.
            if passwd != "\0" {
                args.push_str(&passwd);
            }
            args
        })
        .unwrap_or_default()
}
881
/// The small part of the PT spec's parent-channel protocol that this crate
/// needs: the status lines a PT writes to stdout for `tor`/`arti` to read.
/// Without these arti waits forever and reports "PT binary gone".
mod pt_proto {
    use std::io::Write;
    use std::net::SocketAddr;

    /// Protocol version announced in the `VERSION` line.
    const VERSION: &str = "1";

    /// Announce the negotiated protocol version (`VERSION 1`).
    pub fn print_version() {
        emit(format_args!("VERSION {VERSION}"));
    }

    /// Report that `transport` is listening on `addr` speaking `proto`.
    pub fn print_cmethod(transport: &str, proto: &str, addr: SocketAddr) {
        emit(format_args!("CMETHOD {transport} {proto} {addr}"));
    }

    /// Report that `transport` could not be started, with a free-form reason.
    pub fn print_cmethod_error(transport: &str, reason: &str) {
        emit(format_args!("CMETHOD-ERROR {transport} {reason}"));
    }

    /// Signal that every client method has been reported.
    pub fn print_cmethods_done() {
        emit(format_args!("CMETHODS DONE"));
    }

    /// Write one line to stdout and flush it right away.
    ///
    /// Stdout is locked for the duration of the call. Write and flush errors
    /// are deliberately ignored (best effort).
    fn emit(line: std::fmt::Arguments<'_>) {
        let mut stdout = std::io::stdout().lock();
        let _ = writeln!(stdout, "{line}");
        let _ = stdout.flush();
    }
}
913
914#[cfg(test)]
915mod tests {
916    use super::*;
917
918    #[test]
919    fn arg_string_uname_only_when_passwd_is_nul() {
920        let creds = Some(("cert=AAA;iat-mode=0".to_string(), "\0".to_string()));
921        assert_eq!(arg_string_from_creds(creds), "cert=AAA;iat-mode=0");
922    }
923
924    #[test]
925    fn arg_string_concat_when_passwd_nonempty() {
926        let creds = Some(("cert=".to_string(), "AAA;iat-mode=0".to_string()));
927        assert_eq!(arg_string_from_creds(creds), "cert=AAA;iat-mode=0");
928    }
929
930    #[test]
931    fn arg_string_empty_when_no_creds() {
932        assert_eq!(arg_string_from_creds(None), "");
933    }
934
935    #[test]
936    fn arg_string_then_parse_yields_kv_map() {
937        // 300-char value split across the two SOCKS5 fields (uname=255,
938        // passwd=remainder) — the canonical case the spec is written for.
939        let big = "cert=".to_string() + &"A".repeat(250);
940        let tail = ";iat-mode=0".to_string();
941        let creds = Some((big.clone(), tail.clone()));
942        let arg_string = arg_string_from_creds(creds);
943        assert_eq!(arg_string, big + &tail);
944
945        let args = ptrs::args::Args::from_str(&arg_string).expect("parse");
946        assert!(args.retrieve("cert").is_some());
947        assert_eq!(args.retrieve("iat-mode").as_deref(), Some("0"));
948    }
949
950    #[tokio::test]
951    async fn pt_args_auth_propagates_creds() {
952        use fast_socks5::server::Authentication;
953        let auth = PtArgsAuth;
954        let got = auth
955            .authenticate(Some(("u".to_string(), "p".to_string())))
956            .await;
957        assert_eq!(got, Some(("u".to_string(), "p".to_string())));
958    }
959
960    #[tokio::test]
961    async fn pt_args_auth_accepts_no_creds() {
962        use fast_socks5::server::Authentication;
963        let auth = PtArgsAuth;
964        let got = auth.authenticate(None).await;
965        assert_eq!(got, Some((String::new(), String::new())));
966    }
967
968    // -- resolve_target_addr --
969
970    #[test]
971    fn resolve_target_addr_ip() {
972        let addr = TargetAddr::Ip("127.0.0.1:9050".parse().unwrap());
973        let resolved = resolve_target_addr(&addr).unwrap();
974        assert_eq!(resolved.to_string(), "127.0.0.1:9050");
975    }
976
977    #[test]
978    fn resolve_target_addr_ipv6() {
979        let addr = TargetAddr::Ip("[::1]:443".parse().unwrap());
980        let resolved = resolve_target_addr(&addr).unwrap();
981        assert_eq!(resolved.to_string(), "[::1]:443");
982    }
983
984    #[test]
985    fn resolve_target_addr_domain_fails() {
986        let addr = TargetAddr::Domain("example.com".into(), 443);
987        let err = resolve_target_addr(&addr);
988        assert!(
989            err.is_err(),
990            "domain resolution should fail (PT doesn't do DNS)"
991        );
992    }
993
994    // -- arg_string edge cases --
995
996    #[test]
997    fn arg_string_empty_uname_and_passwd() {
998        let creds = Some((String::new(), String::new()));
999        assert_eq!(arg_string_from_creds(creds), "");
1000    }
1001
1002    #[test]
1003    fn arg_string_passwd_is_nul_only() {
1004        let creds = Some((String::new(), "\0".to_string()));
1005        assert_eq!(arg_string_from_creds(creds), "");
1006    }
1007
1008    // -- establish_pt_conn (outgoing-dial seam) --
1009    //
1010    // These exercise the client outgoing-connection path
1011    // (`client_handle_connection` → `establish_pt_conn`) that was previously
1012    // unreachable in tests because it created a real `TcpStream::connect`
1013    // inline. The dial is now an injectable `Pin<FutureResult<_, _>>`, so a
1014    // `tokio::io::duplex()` half (or a deliberately-failing future) stands in
1015    // for the OR-port socket. We drive the *ptrs trait* surface end to end:
1016    // build an obfs4 client from a bridge-line arg string exactly the way
1017    // `client_handle_connection` does (`Args` → `ClientBuilder::options` →
1018    // `build`), then run the real obfs4 handshake over the duplex against a
1019    // matching obfs4 `Server`.
1020
1021    use ptrs::ClientBuilder as _;
1022    use tokio::io::DuplexStream;
1023    use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _};
1024
1025    /// Build an obfs4 client transport from a bridge-line arg string via the
1026    /// same `ptrs` builder path lyrebird uses for a real SOCKS connection:
1027    /// parse the args, apply them through `ptrs::ClientBuilder::options`, then
1028    /// `build`. The obfs4 builder/transport impls are generic over the socket
1029    /// type, so we pin `InRW = DuplexStream` here (an in-memory stand-in for
1030    /// the OR-port `TcpStream`).
1031    fn obfs4_client_from_args(arg_string: &str) -> obfs4::Client {
1032        let args = ptrs::args::Args::from_str(arg_string).expect("parse bridge-line args");
1033        let mut builder = obfs4::ClientBuilder::default();
1034        <obfs4::ClientBuilder as ptrs::ClientBuilder<DuplexStream>>::options(&mut builder, &args)
1035            .expect("apply obfs4 args to builder");
1036        <obfs4::ClientBuilder as ptrs::ClientBuilder<DuplexStream>>::build(&builder)
1037    }
1038
1039    #[tokio::test]
1040    async fn establish_pt_conn_obfs4_handshake_and_proxies_bytes() {
1041        // A fresh obfs4 server with a random identity and the bridge-line
1042        // arg string that a client must present to reach it.
1043        let server_builder = obfs4::ServerBuilder::<DuplexStream>::default();
1044        let arg_string = server_builder.client_params();
1045        let server = server_builder.build();
1046
1047        // In-memory stand-in for the OR-port socket.
1048        let (client_side, server_side) = tokio::io::duplex(65_536);
1049
1050        // Server peer: complete the obfs4 handshake, then echo one message.
1051        let server_task = tokio::spawn(async move {
1052            let mut s = server.wrap(server_side).await.expect("server handshake");
1053            let mut buf = [0u8; 64];
1054            let n = s.read(&mut buf).await.expect("server read");
1055            s.write_all(&buf[..n]).await.expect("server echo write");
1056            s.flush().await.expect("server flush");
1057        });
1058
1059        // Client: build through the ptrs trait, then drive the seam with a
1060        // dial future that yields the duplex half instead of a TcpStream.
1061        let client = obfs4_client_from_args(&arg_string);
1062        let dial: Pin<ptrs::FutureResult<DuplexStream, std::io::Error>> =
1063            Box::pin(async move { Ok(client_side) });
1064        let client_addr: SocketAddr = "127.0.0.1:9050".parse().unwrap();
1065
1066        let mut tunnel = tokio::time::timeout(
1067            std::time::Duration::from_secs(5),
1068            establish_pt_conn(client, dial, client_addr),
1069        )
1070        .await
1071        .expect("establish_pt_conn timed out")
1072        .expect("establish_pt_conn should complete the obfs4 handshake");
1073
1074        // Bytes written into the obfs4 tunnel must come back through the
1075        // server echo — proving `establish` consumed the injected dial
1076        // stream and a real encrypted session is in place.
1077        let msg = b"through-the-obfs4-tunnel";
1078        tunnel.write_all(msg).await.expect("client write");
1079        tunnel.flush().await.expect("client flush");
1080
1081        let mut got = vec![0u8; msg.len()];
1082        tokio::time::timeout(
1083            std::time::Duration::from_secs(5),
1084            tunnel.read_exact(&mut got),
1085        )
1086        .await
1087        .expect("client read timed out")
1088        .expect("client read");
1089        assert_eq!(&got, msg, "data must round-trip through the obfs4 tunnel");
1090
1091        tokio::time::timeout(std::time::Duration::from_secs(5), server_task)
1092            .await
1093            .expect("server task timed out")
1094            .expect("server task panicked");
1095    }
1096
1097    #[tokio::test]
1098    async fn establish_pt_conn_dial_failure_is_error_not_panic() {
1099        // The dial future itself fails (e.g. OR-port connection refused).
1100        // `establish` must surface this as an `Err` without panicking — this
1101        // path is reachable from the network, so a regression to `unwrap`
1102        // would be a remote DoS.
1103        let server_builder = obfs4::ServerBuilder::<DuplexStream>::default();
1104        let arg_string = server_builder.client_params();
1105        let client = obfs4_client_from_args(&arg_string);
1106
1107        let dial: Pin<ptrs::FutureResult<DuplexStream, std::io::Error>> = Box::pin(async {
1108            Err(std::io::Error::new(
1109                std::io::ErrorKind::ConnectionRefused,
1110                "dial refused",
1111            ))
1112        });
1113        let client_addr: SocketAddr = "127.0.0.1:9050".parse().unwrap();
1114
1115        let result = establish_pt_conn(client, dial, client_addr).await;
1116        assert!(
1117            result.is_err(),
1118            "a failed dial must produce an error, not a tunnel"
1119        );
1120    }
1121
1122    #[tokio::test]
1123    async fn establish_pt_conn_handshake_eof_is_error_not_panic() {
1124        // The dial succeeds (a socket is produced) but the OR-port peer
1125        // immediately closes — e.g. the bridge dropped the connection during
1126        // the handshake. The obfs4 client's first handshake read then hits
1127        // EOF, which must surface as an `Err` from `establish_pt_conn`,
1128        // promptly and without panicking. (The client returns `UnexpectedEof`
1129        // on a 0-byte read rather than blocking until its handshake timeout.)
1130        // This complements the positive end-to-end test: that one proves the
1131        // bridge-line args reach the handshake crypto (a wrong/empty cert
1132        // would make it fail); this one proves the failure branch is handled.
1133        let server_builder = obfs4::ServerBuilder::<DuplexStream>::default();
1134        let arg_string = server_builder.client_params();
1135        let client = obfs4_client_from_args(&arg_string);
1136
1137        let (client_side, server_side) = tokio::io::duplex(65_536);
1138        // Close the peer half before the handshake reads anything.
1139        drop(server_side);
1140
1141        let dial: Pin<ptrs::FutureResult<DuplexStream, std::io::Error>> =
1142            Box::pin(async move { Ok(client_side) });
1143        let client_addr: SocketAddr = "127.0.0.1:9050".parse().unwrap();
1144
1145        let result = tokio::time::timeout(
1146            std::time::Duration::from_secs(5),
1147            establish_pt_conn(client, dial, client_addr),
1148        )
1149        .await
1150        .expect("establish_pt_conn should fail fast on EOF, not block until timeout");
1151
1152        assert!(
1153            result.is_err(),
1154            "a peer that closes mid-handshake must produce an error, not a tunnel"
1155        );
1156    }
1157
1158    // -- Bug 2 regression: cancel arm must break the accept loop --
1159
1160    /// Verify that `client_accept_loop` exits promptly when the
1161    /// `CancellationToken` is already cancelled before the loop starts.
1162    /// Without the `break` in the cancel arm, `cancelled()` is
1163    /// immediately ready on every iteration and the loop spins forever,
1164    /// causing this test to hit the timeout and fail.
1165    #[tokio::test]
1166    async fn client_accept_loop_exits_on_pre_cancelled_token() {
1167        // Bind a real listener so the function signature is satisfied.
1168        let listener = TcpListener::bind("127.0.0.1:0")
1169            .await
1170            .expect("bind listener for test");
1171
1172        // Cancel BEFORE entering the loop — simulates shutdown race.
1173        let cancel = CancellationToken::new();
1174        cancel.cancel();
1175
1176        let builder = Obfs4PT::client_builder();
1177        let proxy_uri = url::Url::parse("data:,").expect("placeholder url");
1178
1179        // The loop must return within 2 s.  On the old code (no break)
1180        // it would spin indefinitely and the timeout would fire.
1181        let result = tokio::time::timeout(
1182            std::time::Duration::from_secs(2),
1183            client_accept_loop(listener, builder, proxy_uri, cancel),
1184        )
1185        .await;
1186
1187        assert!(
1188            result.is_ok(),
1189            "client_accept_loop must exit promptly when the token is cancelled, not spin"
1190        );
1191        assert!(
1192            result.unwrap().is_ok(),
1193            "client_accept_loop should return Ok(()) on graceful cancel"
1194        );
1195    }
1196
1197    // -- Bridge-connection regression (fast_socks5 `execute_command`) --
1198    //
1199    // `client_handle_connection` must use fast-socks5 only to *parse* the
1200    // request, never to execute it. With the default `execute_command = true`,
1201    // `upgrade_to_socks5()` would itself open a *plain* TCP connection to the
1202    // bridge and send the SOCKS reply — bypassing obfs4, so no bridge could
1203    // ever be reached through the transport. The fix sets
1204    // `execute_command(false)` and has lyrebird dial obfs4 itself, replying to
1205    // the parent only once the tunnel is up. These tests drive the real SOCKS5
1206    // client protocol against `client_handle_connection` pointed at a real
1207    // loopback "bridge".
1208
1209    /// Play the SOCKS5 client (as arti/tor would) over `parent`: user/pass
1210    /// auth carrying the PT arg string, then a CONNECT to `bridge`. Returns
1211    /// after the CONNECT request is sent; the caller asserts on the reply.
1212    async fn socks5_client_connect(
1213        parent: &mut DuplexStream,
1214        arg_string: &str,
1215        bridge: SocketAddr,
1216    ) {
1217        // greeting: VER=5, 1 method, user/pass (0x02)
1218        parent.write_all(&[0x05, 0x01, 0x02]).await.unwrap();
1219        parent.flush().await.unwrap();
1220        let mut sel = [0u8; 2];
1221        parent.read_exact(&mut sel).await.unwrap();
1222        assert_eq!(sel, [0x05, 0x02], "server must select user/pass auth");
1223
1224        // RFC 1929 user/pass: pack the arg string into UNAME, PASSWD = single
1225        // NUL (the `arg_string_from_creds` "uname only" form).
1226        let uname = arg_string.as_bytes();
1227        assert!(
1228            uname.len() <= 255,
1229            "this test packs the arg string into one SOCKS field"
1230        );
1231        let mut auth = vec![0x01, uname.len() as u8];
1232        auth.extend_from_slice(uname);
1233        auth.extend_from_slice(&[0x01, 0x00]); // PLEN=1, PASSWD=0x00
1234        parent.write_all(&auth).await.unwrap();
1235        parent.flush().await.unwrap();
1236        let mut authresp = [0u8; 2];
1237        parent.read_exact(&mut authresp).await.unwrap();
1238        assert_eq!(authresp, [0x01, 0x00], "user/pass auth must succeed");
1239
1240        // CONNECT: VER=5, CMD=1, RSV=0, ATYP=1 (IPv4), addr, port.
1241        let (octets, port) = match bridge {
1242            SocketAddr::V4(v4) => (v4.ip().octets(), v4.port()),
1243            SocketAddr::V6(_) => unreachable!("test binds IPv4 loopback"),
1244        };
1245        let mut req = vec![0x05, 0x01, 0x00, 0x01];
1246        req.extend_from_slice(&octets);
1247        req.extend_from_slice(&port.to_be_bytes());
1248        parent.write_all(&req).await.unwrap();
1249        parent.flush().await.unwrap();
1250    }
1251
1252    #[tokio::test]
1253    async fn client_handle_connection_tunnels_through_obfs4_and_replies_itself() {
1254        // A real loopback "bridge" running an obfs4 *server*. `server.wrap()`
1255        // completes only if an obfs4 *client* handshake arrives — i.e. the
1256        // connection that reached the bridge was obfs4, not a plain TCP proxy.
1257        // On the buggy `execute_command = true` path the bridge would instead
1258        // receive fast-socks5's plain relay and `wrap()` would never complete,
1259        // failing this test.
1260        let server_builder = obfs4::ServerBuilder::<TcpStream>::default();
1261        let arg_string = server_builder.client_params();
1262        let server = server_builder.build();
1263
1264        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1265        let bridge_addr = listener.local_addr().unwrap();
1266
1267        let bridge = tokio::spawn(async move {
1268            let (sock, _peer) = listener.accept().await.expect("bridge accept");
1269            let mut s = server.wrap(sock).await.expect("bridge obfs4 handshake");
1270            let mut buf = [0u8; 64];
1271            let n = s.read(&mut buf).await.expect("bridge read");
1272            s.write_all(&buf[..n]).await.expect("bridge echo");
1273            s.flush().await.expect("bridge flush");
1274        });
1275
1276        // Parent (arti/tor) side over an in-memory duplex; `lyrebird_side` is
1277        // the connection `client_handle_connection` serves SOCKS on.
1278        let (mut parent, lyrebird_side) = tokio::io::duplex(65_536);
1279        let builder = Obfs4PT::client_builder();
1280        let client_addr: SocketAddr = "127.0.0.1:9050".parse().unwrap();
1281        let handler = tokio::spawn(client_handle_connection(
1282            lyrebird_side,
1283            builder,
1284            url::Url::parse("data:,").unwrap(),
1285            client_addr,
1286        ));
1287
1288        socks5_client_connect(&mut parent, &arg_string, bridge_addr).await;
1289
1290        // The reply is the canonical success frame lyrebird writes itself
1291        // (fast-socks5 sends nothing — execute_command is disabled), and it
1292        // arrives only because the obfs4 tunnel to the bridge came up.
1293        let mut reply = [0u8; 10];
1294        tokio::time::timeout(
1295            std::time::Duration::from_secs(5),
1296            parent.read_exact(&mut reply),
1297        )
1298        .await
1299        .expect("SOCKS5 reply timed out")
1300        .expect("read SOCKS5 reply");
1301        assert_eq!(
1302            reply,
1303            [0x05, 0x00, 0x00, 0x01, 0, 0, 0, 0, 0, 0],
1304            "lyrebird must send the SOCKS5 success reply itself after the obfs4 handshake"
1305        );
1306
1307        // End-to-end: a probe must round-trip through the obfs4 tunnel.
1308        let probe = b"bridge-tunnel-probe";
1309        parent.write_all(probe).await.unwrap();
1310        parent.flush().await.unwrap();
1311        let mut got = vec![0u8; probe.len()];
1312        tokio::time::timeout(
1313            std::time::Duration::from_secs(5),
1314            parent.read_exact(&mut got),
1315        )
1316        .await
1317        .expect("probe round-trip timed out")
1318        .expect("read probe echo");
1319        assert_eq!(
1320            &got, probe,
1321            "probe must round-trip through the obfs4 tunnel"
1322        );
1323
1324        tokio::time::timeout(std::time::Duration::from_secs(5), bridge)
1325            .await
1326            .expect("bridge task timed out")
1327            .expect("bridge task panicked");
1328        drop(parent); // let copy_bidirectional see EOF and the handler finish
1329        let _ = tokio::time::timeout(std::time::Duration::from_secs(5), handler).await;
1330    }
1331
1332    #[tokio::test]
1333    async fn client_handle_connection_no_success_reply_when_bridge_not_obfs4() {
1334        // The bridge accepts the TCP connection but is NOT an obfs4 server: it
1335        // closes immediately, so the obfs4 client handshake fails. lyrebird
1336        // must therefore NOT report CONNECT success to the parent. The old
1337        // `execute_command = true` path replied success on the bare TCP
1338        // connect regardless of whether obfs4 could be established, which is
1339        // exactly the bug — so this test would fail on it.
1340        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1341        let bridge_addr = listener.local_addr().unwrap();
1342        let bridge = tokio::spawn(async move {
1343            let (sock, _peer) = listener.accept().await.expect("bridge accept");
1344            drop(sock); // not obfs4 — close before any handshake byte
1345        });
1346
1347        // A well-formed cert so `options()` succeeds and the failure is the
1348        // handshake, not arg parsing; this throwaway identity is never honored.
1349        let arg_string = obfs4::ServerBuilder::<TcpStream>::default().client_params();
1350
1351        let (mut parent, lyrebird_side) = tokio::io::duplex(65_536);
1352        let builder = Obfs4PT::client_builder();
1353        let client_addr: SocketAddr = "127.0.0.1:9050".parse().unwrap();
1354        let handler = tokio::spawn(client_handle_connection(
1355            lyrebird_side,
1356            builder,
1357            url::Url::parse("data:,").unwrap(),
1358            client_addr,
1359        ));
1360
1361        socks5_client_connect(&mut parent, &arg_string, bridge_addr).await;
1362
1363        // No success reply: the handler returns Err before writing one, so its
1364        // side of the duplex drops and the parent's read hits EOF rather than a
1365        // 10-byte success frame.
1366        let mut reply = [0u8; 10];
1367        let read = tokio::time::timeout(
1368            std::time::Duration::from_secs(5),
1369            parent.read_exact(&mut reply),
1370        )
1371        .await
1372        .expect("the read should resolve (EOF), not hang");
1373        assert!(
1374            read.is_err(),
1375            "no SOCKS5 success reply must be sent when the obfs4 handshake fails"
1376        );
1377
1378        let outcome = tokio::time::timeout(std::time::Duration::from_secs(5), handler)
1379            .await
1380            .expect("handler should finish")
1381            .expect("handler task panicked");
1382        assert!(
1383            outcome.is_err(),
1384            "client_handle_connection must surface the failed obfs4 dial as an error"
1385        );
1386
1387        let _ = tokio::time::timeout(std::time::Duration::from_secs(5), bridge).await;
1388    }
1389}