ntex_net/
lib.rs

1//! Utility for async runtime abstraction
2#![deny(rust_2018_idioms, unreachable_pub, missing_debug_implementations)]
3use std::{io, net, net::SocketAddr};
4
5use ntex_io::Io;
6use ntex_rt::{BlockFuture, Driver, Runner};
7use ntex_service::cfg::SharedCfg;
8
9pub mod channel;
10pub mod connect;
11
12#[cfg(unix)]
13pub mod polling;
14
15#[cfg(target_os = "linux")]
16pub mod uring;
17
18#[cfg(unix)]
19mod helpers;
20
21#[cfg(feature = "tokio")]
22pub mod tokio;
23
24#[cfg(feature = "compio")]
25mod compio;
26
27#[allow(clippy::wrong_self_convention)]
28pub trait Reactor: Driver {
29    fn tcp_connect(&self, addr: net::SocketAddr, cfg: SharedCfg) -> channel::Receiver<Io>;
30
31    fn unix_connect(
32        &self,
33        addr: std::path::PathBuf,
34        cfg: SharedCfg,
35    ) -> channel::Receiver<Io>;
36
37    /// Convert std TcpStream to Io
38    fn from_tcp_stream(&self, stream: net::TcpStream, cfg: SharedCfg) -> io::Result<Io>;
39
40    #[cfg(unix)]
41    /// Convert std UnixStream to Io
42    fn from_unix_stream(
43        &self,
44        _: std::os::unix::net::UnixStream,
45        _: SharedCfg,
46    ) -> io::Result<Io>;
47}
48
49#[inline]
50/// Opens a TCP connection to a remote host.
51pub async fn tcp_connect(addr: SocketAddr, cfg: SharedCfg) -> io::Result<Io> {
52    with_current(|driver| driver.tcp_connect(addr, cfg)).await
53}
54
55#[inline]
56/// Opens a unix stream connection.
57pub async fn unix_connect<'a, P>(addr: P, cfg: SharedCfg) -> io::Result<Io>
58where
59    P: AsRef<std::path::Path> + 'a,
60{
61    with_current(|driver| driver.unix_connect(addr.as_ref().into(), cfg)).await
62}
63
64#[inline]
65/// Convert std TcpStream to TcpStream
66pub fn from_tcp_stream(stream: net::TcpStream, cfg: SharedCfg) -> io::Result<Io> {
67    with_current(|driver| driver.from_tcp_stream(stream, cfg))
68}
69
70#[cfg(unix)]
71#[inline]
72/// Convert std UnixStream to UnixStream
73pub fn from_unix_stream(
74    stream: std::os::unix::net::UnixStream,
75    cfg: SharedCfg,
76) -> io::Result<Io> {
77    with_current(|driver| driver.from_unix_stream(stream, cfg))
78}
79
80fn with_current<T, F: FnOnce(&dyn Reactor) -> T>(f: F) -> T {
81    #[cold]
82    fn not_in_ntex_driver() -> ! {
83        panic!("not in a ntex driver")
84    }
85
86    if CURRENT_DRIVER.is_set() {
87        CURRENT_DRIVER.with(|d| f(&**d))
88    } else {
89        not_in_ntex_driver()
90    }
91}
92
93scoped_tls::scoped_thread_local!(static CURRENT_DRIVER: Box<dyn Reactor>);
94
95#[derive(Debug)]
96pub struct DefaultRuntime;
97
98impl Runner for DefaultRuntime {
99    fn block_on(&self, _fut: BlockFuture) {
100        #[cfg(feature = "tokio")]
101        {
102            let driver: Box<dyn Reactor> = Box::new(self::tokio::TokioDriver);
103
104            CURRENT_DRIVER.set(&driver, || {
105                crate::tokio::block_on(_fut);
106            });
107        }
108
109        #[cfg(all(feature = "compio", not(feature = "tokio")))]
110        {
111            let driver: Box<dyn Reactor> = Box::new(self::compio::CompioDriver);
112
113            CURRENT_DRIVER.set(&driver, || {
114                crate::compio::block_on(_fut);
115            });
116        }
117
118        #[cfg(all(not(feature = "tokio"), not(feature = "compio")))]
119        {
120            #[cfg(feature = "neon-polling")]
121            {
122                let driver =
123                    crate::polling::Driver::new().expect("Cannot construct driver");
124                let driver: Box<dyn Reactor> = Box::new(driver);
125
126                CURRENT_DRIVER.set(&driver, || {
127                    let rt = ntex_rt::Runtime::new(driver.handle());
128                    rt.block_on(_fut, &*driver);
129                    driver.clear();
130                });
131            }
132
133            #[cfg(all(target_os = "linux", feature = "neon-uring"))]
134            {
135                let driver =
136                    crate::uring::Driver::new(2048).expect("Cannot construct driver");
137                let driver: Box<dyn Reactor> = Box::new(driver);
138
139                CURRENT_DRIVER.set(&driver, || {
140                    let rt = ntex_rt::Runtime::new(driver.handle());
141                    rt.block_on(_fut, &*driver);
142                    driver.clear();
143                });
144            }
145
146            #[cfg(all(not(feature = "neon-uring"), not(feature = "neon-polling")))]
147            {
148                #[cfg(target_os = "linux")]
149                let driver: Box<dyn Reactor> =
150                    if let Ok(driver) = crate::uring::Driver::new(2048) {
151                        Box::new(driver)
152                    } else {
153                        Box::new(
154                            crate::polling::Driver::new().expect("Cannot construct driver"),
155                        )
156                    };
157
158                #[cfg(not(target_os = "linux"))]
159                let driver: Box<dyn Reactor> = Box::new(
160                    crate::polling::Driver::new().expect("Cannot construct driver"),
161                );
162
163                CURRENT_DRIVER.set(&driver, || {
164                    let rt = ntex_rt::Runtime::new(driver.handle());
165                    rt.block_on(_fut, &*driver);
166                    driver.clear();
167                });
168            }
169        }
170    }
171}