#[cfg_attr(all(doc, docsrs), doc(cfg(all())))]
#[allow(unused_imports)]
pub use std::os::fd::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd};
#[cfg(aio)]
use std::ptr::NonNull;
use std::{
    collections::{HashMap, VecDeque},
    io,
    num::NonZeroUsize,
    pin::Pin,
    sync::Arc,
    task::{Poll, Wake, Waker},
    time::Duration,
};

use compio_log::{instrument, trace};
use flume::{Receiver, Sender};
use polling::{Event, Events, Poller};
use smallvec::SmallVec;

use crate::{
    AsyncifyPool, BufferPool, DriverType, Entry, ErasedKey, ProactorBuilder,
    key::{BorrowedKey, Key, RefExt},
    op::Interest,
    syscall,
};

mod extra;
pub use extra::Extra;
pub(crate) mod op;

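/// A wait argument paired with whether its fd has been reported ready; stored
/// in the per-operation [`Extra`] state.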
struct Track {
    arg: WaitArg,
    ready: bool,
}

impl From<WaitArg> for Track {
    fn from(arg: WaitArg) -> Self {
        Self { arg, ready: false }
    }
}

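/// Abstraction of operations for the poll-based driver.
///
/// # Safety
///
/// Implementors are expected to keep any file descriptors or AIO control
/// blocks referenced by the returned [`Decision`] and [`OpType`] valid while
/// the operation is registered with the driver (contract inferred from the
/// call sites in this module).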
pub unsafe trait OpCode {
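    /// Perform the operation before submitting it to the poller, and return a
    /// [`Decision`] telling the driver how to proceed.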
    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision>;

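    /// The kind of resource this operation is currently waiting on; used by
    /// `Driver::cancel` and the poll loop. `None` means there is nothing to
    /// wait on or cancel.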
    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
        None
    }

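    /// Perform the operation after the waited file descriptor is ready.
    ///
    /// Returning [`Poll::Pending`] makes the driver register the operation
    /// again and keep waiting.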
    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>>;
}

pub use OpCode as PollOpCode;

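/// A small vector that keeps its common single-element case inline.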
type Multi<T> = SmallVec<[T; 1]>;

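/// Result of [`OpCode::pre_submit`]: what the driver should do next with an
/// operation.
///
/// * `Completed(res)`: the operation finished immediately with result `res`.
/// * `Wait(args)`: register the given fds with the poller and call
///   [`OpCode::operate`] when readiness is reported.
/// * `Blocking`: run the operation on the blocking thread pool.
/// * `Aio(control)` (aio targets only): submit the operation as a POSIX AIO
///   request.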
#[non_exhaustive]
pub enum Decision {
    Completed(usize),
    Wait(Multi<WaitArg>),
    Blocking,
    #[cfg(aio)]
    Aio(AioControl),
}

impl Decision {
    pub fn wait_for(fd: RawFd, interest: Interest) -> Self {
        Self::Wait(SmallVec::from_buf([WaitArg { fd, interest }]))
    }

    pub fn wait_for_many<I: IntoIterator<Item = WaitArg>>(args: I) -> Self {
        Self::Wait(Multi::from_iter(args))
    }

    pub fn wait_readable(fd: RawFd) -> Self {
        Self::wait_for(fd, Interest::Readable)
    }

    pub fn wait_writable(fd: RawFd) -> Self {
        Self::wait_for(fd, Interest::Writable)
    }

    #[cfg(aio)]
    pub fn aio(
        cb: &mut libc::aiocb,
        submit: unsafe extern "C" fn(*mut libc::aiocb) -> i32,
    ) -> Self {
        Self::Aio(AioControl {
            aiocbp: NonNull::from(cb),
            submit,
        })
    }
}

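/// A file descriptor together with the interest to wait for on it.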
#[derive(Debug, Clone, Copy)]
pub struct WaitArg {
    pub fd: RawFd,
    pub interest: Interest,
}

impl WaitArg {
    pub fn readable(fd: RawFd) -> Self {
        Self {
            fd,
            interest: Interest::Readable,
        }
    }

    pub fn writable(fd: RawFd) -> Self {
        Self {
            fd,
            interest: Interest::Writable,
        }
    }
}

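/// Pointer to an AIO control block plus the `libc` function (e.g. `aio_read`)
/// used to submit it.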
#[cfg(aio)]
#[derive(Debug, Clone, Copy)]
pub struct AioControl {
    pub aiocbp: NonNull<libc::aiocb>,
    pub submit: unsafe extern "C" fn(*mut libc::aiocb) -> i32,
}

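/// Per-fd queues of operations waiting for readability and writability. The
/// front of each queue is the operation whose key is currently registered
/// with the poller.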
#[derive(Debug, Default)]
struct FdQueue {
    read_queue: VecDeque<ErasedKey>,
    write_queue: VecDeque<ErasedKey>,
}

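/// Position of a just-pushed key inside an `FdQueue`, used to undo the push
/// when registering with the poller fails.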
struct RemoveToken {
    idx: usize,
    is_read: bool,
}

impl RemoveToken {
    fn read(idx: usize) -> Self {
        Self { idx, is_read: true }
    }

    fn write(idx: usize) -> Self {
        Self {
            idx,
            is_read: false,
        }
    }
}

impl FdQueue {
    fn is_empty(&self) -> bool {
        self.read_queue.is_empty() && self.write_queue.is_empty()
    }

    fn remove_token(&mut self, token: RemoveToken) -> Option<ErasedKey> {
        if token.is_read {
            self.read_queue.remove(token.idx)
        } else {
            self.write_queue.remove(token.idx)
        }
    }

    pub fn push_back_interest(&mut self, key: ErasedKey, interest: Interest) -> RemoveToken {
        match interest {
            Interest::Readable => {
                self.read_queue.push_back(key);
                RemoveToken::read(self.read_queue.len() - 1)
            }
            Interest::Writable => {
                self.write_queue.push_back(key);
                RemoveToken::write(self.write_queue.len() - 1)
            }
        }
    }

    pub fn push_front_interest(&mut self, key: ErasedKey, interest: Interest) -> RemoveToken {
        let is_read = match interest {
            Interest::Readable => {
                self.read_queue.push_front(key);
                true
            }
            Interest::Writable => {
                self.write_queue.push_front(key);
                false
            }
        };
        RemoveToken { idx: 0, is_read }
    }

    pub fn remove(&mut self, key: &ErasedKey) {
        self.read_queue.retain(|k| k != key);
        self.write_queue.retain(|k| k != key);
    }

    pub fn event(&self) -> Event {
        let mut event = Event::none(0);
        if let Some(key) = self.read_queue.front() {
            event.readable = true;
            event.key = key.as_raw();
        }
        if let Some(key) = self.write_queue.front() {
            event.writable = true;
            event.key = key.as_raw();
        }
        event
    }

    pub fn pop_interest(&mut self, event: &Event) -> Option<(ErasedKey, Interest)> {
        if event.readable
            && let Some(key) = self.read_queue.pop_front()
        {
            return Some((key, Interest::Readable));
        }
        if event.writable
            && let Some(key) = self.write_queue.pop_front()
        {
            return Some((key, Interest::Writable));
        }
        None
    }
}

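/// The kind of resource an operation waits on, as reported by
/// [`OpCode::op_type`].
///
/// * `Fd(fds)`: one or more file descriptors registered with the poller.
/// * `Aio(aiocbp)` (aio targets only): a POSIX AIO control block.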
#[non_exhaustive]
pub enum OpType {
    Fd(Multi<RawFd>),
    #[cfg(aio)]
    Aio(NonNull<libc::aiocb>),
}

impl OpType {
    pub fn fd(fd: RawFd) -> Self {
        Self::Fd(SmallVec::from_buf([fd]))
    }

    pub fn multi_fd<I: IntoIterator<Item = RawFd>>(fds: I) -> Self {
        Self::Fd(Multi::from_iter(fds))
    }
}

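/// Low-level driver for poll-based platforms.
///
/// It registers fd interest with a [`Poller`], keeps per-fd queues of pending
/// operations, dispatches blocking operations to an [`AsyncifyPool`], and
/// collects their results over a channel.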
pub(crate) struct Driver {
    events: Events,
    notify: Arc<Notify>,
    registry: HashMap<RawFd, FdQueue>,
    pool: AsyncifyPool,
    completed_tx: Sender<Entry>,
    completed_rx: Receiver<Entry>,
}

impl Driver {
    pub fn new(builder: &ProactorBuilder) -> io::Result<Self> {
        instrument!(compio_log::Level::TRACE, "new", ?builder);
        trace!("new poll driver");

        let events = if let Some(cap) = NonZeroUsize::new(builder.capacity as _) {
            Events::with_capacity(cap)
        } else {
            Events::new()
        };
        let poll = Poller::new()?;
        let notify = Arc::new(Notify::new(poll));
        let (completed_tx, completed_rx) = flume::unbounded();

        Ok(Self {
            events,
            notify,
            registry: HashMap::new(),
            pool: builder.create_or_get_thread_pool(),
            completed_tx,
            completed_rx,
        })
    }

    pub fn driver_type(&self) -> DriverType {
        DriverType::Poll
    }

    pub fn default_extra(&self) -> Extra {
        Extra::new()
    }

    fn poller(&self) -> &Poller {
        &self.notify.poll
    }

    fn with_events<F, R>(&mut self, f: F) -> R
    where
        F: FnOnce(&mut Self, &mut Events) -> R,
    {
        let mut events = std::mem::take(&mut self.events);
        let res = f(self, &mut events);
        self.events = events;
        res
    }

    fn try_get_queue(&mut self, fd: RawFd) -> Option<&mut FdQueue> {
        self.registry.get_mut(&fd)
    }

    fn get_queue(&mut self, fd: RawFd) -> &mut FdQueue {
        self.try_get_queue(fd).expect("the fd should be submitted")
    }

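    /// Queues `key` behind the operations already waiting on `arg.fd` and
    /// registers (or updates) the fd with the poller; rolls the queue entry
    /// back if registration fails.
    ///
    /// # Safety
    ///
    /// `arg.fd` is assumed to stay open while it is registered with the
    /// poller.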
    unsafe fn submit(&mut self, key: ErasedKey, arg: WaitArg) -> io::Result<()> {
        let Self {
            registry, notify, ..
        } = self;
        let need_add = !registry.contains_key(&arg.fd);
        let queue = registry.entry(arg.fd).or_default();
        let token = queue.push_back_interest(key, arg.interest);
        let event = queue.event();
        let res = if need_add {
            unsafe { notify.poll.add(arg.fd, event) }
        } else {
            let fd = unsafe { BorrowedFd::borrow_raw(arg.fd) };
            notify.poll.modify(fd, event)
        };
        if res.is_err() {
            queue.remove_token(token);
            if queue.is_empty() {
                registry.remove(&arg.fd);
            }
        }

        res
    }

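    /// Like `submit`, but pushes the key to the front of the fd queue; used
    /// when an operation returned [`Poll::Pending`] from `operate` and needs
    /// to wait again.
    ///
    /// # Safety
    ///
    /// Same assumption as `submit`: `arg.fd` must stay open while registered.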
    unsafe fn submit_front(&mut self, key: ErasedKey, arg: WaitArg) -> io::Result<()> {
        let need_add = !self.registry.contains_key(&arg.fd);
        let queue = self.registry.entry(arg.fd).or_default();
        queue.push_front_interest(key, arg.interest);
        let event = queue.event();
        if need_add {
            unsafe { self.poller().add(arg.fd, event)? }
        } else {
            let fd = unsafe { BorrowedFd::borrow_raw(arg.fd) };
            self.poller().modify(fd, event)?;
        }
        Ok(())
    }

    fn renew(&mut self, fd: BorrowedFd, renew_event: Event) -> io::Result<()> {
        if !renew_event.readable && !renew_event.writable {
            self.poller().delete(fd)?;
            self.registry.remove(&fd.as_raw_fd());
        } else {
            self.poller().modify(fd, renew_event)?;
        }
        Ok(())
    }

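    /// Removes `key` from the queue of `fd` and updates or deletes the fd's
    /// registration with the poller accordingly.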
    fn remove_one(&mut self, key: &ErasedKey, fd: RawFd) -> io::Result<()> {
        let Some(queue) = self.try_get_queue(fd) else {
            return Ok(());
        };
        queue.remove(key);
        let renew_event = queue.event();
        if queue.is_empty() {
            self.registry.remove(&fd);
        }
        self.renew(unsafe { BorrowedFd::borrow_raw(fd) }, renew_event)
    }

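    /// Removes `key` from the queue of `fd` and, on success, produces a
    /// cancelled `Entry` for it.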
    fn cancel_one(&mut self, key: ErasedKey, fd: RawFd) -> Option<Entry> {
        self.remove_one(&key, fd)
            .map_or(None, |_| Some(Entry::new_cancelled(key)))
    }

    pub fn attach(&mut self, _fd: RawFd) -> io::Result<()> {
        Ok(())
    }

    pub fn cancel<T>(&mut self, key: Key<T>) {
        let op_type = key.borrow().pinned_op().op_type();
        match op_type {
            None => {}
            Some(OpType::Fd(fds)) => {
                let mut pushed = false;
                for fd in fds {
                    let entry = self.cancel_one(key.clone().erase(), fd);
                    if !pushed && let Some(entry) = entry {
                        _ = self.completed_tx.send(entry);
                        pushed = true;
                    }
                }
            }
            #[cfg(aio)]
            Some(OpType::Aio(aiocbp)) => {
                let aiocb = unsafe { aiocbp.as_ref() };
                let fd = aiocb.aio_fildes;
                syscall!(libc::aio_cancel(fd, aiocbp.as_ptr())).ok();
            }
        }
    }

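    /// Submits an operation: calls `pre_submit` and, depending on the returned
    /// [`Decision`], completes immediately, registers the fds to wait on,
    /// dispatches to the blocking pool, or submits an AIO request.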
    pub fn push(&mut self, key: ErasedKey) -> Poll<io::Result<usize>> {
        instrument!(compio_log::Level::TRACE, "push", ?key);
        match { key.borrow().pinned_op().pre_submit()? } {
            Decision::Wait(args) => {
                key.borrow()
                    .extra_mut()
                    .as_poll_mut()
                    .set_args(args.clone());
                for arg in args.iter().copied() {
                    let res = unsafe { self.submit(key.clone(), arg) };
                    if let Err(e) = res {
                        args.into_iter().for_each(|arg| {
                            let _ = self.remove_one(&key, arg.fd);
                        });
                        return Poll::Ready(Err(e));
                    }
                    trace!("register {:?}", arg);
                }
                Poll::Pending
            }
            Decision::Completed(res) => Poll::Ready(Ok(res)),
            Decision::Blocking => {
                self.push_blocking(key);
                Poll::Pending
            }
            #[cfg(aio)]
            Decision::Aio(AioControl { mut aiocbp, submit }) => {
                let aiocb = unsafe { aiocbp.as_mut() };
                let user_data = key.as_raw();
                #[cfg(freebsd)]
                {
                    aiocb.aio_sigevent.sigev_signo = self.as_raw_fd();
                    aiocb.aio_sigevent.sigev_notify = libc::SIGEV_KEVENT;
                    aiocb.aio_sigevent.sigev_value.sival_ptr = user_data as _;
                }
                #[cfg(solarish)]
                let mut notify = libc::port_notify {
                    portnfy_port: self.as_raw_fd(),
                    portnfy_user: user_data as _,
                };
                #[cfg(solarish)]
                {
                    aiocb.aio_sigevent.sigev_notify = libc::SIGEV_PORT;
                    aiocb.aio_sigevent.sigev_value.sival_ptr = &mut notify as *mut _ as _;
                }
                match syscall!(submit(aiocbp.as_ptr())) {
                    Ok(_) => {
                        key.into_raw();
                        Poll::Pending
                    }
                    Err(e)
                        if matches!(
                            e.raw_os_error(),
                            Some(libc::EOPNOTSUPP) | Some(libc::EAGAIN)
                        ) =>
                    {
                        self.push_blocking(key);
                        Poll::Pending
                    }
                    Err(e) => Poll::Ready(Err(e)),
                }
            }
        }
    }

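    /// Runs the operation on the blocking thread pool; the result is sent back
    /// over the completion channel and the poller is woken up when it is done.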
    fn push_blocking(&mut self, key: ErasedKey) {
        let waker = self.waker();
        let completed = self.completed_tx.clone();
        let mut key = unsafe { key.freeze() };

        let mut closure = move || {
            let poll = key.pinned_op().operate();
            let res = match poll {
                Poll::Pending => unreachable!("this operation is not non-blocking"),
                Poll::Ready(res) => res,
            };
            let _ = completed.send(Entry::new(key.into_inner(), res));
            waker.wake();
        };

        while let Err(e) = self.pool.dispatch(closure) {
            closure = e.0;
            self.poll_completed();
        }
    }

    fn poll_completed(&mut self) -> bool {
        let mut ret = false;
        while let Ok(entry) = self.completed_rx.try_recv() {
            entry.notify();
            ret = true;
        }
        ret
    }

    #[allow(clippy::blocks_in_conditions)]
    fn poll_one(&mut self, event: Event, fd: RawFd) -> io::Result<()> {
        let queue = self.get_queue(fd);

        if let Some((key, _)) = queue.pop_interest(&event)
            && let mut op = key.borrow()
            && op.extra_mut().as_poll_mut().handle_event(fd)
        {
            match { op.pinned_op().operate() } {
                Poll::Pending => {
                    let extra = op.extra_mut().as_poll_mut();
                    extra.reset();
                    for t in extra.track.iter() {
                        let res = unsafe { self.submit_front(key.clone(), t.arg) };
                        if let Err(e) = res {
                            for t in extra.track.iter() {
                                let _ = self.remove_one(&key, t.arg.fd);
                            }
                            return Err(e);
                        }
                    }
                }
                Poll::Ready(res) => {
                    drop(op);
                    Entry::new(key, res).notify()
                }
            };
        }

        let renew_event = self.get_queue(fd).event();
        let fd = unsafe { BorrowedFd::borrow_raw(fd) };
        self.renew(fd, renew_event)
    }

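    /// Drains the completion channel and, if nothing had completed, waits on
    /// the poller for at most `timeout` and drives every reported operation
    /// forward; returns `ETIMEDOUT` if the timeout elapses with no events.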
    pub fn poll(&mut self, timeout: Option<Duration>) -> io::Result<()> {
        instrument!(compio_log::Level::TRACE, "poll", ?timeout);
        if self.poll_completed() {
            return Ok(());
        }
        self.events.clear();
        self.notify.poll.wait(&mut self.events, timeout)?;
        if self.events.is_empty() && timeout.is_some() {
            return Err(io::Error::from_raw_os_error(libc::ETIMEDOUT));
        }
        self.with_events(|this, events| {
            for event in events.iter() {
                trace!("receive {} for {:?}", event.key, event);
                let key = unsafe { BorrowedKey::from_raw(event.key) };
                let mut op = key.borrow();
                let op_type = op.pinned_op().op_type();
                match op_type {
                    None => {
                        trace!("op {} is completed", event.key);
                    }
                    Some(OpType::Fd(_)) => {
                        let Some(fd) = op.extra().as_poll().next_fd() else {
                            return Ok(());
                        };
                        drop(op);
                        this.poll_one(event, fd)?;
                    }
                    #[cfg(aio)]
                    Some(OpType::Aio(aiocbp)) => {
                        drop(op);
                        let err = unsafe { libc::aio_error(aiocbp.as_ptr()) };
                        let res = match err {
                            libc::EINPROGRESS => {
                                trace!("op {} is not completed", key.as_raw());
                                continue;
                            }
                            libc::ECANCELED => {
                                unsafe { libc::aio_return(aiocbp.as_ptr()) };
                                Err(io::Error::from_raw_os_error(libc::ETIMEDOUT))
                            }
                            _ => {
                                syscall!(libc::aio_return(aiocbp.as_ptr())).map(|res| res as usize)
                            }
                        };
                        let key = unsafe { ErasedKey::from_raw(event.key) };
                        Entry::new(key, res).notify()
                    }
                }
            }

            Ok(())
        })
    }

    pub fn waker(&self) -> Waker {
        Waker::from(self.notify.clone())
    }

    pub fn create_buffer_pool(
        &mut self,
        buffer_len: u16,
        buffer_size: usize,
    ) -> io::Result<BufferPool> {
        #[cfg(fusion)]
        {
            Ok(BufferPool::new_poll(crate::FallbackBufferPool::new(
                buffer_len,
                buffer_size,
            )))
        }
        #[cfg(not(fusion))]
        {
            Ok(BufferPool::new(buffer_len, buffer_size))
        }
    }

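    /// Releases a buffer pool created by `create_buffer_pool`. The poll driver
    /// has nothing to clean up, so this is a no-op.
    ///
    /// # Safety
    ///
    /// The pool is assumed to have been created by this driver and to no
    /// longer be referenced by any in-flight operation.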
    pub unsafe fn release_buffer_pool(&mut self, _: BufferPool) -> io::Result<()> {
        Ok(())
    }
}

impl AsRawFd for Driver {
    fn as_raw_fd(&self) -> RawFd {
        self.poller().as_raw_fd()
    }
}

impl Drop for Driver {
    fn drop(&mut self) {
        for fd in self.registry.keys() {
            unsafe {
                let fd = BorrowedFd::borrow_raw(*fd);
                self.poller().delete(fd).ok();
            }
        }
    }
}

impl Entry {
    pub(crate) fn new_cancelled(key: ErasedKey) -> Self {
        Entry::new(key, Err(io::Error::from_raw_os_error(libc::ETIMEDOUT)))
    }
}

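/// Wrapper around the [`Poller`] that wakes a blocked `poll` call; used to
/// build the driver's [`Waker`].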
pub(crate) struct Notify {
    poll: Poller,
}

impl Notify {
    fn new(poll: Poller) -> Self {
        Self { poll }
    }

    pub fn notify(&self) -> io::Result<()> {
        self.poll.notify()
    }
}

impl Wake for Notify {
    fn wake(self: Arc<Self>) {
        self.wake_by_ref();
    }

    fn wake_by_ref(self: &Arc<Self>) {
        self.notify().ok();
    }
}