compio_driver/iocp/
op.rs

1#[cfg(feature = "once_cell_try")]
2use std::sync::OnceLock;
3use std::{
4    io,
5    marker::PhantomPinned,
6    net::Shutdown,
7    os::windows::io::AsRawSocket,
8    pin::Pin,
9    ptr::{null, null_mut, read_unaligned},
10    task::Poll,
11};
12
13use compio_buf::{
14    BufResult, IntoInner, IoBuf, IoBufMut, IoSlice, IoSliceMut, IoVectoredBuf, IoVectoredBufMut,
15};
16#[cfg(not(feature = "once_cell_try"))]
17use once_cell::sync::OnceCell as OnceLock;
18use socket2::{SockAddr, SockAddrStorage};
19use windows_sys::{
20    Win32::{
21        Foundation::{
22            CloseHandle, ERROR_BROKEN_PIPE, ERROR_HANDLE_EOF, ERROR_IO_INCOMPLETE,
23            ERROR_IO_PENDING, ERROR_NETNAME_DELETED, ERROR_NO_DATA, ERROR_NOT_FOUND,
24            ERROR_PIPE_CONNECTED, ERROR_PIPE_NOT_CONNECTED, GetLastError,
25        },
26        Networking::WinSock::{
27            CMSGHDR, LPFN_ACCEPTEX, LPFN_CONNECTEX, LPFN_GETACCEPTEXSOCKADDRS, LPFN_WSARECVMSG,
28            SD_BOTH, SD_RECEIVE, SD_SEND, SIO_GET_EXTENSION_FUNCTION_POINTER,
29            SO_UPDATE_ACCEPT_CONTEXT, SO_UPDATE_CONNECT_CONTEXT, SOCKADDR, SOCKADDR_STORAGE,
30            SOL_SOCKET, WSABUF, WSAID_ACCEPTEX, WSAID_CONNECTEX, WSAID_GETACCEPTEXSOCKADDRS,
31            WSAID_WSARECVMSG, WSAIoctl, WSAMSG, WSARecv, WSARecvFrom, WSASend, WSASendMsg,
32            WSASendTo, closesocket, setsockopt, shutdown, socklen_t,
33        },
34        Storage::FileSystem::{FlushFileBuffers, ReadFile, WriteFile},
35        System::{
36            IO::{CancelIoEx, DeviceIoControl, OVERLAPPED},
37            Pipes::ConnectNamedPipe,
38        },
39    },
40    core::GUID,
41};
42
43use crate::{AsFd, AsRawFd, OpCode, OpType, RawFd, op::*, syscall};
44
45#[inline]
46fn winapi_result(transferred: u32) -> Poll<io::Result<usize>> {
47    let error = unsafe { GetLastError() };
48    assert_ne!(error, 0);
49    match error {
50        ERROR_IO_PENDING => Poll::Pending,
51        ERROR_IO_INCOMPLETE
52        | ERROR_NETNAME_DELETED
53        | ERROR_HANDLE_EOF
54        | ERROR_BROKEN_PIPE
55        | ERROR_PIPE_CONNECTED
56        | ERROR_PIPE_NOT_CONNECTED
57        | ERROR_NO_DATA => Poll::Ready(Ok(transferred as _)),
58        _ => Poll::Ready(Err(io::Error::from_raw_os_error(error as _))),
59    }
60}
61
62#[inline]
63fn win32_result(res: i32, transferred: u32) -> Poll<io::Result<usize>> {
64    if res == 0 {
65        winapi_result(transferred)
66    } else {
67        Poll::Ready(Ok(transferred as _))
68    }
69}
70
71#[inline]
72fn winsock_result(res: i32, transferred: u32) -> Poll<io::Result<usize>> {
73    if res != 0 {
74        winapi_result(transferred)
75    } else {
76        Poll::Ready(Ok(transferred as _))
77    }
78}
79
80#[inline]
81fn cancel(handle: RawFd, optr: *mut OVERLAPPED) -> io::Result<()> {
82    match syscall!(BOOL, CancelIoEx(handle as _, optr)) {
83        Ok(_) => Ok(()),
84        Err(e) => {
85            if e.raw_os_error() == Some(ERROR_NOT_FOUND as _) {
86                Ok(())
87            } else {
88                Err(e)
89            }
90        }
91    }
92}
93
94fn get_wsa_fn<F>(handle: RawFd, fguid: GUID) -> io::Result<Option<F>> {
95    let mut fptr = None;
96    let mut returned = 0;
97    syscall!(
98        SOCKET,
99        WSAIoctl(
100            handle as _,
101            SIO_GET_EXTENSION_FUNCTION_POINTER,
102            std::ptr::addr_of!(fguid).cast(),
103            std::mem::size_of_val(&fguid) as _,
104            std::ptr::addr_of_mut!(fptr).cast(),
105            std::mem::size_of::<F>() as _,
106            &mut returned,
107            null_mut(),
108            None,
109        )
110    )?;
111    Ok(fptr)
112}
113
114impl<
115    D: std::marker::Send + 'static,
116    F: (FnOnce() -> BufResult<usize, D>) + std::marker::Send + 'static,
117> OpCode for Asyncify<F, D>
118{
119    fn op_type(&self) -> OpType {
120        OpType::Blocking
121    }
122
123    unsafe fn operate(self: Pin<&mut Self>, _optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
124        // SAFETY: self won't be moved
125        let this = unsafe { self.get_unchecked_mut() };
126        let f = this
127            .f
128            .take()
129            .expect("the operate method could only be called once");
130        let BufResult(res, data) = f();
131        this.data = Some(data);
132        Poll::Ready(res)
133    }
134}
135
136impl<
137    S,
138    D: std::marker::Send + 'static,
139    F: (FnOnce(&S) -> BufResult<usize, D>) + std::marker::Send + 'static,
140> OpCode for AsyncifyFd<S, F, D>
141{
142    fn op_type(&self) -> OpType {
143        OpType::Blocking
144    }
145
146    unsafe fn operate(self: Pin<&mut Self>, _optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
147        // SAFETY: self won't be moved
148        let this = unsafe { self.get_unchecked_mut() };
149        let f = this
150            .f
151            .take()
152            .expect("the operate method could only be called once");
153        let BufResult(res, data) = f(&this.fd);
154        this.data = Some(data);
155        Poll::Ready(res)
156    }
157}
158
159impl OpCode for CloseFile {
160    fn op_type(&self) -> OpType {
161        OpType::Blocking
162    }
163
164    unsafe fn operate(self: Pin<&mut Self>, _optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
165        Poll::Ready(Ok(
166            syscall!(BOOL, CloseHandle(self.fd.as_fd().as_raw_fd()))? as _,
167        ))
168    }
169}
170
171impl<T: IoBufMut, S: AsFd> OpCode for ReadAt<T, S> {
172    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
173        if let Some(overlapped) = unsafe { optr.as_mut() } {
174            overlapped.Anonymous.Anonymous.Offset = (self.offset & 0xFFFFFFFF) as _;
175            overlapped.Anonymous.Anonymous.OffsetHigh = (self.offset >> 32) as _;
176        }
177        let fd = self.fd.as_fd().as_raw_fd();
178        let slice = unsafe { self.get_unchecked_mut() }.buffer.as_mut_slice();
179        let mut transferred = 0;
180        let res = unsafe {
181            ReadFile(
182                fd,
183                slice.as_mut_ptr() as _,
184                slice.len() as _,
185                &mut transferred,
186                optr,
187            )
188        };
189        win32_result(res, transferred)
190    }
191
192    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
193        cancel(self.fd.as_fd().as_raw_fd(), optr)
194    }
195}
196
197impl<T: IoBuf, S: AsFd> OpCode for WriteAt<T, S> {
198    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
199        if let Some(overlapped) = unsafe { optr.as_mut() } {
200            overlapped.Anonymous.Anonymous.Offset = (self.offset & 0xFFFFFFFF) as _;
201            overlapped.Anonymous.Anonymous.OffsetHigh = (self.offset >> 32) as _;
202        }
203        let slice = self.buffer.as_slice();
204        let mut transferred = 0;
205        let res = unsafe {
206            WriteFile(
207                self.fd.as_fd().as_raw_fd(),
208                slice.as_ptr() as _,
209                slice.len() as _,
210                &mut transferred,
211                optr,
212            )
213        };
214        win32_result(res, transferred)
215    }
216
217    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
218        cancel(self.fd.as_fd().as_raw_fd(), optr)
219    }
220}
221
222impl<S: AsFd> OpCode for ReadManagedAt<S> {
223    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
224        unsafe { self.map_unchecked_mut(|this| &mut this.op).operate(optr) }
225    }
226
227    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
228        unsafe { self.map_unchecked_mut(|this| &mut this.op).cancel(optr) }
229    }
230}
231
232impl<S: AsFd> OpCode for Sync<S> {
233    fn op_type(&self) -> OpType {
234        OpType::Blocking
235    }
236
237    unsafe fn operate(self: Pin<&mut Self>, _optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
238        Poll::Ready(Ok(
239            syscall!(BOOL, FlushFileBuffers(self.fd.as_fd().as_raw_fd()))? as _,
240        ))
241    }
242}
243
244impl<S: AsFd> OpCode for ShutdownSocket<S> {
245    fn op_type(&self) -> OpType {
246        OpType::Blocking
247    }
248
249    unsafe fn operate(self: Pin<&mut Self>, _optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
250        let how = match self.how {
251            Shutdown::Write => SD_SEND,
252            Shutdown::Read => SD_RECEIVE,
253            Shutdown::Both => SD_BOTH,
254        };
255        Poll::Ready(Ok(
256            syscall!(SOCKET, shutdown(self.fd.as_fd().as_raw_fd() as _, how))? as _,
257        ))
258    }
259}
260
261impl OpCode for CloseSocket {
262    fn op_type(&self) -> OpType {
263        OpType::Blocking
264    }
265
266    unsafe fn operate(self: Pin<&mut Self>, _optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
267        Poll::Ready(Ok(
268            syscall!(SOCKET, closesocket(self.fd.as_fd().as_raw_fd() as _))? as _,
269        ))
270    }
271}
272
273static ACCEPT_EX: OnceLock<LPFN_ACCEPTEX> = OnceLock::new();
274static GET_ADDRS: OnceLock<LPFN_GETACCEPTEXSOCKADDRS> = OnceLock::new();
275
276const ACCEPT_ADDR_BUFFER_SIZE: usize = std::mem::size_of::<SOCKADDR_STORAGE>() + 16;
277const ACCEPT_BUFFER_SIZE: usize = ACCEPT_ADDR_BUFFER_SIZE * 2;
278
279/// Accept a connection.
280pub struct Accept<S> {
281    pub(crate) fd: S,
282    pub(crate) accept_fd: socket2::Socket,
283    pub(crate) buffer: [u8; ACCEPT_BUFFER_SIZE],
284    _p: PhantomPinned,
285}
286
287impl<S> Accept<S> {
288    /// Create [`Accept`]. `accept_fd` should not be bound.
289    pub fn new(fd: S, accept_fd: socket2::Socket) -> Self {
290        Self {
291            fd,
292            accept_fd,
293            buffer: [0u8; ACCEPT_BUFFER_SIZE],
294            _p: PhantomPinned,
295        }
296    }
297}
298
299impl<S: AsFd> Accept<S> {
300    /// Update accept context.
301    pub fn update_context(&self) -> io::Result<()> {
302        let fd = self.fd.as_fd().as_raw_fd();
303        syscall!(
304            SOCKET,
305            setsockopt(
306                self.accept_fd.as_raw_socket() as _,
307                SOL_SOCKET,
308                SO_UPDATE_ACCEPT_CONTEXT,
309                &fd as *const _ as _,
310                std::mem::size_of_val(&fd) as _,
311            )
312        )?;
313        Ok(())
314    }
315
316    /// Get the remote address from the inner buffer.
317    pub fn into_addr(self) -> io::Result<(socket2::Socket, SockAddr)> {
318        let get_addrs_fn = GET_ADDRS
319            .get_or_try_init(|| {
320                get_wsa_fn(self.fd.as_fd().as_raw_fd(), WSAID_GETACCEPTEXSOCKADDRS)
321            })?
322            .ok_or_else(|| {
323                io::Error::new(
324                    io::ErrorKind::Unsupported,
325                    "cannot retrieve GetAcceptExSockAddrs",
326                )
327            })?;
328        let mut local_addr: *mut SOCKADDR = null_mut();
329        let mut local_addr_len = 0;
330        let mut remote_addr: *mut SOCKADDR = null_mut();
331        let mut remote_addr_len = 0;
332        unsafe {
333            get_addrs_fn(
334                &self.buffer as *const _ as *const _,
335                0,
336                ACCEPT_ADDR_BUFFER_SIZE as _,
337                ACCEPT_ADDR_BUFFER_SIZE as _,
338                &mut local_addr,
339                &mut local_addr_len,
340                &mut remote_addr,
341                &mut remote_addr_len,
342            );
343        }
344        Ok((self.accept_fd, unsafe {
345            SockAddr::new(
346                // SAFETY: the buffer is large enough to hold the address
347                std::mem::transmute::<SOCKADDR_STORAGE, SockAddrStorage>(read_unaligned(
348                    remote_addr.cast::<SOCKADDR_STORAGE>(),
349                )),
350                remote_addr_len,
351            )
352        }))
353    }
354}
355
356impl<S: AsFd> OpCode for Accept<S> {
357    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
358        let accept_fn = ACCEPT_EX
359            .get_or_try_init(|| get_wsa_fn(self.fd.as_fd().as_raw_fd(), WSAID_ACCEPTEX))?
360            .ok_or_else(|| {
361                io::Error::new(io::ErrorKind::Unsupported, "cannot retrieve AcceptEx")
362            })?;
363        let mut received = 0;
364        let res = unsafe {
365            accept_fn(
366                self.fd.as_fd().as_raw_fd() as _,
367                self.accept_fd.as_raw_socket() as _,
368                self.get_unchecked_mut().buffer.as_mut_ptr() as _,
369                0,
370                ACCEPT_ADDR_BUFFER_SIZE as _,
371                ACCEPT_ADDR_BUFFER_SIZE as _,
372                &mut received,
373                optr,
374            )
375        };
376        win32_result(res, received)
377    }
378
379    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
380        cancel(self.fd.as_fd().as_raw_fd(), optr)
381    }
382}
383
384static CONNECT_EX: OnceLock<LPFN_CONNECTEX> = OnceLock::new();
385
386impl<S: AsFd> Connect<S> {
387    /// Update connect context.
388    pub fn update_context(&self) -> io::Result<()> {
389        syscall!(
390            SOCKET,
391            setsockopt(
392                self.fd.as_fd().as_raw_fd() as _,
393                SOL_SOCKET,
394                SO_UPDATE_CONNECT_CONTEXT,
395                null(),
396                0,
397            )
398        )?;
399        Ok(())
400    }
401}
402
403impl<S: AsFd> OpCode for Connect<S> {
404    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
405        let connect_fn = CONNECT_EX
406            .get_or_try_init(|| get_wsa_fn(self.fd.as_fd().as_raw_fd(), WSAID_CONNECTEX))?
407            .ok_or_else(|| {
408                io::Error::new(io::ErrorKind::Unsupported, "cannot retrieve ConnectEx")
409            })?;
410        let mut sent = 0;
411        let res = unsafe {
412            connect_fn(
413                self.fd.as_fd().as_raw_fd() as _,
414                self.addr.as_ptr().cast(),
415                self.addr.len(),
416                null(),
417                0,
418                &mut sent,
419                optr,
420            )
421        };
422        win32_result(res, sent)
423    }
424
425    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
426        cancel(self.fd.as_fd().as_raw_fd(), optr)
427    }
428}
429
430/// Receive data from remote.
431pub struct Recv<T: IoBufMut, S> {
432    pub(crate) fd: S,
433    pub(crate) buffer: T,
434    _p: PhantomPinned,
435}
436
437impl<T: IoBufMut, S> Recv<T, S> {
438    /// Create [`Recv`].
439    pub fn new(fd: S, buffer: T) -> Self {
440        Self {
441            fd,
442            buffer,
443            _p: PhantomPinned,
444        }
445    }
446}
447
448impl<T: IoBufMut, S> IntoInner for Recv<T, S> {
449    type Inner = T;
450
451    fn into_inner(self) -> Self::Inner {
452        self.buffer
453    }
454}
455
456impl<S: AsFd> OpCode for RecvManaged<S> {
457    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
458        unsafe { self.map_unchecked_mut(|this| &mut this.op).operate(optr) }
459    }
460
461    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
462        unsafe { self.map_unchecked_mut(|this| &mut this.op).cancel(optr) }
463    }
464}
465
466impl<T: IoBufMut, S: AsFd> OpCode for Recv<T, S> {
467    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
468        let fd = self.fd.as_fd().as_raw_fd();
469        let slice = unsafe { self.get_unchecked_mut() }.buffer.as_mut_slice();
470        let mut transferred = 0;
471        let res = unsafe {
472            ReadFile(
473                fd,
474                slice.as_mut_ptr() as _,
475                slice.len() as _,
476                &mut transferred,
477                optr,
478            )
479        };
480        win32_result(res, transferred)
481    }
482
483    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
484        cancel(self.fd.as_fd().as_raw_fd(), optr)
485    }
486}
487
488/// Receive data from remote into vectored buffer.
489pub struct RecvVectored<T: IoVectoredBufMut, S> {
490    pub(crate) fd: S,
491    pub(crate) buffer: T,
492    _p: PhantomPinned,
493}
494
495impl<T: IoVectoredBufMut, S> RecvVectored<T, S> {
496    /// Create [`RecvVectored`].
497    pub fn new(fd: S, buffer: T) -> Self {
498        Self {
499            fd,
500            buffer,
501            _p: PhantomPinned,
502        }
503    }
504}
505
506impl<T: IoVectoredBufMut, S> IntoInner for RecvVectored<T, S> {
507    type Inner = T;
508
509    fn into_inner(self) -> Self::Inner {
510        self.buffer
511    }
512}
513
514impl<T: IoVectoredBufMut, S: AsFd> OpCode for RecvVectored<T, S> {
515    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
516        let fd = self.fd.as_fd().as_raw_fd();
517        let slices = unsafe { self.get_unchecked_mut().buffer.io_slices_mut() };
518        let mut flags = 0;
519        let mut received = 0;
520        let res = unsafe {
521            WSARecv(
522                fd as _,
523                slices.as_ptr() as _,
524                slices.len() as _,
525                &mut received,
526                &mut flags,
527                optr,
528                None,
529            )
530        };
531        winsock_result(res, received)
532    }
533
534    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
535        cancel(self.fd.as_fd().as_raw_fd(), optr)
536    }
537}
538
539/// Send data to remote.
540pub struct Send<T: IoBuf, S> {
541    pub(crate) fd: S,
542    pub(crate) buffer: T,
543    _p: PhantomPinned,
544}
545
546impl<T: IoBuf, S> Send<T, S> {
547    /// Create [`Send`].
548    pub fn new(fd: S, buffer: T) -> Self {
549        Self {
550            fd,
551            buffer,
552            _p: PhantomPinned,
553        }
554    }
555}
556
557impl<T: IoBuf, S> IntoInner for Send<T, S> {
558    type Inner = T;
559
560    fn into_inner(self) -> Self::Inner {
561        self.buffer
562    }
563}
564
565impl<T: IoBuf, S: AsFd> OpCode for Send<T, S> {
566    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
567        let slice = self.buffer.as_slice();
568        let mut transferred = 0;
569        let res = unsafe {
570            WriteFile(
571                self.fd.as_fd().as_raw_fd(),
572                slice.as_ptr() as _,
573                slice.len() as _,
574                &mut transferred,
575                optr,
576            )
577        };
578        win32_result(res, transferred)
579    }
580
581    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
582        cancel(self.fd.as_fd().as_raw_fd(), optr)
583    }
584}
585
586/// Send data to remote from vectored buffer.
587pub struct SendVectored<T: IoVectoredBuf, S> {
588    pub(crate) fd: S,
589    pub(crate) buffer: T,
590    _p: PhantomPinned,
591}
592
593impl<T: IoVectoredBuf, S> SendVectored<T, S> {
594    /// Create [`SendVectored`].
595    pub fn new(fd: S, buffer: T) -> Self {
596        Self {
597            fd,
598            buffer,
599            _p: PhantomPinned,
600        }
601    }
602}
603
604impl<T: IoVectoredBuf, S> IntoInner for SendVectored<T, S> {
605    type Inner = T;
606
607    fn into_inner(self) -> Self::Inner {
608        self.buffer
609    }
610}
611
612impl<T: IoVectoredBuf, S: AsFd> OpCode for SendVectored<T, S> {
613    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
614        let slices = unsafe { self.buffer.io_slices() };
615        let mut sent = 0;
616        let res = unsafe {
617            WSASend(
618                self.fd.as_fd().as_raw_fd() as _,
619                slices.as_ptr() as _,
620                slices.len() as _,
621                &mut sent,
622                0,
623                optr,
624                None,
625            )
626        };
627        winsock_result(res, sent)
628    }
629
630    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
631        cancel(self.fd.as_fd().as_raw_fd(), optr)
632    }
633}
634
635/// Receive data and source address.
636pub struct RecvFrom<T: IoBufMut, S> {
637    pub(crate) fd: S,
638    pub(crate) buffer: T,
639    pub(crate) addr: SockAddrStorage,
640    pub(crate) addr_len: socklen_t,
641    _p: PhantomPinned,
642}
643
644impl<T: IoBufMut, S> RecvFrom<T, S> {
645    /// Create [`RecvFrom`].
646    pub fn new(fd: S, buffer: T) -> Self {
647        let addr = SockAddrStorage::zeroed();
648        let addr_len = addr.size_of();
649        Self {
650            fd,
651            buffer,
652            addr,
653            addr_len,
654            _p: PhantomPinned,
655        }
656    }
657}
658
659impl<T: IoBufMut, S> IntoInner for RecvFrom<T, S> {
660    type Inner = (T, SockAddrStorage, socklen_t);
661
662    fn into_inner(self) -> Self::Inner {
663        (self.buffer, self.addr, self.addr_len)
664    }
665}
666
667impl<T: IoBufMut, S: AsFd> OpCode for RecvFrom<T, S> {
668    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
669        let this = unsafe { self.get_unchecked_mut() };
670        let fd = this.fd.as_fd().as_raw_fd();
671        let buffer = unsafe { this.buffer.as_io_slice_mut() };
672        let mut flags = 0;
673        let mut received = 0;
674        let res = unsafe {
675            WSARecvFrom(
676                fd as _,
677                &buffer as *const _ as _,
678                1,
679                &mut received,
680                &mut flags,
681                &mut this.addr as *mut _ as *mut SOCKADDR,
682                &mut this.addr_len,
683                optr,
684                None,
685            )
686        };
687        winsock_result(res, received)
688    }
689
690    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
691        cancel(self.fd.as_fd().as_raw_fd(), optr)
692    }
693}
694
695/// Receive data and source address into vectored buffer.
696pub struct RecvFromVectored<T: IoVectoredBufMut, S> {
697    pub(crate) fd: S,
698    pub(crate) buffer: T,
699    pub(crate) addr: SockAddrStorage,
700    pub(crate) addr_len: socklen_t,
701    _p: PhantomPinned,
702}
703
704impl<T: IoVectoredBufMut, S> RecvFromVectored<T, S> {
705    /// Create [`RecvFromVectored`].
706    pub fn new(fd: S, buffer: T) -> Self {
707        let addr = SockAddrStorage::zeroed();
708        let addr_len = addr.size_of();
709        Self {
710            fd,
711            buffer,
712            addr,
713            addr_len,
714            _p: PhantomPinned,
715        }
716    }
717}
718
719impl<T: IoVectoredBufMut, S> IntoInner for RecvFromVectored<T, S> {
720    type Inner = (T, SockAddrStorage, socklen_t);
721
722    fn into_inner(self) -> Self::Inner {
723        (self.buffer, self.addr, self.addr_len)
724    }
725}
726
727impl<T: IoVectoredBufMut, S: AsFd> OpCode for RecvFromVectored<T, S> {
728    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
729        let this = unsafe { self.get_unchecked_mut() };
730        let fd = this.fd.as_fd().as_raw_fd();
731        let buffer = unsafe { this.buffer.io_slices_mut() };
732        let mut flags = 0;
733        let mut received = 0;
734        let res = unsafe {
735            WSARecvFrom(
736                fd as _,
737                buffer.as_ptr() as _,
738                buffer.len() as _,
739                &mut received,
740                &mut flags,
741                &mut this.addr as *mut _ as *mut SOCKADDR,
742                &mut this.addr_len,
743                optr,
744                None,
745            )
746        };
747        winsock_result(res, received)
748    }
749
750    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
751        cancel(self.fd.as_fd().as_raw_fd(), optr)
752    }
753}
754
755/// Send data to specified address.
756pub struct SendTo<T: IoBuf, S> {
757    pub(crate) fd: S,
758    pub(crate) buffer: T,
759    pub(crate) addr: SockAddr,
760    _p: PhantomPinned,
761}
762
763impl<T: IoBuf, S> SendTo<T, S> {
764    /// Create [`SendTo`].
765    pub fn new(fd: S, buffer: T, addr: SockAddr) -> Self {
766        Self {
767            fd,
768            buffer,
769            addr,
770            _p: PhantomPinned,
771        }
772    }
773}
774
775impl<T: IoBuf, S> IntoInner for SendTo<T, S> {
776    type Inner = T;
777
778    fn into_inner(self) -> Self::Inner {
779        self.buffer
780    }
781}
782
783impl<T: IoBuf, S: AsFd> OpCode for SendTo<T, S> {
784    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
785        let buffer = unsafe { self.buffer.as_io_slice() };
786        let mut sent = 0;
787        let res = unsafe {
788            WSASendTo(
789                self.fd.as_fd().as_raw_fd() as _,
790                &buffer as *const _ as _,
791                1,
792                &mut sent,
793                0,
794                self.addr.as_ptr().cast(),
795                self.addr.len(),
796                optr,
797                None,
798            )
799        };
800        winsock_result(res, sent)
801    }
802
803    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
804        cancel(self.fd.as_fd().as_raw_fd(), optr)
805    }
806}
807
808/// Send data to specified address from vectored buffer.
809pub struct SendToVectored<T: IoVectoredBuf, S> {
810    pub(crate) fd: S,
811    pub(crate) buffer: T,
812    pub(crate) addr: SockAddr,
813    _p: PhantomPinned,
814}
815
816impl<T: IoVectoredBuf, S> SendToVectored<T, S> {
817    /// Create [`SendToVectored`].
818    pub fn new(fd: S, buffer: T, addr: SockAddr) -> Self {
819        Self {
820            fd,
821            buffer,
822            addr,
823            _p: PhantomPinned,
824        }
825    }
826}
827
828impl<T: IoVectoredBuf, S> IntoInner for SendToVectored<T, S> {
829    type Inner = T;
830
831    fn into_inner(self) -> Self::Inner {
832        self.buffer
833    }
834}
835
836impl<T: IoVectoredBuf, S: AsFd> OpCode for SendToVectored<T, S> {
837    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
838        let buffer = unsafe { self.buffer.io_slices() };
839        let mut sent = 0;
840        let res = unsafe {
841            WSASendTo(
842                self.fd.as_fd().as_raw_fd() as _,
843                buffer.as_ptr() as _,
844                buffer.len() as _,
845                &mut sent,
846                0,
847                self.addr.as_ptr().cast(),
848                self.addr.len(),
849                optr,
850                None,
851            )
852        };
853        winsock_result(res, sent)
854    }
855
856    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
857        cancel(self.fd.as_fd().as_raw_fd(), optr)
858    }
859}
860
861static WSA_RECVMSG: OnceLock<LPFN_WSARECVMSG> = OnceLock::new();
862
863/// Receive data and source address with ancillary data into vectored buffer.
864pub struct RecvMsg<T: IoVectoredBufMut, C: IoBufMut, S> {
865    msg: WSAMSG,
866    addr: SockAddrStorage,
867    fd: S,
868    buffer: T,
869    control: C,
870    _p: PhantomPinned,
871}
872
873impl<T: IoVectoredBufMut, C: IoBufMut, S> RecvMsg<T, C, S> {
874    /// Create [`RecvMsg`].
875    ///
876    /// # Panics
877    ///
878    /// This function will panic if the control message buffer is misaligned.
879    pub fn new(fd: S, buffer: T, control: C) -> Self {
880        assert!(
881            control.as_buf_ptr().cast::<CMSGHDR>().is_aligned(),
882            "misaligned control message buffer"
883        );
884        let addr = SockAddrStorage::zeroed();
885        Self {
886            msg: unsafe { std::mem::zeroed() },
887            addr,
888            fd,
889            buffer,
890            control,
891            _p: PhantomPinned,
892        }
893    }
894}
895
896impl<T: IoVectoredBufMut, C: IoBufMut, S> IntoInner for RecvMsg<T, C, S> {
897    type Inner = ((T, C), SockAddrStorage, socklen_t, usize);
898
899    fn into_inner(self) -> Self::Inner {
900        (
901            (self.buffer, self.control),
902            self.addr,
903            self.msg.namelen,
904            self.msg.Control.len as _,
905        )
906    }
907}
908
909impl<T: IoVectoredBufMut, C: IoBufMut, S: AsFd> OpCode for RecvMsg<T, C, S> {
910    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
911        let recvmsg_fn = WSA_RECVMSG
912            .get_or_try_init(|| get_wsa_fn(self.fd.as_fd().as_raw_fd(), WSAID_WSARECVMSG))?
913            .ok_or_else(|| {
914                io::Error::new(io::ErrorKind::Unsupported, "cannot retrieve WSARecvMsg")
915            })?;
916
917        let this = unsafe { self.get_unchecked_mut() };
918
919        let mut slices = unsafe { this.buffer.io_slices_mut() };
920        this.msg.name = &mut this.addr as *mut _ as _;
921        this.msg.namelen = std::mem::size_of::<SOCKADDR_STORAGE>() as _;
922        this.msg.lpBuffers = slices.as_mut_ptr() as _;
923        this.msg.dwBufferCount = slices.len() as _;
924        // SAFETY: `IoSliceMut` is promised to be the same as `WSABUF`
925        this.msg.Control =
926            unsafe { std::mem::transmute::<IoSliceMut, WSABUF>(this.control.as_io_slice_mut()) };
927
928        let mut received = 0;
929        let res = unsafe {
930            recvmsg_fn(
931                this.fd.as_fd().as_raw_fd() as _,
932                &mut this.msg,
933                &mut received,
934                optr,
935                None,
936            )
937        };
938        winsock_result(res, received)
939    }
940
941    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
942        cancel(self.fd.as_fd().as_raw_fd(), optr)
943    }
944}
945
946/// Send data to specified address accompanied by ancillary data from vectored
947/// buffer.
948pub struct SendMsg<T: IoVectoredBuf, C: IoBuf, S> {
949    fd: S,
950    buffer: T,
951    control: C,
952    addr: SockAddr,
953    _p: PhantomPinned,
954}
955
956impl<T: IoVectoredBuf, C: IoBuf, S> SendMsg<T, C, S> {
957    /// Create [`SendMsg`].
958    ///
959    /// # Panics
960    ///
961    /// This function will panic if the control message buffer is misaligned.
962    pub fn new(fd: S, buffer: T, control: C, addr: SockAddr) -> Self {
963        assert!(
964            control.as_buf_ptr().cast::<CMSGHDR>().is_aligned(),
965            "misaligned control message buffer"
966        );
967        Self {
968            fd,
969            buffer,
970            control,
971            addr,
972            _p: PhantomPinned,
973        }
974    }
975}
976
977impl<T: IoVectoredBuf, C: IoBuf, S> IntoInner for SendMsg<T, C, S> {
978    type Inner = (T, C);
979
980    fn into_inner(self) -> Self::Inner {
981        (self.buffer, self.control)
982    }
983}
984
985impl<T: IoVectoredBuf, C: IoBuf, S: AsFd> OpCode for SendMsg<T, C, S> {
986    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
987        let this = unsafe { self.get_unchecked_mut() };
988
989        let slices = unsafe { this.buffer.io_slices() };
990        let msg = WSAMSG {
991            name: this.addr.as_ptr() as _,
992            namelen: this.addr.len(),
993            lpBuffers: slices.as_ptr() as _,
994            dwBufferCount: slices.len() as _,
995            // SAFETY: `IoSlice` is promised to be the same as `WSABUF`
996            Control: unsafe { std::mem::transmute::<IoSlice, WSABUF>(this.control.as_io_slice()) },
997            dwFlags: 0,
998        };
999
1000        let mut sent = 0;
1001        let res = unsafe {
1002            WSASendMsg(
1003                this.fd.as_fd().as_raw_fd() as _,
1004                &msg,
1005                0,
1006                &mut sent,
1007                optr,
1008                None,
1009            )
1010        };
1011        winsock_result(res, sent)
1012    }
1013
1014    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
1015        cancel(self.fd.as_fd().as_raw_fd(), optr)
1016    }
1017}
1018
1019/// Connect a named pipe server.
1020pub struct ConnectNamedPipe<S> {
1021    pub(crate) fd: S,
1022}
1023
1024impl<S> ConnectNamedPipe<S> {
1025    /// Create [`ConnectNamedPipe`](struct@ConnectNamedPipe).
1026    pub fn new(fd: S) -> Self {
1027        Self { fd }
1028    }
1029}
1030
1031impl<S: AsFd> OpCode for ConnectNamedPipe<S> {
1032    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
1033        let res = unsafe { ConnectNamedPipe(self.fd.as_fd().as_raw_fd() as _, optr) };
1034        win32_result(res, 0)
1035    }
1036
1037    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
1038        cancel(self.fd.as_fd().as_raw_fd(), optr)
1039    }
1040}
1041
1042/// Send a control code to a device.
1043pub struct DeviceIoControl<S, I: IoBuf, O: IoBufMut> {
1044    pub(crate) fd: S,
1045    pub(crate) ioctl_code: u32,
1046    pub(crate) input_buffer: Option<I>,
1047    pub(crate) output_buffer: Option<O>,
1048    _p: PhantomPinned,
1049}
1050
1051impl<S, I: IoBuf, O: IoBufMut> DeviceIoControl<S, I, O> {
1052    /// Create [`DeviceIoControl`].
1053    pub fn new(fd: S, ioctl_code: u32, input_buffer: Option<I>, output_buffer: Option<O>) -> Self {
1054        Self {
1055            fd,
1056            ioctl_code,
1057            input_buffer,
1058            output_buffer,
1059            _p: PhantomPinned,
1060        }
1061    }
1062}
1063
1064impl<S, I: IoBuf, O: IoBufMut> IntoInner for DeviceIoControl<S, I, O> {
1065    type Inner = (Option<I>, Option<O>);
1066
1067    fn into_inner(self) -> Self::Inner {
1068        (self.input_buffer, self.output_buffer)
1069    }
1070}
1071
1072impl<S: AsFd, I: IoBuf, O: IoBufMut> OpCode for DeviceIoControl<S, I, O> {
1073    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
1074        let this = unsafe { self.get_unchecked_mut() };
1075        let fd = this.fd.as_fd().as_raw_fd();
1076        let input_len = this.input_buffer.as_ref().map_or(0, |x| x.buf_len());
1077        let input_ptr = this
1078            .input_buffer
1079            .as_ref()
1080            .map_or(std::ptr::null(), |x| x.as_buf_ptr());
1081        let output_len = this.output_buffer.as_ref().map_or(0, |x| x.buf_len());
1082        let output_ptr = this
1083            .output_buffer
1084            .as_mut()
1085            .map_or(std::ptr::null_mut(), |x| x.as_buf_mut_ptr());
1086        let mut transferred = 0;
1087        let res = unsafe {
1088            DeviceIoControl(
1089                fd,
1090                this.ioctl_code,
1091                input_ptr as _,
1092                input_len as _,
1093                output_ptr as _,
1094                output_len as _,
1095                &mut transferred,
1096                optr,
1097            )
1098        };
1099        win32_result(res, transferred)
1100    }
1101
1102    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
1103        cancel(self.fd.as_fd().as_raw_fd(), optr)
1104    }
1105}