Skip to main content

ntex_net/
lib.rs

1//! Utility for async runtime abstraction
2#![deny(clippy::pedantic)]
3#![allow(
4    clippy::clone_on_copy,
5    clippy::cast_possible_truncation,
6    clippy::missing_fields_in_debug,
7    clippy::must_use_candidate,
8    clippy::missing_errors_doc,
9    clippy::missing_panics_doc
10)]
11use std::{io, net, net::SocketAddr, panic};
12
13use ntex_io::Io;
14use ntex_rt::{BlockFuture, Driver, Runner};
15use ntex_service::cfg::SharedCfg;
16
17pub mod channel;
18pub mod connect;
19
20#[cfg(unix)]
21pub mod polling;
22
23#[cfg(target_os = "linux")]
24pub mod uring;
25
26#[cfg(unix)]
27mod helpers;
28
29#[cfg(feature = "tokio")]
30pub mod tokio;
31
32#[cfg(feature = "compio")]
33mod compio;
34
35#[allow(clippy::wrong_self_convention)]
36pub trait Reactor: Driver {
37    fn tcp_connect(&self, addr: net::SocketAddr, cfg: SharedCfg) -> channel::Receiver<Io>;
38
39    fn unix_connect(
40        &self,
41        addr: std::path::PathBuf,
42        cfg: SharedCfg,
43    ) -> channel::Receiver<Io>;
44
45    /// Convert std `TcpStream` to `Io`
46    fn from_tcp_stream(&self, stream: net::TcpStream, cfg: SharedCfg) -> io::Result<Io>;
47
48    #[cfg(unix)]
49    /// Convert std `UnixStream` to `Io`
50    fn from_unix_stream(
51        &self,
52        _: std::os::unix::net::UnixStream,
53        _: SharedCfg,
54    ) -> io::Result<Io>;
55}
56
57#[inline]
58/// Opens a TCP connection to a remote host.
59pub async fn tcp_connect(addr: SocketAddr, cfg: SharedCfg) -> io::Result<Io> {
60    with_current(|driver| driver.tcp_connect(addr, cfg)).await
61}
62
63#[inline]
64/// Opens a unix stream connection.
65pub async fn unix_connect<'a, P>(addr: P, cfg: SharedCfg) -> io::Result<Io>
66where
67    P: AsRef<std::path::Path> + 'a,
68{
69    with_current(|driver| driver.unix_connect(addr.as_ref().into(), cfg)).await
70}
71
72#[inline]
73/// Convert std `TcpStream` to `TcpStream`
74pub fn from_tcp_stream(stream: net::TcpStream, cfg: SharedCfg) -> io::Result<Io> {
75    with_current(|driver| driver.from_tcp_stream(stream, cfg))
76}
77
78#[cfg(unix)]
79#[inline]
80/// Convert std `UnixStream` to `UnixStream`
81pub fn from_unix_stream(
82    stream: std::os::unix::net::UnixStream,
83    cfg: SharedCfg,
84) -> io::Result<Io> {
85    with_current(|driver| driver.from_unix_stream(stream, cfg))
86}
87
88fn with_current<T, F: FnOnce(&dyn Reactor) -> T>(f: F) -> T {
89    #[cold]
90    fn not_in_ntex_driver() -> ! {
91        panic!("not in a ntex driver")
92    }
93
94    if CURRENT_DRIVER.is_set() {
95        CURRENT_DRIVER.with(|d| f(&**d))
96    } else {
97        not_in_ntex_driver()
98    }
99}
100
101scoped_tls::scoped_thread_local!(static CURRENT_DRIVER: Box<dyn Reactor>);
102
103#[derive(Debug)]
104pub struct DefaultRuntime;
105
106impl Runner for DefaultRuntime {
107    #[allow(unused_variables)]
108    fn block_on(&self, fut: BlockFuture) {
109        #[cfg(feature = "tokio")]
110        {
111            let driver: Box<dyn Reactor> = Box::new(self::tokio::TokioDriver);
112
113            CURRENT_DRIVER.set(&driver, || {
114                crate::tokio::block_on(fut);
115                ntex_rt::remove_all_items();
116            });
117        }
118
119        #[cfg(all(feature = "compio", not(feature = "tokio")))]
120        {
121            let driver: Box<dyn Reactor> = Box::new(self::compio::CompioDriver);
122
123            CURRENT_DRIVER.set(&driver, || {
124                crate::compio::block_on(fut);
125                ntex_rt::remove_all_items();
126            });
127        }
128
129        #[cfg(all(unix, not(feature = "tokio"), not(feature = "compio")))]
130        {
131            #[cfg(feature = "neon-polling")]
132            {
133                let driver =
134                    crate::polling::Driver::new().expect("Cannot construct driver");
135                let driver: Box<dyn Reactor> = Box::new(driver);
136
137                CURRENT_DRIVER.set(&driver, || {
138                    let res = panic::catch_unwind(panic::AssertUnwindSafe(|| {
139                        let rt = ntex_rt::Runtime::new(driver.handle());
140                        rt.block_on(fut, &*driver);
141                    }));
142                    if let Err(err) = res {
143                        ntex_rt::remove_all_items();
144                        panic::resume_unwind(err);
145                    } else {
146                        driver.clear();
147                        ntex_rt::remove_all_items();
148                    }
149                });
150            }
151
152            #[cfg(all(target_os = "linux", feature = "neon-uring"))]
153            {
154                let driver =
155                    crate::uring::Driver::new(2048).expect("Cannot construct driver");
156                let driver: Box<dyn Reactor> = Box::new(driver);
157
158                CURRENT_DRIVER.set(&driver, || {
159                    let res = panic::catch_unwind(panic::AssertUnwindSafe(|| {
160                        let rt = ntex_rt::Runtime::new(driver.handle());
161                        rt.block_on(fut, &*driver);
162                    }));
163                    if let Err(err) = res {
164                        ntex_rt::remove_all_items();
165                        panic::resume_unwind(err);
166                    } else {
167                        driver.clear();
168                        ntex_rt::remove_all_items();
169                    }
170                });
171            }
172
173            #[cfg(all(not(feature = "neon-uring"), not(feature = "neon-polling")))]
174            {
175                #[cfg(target_os = "linux")]
176                let driver: Box<dyn Reactor> =
177                    if let Ok(driver) = crate::uring::Driver::new(2048) {
178                        Box::new(driver)
179                    } else {
180                        Box::new(
181                            crate::polling::Driver::new().expect("Cannot construct driver"),
182                        )
183                    };
184
185                #[cfg(not(target_os = "linux"))]
186                let driver: Box<dyn Reactor> = Box::new(
187                    crate::polling::Driver::new().expect("Cannot construct driver"),
188                );
189
190                CURRENT_DRIVER.set(&driver, || {
191                    let res = panic::catch_unwind(panic::AssertUnwindSafe(|| {
192                        let rt = ntex_rt::Runtime::new(driver.handle());
193                        rt.block_on(fut, &*driver);
194                    }));
195                    if let Err(err) = res {
196                        ntex_rt::remove_all_items();
197                        panic::resume_unwind(err);
198                    } else {
199                        driver.clear();
200                        ntex_rt::remove_all_items();
201                    }
202                });
203            }
204        }
205    }
206}