compio_driver/fusion/
mod.rs

1#[path = "../poll/mod.rs"]
2mod poll;
3
4#[path = "../iour/mod.rs"]
5mod iour;
6
7pub(crate) mod op;
8
9#[cfg_attr(all(doc, docsrs), doc(cfg(all())))]
10pub use std::os::fd::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd};
11use std::{io, task::Poll, time::Duration};
12
13use compio_log::warn;
14pub use iour::{OpCode as IourOpCode, OpEntry};
15pub use poll::{Decision, OpCode as PollOpCode, OpType};
16
17pub use crate::driver_type::DriverType; // Re-export so current user won't be broken
18use crate::{BufferPool, Key, ProactorBuilder};
19
20/// Fused [`OpCode`]
21///
22/// This trait encapsulates both operation for `io-uring` and `polling`
23pub trait OpCode: PollOpCode + IourOpCode {}
24
25impl<T: PollOpCode + IourOpCode + ?Sized> OpCode for T {}
26
27#[allow(clippy::large_enum_variant)]
28enum FuseDriver {
29    Poll(poll::Driver),
30    IoUring(iour::Driver),
31}
32
33/// Low-level fusion driver.
34pub(crate) struct Driver {
35    fuse: FuseDriver,
36}
37
38impl Driver {
39    /// Create a new fusion driver with given number of entries
40    pub fn new(builder: &ProactorBuilder) -> io::Result<Self> {
41        let (ty, fallback) = match &builder.driver_type {
42            Some(t) => (*t, false),
43            None => (DriverType::suggest(), true),
44        };
45        match ty {
46            DriverType::Poll => Ok(Self {
47                fuse: FuseDriver::Poll(poll::Driver::new(builder)?),
48            }),
49            DriverType::IoUring => match iour::Driver::new(builder) {
50                Ok(driver) => Ok(Self {
51                    fuse: FuseDriver::IoUring(driver),
52                }),
53                Err(_e) if fallback => {
54                    warn!("Fail to create io-uring driver: {_e:?}, fallback to polling driver.");
55                    Ok(Self {
56                        fuse: FuseDriver::Poll(poll::Driver::new(builder)?),
57                    })
58                }
59                Err(e) => Err(e),
60            },
61            _ => unreachable!("Fuse driver will only be enabled on linux"),
62        }
63    }
64
65    pub fn driver_type(&self) -> DriverType {
66        match &self.fuse {
67            FuseDriver::Poll(driver) => driver.driver_type(),
68            FuseDriver::IoUring(driver) => driver.driver_type(),
69        }
70    }
71
72    pub fn create_op<T: OpCode + 'static>(&self, op: T) -> Key<T> {
73        match &self.fuse {
74            FuseDriver::Poll(driver) => driver.create_op(op),
75            FuseDriver::IoUring(driver) => driver.create_op(op),
76        }
77    }
78
79    pub fn attach(&mut self, fd: RawFd) -> io::Result<()> {
80        match &mut self.fuse {
81            FuseDriver::Poll(driver) => driver.attach(fd),
82            FuseDriver::IoUring(driver) => driver.attach(fd),
83        }
84    }
85
86    pub fn cancel(&mut self, op: &mut Key<dyn OpCode>) {
87        match &mut self.fuse {
88            FuseDriver::Poll(driver) => driver.cancel(op),
89            FuseDriver::IoUring(driver) => driver.cancel(op),
90        }
91    }
92
93    pub fn push(&mut self, op: &mut Key<dyn OpCode>) -> Poll<io::Result<usize>> {
94        match &mut self.fuse {
95            FuseDriver::Poll(driver) => driver.push(op),
96            FuseDriver::IoUring(driver) => driver.push(op),
97        }
98    }
99
100    pub unsafe fn poll(&mut self, timeout: Option<Duration>) -> io::Result<()> {
101        match &mut self.fuse {
102            FuseDriver::Poll(driver) => driver.poll(timeout),
103            FuseDriver::IoUring(driver) => driver.poll(timeout),
104        }
105    }
106
107    pub fn handle(&self) -> NotifyHandle {
108        let fuse = match &self.fuse {
109            FuseDriver::Poll(driver) => FuseNotifyHandle::Poll(driver.handle()),
110            FuseDriver::IoUring(driver) => FuseNotifyHandle::IoUring(driver.handle()),
111        };
112        NotifyHandle::from_fuse(fuse)
113    }
114
115    pub fn create_buffer_pool(
116        &mut self,
117        buffer_len: u16,
118        buffer_size: usize,
119    ) -> io::Result<BufferPool> {
120        match &mut self.fuse {
121            FuseDriver::IoUring(driver) => Ok(driver.create_buffer_pool(buffer_len, buffer_size)?),
122            FuseDriver::Poll(driver) => Ok(driver.create_buffer_pool(buffer_len, buffer_size)?),
123        }
124    }
125
126    /// # Safety
127    ///
128    /// caller must make sure release the buffer pool with correct driver
129    pub unsafe fn release_buffer_pool(&mut self, buffer_pool: BufferPool) -> io::Result<()> {
130        match &mut self.fuse {
131            FuseDriver::Poll(driver) => driver.release_buffer_pool(buffer_pool),
132            FuseDriver::IoUring(driver) => driver.release_buffer_pool(buffer_pool),
133        }
134    }
135}
136
137impl AsRawFd for Driver {
138    fn as_raw_fd(&self) -> RawFd {
139        match &self.fuse {
140            FuseDriver::Poll(driver) => driver.as_raw_fd(),
141            FuseDriver::IoUring(driver) => driver.as_raw_fd(),
142        }
143    }
144}
145
146enum FuseNotifyHandle {
147    Poll(poll::NotifyHandle),
148    IoUring(iour::NotifyHandle),
149}
150
151/// A notify handle to the inner driver.
152pub struct NotifyHandle {
153    fuse: FuseNotifyHandle,
154}
155
156impl NotifyHandle {
157    fn from_fuse(fuse: FuseNotifyHandle) -> Self {
158        Self { fuse }
159    }
160
161    /// Notify the inner driver.
162    pub fn notify(&self) -> io::Result<()> {
163        match &self.fuse {
164            FuseNotifyHandle::Poll(handle) => handle.notify(),
165            FuseNotifyHandle::IoUring(handle) => handle.notify(),
166        }
167    }
168}