compio_driver/iocp/
op.rs

1#[cfg(feature = "once_cell_try")]
2use std::sync::OnceLock;
3use std::{
4    io,
5    marker::PhantomPinned,
6    net::Shutdown,
7    os::windows::io::AsRawSocket,
8    pin::Pin,
9    ptr::{null, null_mut, read_unaligned},
10    task::Poll,
11};
12
13use compio_buf::{
14    BufResult, IntoInner, IoBuf, IoBufMut, IoSlice, IoSliceMut, IoVectoredBuf, IoVectoredBufMut,
15};
16#[cfg(not(feature = "once_cell_try"))]
17use once_cell::sync::OnceCell as OnceLock;
18use socket2::{SockAddr, SockAddrStorage};
19use windows_sys::{
20    Win32::{
21        Foundation::{
22            CloseHandle, ERROR_BROKEN_PIPE, ERROR_HANDLE_EOF, ERROR_IO_INCOMPLETE,
23            ERROR_IO_PENDING, ERROR_NETNAME_DELETED, ERROR_NO_DATA, ERROR_NOT_FOUND,
24            ERROR_PIPE_CONNECTED, ERROR_PIPE_NOT_CONNECTED, GetLastError,
25        },
26        Networking::WinSock::{
27            CMSGHDR, LPFN_ACCEPTEX, LPFN_CONNECTEX, LPFN_GETACCEPTEXSOCKADDRS, LPFN_WSARECVMSG,
28            SD_BOTH, SD_RECEIVE, SD_SEND, SIO_GET_EXTENSION_FUNCTION_POINTER,
29            SO_UPDATE_ACCEPT_CONTEXT, SO_UPDATE_CONNECT_CONTEXT, SOCKADDR, SOCKADDR_STORAGE,
30            SOL_SOCKET, WSABUF, WSAID_ACCEPTEX, WSAID_CONNECTEX, WSAID_GETACCEPTEXSOCKADDRS,
31            WSAID_WSARECVMSG, WSAIoctl, WSAMSG, WSARecv, WSARecvFrom, WSASend, WSASendMsg,
32            WSASendTo, closesocket, setsockopt, shutdown, socklen_t,
33        },
34        Storage::FileSystem::{FlushFileBuffers, ReadFile, WriteFile},
35        System::{
36            IO::{CancelIoEx, DeviceIoControl, OVERLAPPED},
37            Pipes::ConnectNamedPipe,
38        },
39    },
40    core::GUID,
41};
42
43use crate::{AsFd, AsRawFd, OpCode, OpType, RawFd, op::*, syscall};
44
45#[inline]
46fn winapi_result(transferred: u32) -> Poll<io::Result<usize>> {
47    let error = unsafe { GetLastError() };
48    assert_ne!(error, 0);
49    match error {
50        ERROR_IO_PENDING => Poll::Pending,
51        ERROR_IO_INCOMPLETE
52        | ERROR_NETNAME_DELETED
53        | ERROR_HANDLE_EOF
54        | ERROR_BROKEN_PIPE
55        | ERROR_PIPE_CONNECTED
56        | ERROR_PIPE_NOT_CONNECTED
57        | ERROR_NO_DATA => Poll::Ready(Ok(transferred as _)),
58        _ => Poll::Ready(Err(io::Error::from_raw_os_error(error as _))),
59    }
60}
61
62#[inline]
63fn win32_result(res: i32, transferred: u32) -> Poll<io::Result<usize>> {
64    if res == 0 {
65        winapi_result(transferred)
66    } else {
67        Poll::Ready(Ok(transferred as _))
68    }
69}
70
71#[inline]
72fn winsock_result(res: i32, transferred: u32) -> Poll<io::Result<usize>> {
73    if res != 0 {
74        winapi_result(transferred)
75    } else {
76        Poll::Ready(Ok(transferred as _))
77    }
78}
79
80#[inline]
81fn cancel(handle: RawFd, optr: *mut OVERLAPPED) -> io::Result<()> {
82    match syscall!(BOOL, CancelIoEx(handle as _, optr)) {
83        Ok(_) => Ok(()),
84        Err(e) => {
85            if e.raw_os_error() == Some(ERROR_NOT_FOUND as _) {
86                Ok(())
87            } else {
88                Err(e)
89            }
90        }
91    }
92}
93
94fn get_wsa_fn<F>(handle: RawFd, fguid: GUID) -> io::Result<Option<F>> {
95    let mut fptr = None;
96    let mut returned = 0;
97    syscall!(
98        SOCKET,
99        WSAIoctl(
100            handle as _,
101            SIO_GET_EXTENSION_FUNCTION_POINTER,
102            std::ptr::addr_of!(fguid).cast(),
103            std::mem::size_of_val(&fguid) as _,
104            std::ptr::addr_of_mut!(fptr).cast(),
105            std::mem::size_of::<F>() as _,
106            &mut returned,
107            null_mut(),
108            None,
109        )
110    )?;
111    Ok(fptr)
112}
113
114impl<
115    D: std::marker::Send + 'static,
116    F: (FnOnce() -> BufResult<usize, D>) + std::marker::Send + 'static,
117> OpCode for Asyncify<F, D>
118{
119    fn op_type(&self) -> OpType {
120        OpType::Blocking
121    }
122
123    unsafe fn operate(self: Pin<&mut Self>, _optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
124        // Safety: self won't be moved
125        let this = self.get_unchecked_mut();
126        let f = this
127            .f
128            .take()
129            .expect("the operate method could only be called once");
130        let BufResult(res, data) = f();
131        this.data = Some(data);
132        Poll::Ready(res)
133    }
134}
135
136impl OpCode for CloseFile {
137    fn op_type(&self) -> OpType {
138        OpType::Blocking
139    }
140
141    unsafe fn operate(self: Pin<&mut Self>, _optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
142        Poll::Ready(Ok(
143            syscall!(BOOL, CloseHandle(self.fd.as_fd().as_raw_fd()))? as _,
144        ))
145    }
146}
147
148impl<T: IoBufMut, S: AsFd> OpCode for ReadAt<T, S> {
149    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
150        if let Some(overlapped) = optr.as_mut() {
151            overlapped.Anonymous.Anonymous.Offset = (self.offset & 0xFFFFFFFF) as _;
152            overlapped.Anonymous.Anonymous.OffsetHigh = (self.offset >> 32) as _;
153        }
154        let fd = self.fd.as_fd().as_raw_fd();
155        let slice = self.get_unchecked_mut().buffer.as_mut_slice();
156        let mut transferred = 0;
157        let res = ReadFile(
158            fd,
159            slice.as_mut_ptr() as _,
160            slice.len() as _,
161            &mut transferred,
162            optr,
163        );
164        win32_result(res, transferred)
165    }
166
167    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
168        cancel(self.fd.as_fd().as_raw_fd(), optr)
169    }
170}
171
172impl<T: IoBuf, S: AsFd> OpCode for WriteAt<T, S> {
173    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
174        if let Some(overlapped) = optr.as_mut() {
175            overlapped.Anonymous.Anonymous.Offset = (self.offset & 0xFFFFFFFF) as _;
176            overlapped.Anonymous.Anonymous.OffsetHigh = (self.offset >> 32) as _;
177        }
178        let slice = self.buffer.as_slice();
179        let mut transferred = 0;
180        let res = WriteFile(
181            self.fd.as_fd().as_raw_fd(),
182            slice.as_ptr() as _,
183            slice.len() as _,
184            &mut transferred,
185            optr,
186        );
187        win32_result(res, transferred)
188    }
189
190    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
191        cancel(self.fd.as_fd().as_raw_fd(), optr)
192    }
193}
194
195impl<S: AsFd> OpCode for ReadManagedAt<S> {
196    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
197        self.map_unchecked_mut(|this| &mut this.op).operate(optr)
198    }
199
200    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
201        self.map_unchecked_mut(|this| &mut this.op).cancel(optr)
202    }
203}
204
205impl<S: AsFd> OpCode for Sync<S> {
206    fn op_type(&self) -> OpType {
207        OpType::Blocking
208    }
209
210    unsafe fn operate(self: Pin<&mut Self>, _optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
211        Poll::Ready(Ok(
212            syscall!(BOOL, FlushFileBuffers(self.fd.as_fd().as_raw_fd()))? as _,
213        ))
214    }
215}
216
217impl<S: AsFd> OpCode for ShutdownSocket<S> {
218    fn op_type(&self) -> OpType {
219        OpType::Blocking
220    }
221
222    unsafe fn operate(self: Pin<&mut Self>, _optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
223        let how = match self.how {
224            Shutdown::Write => SD_SEND,
225            Shutdown::Read => SD_RECEIVE,
226            Shutdown::Both => SD_BOTH,
227        };
228        Poll::Ready(Ok(
229            syscall!(SOCKET, shutdown(self.fd.as_fd().as_raw_fd() as _, how))? as _,
230        ))
231    }
232}
233
234impl OpCode for CloseSocket {
235    fn op_type(&self) -> OpType {
236        OpType::Blocking
237    }
238
239    unsafe fn operate(self: Pin<&mut Self>, _optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
240        Poll::Ready(Ok(
241            syscall!(SOCKET, closesocket(self.fd.as_fd().as_raw_fd() as _))? as _,
242        ))
243    }
244}
245
246static ACCEPT_EX: OnceLock<LPFN_ACCEPTEX> = OnceLock::new();
247static GET_ADDRS: OnceLock<LPFN_GETACCEPTEXSOCKADDRS> = OnceLock::new();
248
249const ACCEPT_ADDR_BUFFER_SIZE: usize = std::mem::size_of::<SOCKADDR_STORAGE>() + 16;
250const ACCEPT_BUFFER_SIZE: usize = ACCEPT_ADDR_BUFFER_SIZE * 2;
251
252/// Accept a connection.
253pub struct Accept<S> {
254    pub(crate) fd: S,
255    pub(crate) accept_fd: socket2::Socket,
256    pub(crate) buffer: [u8; ACCEPT_BUFFER_SIZE],
257    _p: PhantomPinned,
258}
259
260impl<S> Accept<S> {
261    /// Create [`Accept`]. `accept_fd` should not be bound.
262    pub fn new(fd: S, accept_fd: socket2::Socket) -> Self {
263        Self {
264            fd,
265            accept_fd,
266            buffer: [0u8; ACCEPT_BUFFER_SIZE],
267            _p: PhantomPinned,
268        }
269    }
270}
271
272impl<S: AsFd> Accept<S> {
273    /// Update accept context.
274    pub fn update_context(&self) -> io::Result<()> {
275        let fd = self.fd.as_fd().as_raw_fd();
276        syscall!(
277            SOCKET,
278            setsockopt(
279                self.accept_fd.as_raw_socket() as _,
280                SOL_SOCKET,
281                SO_UPDATE_ACCEPT_CONTEXT,
282                &fd as *const _ as _,
283                std::mem::size_of_val(&fd) as _,
284            )
285        )?;
286        Ok(())
287    }
288
289    /// Get the remote address from the inner buffer.
290    pub fn into_addr(self) -> io::Result<(socket2::Socket, SockAddr)> {
291        let get_addrs_fn = GET_ADDRS
292            .get_or_try_init(|| {
293                get_wsa_fn(self.fd.as_fd().as_raw_fd(), WSAID_GETACCEPTEXSOCKADDRS)
294            })?
295            .ok_or_else(|| {
296                io::Error::new(
297                    io::ErrorKind::Unsupported,
298                    "cannot retrieve GetAcceptExSockAddrs",
299                )
300            })?;
301        let mut local_addr: *mut SOCKADDR = null_mut();
302        let mut local_addr_len = 0;
303        let mut remote_addr: *mut SOCKADDR = null_mut();
304        let mut remote_addr_len = 0;
305        unsafe {
306            get_addrs_fn(
307                &self.buffer as *const _ as *const _,
308                0,
309                ACCEPT_ADDR_BUFFER_SIZE as _,
310                ACCEPT_ADDR_BUFFER_SIZE as _,
311                &mut local_addr,
312                &mut local_addr_len,
313                &mut remote_addr,
314                &mut remote_addr_len,
315            );
316        }
317        Ok((self.accept_fd, unsafe {
318            SockAddr::new(
319                // Safety: the buffer is large enough to hold the address
320                std::mem::transmute::<SOCKADDR_STORAGE, SockAddrStorage>(read_unaligned(
321                    remote_addr.cast::<SOCKADDR_STORAGE>(),
322                )),
323                remote_addr_len,
324            )
325        }))
326    }
327}
328
329impl<S: AsFd> OpCode for Accept<S> {
330    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
331        let accept_fn = ACCEPT_EX
332            .get_or_try_init(|| get_wsa_fn(self.fd.as_fd().as_raw_fd(), WSAID_ACCEPTEX))?
333            .ok_or_else(|| {
334                io::Error::new(io::ErrorKind::Unsupported, "cannot retrieve AcceptEx")
335            })?;
336        let mut received = 0;
337        let res = accept_fn(
338            self.fd.as_fd().as_raw_fd() as _,
339            self.accept_fd.as_raw_socket() as _,
340            self.get_unchecked_mut().buffer.as_mut_ptr() as _,
341            0,
342            ACCEPT_ADDR_BUFFER_SIZE as _,
343            ACCEPT_ADDR_BUFFER_SIZE as _,
344            &mut received,
345            optr,
346        );
347        win32_result(res, received)
348    }
349
350    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
351        cancel(self.fd.as_fd().as_raw_fd(), optr)
352    }
353}
354
355static CONNECT_EX: OnceLock<LPFN_CONNECTEX> = OnceLock::new();
356
357impl<S: AsFd> Connect<S> {
358    /// Update connect context.
359    pub fn update_context(&self) -> io::Result<()> {
360        syscall!(
361            SOCKET,
362            setsockopt(
363                self.fd.as_fd().as_raw_fd() as _,
364                SOL_SOCKET,
365                SO_UPDATE_CONNECT_CONTEXT,
366                null(),
367                0,
368            )
369        )?;
370        Ok(())
371    }
372}
373
374impl<S: AsFd> OpCode for Connect<S> {
375    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
376        let connect_fn = CONNECT_EX
377            .get_or_try_init(|| get_wsa_fn(self.fd.as_fd().as_raw_fd(), WSAID_CONNECTEX))?
378            .ok_or_else(|| {
379                io::Error::new(io::ErrorKind::Unsupported, "cannot retrieve ConnectEx")
380            })?;
381        let mut sent = 0;
382        let res = connect_fn(
383            self.fd.as_fd().as_raw_fd() as _,
384            self.addr.as_ptr().cast(),
385            self.addr.len(),
386            null(),
387            0,
388            &mut sent,
389            optr,
390        );
391        win32_result(res, sent)
392    }
393
394    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
395        cancel(self.fd.as_fd().as_raw_fd(), optr)
396    }
397}
398
399/// Receive data from remote.
400pub struct Recv<T: IoBufMut, S> {
401    pub(crate) fd: S,
402    pub(crate) buffer: T,
403    _p: PhantomPinned,
404}
405
406impl<T: IoBufMut, S> Recv<T, S> {
407    /// Create [`Recv`].
408    pub fn new(fd: S, buffer: T) -> Self {
409        Self {
410            fd,
411            buffer,
412            _p: PhantomPinned,
413        }
414    }
415}
416
417impl<T: IoBufMut, S> IntoInner for Recv<T, S> {
418    type Inner = T;
419
420    fn into_inner(self) -> Self::Inner {
421        self.buffer
422    }
423}
424
425impl<S: AsFd> OpCode for RecvManaged<S> {
426    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
427        self.map_unchecked_mut(|this| &mut this.op).operate(optr)
428    }
429
430    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
431        self.map_unchecked_mut(|this| &mut this.op).cancel(optr)
432    }
433}
434
435impl<T: IoBufMut, S: AsFd> OpCode for Recv<T, S> {
436    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
437        let fd = self.fd.as_fd().as_raw_fd();
438        let slice = self.get_unchecked_mut().buffer.as_mut_slice();
439        let mut transferred = 0;
440        let res = ReadFile(
441            fd,
442            slice.as_mut_ptr() as _,
443            slice.len() as _,
444            &mut transferred,
445            optr,
446        );
447        win32_result(res, transferred)
448    }
449
450    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
451        cancel(self.fd.as_fd().as_raw_fd(), optr)
452    }
453}
454
455/// Receive data from remote into vectored buffer.
456pub struct RecvVectored<T: IoVectoredBufMut, S> {
457    pub(crate) fd: S,
458    pub(crate) buffer: T,
459    _p: PhantomPinned,
460}
461
462impl<T: IoVectoredBufMut, S> RecvVectored<T, S> {
463    /// Create [`RecvVectored`].
464    pub fn new(fd: S, buffer: T) -> Self {
465        Self {
466            fd,
467            buffer,
468            _p: PhantomPinned,
469        }
470    }
471}
472
473impl<T: IoVectoredBufMut, S> IntoInner for RecvVectored<T, S> {
474    type Inner = T;
475
476    fn into_inner(self) -> Self::Inner {
477        self.buffer
478    }
479}
480
481impl<T: IoVectoredBufMut, S: AsFd> OpCode for RecvVectored<T, S> {
482    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
483        let fd = self.fd.as_fd().as_raw_fd();
484        let slices = self.get_unchecked_mut().buffer.io_slices_mut();
485        let mut flags = 0;
486        let mut received = 0;
487        let res = WSARecv(
488            fd as _,
489            slices.as_ptr() as _,
490            slices.len() as _,
491            &mut received,
492            &mut flags,
493            optr,
494            None,
495        );
496        winsock_result(res, received)
497    }
498
499    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
500        cancel(self.fd.as_fd().as_raw_fd(), optr)
501    }
502}
503
504/// Send data to remote.
505pub struct Send<T: IoBuf, S> {
506    pub(crate) fd: S,
507    pub(crate) buffer: T,
508    _p: PhantomPinned,
509}
510
511impl<T: IoBuf, S> Send<T, S> {
512    /// Create [`Send`].
513    pub fn new(fd: S, buffer: T) -> Self {
514        Self {
515            fd,
516            buffer,
517            _p: PhantomPinned,
518        }
519    }
520}
521
522impl<T: IoBuf, S> IntoInner for Send<T, S> {
523    type Inner = T;
524
525    fn into_inner(self) -> Self::Inner {
526        self.buffer
527    }
528}
529
530impl<T: IoBuf, S: AsFd> OpCode for Send<T, S> {
531    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
532        let slice = self.buffer.as_slice();
533        let mut transferred = 0;
534        let res = WriteFile(
535            self.fd.as_fd().as_raw_fd(),
536            slice.as_ptr() as _,
537            slice.len() as _,
538            &mut transferred,
539            optr,
540        );
541        win32_result(res, transferred)
542    }
543
544    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
545        cancel(self.fd.as_fd().as_raw_fd(), optr)
546    }
547}
548
549/// Send data to remote from vectored buffer.
550pub struct SendVectored<T: IoVectoredBuf, S> {
551    pub(crate) fd: S,
552    pub(crate) buffer: T,
553    _p: PhantomPinned,
554}
555
556impl<T: IoVectoredBuf, S> SendVectored<T, S> {
557    /// Create [`SendVectored`].
558    pub fn new(fd: S, buffer: T) -> Self {
559        Self {
560            fd,
561            buffer,
562            _p: PhantomPinned,
563        }
564    }
565}
566
567impl<T: IoVectoredBuf, S> IntoInner for SendVectored<T, S> {
568    type Inner = T;
569
570    fn into_inner(self) -> Self::Inner {
571        self.buffer
572    }
573}
574
575impl<T: IoVectoredBuf, S: AsFd> OpCode for SendVectored<T, S> {
576    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
577        let slices = self.buffer.io_slices();
578        let mut sent = 0;
579        let res = WSASend(
580            self.fd.as_fd().as_raw_fd() as _,
581            slices.as_ptr() as _,
582            slices.len() as _,
583            &mut sent,
584            0,
585            optr,
586            None,
587        );
588        winsock_result(res, sent)
589    }
590
591    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
592        cancel(self.fd.as_fd().as_raw_fd(), optr)
593    }
594}
595
596/// Receive data and source address.
597pub struct RecvFrom<T: IoBufMut, S> {
598    pub(crate) fd: S,
599    pub(crate) buffer: T,
600    pub(crate) addr: SockAddrStorage,
601    pub(crate) addr_len: socklen_t,
602    _p: PhantomPinned,
603}
604
605impl<T: IoBufMut, S> RecvFrom<T, S> {
606    /// Create [`RecvFrom`].
607    pub fn new(fd: S, buffer: T) -> Self {
608        let addr = SockAddrStorage::zeroed();
609        let addr_len = addr.size_of();
610        Self {
611            fd,
612            buffer,
613            addr,
614            addr_len,
615            _p: PhantomPinned,
616        }
617    }
618}
619
620impl<T: IoBufMut, S> IntoInner for RecvFrom<T, S> {
621    type Inner = (T, SockAddrStorage, socklen_t);
622
623    fn into_inner(self) -> Self::Inner {
624        (self.buffer, self.addr, self.addr_len)
625    }
626}
627
628impl<T: IoBufMut, S: AsFd> OpCode for RecvFrom<T, S> {
629    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
630        let this = self.get_unchecked_mut();
631        let fd = this.fd.as_fd().as_raw_fd();
632        let buffer = this.buffer.as_io_slice_mut();
633        let mut flags = 0;
634        let mut received = 0;
635        let res = WSARecvFrom(
636            fd as _,
637            &buffer as *const _ as _,
638            1,
639            &mut received,
640            &mut flags,
641            &mut this.addr as *mut _ as *mut SOCKADDR,
642            &mut this.addr_len,
643            optr,
644            None,
645        );
646        winsock_result(res, received)
647    }
648
649    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
650        cancel(self.fd.as_fd().as_raw_fd(), optr)
651    }
652}
653
654/// Receive data and source address into vectored buffer.
655pub struct RecvFromVectored<T: IoVectoredBufMut, S> {
656    pub(crate) fd: S,
657    pub(crate) buffer: T,
658    pub(crate) addr: SockAddrStorage,
659    pub(crate) addr_len: socklen_t,
660    _p: PhantomPinned,
661}
662
663impl<T: IoVectoredBufMut, S> RecvFromVectored<T, S> {
664    /// Create [`RecvFromVectored`].
665    pub fn new(fd: S, buffer: T) -> Self {
666        let addr = SockAddrStorage::zeroed();
667        let addr_len = addr.size_of();
668        Self {
669            fd,
670            buffer,
671            addr,
672            addr_len,
673            _p: PhantomPinned,
674        }
675    }
676}
677
678impl<T: IoVectoredBufMut, S> IntoInner for RecvFromVectored<T, S> {
679    type Inner = (T, SockAddrStorage, socklen_t);
680
681    fn into_inner(self) -> Self::Inner {
682        (self.buffer, self.addr, self.addr_len)
683    }
684}
685
686impl<T: IoVectoredBufMut, S: AsFd> OpCode for RecvFromVectored<T, S> {
687    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
688        let this = self.get_unchecked_mut();
689        let fd = this.fd.as_fd().as_raw_fd();
690        let buffer = this.buffer.io_slices_mut();
691        let mut flags = 0;
692        let mut received = 0;
693        let res = WSARecvFrom(
694            fd as _,
695            buffer.as_ptr() as _,
696            buffer.len() as _,
697            &mut received,
698            &mut flags,
699            &mut this.addr as *mut _ as *mut SOCKADDR,
700            &mut this.addr_len,
701            optr,
702            None,
703        );
704        winsock_result(res, received)
705    }
706
707    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
708        cancel(self.fd.as_fd().as_raw_fd(), optr)
709    }
710}
711
712/// Send data to specified address.
713pub struct SendTo<T: IoBuf, S> {
714    pub(crate) fd: S,
715    pub(crate) buffer: T,
716    pub(crate) addr: SockAddr,
717    _p: PhantomPinned,
718}
719
720impl<T: IoBuf, S> SendTo<T, S> {
721    /// Create [`SendTo`].
722    pub fn new(fd: S, buffer: T, addr: SockAddr) -> Self {
723        Self {
724            fd,
725            buffer,
726            addr,
727            _p: PhantomPinned,
728        }
729    }
730}
731
732impl<T: IoBuf, S> IntoInner for SendTo<T, S> {
733    type Inner = T;
734
735    fn into_inner(self) -> Self::Inner {
736        self.buffer
737    }
738}
739
740impl<T: IoBuf, S: AsFd> OpCode for SendTo<T, S> {
741    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
742        let buffer = self.buffer.as_io_slice();
743        let mut sent = 0;
744        let res = WSASendTo(
745            self.fd.as_fd().as_raw_fd() as _,
746            &buffer as *const _ as _,
747            1,
748            &mut sent,
749            0,
750            self.addr.as_ptr().cast(),
751            self.addr.len(),
752            optr,
753            None,
754        );
755        winsock_result(res, sent)
756    }
757
758    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
759        cancel(self.fd.as_fd().as_raw_fd(), optr)
760    }
761}
762
763/// Send data to specified address from vectored buffer.
764pub struct SendToVectored<T: IoVectoredBuf, S> {
765    pub(crate) fd: S,
766    pub(crate) buffer: T,
767    pub(crate) addr: SockAddr,
768    _p: PhantomPinned,
769}
770
771impl<T: IoVectoredBuf, S> SendToVectored<T, S> {
772    /// Create [`SendToVectored`].
773    pub fn new(fd: S, buffer: T, addr: SockAddr) -> Self {
774        Self {
775            fd,
776            buffer,
777            addr,
778            _p: PhantomPinned,
779        }
780    }
781}
782
783impl<T: IoVectoredBuf, S> IntoInner for SendToVectored<T, S> {
784    type Inner = T;
785
786    fn into_inner(self) -> Self::Inner {
787        self.buffer
788    }
789}
790
791impl<T: IoVectoredBuf, S: AsFd> OpCode for SendToVectored<T, S> {
792    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
793        let buffer = self.buffer.io_slices();
794        let mut sent = 0;
795        let res = WSASendTo(
796            self.fd.as_fd().as_raw_fd() as _,
797            buffer.as_ptr() as _,
798            buffer.len() as _,
799            &mut sent,
800            0,
801            self.addr.as_ptr().cast(),
802            self.addr.len(),
803            optr,
804            None,
805        );
806        winsock_result(res, sent)
807    }
808
809    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
810        cancel(self.fd.as_fd().as_raw_fd(), optr)
811    }
812}
813
814static WSA_RECVMSG: OnceLock<LPFN_WSARECVMSG> = OnceLock::new();
815
816/// Receive data and source address with ancillary data into vectored buffer.
817pub struct RecvMsg<T: IoVectoredBufMut, C: IoBufMut, S> {
818    msg: WSAMSG,
819    addr: SockAddrStorage,
820    fd: S,
821    buffer: T,
822    control: C,
823    _p: PhantomPinned,
824}
825
826impl<T: IoVectoredBufMut, C: IoBufMut, S> RecvMsg<T, C, S> {
827    /// Create [`RecvMsg`].
828    ///
829    /// # Panics
830    ///
831    /// This function will panic if the control message buffer is misaligned.
832    pub fn new(fd: S, buffer: T, control: C) -> Self {
833        assert!(
834            control.as_buf_ptr().cast::<CMSGHDR>().is_aligned(),
835            "misaligned control message buffer"
836        );
837        let addr = SockAddrStorage::zeroed();
838        Self {
839            msg: unsafe { std::mem::zeroed() },
840            addr,
841            fd,
842            buffer,
843            control,
844            _p: PhantomPinned,
845        }
846    }
847}
848
849impl<T: IoVectoredBufMut, C: IoBufMut, S> IntoInner for RecvMsg<T, C, S> {
850    type Inner = ((T, C), SockAddrStorage, socklen_t, usize);
851
852    fn into_inner(self) -> Self::Inner {
853        (
854            (self.buffer, self.control),
855            self.addr,
856            self.msg.namelen,
857            self.msg.Control.len as _,
858        )
859    }
860}
861
862impl<T: IoVectoredBufMut, C: IoBufMut, S: AsFd> OpCode for RecvMsg<T, C, S> {
863    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
864        let recvmsg_fn = WSA_RECVMSG
865            .get_or_try_init(|| get_wsa_fn(self.fd.as_fd().as_raw_fd(), WSAID_WSARECVMSG))?
866            .ok_or_else(|| {
867                io::Error::new(io::ErrorKind::Unsupported, "cannot retrieve WSARecvMsg")
868            })?;
869
870        let this = self.get_unchecked_mut();
871
872        let mut slices = this.buffer.io_slices_mut();
873        this.msg.name = &mut this.addr as *mut _ as _;
874        this.msg.namelen = std::mem::size_of::<SOCKADDR_STORAGE>() as _;
875        this.msg.lpBuffers = slices.as_mut_ptr() as _;
876        this.msg.dwBufferCount = slices.len() as _;
877        this.msg.Control =
878            std::mem::transmute::<IoSliceMut, WSABUF>(this.control.as_io_slice_mut());
879
880        let mut received = 0;
881        let res = recvmsg_fn(
882            this.fd.as_fd().as_raw_fd() as _,
883            &mut this.msg,
884            &mut received,
885            optr,
886            None,
887        );
888        winsock_result(res, received)
889    }
890
891    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
892        cancel(self.fd.as_fd().as_raw_fd(), optr)
893    }
894}
895
896/// Send data to specified address accompanied by ancillary data from vectored
897/// buffer.
898pub struct SendMsg<T: IoVectoredBuf, C: IoBuf, S> {
899    fd: S,
900    buffer: T,
901    control: C,
902    addr: SockAddr,
903    _p: PhantomPinned,
904}
905
906impl<T: IoVectoredBuf, C: IoBuf, S> SendMsg<T, C, S> {
907    /// Create [`SendMsg`].
908    ///
909    /// # Panics
910    ///
911    /// This function will panic if the control message buffer is misaligned.
912    pub fn new(fd: S, buffer: T, control: C, addr: SockAddr) -> Self {
913        assert!(
914            control.as_buf_ptr().cast::<CMSGHDR>().is_aligned(),
915            "misaligned control message buffer"
916        );
917        Self {
918            fd,
919            buffer,
920            control,
921            addr,
922            _p: PhantomPinned,
923        }
924    }
925}
926
927impl<T: IoVectoredBuf, C: IoBuf, S> IntoInner for SendMsg<T, C, S> {
928    type Inner = (T, C);
929
930    fn into_inner(self) -> Self::Inner {
931        (self.buffer, self.control)
932    }
933}
934
935impl<T: IoVectoredBuf, C: IoBuf, S: AsFd> OpCode for SendMsg<T, C, S> {
936    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
937        let this = self.get_unchecked_mut();
938
939        let slices = this.buffer.io_slices();
940        let msg = WSAMSG {
941            name: this.addr.as_ptr() as _,
942            namelen: this.addr.len(),
943            lpBuffers: slices.as_ptr() as _,
944            dwBufferCount: slices.len() as _,
945            Control: std::mem::transmute::<IoSlice, WSABUF>(this.control.as_io_slice()),
946            dwFlags: 0,
947        };
948
949        let mut sent = 0;
950        let res = WSASendMsg(
951            this.fd.as_fd().as_raw_fd() as _,
952            &msg,
953            0,
954            &mut sent,
955            optr,
956            None,
957        );
958        winsock_result(res, sent)
959    }
960
961    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
962        cancel(self.fd.as_fd().as_raw_fd(), optr)
963    }
964}
965
966/// Connect a named pipe server.
967pub struct ConnectNamedPipe<S> {
968    pub(crate) fd: S,
969}
970
971impl<S> ConnectNamedPipe<S> {
972    /// Create [`ConnectNamedPipe`](struct@ConnectNamedPipe).
973    pub fn new(fd: S) -> Self {
974        Self { fd }
975    }
976}
977
978impl<S: AsFd> OpCode for ConnectNamedPipe<S> {
979    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
980        let res = ConnectNamedPipe(self.fd.as_fd().as_raw_fd() as _, optr);
981        win32_result(res, 0)
982    }
983
984    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
985        cancel(self.fd.as_fd().as_raw_fd(), optr)
986    }
987}
988
989/// Send a control code to a device.
990pub struct DeviceIoControl<S, I: IoBuf, O: IoBufMut> {
991    pub(crate) fd: S,
992    pub(crate) ioctl_code: u32,
993    pub(crate) input_buffer: Option<I>,
994    pub(crate) output_buffer: Option<O>,
995    _p: PhantomPinned,
996}
997
998impl<S, I: IoBuf, O: IoBufMut> DeviceIoControl<S, I, O> {
999    /// Create [`DeviceIoControl`].
1000    pub fn new(fd: S, ioctl_code: u32, input_buffer: Option<I>, output_buffer: Option<O>) -> Self {
1001        Self {
1002            fd,
1003            ioctl_code,
1004            input_buffer,
1005            output_buffer,
1006            _p: PhantomPinned,
1007        }
1008    }
1009}
1010
1011impl<S, I: IoBuf, O: IoBufMut> IntoInner for DeviceIoControl<S, I, O> {
1012    type Inner = (Option<I>, Option<O>);
1013
1014    fn into_inner(self) -> Self::Inner {
1015        (self.input_buffer, self.output_buffer)
1016    }
1017}
1018
1019impl<S: AsFd, I: IoBuf, O: IoBufMut> OpCode for DeviceIoControl<S, I, O> {
1020    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
1021        let this = self.get_unchecked_mut();
1022        let fd = this.fd.as_fd().as_raw_fd();
1023        let input_len = this.input_buffer.as_ref().map_or(0, |x| x.buf_len());
1024        let input_ptr = this
1025            .input_buffer
1026            .as_ref()
1027            .map_or(std::ptr::null(), |x| x.as_buf_ptr());
1028        let output_len = this.output_buffer.as_ref().map_or(0, |x| x.buf_len());
1029        let output_ptr = this
1030            .output_buffer
1031            .as_mut()
1032            .map_or(std::ptr::null_mut(), |x| x.as_buf_mut_ptr());
1033        let mut transferred = 0;
1034        let res = DeviceIoControl(
1035            fd,
1036            this.ioctl_code,
1037            input_ptr as _,
1038            input_len as _,
1039            output_ptr as _,
1040            output_len as _,
1041            &mut transferred,
1042            optr,
1043        );
1044        win32_result(res, transferred)
1045    }
1046
1047    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
1048        cancel(self.fd.as_fd().as_raw_fd(), optr)
1049    }
1050}