1use std::os::fd::{AsRawFd, BorrowedFd, RawFd};
2use std::{cell::Cell, cell::UnsafeCell, fmt, io, net, rc::Rc, sync::Arc};
3use std::{collections::VecDeque, num::NonZeroUsize, time::Duration};
4
5#[cfg(unix)]
6use std::os::unix::net::UnixStream as OsUnixStream;
7
8use ntex_io::Io;
9use ntex_rt::{DriverType, Notify, PollResult, Runtime};
10use ntex_service::cfg::SharedCfg;
11use polling::{Event, Events, PollMode, Poller};
12use socket2::{Protocol, SockAddr, Socket, Type};
13
14use super::{TcpStream, UnixStream, stream::StreamOps};
15use crate::channel::Receiver;
16
17pub trait Handler {
18 fn event(&mut self, id: usize, event: Event);
20
21 fn error(&mut self, id: usize, err: io::Error);
23
24 fn tick(&mut self);
26
27 fn cleanup(&mut self);
29}
30
31enum Change {
32 Error {
33 batch: usize,
34 user_data: u32,
35 error: io::Error,
36 },
37}
38
39#[derive(Debug)]
40pub struct DriverApi {
41 id: usize,
42 batch: u64,
43 poll: Arc<Poller>,
44 changes: Rc<UnsafeCell<VecDeque<Change>>>,
45}
46
47impl DriverApi {
48 pub fn attach(&self, fd: RawFd, id: u32, event: Event) {
53 self.attach_with_mode(fd, id, event, PollMode::Oneshot)
54 }
55
56 pub fn attach_with_mode(&self, fd: RawFd, id: u32, mut event: Event, mode: PollMode) {
61 event.key = (id as u64 | self.batch) as usize;
62 if let Err(err) = unsafe { self.poll.add_with_mode(fd, event, mode) } {
63 self.change(Change::Error {
64 batch: self.id,
65 user_data: id,
66 error: err,
67 })
68 }
69 }
70
71 pub fn detach(&self, fd: RawFd, id: u32) {
73 if let Err(err) = self.poll.delete(unsafe { BorrowedFd::borrow_raw(fd) }) {
74 self.change(Change::Error {
75 batch: self.id,
76 user_data: id,
77 error: err,
78 })
79 }
80 }
81
82 pub fn modify(&self, fd: RawFd, id: u32, event: Event) {
84 self.modify_with_mode(fd, id, event, PollMode::Oneshot)
85 }
86
87 pub fn modify_with_mode(&self, fd: RawFd, id: u32, mut event: Event, mode: PollMode) {
89 event.key = (id as u64 | self.batch) as usize;
90
91 let result =
92 self.poll
93 .modify_with_mode(unsafe { BorrowedFd::borrow_raw(fd) }, event, mode);
94 if let Err(err) = result {
95 self.change(Change::Error {
96 batch: self.id,
97 user_data: id,
98 error: err,
99 })
100 }
101 }
102
103 fn change(&self, ev: Change) {
104 unsafe { (*self.changes.get()).push_back(ev) };
105 }
106}
107
108pub struct Driver {
110 poll: Arc<Poller>,
111 capacity: usize,
112 changes: Rc<UnsafeCell<VecDeque<Change>>>,
113 hid: Cell<u64>,
114 #[allow(clippy::box_collection)]
115 handlers: Cell<Option<Box<Vec<HandlerItem>>>>,
116}
117
118struct HandlerItem {
119 hnd: Box<dyn Handler>,
120 modified: bool,
121}
122
123impl HandlerItem {
124 fn tick(&mut self) {
125 if self.modified {
126 self.modified = false;
127 self.hnd.tick();
128 }
129 }
130}
131
132impl Driver {
133 const BATCH: u64 = 48;
134 const BATCH_MASK: u64 = 0xFFFF_0000_0000_0000;
135 const DATA_MASK: u64 = 0x0000_FFFF_FFFF_FFFF;
136
137 pub fn new() -> io::Result<Self> {
138 Driver::with_capacity(2048)
139 }
140
141 pub fn with_capacity(io_queue_capacity: u32) -> io::Result<Self> {
142 log::trace!("New poll driver");
143
144 Ok(Self {
145 hid: Cell::new(0),
146 poll: Arc::new(Poller::new()?),
147 capacity: io_queue_capacity as usize,
148 changes: Rc::new(UnsafeCell::new(VecDeque::with_capacity(32))),
149 handlers: Cell::new(Some(Box::new(Vec::default()))),
150 })
151 }
152
153 pub const fn tp(&self) -> DriverType {
155 DriverType::Poll
156 }
157
158 pub fn register<F>(&self, f: F)
160 where
161 F: FnOnce(DriverApi) -> Box<dyn Handler>,
162 {
163 let id = self.hid.get();
164 let mut handlers = self
165 .handlers
166 .take()
167 .expect("Cannot register handler during event handling");
168
169 let api = DriverApi {
170 id: id as usize,
171 batch: id << Self::BATCH,
172 poll: self.poll.clone(),
173 changes: self.changes.clone(),
174 };
175 handlers.push(HandlerItem {
176 hnd: f(api),
177 modified: false,
178 });
179 self.hid.set(id + 1);
180 self.handlers.set(Some(handlers));
181 }
182
183 fn apply_changes(&self, handlers: &mut [HandlerItem]) {
184 while let Some(op) = unsafe { (*self.changes.get()).pop_front() } {
185 match op {
186 Change::Error {
187 batch,
188 user_data,
189 error,
190 } => handlers[batch].hnd.error(user_data as usize, error),
191 }
192 }
193 }
194}
195
196impl AsRawFd for Driver {
197 fn as_raw_fd(&self) -> RawFd {
198 self.poll.as_raw_fd()
199 }
200}
201
202impl fmt::Debug for Driver {
203 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
204 f.debug_struct("Driver")
205 .field("poll", &self.poll)
206 .field("capacity", &self.capacity)
207 .field("hid", &self.hid)
208 .finish()
209 }
210}
211
212impl crate::Reactor for Driver {
213 fn tcp_connect(&self, addr: net::SocketAddr, cfg: SharedCfg) -> Receiver<Io> {
214 let addr = SockAddr::from(addr);
215 let result = Socket::new(addr.domain(), Type::STREAM, Some(Protocol::TCP))
216 .and_then(crate::helpers::prep_socket)
217 .map(move |sock| (addr, sock));
218
219 match result {
220 Err(err) => Receiver::new(Err(err)),
221 Ok((addr, sock)) => {
222 super::connect::ConnectOps::get(self).connect(sock, addr, cfg, false)
223 }
224 }
225 }
226
227 fn unix_connect(&self, addr: std::path::PathBuf, cfg: SharedCfg) -> Receiver<Io> {
228 let result = SockAddr::unix(addr).and_then(|addr| {
229 Socket::new(addr.domain(), Type::STREAM, None)
230 .and_then(crate::helpers::prep_socket)
231 .map(move |sock| (addr, sock))
232 });
233
234 match result {
235 Err(err) => Receiver::new(Err(err)),
236 Ok((addr, sock)) => {
237 super::connect::ConnectOps::get(self).connect(sock, addr, cfg, true)
238 }
239 }
240 }
241
242 fn from_tcp_stream(&self, stream: net::TcpStream, cfg: SharedCfg) -> io::Result<Io> {
243 stream.set_nodelay(true)?;
244
245 Ok(Io::new(
246 TcpStream(
247 crate::helpers::prep_socket(Socket::from(stream))?,
248 StreamOps::get(self),
249 ),
250 cfg,
251 ))
252 }
253
254 #[cfg(unix)]
255 fn from_unix_stream(&self, stream: OsUnixStream, cfg: SharedCfg) -> io::Result<Io> {
256 Ok(Io::new(
257 UnixStream(
258 crate::helpers::prep_socket(Socket::from(stream))?,
259 StreamOps::get(self),
260 ),
261 cfg,
262 ))
263 }
264}
265
266impl ntex_rt::Driver for Driver {
267 fn run(&self, rt: &Runtime) -> io::Result<()> {
269 let mut events = if self.capacity == 0 {
270 Events::new()
271 } else {
272 Events::with_capacity(NonZeroUsize::new(self.capacity).unwrap())
273 };
274
275 loop {
276 let result = rt.poll();
277 let has_changes = !unsafe { (*self.changes.get()).is_empty() };
278 if has_changes {
279 let mut handlers = self.handlers.take().unwrap();
280 self.apply_changes(&mut handlers);
281 self.handlers.set(Some(handlers));
282 }
283
284 let timeout = match result {
285 PollResult::Pending => None,
286 PollResult::PollAgain => Some(Duration::ZERO),
287 PollResult::Ready => return Ok(()),
288 };
289 events.clear();
290 self.poll.wait(&mut events, timeout)?;
291
292 let mut handlers = self.handlers.take().unwrap();
293 for event in events.iter() {
294 let key = event.key as u64;
295 let batch = ((key & Self::BATCH_MASK) >> Self::BATCH) as usize;
296 handlers[batch].modified = true;
297 handlers[batch]
298 .hnd
299 .event((key & Self::DATA_MASK) as usize, event)
300 }
301 self.apply_changes(&mut handlers);
302 for h in handlers.iter_mut() {
303 h.tick();
304 }
305 self.handlers.set(Some(handlers));
306 }
307 }
308
309 fn handle(&self) -> Box<dyn Notify> {
311 Box::new(NotifyHandle::new(self.poll.clone()))
312 }
313
314 fn clear(&self) {
316 for mut h in self.handlers.take().unwrap().into_iter() {
317 h.hnd.cleanup()
318 }
319 }
320}
321
322#[derive(Clone, Debug)]
323pub(crate) struct NotifyHandle {
325 poll: Arc<Poller>,
326}
327
328impl NotifyHandle {
329 fn new(poll: Arc<Poller>) -> Self {
330 Self { poll }
331 }
332}
333
334impl Notify for NotifyHandle {
335 fn notify(&self) -> io::Result<()> {
337 self.poll.notify()
338 }
339}