Skip to main content

ntex_net/
lib.rs

1//! Utility for async runtime abstraction
2#![deny(clippy::pedantic)]
3#![allow(
4    clippy::missing_fields_in_debug,
5    clippy::must_use_candidate,
6    clippy::missing_errors_doc,
7    clippy::missing_panics_doc,
8    clippy::cast_possible_truncation
9)]
10use std::{io, net, net::SocketAddr};
11
12use ntex_io::Io;
13use ntex_rt::{BlockFuture, Driver, Runner};
14use ntex_service::cfg::SharedCfg;
15
16pub mod channel;
17pub mod connect;
18
19#[cfg(unix)]
20pub mod polling;
21
22#[cfg(target_os = "linux")]
23pub mod uring;
24
25#[cfg(unix)]
26mod helpers;
27
28#[cfg(feature = "tokio")]
29pub mod tokio;
30
31#[cfg(feature = "compio")]
32mod compio;
33
34#[allow(clippy::wrong_self_convention)]
35pub trait Reactor: Driver {
36    fn tcp_connect(&self, addr: net::SocketAddr, cfg: SharedCfg) -> channel::Receiver<Io>;
37
38    fn unix_connect(
39        &self,
40        addr: std::path::PathBuf,
41        cfg: SharedCfg,
42    ) -> channel::Receiver<Io>;
43
44    /// Convert std `TcpStream` to `Io`
45    fn from_tcp_stream(&self, stream: net::TcpStream, cfg: SharedCfg) -> io::Result<Io>;
46
47    #[cfg(unix)]
48    /// Convert std `UnixStream` to `Io`
49    fn from_unix_stream(
50        &self,
51        _: std::os::unix::net::UnixStream,
52        _: SharedCfg,
53    ) -> io::Result<Io>;
54}
55
56#[inline]
57/// Opens a TCP connection to a remote host.
58pub async fn tcp_connect(addr: SocketAddr, cfg: SharedCfg) -> io::Result<Io> {
59    with_current(|driver| driver.tcp_connect(addr, cfg)).await
60}
61
62#[inline]
63/// Opens a unix stream connection.
64pub async fn unix_connect<'a, P>(addr: P, cfg: SharedCfg) -> io::Result<Io>
65where
66    P: AsRef<std::path::Path> + 'a,
67{
68    with_current(|driver| driver.unix_connect(addr.as_ref().into(), cfg)).await
69}
70
71#[inline]
72/// Convert std `TcpStream` to `TcpStream`
73pub fn from_tcp_stream(stream: net::TcpStream, cfg: SharedCfg) -> io::Result<Io> {
74    with_current(|driver| driver.from_tcp_stream(stream, cfg))
75}
76
77#[cfg(unix)]
78#[inline]
79/// Convert std `UnixStream` to `UnixStream`
80pub fn from_unix_stream(
81    stream: std::os::unix::net::UnixStream,
82    cfg: SharedCfg,
83) -> io::Result<Io> {
84    with_current(|driver| driver.from_unix_stream(stream, cfg))
85}
86
87fn with_current<T, F: FnOnce(&dyn Reactor) -> T>(f: F) -> T {
88    #[cold]
89    fn not_in_ntex_driver() -> ! {
90        panic!("not in a ntex driver")
91    }
92
93    if CURRENT_DRIVER.is_set() {
94        CURRENT_DRIVER.with(|d| f(&**d))
95    } else {
96        not_in_ntex_driver()
97    }
98}
99
100scoped_tls::scoped_thread_local!(static CURRENT_DRIVER: Box<dyn Reactor>);
101
102#[derive(Debug)]
103pub struct DefaultRuntime;
104
105impl Runner for DefaultRuntime {
106    #[allow(unused_variables)]
107    fn block_on(&self, fut: BlockFuture) {
108        #[cfg(feature = "tokio")]
109        {
110            let driver: Box<dyn Reactor> = Box::new(self::tokio::TokioDriver);
111
112            CURRENT_DRIVER.set(&driver, || {
113                crate::tokio::block_on(fut);
114            });
115        }
116
117        #[cfg(all(feature = "compio", not(feature = "tokio")))]
118        {
119            let driver: Box<dyn Reactor> = Box::new(self::compio::CompioDriver);
120
121            CURRENT_DRIVER.set(&driver, || {
122                crate::compio::block_on(fut);
123            });
124        }
125
126        #[cfg(all(unix, not(feature = "tokio"), not(feature = "compio")))]
127        {
128            #[cfg(feature = "neon-polling")]
129            {
130                let driver =
131                    crate::polling::Driver::new().expect("Cannot construct driver");
132                let driver: Box<dyn Reactor> = Box::new(driver);
133
134                CURRENT_DRIVER.set(&driver, || {
135                    let rt = ntex_rt::Runtime::new(driver.handle());
136                    rt.block_on(fut, &*driver);
137                    driver.clear();
138                });
139            }
140
141            #[cfg(all(target_os = "linux", feature = "neon-uring"))]
142            {
143                let driver =
144                    crate::uring::Driver::new(2048).expect("Cannot construct driver");
145                let driver: Box<dyn Reactor> = Box::new(driver);
146
147                CURRENT_DRIVER.set(&driver, || {
148                    let rt = ntex_rt::Runtime::new(driver.handle());
149                    rt.block_on(fut, &*driver);
150                    driver.clear();
151                });
152            }
153
154            #[cfg(all(not(feature = "neon-uring"), not(feature = "neon-polling")))]
155            {
156                #[cfg(target_os = "linux")]
157                let driver: Box<dyn Reactor> =
158                    if let Ok(driver) = crate::uring::Driver::new(2048) {
159                        Box::new(driver)
160                    } else {
161                        Box::new(
162                            crate::polling::Driver::new().expect("Cannot construct driver"),
163                        )
164                    };
165
166                #[cfg(not(target_os = "linux"))]
167                let driver: Box<dyn Reactor> = Box::new(
168                    crate::polling::Driver::new().expect("Cannot construct driver"),
169                );
170
171                CURRENT_DRIVER.set(&driver, || {
172                    let rt = ntex_rt::Runtime::new(driver.handle());
173                    rt.block_on(fut, &*driver);
174                    driver.clear();
175                });
176            }
177        }
178    }
179}