1#![deny(clippy::pedantic)]
3#![allow(
4 clippy::missing_fields_in_debug,
5 clippy::must_use_candidate,
6 clippy::return_self_not_must_use,
7 clippy::missing_errors_doc,
8 clippy::missing_panics_doc,
9 clippy::cast_possible_truncation
10)]
11use std::{io, net, net::SocketAddr};
12
13use ntex_io::Io;
14use ntex_rt::{BlockFuture, Driver, Runner};
15use ntex_service::cfg::SharedCfg;
16
17pub mod channel;
18pub mod connect;
19
20#[cfg(unix)]
21pub mod polling;
22
23#[cfg(target_os = "linux")]
24pub mod uring;
25
26#[cfg(unix)]
27mod helpers;
28
29#[cfg(feature = "tokio")]
30pub mod tokio;
31
32#[cfg(feature = "compio")]
33mod compio;
34
35#[allow(clippy::wrong_self_convention)]
36pub trait Reactor: Driver {
37 fn tcp_connect(&self, addr: net::SocketAddr, cfg: SharedCfg) -> channel::Receiver<Io>;
38
39 fn unix_connect(
40 &self,
41 addr: std::path::PathBuf,
42 cfg: SharedCfg,
43 ) -> channel::Receiver<Io>;
44
45 fn from_tcp_stream(&self, stream: net::TcpStream, cfg: SharedCfg) -> io::Result<Io>;
47
48 #[cfg(unix)]
49 fn from_unix_stream(
51 &self,
52 _: std::os::unix::net::UnixStream,
53 _: SharedCfg,
54 ) -> io::Result<Io>;
55}
56
57#[inline]
58pub async fn tcp_connect(addr: SocketAddr, cfg: SharedCfg) -> io::Result<Io> {
60 with_current(|driver| driver.tcp_connect(addr, cfg)).await
61}
62
63#[inline]
64pub async fn unix_connect<'a, P>(addr: P, cfg: SharedCfg) -> io::Result<Io>
66where
67 P: AsRef<std::path::Path> + 'a,
68{
69 with_current(|driver| driver.unix_connect(addr.as_ref().into(), cfg)).await
70}
71
72#[inline]
73pub fn from_tcp_stream(stream: net::TcpStream, cfg: SharedCfg) -> io::Result<Io> {
75 with_current(|driver| driver.from_tcp_stream(stream, cfg))
76}
77
78#[cfg(unix)]
79#[inline]
80pub fn from_unix_stream(
82 stream: std::os::unix::net::UnixStream,
83 cfg: SharedCfg,
84) -> io::Result<Io> {
85 with_current(|driver| driver.from_unix_stream(stream, cfg))
86}
87
88fn with_current<T, F: FnOnce(&dyn Reactor) -> T>(f: F) -> T {
89 #[cold]
90 fn not_in_ntex_driver() -> ! {
91 panic!("not in a ntex driver")
92 }
93
94 if CURRENT_DRIVER.is_set() {
95 CURRENT_DRIVER.with(|d| f(&**d))
96 } else {
97 not_in_ntex_driver()
98 }
99}
100
101scoped_tls::scoped_thread_local!(static CURRENT_DRIVER: Box<dyn Reactor>);
102
/// Default runner of this crate.
///
/// It carries no state; the backend it drives is chosen at compile time from
/// the enabled cargo features and the target platform.
#[derive(Debug)]
pub struct DefaultRuntime;
105
106impl Runner for DefaultRuntime {
107 #[allow(unused_variables)]
108 fn block_on(&self, fut: BlockFuture) {
109 #[cfg(feature = "tokio")]
110 {
111 let driver: Box<dyn Reactor> = Box::new(self::tokio::TokioDriver);
112
113 CURRENT_DRIVER.set(&driver, || {
114 crate::tokio::block_on(fut);
115 });
116 }
117
118 #[cfg(all(feature = "compio", not(feature = "tokio")))]
119 {
120 let driver: Box<dyn Reactor> = Box::new(self::compio::CompioDriver);
121
122 CURRENT_DRIVER.set(&driver, || {
123 crate::compio::block_on(fut);
124 });
125 }
126
127 #[cfg(all(unix, not(feature = "tokio"), not(feature = "compio")))]
128 {
129 #[cfg(feature = "neon-polling")]
130 {
131 let driver =
132 crate::polling::Driver::new().expect("Cannot construct driver");
133 let driver: Box<dyn Reactor> = Box::new(driver);
134
135 CURRENT_DRIVER.set(&driver, || {
136 let rt = ntex_rt::Runtime::new(driver.handle());
137 rt.block_on(fut, &*driver);
138 driver.clear();
139 });
140 }
141
142 #[cfg(all(target_os = "linux", feature = "neon-uring"))]
143 {
144 let driver =
145 crate::uring::Driver::new(2048).expect("Cannot construct driver");
146 let driver: Box<dyn Reactor> = Box::new(driver);
147
148 CURRENT_DRIVER.set(&driver, || {
149 let rt = ntex_rt::Runtime::new(driver.handle());
150 rt.block_on(fut, &*driver);
151 driver.clear();
152 });
153 }
154
155 #[cfg(all(not(feature = "neon-uring"), not(feature = "neon-polling")))]
156 {
157 #[cfg(target_os = "linux")]
158 let driver: Box<dyn Reactor> =
159 if let Ok(driver) = crate::uring::Driver::new(2048) {
160 Box::new(driver)
161 } else {
162 Box::new(
163 crate::polling::Driver::new().expect("Cannot construct driver"),
164 )
165 };
166
167 #[cfg(not(target_os = "linux"))]
168 let driver: Box<dyn Reactor> = Box::new(
169 crate::polling::Driver::new().expect("Cannot construct driver"),
170 );
171
172 CURRENT_DRIVER.set(&driver, || {
173 let rt = ntex_rt::Runtime::new(driver.handle());
174 rt.block_on(fut, &*driver);
175 driver.clear();
176 });
177 }
178 }
179 }
180}