1#[cfg_attr(all(doc, docsrs), doc(cfg(all())))]
2#[allow(unused_imports)]
3pub use std::os::fd::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd};
4#[cfg(aio)]
5use std::ptr::NonNull;
6use std::{
7 collections::{HashMap, VecDeque},
8 io,
9 num::NonZeroUsize,
10 pin::Pin,
11 sync::Arc,
12 task::{Poll, Wake, Waker},
13 time::Duration,
14};
15
16use compio_log::{instrument, trace};
17use crossbeam_queue::SegQueue;
18use polling::{Event, Events, Poller};
19
20use crate::{
21 AsyncifyPool, BufferPool, DriverType, Entry, Key, ProactorBuilder, op::Interest, syscall,
22};
23
24pub(crate) mod op;
25
26pub trait OpCode {
28 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision>;
31
32 fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
34 None
35 }
36
37 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>>;
41}
42
43#[non_exhaustive]
45pub enum Decision {
46 Completed(usize),
48 Wait(WaitArg),
50 Blocking,
52 #[cfg(aio)]
54 Aio(AioControl),
55}
56
57impl Decision {
58 pub fn wait_for(fd: RawFd, interest: Interest) -> Self {
60 Self::Wait(WaitArg { fd, interest })
61 }
62
63 pub fn wait_readable(fd: RawFd) -> Self {
65 Self::wait_for(fd, Interest::Readable)
66 }
67
68 pub fn wait_writable(fd: RawFd) -> Self {
70 Self::wait_for(fd, Interest::Writable)
71 }
72
73 #[cfg(aio)]
75 pub fn aio(
76 cb: &mut libc::aiocb,
77 submit: unsafe extern "C" fn(*mut libc::aiocb) -> i32,
78 ) -> Self {
79 Self::Aio(AioControl {
80 aiocbp: NonNull::from(cb),
81 submit,
82 })
83 }
84}
85
86#[derive(Debug, Clone, Copy)]
88pub struct WaitArg {
89 pub fd: RawFd,
91 pub interest: Interest,
93}
94
95#[cfg(aio)]
97#[derive(Debug, Clone, Copy)]
98pub struct AioControl {
99 pub aiocbp: NonNull<libc::aiocb>,
101 pub submit: unsafe extern "C" fn(*mut libc::aiocb) -> i32,
103}
104
105#[derive(Debug, Default)]
106struct FdQueue {
107 read_queue: VecDeque<usize>,
108 write_queue: VecDeque<usize>,
109}
110
111impl FdQueue {
112 pub fn push_back_interest(&mut self, user_data: usize, interest: Interest) {
113 match interest {
114 Interest::Readable => self.read_queue.push_back(user_data),
115 Interest::Writable => self.write_queue.push_back(user_data),
116 }
117 }
118
119 pub fn push_front_interest(&mut self, user_data: usize, interest: Interest) {
120 match interest {
121 Interest::Readable => self.read_queue.push_front(user_data),
122 Interest::Writable => self.write_queue.push_front(user_data),
123 }
124 }
125
126 pub fn remove(&mut self, user_data: usize) {
127 self.read_queue.retain(|&k| k != user_data);
128 self.write_queue.retain(|&k| k != user_data);
129 }
130
131 pub fn event(&self) -> Event {
132 let mut event = Event::none(0);
133 if let Some(&key) = self.read_queue.front() {
134 event.readable = true;
135 event.key = key;
136 }
137 if let Some(&key) = self.write_queue.front() {
138 event.writable = true;
139 event.key = key;
140 }
141 event
142 }
143
144 pub fn pop_interest(&mut self, event: &Event) -> Option<(usize, Interest)> {
145 if event.readable
146 && let Some(user_data) = self.read_queue.pop_front()
147 {
148 return Some((user_data, Interest::Readable));
149 }
150 if event.writable
151 && let Some(user_data) = self.write_queue.pop_front()
152 {
153 return Some((user_data, Interest::Writable));
154 }
155 None
156 }
157}
158
159#[non_exhaustive]
162pub enum OpType {
163 Fd(RawFd),
165 #[cfg(aio)]
167 Aio(NonNull<libc::aiocb>),
168}
169
170pub(crate) struct Driver {
172 events: Events,
173 notify: Arc<Notify>,
174 registry: HashMap<RawFd, FdQueue>,
175 pool: AsyncifyPool,
176 pool_completed: Arc<SegQueue<Entry>>,
177}
178
179impl Driver {
180 pub fn new(builder: &ProactorBuilder) -> io::Result<Self> {
181 instrument!(compio_log::Level::TRACE, "new", ?builder);
182 trace!("new poll driver");
183 let entries = builder.capacity as usize; let events = if entries == 0 {
185 Events::new()
186 } else {
187 Events::with_capacity(NonZeroUsize::new(entries).unwrap())
188 };
189
190 let poll = Poller::new()?;
191 let notify = Arc::new(Notify::new(poll));
192
193 Ok(Self {
194 events,
195 notify,
196 registry: HashMap::new(),
197 pool: builder.create_or_get_thread_pool(),
198 pool_completed: Arc::new(SegQueue::new()),
199 })
200 }
201
202 pub fn driver_type(&self) -> DriverType {
203 DriverType::Poll
204 }
205
206 fn poller(&self) -> &Poller {
207 &self.notify.poll
208 }
209
210 pub fn create_op<T: crate::sys::OpCode + 'static>(&self, op: T) -> Key<T> {
211 Key::new(self.as_raw_fd(), op)
212 }
213
214 unsafe fn submit(&mut self, user_data: usize, arg: WaitArg) -> io::Result<()> {
217 let need_add = !self.registry.contains_key(&arg.fd);
218 let queue = self.registry.entry(arg.fd).or_default();
219 queue.push_back_interest(user_data, arg.interest);
220 let event = queue.event();
221 if need_add {
222 unsafe { self.poller().add(arg.fd, event)? }
224 } else {
225 let fd = unsafe { BorrowedFd::borrow_raw(arg.fd) };
226 self.poller().modify(fd, event)?;
227 }
228 Ok(())
229 }
230
231 fn renew(
232 poll: &Poller,
233 registry: &mut HashMap<RawFd, FdQueue>,
234 fd: BorrowedFd,
235 renew_event: Event,
236 ) -> io::Result<()> {
237 if !renew_event.readable && !renew_event.writable {
238 poll.delete(fd)?;
239 registry.remove(&fd.as_raw_fd());
240 } else {
241 poll.modify(fd, renew_event)?;
242 }
243 Ok(())
244 }
245
246 pub fn attach(&mut self, _fd: RawFd) -> io::Result<()> {
247 Ok(())
248 }
249
250 pub fn cancel(&mut self, op: &mut Key<dyn crate::sys::OpCode>) {
251 let op_pin = op.as_op_pin();
252 match op_pin.op_type() {
253 None => {}
254 Some(OpType::Fd(fd)) => {
255 let queue = self
256 .registry
257 .get_mut(&fd)
258 .expect("the fd should be attached");
259 queue.remove(op.user_data());
260 let renew_event = queue.event();
261 if Self::renew(
262 &self.notify.poll,
263 &mut self.registry,
264 unsafe { BorrowedFd::borrow_raw(fd) },
265 renew_event,
266 )
267 .is_ok()
268 {
269 self.pool_completed.push(entry_cancelled(op.user_data()));
270 }
271 }
272 #[cfg(aio)]
273 Some(OpType::Aio(aiocbp)) => {
274 let aiocb = unsafe { aiocbp.as_ref() };
275 let fd = aiocb.aio_fildes;
276 syscall!(libc::aio_cancel(fd, aiocbp.as_ptr())).ok();
277 }
278 }
279 }
280
281 pub fn push(&mut self, op: &mut Key<dyn crate::sys::OpCode>) -> Poll<io::Result<usize>> {
282 instrument!(compio_log::Level::TRACE, "push", ?op);
283 let user_data = op.user_data();
284 let op_pin = op.as_op_pin();
285 match op_pin.pre_submit()? {
286 Decision::Wait(arg) => {
287 unsafe {
289 self.submit(user_data, arg)?;
290 }
291 trace!("register {:?}", arg);
292 Poll::Pending
293 }
294 Decision::Completed(res) => Poll::Ready(Ok(res)),
295 Decision::Blocking => self.push_blocking(user_data),
296 #[cfg(aio)]
297 Decision::Aio(AioControl { mut aiocbp, submit }) => {
298 let aiocb = unsafe { aiocbp.as_mut() };
299 #[cfg(freebsd)]
300 {
301 aiocb.aio_sigevent.sigev_signo = self.as_raw_fd();
303 aiocb.aio_sigevent.sigev_notify = libc::SIGEV_KEVENT;
304 aiocb.aio_sigevent.sigev_value.sival_ptr = user_data as _;
305 }
306 #[cfg(solarish)]
307 let mut notify = libc::port_notify {
308 portnfy_port: self.as_raw_fd(),
309 portnfy_user: user_data as _,
310 };
311 #[cfg(solarish)]
312 {
313 aiocb.aio_sigevent.sigev_notify = libc::SIGEV_PORT;
314 aiocb.aio_sigevent.sigev_value.sival_ptr = &mut notify as *mut _ as _;
315 }
316 match syscall!(submit(aiocbp.as_ptr())) {
317 Ok(_) => Poll::Pending,
318 Err(e)
326 if matches!(
327 e.raw_os_error(),
328 Some(libc::EOPNOTSUPP) | Some(libc::EAGAIN)
329 ) =>
330 {
331 self.push_blocking(user_data)
332 }
333 Err(e) => Poll::Ready(Err(e)),
334 }
335 }
336 }
337 }
338
339 fn push_blocking(&mut self, user_data: usize) -> Poll<io::Result<usize>> {
340 let waker = self.waker();
341 let completed = self.pool_completed.clone();
342 let mut closure = move || {
343 let mut op = unsafe { Key::<dyn crate::sys::OpCode>::new_unchecked(user_data) };
344 let op_pin = op.as_op_pin();
345 let res = match op_pin.operate() {
346 Poll::Pending => unreachable!("this operation is not non-blocking"),
347 Poll::Ready(res) => res,
348 };
349 completed.push(Entry::new(user_data, res));
350 waker.wake();
351 };
352 loop {
353 match self.pool.dispatch(closure) {
354 Ok(()) => return Poll::Pending,
355 Err(e) => {
356 closure = e.0;
357 self.poll_blocking();
358 }
359 }
360 }
361 }
362
363 fn poll_blocking(&mut self) -> bool {
364 if self.pool_completed.is_empty() {
365 return false;
366 }
367 while let Some(entry) = self.pool_completed.pop() {
368 unsafe {
369 entry.notify();
370 }
371 }
372 true
373 }
374
375 pub fn poll(&mut self, timeout: Option<Duration>) -> io::Result<()> {
376 instrument!(compio_log::Level::TRACE, "poll", ?timeout);
377 if self.poll_blocking() {
378 return Ok(());
379 }
380 self.events.clear();
381 self.notify.poll.wait(&mut self.events, timeout)?;
382 if self.events.is_empty() && timeout.is_some() {
383 return Err(io::Error::from_raw_os_error(libc::ETIMEDOUT));
384 }
385 for event in self.events.iter() {
386 let user_data = event.key;
387 trace!("receive {} for {:?}", user_data, event);
388 let mut op = unsafe { Key::<dyn crate::sys::OpCode>::new_unchecked(user_data) };
390 let op = op.as_op_pin();
391 match op.op_type() {
392 None => {
393 trace!("op {} is completed", user_data);
396 }
397 Some(OpType::Fd(fd)) => {
398 let queue = self
401 .registry
402 .get_mut(&fd)
403 .expect("the fd should be attached");
404 if let Some((user_data, interest)) = queue.pop_interest(&event) {
405 let mut op =
406 unsafe { Key::<dyn crate::sys::OpCode>::new_unchecked(user_data) };
407 let op = op.as_op_pin();
408 let res = match op.operate() {
409 Poll::Pending => {
410 queue.push_front_interest(user_data, interest);
412 None
413 }
414 Poll::Ready(res) => Some(res),
415 };
416 if let Some(res) = res {
417 unsafe { Entry::new(user_data, res).notify() }
419 }
420 }
421 let renew_event = queue.event();
422 Self::renew(
423 &self.notify.poll,
424 &mut self.registry,
425 unsafe { BorrowedFd::borrow_raw(fd) },
426 renew_event,
427 )?;
428 }
429 #[cfg(aio)]
430 Some(OpType::Aio(aiocbp)) => {
431 let err = unsafe { libc::aio_error(aiocbp.as_ptr()) };
432 let res = match err {
433 libc::EINPROGRESS => {
438 trace!("op {} is not completed", user_data);
439 continue;
440 }
441 libc::ECANCELED => {
442 unsafe { libc::aio_return(aiocbp.as_ptr()) };
444 Err(io::Error::from_raw_os_error(libc::ETIMEDOUT))
445 }
446 _ => syscall!(libc::aio_return(aiocbp.as_ptr())).map(|res| res as usize),
447 };
448 unsafe { Entry::new(user_data, res).notify() }
450 }
451 }
452 }
453 Ok(())
454 }
455
456 pub fn waker(&self) -> Waker {
457 Waker::from(self.notify.clone())
458 }
459
460 pub fn create_buffer_pool(
461 &mut self,
462 buffer_len: u16,
463 buffer_size: usize,
464 ) -> io::Result<BufferPool> {
465 #[cfg(fusion)]
466 {
467 Ok(BufferPool::new_poll(crate::FallbackBufferPool::new(
468 buffer_len,
469 buffer_size,
470 )))
471 }
472 #[cfg(not(fusion))]
473 {
474 Ok(BufferPool::new(buffer_len, buffer_size))
475 }
476 }
477
478 pub unsafe fn release_buffer_pool(&mut self, _: BufferPool) -> io::Result<()> {
482 Ok(())
483 }
484}
485
486impl AsRawFd for Driver {
487 fn as_raw_fd(&self) -> RawFd {
488 self.poller().as_raw_fd()
489 }
490}
491
492impl Drop for Driver {
493 fn drop(&mut self) {
494 for fd in self.registry.keys() {
495 unsafe {
496 let fd = BorrowedFd::borrow_raw(*fd);
497 self.poller().delete(fd).ok();
498 }
499 }
500 }
501}
502
503fn entry_cancelled(user_data: usize) -> Entry {
504 Entry::new(
505 user_data,
506 Err(io::Error::from_raw_os_error(libc::ETIMEDOUT)),
507 )
508}
509
510struct Notify {
512 poll: Poller,
513}
514
515impl Notify {
516 fn new(poll: Poller) -> Self {
517 Self { poll }
518 }
519
520 pub fn notify(&self) -> io::Result<()> {
522 self.poll.notify()
523 }
524}
525
526impl Wake for Notify {
527 fn wake(self: Arc<Self>) {
528 self.wake_by_ref();
529 }
530
531 fn wake_by_ref(self: &Arc<Self>) {
532 self.notify().ok();
533 }
534}