1use std::{io, marker::PhantomPinned, mem::ManuallyDrop, net::Shutdown};
8
9use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, SetBufInit};
10use socket2::{SockAddr, SockAddrStorage, socklen_t};
11
12pub use crate::sys::op::{
13 Accept, Recv, RecvFrom, RecvFromVectored, RecvMsg, RecvVectored, Send, SendMsg, SendTo,
14 SendToVectored, SendVectored,
15};
16#[cfg(windows)]
17pub use crate::sys::op::{ConnectNamedPipe, DeviceIoControl};
18#[cfg(unix)]
19pub use crate::sys::op::{
20 CreateDir, CreateSocket, FileStat, HardLink, Interest, OpenFile, PathStat, PollOnce,
21 ReadVectoredAt, Rename, Symlink, Unlink, WriteVectoredAt,
22};
23#[cfg(io_uring)]
24pub use crate::sys::op::{ReadManagedAt, RecvManaged};
25use crate::{OwnedFd, SharedFd, TakeBuffer};
26
27pub trait BufResultExt {
29 fn map_advanced(self) -> Self;
31}
32
33impl<T: SetBufInit> BufResultExt for BufResult<usize, T> {
34 fn map_advanced(self) -> Self {
35 self.map_res(|res| (res, ()))
36 .map_advanced()
37 .map_res(|(res, _)| res)
38 }
39}
40
41impl<T: SetBufInit, O> BufResultExt for BufResult<(usize, O), T> {
42 fn map_advanced(self) -> Self {
43 self.map(|(init, obj), mut buffer| {
44 unsafe {
45 buffer.set_buf_init(init);
46 }
47 ((init, obj), buffer)
48 })
49 }
50}
51
52impl<T: SetBufInit, C: SetBufInit, O> BufResultExt for BufResult<(usize, usize, O), (T, C)> {
53 fn map_advanced(self) -> Self {
54 self.map(
55 |(init_buffer, init_control, obj), (mut buffer, mut control)| {
56 unsafe {
57 buffer.set_buf_init(init_buffer);
58 control.set_buf_init(init_control);
59 }
60 ((init_buffer, init_control, obj), (buffer, control))
61 },
62 )
63 }
64}
65
66pub trait RecvResultExt {
68 type RecvResult;
70
71 fn map_addr(self) -> Self::RecvResult;
73}
74
75impl<T> RecvResultExt for BufResult<usize, (T, SockAddrStorage, socklen_t)> {
76 type RecvResult = BufResult<(usize, SockAddr), T>;
77
78 fn map_addr(self) -> Self::RecvResult {
79 self.map_buffer(|(buffer, addr_buffer, addr_size)| (buffer, addr_buffer, addr_size, 0))
80 .map_addr()
81 .map_res(|(res, _, addr)| (res, addr))
82 }
83}
84
85impl<T> RecvResultExt for BufResult<usize, (T, SockAddrStorage, socklen_t, usize)> {
86 type RecvResult = BufResult<(usize, usize, SockAddr), T>;
87
88 fn map_addr(self) -> Self::RecvResult {
89 self.map2(
90 |res, (buffer, addr_buffer, addr_size, len)| {
91 let addr = unsafe { SockAddr::new(addr_buffer, addr_size) };
92 ((res, len, addr), buffer)
93 },
94 |(buffer, ..)| buffer,
95 )
96 }
97}
98
99pub trait ResultTakeBuffer {
101 type BufferPool;
103 type Buffer<'a>;
105
106 fn take_buffer(self, pool: &Self::BufferPool) -> io::Result<Self::Buffer<'_>>;
108}
109
110impl<T: TakeBuffer> ResultTakeBuffer for (BufResult<usize, T>, u32) {
111 type Buffer<'a> = T::Buffer<'a>;
112 type BufferPool = T::BufferPool;
113
114 fn take_buffer(self, pool: &Self::BufferPool) -> io::Result<Self::Buffer<'_>> {
115 let (BufResult(result, op), flags) = self;
116 op.take_buffer(pool, result, flags)
117 }
118}
119
120pub struct Asyncify<F, D> {
122 pub(crate) f: Option<F>,
123 pub(crate) data: Option<D>,
124 _p: PhantomPinned,
125}
126
127impl<F, D> Asyncify<F, D> {
128 pub fn new(f: F) -> Self {
130 Self {
131 f: Some(f),
132 data: None,
133 _p: PhantomPinned,
134 }
135 }
136}
137
138impl<F, D> IntoInner for Asyncify<F, D> {
139 type Inner = D;
140
141 fn into_inner(mut self) -> Self::Inner {
142 self.data.take().expect("the data should not be None")
143 }
144}
145
146pub struct AsyncifyFd<S, F, D> {
148 pub(crate) fd: SharedFd<S>,
149 pub(crate) f: Option<F>,
150 pub(crate) data: Option<D>,
151 _p: PhantomPinned,
152}
153
154impl<S, F, D> AsyncifyFd<S, F, D> {
155 pub fn new(fd: SharedFd<S>, f: F) -> Self {
157 Self {
158 fd,
159 f: Some(f),
160 data: None,
161 _p: PhantomPinned,
162 }
163 }
164}
165
166impl<S, F, D> IntoInner for AsyncifyFd<S, F, D> {
167 type Inner = D;
168
169 fn into_inner(mut self) -> Self::Inner {
170 self.data.take().expect("the data should not be None")
171 }
172}
173
174pub struct CloseFile {
176 pub(crate) fd: ManuallyDrop<OwnedFd>,
177}
178
179impl CloseFile {
180 pub fn new(fd: OwnedFd) -> Self {
182 Self {
183 fd: ManuallyDrop::new(fd),
184 }
185 }
186}
187
188#[derive(Debug)]
190pub struct ReadAt<T: IoBufMut, S> {
191 pub(crate) fd: S,
192 pub(crate) offset: u64,
193 pub(crate) buffer: T,
194 #[cfg(aio)]
195 pub(crate) aiocb: libc::aiocb,
196 _p: PhantomPinned,
197}
198
199impl<T: IoBufMut, S> ReadAt<T, S> {
200 pub fn new(fd: S, offset: u64, buffer: T) -> Self {
202 Self {
203 fd,
204 offset,
205 buffer,
206 #[cfg(aio)]
207 aiocb: unsafe { std::mem::zeroed() },
208 _p: PhantomPinned,
209 }
210 }
211}
212
213impl<T: IoBufMut, S> IntoInner for ReadAt<T, S> {
214 type Inner = T;
215
216 fn into_inner(self) -> Self::Inner {
217 self.buffer
218 }
219}
220
221#[derive(Debug)]
223pub struct WriteAt<T: IoBuf, S> {
224 pub(crate) fd: S,
225 pub(crate) offset: u64,
226 pub(crate) buffer: T,
227 #[cfg(aio)]
228 pub(crate) aiocb: libc::aiocb,
229 _p: PhantomPinned,
230}
231
232impl<T: IoBuf, S> WriteAt<T, S> {
233 pub fn new(fd: S, offset: u64, buffer: T) -> Self {
235 Self {
236 fd,
237 offset,
238 buffer,
239 #[cfg(aio)]
240 aiocb: unsafe { std::mem::zeroed() },
241 _p: PhantomPinned,
242 }
243 }
244}
245
246impl<T: IoBuf, S> IntoInner for WriteAt<T, S> {
247 type Inner = T;
248
249 fn into_inner(self) -> Self::Inner {
250 self.buffer
251 }
252}
253
254pub struct Sync<S> {
256 pub(crate) fd: S,
257 #[allow(dead_code)]
258 pub(crate) datasync: bool,
259 #[cfg(aio)]
260 pub(crate) aiocb: libc::aiocb,
261}
262
263impl<S> Sync<S> {
264 pub fn new(fd: S, datasync: bool) -> Self {
268 Self {
269 fd,
270 datasync,
271 #[cfg(aio)]
272 aiocb: unsafe { std::mem::zeroed() },
273 }
274 }
275}
276
277pub struct ShutdownSocket<S> {
279 pub(crate) fd: S,
280 pub(crate) how: Shutdown,
281}
282
283impl<S> ShutdownSocket<S> {
284 pub fn new(fd: S, how: Shutdown) -> Self {
286 Self { fd, how }
287 }
288}
289
290pub struct CloseSocket {
292 pub(crate) fd: ManuallyDrop<OwnedFd>,
293}
294
295impl CloseSocket {
296 pub fn new(fd: OwnedFd) -> Self {
298 Self {
299 fd: ManuallyDrop::new(fd),
300 }
301 }
302}
303
304pub struct Connect<S> {
306 pub(crate) fd: S,
307 pub(crate) addr: SockAddr,
308}
309
310impl<S> Connect<S> {
311 pub fn new(fd: S, addr: SockAddr) -> Self {
313 Self { fd, addr }
314 }
315}
316
317#[cfg(any(not(io_uring), fusion))]
318pub(crate) mod managed {
319 use std::io;
320
321 use compio_buf::IntoInner;
322
323 use super::{ReadAt, Recv};
324 use crate::{BorrowedBuffer, BufferPool, OwnedBuffer, TakeBuffer};
325
326 pub struct ReadManagedAt<S> {
328 pub(crate) op: ReadAt<OwnedBuffer, S>,
329 }
330
331 impl<S> ReadManagedAt<S> {
332 pub fn new(fd: S, offset: u64, pool: &BufferPool, len: usize) -> io::Result<Self> {
334 #[cfg(fusion)]
335 let pool = pool.as_poll();
336 Ok(Self {
337 op: ReadAt::new(fd, offset, pool.get_buffer(len)?),
338 })
339 }
340 }
341
342 impl<S> TakeBuffer for ReadManagedAt<S> {
343 type Buffer<'a> = BorrowedBuffer<'a>;
344 type BufferPool = BufferPool;
345
346 fn take_buffer(
347 self,
348 buffer_pool: &BufferPool,
349 result: io::Result<usize>,
350 _: u32,
351 ) -> io::Result<BorrowedBuffer<'_>> {
352 let result = result?;
353 #[cfg(fusion)]
354 let buffer_pool = buffer_pool.as_poll();
355 let slice = self.op.into_inner();
356 let res = unsafe { buffer_pool.create_proxy(slice, result) };
358 #[cfg(fusion)]
359 let res = BorrowedBuffer::new_poll(res);
360 Ok(res)
361 }
362 }
363
364 pub struct RecvManaged<S> {
366 pub(crate) op: Recv<OwnedBuffer, S>,
367 }
368
369 impl<S> RecvManaged<S> {
370 pub fn new(fd: S, pool: &BufferPool, len: usize) -> io::Result<Self> {
372 #[cfg(fusion)]
373 let pool = pool.as_poll();
374 Ok(Self {
375 op: Recv::new(fd, pool.get_buffer(len)?),
376 })
377 }
378 }
379
380 impl<S> TakeBuffer for RecvManaged<S> {
381 type Buffer<'a> = BorrowedBuffer<'a>;
382 type BufferPool = BufferPool;
383
384 fn take_buffer(
385 self,
386 buffer_pool: &Self::BufferPool,
387 result: io::Result<usize>,
388 _: u32,
389 ) -> io::Result<Self::Buffer<'_>> {
390 let result = result?;
391 #[cfg(fusion)]
392 let buffer_pool = buffer_pool.as_poll();
393 let slice = self.op.into_inner();
394 let res = unsafe { buffer_pool.create_proxy(slice, result) };
396 #[cfg(fusion)]
397 let res = BorrowedBuffer::new_poll(res);
398 Ok(res)
399 }
400 }
401}
402
403#[cfg(not(io_uring))]
404pub use managed::*;