1#![cfg_attr(docsrs, feature(doc_auto_cfg, doc_cfg))]
2#![doc = include_str!("../README.md")]
3#![allow(renamed_and_removed_lints)] #![allow(unknown_lints)] #![warn(missing_docs)]
7#![warn(noop_method_call)]
8#![warn(unreachable_pub)]
9#![warn(clippy::all)]
10#![deny(clippy::await_holding_lock)]
11#![deny(clippy::cargo_common_metadata)]
12#![deny(clippy::cast_lossless)]
13#![deny(clippy::checked_conversions)]
14#![warn(clippy::cognitive_complexity)]
15#![deny(clippy::debug_assert_with_mut_call)]
16#![deny(clippy::exhaustive_enums)]
17#![deny(clippy::exhaustive_structs)]
18#![deny(clippy::expl_impl_clone_on_copy)]
19#![deny(clippy::fallible_impl_from)]
20#![deny(clippy::implicit_clone)]
21#![deny(clippy::large_stack_arrays)]
22#![warn(clippy::manual_ok_or)]
23#![deny(clippy::missing_docs_in_private_items)]
24#![warn(clippy::needless_borrow)]
25#![warn(clippy::needless_pass_by_value)]
26#![warn(clippy::option_option)]
27#![deny(clippy::print_stderr)]
28#![deny(clippy::print_stdout)]
29#![warn(clippy::rc_buffer)]
30#![deny(clippy::ref_option_ref)]
31#![warn(clippy::semicolon_if_nothing_returned)]
32#![warn(clippy::trait_duplication_in_bounds)]
33#![deny(clippy::unchecked_duration_subtraction)]
34#![deny(clippy::unnecessary_wraps)]
35#![warn(clippy::unseparated_literal_suffix)]
36#![deny(clippy::unwrap_used)]
37#![deny(clippy::mod_module_files)]
38#![allow(clippy::let_unit_value)] #![allow(clippy::uninlined_format_args)]
40#![allow(clippy::significant_drop_in_scrutinee)] #![allow(clippy::result_large_err)] #![allow(clippy::needless_raw_string_hashes)] #![allow(clippy::needless_lifetimes)] #![cfg_attr(not(all(feature = "full")), allow(unused))]
48
49#[cfg(all(
50 any(feature = "native-tls", feature = "rustls"),
51 any(feature = "async-std", feature = "tokio")
52))]
53pub(crate) mod impls;
54pub mod task;
55
56mod coarse_time;
57mod compound;
58mod dyn_time;
59pub mod general;
60mod opaque;
61pub mod scheduler;
62mod timer;
63mod traits;
64pub mod unimpl;
65pub mod unix;
66
67#[cfg(any(feature = "async-std", feature = "tokio"))]
68use std::io;
69pub use traits::{
70 Blocking, CertifiedConn, CoarseTimeProvider, NetStreamListener, NetStreamProvider,
71 NoOpStreamOpsHandle, Runtime, SleepProvider, StreamOps, TlsProvider, ToplevelBlockOn,
72 ToplevelRuntime, UdpProvider, UdpSocket, UnsupportedStreamOp,
73};
74
75pub use coarse_time::{CoarseDuration, CoarseInstant, RealCoarseTimeProvider};
76pub use dyn_time::DynTimeProvider;
77pub use timer::{SleepProviderExt, Timeout, TimeoutError};
78
79pub mod tls {
82 pub use crate::traits::{CertifiedConn, TlsConnector};
83
84 #[cfg(all(feature = "native-tls", any(feature = "tokio", feature = "async-std")))]
85 pub use crate::impls::native_tls::NativeTlsProvider;
86 #[cfg(all(feature = "rustls", any(feature = "tokio", feature = "async-std")))]
87 pub use crate::impls::rustls::RustlsProvider;
88}
89
90#[cfg(all(any(feature = "native-tls", feature = "rustls"), feature = "tokio"))]
91pub mod tokio;
92
93#[cfg(all(any(feature = "native-tls", feature = "rustls"), feature = "async-std"))]
94pub mod async_std;
95
96pub use compound::{CompoundRuntime, RuntimeSubstExt};
97
98#[cfg(all(
99 any(feature = "native-tls", feature = "rustls"),
100 feature = "async-std",
101 not(feature = "tokio")
102))]
103use async_std as preferred_backend_mod;
104#[cfg(all(any(feature = "native-tls", feature = "rustls"), feature = "tokio"))]
105use tokio as preferred_backend_mod;
106
/// The runtime that this crate prefers to use, out of the backends that are
/// compiled in.
///
/// This is an opaque wrapper around the `PreferredRuntime` type of the
/// backend module aliased as `preferred_backend_mod`: the tokio backend when
/// the `tokio` feature is enabled, and the async-std backend otherwise.
///
/// The type only exists when at least one TLS implementation (`native-tls` or
/// `rustls`) and at least one executor (`tokio` or `async-std`) are enabled.
#[cfg(all(
    any(feature = "native-tls", feature = "rustls"),
    any(feature = "async-std", feature = "tokio")
))]
#[derive(Clone)]
pub struct PreferredRuntime {
    /// The wrapped backend runtime.
    inner: preferred_backend_mod::PreferredRuntime,
}

// Apply `implement_opaque_runtime!` to `PreferredRuntime`, naming `inner` as
// the field that holds the wrapped backend runtime.
//
// NOTE(review): the macro lives in `crate::opaque` and is not visible here;
// presumably it implements the runtime traits for `PreferredRuntime` by
// forwarding to `inner` -- confirm against its definition.
#[cfg(all(
    any(feature = "native-tls", feature = "rustls"),
    any(feature = "async-std", feature = "tokio")
))]
crate::opaque::implement_opaque_runtime! {
    PreferredRuntime { inner : preferred_backend_mod::PreferredRuntime }
}

#[cfg(all(
    any(feature = "native-tls", feature = "rustls"),
    any(feature = "async-std", feature = "tokio")
))]
impl PreferredRuntime {
    /// Obtain a [`PreferredRuntime`] for the asynchronous runtime that is
    /// currently running.
    ///
    /// This calls `current()` on the preferred backend's `PreferredRuntime`
    /// and wraps the handle it returns.
    ///
    /// # Errors
    ///
    /// Returns any I/O error reported by the backend's `current()`.
    ///
    /// NOTE(review): depending on the backend, calling this outside of a
    /// running runtime may panic rather than return an error -- confirm
    /// against the backend's documentation.
    pub fn current() -> io::Result<Self> {
        let rt = preferred_backend_mod::PreferredRuntime::current()?;

        Ok(Self { inner: rt })
    }

    /// Create and return a new instance of the preferred runtime.
    ///
    /// This calls `create()` on the preferred backend's `PreferredRuntime`
    /// and wraps the runtime it returns.
    ///
    /// # Errors
    ///
    /// Returns any I/O error reported by the backend's `create()`.
    pub fn create() -> io::Result<Self> {
        let rt = preferred_backend_mod::PreferredRuntime::create()?;

        Ok(Self { inner: rt })
    }

    /// Helper: run a single test function in a freshly created runtime.
    ///
    /// `func` receives the new runtime and returns a future; that future is
    /// driven to completion with `block_on` on a clone of the same runtime,
    /// and its output is returned.
    ///
    /// # Panics
    ///
    /// Panics if the runtime cannot be created.
    #[doc(hidden)]
    pub fn run_test<P, F, O>(func: P) -> O
    where
        P: FnOnce(Self) -> F,
        F: futures::Future<Output = O>,
    {
        let runtime = Self::create().expect("Failed to create runtime");
        // Clone first: `func` takes ownership of `runtime`, but we still need
        // a handle on which to call `block_on`.
        runtime.clone().block_on(func(runtime))
    }
}

/// Helpers for the `test_with_all_runtimes!` macro.
///
/// Hidden from the documentation: this module is public only so that the
/// exported macro can name it through `$crate`.
#[doc(hidden)]
pub mod testing__ {
    /// A value returned by a test function, which may represent a failure.
    pub trait TestOutcome {
        /// Panic if this outcome represents a failed test; otherwise do
        /// nothing.
        fn check_ok(&self);
    }
    /// A test that returns `()` has nothing to report, so it always passes.
    impl TestOutcome for () {
        fn check_ok(&self) {}
    }
    /// A test that returns a `Result` has failed if the result is an `Err`.
    impl<E: std::fmt::Debug> TestOutcome for Result<(), E> {
        fn check_ok(&self) {
            // `as_ref()` lets us inspect the result through `&self` without
            // consuming it; on `Err` this panics and prints the error.
            self.as_ref().expect("Test failure");
        }
    }
}

/// Helper: declare a macro whose expansion depends on a pair of features.
///
/// Invoked as `declare_conditional_macro! { #[attrs] macro NAME = (F1, F2) }`,
/// this defines an exported macro `NAME` taking a single token tree:
///
/// * if features `F1` and `F2` are both enabled, `NAME!` expands to its
///   argument;
/// * otherwise, `NAME!` expands to nothing.
///
/// Any attributes (including doc comments) given before `macro` are applied
/// to both definitions.
macro_rules! declare_conditional_macro {
    ( $(#[$meta:meta])* macro $name:ident = ($f1:expr, $f2:expr) ) => {
        // Both features present: pass the argument through unchanged.
        $( #[$meta] )*
        #[cfg(all(feature=$f1, feature=$f2))]
        #[macro_export]
        macro_rules! $name {
            ($tt:tt) => {
                $tt
            };
        }

        // At least one feature missing: swallow the argument.
        $( #[$meta] )*
        #[cfg(not(all(feature=$f1, feature=$f2)))]
        #[macro_export]
        macro_rules! $name {
            ($tt:tt) => {};
        }

        // `#[macro_export]` places the macro at the crate root; re-export it
        // so that it can also be named via the module that invoked us.
        pub use $name;
    };
}

269#[doc(hidden)]
271pub mod cond {
272 declare_conditional_macro! {
273 #[doc(hidden)]
275 macro if_tokio_native_tls_present = ("tokio", "native-tls")
276 }
277 declare_conditional_macro! {
278 #[doc(hidden)]
280 macro if_tokio_rustls_present = ("tokio", "rustls")
281 }
282 declare_conditional_macro! {
283 #[doc(hidden)]
285 macro if_async_std_native_tls_present = ("async-std", "native-tls")
286 }
287 declare_conditional_macro! {
288 #[doc(hidden)]
290 macro if_async_std_rustls_present = ("async-std", "rustls")
291 }
292}
293
/// Run a test closure once for every executor/TLS combination that is
/// compiled in, passing the runtime as its argument.
///
/// The argument is an expression that is handed to each runtime type's
/// `run_test`; it is expanded once per enabled combination, so it should be
/// cheap and free of side effects (typically a closure literal).
///
/// The test may return either `()` or `Result<(), E>` with `E: Debug`; an
/// `Err` from any runtime makes the test panic.
#[macro_export]
#[cfg(all(
    any(feature = "native-tls", feature = "rustls"),
    any(feature = "tokio", feature = "async-std"),
))]
macro_rules! test_with_all_runtimes {
    ( $fn:expr ) => {{
        use $crate::cond::*;
        use $crate::testing__::TestOutcome;
        // We check each outcome with `check_ok()` rather than with `?`,
        // because the test is allowed to return either `()` or a `Result`.
        if_tokio_native_tls_present! {{
            $crate::tokio::TokioNativeTlsRuntime::run_test($fn).check_ok();
        }}
        if_tokio_rustls_present! {{
            $crate::tokio::TokioRustlsRuntime::run_test($fn).check_ok();
        }}
        if_async_std_native_tls_present! {{
            $crate::async_std::AsyncStdNativeTlsRuntime::run_test($fn).check_ok();
        }}
        if_async_std_rustls_present! {{
            $crate::async_std::AsyncStdRustlsRuntime::run_test($fn).check_ok();
        }}
    }};
}

/// Run a test closure in a single runtime: a freshly created
/// `PreferredRuntime`.
///
/// The argument is an expression that is handed to
/// `PreferredRuntime::run_test`; the macro evaluates to whatever the test's
/// future returns.  Unlike `test_with_all_runtimes!`, the outcome is not
/// checked here: the caller decides what to do with it.
#[macro_export]
#[cfg(all(
    any(feature = "native-tls", feature = "rustls"),
    any(feature = "tokio", feature = "async-std"),
))]
macro_rules! test_with_one_runtime {
    ( $fn:expr ) => {{
        $crate::PreferredRuntime::run_test($fn)
    }};
}

/// Tests that exercise every compiled-in runtime through the public traits.
// NOTE(review): excluded under miri, presumably because these tests use real
// sockets and the system clock -- confirm.
#[cfg(all(
    test,
    any(feature = "native-tls", feature = "rustls"),
    any(feature = "async-std", feature = "tokio"),
    not(miri),
))]
mod test {
    // The test bodies all return `IoResult<()>` so that the generator macros
    // below can treat them uniformly, even when a body cannot fail.
    #![allow(clippy::unwrap_used, clippy::unnecessary_wraps)]
    use crate::SleepProviderExt;
    use crate::ToplevelRuntime;

    use crate::traits::*;

    use futures::io::{AsyncReadExt, AsyncWriteExt};
    use futures::stream::StreamExt;
    use native_tls_crate as native_tls;
    use std::io::Result as IoResult;
    use std::net::SocketAddr;
    use std::net::{Ipv4Addr, SocketAddrV4};
    use std::time::{Duration, Instant};

    /// Sleep for a tiny delay, and check that at least that much time passes.
    fn small_delay<R: ToplevelRuntime>(runtime: &R) -> IoResult<()> {
        let rt = runtime.clone();
        runtime.block_on(async {
            let i1 = Instant::now();
            let one_msec = Duration::from_millis(1);
            rt.sleep(one_msec).await;
            let i2 = Instant::now();
            assert!(i2 >= i1 + one_msec);
        });
        Ok(())
    }

    /// Wrap an immediately-ready future in a very long timeout, and check
    /// that the future's value comes through.
    fn small_timeout_ok<R: ToplevelRuntime>(runtime: &R) -> IoResult<()> {
        let rt = runtime.clone();
        runtime.block_on(async {
            let one_day = Duration::from_secs(86400);
            let outcome = rt.timeout(one_day, async { 413_u32 }).await;
            assert_eq!(outcome, Ok(413));
        });
        Ok(())
    }

    /// Wrap a never-ready future in a tiny timeout, and check that the
    /// timeout fires with the expected error and message.
    fn small_timeout_expire<R: ToplevelRuntime>(runtime: &R) -> IoResult<()> {
        use futures::future::pending;

        let rt = runtime.clone();
        runtime.block_on(async {
            let one_micros = Duration::from_micros(1);
            let outcome = rt.timeout(one_micros, pending::<()>()).await;
            assert_eq!(outcome, Err(crate::TimeoutError));
            assert_eq!(outcome.unwrap_err().to_string(), "Timeout expired");
        });
        Ok(())
    }

    /// Sleep until a wallclock time one millisecond in the future, and check
    /// that both the wallclock and the monotonic clock have advanced by at
    /// least that much.
    fn tiny_wallclock<R: ToplevelRuntime>(runtime: &R) -> IoResult<()> {
        let rt = runtime.clone();
        runtime.block_on(async {
            let i1 = Instant::now();
            let now = runtime.wallclock();
            let one_millis = Duration::from_millis(1);
            let one_millis_later = now + one_millis;

            rt.sleep_until_wallclock(one_millis_later).await;

            let i2 = Instant::now();
            let newtime = runtime.wallclock();
            assert!(newtime >= one_millis_later);
            assert!(i2 - i1 >= one_millis);
        });
        Ok(())
    }

    /// Open a TCP listener on localhost, connect to it from the same
    /// runtime, and check that the bytes written arrive intact.
    fn self_connect_tcp<R: ToplevelRuntime>(runtime: &R) -> IoResult<()> {
        // Port 0: let the operating system pick a free port.
        let localhost = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0);
        let rt1 = runtime.clone();

        let listener = runtime.block_on(rt1.listen(&(SocketAddr::from(localhost))))?;
        let addr = listener.local_addr()?;

        runtime.block_on(async {
            // Accept one connection and read exactly the expected 11 bytes.
            let task1 = async {
                let mut buf = vec![0_u8; 11];
                let (mut con, _addr) = listener.incoming().next().await.expect("closed?")?;
                con.read_exact(&mut buf[..]).await?;
                IoResult::Ok(buf)
            };
            // Connect to the listener and send the message.
            let task2 = async {
                let mut con = rt1.connect(&addr).await?;
                con.write_all(b"Hello world").await?;
                con.flush().await?;
                IoResult::Ok(())
            };

            let (data, send_r) = futures::join!(task1, task2);
            send_r?;

            assert_eq!(&data?[..], b"Hello world");

            Ok(())
        })
    }

    /// Bind two UDP sockets on localhost, send a datagram from one to the
    /// other, and check both the payload and the reported source address.
    fn self_connect_udp<R: ToplevelRuntime>(runtime: &R) -> IoResult<()> {
        // Port 0: let the operating system pick a free port.
        let localhost = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0);
        let rt1 = runtime.clone();

        let socket1 = runtime.block_on(rt1.bind(&(localhost.into())))?;
        let addr1 = socket1.local_addr()?;

        let socket2 = runtime.block_on(rt1.bind(&(localhost.into())))?;
        let addr2 = socket2.local_addr()?;

        runtime.block_on(async {
            // Receive one datagram on socket1.
            let task1 = async {
                let mut buf = [0_u8; 16];
                let (len, addr) = socket1.recv(&mut buf[..]).await?;
                IoResult::Ok((buf[..len].to_vec(), addr))
            };
            // Send one datagram from socket2 to socket1.
            let task2 = async {
                socket2.send(b"Hello world", &addr1).await?;
                IoResult::Ok(())
            };

            let (recv_r, send_r) = futures::join!(task1, task2);
            send_r?;
            let (buff, addr) = recv_r?;
            assert_eq!(addr2, addr);
            assert_eq!(&buff, b"Hello world");

            Ok(())
        })
    }

    /// Treat a TCP listener as a stream of incoming connections: make six
    /// connections to it, and check that all six are accepted in turn.
    fn listener_stream<R: ToplevelRuntime>(runtime: &R) -> IoResult<()> {
        // Port 0: let the operating system pick a free port.
        let localhost = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0);
        let rt1 = runtime.clone();

        // Setup failures are ordinary I/O errors: report them through the
        // return value, as `self_connect_tcp` does.
        let listener = runtime.block_on(rt1.listen(&SocketAddr::from(localhost)))?;
        let addr = listener.local_addr()?;
        let mut stream = listener.incoming();

        runtime.block_on(async {
            // Accept connections, counting them, until one of them sends the
            // terminating message.
            let task1 = async {
                let mut n = 0_u32;
                loop {
                    let (mut con, _addr) = stream.next().await.expect("closed?")?;
                    let mut buf = [0_u8; 11];
                    con.read_exact(&mut buf[..]).await?;
                    n += 1;
                    if &buf[..] == b"world done!" {
                        break IoResult::Ok(n);
                    }
                }
            };
            // Make five ordinary connections, then a sixth that carries the
            // terminating message.  (Both messages are exactly 11 bytes.)
            let task2 = async {
                for _ in 0_u8..5 {
                    let mut con = rt1.connect(&addr).await?;
                    con.write_all(b"Hello world").await?;
                    con.flush().await?;
                }
                let mut con = rt1.connect(&addr).await?;
                con.write_all(b"world done!").await?;
                con.flush().await?;
                con.close().await?;
                IoResult::Ok(())
            };

            let (n, send_r) = futures::join!(task1, task2);
            send_r?;

            assert_eq!(n?, 6);

            Ok(())
        })
    }

    /// Connect over TLS to a blocking native-tls echo server running in a
    /// separate thread, and check that what we send is echoed back.
    fn simple_tls<R: ToplevelRuntime>(runtime: &R) -> IoResult<()> {
        /// PKCS#12 identity (loaded from `test.pfx`) that the server presents.
        static PFX_ID: &[u8] = include_bytes!("test.pfx");
        /// Password protecting `PFX_ID`.
        static PFX_PASSWORD: &str = "abc";

        // Port 0: let the operating system pick a free port.
        let localhost = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0);
        let listener = std::net::TcpListener::bind(localhost)?;
        let addr = listener.local_addr()?;

        let identity = native_tls::Identity::from_pkcs12(PFX_ID, PFX_PASSWORD).unwrap();

        // Server side: accept a single connection, complete the TLS
        // handshake, and echo everything back until the peer closes.
        let th = std::thread::spawn(move || {
            use std::io::{Read, Write};
            let acceptor = native_tls::TlsAcceptor::new(identity).unwrap();
            let (con, _addr) = listener.accept()?;
            let mut con = acceptor.accept(con).unwrap();
            let mut buf = [0_u8; 16];
            loop {
                let n = con.read(&mut buf)?;
                if n == 0 {
                    break;
                }
                con.write_all(&buf[..n])?;
            }
            IoResult::Ok(())
        });

        let connector = runtime.tls_connector();

        // Client side: connect through the runtime under test.
        runtime.block_on(async {
            let text = b"I Suddenly Dont Understand Anything";
            let mut buf = vec![0_u8; text.len()];
            let conn = runtime.connect(&addr).await?;
            // The test certificate is not validated; we only check below that
            // the server presented one.
            let mut conn = connector.negotiate_unvalidated(conn, "Kan.Aya").await?;
            assert!(conn.peer_certificate()?.is_some());
            conn.write_all(text).await?;
            conn.flush().await?;
            conn.read_exact(&mut buf[..]).await?;
            assert_eq!(&buf[..], text);
            // Closing lets the server's read loop see end-of-stream and exit.
            conn.close().await?;
            IoResult::Ok(())
        })?;

        th.join().unwrap()?;
        IoResult::Ok(())
    }

    /// Declare one `#[test]` function per listed name; each one evaluates the
    /// given runtime expression and passes it to the function of the same
    /// name in the parent module.
    ///
    /// The runtime expression may use `?`, since the generated functions
    /// return `std::io::Result<()>`.
    macro_rules! tests_with_runtime {
        { $runtime:expr => $($id:ident),* $(,)? } => {
            $(
                #[test]
                fn $id() -> std::io::Result<()> {
                    super::$id($runtime)
                }
            )*
        }
    }

    /// Instantiate the listed tests once for each executor that is compiled
    /// in, and once more for the crate-level `PreferredRuntime`.
    macro_rules! runtime_tests {
        { $($id:ident),* $(,)? } =>
        {
            #[cfg(feature="tokio")]
            mod tokio_runtime_tests {
                tests_with_runtime! { &crate::tokio::PreferredRuntime::create()? => $($id),* }
            }
            #[cfg(feature="async-std")]
            mod async_std_runtime_tests {
                tests_with_runtime! { &crate::async_std::PreferredRuntime::create()? => $($id),* }
            }
            mod default_runtime_tests {
                tests_with_runtime! { &crate::PreferredRuntime::create()? => $($id),* }
            }
        }
    }

    /// Instantiate the listed tests once for each executor/TLS combination
    /// that is compiled in, and once more for the crate-level
    /// `PreferredRuntime`.
    macro_rules! tls_runtime_tests {
        { $($id:ident),* $(,)? } =>
        {
            #[cfg(all(feature="tokio", feature = "native-tls"))]
            mod tokio_native_tls_tests {
                tests_with_runtime! { &crate::tokio::TokioNativeTlsRuntime::create()? => $($id),* }
            }
            #[cfg(all(feature="async-std", feature = "native-tls"))]
            mod async_std_native_tls_tests {
                tests_with_runtime! { &crate::async_std::AsyncStdNativeTlsRuntime::create()? => $($id),* }
            }
            #[cfg(all(feature="tokio", feature="rustls"))]
            mod tokio_rustls_tests {
                tests_with_runtime! { &crate::tokio::TokioRustlsRuntime::create()? => $($id),* }
            }
            #[cfg(all(feature="async-std", feature="rustls"))]
            mod async_std_rustls_tests {
                tests_with_runtime! { &crate::async_std::AsyncStdRustlsRuntime::create()? => $($id),* }
            }
            mod default_runtime_tls_tests {
                tests_with_runtime! { &crate::PreferredRuntime::create()? => $($id),* }
            }
        }
    }

    runtime_tests! {
        small_delay,
        small_timeout_ok,
        small_timeout_expire,
        tiny_wallclock,
        self_connect_tcp,
        self_connect_udp,
        listener_stream,
    }

    tls_runtime_tests! {
        simple_tls,
    }
}