1use std::{io, marker::PhantomPinned, mem::ManuallyDrop, net::Shutdown};
8
9use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, SetBufInit};
10use socket2::{SockAddr, SockAddrStorage, socklen_t};
11
12pub use crate::sys::op::{
13 Accept, Recv, RecvFrom, RecvFromVectored, RecvMsg, RecvVectored, Send, SendMsg, SendTo,
14 SendToVectored, SendVectored,
15};
16#[cfg(windows)]
17pub use crate::sys::op::{ConnectNamedPipe, DeviceIoControl};
18#[cfg(unix)]
19pub use crate::sys::op::{
20 CreateDir, CreateSocket, FileStat, HardLink, Interest, OpenFile, PathStat, PollOnce,
21 ReadVectoredAt, Rename, Symlink, Unlink, WriteVectoredAt,
22};
23#[cfg(io_uring)]
24pub use crate::sys::op::{ReadManagedAt, RecvManaged};
25use crate::{OwnedFd, TakeBuffer};
26
27pub trait BufResultExt {
29 fn map_advanced(self) -> Self;
31}
32
33impl<T: SetBufInit> BufResultExt for BufResult<usize, T> {
34 fn map_advanced(self) -> Self {
35 self.map_res(|res| (res, ()))
36 .map_advanced()
37 .map_res(|(res, _)| res)
38 }
39}
40
41impl<T: SetBufInit, O> BufResultExt for BufResult<(usize, O), T> {
42 fn map_advanced(self) -> Self {
43 self.map(|(init, obj), mut buffer| {
44 unsafe {
45 buffer.set_buf_init(init);
46 }
47 ((init, obj), buffer)
48 })
49 }
50}
51
52impl<T: SetBufInit, C: SetBufInit, O> BufResultExt for BufResult<(usize, usize, O), (T, C)> {
53 fn map_advanced(self) -> Self {
54 self.map(
55 |(init_buffer, init_control, obj), (mut buffer, mut control)| {
56 unsafe {
57 buffer.set_buf_init(init_buffer);
58 control.set_buf_init(init_control);
59 }
60 ((init_buffer, init_control, obj), (buffer, control))
61 },
62 )
63 }
64}
65
66pub trait RecvResultExt {
68 type RecvResult;
70
71 fn map_addr(self) -> Self::RecvResult;
73}
74
75impl<T> RecvResultExt for BufResult<usize, (T, SockAddrStorage, socklen_t)> {
76 type RecvResult = BufResult<(usize, SockAddr), T>;
77
78 fn map_addr(self) -> Self::RecvResult {
79 self.map_buffer(|(buffer, addr_buffer, addr_size)| (buffer, addr_buffer, addr_size, 0))
80 .map_addr()
81 .map_res(|(res, _, addr)| (res, addr))
82 }
83}
84
85impl<T> RecvResultExt for BufResult<usize, (T, SockAddrStorage, socklen_t, usize)> {
86 type RecvResult = BufResult<(usize, usize, SockAddr), T>;
87
88 fn map_addr(self) -> Self::RecvResult {
89 self.map2(
90 |res, (buffer, addr_buffer, addr_size, len)| {
91 let addr = unsafe { SockAddr::new(addr_buffer, addr_size) };
92 ((res, len, addr), buffer)
93 },
94 |(buffer, ..)| buffer,
95 )
96 }
97}
98
99pub trait ResultTakeBuffer {
101 type BufferPool;
103 type Buffer<'a>;
105
106 fn take_buffer(self, pool: &Self::BufferPool) -> io::Result<Self::Buffer<'_>>;
108}
109
110impl<T: TakeBuffer> ResultTakeBuffer for (BufResult<usize, T>, u32) {
111 type Buffer<'a> = T::Buffer<'a>;
112 type BufferPool = T::BufferPool;
113
114 fn take_buffer(self, pool: &Self::BufferPool) -> io::Result<Self::Buffer<'_>> {
115 let (BufResult(result, op), flags) = self;
116 op.take_buffer(pool, result, flags)
117 }
118}
119
120pub struct Asyncify<F, D> {
122 pub(crate) f: Option<F>,
123 pub(crate) data: Option<D>,
124 _p: PhantomPinned,
125}
126
127impl<F, D> Asyncify<F, D> {
128 pub fn new(f: F) -> Self {
130 Self {
131 f: Some(f),
132 data: None,
133 _p: PhantomPinned,
134 }
135 }
136}
137
138impl<F, D> IntoInner for Asyncify<F, D> {
139 type Inner = D;
140
141 fn into_inner(mut self) -> Self::Inner {
142 self.data.take().expect("the data should not be None")
143 }
144}
145
146pub struct CloseFile {
148 pub(crate) fd: ManuallyDrop<OwnedFd>,
149}
150
151impl CloseFile {
152 pub fn new(fd: OwnedFd) -> Self {
154 Self {
155 fd: ManuallyDrop::new(fd),
156 }
157 }
158}
159
160#[derive(Debug)]
162pub struct ReadAt<T: IoBufMut, S> {
163 pub(crate) fd: S,
164 pub(crate) offset: u64,
165 pub(crate) buffer: T,
166 #[cfg(aio)]
167 pub(crate) aiocb: libc::aiocb,
168 _p: PhantomPinned,
169}
170
171impl<T: IoBufMut, S> ReadAt<T, S> {
172 pub fn new(fd: S, offset: u64, buffer: T) -> Self {
174 Self {
175 fd,
176 offset,
177 buffer,
178 #[cfg(aio)]
179 aiocb: unsafe { std::mem::zeroed() },
180 _p: PhantomPinned,
181 }
182 }
183}
184
185impl<T: IoBufMut, S> IntoInner for ReadAt<T, S> {
186 type Inner = T;
187
188 fn into_inner(self) -> Self::Inner {
189 self.buffer
190 }
191}
192
193#[derive(Debug)]
195pub struct WriteAt<T: IoBuf, S> {
196 pub(crate) fd: S,
197 pub(crate) offset: u64,
198 pub(crate) buffer: T,
199 #[cfg(aio)]
200 pub(crate) aiocb: libc::aiocb,
201 _p: PhantomPinned,
202}
203
204impl<T: IoBuf, S> WriteAt<T, S> {
205 pub fn new(fd: S, offset: u64, buffer: T) -> Self {
207 Self {
208 fd,
209 offset,
210 buffer,
211 #[cfg(aio)]
212 aiocb: unsafe { std::mem::zeroed() },
213 _p: PhantomPinned,
214 }
215 }
216}
217
218impl<T: IoBuf, S> IntoInner for WriteAt<T, S> {
219 type Inner = T;
220
221 fn into_inner(self) -> Self::Inner {
222 self.buffer
223 }
224}
225
226pub struct Sync<S> {
228 pub(crate) fd: S,
229 #[allow(dead_code)]
230 pub(crate) datasync: bool,
231 #[cfg(aio)]
232 pub(crate) aiocb: libc::aiocb,
233}
234
235impl<S> Sync<S> {
236 pub fn new(fd: S, datasync: bool) -> Self {
240 Self {
241 fd,
242 datasync,
243 #[cfg(aio)]
244 aiocb: unsafe { std::mem::zeroed() },
245 }
246 }
247}
248
249pub struct ShutdownSocket<S> {
251 pub(crate) fd: S,
252 pub(crate) how: Shutdown,
253}
254
255impl<S> ShutdownSocket<S> {
256 pub fn new(fd: S, how: Shutdown) -> Self {
258 Self { fd, how }
259 }
260}
261
262pub struct CloseSocket {
264 pub(crate) fd: ManuallyDrop<OwnedFd>,
265}
266
267impl CloseSocket {
268 pub fn new(fd: OwnedFd) -> Self {
270 Self {
271 fd: ManuallyDrop::new(fd),
272 }
273 }
274}
275
276pub struct Connect<S> {
278 pub(crate) fd: S,
279 pub(crate) addr: SockAddr,
280}
281
282impl<S> Connect<S> {
283 pub fn new(fd: S, addr: SockAddr) -> Self {
285 Self { fd, addr }
286 }
287}
288
289#[cfg(any(not(io_uring), fusion))]
290pub(crate) mod managed {
291 use std::io;
292
293 use compio_buf::IntoInner;
294
295 use super::{ReadAt, Recv};
296 use crate::{BorrowedBuffer, BufferPool, OwnedBuffer, TakeBuffer};
297
298 pub struct ReadManagedAt<S> {
300 pub(crate) op: ReadAt<OwnedBuffer, S>,
301 }
302
303 impl<S> ReadManagedAt<S> {
304 pub fn new(fd: S, offset: u64, pool: &BufferPool, len: usize) -> io::Result<Self> {
306 #[cfg(fusion)]
307 let pool = pool.as_poll();
308 Ok(Self {
309 op: ReadAt::new(fd, offset, pool.get_buffer(len)?),
310 })
311 }
312 }
313
314 impl<S> TakeBuffer for ReadManagedAt<S> {
315 type Buffer<'a> = BorrowedBuffer<'a>;
316 type BufferPool = BufferPool;
317
318 fn take_buffer(
319 self,
320 buffer_pool: &BufferPool,
321 result: io::Result<usize>,
322 _: u32,
323 ) -> io::Result<BorrowedBuffer<'_>> {
324 let result = result?;
325 #[cfg(fusion)]
326 let buffer_pool = buffer_pool.as_poll();
327 let slice = self.op.into_inner();
328 let res = unsafe { buffer_pool.create_proxy(slice, result) };
330 #[cfg(fusion)]
331 let res = BorrowedBuffer::new_poll(res);
332 Ok(res)
333 }
334 }
335
336 pub struct RecvManaged<S> {
338 pub(crate) op: Recv<OwnedBuffer, S>,
339 }
340
341 impl<S> RecvManaged<S> {
342 pub fn new(fd: S, pool: &BufferPool, len: usize) -> io::Result<Self> {
344 #[cfg(fusion)]
345 let pool = pool.as_poll();
346 Ok(Self {
347 op: Recv::new(fd, pool.get_buffer(len)?),
348 })
349 }
350 }
351
352 impl<S> TakeBuffer for RecvManaged<S> {
353 type Buffer<'a> = BorrowedBuffer<'a>;
354 type BufferPool = BufferPool;
355
356 fn take_buffer(
357 self,
358 buffer_pool: &Self::BufferPool,
359 result: io::Result<usize>,
360 _: u32,
361 ) -> io::Result<Self::Buffer<'_>> {
362 let result = result?;
363 #[cfg(fusion)]
364 let buffer_pool = buffer_pool.as_poll();
365 let slice = self.op.into_inner();
366 let res = unsafe { buffer_pool.create_proxy(slice, result) };
368 #[cfg(fusion)]
369 let res = BorrowedBuffer::new_poll(res);
370 Ok(res)
371 }
372 }
373}
374
375#[cfg(not(io_uring))]
376pub use managed::*;