Skip to main content

compio_driver/sys/iocp/
op.rs

1#[cfg(feature = "once_cell_try")]
2use std::sync::OnceLock;
3use std::{
4    io,
5    marker::PhantomPinned,
6    net::Shutdown,
7    os::windows::io::AsRawSocket,
8    pin::Pin,
9    ptr::{null, null_mut, read_unaligned},
10    task::Poll,
11};
12
13use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut};
14#[cfg(not(feature = "once_cell_try"))]
15use once_cell::sync::OnceCell as OnceLock;
16use pin_project_lite::pin_project;
17use socket2::{SockAddr, SockAddrStorage};
18use windows_sys::{
19    Win32::{
20        Foundation::{
21            CloseHandle, ERROR_BROKEN_PIPE, ERROR_HANDLE_EOF, ERROR_IO_INCOMPLETE,
22            ERROR_IO_PENDING, ERROR_NETNAME_DELETED, ERROR_NO_DATA, ERROR_NOT_FOUND,
23            ERROR_PIPE_CONNECTED, ERROR_PIPE_NOT_CONNECTED, GetLastError,
24        },
25        Networking::WinSock::{
26            CMSGHDR, LPFN_ACCEPTEX, LPFN_CONNECTEX, LPFN_GETACCEPTEXSOCKADDRS, LPFN_WSARECVMSG,
27            SD_BOTH, SD_RECEIVE, SD_SEND, SIO_GET_EXTENSION_FUNCTION_POINTER,
28            SO_UPDATE_ACCEPT_CONTEXT, SO_UPDATE_CONNECT_CONTEXT, SOCKADDR, SOCKADDR_STORAGE,
29            SOL_SOCKET, WSAID_ACCEPTEX, WSAID_CONNECTEX, WSAID_GETACCEPTEXSOCKADDRS,
30            WSAID_WSARECVMSG, WSAIoctl, WSAMSG, WSARecv, WSARecvFrom, WSASend, WSASendMsg,
31            WSASendTo, closesocket, setsockopt, shutdown, socklen_t,
32        },
33        Storage::FileSystem::{FlushFileBuffers, ReadFile, WriteFile},
34        System::{
35            IO::{CancelIoEx, DeviceIoControl, OVERLAPPED},
36            Pipes::ConnectNamedPipe,
37        },
38    },
39    core::GUID,
40};
41
42use crate::{AsFd, AsRawFd, OpCode, OpType, RawFd, op::*, sys_slice::*, syscall};
43
44#[inline]
45fn winapi_result(transferred: u32) -> Poll<io::Result<usize>> {
46    let error = unsafe { GetLastError() };
47    assert_ne!(error, 0);
48    match error {
49        ERROR_IO_PENDING => Poll::Pending,
50        ERROR_IO_INCOMPLETE
51        | ERROR_NETNAME_DELETED
52        | ERROR_HANDLE_EOF
53        | ERROR_BROKEN_PIPE
54        | ERROR_PIPE_CONNECTED
55        | ERROR_PIPE_NOT_CONNECTED
56        | ERROR_NO_DATA => Poll::Ready(Ok(transferred as _)),
57        _ => Poll::Ready(Err(io::Error::from_raw_os_error(error as _))),
58    }
59}
60
61#[inline]
62fn win32_result(res: i32, transferred: u32) -> Poll<io::Result<usize>> {
63    if res == 0 {
64        winapi_result(transferred)
65    } else {
66        Poll::Ready(Ok(transferred as _))
67    }
68}
69
70#[inline]
71fn winsock_result(res: i32, transferred: u32) -> Poll<io::Result<usize>> {
72    if res != 0 {
73        winapi_result(transferred)
74    } else {
75        Poll::Ready(Ok(transferred as _))
76    }
77}
78
79#[inline]
80fn cancel(handle: RawFd, optr: *mut OVERLAPPED) -> io::Result<()> {
81    match syscall!(BOOL, CancelIoEx(handle as _, optr)) {
82        Ok(_) => Ok(()),
83        Err(e) => {
84            if e.raw_os_error() == Some(ERROR_NOT_FOUND as _) {
85                Ok(())
86            } else {
87                Err(e)
88            }
89        }
90    }
91}
92
93fn get_wsa_fn<F>(handle: RawFd, fguid: GUID) -> io::Result<Option<F>> {
94    let mut fptr = None;
95    let mut returned = 0;
96    syscall!(
97        SOCKET,
98        WSAIoctl(
99            handle as _,
100            SIO_GET_EXTENSION_FUNCTION_POINTER,
101            std::ptr::addr_of!(fguid).cast(),
102            std::mem::size_of_val(&fguid) as _,
103            std::ptr::addr_of_mut!(fptr).cast(),
104            std::mem::size_of::<F>() as _,
105            &mut returned,
106            null_mut(),
107            None,
108        )
109    )?;
110    Ok(fptr)
111}
112
113unsafe impl<
114    D: std::marker::Send + 'static,
115    F: (FnOnce() -> BufResult<usize, D>) + std::marker::Send + 'static,
116> OpCode for Asyncify<F, D>
117{
118    fn op_type(&self) -> OpType {
119        OpType::Blocking
120    }
121
122    unsafe fn operate(self: Pin<&mut Self>, _optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
123        let this = self.project();
124        let f = this
125            .f
126            .take()
127            .expect("the operate method could only be called once");
128        let BufResult(res, data) = f();
129        *this.data = Some(data);
130        Poll::Ready(res)
131    }
132}
133
134unsafe impl<
135    S,
136    D: std::marker::Send + 'static,
137    F: (FnOnce(&S) -> BufResult<usize, D>) + std::marker::Send + 'static,
138> OpCode for AsyncifyFd<S, F, D>
139{
140    fn op_type(&self) -> OpType {
141        OpType::Blocking
142    }
143
144    unsafe fn operate(self: Pin<&mut Self>, _optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
145        // SAFETY: self won't be moved
146        let this = self.project();
147        let f = this
148            .f
149            .take()
150            .expect("the operate method could only be called once");
151        let BufResult(res, data) = f(this.fd);
152        *this.data = Some(data);
153        Poll::Ready(res)
154    }
155}
156
157unsafe impl OpCode for CloseFile {
158    fn op_type(&self) -> OpType {
159        OpType::Blocking
160    }
161
162    unsafe fn operate(self: Pin<&mut Self>, _optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
163        Poll::Ready(Ok(
164            syscall!(BOOL, CloseHandle(self.fd.as_fd().as_raw_fd()))? as _,
165        ))
166    }
167}
168
169unsafe impl<T: IoBufMut, S: AsFd> OpCode for ReadAt<T, S> {
170    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
171        if let Some(overlapped) = unsafe { optr.as_mut() } {
172            overlapped.Anonymous.Anonymous.Offset = (self.offset & 0xFFFFFFFF) as _;
173            overlapped.Anonymous.Anonymous.OffsetHigh = (self.offset >> 32) as _;
174        }
175        let fd = self.fd.as_fd().as_raw_fd();
176        let slice = self.project().buffer.sys_slice_mut();
177        let mut transferred = 0;
178        let res = unsafe {
179            ReadFile(
180                fd,
181                slice.ptr() as _,
182                slice.len() as _,
183                &mut transferred,
184                optr,
185            )
186        };
187        win32_result(res, transferred)
188    }
189
190    fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
191        cancel(self.fd.as_fd().as_raw_fd(), optr)
192    }
193}
194
195unsafe impl<T: IoBuf, S: AsFd> OpCode for WriteAt<T, S> {
196    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
197        if let Some(overlapped) = unsafe { optr.as_mut() } {
198            overlapped.Anonymous.Anonymous.Offset = (self.offset & 0xFFFFFFFF) as _;
199            overlapped.Anonymous.Anonymous.OffsetHigh = (self.offset >> 32) as _;
200        }
201        let slice = self.buffer.as_init();
202        let mut transferred = 0;
203        let res = unsafe {
204            WriteFile(
205                self.fd.as_fd().as_raw_fd(),
206                slice.as_ptr() as _,
207                slice.len().try_into().unwrap_or(u32::MAX),
208                &mut transferred,
209                optr,
210            )
211        };
212        win32_result(res, transferred)
213    }
214
215    fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
216        cancel(self.fd.as_fd().as_raw_fd(), optr)
217    }
218}
219
220unsafe impl<S: AsFd> OpCode for ReadManagedAt<S> {
221    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
222        unsafe { self.project().op.operate(optr) }
223    }
224
225    fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
226        self.project().op.cancel(optr)
227    }
228}
229
230unsafe impl<T: IoBufMut, S: AsFd> OpCode for Read<T, S> {
231    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
232        let fd = self.fd.as_fd().as_raw_fd();
233        let slice = self.project().buffer.sys_slice_mut();
234        let mut transferred = 0;
235        let res = unsafe {
236            ReadFile(
237                fd,
238                slice.ptr() as _,
239                slice.len() as _,
240                &mut transferred,
241                optr,
242            )
243        };
244        win32_result(res, transferred)
245    }
246
247    fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
248        cancel(self.fd.as_fd().as_raw_fd(), optr)
249    }
250}
251
252unsafe impl<T: IoBuf, S: AsFd> OpCode for Write<T, S> {
253    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
254        let slice = self.buffer.as_init();
255        let mut transferred = 0;
256        let res = unsafe {
257            WriteFile(
258                self.fd.as_fd().as_raw_fd(),
259                slice.as_ptr() as _,
260                slice.len().try_into().unwrap_or(u32::MAX),
261                &mut transferred,
262                optr,
263            )
264        };
265        win32_result(res, transferred)
266    }
267
268    fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
269        cancel(self.fd.as_fd().as_raw_fd(), optr)
270    }
271}
272
273unsafe impl<S: AsFd> OpCode for ReadManaged<S> {
274    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
275        unsafe { self.project().op.operate(optr) }
276    }
277
278    fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
279        self.project().op.cancel(optr)
280    }
281}
282
283unsafe impl<S: AsFd> OpCode for Sync<S> {
284    fn op_type(&self) -> OpType {
285        OpType::Blocking
286    }
287
288    unsafe fn operate(self: Pin<&mut Self>, _optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
289        Poll::Ready(Ok(
290            syscall!(BOOL, FlushFileBuffers(self.fd.as_fd().as_raw_fd()))? as _,
291        ))
292    }
293}
294
295unsafe impl<S: AsFd> OpCode for ShutdownSocket<S> {
296    fn op_type(&self) -> OpType {
297        OpType::Blocking
298    }
299
300    unsafe fn operate(self: Pin<&mut Self>, _optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
301        let how = match self.how {
302            Shutdown::Write => SD_SEND,
303            Shutdown::Read => SD_RECEIVE,
304            Shutdown::Both => SD_BOTH,
305        };
306        Poll::Ready(Ok(
307            syscall!(SOCKET, shutdown(self.fd.as_fd().as_raw_fd() as _, how))? as _,
308        ))
309    }
310}
311
312unsafe impl OpCode for CloseSocket {
313    fn op_type(&self) -> OpType {
314        OpType::Blocking
315    }
316
317    unsafe fn operate(self: Pin<&mut Self>, _optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
318        Poll::Ready(Ok(
319            syscall!(SOCKET, closesocket(self.fd.as_fd().as_raw_fd() as _))? as _,
320        ))
321    }
322}
323
324static ACCEPT_EX: OnceLock<LPFN_ACCEPTEX> = OnceLock::new();
325static GET_ADDRS: OnceLock<LPFN_GETACCEPTEXSOCKADDRS> = OnceLock::new();
326
327const ACCEPT_ADDR_BUFFER_SIZE: usize = std::mem::size_of::<SOCKADDR_STORAGE>() + 16;
328const ACCEPT_BUFFER_SIZE: usize = ACCEPT_ADDR_BUFFER_SIZE * 2;
329
330pin_project! {
331    /// Accept a connection.
332    pub struct Accept<S> {
333        pub(crate) fd: S,
334        pub(crate) accept_fd: socket2::Socket,
335        #[pin]
336        pub(crate) buffer: [u8; ACCEPT_BUFFER_SIZE],
337        _p: PhantomPinned,
338    }
339}
340
341impl<S> Accept<S> {
342    /// Create [`Accept`]. `accept_fd` should not be bound.
343    pub fn new(fd: S, accept_fd: socket2::Socket) -> Self {
344        Self {
345            fd,
346            accept_fd,
347            buffer: [0u8; ACCEPT_BUFFER_SIZE],
348            _p: PhantomPinned,
349        }
350    }
351}
352
353impl<S: AsFd> Accept<S> {
354    /// Update accept context.
355    pub fn update_context(&self) -> io::Result<()> {
356        let fd = self.fd.as_fd().as_raw_fd();
357        syscall!(
358            SOCKET,
359            setsockopt(
360                self.accept_fd.as_raw_socket() as _,
361                SOL_SOCKET,
362                SO_UPDATE_ACCEPT_CONTEXT,
363                &fd as *const _ as _,
364                std::mem::size_of_val(&fd) as _,
365            )
366        )?;
367        Ok(())
368    }
369
370    /// Get the remote address from the inner buffer.
371    pub fn into_addr(self) -> io::Result<(socket2::Socket, SockAddr)> {
372        let get_addrs_fn = GET_ADDRS
373            .get_or_try_init(|| {
374                get_wsa_fn(self.fd.as_fd().as_raw_fd(), WSAID_GETACCEPTEXSOCKADDRS)
375            })?
376            .ok_or_else(|| {
377                io::Error::new(
378                    io::ErrorKind::Unsupported,
379                    "cannot retrieve GetAcceptExSockAddrs",
380                )
381            })?;
382        let mut local_addr: *mut SOCKADDR = null_mut();
383        let mut local_addr_len = 0;
384        let mut remote_addr: *mut SOCKADDR = null_mut();
385        let mut remote_addr_len = 0;
386        unsafe {
387            get_addrs_fn(
388                &self.buffer as *const _ as *const _,
389                0,
390                ACCEPT_ADDR_BUFFER_SIZE as _,
391                ACCEPT_ADDR_BUFFER_SIZE as _,
392                &mut local_addr,
393                &mut local_addr_len,
394                &mut remote_addr,
395                &mut remote_addr_len,
396            );
397        }
398        Ok((self.accept_fd, unsafe {
399            SockAddr::new(
400                // SAFETY: the buffer is large enough to hold the address
401                std::mem::transmute::<SOCKADDR_STORAGE, SockAddrStorage>(read_unaligned(
402                    remote_addr.cast::<SOCKADDR_STORAGE>(),
403                )),
404                remote_addr_len,
405            )
406        }))
407    }
408}
409
410unsafe impl<S: AsFd> OpCode for Accept<S> {
411    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
412        let accept_fn = ACCEPT_EX
413            .get_or_try_init(|| get_wsa_fn(self.fd.as_fd().as_raw_fd(), WSAID_ACCEPTEX))?
414            .ok_or_else(|| {
415                io::Error::new(io::ErrorKind::Unsupported, "cannot retrieve AcceptEx")
416            })?;
417        let mut received = 0;
418        let this = self.project();
419        let res = unsafe {
420            accept_fn(
421                this.fd.as_fd().as_raw_fd() as _,
422                this.accept_fd.as_raw_socket() as _,
423                this.buffer.sys_slice_mut().ptr() as _,
424                0,
425                ACCEPT_ADDR_BUFFER_SIZE as _,
426                ACCEPT_ADDR_BUFFER_SIZE as _,
427                &mut received,
428                optr,
429            )
430        };
431        win32_result(res, received)
432    }
433
434    fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
435        cancel(self.fd.as_fd().as_raw_fd(), optr)
436    }
437}
438
439static CONNECT_EX: OnceLock<LPFN_CONNECTEX> = OnceLock::new();
440
441impl<S: AsFd> Connect<S> {
442    /// Update connect context.
443    pub fn update_context(&self) -> io::Result<()> {
444        syscall!(
445            SOCKET,
446            setsockopt(
447                self.fd.as_fd().as_raw_fd() as _,
448                SOL_SOCKET,
449                SO_UPDATE_CONNECT_CONTEXT,
450                null(),
451                0,
452            )
453        )?;
454        Ok(())
455    }
456}
457
458unsafe impl<S: AsFd> OpCode for Connect<S> {
459    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
460        let connect_fn = CONNECT_EX
461            .get_or_try_init(|| get_wsa_fn(self.fd.as_fd().as_raw_fd(), WSAID_CONNECTEX))?
462            .ok_or_else(|| {
463                io::Error::new(io::ErrorKind::Unsupported, "cannot retrieve ConnectEx")
464            })?;
465        let mut sent = 0;
466        let res = unsafe {
467            connect_fn(
468                self.fd.as_fd().as_raw_fd() as _,
469                self.addr.as_ptr().cast(),
470                self.addr.len(),
471                null(),
472                0,
473                &mut sent,
474                optr,
475            )
476        };
477        win32_result(res, sent)
478    }
479
480    fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
481        cancel(self.fd.as_fd().as_raw_fd(), optr)
482    }
483}
484
485pin_project! {
486    /// Receive data from remote.
487    ///
488    /// It is only used for socket operations. If you want to read from a pipe, use
489    /// [`Read`].
490    pub struct Recv<T: IoBufMut, S> {
491        pub(crate) fd: S,
492        #[pin]
493        pub(crate) buffer: T,
494        pub(crate) flags: i32,
495        pub(crate) slice: SysSlice,
496        _p: PhantomPinned,
497    }
498}
499
500impl<T: IoBufMut, S> Recv<T, S> {
501    /// Create [`Recv`].
502    pub fn new(fd: S, buffer: T, flags: i32) -> Self {
503        Self {
504            fd,
505            buffer,
506            flags,
507            slice: SysSlice::null(),
508            _p: PhantomPinned,
509        }
510    }
511}
512
513impl<T: IoBufMut, S> IntoInner for Recv<T, S> {
514    type Inner = T;
515
516    fn into_inner(self) -> Self::Inner {
517        self.buffer
518    }
519}
520
521unsafe impl<T: IoBufMut, S: AsFd> OpCode for Recv<T, S> {
522    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
523        let this = self.project();
524        let fd = this.fd.as_fd().as_raw_fd();
525        let mut flags = *this.flags as _;
526        *this.slice = this.buffer.sys_slice_mut();
527        let mut received = 0;
528        let res = unsafe {
529            WSARecv(
530                fd as _,
531                (this.slice as *const SysSlice).cast(),
532                1,
533                &mut received,
534                &mut flags,
535                optr,
536                None,
537            )
538        };
539        winsock_result(res, received)
540    }
541
542    fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
543        cancel(self.fd.as_fd().as_raw_fd(), optr)
544    }
545}
546
547unsafe impl<S: AsFd> OpCode for RecvManaged<S> {
548    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
549        unsafe { self.project().op.operate(optr) }
550    }
551
552    fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
553        self.project().op.cancel(optr)
554    }
555}
556
557unsafe impl<S: AsFd> OpCode for RecvFromManaged<S> {
558    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
559        unsafe { self.project().op.operate(optr) }
560    }
561
562    fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
563        self.project().op.cancel(optr)
564    }
565}
566
567pin_project! {
568    /// Receive data from remote into vectored buffer.
569    pub struct RecvVectored<T: IoVectoredBufMut, S> {
570        pub(crate) fd: S,
571        #[pin]
572        pub(crate) buffer: T,
573        pub(crate) flags: i32,
574        pub(crate) slices: Vec<SysSlice>,
575        _p: PhantomPinned,
576    }
577}
578
579impl<T: IoVectoredBufMut, S> RecvVectored<T, S> {
580    /// Create [`RecvVectored`].
581    pub fn new(fd: S, buffer: T, flags: i32) -> Self {
582        Self {
583            fd,
584            buffer,
585            flags,
586            slices: vec![],
587            _p: PhantomPinned,
588        }
589    }
590}
591
592impl<T: IoVectoredBufMut, S> IntoInner for RecvVectored<T, S> {
593    type Inner = T;
594
595    fn into_inner(self) -> Self::Inner {
596        self.buffer
597    }
598}
599
600unsafe impl<T: IoVectoredBufMut, S: AsFd> OpCode for RecvVectored<T, S> {
601    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
602        let this = self.project();
603        let fd = this.fd.as_fd().as_raw_fd();
604        let mut flags = *this.flags as _;
605        *this.slices = this.buffer.sys_slices_mut();
606        let mut received = 0;
607        let res = unsafe {
608            WSARecv(
609                fd as _,
610                this.slices.as_ptr() as _,
611                this.slices.len() as _,
612                &mut received,
613                &mut flags,
614                optr,
615                None,
616            )
617        };
618        winsock_result(res, received)
619    }
620
621    fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
622        cancel(self.fd.as_fd().as_raw_fd(), optr)
623    }
624}
625
626pin_project! {
627    /// Send data to remote.
628    ///
629    /// It is only used for socket operations. If you want to write to a pipe, use
630    /// [`Write`].
631    pub struct Send<T: IoBuf, S> {
632        pub(crate) fd: S,
633        #[pin]
634        pub(crate) buffer: T,
635        pub(crate) flags: i32,
636        pub(crate) slice: SysSlice,
637        _p: PhantomPinned,
638    }
639}
640
641impl<T: IoBuf, S> Send<T, S> {
642    /// Create [`Send`].
643    pub fn new(fd: S, buffer: T, flags: i32) -> Self {
644        Self {
645            fd,
646            buffer,
647            flags,
648            slice: SysSlice::null(),
649            _p: PhantomPinned,
650        }
651    }
652}
653
654impl<T: IoBuf, S> IntoInner for Send<T, S> {
655    type Inner = T;
656
657    fn into_inner(self) -> Self::Inner {
658        self.buffer
659    }
660}
661
662unsafe impl<T: IoBuf, S: AsFd> OpCode for Send<T, S> {
663    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
664        let this = self.project();
665        *this.slice = this.buffer.as_ref().sys_slice();
666        let mut sent = 0;
667        let res = unsafe {
668            WSASend(
669                this.fd.as_fd().as_raw_fd() as _,
670                (this.slice as *const SysSlice).cast(),
671                1,
672                &mut sent,
673                *this.flags as _,
674                optr,
675                None,
676            )
677        };
678        winsock_result(res, sent)
679    }
680
681    fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
682        cancel(self.fd.as_fd().as_raw_fd(), optr)
683    }
684}
685
686pin_project! {
687    /// Send data to remote from vectored buffer.
688    pub struct SendVectored<T: IoVectoredBuf, S> {
689        pub(crate) fd: S,
690        #[pin]
691        pub(crate) buffer: T,
692        pub(crate) flags: i32,
693        pub(crate) slices: Vec<SysSlice>,
694        _p: PhantomPinned,
695    }
696}
697
698impl<T: IoVectoredBuf, S> SendVectored<T, S> {
699    /// Create [`SendVectored`].
700    pub fn new(fd: S, buffer: T, flags: i32) -> Self {
701        Self {
702            fd,
703            buffer,
704            flags,
705            slices: vec![],
706            _p: PhantomPinned,
707        }
708    }
709}
710
711impl<T: IoVectoredBuf, S> IntoInner for SendVectored<T, S> {
712    type Inner = T;
713
714    fn into_inner(self) -> Self::Inner {
715        self.buffer
716    }
717}
718
719unsafe impl<T: IoVectoredBuf, S: AsFd> OpCode for SendVectored<T, S> {
720    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
721        let this = self.project();
722        *this.slices = this.buffer.as_ref().sys_slices();
723        let mut sent = 0;
724        let res = unsafe {
725            WSASend(
726                this.fd.as_fd().as_raw_fd() as _,
727                this.slices.as_ptr() as _,
728                this.slices.len() as _,
729                &mut sent,
730                *this.flags as _,
731                optr,
732                None,
733            )
734        };
735        winsock_result(res, sent)
736    }
737
738    fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
739        cancel(self.fd.as_fd().as_raw_fd(), optr)
740    }
741}
742
743pin_project! {
744    /// Receive data and source address.
745    pub struct RecvFrom<T: IoBufMut, S> {
746        pub(crate) fd: S,
747        #[pin]
748        pub(crate) buffer: T,
749        pub(crate) addr: SockAddrStorage,
750        pub(crate) addr_len: socklen_t,
751        pub(crate) flags: i32,
752        pub(crate) slice: SysSlice,
753        _p: PhantomPinned,
754    }
755}
756
757impl<T: IoBufMut, S> RecvFrom<T, S> {
758    /// Create [`RecvFrom`].
759    pub fn new(fd: S, buffer: T, flags: i32) -> Self {
760        let addr = SockAddrStorage::zeroed();
761        let addr_len = addr.size_of();
762        Self {
763            fd,
764            buffer,
765            addr,
766            addr_len,
767            flags,
768            slice: SysSlice::null(),
769            _p: PhantomPinned,
770        }
771    }
772}
773
774impl<T: IoBufMut, S> IntoInner for RecvFrom<T, S> {
775    type Inner = (T, SockAddrStorage, socklen_t);
776
777    fn into_inner(self) -> Self::Inner {
778        (self.buffer, self.addr, self.addr_len)
779    }
780}
781
782unsafe impl<T: IoBufMut, S: AsFd> OpCode for RecvFrom<T, S> {
783    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
784        let this = self.project();
785        let fd = this.fd.as_fd().as_raw_fd();
786        *this.slice = this.buffer.sys_slice_mut();
787        let mut flags = *this.flags as _;
788        let mut received = 0;
789        let res = unsafe {
790            WSARecvFrom(
791                fd as _,
792                (this.slice as *const SysSlice).cast(),
793                1,
794                &mut received,
795                &mut flags,
796                this.addr as *mut _ as *mut SOCKADDR,
797                this.addr_len,
798                optr,
799                None,
800            )
801        };
802        winsock_result(res, received)
803    }
804
805    fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
806        cancel(self.fd.as_fd().as_raw_fd(), optr)
807    }
808}
809
810pin_project! {
811    /// Receive data and source address into vectored buffer.
812    pub struct RecvFromVectored<T: IoVectoredBufMut, S> {
813        pub(crate) fd: S,
814        #[pin]
815        pub(crate) buffer: T,
816        pub(crate) addr: SockAddrStorage,
817        pub(crate) addr_len: socklen_t,
818        pub(crate) flags: i32,
819        pub(crate) slices: Vec<SysSlice>,
820        _p: PhantomPinned,
821    }
822}
823
824impl<T: IoVectoredBufMut, S> RecvFromVectored<T, S> {
825    /// Create [`RecvFromVectored`].
826    pub fn new(fd: S, buffer: T, flags: i32) -> Self {
827        let addr = SockAddrStorage::zeroed();
828        let addr_len = addr.size_of();
829        Self {
830            fd,
831            buffer,
832            addr,
833            addr_len,
834            flags,
835            slices: vec![],
836            _p: PhantomPinned,
837        }
838    }
839}
840
841impl<T: IoVectoredBufMut, S> IntoInner for RecvFromVectored<T, S> {
842    type Inner = (T, SockAddrStorage, socklen_t);
843
844    fn into_inner(self) -> Self::Inner {
845        (self.buffer, self.addr, self.addr_len)
846    }
847}
848
849unsafe impl<T: IoVectoredBufMut, S: AsFd> OpCode for RecvFromVectored<T, S> {
850    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
851        let this = self.project();
852        let fd = this.fd.as_fd().as_raw_fd();
853        *this.slices = this.buffer.sys_slices_mut();
854        let mut flags = *this.flags as _;
855        let mut received = 0;
856        let res = unsafe {
857            WSARecvFrom(
858                fd as _,
859                this.slices.as_ptr() as _,
860                this.slices.len() as _,
861                &mut received,
862                &mut flags,
863                this.addr as *mut _ as *mut SOCKADDR,
864                this.addr_len,
865                optr,
866                None,
867            )
868        };
869        winsock_result(res, received)
870    }
871
872    fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
873        cancel(self.fd.as_fd().as_raw_fd(), optr)
874    }
875}
876
877pin_project! {
878    /// Send data to specified address from buffer.
879    pub struct SendTo<T: IoBuf, S> {
880        pub(crate) fd: S,
881        #[pin]
882        pub(crate) buffer: T,
883        pub(crate) addr: SockAddr,
884        pub(crate) flags: i32,
885        pub(crate) slice: SysSlice,
886        _p: PhantomPinned,
887    }
888}
889
890impl<T: IoBuf, S> SendTo<T, S> {
891    /// Create [`SendTo`].
892    pub fn new(fd: S, buffer: T, addr: SockAddr, flags: i32) -> Self {
893        Self {
894            fd,
895            buffer,
896            addr,
897            flags,
898            slice: SysSlice::null(),
899            _p: PhantomPinned,
900        }
901    }
902}
903
904impl<T: IoBuf, S> IntoInner for SendTo<T, S> {
905    type Inner = T;
906
907    fn into_inner(self) -> Self::Inner {
908        self.buffer
909    }
910}
911
912unsafe impl<T: IoBuf, S: AsFd> OpCode for SendTo<T, S> {
913    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
914        let this = self.project();
915        *this.slice = this.buffer.as_ref().sys_slice();
916        let mut sent = 0;
917        let res = unsafe {
918            WSASendTo(
919                this.fd.as_fd().as_raw_fd() as _,
920                (this.slice as *const SysSlice).cast(),
921                1,
922                &mut sent,
923                *this.flags as _,
924                this.addr.as_ptr().cast(),
925                this.addr.len(),
926                optr,
927                None,
928            )
929        };
930        winsock_result(res, sent)
931    }
932
933    fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
934        cancel(self.fd.as_fd().as_raw_fd(), optr)
935    }
936}
937
938pin_project! {
939    /// Send data to specified address from vectored buffer.
940    pub struct SendToVectored<T: IoVectoredBuf, S> {
941        pub(crate) fd: S,
942        #[pin]
943        pub(crate) buffer: T,
944        pub(crate) addr: SockAddr,
945        pub(crate) flags: i32,
946        pub(crate) slices: Vec<SysSlice>,
947        _p: PhantomPinned,
948    }
949}
950
951impl<T: IoVectoredBuf, S> SendToVectored<T, S> {
952    /// Create [`SendToVectored`].
953    pub fn new(fd: S, buffer: T, addr: SockAddr, flags: i32) -> Self {
954        Self {
955            fd,
956            buffer,
957            addr,
958            flags,
959            slices: vec![],
960            _p: PhantomPinned,
961        }
962    }
963}
964
965impl<T: IoVectoredBuf, S> IntoInner for SendToVectored<T, S> {
966    type Inner = T;
967
968    fn into_inner(self) -> Self::Inner {
969        self.buffer
970    }
971}
972
973unsafe impl<T: IoVectoredBuf, S: AsFd> OpCode for SendToVectored<T, S> {
974    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
975        let this = self.project();
976        *this.slices = this.buffer.as_ref().sys_slices();
977        let mut sent = 0;
978        let res = unsafe {
979            WSASendTo(
980                this.fd.as_fd().as_raw_fd() as _,
981                this.slices.as_ptr() as _,
982                this.slices.len() as _,
983                &mut sent,
984                *this.flags as _,
985                this.addr.as_ptr().cast(),
986                this.addr.len(),
987                optr,
988                None,
989            )
990        };
991        winsock_result(res, sent)
992    }
993
994    fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
995        cancel(self.fd.as_fd().as_raw_fd(), optr)
996    }
997}
998
999static WSA_RECVMSG: OnceLock<LPFN_WSARECVMSG> = OnceLock::new();
1000
1001pin_project! {
1002    /// Receive data and source address with ancillary data into vectored buffer.
1003    pub struct RecvMsg<T: IoVectoredBufMut, C: IoBufMut, S> {
1004        msg: WSAMSG,
1005        addr: SockAddrStorage,
1006        fd: S,
1007        #[pin]
1008        buffer: T,
1009        #[pin]
1010        control: C,
1011        slices: Vec<SysSlice>,
1012        _p: PhantomPinned,
1013    }
1014}
1015
1016impl<T: IoVectoredBufMut, C: IoBufMut, S> RecvMsg<T, C, S> {
1017    /// Create [`RecvMsg`].
1018    ///
1019    /// # Panics
1020    ///
1021    /// This function will panic if the control message buffer is misaligned.
1022    pub fn new(fd: S, buffer: T, control: C, flags: i32) -> Self {
1023        assert!(
1024            control.buf_ptr().cast::<CMSGHDR>().is_aligned(),
1025            "misaligned control message buffer"
1026        );
1027        let addr = SockAddrStorage::zeroed();
1028        let mut msg: WSAMSG = unsafe { std::mem::zeroed() };
1029        msg.dwFlags = flags as _;
1030        Self {
1031            msg,
1032            addr,
1033            fd,
1034            buffer,
1035            control,
1036            slices: vec![],
1037            _p: PhantomPinned,
1038        }
1039    }
1040}
1041
1042impl<T: IoVectoredBufMut, C: IoBufMut, S> IntoInner for RecvMsg<T, C, S> {
1043    type Inner = ((T, C), SockAddrStorage, socklen_t, usize);
1044
1045    fn into_inner(self) -> Self::Inner {
1046        (
1047            (self.buffer, self.control),
1048            self.addr,
1049            self.msg.namelen,
1050            self.msg.Control.len as _,
1051        )
1052    }
1053}
1054
1055unsafe impl<T: IoVectoredBufMut, C: IoBufMut, S: AsFd> OpCode for RecvMsg<T, C, S> {
1056    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
1057        let recvmsg_fn = WSA_RECVMSG
1058            .get_or_try_init(|| get_wsa_fn(self.fd.as_fd().as_raw_fd(), WSAID_WSARECVMSG))?
1059            .ok_or_else(|| {
1060                io::Error::new(io::ErrorKind::Unsupported, "cannot retrieve WSARecvMsg")
1061            })?;
1062
1063        let this = self.project();
1064
1065        *this.slices = this.buffer.sys_slices_mut();
1066        this.msg.name = this.addr as *mut _ as _;
1067        this.msg.namelen = this.addr.size_of() as _;
1068        this.msg.lpBuffers = this.slices.as_mut_ptr() as _;
1069        this.msg.dwBufferCount = this.slices.len() as _;
1070        this.msg.Control = this.control.sys_slice_mut().into_inner();
1071
1072        let mut received = 0;
1073        let res = unsafe {
1074            recvmsg_fn(
1075                this.fd.as_fd().as_raw_fd() as _,
1076                this.msg,
1077                &mut received,
1078                optr,
1079                None,
1080            )
1081        };
1082        winsock_result(res, received)
1083    }
1084
1085    fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
1086        cancel(self.fd.as_fd().as_raw_fd(), optr)
1087    }
1088}
1089
1090pin_project! {
1091    /// Send data to specified address accompanied by ancillary data from vectored
1092    /// buffer.
1093    pub struct SendMsg<T: IoVectoredBuf, C: IoBuf, S> {
1094        msg: WSAMSG,
1095        fd: S,
1096        #[pin]
1097        buffer: T,
1098        #[pin]
1099        control: C,
1100        addr: SockAddr,
1101        slices: Vec<SysSlice>,
1102        flags: i32,
1103        _p: PhantomPinned,
1104    }
1105}
1106
1107impl<T: IoVectoredBuf, C: IoBuf, S> SendMsg<T, C, S> {
1108    /// Create [`SendMsg`].
1109    ///
1110    /// # Panics
1111    ///
1112    /// This function will panic if the control message buffer is misaligned.
1113    pub fn new(fd: S, buffer: T, control: C, addr: SockAddr, flags: i32) -> Self {
1114        assert!(
1115            control.buf_ptr().cast::<CMSGHDR>().is_aligned(),
1116            "misaligned control message buffer"
1117        );
1118        Self {
1119            msg: unsafe { std::mem::zeroed() },
1120            fd,
1121            buffer,
1122            control,
1123            addr,
1124            slices: vec![],
1125            flags,
1126            _p: PhantomPinned,
1127        }
1128    }
1129}
1130
1131impl<T: IoVectoredBuf, C: IoBuf, S> IntoInner for SendMsg<T, C, S> {
1132    type Inner = (T, C);
1133
1134    fn into_inner(self) -> Self::Inner {
1135        (self.buffer, self.control)
1136    }
1137}
1138
1139unsafe impl<T: IoVectoredBuf, C: IoBuf, S: AsFd> OpCode for SendMsg<T, C, S> {
1140    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
1141        let this = self.project();
1142
1143        *this.slices = this.buffer.as_ref().sys_slices();
1144        let control = this.control.as_ref().sys_slice();
1145        *this.msg = WSAMSG {
1146            name: this.addr.as_ptr() as _,
1147            namelen: this.addr.len(),
1148            lpBuffers: this.slices.as_ptr() as _,
1149            dwBufferCount: this.slices.len() as _,
1150            Control: control.into_inner(),
1151            dwFlags: 0,
1152        };
1153
1154        let mut sent = 0;
1155        let res = unsafe {
1156            WSASendMsg(
1157                this.fd.as_fd().as_raw_fd() as _,
1158                this.msg,
1159                *this.flags as _,
1160                &mut sent,
1161                optr,
1162                None,
1163            )
1164        };
1165        winsock_result(res, sent)
1166    }
1167
1168    fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
1169        cancel(self.fd.as_fd().as_raw_fd(), optr)
1170    }
1171}
1172
1173pin_project! {
1174    /// Connect to a named pipe.
1175    pub struct ConnectNamedPipe<S> {
1176        pub(crate) fd: S,
1177    }
1178}
1179
1180impl<S> ConnectNamedPipe<S> {
1181    /// Create [`ConnectNamedPipe`](struct@ConnectNamedPipe).
1182    pub fn new(fd: S) -> Self {
1183        Self { fd }
1184    }
1185}
1186
1187unsafe impl<S: AsFd> OpCode for ConnectNamedPipe<S> {
1188    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
1189        let res = unsafe { ConnectNamedPipe(self.fd.as_fd().as_raw_fd() as _, optr) };
1190        win32_result(res, 0)
1191    }
1192
1193    fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
1194        cancel(self.fd.as_fd().as_raw_fd(), optr)
1195    }
1196}
1197
1198pin_project! {
1199    /// Send a control code to a device.
1200    pub struct DeviceIoControl<S, I: IoBuf, O: IoBufMut> {
1201        pub(crate) fd: S,
1202        pub(crate) ioctl_code: u32,
1203        #[pin]
1204        pub(crate) input_buffer: Option<I>,
1205        #[pin]
1206        pub(crate) output_buffer: Option<O>,
1207        _p: PhantomPinned,
1208    }
1209}
1210
1211impl<S, I: IoBuf, O: IoBufMut> DeviceIoControl<S, I, O> {
1212    /// Create [`DeviceIoControl`].
1213    pub fn new(fd: S, ioctl_code: u32, input_buffer: Option<I>, output_buffer: Option<O>) -> Self {
1214        Self {
1215            fd,
1216            ioctl_code,
1217            input_buffer,
1218            output_buffer,
1219            _p: PhantomPinned,
1220        }
1221    }
1222}
1223
1224impl<S, I: IoBuf, O: IoBufMut> IntoInner for DeviceIoControl<S, I, O> {
1225    type Inner = (Option<I>, Option<O>);
1226
1227    fn into_inner(self) -> Self::Inner {
1228        (self.input_buffer, self.output_buffer)
1229    }
1230}
1231
1232unsafe impl<S: AsFd, I: IoBuf, O: IoBufMut> OpCode for DeviceIoControl<S, I, O> {
1233    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
1234        let mut this = self.project();
1235
1236        let fd = this.fd.as_fd().as_raw_fd();
1237
1238        let input = this
1239            .input_buffer
1240            .as_ref()
1241            .as_pin_ref()
1242            .map_or(SysSlice::null(), |x| x.sys_slice());
1243        let output = this
1244            .output_buffer
1245            .as_mut()
1246            .as_pin_mut()
1247            .map_or(SysSlice::null(), |x| x.sys_slice_mut());
1248
1249        let mut transferred = 0;
1250        let res = unsafe {
1251            DeviceIoControl(
1252                fd,
1253                *this.ioctl_code,
1254                input.ptr() as _,
1255                input.len() as _,
1256                output.ptr() as _,
1257                output.len() as _,
1258                &mut transferred,
1259                optr,
1260            )
1261        };
1262        win32_result(res, transferred)
1263    }
1264
1265    fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
1266        cancel(self.fd.as_fd().as_raw_fd(), optr)
1267    }
1268}