1#![deny(rust_2018_idioms, unreachable_pub, missing_debug_implementations)]
3use std::{io, net, net::SocketAddr};
4
5use ntex_io::Io;
6use ntex_rt::{BlockFuture, Driver, Runner};
7use ntex_service::cfg::SharedCfg;
8
9pub mod channel;
10pub mod connect;
11
12#[cfg(unix)]
13pub mod polling;
14
15#[cfg(target_os = "linux")]
16pub mod uring;
17
18#[cfg(unix)]
19mod helpers;
20
21#[cfg(feature = "tokio")]
22pub mod tokio;
23
24#[cfg(feature = "compio")]
25mod compio;
26
27#[allow(clippy::wrong_self_convention)]
28pub trait Reactor: Driver {
29 fn tcp_connect(&self, addr: net::SocketAddr, cfg: SharedCfg) -> channel::Receiver<Io>;
30
31 fn unix_connect(
32 &self,
33 addr: std::path::PathBuf,
34 cfg: SharedCfg,
35 ) -> channel::Receiver<Io>;
36
37 fn from_tcp_stream(&self, stream: net::TcpStream, cfg: SharedCfg) -> io::Result<Io>;
39
40 #[cfg(unix)]
41 fn from_unix_stream(
43 &self,
44 _: std::os::unix::net::UnixStream,
45 _: SharedCfg,
46 ) -> io::Result<Io>;
47}
48
49#[inline]
50pub async fn tcp_connect(addr: SocketAddr, cfg: SharedCfg) -> io::Result<Io> {
52 with_current(|driver| driver.tcp_connect(addr, cfg)).await
53}
54
55#[inline]
56pub async fn unix_connect<'a, P>(addr: P, cfg: SharedCfg) -> io::Result<Io>
58where
59 P: AsRef<std::path::Path> + 'a,
60{
61 with_current(|driver| driver.unix_connect(addr.as_ref().into(), cfg)).await
62}
63
64#[inline]
65pub fn from_tcp_stream(stream: net::TcpStream, cfg: SharedCfg) -> io::Result<Io> {
67 with_current(|driver| driver.from_tcp_stream(stream, cfg))
68}
69
70#[cfg(unix)]
71#[inline]
72pub fn from_unix_stream(
74 stream: std::os::unix::net::UnixStream,
75 cfg: SharedCfg,
76) -> io::Result<Io> {
77 with_current(|driver| driver.from_unix_stream(stream, cfg))
78}
79
80fn with_current<T, F: FnOnce(&dyn Reactor) -> T>(f: F) -> T {
81 #[cold]
82 fn not_in_ntex_driver() -> ! {
83 panic!("not in a ntex driver")
84 }
85
86 if CURRENT_DRIVER.is_set() {
87 CURRENT_DRIVER.with(|d| f(&**d))
88 } else {
89 not_in_ntex_driver()
90 }
91}
92
93scoped_tls::scoped_thread_local!(static CURRENT_DRIVER: Box<dyn Reactor>);
94
95#[derive(Debug)]
96pub struct DefaultRuntime;
97
98impl Runner for DefaultRuntime {
99 fn block_on(&self, _fut: BlockFuture) {
100 #[cfg(feature = "tokio")]
101 {
102 let driver: Box<dyn Reactor> = Box::new(self::tokio::TokioDriver);
103
104 CURRENT_DRIVER.set(&driver, || {
105 crate::tokio::block_on(_fut);
106 });
107 }
108
109 #[cfg(all(feature = "compio", not(feature = "tokio")))]
110 {
111 let driver: Box<dyn Reactor> = Box::new(self::compio::CompioDriver);
112
113 CURRENT_DRIVER.set(&driver, || {
114 crate::compio::block_on(_fut);
115 });
116 }
117
118 #[cfg(all(not(feature = "tokio"), not(feature = "compio")))]
119 {
120 #[cfg(feature = "neon-polling")]
121 {
122 let driver =
123 crate::polling::Driver::new().expect("Cannot construct driver");
124 let driver: Box<dyn Reactor> = Box::new(driver);
125
126 CURRENT_DRIVER.set(&driver, || {
127 let rt = ntex_rt::Runtime::new(driver.handle());
128 rt.block_on(_fut, &*driver);
129 driver.clear();
130 });
131 }
132
133 #[cfg(all(target_os = "linux", feature = "neon-uring"))]
134 {
135 let driver =
136 crate::uring::Driver::new(2048).expect("Cannot construct driver");
137 let driver: Box<dyn Reactor> = Box::new(driver);
138
139 CURRENT_DRIVER.set(&driver, || {
140 let rt = ntex_rt::Runtime::new(driver.handle());
141 rt.block_on(_fut, &*driver);
142 driver.clear();
143 });
144 }
145
146 #[cfg(all(not(feature = "neon-uring"), not(feature = "neon-polling")))]
147 {
148 #[cfg(target_os = "linux")]
149 let driver: Box<dyn Reactor> =
150 if let Ok(driver) = crate::uring::Driver::new(2048) {
151 Box::new(driver)
152 } else {
153 Box::new(
154 crate::polling::Driver::new().expect("Cannot construct driver"),
155 )
156 };
157
158 #[cfg(not(target_os = "linux"))]
159 let driver: Box<dyn Reactor> = Box::new(
160 crate::polling::Driver::new().expect("Cannot construct driver"),
161 );
162
163 CURRENT_DRIVER.set(&driver, || {
164 let rt = ntex_rt::Runtime::new(driver.handle());
165 rt.block_on(_fut, &*driver);
166 driver.clear();
167 });
168 }
169 }
170 }
171}