compio_driver/iocp/
op.rs

1#[cfg(feature = "once_cell_try")]
2use std::sync::OnceLock;
3use std::{
4    io,
5    marker::PhantomPinned,
6    net::Shutdown,
7    os::windows::io::AsRawSocket,
8    pin::Pin,
9    ptr::{null, null_mut, read_unaligned},
10    task::Poll,
11};
12
13use aligned_array::{A8, Aligned};
14use compio_buf::{
15    BufResult, IntoInner, IoBuf, IoBufMut, IoSlice, IoSliceMut, IoVectoredBuf, IoVectoredBufMut,
16};
17#[cfg(not(feature = "once_cell_try"))]
18use once_cell::sync::OnceCell as OnceLock;
19use socket2::SockAddr;
20use windows_sys::{
21    Win32::{
22        Foundation::{
23            CloseHandle, ERROR_BROKEN_PIPE, ERROR_HANDLE_EOF, ERROR_IO_INCOMPLETE,
24            ERROR_IO_PENDING, ERROR_NETNAME_DELETED, ERROR_NO_DATA, ERROR_NOT_FOUND,
25            ERROR_PIPE_CONNECTED, ERROR_PIPE_NOT_CONNECTED, GetLastError,
26        },
27        Networking::WinSock::{
28            CMSGHDR, LPFN_ACCEPTEX, LPFN_CONNECTEX, LPFN_GETACCEPTEXSOCKADDRS, LPFN_WSARECVMSG,
29            SD_BOTH, SD_RECEIVE, SD_SEND, SIO_GET_EXTENSION_FUNCTION_POINTER,
30            SO_UPDATE_ACCEPT_CONTEXT, SO_UPDATE_CONNECT_CONTEXT, SOCKADDR, SOCKADDR_STORAGE,
31            SOL_SOCKET, WSABUF, WSAID_ACCEPTEX, WSAID_CONNECTEX, WSAID_GETACCEPTEXSOCKADDRS,
32            WSAID_WSARECVMSG, WSAIoctl, WSAMSG, WSARecv, WSARecvFrom, WSASend, WSASendMsg,
33            WSASendTo, closesocket, setsockopt, shutdown, socklen_t,
34        },
35        Storage::FileSystem::{FlushFileBuffers, ReadFile, WriteFile},
36        System::{
37            IO::{CancelIoEx, DeviceIoControl, OVERLAPPED},
38            Pipes::ConnectNamedPipe,
39        },
40    },
41    core::GUID,
42};
43
44use crate::{AsFd, AsRawFd, OpCode, OpType, RawFd, op::*, syscall};
45
46#[inline]
47fn winapi_result(transferred: u32) -> Poll<io::Result<usize>> {
48    let error = unsafe { GetLastError() };
49    assert_ne!(error, 0);
50    match error {
51        ERROR_IO_PENDING => Poll::Pending,
52        ERROR_IO_INCOMPLETE
53        | ERROR_NETNAME_DELETED
54        | ERROR_HANDLE_EOF
55        | ERROR_BROKEN_PIPE
56        | ERROR_PIPE_CONNECTED
57        | ERROR_PIPE_NOT_CONNECTED
58        | ERROR_NO_DATA => Poll::Ready(Ok(transferred as _)),
59        _ => Poll::Ready(Err(io::Error::from_raw_os_error(error as _))),
60    }
61}
62
63#[inline]
64fn win32_result(res: i32, transferred: u32) -> Poll<io::Result<usize>> {
65    if res == 0 {
66        winapi_result(transferred)
67    } else {
68        Poll::Ready(Ok(transferred as _))
69    }
70}
71
72#[inline]
73fn winsock_result(res: i32, transferred: u32) -> Poll<io::Result<usize>> {
74    if res != 0 {
75        winapi_result(transferred)
76    } else {
77        Poll::Ready(Ok(transferred as _))
78    }
79}
80
81#[inline]
82fn cancel(handle: RawFd, optr: *mut OVERLAPPED) -> io::Result<()> {
83    match syscall!(BOOL, CancelIoEx(handle as _, optr)) {
84        Ok(_) => Ok(()),
85        Err(e) => {
86            if e.raw_os_error() == Some(ERROR_NOT_FOUND as _) {
87                Ok(())
88            } else {
89                Err(e)
90            }
91        }
92    }
93}
94
95fn get_wsa_fn<F>(handle: RawFd, fguid: GUID) -> io::Result<Option<F>> {
96    let mut fptr = None;
97    let mut returned = 0;
98    syscall!(
99        SOCKET,
100        WSAIoctl(
101            handle as _,
102            SIO_GET_EXTENSION_FUNCTION_POINTER,
103            std::ptr::addr_of!(fguid).cast(),
104            std::mem::size_of_val(&fguid) as _,
105            std::ptr::addr_of_mut!(fptr).cast(),
106            std::mem::size_of::<F>() as _,
107            &mut returned,
108            null_mut(),
109            None,
110        )
111    )?;
112    Ok(fptr)
113}
114
115impl<
116    D: std::marker::Send + 'static,
117    F: (FnOnce() -> BufResult<usize, D>) + std::marker::Send + 'static,
118> OpCode for Asyncify<F, D>
119{
120    fn op_type(&self) -> OpType {
121        OpType::Blocking
122    }
123
124    unsafe fn operate(self: Pin<&mut Self>, _optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
125        // Safety: self won't be moved
126        let this = self.get_unchecked_mut();
127        let f = this
128            .f
129            .take()
130            .expect("the operate method could only be called once");
131        let BufResult(res, data) = f();
132        this.data = Some(data);
133        Poll::Ready(res)
134    }
135}
136
137impl OpCode for CloseFile {
138    fn op_type(&self) -> OpType {
139        OpType::Blocking
140    }
141
142    unsafe fn operate(self: Pin<&mut Self>, _optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
143        Poll::Ready(Ok(
144            syscall!(BOOL, CloseHandle(self.fd.as_fd().as_raw_fd()))? as _,
145        ))
146    }
147}
148
149impl<T: IoBufMut, S: AsFd> OpCode for ReadAt<T, S> {
150    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
151        if let Some(overlapped) = optr.as_mut() {
152            overlapped.Anonymous.Anonymous.Offset = (self.offset & 0xFFFFFFFF) as _;
153            overlapped.Anonymous.Anonymous.OffsetHigh = (self.offset >> 32) as _;
154        }
155        let fd = self.fd.as_fd().as_raw_fd();
156        let slice = self.get_unchecked_mut().buffer.as_mut_slice();
157        let mut transferred = 0;
158        let res = ReadFile(
159            fd,
160            slice.as_mut_ptr() as _,
161            slice.len() as _,
162            &mut transferred,
163            optr,
164        );
165        win32_result(res, transferred)
166    }
167
168    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
169        cancel(self.fd.as_fd().as_raw_fd(), optr)
170    }
171}
172
173impl<T: IoBuf, S: AsFd> OpCode for WriteAt<T, S> {
174    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
175        if let Some(overlapped) = optr.as_mut() {
176            overlapped.Anonymous.Anonymous.Offset = (self.offset & 0xFFFFFFFF) as _;
177            overlapped.Anonymous.Anonymous.OffsetHigh = (self.offset >> 32) as _;
178        }
179        let slice = self.buffer.as_slice();
180        let mut transferred = 0;
181        let res = WriteFile(
182            self.fd.as_fd().as_raw_fd(),
183            slice.as_ptr() as _,
184            slice.len() as _,
185            &mut transferred,
186            optr,
187        );
188        win32_result(res, transferred)
189    }
190
191    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
192        cancel(self.fd.as_fd().as_raw_fd(), optr)
193    }
194}
195
196impl<S: AsFd> OpCode for ReadManagedAt<S> {
197    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
198        self.map_unchecked_mut(|this| &mut this.op).operate(optr)
199    }
200
201    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
202        self.map_unchecked_mut(|this| &mut this.op).cancel(optr)
203    }
204}
205
206impl<S: AsFd> OpCode for Sync<S> {
207    fn op_type(&self) -> OpType {
208        OpType::Blocking
209    }
210
211    unsafe fn operate(self: Pin<&mut Self>, _optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
212        Poll::Ready(Ok(
213            syscall!(BOOL, FlushFileBuffers(self.fd.as_fd().as_raw_fd()))? as _,
214        ))
215    }
216}
217
218impl<S: AsFd> OpCode for ShutdownSocket<S> {
219    fn op_type(&self) -> OpType {
220        OpType::Blocking
221    }
222
223    unsafe fn operate(self: Pin<&mut Self>, _optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
224        let how = match self.how {
225            Shutdown::Write => SD_SEND,
226            Shutdown::Read => SD_RECEIVE,
227            Shutdown::Both => SD_BOTH,
228        };
229        Poll::Ready(Ok(
230            syscall!(SOCKET, shutdown(self.fd.as_fd().as_raw_fd() as _, how))? as _,
231        ))
232    }
233}
234
235impl OpCode for CloseSocket {
236    fn op_type(&self) -> OpType {
237        OpType::Blocking
238    }
239
240    unsafe fn operate(self: Pin<&mut Self>, _optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
241        Poll::Ready(Ok(
242            syscall!(SOCKET, closesocket(self.fd.as_fd().as_raw_fd() as _))? as _,
243        ))
244    }
245}
246
247static ACCEPT_EX: OnceLock<LPFN_ACCEPTEX> = OnceLock::new();
248static GET_ADDRS: OnceLock<LPFN_GETACCEPTEXSOCKADDRS> = OnceLock::new();
249
250const ACCEPT_ADDR_BUFFER_SIZE: usize = std::mem::size_of::<SOCKADDR_STORAGE>() + 16;
251const ACCEPT_BUFFER_SIZE: usize = ACCEPT_ADDR_BUFFER_SIZE * 2;
252
253/// Accept a connection.
254pub struct Accept<S> {
255    pub(crate) fd: S,
256    pub(crate) accept_fd: socket2::Socket,
257    pub(crate) buffer: Aligned<A8, [u8; ACCEPT_BUFFER_SIZE]>,
258    _p: PhantomPinned,
259}
260
261impl<S> Accept<S> {
262    /// Create [`Accept`]. `accept_fd` should not be bound.
263    pub fn new(fd: S, accept_fd: socket2::Socket) -> Self {
264        Self {
265            fd,
266            accept_fd,
267            buffer: unsafe { std::mem::zeroed() },
268            _p: PhantomPinned,
269        }
270    }
271}
272
273impl<S: AsFd> Accept<S> {
274    /// Update accept context.
275    pub fn update_context(&self) -> io::Result<()> {
276        let fd = self.fd.as_fd().as_raw_fd();
277        syscall!(
278            SOCKET,
279            setsockopt(
280                self.accept_fd.as_raw_socket() as _,
281                SOL_SOCKET,
282                SO_UPDATE_ACCEPT_CONTEXT,
283                &fd as *const _ as _,
284                std::mem::size_of_val(&fd) as _,
285            )
286        )?;
287        Ok(())
288    }
289
290    /// Get the remote address from the inner buffer.
291    pub fn into_addr(self) -> io::Result<(socket2::Socket, SockAddr)> {
292        let get_addrs_fn = GET_ADDRS
293            .get_or_try_init(|| {
294                get_wsa_fn(self.fd.as_fd().as_raw_fd(), WSAID_GETACCEPTEXSOCKADDRS)
295            })?
296            .ok_or_else(|| {
297                io::Error::new(
298                    io::ErrorKind::Unsupported,
299                    "cannot retrieve GetAcceptExSockAddrs",
300                )
301            })?;
302        let mut local_addr: *mut SOCKADDR = null_mut();
303        let mut local_addr_len = 0;
304        let mut remote_addr: *mut SOCKADDR = null_mut();
305        let mut remote_addr_len = 0;
306        unsafe {
307            get_addrs_fn(
308                &self.buffer as *const _ as *const _,
309                0,
310                ACCEPT_ADDR_BUFFER_SIZE as _,
311                ACCEPT_ADDR_BUFFER_SIZE as _,
312                &mut local_addr,
313                &mut local_addr_len,
314                &mut remote_addr,
315                &mut remote_addr_len,
316            );
317        }
318        Ok((self.accept_fd, unsafe {
319            SockAddr::new(
320                read_unaligned(remote_addr.cast::<SOCKADDR_STORAGE>()),
321                remote_addr_len,
322            )
323        }))
324    }
325}
326
327impl<S: AsFd> OpCode for Accept<S> {
328    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
329        let accept_fn = ACCEPT_EX
330            .get_or_try_init(|| get_wsa_fn(self.fd.as_fd().as_raw_fd(), WSAID_ACCEPTEX))?
331            .ok_or_else(|| {
332                io::Error::new(io::ErrorKind::Unsupported, "cannot retrieve AcceptEx")
333            })?;
334        let mut received = 0;
335        let res = accept_fn(
336            self.fd.as_fd().as_raw_fd() as _,
337            self.accept_fd.as_raw_socket() as _,
338            self.get_unchecked_mut().buffer.as_mut_ptr() as _,
339            0,
340            ACCEPT_ADDR_BUFFER_SIZE as _,
341            ACCEPT_ADDR_BUFFER_SIZE as _,
342            &mut received,
343            optr,
344        );
345        win32_result(res, received)
346    }
347
348    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
349        cancel(self.fd.as_fd().as_raw_fd(), optr)
350    }
351}
352
353static CONNECT_EX: OnceLock<LPFN_CONNECTEX> = OnceLock::new();
354
355impl<S: AsFd> Connect<S> {
356    /// Update connect context.
357    pub fn update_context(&self) -> io::Result<()> {
358        syscall!(
359            SOCKET,
360            setsockopt(
361                self.fd.as_fd().as_raw_fd() as _,
362                SOL_SOCKET,
363                SO_UPDATE_CONNECT_CONTEXT,
364                null(),
365                0,
366            )
367        )?;
368        Ok(())
369    }
370}
371
372impl<S: AsFd> OpCode for Connect<S> {
373    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
374        let connect_fn = CONNECT_EX
375            .get_or_try_init(|| get_wsa_fn(self.fd.as_fd().as_raw_fd(), WSAID_CONNECTEX))?
376            .ok_or_else(|| {
377                io::Error::new(io::ErrorKind::Unsupported, "cannot retrieve ConnectEx")
378            })?;
379        let mut sent = 0;
380        let res = connect_fn(
381            self.fd.as_fd().as_raw_fd() as _,
382            self.addr.as_ptr(),
383            self.addr.len(),
384            null(),
385            0,
386            &mut sent,
387            optr,
388        );
389        win32_result(res, sent)
390    }
391
392    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
393        cancel(self.fd.as_fd().as_raw_fd(), optr)
394    }
395}
396
397/// Receive data from remote.
398pub struct Recv<T: IoBufMut, S> {
399    pub(crate) fd: S,
400    pub(crate) buffer: T,
401    _p: PhantomPinned,
402}
403
404impl<T: IoBufMut, S> Recv<T, S> {
405    /// Create [`Recv`].
406    pub fn new(fd: S, buffer: T) -> Self {
407        Self {
408            fd,
409            buffer,
410            _p: PhantomPinned,
411        }
412    }
413}
414
415impl<T: IoBufMut, S> IntoInner for Recv<T, S> {
416    type Inner = T;
417
418    fn into_inner(self) -> Self::Inner {
419        self.buffer
420    }
421}
422
423impl<S: AsFd> OpCode for RecvManaged<S> {
424    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
425        self.map_unchecked_mut(|this| &mut this.op).operate(optr)
426    }
427
428    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
429        self.map_unchecked_mut(|this| &mut this.op).cancel(optr)
430    }
431}
432
433impl<T: IoBufMut, S: AsFd> OpCode for Recv<T, S> {
434    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
435        let fd = self.fd.as_fd().as_raw_fd();
436        let slice = self.get_unchecked_mut().buffer.as_mut_slice();
437        let mut transferred = 0;
438        let res = ReadFile(
439            fd,
440            slice.as_mut_ptr() as _,
441            slice.len() as _,
442            &mut transferred,
443            optr,
444        );
445        win32_result(res, transferred)
446    }
447
448    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
449        cancel(self.fd.as_fd().as_raw_fd(), optr)
450    }
451}
452
453/// Receive data from remote into vectored buffer.
454pub struct RecvVectored<T: IoVectoredBufMut, S> {
455    pub(crate) fd: S,
456    pub(crate) buffer: T,
457    _p: PhantomPinned,
458}
459
460impl<T: IoVectoredBufMut, S> RecvVectored<T, S> {
461    /// Create [`RecvVectored`].
462    pub fn new(fd: S, buffer: T) -> Self {
463        Self {
464            fd,
465            buffer,
466            _p: PhantomPinned,
467        }
468    }
469}
470
471impl<T: IoVectoredBufMut, S> IntoInner for RecvVectored<T, S> {
472    type Inner = T;
473
474    fn into_inner(self) -> Self::Inner {
475        self.buffer
476    }
477}
478
479impl<T: IoVectoredBufMut, S: AsFd> OpCode for RecvVectored<T, S> {
480    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
481        let fd = self.fd.as_fd().as_raw_fd();
482        let slices = self.get_unchecked_mut().buffer.io_slices_mut();
483        let mut flags = 0;
484        let mut received = 0;
485        let res = WSARecv(
486            fd as _,
487            slices.as_ptr() as _,
488            slices.len() as _,
489            &mut received,
490            &mut flags,
491            optr,
492            None,
493        );
494        winsock_result(res, received)
495    }
496
497    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
498        cancel(self.fd.as_fd().as_raw_fd(), optr)
499    }
500}
501
502/// Send data to remote.
503pub struct Send<T: IoBuf, S> {
504    pub(crate) fd: S,
505    pub(crate) buffer: T,
506    _p: PhantomPinned,
507}
508
509impl<T: IoBuf, S> Send<T, S> {
510    /// Create [`Send`].
511    pub fn new(fd: S, buffer: T) -> Self {
512        Self {
513            fd,
514            buffer,
515            _p: PhantomPinned,
516        }
517    }
518}
519
520impl<T: IoBuf, S> IntoInner for Send<T, S> {
521    type Inner = T;
522
523    fn into_inner(self) -> Self::Inner {
524        self.buffer
525    }
526}
527
528impl<T: IoBuf, S: AsFd> OpCode for Send<T, S> {
529    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
530        let slice = self.buffer.as_slice();
531        let mut transferred = 0;
532        let res = WriteFile(
533            self.fd.as_fd().as_raw_fd(),
534            slice.as_ptr() as _,
535            slice.len() as _,
536            &mut transferred,
537            optr,
538        );
539        win32_result(res, transferred)
540    }
541
542    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
543        cancel(self.fd.as_fd().as_raw_fd(), optr)
544    }
545}
546
547/// Send data to remote from vectored buffer.
548pub struct SendVectored<T: IoVectoredBuf, S> {
549    pub(crate) fd: S,
550    pub(crate) buffer: T,
551    _p: PhantomPinned,
552}
553
554impl<T: IoVectoredBuf, S> SendVectored<T, S> {
555    /// Create [`SendVectored`].
556    pub fn new(fd: S, buffer: T) -> Self {
557        Self {
558            fd,
559            buffer,
560            _p: PhantomPinned,
561        }
562    }
563}
564
565impl<T: IoVectoredBuf, S> IntoInner for SendVectored<T, S> {
566    type Inner = T;
567
568    fn into_inner(self) -> Self::Inner {
569        self.buffer
570    }
571}
572
573impl<T: IoVectoredBuf, S: AsFd> OpCode for SendVectored<T, S> {
574    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
575        let slices = self.buffer.io_slices();
576        let mut sent = 0;
577        let res = WSASend(
578            self.fd.as_fd().as_raw_fd() as _,
579            slices.as_ptr() as _,
580            slices.len() as _,
581            &mut sent,
582            0,
583            optr,
584            None,
585        );
586        winsock_result(res, sent)
587    }
588
589    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
590        cancel(self.fd.as_fd().as_raw_fd(), optr)
591    }
592}
593
594/// Receive data and source address.
595pub struct RecvFrom<T: IoBufMut, S> {
596    pub(crate) fd: S,
597    pub(crate) buffer: T,
598    pub(crate) addr: SOCKADDR_STORAGE,
599    pub(crate) addr_len: socklen_t,
600    _p: PhantomPinned,
601}
602
603impl<T: IoBufMut, S> RecvFrom<T, S> {
604    /// Create [`RecvFrom`].
605    pub fn new(fd: S, buffer: T) -> Self {
606        Self {
607            fd,
608            buffer,
609            addr: unsafe { std::mem::zeroed() },
610            addr_len: std::mem::size_of::<SOCKADDR_STORAGE>() as _,
611            _p: PhantomPinned,
612        }
613    }
614}
615
616impl<T: IoBufMut, S> IntoInner for RecvFrom<T, S> {
617    type Inner = (T, SOCKADDR_STORAGE, socklen_t);
618
619    fn into_inner(self) -> Self::Inner {
620        (self.buffer, self.addr, self.addr_len)
621    }
622}
623
624impl<T: IoBufMut, S: AsFd> OpCode for RecvFrom<T, S> {
625    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
626        let this = self.get_unchecked_mut();
627        let fd = this.fd.as_fd().as_raw_fd();
628        let buffer = this.buffer.as_io_slice_mut();
629        let mut flags = 0;
630        let mut received = 0;
631        let res = WSARecvFrom(
632            fd as _,
633            &buffer as *const _ as _,
634            1,
635            &mut received,
636            &mut flags,
637            &mut this.addr as *mut _ as *mut SOCKADDR,
638            &mut this.addr_len,
639            optr,
640            None,
641        );
642        winsock_result(res, received)
643    }
644
645    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
646        cancel(self.fd.as_fd().as_raw_fd(), optr)
647    }
648}
649
650/// Receive data and source address into vectored buffer.
651pub struct RecvFromVectored<T: IoVectoredBufMut, S> {
652    pub(crate) fd: S,
653    pub(crate) buffer: T,
654    pub(crate) addr: SOCKADDR_STORAGE,
655    pub(crate) addr_len: socklen_t,
656    _p: PhantomPinned,
657}
658
659impl<T: IoVectoredBufMut, S> RecvFromVectored<T, S> {
660    /// Create [`RecvFromVectored`].
661    pub fn new(fd: S, buffer: T) -> Self {
662        Self {
663            fd,
664            buffer,
665            addr: unsafe { std::mem::zeroed() },
666            addr_len: std::mem::size_of::<SOCKADDR_STORAGE>() as _,
667            _p: PhantomPinned,
668        }
669    }
670}
671
672impl<T: IoVectoredBufMut, S> IntoInner for RecvFromVectored<T, S> {
673    type Inner = (T, SOCKADDR_STORAGE, socklen_t);
674
675    fn into_inner(self) -> Self::Inner {
676        (self.buffer, self.addr, self.addr_len)
677    }
678}
679
680impl<T: IoVectoredBufMut, S: AsFd> OpCode for RecvFromVectored<T, S> {
681    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
682        let this = self.get_unchecked_mut();
683        let fd = this.fd.as_fd().as_raw_fd();
684        let buffer = this.buffer.io_slices_mut();
685        let mut flags = 0;
686        let mut received = 0;
687        let res = WSARecvFrom(
688            fd as _,
689            buffer.as_ptr() as _,
690            buffer.len() as _,
691            &mut received,
692            &mut flags,
693            &mut this.addr as *mut _ as *mut SOCKADDR,
694            &mut this.addr_len,
695            optr,
696            None,
697        );
698        winsock_result(res, received)
699    }
700
701    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
702        cancel(self.fd.as_fd().as_raw_fd(), optr)
703    }
704}
705
706/// Send data to specified address.
707pub struct SendTo<T: IoBuf, S> {
708    pub(crate) fd: S,
709    pub(crate) buffer: T,
710    pub(crate) addr: SockAddr,
711    _p: PhantomPinned,
712}
713
714impl<T: IoBuf, S> SendTo<T, S> {
715    /// Create [`SendTo`].
716    pub fn new(fd: S, buffer: T, addr: SockAddr) -> Self {
717        Self {
718            fd,
719            buffer,
720            addr,
721            _p: PhantomPinned,
722        }
723    }
724}
725
726impl<T: IoBuf, S> IntoInner for SendTo<T, S> {
727    type Inner = T;
728
729    fn into_inner(self) -> Self::Inner {
730        self.buffer
731    }
732}
733
734impl<T: IoBuf, S: AsFd> OpCode for SendTo<T, S> {
735    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
736        let buffer = self.buffer.as_io_slice();
737        let mut sent = 0;
738        let res = WSASendTo(
739            self.fd.as_fd().as_raw_fd() as _,
740            &buffer as *const _ as _,
741            1,
742            &mut sent,
743            0,
744            self.addr.as_ptr(),
745            self.addr.len(),
746            optr,
747            None,
748        );
749        winsock_result(res, sent)
750    }
751
752    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
753        cancel(self.fd.as_fd().as_raw_fd(), optr)
754    }
755}
756
757/// Send data to specified address from vectored buffer.
758pub struct SendToVectored<T: IoVectoredBuf, S> {
759    pub(crate) fd: S,
760    pub(crate) buffer: T,
761    pub(crate) addr: SockAddr,
762    _p: PhantomPinned,
763}
764
765impl<T: IoVectoredBuf, S> SendToVectored<T, S> {
766    /// Create [`SendToVectored`].
767    pub fn new(fd: S, buffer: T, addr: SockAddr) -> Self {
768        Self {
769            fd,
770            buffer,
771            addr,
772            _p: PhantomPinned,
773        }
774    }
775}
776
777impl<T: IoVectoredBuf, S> IntoInner for SendToVectored<T, S> {
778    type Inner = T;
779
780    fn into_inner(self) -> Self::Inner {
781        self.buffer
782    }
783}
784
785impl<T: IoVectoredBuf, S: AsFd> OpCode for SendToVectored<T, S> {
786    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
787        let buffer = self.buffer.io_slices();
788        let mut sent = 0;
789        let res = WSASendTo(
790            self.fd.as_fd().as_raw_fd() as _,
791            buffer.as_ptr() as _,
792            buffer.len() as _,
793            &mut sent,
794            0,
795            self.addr.as_ptr(),
796            self.addr.len(),
797            optr,
798            None,
799        );
800        winsock_result(res, sent)
801    }
802
803    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
804        cancel(self.fd.as_fd().as_raw_fd(), optr)
805    }
806}
807
808static WSA_RECVMSG: OnceLock<LPFN_WSARECVMSG> = OnceLock::new();
809
810/// Receive data and source address with ancillary data into vectored buffer.
811pub struct RecvMsg<T: IoVectoredBufMut, C: IoBufMut, S> {
812    msg: WSAMSG,
813    addr: SOCKADDR_STORAGE,
814    fd: S,
815    buffer: T,
816    control: C,
817    _p: PhantomPinned,
818}
819
820impl<T: IoVectoredBufMut, C: IoBufMut, S> RecvMsg<T, C, S> {
821    /// Create [`RecvMsg`].
822    ///
823    /// # Panics
824    ///
825    /// This function will panic if the control message buffer is misaligned.
826    pub fn new(fd: S, buffer: T, control: C) -> Self {
827        assert!(
828            control.as_buf_ptr().cast::<CMSGHDR>().is_aligned(),
829            "misaligned control message buffer"
830        );
831        Self {
832            msg: unsafe { std::mem::zeroed() },
833            addr: unsafe { std::mem::zeroed() },
834            fd,
835            buffer,
836            control,
837            _p: PhantomPinned,
838        }
839    }
840}
841
842impl<T: IoVectoredBufMut, C: IoBufMut, S> IntoInner for RecvMsg<T, C, S> {
843    type Inner = ((T, C), SOCKADDR_STORAGE, socklen_t, usize);
844
845    fn into_inner(self) -> Self::Inner {
846        (
847            (self.buffer, self.control),
848            self.addr,
849            self.msg.namelen,
850            self.msg.Control.len as _,
851        )
852    }
853}
854
855impl<T: IoVectoredBufMut, C: IoBufMut, S: AsFd> OpCode for RecvMsg<T, C, S> {
856    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
857        let recvmsg_fn = WSA_RECVMSG
858            .get_or_try_init(|| get_wsa_fn(self.fd.as_fd().as_raw_fd(), WSAID_WSARECVMSG))?
859            .ok_or_else(|| {
860                io::Error::new(io::ErrorKind::Unsupported, "cannot retrieve WSARecvMsg")
861            })?;
862
863        let this = self.get_unchecked_mut();
864
865        let mut slices = this.buffer.io_slices_mut();
866        this.msg.name = &mut this.addr as *mut _ as _;
867        this.msg.namelen = std::mem::size_of::<SOCKADDR_STORAGE>() as _;
868        this.msg.lpBuffers = slices.as_mut_ptr() as _;
869        this.msg.dwBufferCount = slices.len() as _;
870        this.msg.Control =
871            std::mem::transmute::<IoSliceMut, WSABUF>(this.control.as_io_slice_mut());
872
873        let mut received = 0;
874        let res = recvmsg_fn(
875            this.fd.as_fd().as_raw_fd() as _,
876            &mut this.msg,
877            &mut received,
878            optr,
879            None,
880        );
881        winsock_result(res, received)
882    }
883
884    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
885        cancel(self.fd.as_fd().as_raw_fd(), optr)
886    }
887}
888
889/// Send data to specified address accompanied by ancillary data from vectored
890/// buffer.
891pub struct SendMsg<T: IoVectoredBuf, C: IoBuf, S> {
892    fd: S,
893    buffer: T,
894    control: C,
895    addr: SockAddr,
896    _p: PhantomPinned,
897}
898
899impl<T: IoVectoredBuf, C: IoBuf, S> SendMsg<T, C, S> {
900    /// Create [`SendMsg`].
901    ///
902    /// # Panics
903    ///
904    /// This function will panic if the control message buffer is misaligned.
905    pub fn new(fd: S, buffer: T, control: C, addr: SockAddr) -> Self {
906        assert!(
907            control.as_buf_ptr().cast::<CMSGHDR>().is_aligned(),
908            "misaligned control message buffer"
909        );
910        Self {
911            fd,
912            buffer,
913            control,
914            addr,
915            _p: PhantomPinned,
916        }
917    }
918}
919
920impl<T: IoVectoredBuf, C: IoBuf, S> IntoInner for SendMsg<T, C, S> {
921    type Inner = (T, C);
922
923    fn into_inner(self) -> Self::Inner {
924        (self.buffer, self.control)
925    }
926}
927
928impl<T: IoVectoredBuf, C: IoBuf, S: AsFd> OpCode for SendMsg<T, C, S> {
929    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
930        let this = self.get_unchecked_mut();
931
932        let slices = this.buffer.io_slices();
933        let msg = WSAMSG {
934            name: this.addr.as_ptr() as _,
935            namelen: this.addr.len(),
936            lpBuffers: slices.as_ptr() as _,
937            dwBufferCount: slices.len() as _,
938            Control: std::mem::transmute::<IoSlice, WSABUF>(this.control.as_io_slice()),
939            dwFlags: 0,
940        };
941
942        let mut sent = 0;
943        let res = WSASendMsg(
944            this.fd.as_fd().as_raw_fd() as _,
945            &msg,
946            0,
947            &mut sent,
948            optr,
949            None,
950        );
951        winsock_result(res, sent)
952    }
953
954    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
955        cancel(self.fd.as_fd().as_raw_fd(), optr)
956    }
957}
958
/// Connect a named pipe server.
pub struct ConnectNamedPipe<S> {
    /// Server end of the named pipe.
    pub(crate) fd: S,
}

impl<S> ConnectNamedPipe<S> {
    /// Create [`ConnectNamedPipe`](struct@ConnectNamedPipe).
    pub fn new(fd: S) -> Self {
        Self { fd }
    }
}
970
971impl<S: AsFd> OpCode for ConnectNamedPipe<S> {
972    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
973        let res = ConnectNamedPipe(self.fd.as_fd().as_raw_fd() as _, optr);
974        win32_result(res, 0)
975    }
976
977    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
978        cancel(self.fd.as_fd().as_raw_fd(), optr)
979    }
980}
981
982/// Send a control code to a device.
983pub struct DeviceIoControl<S, I: IoBuf, O: IoBufMut> {
984    pub(crate) fd: S,
985    pub(crate) ioctl_code: u32,
986    pub(crate) input_buffer: Option<I>,
987    pub(crate) output_buffer: Option<O>,
988    _p: PhantomPinned,
989}
990
991impl<S, I: IoBuf, O: IoBufMut> DeviceIoControl<S, I, O> {
992    /// Create [`DeviceIoControl`].
993    pub fn new(fd: S, ioctl_code: u32, input_buffer: Option<I>, output_buffer: Option<O>) -> Self {
994        Self {
995            fd,
996            ioctl_code,
997            input_buffer,
998            output_buffer,
999            _p: PhantomPinned,
1000        }
1001    }
1002}
1003
1004impl<S, I: IoBuf, O: IoBufMut> IntoInner for DeviceIoControl<S, I, O> {
1005    type Inner = (Option<I>, Option<O>);
1006
1007    fn into_inner(self) -> Self::Inner {
1008        (self.input_buffer, self.output_buffer)
1009    }
1010}
1011
1012impl<S: AsFd, I: IoBuf, O: IoBufMut> OpCode for DeviceIoControl<S, I, O> {
1013    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
1014        let this = self.get_unchecked_mut();
1015        let fd = this.fd.as_fd().as_raw_fd();
1016        let input_len = this.input_buffer.as_ref().map_or(0, |x| x.buf_len());
1017        let input_ptr = this
1018            .input_buffer
1019            .as_ref()
1020            .map_or(std::ptr::null(), |x| x.as_buf_ptr());
1021        let output_len = this.output_buffer.as_ref().map_or(0, |x| x.buf_len());
1022        let output_ptr = this
1023            .output_buffer
1024            .as_mut()
1025            .map_or(std::ptr::null_mut(), |x| x.as_buf_mut_ptr());
1026        let mut transferred = 0;
1027        let res = DeviceIoControl(
1028            fd,
1029            this.ioctl_code,
1030            input_ptr as _,
1031            input_len as _,
1032            output_ptr as _,
1033            output_len as _,
1034            &mut transferred,
1035            optr,
1036        );
1037        win32_result(res, transferred)
1038    }
1039
1040    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
1041        cancel(self.fd.as_fd().as_raw_fd(), optr)
1042    }
1043}