1use std::{io, marker::PhantomPinned, mem::ManuallyDrop, net::Shutdown};
8
9use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBuf, SetLen};
10use pin_project_lite::pin_project;
11use socket2::{SockAddr, SockAddrStorage, socklen_t};
12
13#[cfg(linux_all)]
14pub use crate::sys::op::Splice;
15pub use crate::sys::op::{
16 Accept, Recv, RecvFrom, RecvFromVectored, RecvMsg, RecvVectored, Send, SendMsg, SendTo,
17 SendToVectored, SendVectored,
18};
19#[cfg(windows)]
20pub use crate::sys::op::{ConnectNamedPipe, DeviceIoControl};
21#[cfg(unix)]
22pub use crate::sys::op::{
23 CreateDir, CreateSocket, FileStat, HardLink, Interest, OpenFile, PathStat, PollOnce,
24 ReadVectored, ReadVectoredAt, Rename, Stat, Symlink, TruncateFile, Unlink, WriteVectored,
25 WriteVectoredAt,
26};
27#[cfg(io_uring)]
28pub use crate::sys::op::{ReadManaged, ReadManagedAt, RecvManaged};
29use crate::{Extra, OwnedFd, SharedFd, TakeBuffer, sys::aio::*};
30
/// Extension for buffer-carrying results whose buffer length should follow
/// the length reported in the result value.
pub trait BufResultExt {
    /// Advances the carried buffer(s) to the length(s) stored in the result
    /// and hands the result back.
    ///
    /// # Safety
    ///
    /// The implementations in this file pass the reported length straight to
    /// [`SetLen::advance_to`], so the caller must guarantee that the value
    /// held in the result satisfies that method's safety contract.
    unsafe fn map_advanced(self) -> Self;
}
40
/// Vectored counterpart of the buffer-advancing extension: for results that
/// carry a vectored buffer whose length should follow the reported length.
pub trait VecBufResultExt {
    /// Advances the carried vectored buffer to the length stored in the
    /// result and hands the result back.
    ///
    /// # Safety
    ///
    /// The implementations in this file pass the reported length straight to
    /// [`SetLen::advance_vec_to`] (and, for a control buffer, to
    /// [`SetLen::advance_to`]), so the caller must guarantee that the values
    /// held in the result satisfy those methods' safety contracts.
    unsafe fn map_vec_advanced(self) -> Self;
}
50
51impl<T: SetLen + IoBuf> BufResultExt for BufResult<usize, T> {
52 unsafe fn map_advanced(self) -> Self {
53 unsafe {
54 self.map_res(|res| (res, ()))
55 .map_advanced()
56 .map_res(|(res, _)| res)
57 }
58 }
59}
60
61impl<T: SetLen + IoVectoredBuf> VecBufResultExt for BufResult<usize, T> {
62 unsafe fn map_vec_advanced(self) -> Self {
63 unsafe {
64 self.map_res(|res| (res, ()))
65 .map_vec_advanced()
66 .map_res(|(res, _)| res)
67 }
68 }
69}
70
71impl<T: SetLen + IoBuf, O> BufResultExt for BufResult<(usize, O), T> {
72 unsafe fn map_advanced(self) -> Self {
73 self.map(|(init, obj), mut buffer| {
74 unsafe {
75 buffer.advance_to(init);
76 }
77 ((init, obj), buffer)
78 })
79 }
80}
81
82impl<T: SetLen + IoVectoredBuf, O> VecBufResultExt for BufResult<(usize, O), T> {
83 unsafe fn map_vec_advanced(self) -> Self {
84 self.map(|(init, obj), mut buffer| {
85 unsafe {
86 buffer.advance_vec_to(init);
87 }
88 ((init, obj), buffer)
89 })
90 }
91}
92
93impl<T: SetLen + IoBuf, C: SetLen + IoBuf, O> BufResultExt
94 for BufResult<(usize, usize, O), (T, C)>
95{
96 unsafe fn map_advanced(self) -> Self {
97 self.map(
98 |(init_buffer, init_control, obj), (mut buffer, mut control)| {
99 unsafe {
100 buffer.advance_to(init_buffer);
101 control.advance_to(init_control);
102 }
103 ((init_buffer, init_control, obj), (buffer, control))
104 },
105 )
106 }
107}
108
109impl<T: SetLen + IoVectoredBuf, C: SetLen + IoBuf, O> VecBufResultExt
110 for BufResult<(usize, usize, O), (T, C)>
111{
112 unsafe fn map_vec_advanced(self) -> Self {
113 self.map(
114 |(init_buffer, init_control, obj), (mut buffer, mut control)| {
115 unsafe {
116 buffer.advance_vec_to(init_buffer);
117 control.advance_to(init_control);
118 }
119 ((init_buffer, init_control, obj), (buffer, control))
120 },
121 )
122 }
123}
124
/// Extension for receive-style results that still carry raw socket-address
/// storage next to the buffer.
pub trait RecvResultExt {
    /// The result type produced by [`RecvResultExt::map_addr`].
    type RecvResult;

    /// Moves the address information out of the buffer side of the result
    /// and into the value side, leaving only the user buffer behind.
    fn map_addr(self) -> Self::RecvResult;
}
133
134impl<T> RecvResultExt for BufResult<usize, (T, SockAddrStorage, socklen_t)> {
135 type RecvResult = BufResult<(usize, SockAddr), T>;
136
137 fn map_addr(self) -> Self::RecvResult {
138 self.map_buffer(|(buffer, addr_buffer, addr_size)| (buffer, addr_buffer, addr_size, 0))
139 .map_addr()
140 .map_res(|(res, _, addr)| (res, addr))
141 }
142}
143
144impl<T> RecvResultExt for BufResult<usize, (T, SockAddrStorage, socklen_t, usize)> {
145 type RecvResult = BufResult<(usize, usize, SockAddr), T>;
146
147 fn map_addr(self) -> Self::RecvResult {
148 self.map2(
149 |res, (buffer, addr_buffer, addr_size, len)| {
150 let addr = unsafe { SockAddr::new(addr_buffer, addr_size) };
151 ((res, len, addr), buffer)
152 },
153 |(buffer, ..)| buffer,
154 )
155 }
156}
157
/// Extension for completed results from which a pool-backed buffer can be
/// taken.
pub trait ResultTakeBuffer {
    /// The pool type the buffer is taken from.
    type BufferPool;
    /// The buffer type handed out; it may borrow from the pool for `'a`.
    type Buffer<'a>;

    /// Consumes the result and returns the buffer, or an I/O error.
    fn take_buffer(self, pool: &Self::BufferPool) -> io::Result<Self::Buffer<'_>>;
}
168
169impl<T: TakeBuffer> ResultTakeBuffer for (BufResult<usize, T>, Extra) {
170 type Buffer<'a> = T::Buffer<'a>;
171 type BufferPool = T::BufferPool;
172
173 fn take_buffer(self, pool: &Self::BufferPool) -> io::Result<Self::Buffer<'_>> {
174 let (BufResult(result, op), extra) = self;
175 op.take_buffer(pool, result, extra.buffer_id()?)
176 }
177}
178
179pin_project! {
180 pub struct Asyncify<F, D> {
182 pub(crate) f: Option<F>,
183 pub(crate) data: Option<D>,
184 _p: PhantomPinned,
185 }
186}
187
188impl<F, D> Asyncify<F, D> {
189 pub fn new(f: F) -> Self {
191 Self {
192 f: Some(f),
193 data: None,
194 _p: PhantomPinned,
195 }
196 }
197}
198
199impl<F, D> IntoInner for Asyncify<F, D> {
200 type Inner = D;
201
202 fn into_inner(mut self) -> Self::Inner {
203 self.data.take().expect("the data should not be None")
204 }
205}
206
207pin_project! {
208 pub struct AsyncifyFd<S, F, D> {
210 pub(crate) fd: SharedFd<S>,
211 pub(crate) f: Option<F>,
212 pub(crate) data: Option<D>,
213 _p: PhantomPinned,
214 }
215}
216
217impl<S, F, D> AsyncifyFd<S, F, D> {
218 pub fn new(fd: SharedFd<S>, f: F) -> Self {
220 Self {
221 fd,
222 f: Some(f),
223 data: None,
224 _p: PhantomPinned,
225 }
226 }
227}
228
229impl<S, F, D> IntoInner for AsyncifyFd<S, F, D> {
230 type Inner = D;
231
232 fn into_inner(mut self) -> Self::Inner {
233 self.data.take().expect("the data should not be None")
234 }
235}
236
237pub struct CloseFile {
239 pub(crate) fd: ManuallyDrop<OwnedFd>,
240}
241
242impl CloseFile {
243 pub fn new(fd: OwnedFd) -> Self {
245 Self {
246 fd: ManuallyDrop::new(fd),
247 }
248 }
249}
250
251pin_project! {
252 #[derive(Debug)]
254 pub struct ReadAt<T: IoBufMut, S> {
255 pub(crate) fd: S,
256 pub(crate) offset: u64,
257 #[pin]
258 pub(crate) buffer: T,
259 pub(crate) aiocb: aiocb,
260 _p: PhantomPinned,
261 }
262}
263
264impl<T: IoBufMut, S> ReadAt<T, S> {
265 pub fn new(fd: S, offset: u64, buffer: T) -> Self {
267 Self {
268 fd,
269 offset,
270 buffer,
271 aiocb: new_aiocb(),
272 _p: PhantomPinned,
273 }
274 }
275}
276
277impl<T: IoBufMut, S> IntoInner for ReadAt<T, S> {
278 type Inner = T;
279
280 fn into_inner(self) -> Self::Inner {
281 self.buffer
282 }
283}
284
285pin_project! {
286 #[derive(Debug)]
288 pub struct WriteAt<T: IoBuf, S> {
289 pub(crate) fd: S,
290 pub(crate) offset: u64,
291 #[pin]
292 pub(crate) buffer: T,
293 pub(crate) aiocb: aiocb,
294 _p: PhantomPinned,
295 }
296}
297
298impl<T: IoBuf, S> WriteAt<T, S> {
299 pub fn new(fd: S, offset: u64, buffer: T) -> Self {
301 Self {
302 fd,
303 offset,
304 buffer,
305 aiocb: new_aiocb(),
306 _p: PhantomPinned,
307 }
308 }
309}
310
311impl<T: IoBuf, S> IntoInner for WriteAt<T, S> {
312 type Inner = T;
313
314 fn into_inner(self) -> Self::Inner {
315 self.buffer
316 }
317}
318
319pin_project! {
320 pub struct Read<T: IoBufMut, S> {
322 pub(crate) fd: S,
323 #[pin]
324 pub(crate) buffer: T,
325 _p: PhantomPinned,
326 }
327}
328
329impl<T: IoBufMut, S> Read<T, S> {
330 pub fn new(fd: S, buffer: T) -> Self {
332 Self {
333 fd,
334 buffer,
335 _p: PhantomPinned,
336 }
337 }
338}
339
340impl<T: IoBufMut, S> IntoInner for Read<T, S> {
341 type Inner = T;
342
343 fn into_inner(self) -> Self::Inner {
344 self.buffer
345 }
346}
347
348pub struct Write<T: IoBuf, S> {
350 pub(crate) fd: S,
351 pub(crate) buffer: T,
352 _p: PhantomPinned,
353}
354
355impl<T: IoBuf, S> Write<T, S> {
356 pub fn new(fd: S, buffer: T) -> Self {
358 Self {
359 fd,
360 buffer,
361 _p: PhantomPinned,
362 }
363 }
364}
365
366impl<T: IoBuf, S> IntoInner for Write<T, S> {
367 type Inner = T;
368
369 fn into_inner(self) -> Self::Inner {
370 self.buffer
371 }
372}
373
374pin_project! {
375 pub struct Sync<S> {
377 pub(crate) fd: S,
378 pub(crate) datasync: bool,
379 pub(crate) aiocb: aiocb,
380 }
381}
382
383impl<S> Sync<S> {
384 pub fn new(fd: S, datasync: bool) -> Self {
388 Self {
389 fd,
390 datasync,
391 aiocb: new_aiocb(),
392 }
393 }
394}
395
/// Operation describing a socket shutdown: the socket and which direction(s)
/// to shut down.
pub struct ShutdownSocket<S> {
    pub(crate) fd: S,
    pub(crate) how: Shutdown,
}

impl<S> ShutdownSocket<S> {
    /// Creates the operation; both arguments are stored as given.
    pub fn new(fd: S, how: Shutdown) -> Self {
        ShutdownSocket { how, fd }
    }
}
408
409pub struct CloseSocket {
411 pub(crate) fd: ManuallyDrop<OwnedFd>,
412}
413
414impl CloseSocket {
415 pub fn new(fd: OwnedFd) -> Self {
417 Self {
418 fd: ManuallyDrop::new(fd),
419 }
420 }
421}
422
423pub struct Connect<S> {
425 pub(crate) fd: S,
426 pub(crate) addr: SockAddr,
427}
428
429impl<S> Connect<S> {
430 pub fn new(fd: S, addr: SockAddr) -> Self {
432 Self { fd, addr }
433 }
434}
435
436#[cfg(any(not(io_uring), fusion))]
437pub(crate) mod managed {
438 use std::io;
439
440 use compio_buf::IntoInner;
441 use pin_project_lite::pin_project;
442
443 use super::{Read, ReadAt, Recv};
444 use crate::{BorrowedBuffer, BufferPool, OwnedBuffer, TakeBuffer};
445
446 pin_project! {
447 pub struct ReadManagedAt<S> {
449 #[pin]
450 pub(crate) op: ReadAt<OwnedBuffer, S>,
451 }
452 }
453
454 impl<S> ReadManagedAt<S> {
455 pub fn new(fd: S, offset: u64, pool: &BufferPool, len: usize) -> io::Result<Self> {
457 #[cfg(fusion)]
458 let pool = pool.as_poll();
459 Ok(Self {
460 op: ReadAt::new(fd, offset, pool.get_buffer(len)?),
461 })
462 }
463 }
464
465 impl<S> TakeBuffer for ReadManagedAt<S> {
466 type Buffer<'a> = BorrowedBuffer<'a>;
467 type BufferPool = BufferPool;
468
469 fn take_buffer(
470 self,
471 buffer_pool: &BufferPool,
472 result: io::Result<usize>,
473 _: u16,
474 ) -> io::Result<BorrowedBuffer<'_>> {
475 let result = result?;
476 #[cfg(fusion)]
477 let buffer_pool = buffer_pool.as_poll();
478 let slice = self.op.into_inner();
479 let res = unsafe { buffer_pool.create_proxy(slice, result) };
481 #[cfg(fusion)]
482 let res = BorrowedBuffer::new_poll(res);
483 Ok(res)
484 }
485 }
486
487 pin_project! {
488 pub struct ReadManaged<S> {
490 #[pin]
491 pub(crate) op: Read<OwnedBuffer, S>,
492 }
493 }
494
495 impl<S> ReadManaged<S> {
496 pub fn new(fd: S, pool: &BufferPool, len: usize) -> io::Result<Self> {
498 #[cfg(fusion)]
499 let pool = pool.as_poll();
500 Ok(Self {
501 op: Read::new(fd, pool.get_buffer(len)?),
502 })
503 }
504 }
505
506 impl<S> TakeBuffer for ReadManaged<S> {
507 type Buffer<'a> = BorrowedBuffer<'a>;
508 type BufferPool = BufferPool;
509
510 fn take_buffer(
511 self,
512 buffer_pool: &Self::BufferPool,
513 result: io::Result<usize>,
514 _: u16,
515 ) -> io::Result<Self::Buffer<'_>> {
516 let result = result?;
517 #[cfg(fusion)]
518 let buffer_pool = buffer_pool.as_poll();
519 let slice = self.op.into_inner();
520 let res = unsafe { buffer_pool.create_proxy(slice, result) };
522 #[cfg(fusion)]
523 let res = BorrowedBuffer::new_poll(res);
524 Ok(res)
525 }
526 }
527
528 pin_project! {
529 pub struct RecvManaged<S> {
534 #[pin]
535 pub(crate) op: Recv<OwnedBuffer, S>,
536 }
537 }
538
539 impl<S> RecvManaged<S> {
540 pub fn new(fd: S, pool: &BufferPool, len: usize, flags: i32) -> io::Result<Self> {
542 #[cfg(fusion)]
543 let pool = pool.as_poll();
544 Ok(Self {
545 op: Recv::new(fd, pool.get_buffer(len)?, flags),
546 })
547 }
548 }
549
550 impl<S> TakeBuffer for RecvManaged<S> {
551 type Buffer<'a> = BorrowedBuffer<'a>;
552 type BufferPool = BufferPool;
553
554 fn take_buffer(
555 self,
556 buffer_pool: &Self::BufferPool,
557 result: io::Result<usize>,
558 _: u16,
559 ) -> io::Result<Self::Buffer<'_>> {
560 let result = result?;
561 #[cfg(fusion)]
562 let buffer_pool = buffer_pool.as_poll();
563 let slice = self.op.into_inner();
564 let res = unsafe { buffer_pool.create_proxy(slice, result) };
566 #[cfg(fusion)]
567 let res = BorrowedBuffer::new_poll(res);
568 Ok(res)
569 }
570 }
571}
572
573#[cfg(not(io_uring))]
574pub use managed::*;