Skip to main content

uds_fork/
windows_unixstream.rs

1
2use std::
3{
4    cmp::min, ffi::{c_int, c_void}, fs, io::{self, ErrorKind}, mem::{self, MaybeUninit}, net::Shutdown, os::windows::io::
5    {
6        AsRawSocket, AsSocket, BorrowedSocket, FromRawSocket, IntoRawSocket, OwnedSocket, RawSocket
7    }, path::Path, ptr, sync::{LazyLock, OnceLock, atomic::{AtomicU64, Ordering}, mpsc}, thread::{self, JoinHandle}, time::Duration
8};
9
10use windows_sys::
11{
12    Win32::{Foundation::{HANDLE, HANDLE_FLAG_INHERIT, SetHandleInformation}, Networking::WinSock::
13    {
14        AF_UNIX, FIONBIO, INVALID_SOCKET, MSG_TRUNC, SD_BOTH, SD_RECEIVE, SD_SEND, SO_ERROR, SO_RCVTIMEO, SO_SNDTIMEO, SO_TYPE, SOCK_STREAM, SOCKADDR as sockaddr, SOCKADDR_UN, SOCKET_ERROR, SOL_SOCKET, TIMEVAL, WSA_FLAG_NO_HANDLE_INHERIT, WSA_FLAG_OVERLAPPED, WSACleanup, WSADATA, WSAEMSGSIZE, WSAESHUTDOWN, WSAPROTOCOL_INFOW, WSARecv, WSASend, WSASocketW, WSAStartup, accept, bind, connect, getpeername, getsockname, getsockopt, ioctlsocket, listen, recv, send, shutdown, socket, socklen_t
15    }}, 
16    core::PCSTR
17};
18
19use crate::{LISTEN_BACKLOG, UnixSocketAddr};
20
21
22fn create_socket() -> io::Result<OwnedSocket>
23{
24     let socket_res = 
25        unsafe 
26        { 
27            WSASocketW(
28                AF_UNIX as i32, 
29                SOCK_STREAM, 
30                0, 
31                ptr::null(), 
32                0, 
33                WSA_FLAG_NO_HANDLE_INHERIT | WSA_FLAG_OVERLAPPED
34            ) 
35        };
36
37    if socket_res != INVALID_SOCKET
38    {
39        return Ok(
40            unsafe{ OwnedSocket::from_raw_socket(socket_res as u64) }
41        );
42    }
43    else
44    {
45        return Err(io::Error::last_os_error());
46    }
47}
48
49fn connect_socket<S: AsRawSocket>(so: &S, addr: &UnixSocketAddr) -> io::Result<()>
50{
51    let sa =  unsafe{ addr.as_raw_ptr_general() };
52
53    let res = unsafe { connect(so.as_raw_socket() as usize, sa.0, sa.1) };
54
55    if res != SOCKET_ERROR
56    {
57        return Ok(());
58    }
59    else
60    {
61        return Err(io::Error::last_os_error());
62    }
63}
64
65fn bind_socket<S: AsRawSocket>(so: &S, addr: &UnixSocketAddr) -> io::Result<()>
66{
67    let sa =  unsafe{ addr.as_raw_ptr_general() };
68
69    let res = unsafe{ bind(so.as_raw_socket() as usize, sa.0, sa.1) };
70
71    if res != SOCKET_ERROR
72    {
73        return Ok(());
74    }
75    else
76    {
77        return Err(io::Error::last_os_error());
78    }
79}
80
81fn listen_socket<S: AsRawSocket>(so: &S, backlog: i32) -> io::Result<()>
82{
83    let res = unsafe{ listen(so.as_raw_socket() as usize, backlog) };
84
85    if res != SOCKET_ERROR
86    {
87        return Ok(());
88    }
89    else
90    {
91        return Err(io::Error::last_os_error());
92    }
93}
94
95/// Safe wrapper around `getsockname()` or `getpeername()`. 
96fn get_unix_local_addr<FD>(socket: &FD) -> Result<UnixSocketAddr, io::Error> 
97where FD: AsRawSocket
98{
99    unsafe 
100    {
101        UnixSocketAddr::new_from_ffi(
102            |addr_ptr, addr_len| 
103            {
104                if getsockname(socket.as_raw_socket() as usize, addr_ptr, addr_len) != SOCKET_ERROR
105                {
106                    Ok(())
107                }
108                else
109                {
110                    Err(io::Error::last_os_error())
111                }
112            }
113        )
114        .map(|((), addr)| addr )
115    }
116}
117
118
119fn get_unix_peer_addr<FD>(socket: &FD) -> Result<UnixSocketAddr, io::Error> 
120where FD: AsRawSocket
121{
122    unsafe 
123    {
124        UnixSocketAddr::new_from_ffi(
125            |addr_ptr, addr_len| 
126            {
127                if getpeername(socket.as_raw_socket() as usize, addr_ptr, addr_len)  != SOCKET_ERROR
128                {
129                    Ok(())
130                }
131                else
132                {
133                    Err(io::Error::last_os_error())
134                }
135            }
136        )
137        .map(|((), addr)| addr )
138    }
139}
140
141pub 
142fn get_socket_family<S: AsRawSocket>(fd: &S) -> io::Result<u16>
143{
144    let mut optval: MaybeUninit<SOCKADDR_UN> = MaybeUninit::zeroed();
145    let mut len = size_of::<SOCKADDR_UN>() as socklen_t;
146    
147
148    let res = 
149        unsafe
150        {
151            getsockname(fd.as_raw_socket() as usize,optval.as_mut_ptr().cast(), &mut len)
152        };
153
154    // more likely it will not fail
155    if res != SOCKET_ERROR 
156    {
157        return Ok(unsafe { optval.assume_init() }.sun_family);
158    }
159    else
160    {
161        return Err(io::Error::last_os_error());
162    }
163}
164
165pub 
166fn get_socket_type<S: AsRawSocket>(fd: &S) -> io::Result<c_int>
167{
168    let mut optval: MaybeUninit<c_int> = MaybeUninit::zeroed();
169    let mut len = size_of::<c_int>() as socklen_t;
170
171    let res = 
172        unsafe
173        {
174            getsockopt(fd.as_raw_socket() as usize, SOL_SOCKET, SO_TYPE,
175                optval.as_mut_ptr().cast(),&mut len,
176            )
177        };
178
179    // more likely it will not fail
180    if res == 0
181    {
182        if len as usize != size_of::<c_int>()
183        {
184            return Err(
185                std::io::Error::new(
186                    ErrorKind::Other, 
187                    format!("assertion trap: returned data len mispatch {} != {}",
188                            len, size_of::<c_int>())
189                )
190            );
191        }
192
193        return Ok(unsafe { optval.assume_init() });
194    }
195    else
196    {
197        return Err(io::Error::last_os_error());
198    }
199}
200
201/// Safe wrapper around `getsockopt(SO_ERROR)`.
202fn take_error<FD>(socket: &FD) -> Result<Option<io::Error>, io::Error> 
203where FD: AsRawSocket
204{
205    let mut stored_errno: c_int = 0;
206    let mut optlen = mem::size_of::<c_int>() as socklen_t;
207    let dst_ptr = &mut stored_errno as *mut c_int as *mut u8;
208    
209    unsafe 
210    {
211        if getsockopt(socket.as_raw_socket() as usize, SOL_SOCKET, SO_ERROR, dst_ptr, &mut optlen) == -1 
212        {
213            Err(io::Error::last_os_error())
214        } 
215        else if optlen != mem::size_of::<c_int>() as socklen_t 
216        {
217            // std panics here
218            Err(
219                io::Error::new(
220                    ErrorKind::InvalidData,
221                    "got unexpected length from getsockopt(SO_ERROR)"
222                )
223            )
224        } 
225        else if stored_errno == 0 
226        {
227            Ok(None)
228        } 
229        else 
230        {
231            Ok(Some(io::Error::last_os_error()))
232        }
233    }
234}
235
236fn set_nonblocking<FD: AsRawSocket>(so: &FD,  nonblocking: bool) -> Result<(), io::Error> 
237{
238    let mut nonblocking = if nonblocking { 1 } else { 0 };
239
240    let res = unsafe{ ioctlsocket(so.as_raw_socket() as usize, FIONBIO, &mut nonblocking) };
241
242    if res == SOCKET_ERROR
243    {
244        return Err(io::Error::last_os_error());
245    }
246
247    return Ok(());
248}
249 
250fn accept_from<FD>(fd: &FD, nonblocking: bool) -> Result<(WindowsUnixStream, UnixSocketAddr), io::Error> 
251where FD: AsRawSocket
252{
253    unsafe 
254    { 
255        UnixSocketAddr::new_from_ffi(
256            |addr_ptr, len_ptr| 
257            {
258                let socket = 
259                    accept(fd.as_raw_socket() as usize, addr_ptr, len_ptr);
260
261                if socket == INVALID_SOCKET
262                {
263                    return Err(io::Error::last_os_error());
264                }
265
266                let o_sock = WindowsUnixStream::from_raw_socket_checked(socket as u64);
267
268                if nonblocking  == true
269                {
270                    set_nonblocking(&o_sock, true)?;
271                }
272
273                return Ok(o_sock);
274            }
275        ) 
276    }
277}
278
279
280fn shutdown_sock<SOCK: AsRawSocket>(sock: &SOCK, how: Shutdown) -> io::Result<()> 
281{
282    let res = 
283        match how 
284        {
285            Shutdown::Write => 
286                unsafe{ shutdown(sock.as_raw_socket() as usize, SD_SEND) },
287            Shutdown::Read =>
288                unsafe{ shutdown(sock.as_raw_socket() as usize, SD_RECEIVE) },
289            Shutdown::Both => 
290                unsafe{ shutdown(sock.as_raw_socket() as usize, SD_BOTH) },
291        };
292
293    if res == -1
294    {
295        return Err(io::Error::last_os_error());
296    }
297
298    return Ok(());
299}
300
301/// `noinherit` - when set to `true` means noinherit
302fn set_handle_inherit<S: AsRawSocket>(sock: &S, noinherit: bool) -> io::Result<()>
303{
304    let res = 
305        unsafe 
306        {
307            SetHandleInformation(
308                sock.as_raw_socket() as HANDLE,
309                HANDLE_FLAG_INHERIT,
310                (noinherit == false) as u32,
311            )
312        };
313
314    if res != 0
315    {
316        return Ok(());
317    }
318
319    return Err(io::Error::last_os_error());
320}
321
322pub struct SockOpts
323{
324    optname: i32,
325    opt: SockOpt,
326}
327
328impl SockOpts
329{
330    const LEVEL: i32 = SOL_SOCKET;
331
332    fn from_opt_duration(value: Option<Duration>) -> TIMEVAL
333    {
334        value
335            .map_or(
336                TIMEVAL{ tv_sec: 0, tv_usec: 0 },
337                |v| 
338                TIMEVAL
339                { 
340                    tv_sec: min(v.as_secs(), i32::MAX as u64) as i32,
341                    tv_usec: v.subsec_micros() as i32,
342                }
343            )
344    }
345
346    fn from_timeval(value: TIMEVAL) -> Option<Duration>
347    {
348        if value.tv_sec == 0 && value.tv_usec == 0 
349        {
350            return None;
351        } 
352        else 
353        {
354            let sec = value.tv_sec as u64;
355            let nsec = (value.tv_usec as u32) * 1000;
356
357            return Some(Duration::new(sec, nsec));
358        }
359    }
360
361    fn set_rcv_timeout<SOCK: AsRawSocket>(sock: &SOCK, tm: Option<Duration>) -> io::Result<()>
362    {
363        let op = 
364            Self
365            {
366                optname: SO_RCVTIMEO,
367                opt: 
368                    SockOpt::RcvTimeout(Self::from_opt_duration(tm))
369            };
370
371        return op.setsockopt(sock);
372    }
373
374    fn get_rcv_timeout<SOCK: AsRawSocket>(sock: &SOCK) -> io::Result<Option<Duration>>
375    {
376        let op = 
377            Self
378            {
379                optname: SO_RCVTIMEO,
380                opt: 
381                    SockOpt::RcvTimeout(TIMEVAL::default())
382            };
383
384        let SockOpt::RcvTimeout(v) = op.getsockopt(sock)?
385            else
386            {   
387                return Err(
388                    io::Error::new(ErrorKind::Other, "assertion trap: expected SockOpt::RcvTimeout")
389                );
390            };
391
392        return Ok(Self::from_timeval(v));
393    }
394
395    fn set_snd_timeout<SOCK: AsRawSocket>(sock: &SOCK, tm: Option<Duration>) -> io::Result<()>
396    {
397        let op = 
398            Self
399            {
400                optname: SO_SNDTIMEO,
401                opt: 
402                    SockOpt::SndTimeout(Self::from_opt_duration(tm))
403            };
404
405        return op.setsockopt(sock);
406    }
407
408    fn get_snd_timeout<SOCK: AsRawSocket>(sock: &SOCK) -> io::Result<Option<Duration>>
409    {
410        let op = 
411            Self
412            {
413                optname: SO_SNDTIMEO,
414                opt: 
415                    SockOpt::SndTimeout(TIMEVAL::default())
416            };
417
418        let SockOpt::SndTimeout(v) = op.getsockopt(sock)?
419            else
420            {   
421                return Err(
422                    io::Error::new(ErrorKind::Other, "assertion trap: expected SockOpt::SndTimeout")
423                );
424            };
425
426        return Ok(Self::from_timeval(v));
427    }
428
429    fn getsockopt<SOCK>(self, sock: &SOCK) -> io::Result<SockOpt>
430    where 
431        SOCK: AsRawSocket
432    {
433        let (mut optval, mut len) = 
434            match self.opt
435            {
436                SockOpt::RcvTimeout(timeval) =>
437                    (MaybeUninit::<TIMEVAL>::zeroed(), size_of_val(&timeval) as i32),
438                SockOpt::SndTimeout(timeval) => 
439                    (MaybeUninit::<TIMEVAL>::zeroed(), size_of_val(&timeval) as i32),
440            };
441
442        let res = 
443            unsafe
444            {
445                windows_sys::Win32::Networking::WinSock
446                    ::getsockopt(sock.as_raw_socket() as usize,Self::LEVEL, self.optname,
447                        optval.as_mut_ptr().cast(),&mut len,
448                )
449            };
450
451        // more likely it will not fail
452        if res != SOCKET_ERROR
453        {
454            match self.opt
455            {
456                SockOpt::RcvTimeout(_) =>
457                    return Ok(unsafe { SockOpt::RcvTimeout(optval.assume_init()) }),
458                SockOpt::SndTimeout(_) => 
459                    return Ok(unsafe { SockOpt::SndTimeout(optval.assume_init()) }),
460            }
461        }
462        else
463        {
464            return Err(std::io::Error::last_os_error());
465        }
466    }
467
468    fn setsockopt<SOCK>(self, fd: &SOCK) -> io::Result<()>
469    where 
470        SOCK: AsRawSocket
471    {
472        let (optval, option_len) = 
473            match self.opt
474            {
475                SockOpt::RcvTimeout(timeval) =>
476                    (ptr::addr_of!(timeval).cast(), size_of_val(&timeval) as i32),
477                SockOpt::SndTimeout(timeval) => 
478                    (ptr::addr_of!(timeval).cast(), size_of_val(&timeval) as i32),
479            };
480
481        let res = 
482            unsafe
483            {
484                windows_sys::Win32::Networking::WinSock
485                    ::setsockopt(fd.as_raw_socket() as usize, Self::LEVEL, self.optname, 
486                        optval, option_len) 
487            };
488
489        // more likely it will not fail
490        if res != SOCKET_ERROR
491        {
492            return Ok(());
493        }
494        else
495        {
496            return Err(std::io::Error::last_os_error());
497        }
498    }
499}
500
501pub enum SockOpt
502{
503    RcvTimeout(TIMEVAL),
504    SndTimeout(TIMEVAL),
505}
506 
507
508
509#[derive(Debug, Clone, Copy, PartialEq, Eq)]
510pub struct RecvFlags(pub u32);
511
512impl RecvFlags
513{
514    pub const MSG_TRUNC: u32 = MSG_TRUNC;
515}
516
517fn recv_vectored<S: AsRawSocket>(socket: &S, bufs: &mut [io::IoSliceMut<'_>], flags: c_int) -> io::Result<(usize, RecvFlags)> 
518{
519    let mut nread = 0;
520    let mut flags = flags as u32;
521    let res = 
522         unsafe
523         {
524            WSARecv(
525                socket.as_raw_socket() as usize,
526                bufs.as_mut_ptr().cast(),
527                min(bufs.len(), u32::MAX as usize) as u32,
528                &mut nread,
529                &mut flags,
530                ptr::null_mut(),
531                None,
532            )
533        };
534
535    if res == SOCKET_ERROR
536    {
537        let e = io::Error::last_os_error();
538
539        if e.raw_os_error() == Some(WSAESHUTDOWN as i32)
540        {
541            return Ok( (0, RecvFlags(0)) );
542        }
543        else if e.raw_os_error() ==  Some(WSAEMSGSIZE as i32)
544        {
545            return Ok( (nread as usize, RecvFlags(RecvFlags::MSG_TRUNC)) );
546        }
547        else
548        {
549            return Err(e);
550        }
551    }
552
553    return Ok( (nread as usize, RecvFlags(0)) );
554}
555
556fn send_vectored<S: AsRawSocket>(socket: &S, bufs: &[io::IoSlice<'_>], flags: c_int) -> io::Result<usize> 
557{
558    let mut nsent = 0;
559
560    let res = 
561        unsafe
562        {
563            WSASend(
564                socket.as_raw_socket() as usize,
565                bufs.as_ptr() as *mut _,
566                min(bufs.len(), u32::MAX as usize) as u32,
567                &mut nsent,
568                flags as u32,
569                std::ptr::null_mut(),
570                None,
571            )
572        };
573
574    if res == SOCKET_ERROR
575    {
576        return Err(io::Error::last_os_error());
577    }
578
579    return Ok( nsent as usize);
580}
581
582#[derive(Debug)]
583struct WsaLazyThing;
584
585impl Drop for WsaLazyThing
586{
587    fn drop(&mut self) 
588    {
589        unsafe
590        {
591            WSACleanup();
592        }
593    }
594}
595
596static WSA_STARTUP: LazyLock<WsaLazyThing> = 
597    LazyLock::new(
598        ||
599        {
600            let mut wsadata = MaybeUninit::<WSADATA>::zeroed();
601            
602            let res = unsafe{ WSAStartup(0x0202, wsadata.as_mut_ptr()) };
603
604            if res != 0
605            {
606                panic!("WSAStartup error: {}", io::Error::last_os_error());
607            }
608
609            WsaLazyThing
610        }
611    );
612
613
614/// An unix domain `stream` packet connection for Windows.
615/// 
616/// It requires WSA version 2.0 or 2.2 i.e Windows 10 and above.
617/// 
618/// The crate requests version 2.2 by default!
619///
620/// Is simular to the `UnixStream` but with some limitations:
621/// 
622/// * no `SOCK_DGRAM` or `SOCK_SEQPACKET` support
623/// 
624/// * Ancillary data like `SCM_RIGHTS` `SCM_CREDENTIALS`
625/// 
626/// * Autobind feature
627/// 
628/// * socketpair
629///
630/// # Examples
631///
632/// ```ignore
633/// let path = "server3.sock";
634/// 
635/// let client = WindowsUnixStream::connect(path).unwrap();
636/// client.send(b"first").unwrap();
637/// client.send(b"second").unwrap();
638/// ```
639#[derive(Debug)]
640#[repr(transparent)]
641pub struct WindowsUnixStream
642{
643    sock: OwnedSocket,
644}
645
646impl FromRawSocket for WindowsUnixStream
647{
648    unsafe 
649    fn from_raw_socket(sock: RawSocket) -> Self 
650    {
651        let os = unsafe{ OwnedSocket::from_raw_socket(sock) };
652
653        return WindowsUnixStream::from(os);
654    }
655}
656
657impl From<OwnedSocket> for WindowsUnixStream
658{
659    fn from(os: OwnedSocket) -> Self 
660    {
661        let sa_fam = get_socket_family(&os).unwrap();
662        let sa_type = get_socket_type(&os).unwrap();
663
664        if sa_fam != AF_UNIX || sa_type != SOCK_STREAM
665        {
666            panic!("assertion trap: provided FD is not AF_UNIX & SOCK_STREAM, {} {}", 
667                sa_fam, sa_type);
668        }
669
670        let _ = &*WSA_STARTUP;
671
672        return Self{ sock: os };
673    }
674}
675
676impl From<WindowsUnixStream> for OwnedSocket
677{
678    fn from(value: WindowsUnixStream) -> Self 
679    {
680        return value.sock;
681    }
682}
683
684impl AsSocket for WindowsUnixStream
685{
686    fn as_socket(&self) -> BorrowedSocket<'_> 
687    {
688        return self.sock.as_socket();
689    }
690}
691
692impl AsRawSocket for WindowsUnixStream
693{
694    fn as_raw_socket(&self) -> RawSocket 
695    {
696        return self.sock.as_raw_socket();
697    }
698}
699
700impl IntoRawSocket for WindowsUnixStream
701{
702    fn into_raw_socket(self) -> RawSocket 
703    {
704        return self.sock.into_raw_socket();
705    }
706}
707
708impl io::Write for WindowsUnixStream
709{
710    fn write(&mut self, buf: &[u8]) -> io::Result<usize> 
711    {
712        self.send(buf)
713    }
714
715    /// Do not access `bufs` after sending!
716    fn write_vectored(&mut self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> 
717    {
718        send_vectored(self, &bufs, 0)
719    }
720    
721    fn flush(&mut self) -> io::Result<()>
722    {
723        todo!()
724    }
725}
726
727impl io::Read for WindowsUnixStream
728{
729    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> 
730    {
731        self.recv(buf)
732    }
733
734    fn read_vectored(&mut self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> 
735    {
736        self.recv_vectored(bufs).map(|(n, _)| n)
737    }
738}
739
740
741#[cfg(feature = "xio-rs")]
742pub mod xio_unix_stream_enabled
743{
744    use xio_rs::{EsInterfaceRegistry, XioChannel, XioEventPipe, XioEventUid, XioResult, event_registry::XioRegistry};
745
746    use super::WindowsUnixStream;
747
748    impl<ESSR: EsInterfaceRegistry> XioEventPipe<ESSR, Self> for WindowsUnixStream
749    {
750        fn connect_event_pipe(&mut self, ess: &XioRegistry<ESSR>, ev_uid: XioEventUid, channel: XioChannel) -> XioResult<()> 
751        {
752            self.set_nonblocking(true)?;
753
754            ess.get_ev_sys().en_register(&self.sock, ev_uid, channel)
755        }
756    
757        fn modify_event_pipe(&mut self, ess: &XioRegistry<ESSR>, ev_uid: XioEventUid, channel: XioChannel) -> XioResult<()> 
758        {
759            ess.get_ev_sys().modify(&self.sock, ev_uid, channel)
760        }
761    
762        fn disconnect_event_pipe(&mut self, ess: &XioRegistry<ESSR>) -> XioResult<()> 
763        {
764            ess.get_ev_sys().de_register(&self.sock)
765        }
766    }
767}
768
769impl WindowsUnixStream
770{
771    unsafe 
772    fn from_raw_socket_checked(raw_sock: RawSocket) -> Self 
773    {
774        let os = unsafe{ OwnedSocket::from_raw_socket(raw_sock) };
775
776        let _ = &*WSA_STARTUP;
777
778        return Self{ sock: os };
779    }
780
781    /// Connects to an unix stream server listening at `path`.
782    ///
783    /// This is a wrapper around [`connect_unix_addr()`](#method.connect_unix_addr)
784    /// for convenience and compatibility with std.
785    pub 
786    fn connect<P: AsRef<Path>>(path: P) -> Result<Self, io::Error> 
787    {
788        let addr = UnixSocketAddr::from_path(&path)?;
789
790        return Self::connect_unix_addr(&addr);
791    }
792
793    /// Connects to an unix seqpacket server listening at `addr`.
794    pub 
795    fn connect_unix_addr(addr: &UnixSocketAddr) -> Result<Self, io::Error> 
796    {
797        let _ = &*WSA_STARTUP;
798
799        let socket = create_socket()?;
800
801        connect_socket(&socket, addr)?;
802
803        return Ok( Self{ sock: socket } );
804    }
805
806    /// Binds to an address before connecting to a listening seqpacet socket.
807    pub 
808    fn connect_from_to_unix_addr(from: &UnixSocketAddr,  to: &UnixSocketAddr) -> Result<Self, io::Error> 
809    {
810        let _ = &*WSA_STARTUP;
811
812        let socket = create_socket()?;
813
814        bind_socket(&socket, from)?;
815
816        connect_socket(&socket, to)?;
817
818        return Ok( Self{ sock: socket } );
819    }
820
821    /// Creates a socket pair.
822    pub 
823    fn pair() -> Result<(Self, Self), io::Error> 
824    {
825        let dir = tempfile::tempdir()?;
826        let file_path = dir.path().join("so_pair");
827
828        let bind = WindowsUnixListener::bind(&file_path)?;
829       
830        let handle0: JoinHandle<Result<WindowsUnixStream, io::Error>> = 
831            thread::spawn(move ||
832                {
833                    let (s, a) = bind.accept_unix_addr()?;
834
835                    return Ok(s);
836                }
837            );
838        
839        let s1 = Self::connect(&file_path)?;
840
841        let s2 = 
842            handle0
843                .join()
844                .map_err(|e| 
845                    io::Error::new(ErrorKind::Other, format!("join error: {:?}", e))
846                )??;
847
848        return Ok( (s1, s2) );
849    }
850
851    pub 
852    fn try_clone(&self) -> io::Result<Self>
853    {
854        self.sock.try_clone().map(|osck| Self { sock: osck })
855    }
856
857    pub 
858    fn set_nonblocking(&self, nonblk: bool) -> io::Result<()>
859    {
860        set_nonblocking(self, nonblk)
861    }
862
863    ///  Cloexec
864    pub 
865    fn set_no_inherit(&self, no_inh: bool) -> io::Result<()>
866    {
867        set_handle_inherit(&self.sock, no_inh)
868    }
869
870    pub 
871    fn set_write_timeout(&self, timeout: Option<Duration>) -> io::Result<()>
872    {
873        SockOpts::set_snd_timeout(&self.sock, timeout)
874    }
875
876    pub 
877    fn write_timeout(&self) -> io::Result<Option<Duration>>
878    {
879        SockOpts::get_snd_timeout(&self.sock)
880    }
881
882    pub 
883    fn set_read_timeout(&self, timeout: Option<Duration>) -> io::Result<()>
884    {
885        SockOpts::set_rcv_timeout(&self.sock, timeout)
886    }
887
888    pub 
889    fn read_timeout(&self) -> io::Result<Option<Duration>>
890    {
891        SockOpts::get_rcv_timeout(&self.sock)
892    }
893
894    /// Returns the address of this side of the connection.
895    pub 
896    fn local_unix_addr(&self) -> Result<UnixSocketAddr, io::Error> 
897    {
898        get_unix_local_addr(self)
899    }
900
901    /// Returns the address of the other side of the connection.
902    pub 
903    fn peer_unix_addr(&self) -> Result<UnixSocketAddr, io::Error> 
904    {
905        get_unix_peer_addr(self)
906    }
907
908    /// Sends a packet to the peer.
909    pub 
910    fn send(&self, packet: &[u8]) -> Result<usize, io::Error> 
911    {
912        let ptr = packet.as_ptr();
913        let pkt_len = min(packet.len(), i32::MAX as usize) as i32;
914        let flags = 0;
915
916        let sent = unsafe { send(self.sock.as_raw_socket() as usize, ptr, pkt_len, flags) };
917        
918        if sent != SOCKET_ERROR
919        {
920            return Ok(sent as usize);
921        }
922        
923        return Err(std::io::Error::last_os_error());
924    }
925
926    /// Receives a packet from the peer.
927    pub 
928    fn recv(&self, buffer: &mut[u8]) -> Result<usize, io::Error> 
929    {
930        let ptr = buffer.as_mut_ptr();
931        let pkt_len = min(buffer.len(), i32::MAX as usize) as i32;
932        let received = unsafe { recv(self.sock.as_raw_socket() as usize, ptr, pkt_len, 0) };
933        
934        if received >= 0
935        {
936            return Ok(received as usize);
937        }
938
939        return Err(std::io::Error::last_os_error());
940    }
941
942    pub 
943    fn recv_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<(usize, RecvFlags)> 
944    {
945        recv_vectored(self, bufs, 0)
946    }
947
948    /// Windows consumes the [io::IoSlice] instances, so can no logner be accessed.
949    /// The [io::Write] borrows, but it is not correct. DO not access the slices after
950    /// it were sent.
951    pub 
952    fn send_vectored(&self, bufs: Vec<io::IoSlice<'_>>) -> io::Result<usize>
953    {
954        send_vectored(self, &bufs, 0)
955    }
956
957    /// Returns the value of the `SO_ERROR` option.
958    ///
959    /// This might only provide errors generated from nonblocking `connect()`s,
960    /// which this library doesn't support. It is therefore unlikely to be 
961    /// useful, but is provided for parity with stream counterpart in std.
962    pub 
963    fn take_error(&self) -> Result<Option<io::Error>, io::Error> 
964    {
965        take_error(self)
966    }
967
968    /// Allows to shudown receiving/sending or both sides.
969    pub 
970    fn shutdown(&self, how: Shutdown) -> io::Result<()>
971    {
972        shutdown_sock(&self.sock, how)
973    }
974}
975
976/// An unix domain listener for [SOCK_STREAM] packet connections which requires
977/// WSA version 2.2 i.e Windows 10 minimum.
978///
979/// See [`WindowsUnixStream`](struct.WindowsUnixStream.html) for a description
980/// of this type of connection.
981///
982/// # Examples
983///
984/// ```no_run
985/// let file_path = "server_test.sock";
986/// let listener = uds_fork::WindowsUnixListener::bind(file_path)
987///     .expect("Create seqpacket listener");
988/// let client = uds_fork::WindowsUnixStream::connect(file_path).unwrap();
989/// let (conn, _addr) = listener.accept_unix_addr().unwrap();
990/// conn.send(b"Welcome").unwrap();
991/// std::thread::sleep(std::time::Duration::from_secs(1));
992/// drop(client);
993/// ```
994#[repr(transparent)]
995#[derive(Debug)]
996pub struct WindowsUnixListener
997{
998    sock: OwnedSocket,
999}
1000
1001
1002impl FromRawSocket for WindowsUnixListener
1003{
1004    unsafe 
1005    fn from_raw_socket(sock: RawSocket) -> Self 
1006    {
1007        let os = unsafe{ OwnedSocket::from_raw_socket(sock) };
1008
1009        return WindowsUnixListener::from(os);
1010    }
1011}
1012
1013impl From<OwnedSocket> for WindowsUnixListener
1014{
1015    fn from(os: OwnedSocket) -> Self 
1016    {
1017        let sa_fam = get_socket_family(&os).unwrap();
1018        let sa_type = get_socket_type(&os).unwrap();
1019
1020        if sa_fam != AF_UNIX || sa_type != SOCK_STREAM
1021        {
1022            panic!("assertion trap: provided FD is not AF_UNIX & SOCK_SEQPACKET, {} {}", 
1023                sa_fam, sa_type);
1024        }
1025
1026        let _ = &*WSA_STARTUP;
1027
1028        return Self{ sock: os };
1029    }
1030}
1031
1032impl From<WindowsUnixListener> for OwnedSocket
1033{
1034    fn from(mut value: WindowsUnixListener) -> Self 
1035    {
1036        return value.sock;
1037    }
1038}
1039
1040impl AsSocket for WindowsUnixListener
1041{
1042    fn as_socket(&self) -> BorrowedSocket<'_> 
1043    {
1044        return self.sock.as_socket();
1045    }
1046}
1047
1048impl AsRawSocket for WindowsUnixListener
1049{
1050    fn as_raw_socket(&self) -> RawSocket 
1051    {
1052        return self.sock.as_raw_socket();
1053    }
1054}
1055
1056impl IntoRawSocket for WindowsUnixListener
1057{
1058    fn into_raw_socket(mut self) -> RawSocket 
1059    {
1060        return self.sock.into_raw_socket();
1061    }
1062}
1063
1064/// A XIO [XioEventPipe] implementation.
1065#[cfg(feature = "xio-rs")]
1066pub mod xio_listener_enabled
1067{
1068    use xio_rs::{EsInterfaceRegistry, XioChannel, XioEventPipe, XioEventUid, XioResult, event_registry::XioRegistry};
1069
1070    use super::WindowsUnixListener;
1071
1072    impl<ESSR: EsInterfaceRegistry> XioEventPipe<ESSR, Self> for WindowsUnixListener
1073    {
1074        fn connect_event_pipe(&mut self, ess: &XioRegistry<ESSR>, ev_uid: XioEventUid, channel: XioChannel) -> XioResult<()> 
1075        {
1076            self.set_nonblocking(true)?;
1077
1078            ess.get_ev_sys().en_register(&self.sock, ev_uid, channel)
1079        }
1080    
1081        fn modify_event_pipe(&mut self, ess: &XioRegistry<ESSR>, ev_uid: XioEventUid, channel: XioChannel) -> XioResult<()> 
1082        {
1083            ess.get_ev_sys().modify(&self.sock, ev_uid, channel)
1084        }
1085    
1086        fn disconnect_event_pipe(&mut self, ess: &XioRegistry<ESSR>) -> XioResult<()> 
1087        {
1088            ess.get_ev_sys().de_register(&self.sock)
1089        }
1090    }
1091}
1092
1093impl WindowsUnixListener
1094{
1095     /// Creates a socket that listens for seqpacket connections on the specified socket file.
1096    pub 
1097    fn bind<P: AsRef<Path>>(path: P) -> Result<Self, io::Error> 
1098    {
1099        let addr = UnixSocketAddr::from_path(path.as_ref())?;
1100        
1101        return Self::bind_unix_addr(&addr);
1102    }
1103
1104    /// Creates a socket that listens for seqpacket connections on the specified address.
1105    /// 
1106    /// [`addr`]: uds_fork::addr::UnixSocketAddr
1107    pub 
1108    fn bind_unix_addr(addr: &UnixSocketAddr) -> Result<Self, io::Error> 
1109    {
1110        let _ = &*WSA_STARTUP;
1111
1112        let socket = create_socket()?;
1113
1114        bind_socket(&socket, addr)?;
1115
1116        listen_socket(&socket, LISTEN_BACKLOG)?;
1117        
1118        return Ok(Self{ sock: socket });
1119    }
1120 
1121    pub 
1122    fn set_nonblocking(&self, nonblk: bool) -> io::Result<()>
1123    {
1124        set_nonblocking(self, nonblk)
1125    }
1126
1127    /// Returns the address the socket is listening on.
1128    pub 
1129    fn local_unix_addr(&self) -> Result<UnixSocketAddr, io::Error> 
1130    {
1131        get_unix_local_addr(self)
1132    }
1133
1134    pub 
1135    fn listen(&self, backlog: i32) -> io::Result<()>
1136    {
1137        listen_socket(self, backlog)
1138    }
1139
1140    /// Accepts a new incoming connection to this listener.
1141    /// 
1142    /// Rustdocs: 
1143    /// > This function will block the calling thread until a new Unix connection
1144    /// > is established. When established, the corresponding [`WindowsUnixStream`] and
1145    /// > the remote peer's address will be returned.
1146    /// 
1147    /// [`WindowsUnixStream`]: uds_fork::WindowsUnixStream
1148    ///
1149    /// # Examples
1150    ///
1151    /// ```no_run
1152    /// use uds_fork::WindowsUnixListener;
1153    ///
1154    /// fn main() -> std::io::Result<()> 
1155    /// {
1156    ///     let listener = WindowsUnixListener::bind("/path/to/the/socket")?;
1157    ///
1158    ///     match listener.accept()
1159    ///     {
1160    ///         Ok((socket, addr)) => 
1161    ///             println!("Got a client: {addr:?}"),
1162    ///         Err(e) => 
1163    ///             println!("accept function failed: {e:?}"),
1164    ///     }
1165    /// 
1166    ///     return Ok(());
1167    /// }
1168    /// ```
1169    #[inline]
1170    pub 
1171    fn accept(&self)-> Result<(WindowsUnixStream, UnixSocketAddr), io::Error> 
1172    {
1173        self.accept_unix_addr()
1174    }
1175
1176    /// Creates a new independently owned handle to the underlying socket.
1177    /// 
1178    /// Rustdocs:
1179    /// > The returned `WindowsUnixListener` is a reference to the same socket that this
1180    /// > object references. Both handles can be used to accept incoming
1181    /// > connections and options set on one listener will affect the other.
1182    pub 
1183    fn try_clone(&self) -> io::Result<Self> 
1184    {
1185        return Ok(
1186            Self
1187            {
1188                sock: self.sock.try_clone()?
1189            }
1190        );
1191    }
1192
1193    /// Accepts a new incoming connection to this listener.
1194    /// 
1195    /// Rustdocs: 
1196    /// > This function will block the calling thread until a new Unix connection
1197    /// > is established. When established, the corresponding [`WindowsUnixStream`] and
1198    /// > the remote peer's address will be returned.
1199    /// 
1200    /// [`WindowsUnixStream`]: uds_fork::WindowsUnixStream
1201    ///
1202    /// # Examples
1203    ///
1204    /// ```no_run
1205    /// use uds_fork::WindowsUnixListener;
1206    ///
1207    /// fn main() -> std::io::Result<()> 
1208    /// {
1209    ///     let listener = WindowsUnixListener::bind("/path/to/the/socket")?;
1210    ///
1211    ///     match listener.accept_unix_addr()
1212    ///     {
1213    ///         Ok((socket, addr)) => 
1214    ///             println!("Got a client: {addr:?}"),
1215    ///         Err(e) => 
1216    ///             println!("accept function failed: {e:?}"),
1217    ///     }
1218    /// 
1219    ///     return Ok(());
1220    /// }
1221    /// ```
1222    pub 
1223    fn accept_unix_addr(&self)-> Result<(WindowsUnixStream, UnixSocketAddr), io::Error> 
1224    {
1225        let (socket, addr) = accept_from(self, false)?;
1226        
1227        return Ok((socket, addr));
1228    }
1229
1230    /// Returns the value of the `SO_ERROR` option.
1231    ///
1232    /// This might never produce any errors for listeners. It is therefore
1233    /// unlikely to be useful, but is provided for parity with
1234    /// `std::unix::net::UnixListener`.
1235    pub 
1236    fn take_error(&self) -> Result<Option<io::Error>, io::Error> 
1237    {
1238        take_error(self)
1239    }
1240
1241    /// Returns an iterator over incoming connections.
1242    /// 
1243    /// Rustdoc:
1244    /// > The iterator will never return [`None`] and will also not yield the
1245    /// > peer's [`UnixSocketAddr`] structure.
1246    /// 
1247    /// ```no_run
1248    /// use std::thread;
1249    /// use uds_fork::{WindowsUnixStream, WindowsUnixListener};
1250    ///
1251    /// fn handle_client(stream: WindowsUnixStream) 
1252    /// {
1253    ///     // ...
1254    /// }
1255    ///
1256    /// fn main() -> std::io::Result<()> 
1257    /// {
1258    ///     let listener = WindowsUnixListener::bind("/path/to/the/socket")?;
1259    ///
1260    ///     for stream in listener.incoming() 
1261    ///     {
1262    ///         match stream 
1263    ///         {
1264    ///             Ok(stream) => 
1265    ///             {
1266    ///                 thread::spawn(|| handle_client(stream));
1267    ///             },
1268    ///             Err(err) => 
1269    ///             {
1270    ///                 break;
1271    ///             }
1272    ///         }
1273    ///     }
1274    /// 
1275    ///     return Ok(());
1276    /// }
1277    /// ```
1278    pub 
1279    fn incoming(&self) -> Incoming<'_> 
1280    {
1281        Incoming { listener: self }
1282    }
1283}
1284
1285/// A rust std API.
1286/// 
1287/// From Rustdocs:
1288/// > An iterator over incoming connections to a [`UnixListener`].
1289/// >
1290/// > It will never return [`None`].
1291/// 
1292/// # Examples
1293///
1294/// ```no_run
1295/// use std::thread;
1296/// use uds_fork::{WindowsUnixStream, WindowsUnixListener};
1297///
1298/// fn handle_client(stream: WindowsUnixStream) {
1299///     // ...
1300/// }
1301///
1302/// fn main() -> std::io::Result<()> 
1303/// {
1304///     let listener = WindowsUnixListener::bind("/path/to/the/socket")?;
1305///
1306///     for stream in listener.incoming() 
1307///     {
1308///         match stream 
1309///         {
1310///             Ok(stream) => 
1311///             {
1312///                 thread::spawn(|| handle_client(stream));
1313///             }
1314///             Err(err) => 
1315///             {
1316///                 break;
1317///             }
1318///         }
1319///     }
1320///     return Ok(());
1321/// }
1322/// ```
1323#[derive(Debug)]
1324pub struct Incoming<'a> 
1325{
1326    listener: &'a WindowsUnixListener,
1327}
1328
1329impl<'a> Iterator for Incoming<'a> 
1330{
1331    type Item = io::Result<WindowsUnixStream>;
1332
1333    fn next(&mut self) -> Option<io::Result<WindowsUnixStream>> 
1334    {
1335        Some(self.listener.accept().map(|s| s.0))
1336    }
1337
1338    fn size_hint(&self) -> (usize, Option<usize>) 
1339    {
1340        (usize::MAX, None)
1341    }
1342}
1343
1344impl<'a> IntoIterator for &'a WindowsUnixListener 
1345{
1346    type Item = io::Result<WindowsUnixStream>;
1347    type IntoIter = Incoming<'a>;
1348
1349    fn into_iter(self) -> Incoming<'a> 
1350    {
1351        self.incoming()
1352    }
1353}