1#![allow(unused, dead_code)]
84#![deny(missing_docs)]
85
86use obfs4::{ClientBuilder, Obfs4PT};
87use ptrs::{error, info, warn};
88use ptrs::{
89 ClientBuilder as _, ClientTransport, PluggableTransport, ServerBuilder, ServerTransport,
90};
91
92use anyhow::{anyhow, Context, Result};
93use clap::Parser;
94use fast_socks5::{
95 server::{DenyAuthentication, SimpleUserPassword},
96 util::target_addr::TargetAddr,
97 AuthenticationMethod,
98};
99use safelog::sensitive;
100use tokio::task::JoinSet;
101use tokio::{
102 io::{copy_bidirectional, AsyncRead, AsyncWrite, AsyncWriteExt},
103 net::{TcpListener, TcpStream},
104 sync::oneshot,
105};
106use tokio_util::sync::CancellationToken;
107use tracing::Level;
108use tracing_subscriber::{filter::LevelFilter, prelude::*};
109
110use std::{env, net::SocketAddr, pin::Pin, str::FromStr, sync::Arc};
111
112const MAX_CONCURRENT_CONNS: usize = 1024;
115
116const CLIENT_SOCKS_ADDR: &str = "127.0.0.1:0";
118
119#[derive(Debug, thiserror::Error)]
121#[error("Error while obtaining bridge line data")]
122struct BridgeLineParseError;
123
124#[derive(Parser)]
125#[command(author, version, long_about = None, about="Tunnel Tor SOCKS5 traffic through pluggable transport connections")]
126struct Args {
127 #[arg(long, default_value_t = false)]
129 enable_logging: bool,
130
131 #[arg(long, default_value_t=String::from("ERROR"))]
133 log_level: String,
134
135 #[arg(long, default_value_t = false)]
137 unsafe_logging: bool,
138}
139
140fn init_logging_recvr(
144 enable: bool,
145 should_scrub: bool,
146 level_str: &str,
147 statedir: &str,
148) -> Result<()> {
149 if should_scrub {
150 safelog::enforce_safe_logging();
151 } else {
152 safelog::disable_safe_logging();
153 }
154
155 let console_filter =
167 tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| {
168 tracing_subscriber::EnvFilter::new(
169 "info,fast_socks5=warn,obfs4::sessions=warn,lyrebird::handshake=info",
170 )
171 });
172 let console_layer = tracing_subscriber::fmt::layer()
173 .with_writer(std::io::stderr)
174 .with_filter(console_filter);
175
176 let log_layers = if enable {
177 let level = Level::from_str(level_str)?;
178
179 let file = std::fs::File::create(format!("{statedir}/obfs4proxy.log"))?;
180
181 let state_dir_layer = tracing_subscriber::fmt::layer()
182 .with_writer(file)
183 .with_filter(LevelFilter::from_level(level));
184
185 console_layer.and_then(state_dir_layer).boxed()
186 } else {
187 console_layer.boxed()
188 };
189
190 tracing_subscriber::registry().with(log_layers).init();
191
192 Ok(())
193}
194
195pub fn resolve_target_addr(addr: &TargetAddr) -> Result<SocketAddr> {
200 match addr {
201 TargetAddr::Ip(sa) => Ok(*sa),
202 TargetAddr::Domain(_, _) => {
203 ptrs::resolve_addr(format!("{addr}")).context("domain resolution failed")
205 }
206 }
207}
208
209pub async fn run() -> Result<()> {
219 let args = Args::parse();
220
221 let statedir = ptrs::make_state_dir()?;
223
224 init_logging_recvr(
226 args.enable_logging,
227 !args.unsafe_logging,
228 &args.log_level,
229 &statedir,
230 )?;
231
232 let cancel_token = tokio_util::sync::CancellationToken::new();
233
234 let mut exit_rx = if ptrs::is_client()? {
236 client_setup(&statedir, cancel_token.clone()).await?
238 } else {
239 #[cfg(feature = "experimental-server")]
246 {
247 server_setup(&statedir, cancel_token.clone()).await?
248 }
249 #[cfg(not(feature = "experimental-server"))]
250 {
251 let _ = &statedir;
252 let _ = &cancel_token;
253 return Err(anyhow!(
254 "lyrebird server-side is not implemented; rebuild with \
255 `--features experimental-server` for development use only"
256 ));
257 }
258 };
259
260 info!("accepting connections");
261
262 tokio::select! {
267 _ = &mut exit_rx => {
268 info!("proxy closed");
269 return Ok(())
270 }
271 sig = shutdown_signal() => {
272 if sig.is_terminate() {
273 info!("proxy terminated");
274 return Ok(())
275 }
276 info!("received interrupt, shutting down");
277 cancel_token.cancel();
278 }
279 }
280
281 tokio::select! {
285 _ = exit_rx => {}
286 _ = shutdown_signal() => {}
287 }
288
289 Ok(())
290}
291
292#[derive(Clone, Copy)]
296enum Shutdown {
297 Interrupt,
298 Terminate,
299}
300
301impl Shutdown {
302 fn is_terminate(self) -> bool {
303 matches!(self, Shutdown::Terminate)
304 }
305}
306
307#[cfg(unix)]
308async fn shutdown_signal() -> Shutdown {
309 use tokio::signal::unix::{signal, SignalKind};
310 let mut sigint = signal(SignalKind::interrupt()).expect("install SIGINT handler");
311 let mut sigterm = signal(SignalKind::terminate()).expect("install SIGTERM handler");
312 tokio::select! {
313 _ = sigterm.recv() => Shutdown::Terminate,
314 _ = sigint.recv() => Shutdown::Interrupt,
315 }
316}
317
318#[cfg(not(unix))]
319async fn shutdown_signal() -> Shutdown {
320 use tokio::signal::windows::{ctrl_break, ctrl_c};
321 let mut c_c = ctrl_c().expect("install Ctrl+C handler");
322 let mut c_break = ctrl_break().expect("install Ctrl+Break handler");
323 tokio::select! {
324 _ = c_break.recv() => Shutdown::Terminate,
325 _ = c_c.recv() => Shutdown::Interrupt,
326 }
327}
328
329async fn client_setup(
334 statedir: &str,
335 cancel_token: CancellationToken,
336) -> Result<oneshot::Receiver<bool>> {
337 let obfs4_name = Obfs4PT::name();
338 let webtunnel_name = webtunnel::WEBTUNNEL_NAME.to_string();
339 let client_pt_info = ptrs::ClientInfo::new()?;
340 let proxy_uri = client_pt_info
344 .uri
345 .unwrap_or_else(|| url::Url::parse("data:,").expect("placeholder url"));
346 let (tx, rx) = oneshot::channel::<bool>();
347
348 pt_proto::print_version();
350
351 let mut listeners: Vec<
352 std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send>>,
353 > = Vec::new();
354
355 for name in client_pt_info.methods {
356 info!(name);
357
358 if name == obfs4_name {
359 let builder = Obfs4PT::client_builder();
360 let listener = tokio::net::TcpListener::bind(CLIENT_SOCKS_ADDR).await?;
361 let local_addr = listener.local_addr()?;
362 pt_proto::print_cmethod(&name, "socks5", local_addr);
363 listeners.push(Box::pin(client_accept_loop(
364 listener,
365 builder,
366 proxy_uri.clone(),
367 cancel_token.clone(),
368 )));
369 } else if name == webtunnel_name {
370 let builder = webtunnel::WebTunnelBuilder::default();
371 let listener = tokio::net::TcpListener::bind(CLIENT_SOCKS_ADDR).await?;
372 let local_addr = listener.local_addr()?;
373 pt_proto::print_cmethod(&name, "socks5", local_addr);
374 listeners.push(Box::pin(client_accept_loop(
375 listener,
376 builder,
377 proxy_uri.clone(),
378 cancel_token.clone(),
379 )));
380 } else {
381 pt_proto::print_cmethod_error(&name, "no such transport is supported");
382 warn!("no such transport is supported");
383 continue;
384 }
385 }
386
387 pt_proto::print_cmethods_done();
389
390 tokio::spawn(async move {
392 let total_len = listeners.len();
393 let mut running = total_len;
394
395 let mut pt_set = JoinSet::new();
397 for fut in listeners {
398 pt_set.spawn(fut);
399 }
400
401 while let Some(res) = pt_set.join_next().await {
403 running -= 1;
404 if let Err(e) = res {
405 warn!("listener failed: {e}");
406 }
407 info!("{running}/{total_len} listeners running");
408 }
409
410 let _ = tx.send(true);
414 });
415
416 Ok(rx)
417}
418
419async fn client_accept_loop<C>(
420 listener: TcpListener,
421 builder: impl ptrs::ClientBuilder<TcpStream, ClientPT = C> + Send + 'static,
422 proxy_uri: url::Url,
423 cancel_token: CancellationToken,
424) -> Result<()>
425where
426 C: ptrs::ClientTransport<TcpStream, std::io::Error> + Send + 'static,
428{
429 let pt_name = C::method_name();
430 let sem = Arc::new(tokio::sync::Semaphore::new(MAX_CONCURRENT_CONNS));
431 loop {
432 tokio::select! {
433 _ = cancel_token.cancelled() => {
434 info!("{pt_name} received shutdown signal");
435 break; }
437 res = listener.accept() => {
438 let (conn, client_addr) = match res {
439 Err(e) => {
440 error!("failed to accept tcp connection {e}");
441 break;
442 }
443 Ok(c) => c,
444 };
445 let permit = match Arc::clone(&sem).acquire_owned().await {
448 Ok(p) => p,
449 Err(_) => break, };
451 let builder_clone = builder.clone();
452 let proxy_clone = proxy_uri.clone();
453 tokio::spawn(async move {
454 let _permit = permit; let _ = client_handle_connection(conn, builder_clone, proxy_clone, client_addr).await;
456 });
457 }
458 }
459 }
460
461 Ok(())
462}
463
464async fn client_handle_connection<In, C, B>(
467 conn: In,
468 mut builder: B,
469 _proxy_uri: url::Url,
470 client_addr: SocketAddr,
471) -> Result<()>
472where
473 In: AsyncRead + AsyncWrite + Send + Unpin,
475 C: ptrs::ClientTransport<TcpStream, std::io::Error> + Send,
477 B: ptrs::ClientBuilder<TcpStream, ClientPT = C>,
478{
479 let mut config =
484 fast_socks5::server::Config::<fast_socks5::server::DenyAuthentication>::default()
485 .with_authentication(PtArgsAuth);
486 config.set_allow_no_auth(true);
487 config.set_execute_command(false);
494 let socks5_conn = fast_socks5::server::Socks5Socket::new(conn, Arc::new(config));
495
496 let mut socks5_conn = socks5_conn.upgrade_to_socks5().await?;
497 let creds = socks5_conn.take_credentials();
498
499 let target_addr = socks5_conn
500 .target_addr()
501 .ok_or(BridgeLineParseError)
502 .context("missing remote address in request")?;
503
504 let arg_string = arg_string_from_creds(creds);
508 let args = if arg_string.is_empty() {
509 ptrs::args::Args::default()
510 } else {
511 ptrs::args::Args::from_str(&arg_string).context("parsing PT arg string")?
512 };
513 <B as ptrs::ClientBuilder<TcpStream>>::options(&mut builder, &args)
514 .map_err(|e| anyhow::anyhow!("applying PT args to builder: {e}"))?;
515
516 let remote_addr = resolve_target_addr(target_addr).context("no remote address")?;
517
518 let remote: Pin<ptrs::FutureResult<TcpStream, std::io::Error>> =
523 Box::pin(tokio::net::TcpStream::connect(remote_addr));
524
525 let pt_client = builder.build();
528 let mut pt_conn = establish_pt_conn(pt_client, remote, client_addr).await?;
529
530 let mut parent = socks5_conn.into_inner();
536 parent
537 .write_all(&[0x05, 0x00, 0x00, 0x01, 0, 0, 0, 0, 0, 0])
538 .await
539 .context("writing SOCKS5 success reply to PT parent")?;
540 parent
541 .flush()
542 .await
543 .context("flushing SOCKS5 success reply")?;
544
545 if let Err(e) = copy_bidirectional(&mut parent, &mut pt_conn).await {
546 warn!(
547 addres = sensitive(client_addr).to_string(),
548 "tunnel closed with error: {e:#?}"
549 );
550 }
551 Ok(())
552}
553
554pub(crate) async fn establish_pt_conn<In2, C2>(
576 pt_client: C2,
577 dial: Pin<ptrs::FutureResult<In2, std::io::Error>>,
578 client_addr: SocketAddr,
579) -> Result<C2::OutRW>
580where
581 In2: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
584 C2: ptrs::ClientTransport<In2, std::io::Error> + Send,
585{
586 match ptrs::ClientTransport::<In2, std::io::Error>::establish(pt_client, dial).await {
587 Err(e) => {
588 warn!(
589 address = sensitive(client_addr).to_string(),
590 "handshake failed: {e:#?}"
591 );
592 Err(obfs4::Error::from(e.to_string())).context("handshake failed")
593 }
594 Ok(c) => Ok(c),
595 }
596}
597
598async fn client_handle_connection_clientpt<In, C>(
606 conn: In,
607 pt_client: C,
608 _proxy_uri: url::Url,
609 client_addr: SocketAddr,
610) -> Result<()>
611where
612 In: AsyncRead + AsyncWrite + Send + Unpin,
614 C: ptrs::ClientTransport<TcpStream, std::io::Error>,
616{
617 let mut config: fast_socks5::server::Config<SimpleUserPassword> =
618 fast_socks5::server::Config::default();
619 let mut socks5_conn = fast_socks5::server::Socks5Socket::new(conn, Arc::new(config));
621
622 let target_addr = socks5_conn
624 .target_addr()
625 .ok_or(BridgeLineParseError)
626 .context("missing remote address in request")?;
627
628 let args: Option<ptrs::args::Args> = match socks5_conn.auth() {
639 AuthenticationMethod::Password { username, password } => {
640 if username.is_empty() {
641 socks5_conn.flush().await?;
642 socks5_conn.shutdown().await?;
643 return Err(anyhow!("username with 0 length"));
644 }
645 if password.is_empty() {
646 socks5_conn.flush().await?;
647 socks5_conn.shutdown().await?;
648 return Err(anyhow!("password with 0 length"));
649 }
650
651 let mut arg_string = username.clone();
652 if !(password.len() == 1 && password.as_bytes().first().copied() == Some(0x00)) {
655 arg_string.push_str(password);
656 }
657
658 match ptrs::args::Args::from_str(&arg_string) {
659 Ok(a) => Some(a),
660 Err(e) => {
661 return Err(anyhow!(
662 "failed to parse provided args \"{arg_string}\": {e}"
663 ))
664 }
665 }
666 }
667 AuthenticationMethod::None => None,
668 _ => return Err(anyhow!("negotiated unsupported authentication method")),
669 };
670
671 let remote_addr = resolve_target_addr(target_addr).context("no remote address")?;
672
673 let remote = tokio::net::TcpStream::connect(remote_addr);
674
675 let mut pt_conn = match pt_client.establish(Box::pin(remote)).await {
678 Ok(c) => c,
679 Err(e) => {
680 warn!(
681 address = sensitive(client_addr).to_string(),
682 "handshake failed: {e:#?}"
683 );
684 return Err(obfs4::Error::from(e.to_string())).context("handshake failed");
685 }
686 };
687
688 if let Err(e) = copy_bidirectional(&mut socks5_conn.into_inner(), &mut pt_conn).await {
689 warn!(
690 addres = sensitive(client_addr).to_string(),
691 "tunnel closed with error: {e:#?}"
692 );
693 }
694
695 Ok(())
696}
697
698#[cfg(feature = "experimental-server")]
708async fn server_setup(
709 statedir: &str,
710 cancel_token: CancellationToken,
711) -> Result<oneshot::Receiver<bool>> {
712 let obfs4_name = Obfs4PT::name();
713
714 let server_info = ptrs::ServerInfo::new()?;
715 let (tx, rx) = oneshot::channel::<bool>();
716
717 let mut listeners = Vec::new();
718
719 for bind_addr in server_info.bind_addrs {
720 info!(bind_addr.method_name);
721 if bind_addr.method_name != obfs4_name {
722 warn!("no such transport is supported");
723 continue;
724 }
725
726 let mut builder = Obfs4PT::server_builder();
727 let server = builder
728 .statefile_location(statedir)?
729 .options(&bind_addr.options)?
730 .build();
731
732 let listener = tokio::net::TcpListener::bind(bind_addr.addr).await?;
733 listeners.push(server_listen_loop::<TcpStream, _>(
734 listener,
735 server,
736 cancel_token.clone(),
737 ));
738 }
739
740 tokio::spawn(async move {
742 let total_len = listeners.len();
743 let mut running = total_len;
744
745 let mut pt_set = JoinSet::new();
747 for fut in listeners {
748 pt_set.spawn(fut);
749 }
750
751 while let Some(res) = pt_set.join_next().await {
753 running -= 1;
754 if let Err(e) = res {
755 warn!("listener failed: {e}");
756 }
757 info!("{running}/{total_len} listeners running");
758 }
759
760 let _ = tx.send(true);
764 });
765
766 Ok(rx)
767}
768
769#[cfg(feature = "experimental-server")]
770async fn server_listen_loop<In, S>(
771 listener: TcpListener,
772 server: S,
773 cancel_token: CancellationToken,
774) -> Result<()>
775where
776 In: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
778 S: ptrs::ServerTransport<In> + Send + Sync + ptrs::ServerTransport<TcpStream> + 'static,
780 <S as ptrs::ServerTransport<In>>::OutErr: 'static,
781{
782 let method_name = <S as ServerTransport<In>>::method_name();
783 let server = Arc::new(server);
784 loop {
785 tokio::select! {
786 _ = cancel_token.cancelled() => {
787 info!("{method_name} received shutdown signal - closing listener");
788 break
789 }
790 res = listener.accept() => {
791 let (mut conn, client_addr) = match res {
792 Err(e) => {
793 error!("{method_name} closing listener - failed to accept tcp connection {e}");
794 break;
795 }
796 Ok(c) => c,
797 };
798 tokio::spawn(server_handle_connection(
799 conn,
800 server.clone(),
801 client_addr,
802 ));
803 }
804 }
805 }
806
807 Ok(())
808}
809
810#[cfg(feature = "experimental-server")]
811async fn server_handle_connection<In, S>(
812 mut conn: In,
813 server: Arc<S>,
814 client_addr: SocketAddr,
815) -> Result<()>
816where
817 In: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
819 S: ptrs::ServerTransport<In> + Send + Sync + ptrs::ServerTransport<TcpStream>,
821 <S as ptrs::ServerTransport<In>>::OutErr: 'static,
822{
823 let _ = (&mut conn, server, client_addr);
824 unimplemented!(
831 "lyrebird server-side PT handshake is not implemented; \
832 do not enable experimental-server in production"
833 );
834}
835
836#[derive(Clone, Copy, Default)]
845struct PtArgsAuth;
846
847#[async_trait::async_trait]
848impl fast_socks5::server::Authentication for PtArgsAuth {
849 type Item = (String, String);
850
851 async fn authenticate(&self, credentials: Option<(String, String)>) -> Option<Self::Item> {
852 Some(credentials.unwrap_or_default())
855 }
856}
857
858pub fn arg_string_from_creds(creds: Option<(String, String)>) -> String {
871 match creds {
872 None => String::new(),
873 Some((uname, passwd)) if passwd.as_bytes() == [0x00] => uname,
874 Some((uname, passwd)) => {
875 let mut s = uname;
876 s.push_str(&passwd);
877 s
878 }
879 }
880}
881
882mod pt_proto {
886 use std::io::Write;
887 use std::net::SocketAddr;
888
889 const VERSION: &str = "1";
890
891 pub fn print_version() {
892 emit(format!("VERSION {VERSION}"));
893 }
894
895 pub fn print_cmethod(transport: &str, proto: &str, addr: SocketAddr) {
896 emit(format!("CMETHOD {transport} {proto} {addr}"));
897 }
898
899 pub fn print_cmethod_error(transport: &str, reason: &str) {
900 emit(format!("CMETHOD-ERROR {transport} {reason}"));
901 }
902
903 pub fn print_cmethods_done() {
904 emit("CMETHODS DONE".to_string());
905 }
906
907 fn emit(line: String) {
908 let mut out = std::io::stdout().lock();
909 let _ = writeln!(out, "{line}");
910 let _ = out.flush();
911 }
912}
913
914#[cfg(test)]
915mod tests {
916 use super::*;
917
918 #[test]
919 fn arg_string_uname_only_when_passwd_is_nul() {
920 let creds = Some(("cert=AAA;iat-mode=0".to_string(), "\0".to_string()));
921 assert_eq!(arg_string_from_creds(creds), "cert=AAA;iat-mode=0");
922 }
923
924 #[test]
925 fn arg_string_concat_when_passwd_nonempty() {
926 let creds = Some(("cert=".to_string(), "AAA;iat-mode=0".to_string()));
927 assert_eq!(arg_string_from_creds(creds), "cert=AAA;iat-mode=0");
928 }
929
930 #[test]
931 fn arg_string_empty_when_no_creds() {
932 assert_eq!(arg_string_from_creds(None), "");
933 }
934
935 #[test]
936 fn arg_string_then_parse_yields_kv_map() {
937 let big = "cert=".to_string() + &"A".repeat(250);
940 let tail = ";iat-mode=0".to_string();
941 let creds = Some((big.clone(), tail.clone()));
942 let arg_string = arg_string_from_creds(creds);
943 assert_eq!(arg_string, big + &tail);
944
945 let args = ptrs::args::Args::from_str(&arg_string).expect("parse");
946 assert!(args.retrieve("cert").is_some());
947 assert_eq!(args.retrieve("iat-mode").as_deref(), Some("0"));
948 }
949
950 #[tokio::test]
951 async fn pt_args_auth_propagates_creds() {
952 use fast_socks5::server::Authentication;
953 let auth = PtArgsAuth;
954 let got = auth
955 .authenticate(Some(("u".to_string(), "p".to_string())))
956 .await;
957 assert_eq!(got, Some(("u".to_string(), "p".to_string())));
958 }
959
960 #[tokio::test]
961 async fn pt_args_auth_accepts_no_creds() {
962 use fast_socks5::server::Authentication;
963 let auth = PtArgsAuth;
964 let got = auth.authenticate(None).await;
965 assert_eq!(got, Some((String::new(), String::new())));
966 }
967
968 #[test]
971 fn resolve_target_addr_ip() {
972 let addr = TargetAddr::Ip("127.0.0.1:9050".parse().unwrap());
973 let resolved = resolve_target_addr(&addr).unwrap();
974 assert_eq!(resolved.to_string(), "127.0.0.1:9050");
975 }
976
977 #[test]
978 fn resolve_target_addr_ipv6() {
979 let addr = TargetAddr::Ip("[::1]:443".parse().unwrap());
980 let resolved = resolve_target_addr(&addr).unwrap();
981 assert_eq!(resolved.to_string(), "[::1]:443");
982 }
983
984 #[test]
985 fn resolve_target_addr_domain_fails() {
986 let addr = TargetAddr::Domain("example.com".into(), 443);
987 let err = resolve_target_addr(&addr);
988 assert!(
989 err.is_err(),
990 "domain resolution should fail (PT doesn't do DNS)"
991 );
992 }
993
994 #[test]
997 fn arg_string_empty_uname_and_passwd() {
998 let creds = Some((String::new(), String::new()));
999 assert_eq!(arg_string_from_creds(creds), "");
1000 }
1001
1002 #[test]
1003 fn arg_string_passwd_is_nul_only() {
1004 let creds = Some((String::new(), "\0".to_string()));
1005 assert_eq!(arg_string_from_creds(creds), "");
1006 }
1007
1008 use ptrs::ClientBuilder as _;
1022 use tokio::io::DuplexStream;
1023 use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _};
1024
1025 fn obfs4_client_from_args(arg_string: &str) -> obfs4::Client {
1032 let args = ptrs::args::Args::from_str(arg_string).expect("parse bridge-line args");
1033 let mut builder = obfs4::ClientBuilder::default();
1034 <obfs4::ClientBuilder as ptrs::ClientBuilder<DuplexStream>>::options(&mut builder, &args)
1035 .expect("apply obfs4 args to builder");
1036 <obfs4::ClientBuilder as ptrs::ClientBuilder<DuplexStream>>::build(&builder)
1037 }
1038
1039 #[tokio::test]
1040 async fn establish_pt_conn_obfs4_handshake_and_proxies_bytes() {
1041 let server_builder = obfs4::ServerBuilder::<DuplexStream>::default();
1044 let arg_string = server_builder.client_params();
1045 let server = server_builder.build();
1046
1047 let (client_side, server_side) = tokio::io::duplex(65_536);
1049
1050 let server_task = tokio::spawn(async move {
1052 let mut s = server.wrap(server_side).await.expect("server handshake");
1053 let mut buf = [0u8; 64];
1054 let n = s.read(&mut buf).await.expect("server read");
1055 s.write_all(&buf[..n]).await.expect("server echo write");
1056 s.flush().await.expect("server flush");
1057 });
1058
1059 let client = obfs4_client_from_args(&arg_string);
1062 let dial: Pin<ptrs::FutureResult<DuplexStream, std::io::Error>> =
1063 Box::pin(async move { Ok(client_side) });
1064 let client_addr: SocketAddr = "127.0.0.1:9050".parse().unwrap();
1065
1066 let mut tunnel = tokio::time::timeout(
1067 std::time::Duration::from_secs(5),
1068 establish_pt_conn(client, dial, client_addr),
1069 )
1070 .await
1071 .expect("establish_pt_conn timed out")
1072 .expect("establish_pt_conn should complete the obfs4 handshake");
1073
1074 let msg = b"through-the-obfs4-tunnel";
1078 tunnel.write_all(msg).await.expect("client write");
1079 tunnel.flush().await.expect("client flush");
1080
1081 let mut got = vec![0u8; msg.len()];
1082 tokio::time::timeout(
1083 std::time::Duration::from_secs(5),
1084 tunnel.read_exact(&mut got),
1085 )
1086 .await
1087 .expect("client read timed out")
1088 .expect("client read");
1089 assert_eq!(&got, msg, "data must round-trip through the obfs4 tunnel");
1090
1091 tokio::time::timeout(std::time::Duration::from_secs(5), server_task)
1092 .await
1093 .expect("server task timed out")
1094 .expect("server task panicked");
1095 }
1096
1097 #[tokio::test]
1098 async fn establish_pt_conn_dial_failure_is_error_not_panic() {
1099 let server_builder = obfs4::ServerBuilder::<DuplexStream>::default();
1104 let arg_string = server_builder.client_params();
1105 let client = obfs4_client_from_args(&arg_string);
1106
1107 let dial: Pin<ptrs::FutureResult<DuplexStream, std::io::Error>> = Box::pin(async {
1108 Err(std::io::Error::new(
1109 std::io::ErrorKind::ConnectionRefused,
1110 "dial refused",
1111 ))
1112 });
1113 let client_addr: SocketAddr = "127.0.0.1:9050".parse().unwrap();
1114
1115 let result = establish_pt_conn(client, dial, client_addr).await;
1116 assert!(
1117 result.is_err(),
1118 "a failed dial must produce an error, not a tunnel"
1119 );
1120 }
1121
1122 #[tokio::test]
1123 async fn establish_pt_conn_handshake_eof_is_error_not_panic() {
1124 let server_builder = obfs4::ServerBuilder::<DuplexStream>::default();
1134 let arg_string = server_builder.client_params();
1135 let client = obfs4_client_from_args(&arg_string);
1136
1137 let (client_side, server_side) = tokio::io::duplex(65_536);
1138 drop(server_side);
1140
1141 let dial: Pin<ptrs::FutureResult<DuplexStream, std::io::Error>> =
1142 Box::pin(async move { Ok(client_side) });
1143 let client_addr: SocketAddr = "127.0.0.1:9050".parse().unwrap();
1144
1145 let result = tokio::time::timeout(
1146 std::time::Duration::from_secs(5),
1147 establish_pt_conn(client, dial, client_addr),
1148 )
1149 .await
1150 .expect("establish_pt_conn should fail fast on EOF, not block until timeout");
1151
1152 assert!(
1153 result.is_err(),
1154 "a peer that closes mid-handshake must produce an error, not a tunnel"
1155 );
1156 }
1157
1158 #[tokio::test]
1166 async fn client_accept_loop_exits_on_pre_cancelled_token() {
1167 let listener = TcpListener::bind("127.0.0.1:0")
1169 .await
1170 .expect("bind listener for test");
1171
1172 let cancel = CancellationToken::new();
1174 cancel.cancel();
1175
1176 let builder = Obfs4PT::client_builder();
1177 let proxy_uri = url::Url::parse("data:,").expect("placeholder url");
1178
1179 let result = tokio::time::timeout(
1182 std::time::Duration::from_secs(2),
1183 client_accept_loop(listener, builder, proxy_uri, cancel),
1184 )
1185 .await;
1186
1187 assert!(
1188 result.is_ok(),
1189 "client_accept_loop must exit promptly when the token is cancelled, not spin"
1190 );
1191 assert!(
1192 result.unwrap().is_ok(),
1193 "client_accept_loop should return Ok(()) on graceful cancel"
1194 );
1195 }
1196
1197 async fn socks5_client_connect(
1213 parent: &mut DuplexStream,
1214 arg_string: &str,
1215 bridge: SocketAddr,
1216 ) {
1217 parent.write_all(&[0x05, 0x01, 0x02]).await.unwrap();
1219 parent.flush().await.unwrap();
1220 let mut sel = [0u8; 2];
1221 parent.read_exact(&mut sel).await.unwrap();
1222 assert_eq!(sel, [0x05, 0x02], "server must select user/pass auth");
1223
1224 let uname = arg_string.as_bytes();
1227 assert!(
1228 uname.len() <= 255,
1229 "this test packs the arg string into one SOCKS field"
1230 );
1231 let mut auth = vec![0x01, uname.len() as u8];
1232 auth.extend_from_slice(uname);
1233 auth.extend_from_slice(&[0x01, 0x00]); parent.write_all(&auth).await.unwrap();
1235 parent.flush().await.unwrap();
1236 let mut authresp = [0u8; 2];
1237 parent.read_exact(&mut authresp).await.unwrap();
1238 assert_eq!(authresp, [0x01, 0x00], "user/pass auth must succeed");
1239
1240 let (octets, port) = match bridge {
1242 SocketAddr::V4(v4) => (v4.ip().octets(), v4.port()),
1243 SocketAddr::V6(_) => unreachable!("test binds IPv4 loopback"),
1244 };
1245 let mut req = vec![0x05, 0x01, 0x00, 0x01];
1246 req.extend_from_slice(&octets);
1247 req.extend_from_slice(&port.to_be_bytes());
1248 parent.write_all(&req).await.unwrap();
1249 parent.flush().await.unwrap();
1250 }
1251
1252 #[tokio::test]
1253 async fn client_handle_connection_tunnels_through_obfs4_and_replies_itself() {
1254 let server_builder = obfs4::ServerBuilder::<TcpStream>::default();
1261 let arg_string = server_builder.client_params();
1262 let server = server_builder.build();
1263
1264 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1265 let bridge_addr = listener.local_addr().unwrap();
1266
1267 let bridge = tokio::spawn(async move {
1268 let (sock, _peer) = listener.accept().await.expect("bridge accept");
1269 let mut s = server.wrap(sock).await.expect("bridge obfs4 handshake");
1270 let mut buf = [0u8; 64];
1271 let n = s.read(&mut buf).await.expect("bridge read");
1272 s.write_all(&buf[..n]).await.expect("bridge echo");
1273 s.flush().await.expect("bridge flush");
1274 });
1275
1276 let (mut parent, lyrebird_side) = tokio::io::duplex(65_536);
1279 let builder = Obfs4PT::client_builder();
1280 let client_addr: SocketAddr = "127.0.0.1:9050".parse().unwrap();
1281 let handler = tokio::spawn(client_handle_connection(
1282 lyrebird_side,
1283 builder,
1284 url::Url::parse("data:,").unwrap(),
1285 client_addr,
1286 ));
1287
1288 socks5_client_connect(&mut parent, &arg_string, bridge_addr).await;
1289
1290 let mut reply = [0u8; 10];
1294 tokio::time::timeout(
1295 std::time::Duration::from_secs(5),
1296 parent.read_exact(&mut reply),
1297 )
1298 .await
1299 .expect("SOCKS5 reply timed out")
1300 .expect("read SOCKS5 reply");
1301 assert_eq!(
1302 reply,
1303 [0x05, 0x00, 0x00, 0x01, 0, 0, 0, 0, 0, 0],
1304 "lyrebird must send the SOCKS5 success reply itself after the obfs4 handshake"
1305 );
1306
1307 let probe = b"bridge-tunnel-probe";
1309 parent.write_all(probe).await.unwrap();
1310 parent.flush().await.unwrap();
1311 let mut got = vec![0u8; probe.len()];
1312 tokio::time::timeout(
1313 std::time::Duration::from_secs(5),
1314 parent.read_exact(&mut got),
1315 )
1316 .await
1317 .expect("probe round-trip timed out")
1318 .expect("read probe echo");
1319 assert_eq!(
1320 &got, probe,
1321 "probe must round-trip through the obfs4 tunnel"
1322 );
1323
1324 tokio::time::timeout(std::time::Duration::from_secs(5), bridge)
1325 .await
1326 .expect("bridge task timed out")
1327 .expect("bridge task panicked");
1328 drop(parent); let _ = tokio::time::timeout(std::time::Duration::from_secs(5), handler).await;
1330 }
1331
1332 #[tokio::test]
1333 async fn client_handle_connection_no_success_reply_when_bridge_not_obfs4() {
1334 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1341 let bridge_addr = listener.local_addr().unwrap();
1342 let bridge = tokio::spawn(async move {
1343 let (sock, _peer) = listener.accept().await.expect("bridge accept");
1344 drop(sock); });
1346
1347 let arg_string = obfs4::ServerBuilder::<TcpStream>::default().client_params();
1350
1351 let (mut parent, lyrebird_side) = tokio::io::duplex(65_536);
1352 let builder = Obfs4PT::client_builder();
1353 let client_addr: SocketAddr = "127.0.0.1:9050".parse().unwrap();
1354 let handler = tokio::spawn(client_handle_connection(
1355 lyrebird_side,
1356 builder,
1357 url::Url::parse("data:,").unwrap(),
1358 client_addr,
1359 ));
1360
1361 socks5_client_connect(&mut parent, &arg_string, bridge_addr).await;
1362
1363 let mut reply = [0u8; 10];
1367 let read = tokio::time::timeout(
1368 std::time::Duration::from_secs(5),
1369 parent.read_exact(&mut reply),
1370 )
1371 .await
1372 .expect("the read should resolve (EOF), not hang");
1373 assert!(
1374 read.is_err(),
1375 "no SOCKS5 success reply must be sent when the obfs4 handshake fails"
1376 );
1377
1378 let outcome = tokio::time::timeout(std::time::Duration::from_secs(5), handler)
1379 .await
1380 .expect("handler should finish")
1381 .expect("handler task panicked");
1382 assert!(
1383 outcome.is_err(),
1384 "client_handle_connection must surface the failed obfs4 dial as an error"
1385 );
1386
1387 let _ = tokio::time::timeout(std::time::Duration::from_secs(5), bridge).await;
1388 }
1389}