1#[cfg_attr(all(doc, docsrs), doc(cfg(all())))]
2#[allow(unused_imports)]
3pub use std::os::fd::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd};
4#[cfg(aio)]
5use std::ptr::NonNull;
6use std::{
7 collections::{HashMap, VecDeque},
8 io,
9 num::NonZeroUsize,
10 pin::Pin,
11 sync::Arc,
12 task::Poll,
13 time::Duration,
14};
15
16use compio_log::{instrument, trace};
17use crossbeam_queue::SegQueue;
18pub(crate) use libc::{sockaddr_storage, socklen_t};
19use polling::{Event, Events, Poller};
20
21use crate::{AsyncifyPool, BufferPool, Entry, Key, ProactorBuilder, op::Interest, syscall};
22
23pub(crate) mod op;
24
25pub trait OpCode {
27 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision>;
30
31 fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
33 None
34 }
35
36 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>>;
40}
41
42#[non_exhaustive]
44pub enum Decision {
45 Completed(usize),
47 Wait(WaitArg),
49 Blocking,
51 #[cfg(aio)]
53 Aio(AioControl),
54}
55
56impl Decision {
57 pub fn wait_for(fd: RawFd, interest: Interest) -> Self {
59 Self::Wait(WaitArg { fd, interest })
60 }
61
62 pub fn wait_readable(fd: RawFd) -> Self {
64 Self::wait_for(fd, Interest::Readable)
65 }
66
67 pub fn wait_writable(fd: RawFd) -> Self {
69 Self::wait_for(fd, Interest::Writable)
70 }
71
72 #[cfg(aio)]
74 pub fn aio(
75 cb: &mut libc::aiocb,
76 submit: unsafe extern "C" fn(*mut libc::aiocb) -> i32,
77 ) -> Self {
78 Self::Aio(AioControl {
79 aiocbp: NonNull::from(cb),
80 submit,
81 })
82 }
83}
84
85#[derive(Debug, Clone, Copy)]
87pub struct WaitArg {
88 pub fd: RawFd,
90 pub interest: Interest,
92}
93
/// Payload of [`Decision::Aio`]: an AIO control block and how to submit it.
#[cfg(aio)]
#[derive(Debug, Clone, Copy)]
pub struct AioControl {
    /// Pointer to the control block describing the request.
    pub aiocbp: NonNull<libc::aiocb>,
    /// The libc function the driver calls with `aiocbp` to start the request.
    pub submit: unsafe extern "C" fn(*mut libc::aiocb) -> i32,
}
103
/// Pending operations for one file descriptor, kept in FIFO order per
/// direction. Each entry is the `user_data` of a waiting operation.
#[derive(Debug, Default)]
struct FdQueue {
    /// Operations waiting for the descriptor to become readable.
    read_queue: VecDeque<usize>,
    /// Operations waiting for the descriptor to become writable.
    write_queue: VecDeque<usize>,
}
109
110impl FdQueue {
111 pub fn push_back_interest(&mut self, user_data: usize, interest: Interest) {
112 match interest {
113 Interest::Readable => self.read_queue.push_back(user_data),
114 Interest::Writable => self.write_queue.push_back(user_data),
115 }
116 }
117
118 pub fn push_front_interest(&mut self, user_data: usize, interest: Interest) {
119 match interest {
120 Interest::Readable => self.read_queue.push_front(user_data),
121 Interest::Writable => self.write_queue.push_front(user_data),
122 }
123 }
124
125 pub fn remove(&mut self, user_data: usize) {
126 self.read_queue.retain(|&k| k != user_data);
127 self.write_queue.retain(|&k| k != user_data);
128 }
129
130 pub fn event(&self) -> Event {
131 let mut event = Event::none(0);
132 if let Some(&key) = self.read_queue.front() {
133 event.readable = true;
134 event.key = key;
135 }
136 if let Some(&key) = self.write_queue.front() {
137 event.writable = true;
138 event.key = key;
139 }
140 event
141 }
142
143 pub fn pop_interest(&mut self, event: &Event) -> Option<(usize, Interest)> {
144 if event.readable {
145 if let Some(user_data) = self.read_queue.pop_front() {
146 return Some((user_data, Interest::Readable));
147 }
148 }
149 if event.writable {
150 if let Some(user_data) = self.write_queue.pop_front() {
151 return Some((user_data, Interest::Writable));
152 }
153 }
154 None
155 }
156}
157
/// The resource an operation is bound to, as reported by
/// [`OpCode::op_type`].
#[non_exhaustive]
pub enum OpType {
    /// The operation waits on this file descriptor.
    Fd(RawFd),
    /// The operation is an AIO request described by this control block.
    #[cfg(aio)]
    Aio(NonNull<libc::aiocb>),
}
168
169pub(crate) struct Driver {
171 events: Events,
172 poll: Arc<Poller>,
173 registry: HashMap<RawFd, FdQueue>,
174 pool: AsyncifyPool,
175 pool_completed: Arc<SegQueue<Entry>>,
176}
177
178impl Driver {
179 pub fn new(builder: &ProactorBuilder) -> io::Result<Self> {
180 instrument!(compio_log::Level::TRACE, "new", ?builder);
181 trace!("new poll driver");
182 let entries = builder.capacity as usize; let events = if entries == 0 {
184 Events::new()
185 } else {
186 Events::with_capacity(NonZeroUsize::new(entries).unwrap())
187 };
188
189 let poll = Arc::new(Poller::new()?);
190
191 Ok(Self {
192 events,
193 poll,
194 registry: HashMap::new(),
195 pool: builder.create_or_get_thread_pool(),
196 pool_completed: Arc::new(SegQueue::new()),
197 })
198 }
199
200 pub fn create_op<T: crate::sys::OpCode + 'static>(&self, op: T) -> Key<T> {
201 Key::new(self.as_raw_fd(), op)
202 }
203
204 unsafe fn submit(&mut self, user_data: usize, arg: WaitArg) -> io::Result<()> {
207 let need_add = !self.registry.contains_key(&arg.fd);
208 let queue = self.registry.entry(arg.fd).or_default();
209 queue.push_back_interest(user_data, arg.interest);
210 let event = queue.event();
211 if need_add {
212 self.poll.add(arg.fd, event)?;
213 } else {
214 let fd = BorrowedFd::borrow_raw(arg.fd);
215 self.poll.modify(fd, event)?;
216 }
217 Ok(())
218 }
219
220 fn renew(
221 poll: &Poller,
222 registry: &mut HashMap<RawFd, FdQueue>,
223 fd: BorrowedFd,
224 renew_event: Event,
225 ) -> io::Result<()> {
226 if !renew_event.readable && !renew_event.writable {
227 poll.delete(fd)?;
228 registry.remove(&fd.as_raw_fd());
229 } else {
230 poll.modify(fd, renew_event)?;
231 }
232 Ok(())
233 }
234
235 pub fn attach(&mut self, _fd: RawFd) -> io::Result<()> {
236 Ok(())
237 }
238
239 pub fn cancel(&mut self, op: &mut Key<dyn crate::sys::OpCode>) {
240 let op_pin = op.as_op_pin();
241 match op_pin.op_type() {
242 None => {}
243 Some(OpType::Fd(fd)) => {
244 let queue = self
245 .registry
246 .get_mut(&fd)
247 .expect("the fd should be attached");
248 queue.remove(op.user_data());
249 let renew_event = queue.event();
250 if Self::renew(
251 &self.poll,
252 &mut self.registry,
253 unsafe { BorrowedFd::borrow_raw(fd) },
254 renew_event,
255 )
256 .is_ok()
257 {
258 self.pool_completed.push(entry_cancelled(op.user_data()));
259 }
260 }
261 #[cfg(aio)]
262 Some(OpType::Aio(aiocbp)) => {
263 let aiocb = unsafe { aiocbp.as_ref() };
264 let fd = aiocb.aio_fildes;
265 syscall!(libc::aio_cancel(fd, aiocbp.as_ptr())).ok();
266 }
267 }
268 }
269
270 pub fn push(&mut self, op: &mut Key<dyn crate::sys::OpCode>) -> Poll<io::Result<usize>> {
271 instrument!(compio_log::Level::TRACE, "push", ?op);
272 let user_data = op.user_data();
273 let op_pin = op.as_op_pin();
274 match op_pin.pre_submit()? {
275 Decision::Wait(arg) => {
276 unsafe {
278 self.submit(user_data, arg)?;
279 }
280 trace!("register {:?}", arg);
281 Poll::Pending
282 }
283 Decision::Completed(res) => Poll::Ready(Ok(res)),
284 Decision::Blocking => self.push_blocking(user_data),
285 #[cfg(aio)]
286 Decision::Aio(AioControl { mut aiocbp, submit }) => {
287 let aiocb = unsafe { aiocbp.as_mut() };
288 #[cfg(freebsd)]
289 {
290 aiocb.aio_sigevent.sigev_signo = self.poll.as_raw_fd();
292 aiocb.aio_sigevent.sigev_notify = libc::SIGEV_KEVENT;
293 aiocb.aio_sigevent.sigev_value.sival_ptr = user_data as _;
294 }
295 #[cfg(solarish)]
296 let mut notify = libc::port_notify {
297 portnfy_port: self.poll.as_raw_fd(),
298 portnfy_user: user_data as _,
299 };
300 #[cfg(solarish)]
301 {
302 aiocb.aio_sigevent.sigev_notify = libc::SIGEV_PORT;
303 aiocb.aio_sigevent.sigev_value.sival_ptr = &mut notify as *mut _ as _;
304 }
305 match syscall!(submit(aiocbp.as_ptr())) {
306 Ok(_) => Poll::Pending,
307 Err(e)
315 if matches!(
316 e.raw_os_error(),
317 Some(libc::EOPNOTSUPP) | Some(libc::EAGAIN)
318 ) =>
319 {
320 self.push_blocking(user_data)
321 }
322 Err(e) => Poll::Ready(Err(e)),
323 }
324 }
325 }
326 }
327
328 fn push_blocking(&mut self, user_data: usize) -> Poll<io::Result<usize>> {
329 let poll = self.poll.clone();
330 let completed = self.pool_completed.clone();
331 let mut closure = move || {
332 let mut op = unsafe { Key::<dyn crate::sys::OpCode>::new_unchecked(user_data) };
333 let op_pin = op.as_op_pin();
334 let res = match op_pin.operate() {
335 Poll::Pending => unreachable!("this operation is not non-blocking"),
336 Poll::Ready(res) => res,
337 };
338 completed.push(Entry::new(user_data, res));
339 poll.notify().ok();
340 };
341 loop {
342 match self.pool.dispatch(closure) {
343 Ok(()) => return Poll::Pending,
344 Err(e) => {
345 closure = e.0;
346 self.poll_blocking();
347 }
348 }
349 }
350 }
351
352 fn poll_blocking(&mut self) -> bool {
353 if self.pool_completed.is_empty() {
354 return false;
355 }
356 while let Some(entry) = self.pool_completed.pop() {
357 unsafe {
358 entry.notify();
359 }
360 }
361 true
362 }
363
364 pub unsafe fn poll(&mut self, timeout: Option<Duration>) -> io::Result<()> {
365 instrument!(compio_log::Level::TRACE, "poll", ?timeout);
366 if self.poll_blocking() {
367 return Ok(());
368 }
369 self.events.clear();
370 self.poll.wait(&mut self.events, timeout)?;
371 if self.events.is_empty() && timeout.is_some() {
372 return Err(io::Error::from_raw_os_error(libc::ETIMEDOUT));
373 }
374 for event in self.events.iter() {
375 let user_data = event.key;
376 trace!("receive {} for {:?}", user_data, event);
377 let mut op = Key::<dyn crate::sys::OpCode>::new_unchecked(user_data);
378 let op = op.as_op_pin();
379 match op.op_type() {
380 None => {
381 trace!("op {} is completed", user_data);
384 }
385 Some(OpType::Fd(fd)) => {
386 let queue = self
389 .registry
390 .get_mut(&fd)
391 .expect("the fd should be attached");
392 if let Some((user_data, interest)) = queue.pop_interest(&event) {
393 let mut op = Key::<dyn crate::sys::OpCode>::new_unchecked(user_data);
394 let op = op.as_op_pin();
395 let res = match op.operate() {
396 Poll::Pending => {
397 queue.push_front_interest(user_data, interest);
399 None
400 }
401 Poll::Ready(res) => Some(res),
402 };
403 if let Some(res) = res {
404 Entry::new(user_data, res).notify();
405 }
406 }
407 let renew_event = queue.event();
408 Self::renew(
409 &self.poll,
410 &mut self.registry,
411 BorrowedFd::borrow_raw(fd),
412 renew_event,
413 )?;
414 }
415 #[cfg(aio)]
416 Some(OpType::Aio(aiocbp)) => {
417 let err = unsafe { libc::aio_error(aiocbp.as_ptr()) };
418 let res = match err {
419 libc::EINPROGRESS => {
424 trace!("op {} is not completed", user_data);
425 continue;
426 }
427 libc::ECANCELED => {
428 libc::aio_return(aiocbp.as_ptr());
430 Err(io::Error::from_raw_os_error(libc::ETIMEDOUT))
431 }
432 _ => syscall!(libc::aio_return(aiocbp.as_ptr())).map(|res| res as usize),
433 };
434 Entry::new(user_data, res).notify();
435 }
436 }
437 }
438 Ok(())
439 }
440
441 pub fn handle(&self) -> NotifyHandle {
442 NotifyHandle::new(self.poll.clone())
443 }
444
445 pub fn create_buffer_pool(
446 &mut self,
447 buffer_len: u16,
448 buffer_size: usize,
449 ) -> io::Result<BufferPool> {
450 #[cfg(fusion)]
451 {
452 Ok(BufferPool::new_poll(crate::FallbackBufferPool::new(
453 buffer_len,
454 buffer_size,
455 )))
456 }
457 #[cfg(not(fusion))]
458 {
459 Ok(BufferPool::new(buffer_len, buffer_size))
460 }
461 }
462
463 pub unsafe fn release_buffer_pool(&mut self, _: BufferPool) -> io::Result<()> {
467 Ok(())
468 }
469}
470
471impl AsRawFd for Driver {
472 fn as_raw_fd(&self) -> RawFd {
473 self.poll.as_raw_fd()
474 }
475}
476
477impl Drop for Driver {
478 fn drop(&mut self) {
479 for fd in self.registry.keys() {
480 unsafe {
481 let fd = BorrowedFd::borrow_raw(*fd);
482 self.poll.delete(fd).ok();
483 }
484 }
485 }
486}
487
488fn entry_cancelled(user_data: usize) -> Entry {
489 Entry::new(
490 user_data,
491 Err(io::Error::from_raw_os_error(libc::ETIMEDOUT)),
492 )
493}
494
495pub struct NotifyHandle {
497 poll: Arc<Poller>,
498}
499
500impl NotifyHandle {
501 fn new(poll: Arc<Poller>) -> Self {
502 Self { poll }
503 }
504
505 pub fn notify(&self) -> io::Result<()> {
507 self.poll.notify()
508 }
509}