#[cfg_attr(all(doc, docsrs), doc(cfg(all())))]
#[allow(unused_imports)]
pub use std::os::fd::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd};
#[cfg(aio)]
use std::ptr::NonNull;
use std::{
    collections::{HashMap, VecDeque},
    io,
    num::NonZeroUsize,
    pin::Pin,
    sync::Arc,
    task::{Poll, Wake, Waker},
    time::Duration,
};

use compio_log::{instrument, trace};
use crossbeam_queue::SegQueue;
use polling::{Event, Events, Poller};
use smallvec::SmallVec;

use crate::{
    AsyncifyPool, BufferPool, DriverType, Entry, ErasedKey, ProactorBuilder,
    key::{BorrowedKey, Key, RefExt},
    op::Interest,
    syscall,
};

mod extra;
pub use extra::Extra;
pub(crate) mod op;

struct Track {
    arg: WaitArg,
    ready: bool,
}

impl From<WaitArg> for Track {
    fn from(arg: WaitArg) -> Self {
        Self { arg, ready: false }
    }
}

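/// Abstraction of an operation understood by the poll driver.
///
/// The driver calls [`OpCode::pre_submit`] to decide how to run the
/// operation, then calls [`OpCode::operate`] once the awaited readiness (if
/// any) has been observed. A recv-style operation would typically return
/// [`Decision::wait_readable`] from `pre_submit` and perform the actual
/// non-blocking read in `operate`.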
pub unsafe trait OpCode {
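    /// Decide how the operation should be executed: complete immediately,
    /// wait for fd readiness, run on the blocking pool, or submit as AIO.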
    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision>;

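    /// The resources this operation is bound to, used for cancellation and
    /// event dispatch; `None` if there is nothing to track.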
    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
        None
    }

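    /// Perform the operation after the awaited readiness has been observed,
    /// returning `Poll::Pending` if it would still block.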
    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>>;
}

pub use OpCode as PollOpCode;

type Multi<T> = SmallVec<[T; 1]>;

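/// Return value of [`OpCode::pre_submit`], telling the driver how to execute
/// the operation.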
#[non_exhaustive]
pub enum Decision {
    Completed(usize),
    Wait(Multi<WaitArg>),
    Blocking,
    #[cfg(aio)]
    Aio(AioControl),
}

impl Decision {
    pub fn wait_for(fd: RawFd, interest: Interest) -> Self {
        Self::Wait(SmallVec::from_buf([WaitArg { fd, interest }]))
    }

    pub fn wait_for_many<I: IntoIterator<Item = WaitArg>>(args: I) -> Self {
        Self::Wait(Multi::from_iter(args))
    }

    pub fn wait_readable(fd: RawFd) -> Self {
        Self::wait_for(fd, Interest::Readable)
    }

    pub fn wait_writable(fd: RawFd) -> Self {
        Self::wait_for(fd, Interest::Writable)
    }

    #[cfg(aio)]
    pub fn aio(
        cb: &mut libc::aiocb,
        submit: unsafe extern "C" fn(*mut libc::aiocb) -> i32,
    ) -> Self {
        Self::Aio(AioControl {
            aiocbp: NonNull::from(cb),
            submit,
        })
    }
}

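/// A file descriptor paired with the readiness interest to wait for.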
#[derive(Debug, Clone, Copy)]
pub struct WaitArg {
    pub fd: RawFd,
    pub interest: Interest,
}

impl WaitArg {
    pub fn readable(fd: RawFd) -> Self {
        Self {
            fd,
            interest: Interest::Readable,
        }
    }

    pub fn writable(fd: RawFd) -> Self {
        Self {
            fd,
            interest: Interest::Writable,
        }
    }
}

#[cfg(aio)]
#[derive(Debug, Clone, Copy)]
pub struct AioControl {
    pub aiocbp: NonNull<libc::aiocb>,
    pub submit: unsafe extern "C" fn(*mut libc::aiocb) -> i32,
}

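/// Per-fd queues of operations waiting for read and write readiness. The
/// front of each queue determines the interest registered with the poller.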
#[derive(Debug, Default)]
struct FdQueue {
    read_queue: VecDeque<ErasedKey>,
    write_queue: VecDeque<ErasedKey>,
}

struct RemoveToken {
    idx: usize,
    is_read: bool,
}

impl RemoveToken {
    fn read(idx: usize) -> Self {
        Self { idx, is_read: true }
    }

    fn write(idx: usize) -> Self {
        Self {
            idx,
            is_read: false,
        }
    }
}

impl FdQueue {
    fn is_empty(&self) -> bool {
        self.read_queue.is_empty() && self.write_queue.is_empty()
    }

    fn remove_token(&mut self, token: RemoveToken) -> Option<ErasedKey> {
        if token.is_read {
            self.read_queue.remove(token.idx)
        } else {
            self.write_queue.remove(token.idx)
        }
    }

    pub fn push_back_interest(&mut self, key: ErasedKey, interest: Interest) -> RemoveToken {
        match interest {
            Interest::Readable => {
                self.read_queue.push_back(key);
                RemoveToken::read(self.read_queue.len() - 1)
            }
            Interest::Writable => {
                self.write_queue.push_back(key);
                RemoveToken::write(self.write_queue.len() - 1)
            }
        }
    }

    pub fn push_front_interest(&mut self, key: ErasedKey, interest: Interest) -> RemoveToken {
        let is_read = match interest {
            Interest::Readable => {
                self.read_queue.push_front(key);
                true
            }
            Interest::Writable => {
                self.write_queue.push_front(key);
                false
            }
        };
        RemoveToken { idx: 0, is_read }
    }

    pub fn remove(&mut self, key: &ErasedKey) {
        self.read_queue.retain(|k| k != key);
        self.write_queue.retain(|k| k != key);
    }

    pub fn event(&self) -> Event {
        let mut event = Event::none(0);
        if let Some(key) = self.read_queue.front() {
            event.readable = true;
            event.key = key.as_raw();
        }
        if let Some(key) = self.write_queue.front() {
            event.writable = true;
            event.key = key.as_raw();
        }
        event
    }

    pub fn pop_interest(&mut self, event: &Event) -> Option<(ErasedKey, Interest)> {
        if event.readable
            && let Some(key) = self.read_queue.pop_front()
        {
            return Some((key, Interest::Readable));
        }
        if event.writable
            && let Some(key) = self.write_queue.pop_front()
        {
            return Some((key, Interest::Writable));
        }
        None
    }
}

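/// The kind of resource an operation is bound to, as reported by
/// [`OpCode::op_type`].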
#[non_exhaustive]
pub enum OpType {
    Fd(Multi<RawFd>),
    #[cfg(aio)]
    Aio(NonNull<libc::aiocb>),
}

impl OpType {
    pub fn fd(fd: RawFd) -> Self {
        Self::Fd(SmallVec::from_buf([fd]))
    }

    pub fn multi_fd<I: IntoIterator<Item = RawFd>>(fds: I) -> Self {
        Self::Fd(Multi::from_iter(fds))
    }
}

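/// The poll-based driver: it dispatches readiness events from the [`Poller`]
/// to queued operations and falls back to a thread pool for blocking work.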
pub(crate) struct Driver {
    events: Events,
    notify: Arc<Notify>,
    registry: HashMap<RawFd, FdQueue>,
    pool: AsyncifyPool,
    pool_completed: Arc<SegQueue<Entry>>,
}

impl Driver {
    pub fn new(builder: &ProactorBuilder) -> io::Result<Self> {
        instrument!(compio_log::Level::TRACE, "new", ?builder);
        trace!("new poll driver");

        let events = if let Some(cap) = NonZeroUsize::new(builder.capacity as _) {
            Events::with_capacity(cap)
        } else {
            Events::new()
        };
        let poll = Poller::new()?;
        let notify = Arc::new(Notify::new(poll));

        Ok(Self {
            events,
            notify,
            registry: HashMap::new(),
            pool: builder.create_or_get_thread_pool(),
            pool_completed: Arc::new(SegQueue::new()),
        })
    }

    pub fn driver_type(&self) -> DriverType {
        DriverType::Poll
    }

    pub fn default_extra(&self) -> Extra {
        Extra::new()
    }

    fn poller(&self) -> &Poller {
        &self.notify.poll
    }

    fn with_events<F, R>(&mut self, f: F) -> R
    where
        F: FnOnce(&mut Self, &mut Events) -> R,
    {
        let mut events = std::mem::take(&mut self.events);
        let res = f(self, &mut events);
        self.events = events;
        res
    }

    fn try_get_queue(&mut self, fd: RawFd) -> Option<&mut FdQueue> {
        self.registry.get_mut(&fd)
    }

    fn get_queue(&mut self, fd: RawFd) -> &mut FdQueue {
        self.try_get_queue(fd).expect("the fd should be submitted")
    }

    unsafe fn submit(&mut self, key: ErasedKey, arg: WaitArg) -> io::Result<()> {
        let Self {
            registry, notify, ..
        } = self;
        let need_add = !registry.contains_key(&arg.fd);
        let queue = registry.entry(arg.fd).or_default();
        let token = queue.push_back_interest(key, arg.interest);
        let event = queue.event();
        let res = if need_add {
            unsafe { notify.poll.add(arg.fd, event) }
        } else {
            let fd = unsafe { BorrowedFd::borrow_raw(arg.fd) };
            notify.poll.modify(fd, event)
        };
        if res.is_err() {
            queue.remove_token(token);
            if queue.is_empty() {
                registry.remove(&arg.fd);
            }
        }

        res
    }

    unsafe fn submit_front(&mut self, key: ErasedKey, arg: WaitArg) -> io::Result<()> {
        let need_add = !self.registry.contains_key(&arg.fd);
        let queue = self.registry.entry(arg.fd).or_default();
        queue.push_front_interest(key, arg.interest);
        let event = queue.event();
        if need_add {
            unsafe { self.poller().add(arg.fd, event)? }
        } else {
            let fd = unsafe { BorrowedFd::borrow_raw(arg.fd) };
            self.poller().modify(fd, event)?;
        }
        Ok(())
    }

    fn renew(&mut self, fd: BorrowedFd, renew_event: Event) -> io::Result<()> {
        if !renew_event.readable && !renew_event.writable {
            self.poller().delete(fd)?;
            self.registry.remove(&fd.as_raw_fd());
        } else {
            self.poller().modify(fd, renew_event)?;
        }
        Ok(())
    }

    fn remove_one(&mut self, key: &ErasedKey, fd: RawFd) -> io::Result<()> {
        let Some(queue) = self.try_get_queue(fd) else {
            return Ok(());
        };
        queue.remove(key);
        let renew_event = queue.event();
        if queue.is_empty() {
            self.registry.remove(&fd);
        }
        self.renew(unsafe { BorrowedFd::borrow_raw(fd) }, renew_event)
    }

    fn cancel_one(&mut self, key: ErasedKey, fd: RawFd) -> Option<Entry> {
        self.remove_one(&key, fd)
            .map_or(None, |_| Some(Entry::new_cancelled(key)))
    }

    pub fn attach(&mut self, _fd: RawFd) -> io::Result<()> {
        Ok(())
    }

    pub fn cancel<T>(&mut self, key: Key<T>) {
        let op_type = key.borrow().pinned_op().op_type();
        match op_type {
            None => {}
            Some(OpType::Fd(fds)) => {
                let mut pushed = false;
                for fd in fds {
                    let entry = self.cancel_one(key.clone().erase(), fd);
                    if !pushed && let Some(entry) = entry {
                        self.pool_completed.push(entry);
                        pushed = true;
                    }
                }
            }
            #[cfg(aio)]
            Some(OpType::Aio(aiocbp)) => {
                let aiocb = unsafe { aiocbp.as_ref() };
                let fd = aiocb.aio_fildes;
                syscall!(libc::aio_cancel(fd, aiocbp.as_ptr())).ok();
            }
        }
    }

    pub fn push(&mut self, key: ErasedKey) -> Poll<io::Result<usize>> {
        instrument!(compio_log::Level::TRACE, "push", ?key);
        match { key.borrow().pinned_op().pre_submit()? } {
            Decision::Wait(args) => {
                key.borrow()
                    .extra_mut()
                    .as_poll_mut()
                    .set_args(args.clone());
                for arg in args.iter().copied() {
                    let res = unsafe { self.submit(key.clone(), arg) };
                    if let Err(e) = res {
                        args.iter().for_each(|arg| {
                            let _ = self.remove_one(&key, arg.fd);
                        });
                        return Poll::Ready(Err(e));
                    }
                    trace!("register {:?}", arg);
                }
                Poll::Pending
            }
            Decision::Completed(res) => Poll::Ready(Ok(res)),
            Decision::Blocking => self.push_blocking(key),
            #[cfg(aio)]
            Decision::Aio(AioControl { mut aiocbp, submit }) => {
                let aiocb = unsafe { aiocbp.as_mut() };
                let user_data = key.as_raw();
                #[cfg(freebsd)]
                {
                    aiocb.aio_sigevent.sigev_signo = self.as_raw_fd();
                    aiocb.aio_sigevent.sigev_notify = libc::SIGEV_KEVENT;
                    aiocb.aio_sigevent.sigev_value.sival_ptr = user_data as _;
                }
                #[cfg(solarish)]
                let mut notify = libc::port_notify {
                    portnfy_port: self.as_raw_fd(),
                    portnfy_user: user_data as _,
                };
                #[cfg(solarish)]
                {
                    aiocb.aio_sigevent.sigev_notify = libc::SIGEV_PORT;
                    aiocb.aio_sigevent.sigev_value.sival_ptr = &mut notify as *mut _ as _;
                }
                match syscall!(submit(aiocbp.as_ptr())) {
                    Ok(_) => {
                        key.into_raw();
                        Poll::Pending
                    }
                    Err(e)
                        if matches!(
                            e.raw_os_error(),
                            Some(libc::EOPNOTSUPP) | Some(libc::EAGAIN)
                        ) =>
                    {
                        self.push_blocking(key)
                    }
                    Err(e) => Poll::Ready(Err(e)),
                }
            }
        }
    }

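    /// Run the operation on the blocking thread pool; the result is pushed to
    /// `pool_completed` and the driver is woken. If the pool refuses the job,
    /// drain completed entries and retry.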
    fn push_blocking(&mut self, key: ErasedKey) -> Poll<io::Result<usize>> {
        let waker = self.waker();
        let completed = self.pool_completed.clone();
        let mut key = unsafe { key.freeze() };

        let mut closure = move || {
            let poll = key.pinned_op().operate();
            let res = match poll {
                Poll::Pending => unreachable!("this operation is not non-blocking"),
                Poll::Ready(res) => res,
            };
            completed.push(Entry::new(key.into_inner(), res));
            waker.wake();
        };
        loop {
            match self.pool.dispatch(closure) {
                Ok(()) => return Poll::Pending,
                Err(e) => {
                    closure = e.0;
                    self.poll_blocking();
                }
            }
        }
    }

    fn poll_blocking(&mut self) -> bool {
        if self.pool_completed.is_empty() {
            return false;
        }
        while let Some(entry) = self.pool_completed.pop() {
            entry.notify();
        }
        true
    }

    #[allow(clippy::blocks_in_conditions)]
    fn poll_one(&mut self, event: Event, fd: RawFd) -> io::Result<()> {
        let queue = self.get_queue(fd);

        if let Some((key, _)) = queue.pop_interest(&event)
            && let mut op = key.borrow()
            && op.extra_mut().as_poll_mut().handle_event(fd)
        {
            match { op.pinned_op().operate() } {
                Poll::Pending => {
                    let extra = op.extra_mut().as_poll_mut();
                    extra.reset();
                    for t in extra.track.iter() {
                        let res = unsafe { self.submit_front(key.clone(), t.arg) };
                        if let Err(e) = res {
                            for t in extra.track.iter() {
                                let _ = self.remove_one(&key, t.arg.fd);
                            }
                            return Err(e);
                        }
                    }
                }
                Poll::Ready(res) => {
                    drop(op);
                    Entry::new(key, res).notify()
                }
            };
        }

        let renew_event = self.get_queue(fd).event();
        let fd = unsafe { BorrowedFd::borrow_raw(fd) };
        self.renew(fd, renew_event)
    }

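    /// Wait for readiness or AIO events (or until `timeout` expires) and
    /// dispatch them to the corresponding operations.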
    pub fn poll(&mut self, timeout: Option<Duration>) -> io::Result<()> {
        instrument!(compio_log::Level::TRACE, "poll", ?timeout);
        if self.poll_blocking() {
            return Ok(());
        }
        self.events.clear();
        self.notify.poll.wait(&mut self.events, timeout)?;
        if self.events.is_empty() && timeout.is_some() {
            return Err(io::Error::from_raw_os_error(libc::ETIMEDOUT));
        }
        self.with_events(|this, events| {
            for event in events.iter() {
                trace!("receive {} for {:?}", event.key, event);
                let key = unsafe { BorrowedKey::from_raw(event.key) };
                let mut op = key.borrow();
                let op_type = op.pinned_op().op_type();
                match op_type {
                    None => {
                        trace!("op {} is completed", event.key);
                    }
                    Some(OpType::Fd(_)) => {
                        let Some(fd) = op.extra().as_poll().next_fd() else {
                            return Ok(());
                        };
                        drop(op);
                        this.poll_one(event, fd)?;
                    }
                    #[cfg(aio)]
                    Some(OpType::Aio(aiocbp)) => {
                        drop(op);
                        let err = unsafe { libc::aio_error(aiocbp.as_ptr()) };
                        let res = match err {
                            libc::EINPROGRESS => {
                                trace!("op {} is not completed", key.as_raw());
                                continue;
                            }
                            libc::ECANCELED => {
                                unsafe { libc::aio_return(aiocbp.as_ptr()) };
                                Err(io::Error::from_raw_os_error(libc::ETIMEDOUT))
                            }
                            _ => {
                                syscall!(libc::aio_return(aiocbp.as_ptr())).map(|res| res as usize)
                            }
                        };
                        let key = unsafe { ErasedKey::from_raw(event.key) };
                        Entry::new(key, res).notify()
                    }
                }
            }

            Ok(())
        })
    }

    pub fn waker(&self) -> Waker {
        Waker::from(self.notify.clone())
    }

    pub fn create_buffer_pool(
        &mut self,
        buffer_len: u16,
        buffer_size: usize,
    ) -> io::Result<BufferPool> {
        #[cfg(fusion)]
        {
            Ok(BufferPool::new_poll(crate::FallbackBufferPool::new(
                buffer_len,
                buffer_size,
            )))
        }
        #[cfg(not(fusion))]
        {
            Ok(BufferPool::new(buffer_len, buffer_size))
        }
    }

    pub unsafe fn release_buffer_pool(&mut self, _: BufferPool) -> io::Result<()> {
        Ok(())
    }
}

impl AsRawFd for Driver {
    fn as_raw_fd(&self) -> RawFd {
        self.poller().as_raw_fd()
    }
}

impl Drop for Driver {
    fn drop(&mut self) {
        for fd in self.registry.keys() {
            unsafe {
                let fd = BorrowedFd::borrow_raw(*fd);
                self.poller().delete(fd).ok();
            }
        }
    }
}

impl Entry {
    pub(crate) fn new_cancelled(key: ErasedKey) -> Self {
        Entry::new(key, Err(io::Error::from_raw_os_error(libc::ETIMEDOUT)))
    }
}

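/// Wakes the driver by notifying the underlying [`Poller`]; used as the
/// driver's [`Waker`] through the [`Wake`] impl.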
pub(crate) struct Notify {
    poll: Poller,
}

impl Notify {
    fn new(poll: Poller) -> Self {
        Self { poll }
    }

    pub fn notify(&self) -> io::Result<()> {
        self.poll.notify()
    }
}

impl Wake for Notify {
    fn wake(self: Arc<Self>) {
        self.wake_by_ref();
    }

    fn wake_by_ref(self: &Arc<Self>) {
        self.notify().ok();
    }
}