1use std::{io, marker::PhantomPinned, mem::ManuallyDrop, net::Shutdown};
8
9use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, SetBufInit};
10use socket2::SockAddr;
11
12#[cfg(windows)]
13pub use crate::sys::op::{ConnectNamedPipe, DeviceIoControl};
14pub use crate::sys::op::{
15 Accept, Recv, RecvFrom, RecvFromVectored, RecvMsg, RecvVectored, Send, SendMsg, SendTo,
16 SendToVectored, SendVectored,
17};
18#[cfg(unix)]
19pub use crate::sys::op::{
20 CreateDir, CreateSocket, FileStat, HardLink, Interest, OpenFile, PathStat, PollOnce,
21 ReadVectoredAt, Rename, Symlink, Unlink, WriteVectoredAt,
22};
23#[cfg(io_uring)]
24pub use crate::sys::op::{ReadManagedAt, RecvManaged};
25use crate::{
26 OwnedFd, TakeBuffer,
27 sys::{sockaddr_storage, socklen_t},
28};
29
/// Extension trait for buffer-carrying results whose buffer bookkeeping has
/// to be brought in line with the count reported by an operation.
pub trait BufResultExt {
    /// Consumes the result, updates the initialized length of the buffer(s)
    /// it carries from the count(s) stored in it, and returns the adjusted
    /// result.
    fn map_advanced(self) -> Self;
}
35
36impl<T: SetBufInit> BufResultExt for BufResult<usize, T> {
37 fn map_advanced(self) -> Self {
38 self.map_res(|res| (res, ()))
39 .map_advanced()
40 .map_res(|(res, _)| res)
41 }
42}
43
44impl<T: SetBufInit, O> BufResultExt for BufResult<(usize, O), T> {
45 fn map_advanced(self) -> Self {
46 self.map(|(init, obj), mut buffer| {
47 unsafe {
48 buffer.set_buf_init(init);
49 }
50 ((init, obj), buffer)
51 })
52 }
53}
54
55impl<T: SetBufInit, C: SetBufInit, O> BufResultExt for BufResult<(usize, usize, O), (T, C)> {
56 fn map_advanced(self) -> Self {
57 self.map(
58 |(init_buffer, init_control, obj), (mut buffer, mut control)| {
59 unsafe {
60 buffer.set_buf_init(init_buffer);
61 control.set_buf_init(init_control);
62 }
63 ((init_buffer, init_control, obj), (buffer, control))
64 },
65 )
66 }
67}
68
/// Extension trait for receive results that still carry a raw address
/// buffer and need it turned into a typed address.
pub trait RecvResultExt {
    /// The result type produced by [`RecvResultExt::map_addr`].
    type RecvResult;

    /// Consumes the raw result and converts it into [`Self::RecvResult`].
    fn map_addr(self) -> Self::RecvResult;
}
77
78impl<T> RecvResultExt for BufResult<usize, (T, sockaddr_storage, socklen_t)> {
79 type RecvResult = BufResult<(usize, SockAddr), T>;
80
81 fn map_addr(self) -> Self::RecvResult {
82 self.map_buffer(|(buffer, addr_buffer, addr_size)| (buffer, addr_buffer, addr_size, 0))
83 .map_addr()
84 .map_res(|(res, _, addr)| (res, addr))
85 }
86}
87
88impl<T> RecvResultExt for BufResult<usize, (T, sockaddr_storage, socklen_t, usize)> {
89 type RecvResult = BufResult<(usize, usize, SockAddr), T>;
90
91 fn map_addr(self) -> Self::RecvResult {
92 self.map2(
93 |res, (buffer, addr_buffer, addr_size, len)| {
94 let addr = unsafe { SockAddr::new(addr_buffer, addr_size) };
95 ((res, len, addr), buffer)
96 },
97 |(buffer, ..)| buffer,
98 )
99 }
100}
101
/// Helper trait for turning the outcome of an operation into a buffer that
/// borrows from a buffer pool.
pub trait ResultTakeBuffer {
    /// The pool type the buffer is taken from.
    type BufferPool;
    /// The buffer type handed out; it may borrow from the pool for `'a`.
    type Buffer<'a>;

    /// Consumes `self` and produces a buffer tied to the lifetime of `pool`,
    /// or an I/O error.
    fn take_buffer(self, pool: &Self::BufferPool) -> io::Result<Self::Buffer<'_>>;
}
112
113impl<T: TakeBuffer> ResultTakeBuffer for (BufResult<usize, T>, u32) {
114 type Buffer<'a> = T::Buffer<'a>;
115 type BufferPool = T::BufferPool;
116
117 fn take_buffer(self, pool: &Self::BufferPool) -> io::Result<Self::Buffer<'_>> {
118 let (BufResult(result, op), flags) = self;
119 op.take_buffer(pool, result, flags)
120 }
121}
122
/// Operation that stores a closure `f` together with a slot for the data it
/// yields.
///
/// NOTE(review): the name suggests the closure is run off the async path by
/// the driver; the execution itself is not defined in this part of the file.
pub struct Asyncify<F, D> {
    /// The closure to run; `None` once it has been taken out.
    pub(crate) f: Option<F>,
    /// Data slot, empty until something stores a value in it.
    pub(crate) data: Option<D>,
    /// Makes the type `!Unpin`.
    _p: PhantomPinned,
}

impl<F, D> Asyncify<F, D> {
    /// Creates an [`Asyncify`] holding `f`, with an empty data slot.
    pub fn new(f: F) -> Self {
        let f = Some(f);
        Asyncify {
            f,
            data: None,
            _p: PhantomPinned,
        }
    }
}
140
141impl<F, D> IntoInner for Asyncify<F, D> {
142 type Inner = D;
143
144 fn into_inner(mut self) -> Self::Inner {
145 self.data.take().expect("the data should not be None")
146 }
147}
148
149pub struct CloseFile {
151 pub(crate) fd: ManuallyDrop<OwnedFd>,
152}
153
154impl CloseFile {
155 pub fn new(fd: OwnedFd) -> Self {
157 Self {
158 fd: ManuallyDrop::new(fd),
159 }
160 }
161}
162
163#[derive(Debug)]
165pub struct ReadAt<T: IoBufMut, S> {
166 pub(crate) fd: S,
167 pub(crate) offset: u64,
168 pub(crate) buffer: T,
169 #[cfg(aio)]
170 pub(crate) aiocb: libc::aiocb,
171 _p: PhantomPinned,
172}
173
174impl<T: IoBufMut, S> ReadAt<T, S> {
175 pub fn new(fd: S, offset: u64, buffer: T) -> Self {
177 Self {
178 fd,
179 offset,
180 buffer,
181 #[cfg(aio)]
182 aiocb: unsafe { std::mem::zeroed() },
183 _p: PhantomPinned,
184 }
185 }
186}
187
188impl<T: IoBufMut, S> IntoInner for ReadAt<T, S> {
189 type Inner = T;
190
191 fn into_inner(self) -> Self::Inner {
192 self.buffer
193 }
194}
195
196#[derive(Debug)]
198pub struct WriteAt<T: IoBuf, S> {
199 pub(crate) fd: S,
200 pub(crate) offset: u64,
201 pub(crate) buffer: T,
202 #[cfg(aio)]
203 pub(crate) aiocb: libc::aiocb,
204 _p: PhantomPinned,
205}
206
207impl<T: IoBuf, S> WriteAt<T, S> {
208 pub fn new(fd: S, offset: u64, buffer: T) -> Self {
210 Self {
211 fd,
212 offset,
213 buffer,
214 #[cfg(aio)]
215 aiocb: unsafe { std::mem::zeroed() },
216 _p: PhantomPinned,
217 }
218 }
219}
220
221impl<T: IoBuf, S> IntoInner for WriteAt<T, S> {
222 type Inner = T;
223
224 fn into_inner(self) -> Self::Inner {
225 self.buffer
226 }
227}
228
/// Operation state for syncing a descriptor.
pub struct Sync<S> {
    /// The descriptor to sync.
    pub(crate) fd: S,
    /// Sync mode flag. NOTE(review): presumably selects a data-only sync
    /// (cf. `fdatasync`) — confirm against the backend implementations.
    // Kept under `allow(dead_code)`: presumably not every backend reads it.
    #[allow(dead_code)]
    pub(crate) datasync: bool,
    /// AIO control block, only present on `aio` builds.
    #[cfg(aio)]
    pub(crate) aiocb: libc::aiocb,
}

impl<S> Sync<S> {
    /// Creates a [`Sync`] for `fd` with the given `datasync` flag.
    pub fn new(fd: S, datasync: bool) -> Self {
        Sync {
            fd,
            datasync,
            // SAFETY: relies on `libc::aiocb` being a plain C struct for
            // which the all-zero bit pattern is a valid value.
            #[cfg(aio)]
            aiocb: unsafe { std::mem::zeroed() },
        }
    }
}
251
/// Operation state for shutting down a socket.
pub struct ShutdownSocket<S> {
    /// The socket descriptor.
    pub(crate) fd: S,
    /// Which direction(s) of the connection to shut down.
    pub(crate) how: Shutdown,
}

impl<S> ShutdownSocket<S> {
    /// Creates a [`ShutdownSocket`] for `fd` with the shutdown mode `how`.
    pub fn new(fd: S, how: Shutdown) -> Self {
        ShutdownSocket { fd, how }
    }
}
264
265pub struct CloseSocket {
267 pub(crate) fd: ManuallyDrop<OwnedFd>,
268}
269
270impl CloseSocket {
271 pub fn new(fd: OwnedFd) -> Self {
273 Self {
274 fd: ManuallyDrop::new(fd),
275 }
276 }
277}
278
279pub struct Connect<S> {
281 pub(crate) fd: S,
282 pub(crate) addr: SockAddr,
283}
284
285impl<S> Connect<S> {
286 pub fn new(fd: S, addr: SockAddr) -> Self {
288 Self { fd, addr }
289 }
290}
291
292#[cfg(any(not(io_uring), fusion))]
293pub(crate) mod managed {
294 use std::io;
295
296 use compio_buf::IntoInner;
297
298 use super::{ReadAt, Recv};
299 use crate::{BorrowedBuffer, BufferPool, OwnedBuffer, TakeBuffer};
300
301 pub struct ReadManagedAt<S> {
303 pub(crate) op: ReadAt<OwnedBuffer, S>,
304 }
305
306 impl<S> ReadManagedAt<S> {
307 pub fn new(fd: S, offset: u64, pool: &BufferPool, len: usize) -> io::Result<Self> {
309 #[cfg(fusion)]
310 let pool = pool.as_poll();
311 Ok(Self {
312 op: ReadAt::new(fd, offset, pool.get_buffer(len)?),
313 })
314 }
315 }
316
317 impl<S> TakeBuffer for ReadManagedAt<S> {
318 type Buffer<'a> = BorrowedBuffer<'a>;
319 type BufferPool = BufferPool;
320
321 fn take_buffer(
322 self,
323 buffer_pool: &BufferPool,
324 result: io::Result<usize>,
325 _: u32,
326 ) -> io::Result<BorrowedBuffer<'_>> {
327 let result = result?;
328 #[cfg(fusion)]
329 let buffer_pool = buffer_pool.as_poll();
330 let slice = self.op.into_inner();
331 let res = unsafe { buffer_pool.create_proxy(slice, result) };
333 #[cfg(fusion)]
334 let res = BorrowedBuffer::new_poll(res);
335 Ok(res)
336 }
337 }
338
339 pub struct RecvManaged<S> {
341 pub(crate) op: Recv<OwnedBuffer, S>,
342 }
343
344 impl<S> RecvManaged<S> {
345 pub fn new(fd: S, pool: &BufferPool, len: usize) -> io::Result<Self> {
347 #[cfg(fusion)]
348 let pool = pool.as_poll();
349 Ok(Self {
350 op: Recv::new(fd, pool.get_buffer(len)?),
351 })
352 }
353 }
354
355 impl<S> TakeBuffer for RecvManaged<S> {
356 type Buffer<'a> = BorrowedBuffer<'a>;
357 type BufferPool = BufferPool;
358
359 fn take_buffer(
360 self,
361 buffer_pool: &Self::BufferPool,
362 result: io::Result<usize>,
363 _: u32,
364 ) -> io::Result<Self::Buffer<'_>> {
365 let result = result?;
366 #[cfg(fusion)]
367 let buffer_pool = buffer_pool.as_poll();
368 let slice = self.op.into_inner();
369 let res = unsafe { buffer_pool.create_proxy(slice, result) };
371 #[cfg(fusion)]
372 let res = BorrowedBuffer::new_poll(res);
373 Ok(res)
374 }
375 }
376}
377
378#[cfg(not(io_uring))]
379pub use managed::*;