1#![cfg_attr(docsrs, feature(doc_cfg))]
2#![doc = include_str!("../README.md")]
3#![allow(renamed_and_removed_lints)] #![allow(unknown_lints)] #![warn(missing_docs)]
7#![warn(noop_method_call)]
8#![warn(unreachable_pub)]
9#![warn(clippy::all)]
10#![deny(clippy::await_holding_lock)]
11#![deny(clippy::cargo_common_metadata)]
12#![deny(clippy::cast_lossless)]
13#![deny(clippy::checked_conversions)]
14#![warn(clippy::cognitive_complexity)]
15#![deny(clippy::debug_assert_with_mut_call)]
16#![deny(clippy::exhaustive_enums)]
17#![deny(clippy::exhaustive_structs)]
18#![deny(clippy::expl_impl_clone_on_copy)]
19#![deny(clippy::fallible_impl_from)]
20#![deny(clippy::implicit_clone)]
21#![deny(clippy::large_stack_arrays)]
22#![warn(clippy::manual_ok_or)]
23#![deny(clippy::missing_docs_in_private_items)]
24#![warn(clippy::needless_borrow)]
25#![warn(clippy::needless_pass_by_value)]
26#![warn(clippy::option_option)]
27#![deny(clippy::print_stderr)]
28#![deny(clippy::print_stdout)]
29#![warn(clippy::rc_buffer)]
30#![deny(clippy::ref_option_ref)]
31#![warn(clippy::semicolon_if_nothing_returned)]
32#![warn(clippy::trait_duplication_in_bounds)]
33#![deny(clippy::unchecked_time_subtraction)]
34#![deny(clippy::unnecessary_wraps)]
35#![warn(clippy::unseparated_literal_suffix)]
36#![deny(clippy::unwrap_used)]
37#![deny(clippy::mod_module_files)]
38#![allow(clippy::let_unit_value)] #![allow(clippy::uninlined_format_args)]
40#![allow(clippy::significant_drop_in_scrutinee)] #![allow(clippy::result_large_err)] #![allow(clippy::needless_raw_string_hashes)] #![allow(clippy::needless_lifetimes)] #![allow(mismatched_lifetime_syntaxes)] #![allow(clippy::collapsible_if)] #![deny(clippy::unused_async)]
47#![cfg_attr(not(all(feature = "full")), allow(unused))]
51
52#[cfg(all(
53 any(feature = "native-tls", feature = "rustls"),
54 any(feature = "async-std", feature = "tokio", feature = "smol")
55))]
56pub(crate) mod impls;
57pub mod task;
58
59mod coarse_time;
60mod compound;
61mod dyn_time;
62pub mod general;
63mod network;
64mod opaque;
65pub mod scheduler;
66mod timer;
67mod traits;
68pub mod unimpl;
69pub mod unix;
70
71#[cfg(any(feature = "async-std", feature = "tokio", feature = "smol"))]
72use std::io;
73pub use traits::{
74 Blocking, CertifiedConn, CoarseTimeProvider, NetStreamListener, NetStreamProvider,
75 NoOpStreamOpsHandle, Runtime, SleepProvider, SpawnExt, StreamOps, TlsProvider, ToplevelBlockOn,
76 ToplevelRuntime, UdpProvider, UdpSocket, UnsupportedStreamOp,
77};
78
79pub use coarse_time::{CoarseDuration, CoarseInstant, RealCoarseTimeProvider};
80pub use dyn_time::DynTimeProvider;
81pub use network::{CommonListenOptions, TcpListenOptions, UnixListenOptions};
82pub use timer::{SleepProviderExt, Timeout, TimeoutError};
83
84pub mod tls {
87 #[cfg(all(
88 any(feature = "native-tls", feature = "rustls"),
89 any(feature = "async-std", feature = "tokio", feature = "smol")
90 ))]
91 pub use crate::impls::unimpl_tls::UnimplementedTls;
92 pub use crate::traits::{
93 CertifiedConn, TlsAcceptorSettings, TlsConnector, TlsServerUnsupported,
94 };
95
96 #[cfg(all(
97 feature = "native-tls",
98 any(feature = "tokio", feature = "async-std", feature = "smol")
99 ))]
100 pub use crate::impls::native_tls::NativeTlsProvider;
101 #[cfg(all(
102 feature = "rustls",
103 any(feature = "tokio", feature = "async-std", feature = "smol")
104 ))]
105 pub use crate::impls::rustls::RustlsProvider;
106 #[cfg(all(
107 feature = "rustls",
108 feature = "tls-server",
109 any(feature = "tokio", feature = "async-std", feature = "smol")
110 ))]
111 pub use crate::impls::rustls::rustls_server::{RustlsAcceptor, RustlsServerStream};
112}
113
114#[cfg(all(any(feature = "native-tls", feature = "rustls"), feature = "tokio"))]
115pub mod tokio;
116
117#[cfg(all(any(feature = "native-tls", feature = "rustls"), feature = "async-std"))]
118pub mod async_std;
119
120#[cfg(all(any(feature = "native-tls", feature = "rustls"), feature = "smol"))]
121pub mod smol;
122
123pub use compound::{CompoundRuntime, RuntimeSubstExt};
124
125#[cfg(all(
126 any(feature = "native-tls", feature = "rustls"),
127 feature = "async-std",
128 not(feature = "tokio")
129))]
130use async_std as preferred_backend_mod;
131#[cfg(all(any(feature = "native-tls", feature = "rustls"), feature = "tokio"))]
132use tokio as preferred_backend_mod;
133
/// The runtime that this crate picks by default from the enabled features.
///
/// This is a thin wrapper around the `PreferredRuntime` of the selected
/// backend module: the `tokio` backend when the `tokio` feature is enabled,
/// and the `async-std` backend otherwise.  It is only defined when a TLS
/// feature and one of those two backends are enabled.
#[cfg(all(
    any(feature = "native-tls", feature = "rustls"),
    any(feature = "tokio", feature = "async-std")
))]
#[derive(Clone)]
pub struct PreferredRuntime {
    /// The wrapped backend runtime.
    inner: preferred_backend_mod::PreferredRuntime,
}
154
// Generate the runtime trait implementations for `PreferredRuntime` in terms
// of its `inner` field.
// NOTE(review): the exact set of traits is defined by the macro in
// `crate::opaque`, which is not visible here — confirm there.
#[cfg(all(
    any(feature = "native-tls", feature = "rustls"),
    any(feature = "tokio", feature = "async-std")
))]
crate::opaque::implement_opaque_runtime! {
    PreferredRuntime { inner: preferred_backend_mod::PreferredRuntime }
}
162
#[cfg(all(
    any(feature = "native-tls", feature = "rustls"),
    any(feature = "tokio", feature = "async-std")
))]
impl PreferredRuntime {
    /// Obtain a [`PreferredRuntime`] wrapping the backend's current runtime.
    ///
    /// This delegates to `current()` on the selected backend's
    /// `PreferredRuntime` and wraps whatever it returns.
    ///
    /// # Errors
    ///
    /// Any error reported by the backend is propagated to the caller.
    pub fn current() -> io::Result<Self> {
        let inner = preferred_backend_mod::PreferredRuntime::current()?;
        Ok(Self { inner })
    }

    /// Build a new [`PreferredRuntime`] by asking the backend to create one.
    ///
    /// This delegates to `create()` on the selected backend's
    /// `PreferredRuntime` and wraps whatever it returns.
    ///
    /// # Errors
    ///
    /// Any error reported by the backend is propagated to the caller.
    pub fn create() -> io::Result<Self> {
        let inner = preferred_backend_mod::PreferredRuntime::create()?;
        Ok(Self { inner })
    }

    /// Create a fresh runtime, hand it to `func`, and block until the future
    /// that `func` returns has completed, yielding that future's output.
    ///
    /// # Panics
    ///
    /// Panics with "Failed to create runtime" if [`create`](Self::create)
    /// returns an error.
    #[doc(hidden)]
    pub fn run_test<P, F, O>(func: P) -> O
    where
        P: FnOnce(Self) -> F,
        F: futures::Future<Output = O>,
    {
        let runtime = Self::create().expect("Failed to create runtime");
        // One handle drives the future; the other is given to the test itself.
        let executor = runtime.clone();
        let test_future = func(runtime);
        executor.block_on(test_future)
    }
}
257
/// Support code for the test macros exported by this crate.
#[doc(hidden)]
pub mod testing__ {
    /// The result of one test run: something that can be checked for failure.
    pub trait TestOutcome {
        /// Panic if this outcome represents a failed test.
        fn check_ok(&self);
    }

    /// A test that returns `()` has nothing to report, so it always passes.
    impl TestOutcome for () {
        fn check_ok(&self) {}
    }

    /// A test that returns a `Result` fails exactly when it returned `Err`.
    impl<E> TestOutcome for Result<(), E>
    where
        E: std::fmt::Debug,
    {
        fn check_ok(&self) {
            // Borrow rather than consume, so the error can be shown in the
            // panic message without requiring `E: Clone`.
            let outcome: Result<&(), &E> = self.as_ref();
            outcome.expect("Test failure");
        }
    }
}
280
/// Define an exported macro called `$name` whose expansion depends on whether
/// two cargo features are both enabled.
///
/// When both features are enabled, `$name!` expands to the single token tree
/// it was given; otherwise it expands to nothing.  Attributes written before
/// the `macro` keyword are applied to both definitions, and the macro is also
/// re-exported with `pub use` from the module in which this is invoked.
macro_rules! declare_conditional_macro {
    ( $(#[$attr:meta])* macro $name:ident = ($feat_a:expr, $feat_b:expr) ) => {
        // Both features are enabled: pass the token tree through unchanged.
        $( #[$attr] )*
        #[cfg(all(feature = $feat_a, feature = $feat_b))]
        #[macro_export]
        macro_rules! $name {
            ($body:tt) => {
                $body
            };
        }

        // At least one feature is missing: swallow the token tree.
        $( #[$attr] )*
        #[cfg(not(all(feature = $feat_a, feature = $feat_b)))]
        #[macro_export]
        macro_rules! $name {
            ($body:tt) => {};
        }

        // Make the macro reachable through the invoking module's path.
        pub use $name;
    };
}
306
307#[doc(hidden)]
309pub mod cond {
310 declare_conditional_macro! {
311 #[doc(hidden)]
313 macro if_tokio_native_tls_present = ("tokio", "native-tls")
314 }
315 declare_conditional_macro! {
316 #[doc(hidden)]
318 macro if_tokio_rustls_present = ("tokio", "rustls")
319 }
320 declare_conditional_macro! {
321 #[doc(hidden)]
323 macro if_async_std_native_tls_present = ("async-std", "native-tls")
324 }
325 declare_conditional_macro! {
326 #[doc(hidden)]
328 macro if_async_std_rustls_present = ("async-std", "rustls")
329 }
330 declare_conditional_macro! {
331 #[doc(hidden)]
333 macro if_smol_native_tls_present = ("smol", "native-tls")
334 }
335 declare_conditional_macro! {
336 #[doc(hidden)]
338 macro if_smol_rustls_present = ("smol", "rustls")
339 }
340}
341
/// Run a test function on every runtime/TLS combination enabled in this build.
///
/// The argument is expanded once per enabled combination and handed to that
/// runtime type's `run_test`; each result is then checked with
/// `TestOutcome::check_ok`, which panics on failure.  Combinations whose
/// features are not enabled expand to nothing.
#[macro_export]
#[cfg(all(
    any(feature = "native-tls", feature = "rustls"),
    any(feature = "tokio", feature = "async-std", feature = "smol"),
))]
macro_rules! test_with_all_runtimes {
    ( $test_fn:expr ) => {{
        use $crate::testing__::TestOutcome;
        use $crate::cond::*;

        // tokio
        if_tokio_native_tls_present! {{
            $crate::tokio::TokioNativeTlsRuntime::run_test($test_fn).check_ok();
        }}
        if_tokio_rustls_present! {{
            $crate::tokio::TokioRustlsRuntime::run_test($test_fn).check_ok();
        }}

        // async-std
        if_async_std_native_tls_present! {{
            $crate::async_std::AsyncStdNativeTlsRuntime::run_test($test_fn).check_ok();
        }}
        if_async_std_rustls_present! {{
            $crate::async_std::AsyncStdRustlsRuntime::run_test($test_fn).check_ok();
        }}

        // smol
        if_smol_native_tls_present! {{
            $crate::smol::SmolNativeTlsRuntime::run_test($test_fn).check_ok();
        }}
        if_smol_rustls_present! {{
            $crate::smol::SmolRustlsRuntime::run_test($test_fn).check_ok();
        }}
    }};
}
390
/// Run a test function on a single runtime: the crate's `PreferredRuntime`.
///
/// The macro evaluates to whatever `PreferredRuntime::run_test` returns; the
/// result is not checked here.  It is only defined when `PreferredRuntime`
/// itself is, i.e. with a TLS feature plus `tokio` or `async-std`.
#[macro_export]
#[cfg(all(
    any(feature = "native-tls", feature = "rustls"),
    any(feature = "tokio", feature = "async-std"),
))]
macro_rules! test_with_one_runtime {
    ( $test_fn:expr ) => {{
        $crate::PreferredRuntime::run_test($test_fn)
    }};
}
410
411#[cfg(all(
412 test,
413 any(feature = "native-tls", feature = "rustls"),
414 any(feature = "async-std", feature = "tokio", feature = "smol"),
415 not(miri), ))]
417mod test {
418 #![allow(clippy::bool_assert_comparison)]
420 #![allow(clippy::clone_on_copy)]
421 #![allow(clippy::dbg_macro)]
422 #![allow(clippy::mixed_attributes_style)]
423 #![allow(clippy::print_stderr)]
424 #![allow(clippy::print_stdout)]
425 #![allow(clippy::single_char_pattern)]
426 #![allow(clippy::unwrap_used)]
427 #![allow(clippy::unchecked_time_subtraction)]
428 #![allow(clippy::useless_vec)]
429 #![allow(clippy::needless_pass_by_value)]
430 #![allow(clippy::unnecessary_wraps)]
432 use crate::SleepProviderExt;
433 use crate::ToplevelRuntime;
434
435 use crate::traits::*;
436
437 use futures::io::{AsyncReadExt, AsyncWriteExt};
438 use futures::stream::StreamExt;
439 use native_tls_crate as native_tls;
440 use std::io::Result as IoResult;
441 use std::net::SocketAddr;
442 use std::net::{Ipv4Addr, SocketAddrV4};
443 use web_time_compat::{Duration, Instant, InstantExt, SystemTimeExt};
444
445 fn small_delay<R: ToplevelRuntime>(runtime: &R) -> IoResult<()> {
448 let rt = runtime.clone();
449 runtime.block_on(async {
450 let i1 = Instant::get();
451 let one_msec = Duration::from_millis(1);
452 rt.sleep(one_msec).await;
453 let i2 = Instant::get();
454 assert!(i2 >= i1 + one_msec);
455 });
456 Ok(())
457 }
458
459 fn small_timeout_ok<R: ToplevelRuntime>(runtime: &R) -> IoResult<()> {
461 let rt = runtime.clone();
462 runtime.block_on(async {
463 let one_day = Duration::from_secs(86400);
464 let outcome = rt.timeout(one_day, async { 413_u32 }).await;
465 assert_eq!(outcome, Ok(413));
466 });
467 Ok(())
468 }
469
470 fn small_timeout_expire<R: ToplevelRuntime>(runtime: &R) -> IoResult<()> {
472 use futures::future::pending;
473
474 let rt = runtime.clone();
475 runtime.block_on(async {
476 let one_micros = Duration::from_micros(1);
477 let outcome = rt.timeout(one_micros, pending::<()>()).await;
478 assert_eq!(outcome, Err(crate::TimeoutError));
479 assert_eq!(
480 outcome.err().unwrap().to_string(),
481 "Timeout expired".to_string()
482 );
483 });
484 Ok(())
485 }
486 fn tiny_wallclock<R: ToplevelRuntime>(runtime: &R) -> IoResult<()> {
491 let rt = runtime.clone();
492 runtime.block_on(async {
493 let i1 = Instant::get();
494 let now = runtime.wallclock();
495 let one_millis = Duration::from_millis(1);
496 let one_millis_later = now + one_millis;
497
498 rt.sleep_until_wallclock(one_millis_later).await;
499
500 let i2 = Instant::get();
501 let newtime = runtime.wallclock();
502 assert!(newtime >= one_millis_later);
503 assert!(i2 - i1 >= one_millis);
504 });
505 Ok(())
506 }
507
508 fn self_connect_tcp<R: ToplevelRuntime>(runtime: &R) -> IoResult<()> {
512 let localhost = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0);
513 let rt1 = runtime.clone();
514
515 let listen_options = Default::default();
516 let listener =
517 runtime.block_on(rt1.listen(&(SocketAddr::from(localhost)), &listen_options))?;
518 let addr = listener.local_addr()?;
519
520 runtime.block_on(async {
521 let task1 = async {
522 let mut buf = vec![0_u8; 11];
523 let (mut con, _addr) = listener.incoming().next().await.expect("closed?")?;
524 con.read_exact(&mut buf[..]).await?;
525 IoResult::Ok(buf)
526 };
527 let task2 = async {
528 let mut con = rt1.connect(&addr).await?;
529 con.write_all(b"Hello world").await?;
530 con.flush().await?;
531 IoResult::Ok(())
532 };
533
534 let (data, send_r) = futures::join!(task1, task2);
535 send_r?;
536
537 assert_eq!(&data?[..], b"Hello world");
538
539 Ok(())
540 })
541 }
542
543 fn self_connect_udp<R: ToplevelRuntime>(runtime: &R) -> IoResult<()> {
547 let localhost = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0);
548 let rt1 = runtime.clone();
549
550 let socket1 = runtime.block_on(rt1.bind(&(localhost.into())))?;
551 let addr1 = socket1.local_addr()?;
552
553 let socket2 = runtime.block_on(rt1.bind(&(localhost.into())))?;
554 let addr2 = socket2.local_addr()?;
555
556 runtime.block_on(async {
557 let task1 = async {
558 let mut buf = [0_u8; 16];
559 let (len, addr) = socket1.recv(&mut buf[..]).await?;
560 IoResult::Ok((buf[..len].to_vec(), addr))
561 };
562 let task2 = async {
563 socket2.send(b"Hello world", &addr1).await?;
564 IoResult::Ok(())
565 };
566
567 let (recv_r, send_r) = futures::join!(task1, task2);
568 send_r?;
569 let (buff, addr) = recv_r?;
570 assert_eq!(addr2, addr);
571 assert_eq!(&buff, b"Hello world");
572
573 Ok(())
574 })
575 }
576
577 fn listener_stream<R: ToplevelRuntime>(runtime: &R) -> IoResult<()> {
582 let localhost = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0);
583 let rt1 = runtime.clone();
584
585 let listen_options = Default::default();
586 let listener = runtime
587 .block_on(rt1.listen(&SocketAddr::from(localhost), &listen_options))
588 .unwrap();
589 let addr = listener.local_addr().unwrap();
590 let mut stream = listener.incoming();
591
592 runtime.block_on(async {
593 let task1 = async {
594 let mut n = 0_u32;
595 loop {
596 let (mut con, _addr) = stream.next().await.unwrap()?;
597 let mut buf = [0_u8; 11];
598 con.read_exact(&mut buf[..]).await?;
599 n += 1;
600 if &buf[..] == b"world done!" {
601 break IoResult::Ok(n);
602 }
603 }
604 };
605 let task2 = async {
606 for _ in 0_u8..5 {
607 let mut con = rt1.connect(&addr).await?;
608 con.write_all(b"Hello world").await?;
609 con.flush().await?;
610 }
611 let mut con = rt1.connect(&addr).await?;
612 con.write_all(b"world done!").await?;
613 con.flush().await?;
614 con.close().await?;
615 IoResult::Ok(())
616 };
617
618 let (n, send_r) = futures::join!(task1, task2);
619 send_r?;
620
621 assert_eq!(n?, 6);
622
623 Ok(())
624 })
625 }
626
627 fn simple_tls<R: ToplevelRuntime>(runtime: &R) -> IoResult<()> {
632 static PFX_ID: &[u8] = include_bytes!("test.pfx");
643 static PFX_PASSWORD: &str = "abc";
646
647 let localhost = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0);
648 let listener = std::net::TcpListener::bind(localhost)?;
649 let addr = listener.local_addr()?;
650
651 let identity = native_tls::Identity::from_pkcs12(PFX_ID, PFX_PASSWORD).unwrap();
652
653 let th = std::thread::spawn(move || {
655 use std::io::{Read, Write};
657 let acceptor = native_tls::TlsAcceptor::new(identity).unwrap();
658 let (con, _addr) = listener.accept()?;
659 let mut con = acceptor.accept(con).unwrap();
660 let mut buf = [0_u8; 16];
661 loop {
662 let n = con.read(&mut buf)?;
663 if n == 0 {
664 break;
665 }
666 con.write_all(&buf[..n])?;
667 }
668 IoResult::Ok(())
669 });
670
671 let connector = runtime.tls_connector();
672
673 runtime.block_on(async {
674 let text = b"I Suddenly Dont Understand Anything";
675 let mut buf = vec![0_u8; text.len()];
676 let conn = runtime.connect(&addr).await?;
677 let mut conn = connector.negotiate_unvalidated(conn, "Kan.Aya").await?;
678 assert!(conn.peer_certificate()?.is_some());
679 conn.write_all(text).await?;
680 conn.flush().await?;
681 conn.read_exact(&mut buf[..]).await?;
682 assert_eq!(&buf[..], text);
683 conn.close().await?;
684 IoResult::Ok(())
685 })?;
686
687 th.join().unwrap()?;
688 IoResult::Ok(())
689 }
690
691 fn simple_tls_server<R: ToplevelRuntime>(runtime: &R) -> IoResult<()> {
692 let mut rng = tor_basic_utils::test_rng::testing_rng();
693 let tls_cert = tor_cert_x509::TlsKeyAndCert::create(
694 &mut rng,
695 std::time::SystemTime::get(),
696 "prospit.example.org",
697 "derse.example.org",
698 )
699 .unwrap();
700 let cert = tls_cert.certificates_der()[0].to_vec();
701 let settings = TlsAcceptorSettings::new(tls_cert).unwrap();
702
703 let Ok(tls_acceptor) = runtime.tls_acceptor(settings) else {
704 println!("Skipping tls-server test for runtime {:?}", &runtime);
705 return IoResult::Ok(());
706 };
707 println!("Running tls-server test for runtime {:?}", &runtime);
708
709 let tls_connector = runtime.tls_connector();
710
711 let localhost: SocketAddr = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0).into();
712 let rt1 = runtime.clone();
713
714 let msg = b"Derse Reviles Him And Outlaws Frogs Wherever They Can";
715 runtime.block_on(async move {
716 let listen_options = Default::default();
717 let listener = runtime.listen(&localhost, &listen_options).await.unwrap();
718 let address = listener.local_addr().unwrap();
719
720 let h1 = runtime
721 .spawn_with_handle(async move {
722 let conn = listener.incoming().next().await.unwrap().unwrap().0;
723 let mut conn = tls_acceptor.negotiate_unvalidated(conn, "").await.unwrap();
724
725 let mut buf = vec![];
726 conn.read_to_end(&mut buf).await.unwrap();
727 (buf, conn.own_certificate().unwrap().unwrap().into_owned())
728 })
729 .unwrap();
730
731 let h2 = runtime
732 .spawn_with_handle(async move {
733 let conn = rt1.connect(&address).await.unwrap();
734 let mut conn = tls_connector
735 .negotiate_unvalidated(conn, "prospit.example.org")
736 .await
737 .unwrap();
738 conn.write_all(msg).await.unwrap();
739 conn.close().await.unwrap();
740 conn.peer_certificate().unwrap().unwrap().into_owned()
741 })
742 .unwrap();
743
744 let (received, server_own_cert) = h1.await;
745 let client_peer_cert = h2.await;
746 assert_eq!(received, msg);
747 assert_eq!(&server_own_cert, &cert);
748 assert_eq!(&client_peer_cert, &cert);
749 });
750 IoResult::Ok(())
751 }
752
/// Generate one `#[test]` function per listed name.
///
/// Each generated test has the same name as a function in the parent module,
/// and calls that function with the `$runtime` expression as its argument.
/// Because the generated test returns `std::io::Result<()>`, the `$runtime`
/// expression may use `?`.
macro_rules! tests_with_runtime {
    { $runtime:expr => $($case:ident),* $(,)? } => {
        $(
            #[test]
            fn $case() -> std::io::Result<()> {
                super::$case($runtime)
            }
        )*
    };
}
763
/// Generate test modules that run each listed function once per backend.
///
/// For every enabled backend feature (`tokio`, `async-std`, `smol`) a module
/// is generated whose tests call the named functions with that backend's
/// `PreferredRuntime`.  A further module does the same with the crate-level
/// `PreferredRuntime`.
macro_rules! runtime_tests {
    { $($id:ident),* $(,)? } => {
        #[cfg(feature = "tokio")]
        mod tokio_runtime_tests {
            tests_with_runtime! { &crate::tokio::PreferredRuntime::create()? => $($id),* }
        }
        #[cfg(feature = "async-std")]
        mod async_std_runtime_tests {
            tests_with_runtime! { &crate::async_std::PreferredRuntime::create()? => $($id),* }
        }
        #[cfg(feature = "smol")]
        mod smol_runtime_tests {
            tests_with_runtime! { &crate::smol::PreferredRuntime::create()? => $($id),* }
        }
        // `crate::PreferredRuntime` only exists when `tokio` or `async-std`
        // is enabled, whereas this test module is also built with `smol`
        // alone.  Gate the default-runtime tests the same way, so that a
        // smol-only build does not fail on an unresolved name.
        #[cfg(any(feature = "tokio", feature = "async-std"))]
        mod default_runtime_tests {
            tests_with_runtime! { &crate::PreferredRuntime::create()? => $($id),* }
        }
    };
}
784
/// Generate test modules that run each listed function once per enabled
/// backend/TLS combination.
///
/// For every enabled pairing of a backend feature (`tokio`, `async-std`,
/// `smol`) with a TLS feature (`native-tls`, `rustls`) a module is generated
/// whose tests call the named functions with the matching runtime type.  A
/// further module does the same with the crate-level `PreferredRuntime`.
macro_rules! tls_runtime_tests {
    { $($id:ident),* $(,)? } => {
        #[cfg(all(feature = "tokio", feature = "native-tls"))]
        mod tokio_native_tls_tests {
            tests_with_runtime! { &crate::tokio::TokioNativeTlsRuntime::create()? => $($id),* }
        }
        #[cfg(all(feature = "async-std", feature = "native-tls"))]
        mod async_std_native_tls_tests {
            tests_with_runtime! { &crate::async_std::AsyncStdNativeTlsRuntime::create()? => $($id),* }
        }
        #[cfg(all(feature = "smol", feature = "native-tls"))]
        mod smol_native_tls_tests {
            tests_with_runtime! { &crate::smol::SmolNativeTlsRuntime::create()? => $($id),* }
        }
        #[cfg(all(feature = "tokio", feature = "rustls"))]
        mod tokio_rustls_tests {
            tests_with_runtime! { &crate::tokio::TokioRustlsRuntime::create()? => $($id),* }
        }
        #[cfg(all(feature = "async-std", feature = "rustls"))]
        mod async_std_rustls_tests {
            tests_with_runtime! { &crate::async_std::AsyncStdRustlsRuntime::create()? => $($id),* }
        }
        #[cfg(all(feature = "smol", feature = "rustls"))]
        mod smol_rustls_tests {
            tests_with_runtime! { &crate::smol::SmolRustlsRuntime::create()? => $($id),* }
        }
        // `crate::PreferredRuntime` only exists when `tokio` or `async-std`
        // is enabled, whereas this test module is also built with `smol`
        // alone.  Gate the default-runtime tests the same way, so that a
        // smol-only build does not fail on an unresolved name.
        #[cfg(any(feature = "tokio", feature = "async-std"))]
        mod default_runtime_tls_tests {
            tests_with_runtime! { &crate::PreferredRuntime::create()? => $($id),* }
        }
    };
}
817
818 runtime_tests! {
819 small_delay,
820 small_timeout_ok,
821 small_timeout_expire,
822 tiny_wallclock,
823 self_connect_tcp,
824 self_connect_udp,
825 listener_stream,
826 }
827
828 tls_runtime_tests! {
829 simple_tls,
830 simple_tls_server,
831 }
832}