ntex_net/polling/
driver.rs

1use std::os::fd::{AsRawFd, BorrowedFd, RawFd};
2use std::{cell::Cell, cell::UnsafeCell, fmt, io, net, rc::Rc, sync::Arc};
3use std::{collections::VecDeque, num::NonZeroUsize, time::Duration};
4
5#[cfg(unix)]
6use std::os::unix::net::UnixStream as OsUnixStream;
7
8use ntex_io::Io;
9use ntex_rt::{DriverType, Notify, PollResult, Runtime};
10use ntex_service::cfg::SharedCfg;
11use polling::{Event, Events, PollMode, Poller};
12use socket2::{Protocol, SockAddr, Socket, Type};
13
14use super::{TcpStream, UnixStream, stream::StreamOps};
15use crate::channel::Receiver;
16
17pub trait Handler {
18    /// Submitted interest
19    fn event(&mut self, id: usize, event: Event);
20
21    /// Operation submission has failed
22    fn error(&mut self, id: usize, err: io::Error);
23
24    /// Driver turn is completed
25    fn tick(&mut self);
26
27    /// Cleanup before drop
28    fn cleanup(&mut self);
29}
30
/// Deferred notification queued by `DriverApi` and later delivered to a
/// handler by the driver.
///
/// `Debug` is derived so queued entries can be logged or inspected while
/// diagnosing registration failures.
#[derive(Debug)]
enum Change {
    /// A poller registration call (add, modify or delete) failed.
    Error {
        /// Index of the handler that issued the failing call.
        batch: usize,
        /// Handler-local id of the affected source.
        user_data: u32,
        /// Error returned by the poller.
        error: io::Error,
    },
}
38
39#[derive(Debug)]
40pub struct DriverApi {
41    id: usize,
42    batch: u64,
43    poll: Arc<Poller>,
44    changes: Rc<UnsafeCell<VecDeque<Change>>>,
45}
46
47impl DriverApi {
48    /// Attach an fd to the driver.
49    ///
50    /// `fd` must be attached to the driver before using register/unregister
51    /// methods.
52    pub fn attach(&self, fd: RawFd, id: u32, event: Event) {
53        self.attach_with_mode(fd, id, event, PollMode::Oneshot)
54    }
55
56    /// Attach an fd to the driver with specific mode.
57    ///
58    /// `fd` must be attached to the driver before using register/unregister
59    /// methods.
60    pub fn attach_with_mode(&self, fd: RawFd, id: u32, mut event: Event, mode: PollMode) {
61        event.key = (id as u64 | self.batch) as usize;
62        if let Err(err) = unsafe { self.poll.add_with_mode(fd, event, mode) } {
63            self.change(Change::Error {
64                batch: self.id,
65                user_data: id,
66                error: err,
67            })
68        }
69    }
70
71    /// Detach an fd from the driver.
72    pub fn detach(&self, fd: RawFd, id: u32) {
73        if let Err(err) = self.poll.delete(unsafe { BorrowedFd::borrow_raw(fd) }) {
74            self.change(Change::Error {
75                batch: self.id,
76                user_data: id,
77                error: err,
78            })
79        }
80    }
81
82    /// Register interest for specified file descriptor.
83    pub fn modify(&self, fd: RawFd, id: u32, event: Event) {
84        self.modify_with_mode(fd, id, event, PollMode::Oneshot)
85    }
86
87    /// Register interest for specified file descriptor.
88    pub fn modify_with_mode(&self, fd: RawFd, id: u32, mut event: Event, mode: PollMode) {
89        event.key = (id as u64 | self.batch) as usize;
90
91        let result =
92            self.poll
93                .modify_with_mode(unsafe { BorrowedFd::borrow_raw(fd) }, event, mode);
94        if let Err(err) = result {
95            self.change(Change::Error {
96                batch: self.id,
97                user_data: id,
98                error: err,
99            })
100        }
101    }
102
103    fn change(&self, ev: Change) {
104        unsafe { (*self.changes.get()).push_back(ev) };
105    }
106}
107
108/// Low-level driver of polling.
109pub struct Driver {
110    poll: Arc<Poller>,
111    capacity: usize,
112    changes: Rc<UnsafeCell<VecDeque<Change>>>,
113    hid: Cell<u64>,
114    #[allow(clippy::box_collection)]
115    handlers: Cell<Option<Box<Vec<HandlerItem>>>>,
116}
117
118struct HandlerItem {
119    hnd: Box<dyn Handler>,
120    modified: bool,
121}
122
123impl HandlerItem {
124    fn tick(&mut self) {
125        if self.modified {
126            self.modified = false;
127            self.hnd.tick();
128        }
129    }
130}
131
132impl Driver {
133    const BATCH: u64 = 48;
134    const BATCH_MASK: u64 = 0xFFFF_0000_0000_0000;
135    const DATA_MASK: u64 = 0x0000_FFFF_FFFF_FFFF;
136
137    pub fn new() -> io::Result<Self> {
138        Driver::with_capacity(2048)
139    }
140
141    pub fn with_capacity(io_queue_capacity: u32) -> io::Result<Self> {
142        log::trace!("New poll driver");
143
144        Ok(Self {
145            hid: Cell::new(0),
146            poll: Arc::new(Poller::new()?),
147            capacity: io_queue_capacity as usize,
148            changes: Rc::new(UnsafeCell::new(VecDeque::with_capacity(32))),
149            handlers: Cell::new(Some(Box::new(Vec::default()))),
150        })
151    }
152
153    /// Driver type
154    pub const fn tp(&self) -> DriverType {
155        DriverType::Poll
156    }
157
158    /// Register updates handler
159    pub fn register<F>(&self, f: F)
160    where
161        F: FnOnce(DriverApi) -> Box<dyn Handler>,
162    {
163        let id = self.hid.get();
164        let mut handlers = self
165            .handlers
166            .take()
167            .expect("Cannot register handler during event handling");
168
169        let api = DriverApi {
170            id: id as usize,
171            batch: id << Self::BATCH,
172            poll: self.poll.clone(),
173            changes: self.changes.clone(),
174        };
175        handlers.push(HandlerItem {
176            hnd: f(api),
177            modified: false,
178        });
179        self.hid.set(id + 1);
180        self.handlers.set(Some(handlers));
181    }
182
183    fn apply_changes(&self, handlers: &mut [HandlerItem]) {
184        while let Some(op) = unsafe { (*self.changes.get()).pop_front() } {
185            match op {
186                Change::Error {
187                    batch,
188                    user_data,
189                    error,
190                } => handlers[batch].hnd.error(user_data as usize, error),
191            }
192        }
193    }
194}
195
196impl AsRawFd for Driver {
197    fn as_raw_fd(&self) -> RawFd {
198        self.poll.as_raw_fd()
199    }
200}
201
202impl fmt::Debug for Driver {
203    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
204        f.debug_struct("Driver")
205            .field("poll", &self.poll)
206            .field("capacity", &self.capacity)
207            .field("hid", &self.hid)
208            .finish()
209    }
210}
211
212impl crate::Reactor for Driver {
213    fn tcp_connect(&self, addr: net::SocketAddr, cfg: SharedCfg) -> Receiver<Io> {
214        let addr = SockAddr::from(addr);
215        let result = Socket::new(addr.domain(), Type::STREAM, Some(Protocol::TCP))
216            .and_then(crate::helpers::prep_socket)
217            .map(move |sock| (addr, sock));
218
219        match result {
220            Err(err) => Receiver::new(Err(err)),
221            Ok((addr, sock)) => {
222                super::connect::ConnectOps::get(self).connect(sock, addr, cfg, false)
223            }
224        }
225    }
226
227    fn unix_connect(&self, addr: std::path::PathBuf, cfg: SharedCfg) -> Receiver<Io> {
228        let result = SockAddr::unix(addr).and_then(|addr| {
229            Socket::new(addr.domain(), Type::STREAM, None)
230                .and_then(crate::helpers::prep_socket)
231                .map(move |sock| (addr, sock))
232        });
233
234        match result {
235            Err(err) => Receiver::new(Err(err)),
236            Ok((addr, sock)) => {
237                super::connect::ConnectOps::get(self).connect(sock, addr, cfg, true)
238            }
239        }
240    }
241
242    fn from_tcp_stream(&self, stream: net::TcpStream, cfg: SharedCfg) -> io::Result<Io> {
243        stream.set_nodelay(true)?;
244
245        Ok(Io::new(
246            TcpStream(
247                crate::helpers::prep_socket(Socket::from(stream))?,
248                StreamOps::get(self),
249            ),
250            cfg,
251        ))
252    }
253
254    #[cfg(unix)]
255    fn from_unix_stream(&self, stream: OsUnixStream, cfg: SharedCfg) -> io::Result<Io> {
256        Ok(Io::new(
257            UnixStream(
258                crate::helpers::prep_socket(Socket::from(stream))?,
259                StreamOps::get(self),
260            ),
261            cfg,
262        ))
263    }
264}
265
266impl ntex_rt::Driver for Driver {
267    /// Poll the driver and handle completed entries.
268    fn run(&self, rt: &Runtime) -> io::Result<()> {
269        let mut events = if self.capacity == 0 {
270            Events::new()
271        } else {
272            Events::with_capacity(NonZeroUsize::new(self.capacity).unwrap())
273        };
274
275        loop {
276            let result = rt.poll();
277            let has_changes = !unsafe { (*self.changes.get()).is_empty() };
278            if has_changes {
279                let mut handlers = self.handlers.take().unwrap();
280                self.apply_changes(&mut handlers);
281                self.handlers.set(Some(handlers));
282            }
283
284            let timeout = match result {
285                PollResult::Pending => None,
286                PollResult::PollAgain => Some(Duration::ZERO),
287                PollResult::Ready => return Ok(()),
288            };
289            events.clear();
290            self.poll.wait(&mut events, timeout)?;
291
292            let mut handlers = self.handlers.take().unwrap();
293            for event in events.iter() {
294                let key = event.key as u64;
295                let batch = ((key & Self::BATCH_MASK) >> Self::BATCH) as usize;
296                handlers[batch].modified = true;
297                handlers[batch]
298                    .hnd
299                    .event((key & Self::DATA_MASK) as usize, event)
300            }
301            self.apply_changes(&mut handlers);
302            for h in handlers.iter_mut() {
303                h.tick();
304            }
305            self.handlers.set(Some(handlers));
306        }
307    }
308
309    /// Get notification handle
310    fn handle(&self) -> Box<dyn Notify> {
311        Box::new(NotifyHandle::new(self.poll.clone()))
312    }
313
314    /// Clear handlers
315    fn clear(&self) {
316        for mut h in self.handlers.take().unwrap().into_iter() {
317            h.hnd.cleanup()
318        }
319    }
320}
321
322#[derive(Clone, Debug)]
323/// A notify handle to the inner driver.
324pub(crate) struct NotifyHandle {
325    poll: Arc<Poller>,
326}
327
328impl NotifyHandle {
329    fn new(poll: Arc<Poller>) -> Self {
330        Self { poll }
331    }
332}
333
334impl Notify for NotifyHandle {
335    /// Notify the driver
336    fn notify(&self) -> io::Result<()> {
337        self.poll.notify()
338    }
339}