Skip to main content

compio_driver/sys/op/managed/
fusion.rs

1use compio_buf::*;
2use rustix::net::RecvFlags;
3use socket2::SockAddr;
4
5use super::{fallback, iour};
6use crate::{BufferPool, BufferRef, IourOpCode, OpEntry, OpType, PollOpCode, sys::pal::*};
7
8macro_rules! mop {
9    (<$($ty:ident: $trait:ident),* $(,)?> $name:ident( $($arg:ident: $arg_t:ty),* $(,)? ) with $pool:ident) => {
10        mop!(<$($ty: $trait),*> $name( $($arg: $arg_t),* ) with $pool; crate::BufferRef);
11    };
12    (<$($ty:ident: $trait:ident),* $(,)?> $name:ident( $($arg:ident: $arg_t:ty),* $(,)? ) with $pool:ident; $inner:ty) => {
13        ::paste::paste!{
14            enum [< $name Inner >] <$($ty: $trait),*> {
15                Poll(fallback::$name<$($ty),*>),
16                IoUring(iour::$name<$($ty),*>),
17            }
18
19            impl<$($ty: $trait),*> [< $name Inner >]<$($ty),*> {
20                fn poll(&mut self) -> &mut fallback::$name<$($ty),*> {
21                    match self {
22                        Self::Poll(op) => op,
23                        Self::IoUring(_) => unreachable!("Current driver is not `io-uring`"),
24                    }
25                }
26
27                fn iour(&mut self) -> &mut iour::$name<$($ty),*> {
28                    match self {
29                        Self::IoUring(op) => op,
30                        Self::Poll(_) => unreachable!("Current driver is not `polling`"),
31                    }
32                }
33            }
34
35            #[doc = concat!("A fused `", stringify!($name), "` operation")]
36            pub struct $name <$($ty: $trait),*> {
37                inner: [< $name Inner >] <$($ty),*>
38            }
39
40            impl<$($ty: $trait),*> $name <$($ty),*> {
41                #[doc = concat!("Create a new `", stringify!($name), "`.")]
42                pub fn new($($arg: $arg_t),*) -> std::io::Result<Self> {
43                    Ok(if $pool.is_io_uring()? {
44                        Self {
45                            inner: [< $name Inner >]::IoUring(iour::$name::new($($arg),*)?),
46                        }
47                    } else {
48                        Self {
49                            inner: [< $name Inner >]::Poll(fallback::$name::new($($arg),*)?),
50                        }
51                    })
52                }
53            }
54
55            impl <$($ty: $trait),*> crate::TakeBuffer for $name <$($ty),*> {
56                type Buffer = $inner;
57
58                fn take_buffer(self) -> Option<$inner> {
59                    match self.inner {
60                        [< $name Inner >]::IoUring(op) => op.take_buffer().map(Into::into),
61                        [< $name Inner >]::Poll(op) => op.take_buffer().map(Into::into),
62                    }
63                }
64            }
65
66            unsafe impl<$($ty: $trait),*> PollOpCode for $name<$($ty),*> {
67                type Control = <fallback::$name<$($ty),*> as PollOpCode>::Control;
68
69                unsafe fn init(&mut self, ctrl: &mut Self::Control) {
70                    unsafe { self.inner.poll().init(ctrl) }
71                }
72
73                fn pre_submit(&mut self, control: &mut Self::Control) -> std::io::Result<crate::Decision> {
74                    self.inner.poll().pre_submit(control)
75                }
76
77                fn op_type(&mut self, control: &mut Self::Control) -> Option<OpType> {
78                    self.inner.poll().op_type(control)
79                }
80
81                fn operate(
82                    &mut self, control: &mut Self::Control,
83                ) -> std::task::Poll<std::io::Result<usize>> {
84                    self.inner.poll().operate(control)
85                }
86            }
87
88            unsafe impl<$($ty: $trait),*> IourOpCode for $name<$($ty),*> {
89                type Control = <iour::$name<$($ty),*> as IourOpCode>::Control;
90
91                unsafe fn init(&mut self, ctrl: &mut Self::Control) {
92                    unsafe { self.inner.iour().init(ctrl) }
93                }
94
95                fn create_entry(&mut self, control: &mut Self::Control) -> OpEntry {
96                    self.inner.iour().create_entry(control)
97                }
98
99                fn create_entry_fallback(&mut self, control: &mut Self::Control) -> OpEntry {
100                    self.inner.iour().create_entry_fallback(control)
101                }
102
103                fn call_blocking(&mut self, control: &mut Self::Control) -> std::io::Result<usize> {
104                    self.inner.iour().call_blocking(control)
105                }
106
107                unsafe fn set_result(&mut self, control: &mut Self::Control, result: &std::io::Result<usize>, extra: &crate::Extra) {
108                    unsafe { self.inner.iour().set_result(control, result, extra) }
109                }
110
111                unsafe fn push_multishot(&mut self, control: &mut Self::Control, result: std::io::Result<usize>, extra: crate::Extra) {
112                    unsafe { self.inner.iour().push_multishot(control, result, extra) }
113                }
114
115                fn pop_multishot(&mut self, control: &mut Self::Control) -> Option<BufResult<usize, crate::Extra>> {
116                    self.inner.iour().pop_multishot(control)
117                }
118            }
119        }
120    };
121}
122
123mop!(<S: AsFd> ReadManagedAt(fd: S, offset: u64, pool: &BufferPool, len: usize) with pool);
124mop!(<S: AsFd> ReadManaged(fd: S, pool: &BufferPool, len: usize) with pool);
125mop!(<S: AsFd> RecvManaged(fd: S, pool: &BufferPool, len: usize, flags: RecvFlags) with pool);
126mop!(<S: AsFd> RecvFromManaged(fd: S, pool: &BufferPool, len: usize, flags: RecvFlags) with pool; (BufferRef, Option<SockAddr>));
127mop!(<C: IoBufMut, S: AsFd> RecvMsgManaged(fd: S, pool: &BufferPool, len: usize, control: C, flags: RecvFlags) with pool; ((BufferRef, C), Option<SockAddr>, usize));
128mop!(<S: AsFd> ReadMultiAt(fd: S, offset: u64, pool: &BufferPool, len: usize) with pool);
129mop!(<S: AsFd> ReadMulti(fd: S, pool: &BufferPool, len: usize) with pool);
130mop!(<S: AsFd> RecvMulti(fd: S, pool: &BufferPool, len: usize, flags: RecvFlags) with pool);
131mop!(<S: AsFd> RecvFromMulti(fd: S, pool: &BufferPool, flags: RecvFlags) with pool; RecvFromMultiResult);
132mop!(<S: AsFd> RecvMsgMulti(fd: S, pool: &BufferPool, control_len: usize, flags: RecvFlags) with pool; RecvMsgMultiResult);
133
134enum RecvFromMultiResultInner {
135    Poll(fallback::RecvFromMultiResult),
136    IoUring(iour::RecvFromMultiResult),
137}
138
139/// Result of [`RecvFromMulti`].
140pub struct RecvFromMultiResult {
141    inner: RecvFromMultiResultInner,
142}
143
144impl From<fallback::RecvFromMultiResult> for RecvFromMultiResult {
145    fn from(result: fallback::RecvFromMultiResult) -> Self {
146        Self {
147            inner: RecvFromMultiResultInner::Poll(result),
148        }
149    }
150}
151
152impl From<iour::RecvFromMultiResult> for RecvFromMultiResult {
153    fn from(result: iour::RecvFromMultiResult) -> Self {
154        Self {
155            inner: RecvFromMultiResultInner::IoUring(result),
156        }
157    }
158}
159
160impl RecvFromMultiResult {
161    /// Create [`RecvFromMultiResult`] from a buffer received from
162    /// [`RecvFromMulti`]. It should be used for io-uring only.
163    ///
164    /// # Safety
165    ///
166    /// The buffer must be received from [`RecvFromMulti`] or have the same
167    /// format as the buffer received from [`RecvFromMulti`].
168    pub unsafe fn new(buffer: BufferRef) -> Self {
169        Self {
170            inner: RecvFromMultiResultInner::IoUring(unsafe {
171                iour::RecvFromMultiResult::new(buffer)
172            }),
173        }
174    }
175
176    /// Get the payload data.
177    pub fn data(&self) -> &[u8] {
178        match &self.inner {
179            RecvFromMultiResultInner::Poll(result) => result.data(),
180            RecvFromMultiResultInner::IoUring(result) => result.data(),
181        }
182    }
183
184    /// Get the source address if applicable.
185    pub fn addr(&self) -> Option<SockAddr> {
186        match &self.inner {
187            RecvFromMultiResultInner::Poll(result) => result.addr(),
188            RecvFromMultiResultInner::IoUring(result) => result.addr(),
189        }
190    }
191}
192
193impl IntoInner for RecvFromMultiResult {
194    type Inner = BufferRef;
195
196    fn into_inner(self) -> Self::Inner {
197        match self.inner {
198            RecvFromMultiResultInner::Poll(result) => result.into_inner(),
199            RecvFromMultiResultInner::IoUring(result) => result.into_inner(),
200        }
201    }
202}
203
204enum RecvMsgMultiResultInner {
205    Poll(fallback::RecvMsgMultiResult),
206    IoUring(iour::RecvMsgMultiResult),
207}
208
209/// Result of [`RecvMsgMulti`].
210pub struct RecvMsgMultiResult {
211    inner: RecvMsgMultiResultInner,
212}
213
214impl From<fallback::RecvMsgMultiResult> for RecvMsgMultiResult {
215    fn from(result: fallback::RecvMsgMultiResult) -> Self {
216        Self {
217            inner: RecvMsgMultiResultInner::Poll(result),
218        }
219    }
220}
221
222impl From<iour::RecvMsgMultiResult> for RecvMsgMultiResult {
223    fn from(result: iour::RecvMsgMultiResult) -> Self {
224        Self {
225            inner: RecvMsgMultiResultInner::IoUring(result),
226        }
227    }
228}
229
230impl RecvMsgMultiResult {
231    /// Create [`RecvMsgMultiResult`] from a buffer received from
232    /// [`RecvMsgMulti`]. It should be used for io-uring only.
233    ///
234    /// # Safety
235    ///
236    /// The buffer must be received from [`RecvMsgMulti`] or have the same
237    /// format as the buffer received from [`RecvMsgMulti`].
238    pub unsafe fn new(buffer: BufferRef, clen: usize) -> Self {
239        Self {
240            inner: RecvMsgMultiResultInner::IoUring(unsafe {
241                iour::RecvMsgMultiResult::new(buffer, clen)
242            }),
243        }
244    }
245
246    /// Get the payload data.
247    pub fn data(&self) -> &[u8] {
248        match &self.inner {
249            RecvMsgMultiResultInner::Poll(result) => result.data(),
250            RecvMsgMultiResultInner::IoUring(result) => result.data(),
251        }
252    }
253
254    /// Get the ancillary data.
255    pub fn ancillary(&self) -> &[u8] {
256        match &self.inner {
257            RecvMsgMultiResultInner::Poll(result) => result.ancillary(),
258            RecvMsgMultiResultInner::IoUring(result) => result.ancillary(),
259        }
260    }
261
262    /// Get the source address if applicable.
263    pub fn addr(&self) -> Option<SockAddr> {
264        match &self.inner {
265            RecvMsgMultiResultInner::Poll(result) => result.addr(),
266            RecvMsgMultiResultInner::IoUring(result) => result.addr(),
267        }
268    }
269}
270
271impl IntoInner for RecvMsgMultiResult {
272    type Inner = BufferRef;
273
274    fn into_inner(self) -> Self::Inner {
275        match self.inner {
276            RecvMsgMultiResultInner::Poll(result) => result.into_inner(),
277            RecvMsgMultiResultInner::IoUring(result) => result.into_inner(),
278        }
279    }
280}