interprocess_docfix/os/windows/named_pipe/
stream.rs

1use crate::os::windows::{
2    imports::*,
3    named_pipe::{PipeMode, PipeOps, PipeStreamInternals, PipeStreamRole},
4    AsRawHandle, FromRawHandle, IntoRawHandle,
5};
6use crate::{PartialMsgWriteError, ReliableReadMsg};
7use std::{
8    ffi::OsStr,
9    fmt::{self, Debug, Formatter},
10    io::{self, Read, Write},
11    mem::ManuallyDrop,
12    ptr,
13};
14
15mod inst {
16    use super::*;
17    /// Wrapper for sync `PipeOps` to make the macro work. Will be gone soon once I redesign the API to use generics.
18    pub struct Instance {
19        ops: PipeOps,
20        is_server: bool,
21    }
22    impl Instance {
23        pub fn create_non_taken(ops: PipeOps) -> Self {
24            Self::new(ops, false)
25        }
26        pub fn new(ops: PipeOps, is_server: bool) -> Self {
27            Self { ops, is_server }
28        }
29        pub fn instance(&self) -> &PipeOps {
30            &self.ops
31        }
32        pub fn is_server(&self) -> bool {
33            self.is_server
34        }
35        pub fn is_split(&self) -> bool {
36            // sync pipes don't implement splitting yet
37            false
38        }
39    }
40}
41pub(super) use inst::*;
42
43macro_rules! create_stream_type_base {
44    (
45        $ty:ident:
46            extra_methods: {$($extra_methods:tt)*},
47            doc: $doc:tt
48    ) => {
49        #[doc = $doc]
50        pub struct $ty {
51            instance: Instance,
52        }
53        impl $ty {
54            // fn is_server(&self) -> bool and fn is_client(&self) -> bool
55            // generated by downstream macros
56
57            $($extra_methods)*
58
59            fn ops(&self) -> &PipeOps {
60                self.instance.instance()
61            }
62            /// Retrieves the process identifier of the client side of the named pipe connection.
63            pub fn client_process_id(&self) -> io::Result<u32> {
64                self.ops().get_client_process_id()
65            }
66            /// Retrieves the session identifier of the client side of the named pipe connection.
67            pub fn client_session_id(&self) -> io::Result<u32> {
68                self.ops().get_client_session_id()
69            }
70            /// Retrieves the process identifier of the server side of the named pipe connection.
71            pub fn server_process_id(&self) -> io::Result<u32> {
72                self.ops().get_server_process_id()
73            }
74            /// Retrieves the session identifier of the server side of the named pipe connection.
75            pub fn server_session_id(&self) -> io::Result<u32> {
76                self.ops().get_server_session_id()
77            }
78            /// Disconnects the named pipe stream without flushing buffers, causing all data in those buffers to be lost. This is much faster (and, in some case, the only finite-time way of ending things) than simply dropping the stream, since, for non-async named pipes, the `Drop` implementation flushes first.
79            ///
80            /// Only makes sense for server-side pipes and will return an error if called on a client stream. *For async pipe streams, this is the same as dropping the pipe.*
81            pub fn disconnect_without_flushing(self) -> io::Result<()> {
82                if self.is_split() {
83                    return Err(io::Error::new(
84                        io::ErrorKind::Other,
85                        "cannot abruptly disconnect a pipe stream which has been split",
86                    ));
87                }
88                self.ops().disconnect()?;
89                let self_ = ManuallyDrop::new(self);
90                let instance = unsafe {
91                    // SAFETY: ManuallyDrop is used to safely destroy the invalidated original
92                    ptr::read(&self_.instance)
93                };
94                drop(instance);
95                Ok(())
96            }
97            fn is_split(&self) -> bool {
98                self.instance.is_split()
99            }
100        }
101        #[doc(hidden)]
102        impl crate::Sealed for $ty {}
103        #[doc(hidden)]
104        impl PipeStreamInternals for $ty {
105            #[cfg(windows)]
106            fn build(instance: Instance) -> Self {
107                Self { instance }
108            }
109        }
110        impl Drop for $ty {
111            fn drop(&mut self) {
112                if !self.is_split() {
113                    if self.is_server() {
114                        let _ = self.ops().server_drop_disconnect();
115                    }
116                }
117            }
118        }
119        impl AsRawHandle for $ty {
120            #[cfg(windows)]
121            fn as_raw_handle(&self) -> HANDLE {
122                self.ops().as_raw_handle()
123            }
124        }
125        impl Debug for $ty {
126            fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
127                f.debug_struct(stringify!($ty))
128                    .field("handle", &self.as_raw_handle())
129                    .finish()
130            }
131        }
132    };
133}
134
135macro_rules! create_stream_type {
136    (
137        $ty:ident:
138            desired_access: $desired_access:expr,
139            role: $role:expr,
140            read_mode: $read_mode:expr,
141            write_mode: $write_mode:expr,
142            doc: $doc:tt
143    ) => {
144        create_stream_type_base!(
145            $ty:
146            extra_methods: {
147                /// Connects to the specified named pipe (the `\\.\pipe\` prefix is added automatically), blocking until a server instance is dispatched.
148                pub fn connect(name: impl AsRef<OsStr>) -> io::Result<Self> {
149                    Self::_connect(name.as_ref())
150                }
151                fn _connect(name: &OsStr) -> io::Result<Self> {
152                    let pipeops = _connect(
153                        name,
154                        None,
155                        Self::READ_MODE.is_some(),
156                        Self::WRITE_MODE.is_some(),
157                        WaitTimeout::DEFAULT,
158                    )?;
159                    Ok(Self { instance: Instance::create_non_taken(pipeops) })
160                }
161                /// Connects to the specified named pipe at a remote computer (the `\\<hostname>\pipe\` prefix is added automatically), blocking until a server instance is dispatched.
162                pub fn connect_to_remote(pipe_name: impl AsRef<OsStr>, hostname: impl AsRef<OsStr>) -> io::Result<Self> {
163                    Self::_connect_to_remote(pipe_name.as_ref(), hostname.as_ref())
164                }
165                fn _connect_to_remote(pipe_name: &OsStr, hostname: &OsStr) -> io::Result<Self> {
166                    let pipeops = _connect(
167                        pipe_name,
168                        Some(hostname),
169                        Self::READ_MODE.is_some(),
170                        Self::WRITE_MODE.is_some(),
171                        WaitTimeout::DEFAULT,
172                    )?;
173                    Ok(Self { instance: Instance::create_non_taken(pipeops) })
174                }
175                /// Sets whether the nonblocking mode for the pipe stream is enabled. By default, it is disabled.
176                ///
177                /// In nonblocking mode, attempts to read from the pipe when there is no data available or to write when the buffer has filled up because the receiving side did not read enough bytes in time will never block like they normally do. Instead, a [`WouldBlock`] error is immediately returned, allowing the thread to perform useful actions in the meantime.
178                ///
179                /// *If called on the server side, the flag will be set only for one stream instance.* A listener creation option, [`nonblocking`], and a similar method on the listener, [`set_nonblocking`], can be used to set the mode in bulk for all current instances and future ones.
180                ///
181                /// [`WouldBlock`]: https://doc.rust-lang.org/std/io/enum.ErrorKind.html#variant.WouldBlock " "
182                /// [`nonblocking`]: struct.PipeListenerOptions.html#structfield.nonblocking " "
183                /// [`set_nonblocking`]: struct.PipeListener.html#method.set_nonblocking " "
184                pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
185                    unsafe {
186                        super::set_nonblocking_for_stream(self.as_raw_handle(), Self::READ_MODE, nonblocking)
187                    }
188                }
189                /// Returns `true` if the stream was created by a listener (server-side), `false` if it was created by connecting to a server (server-side).
190                pub fn is_server(&self) -> bool {
191                    self.instance.is_server()
192                }
193                /// Returns `true` if the stream was created by connecting to a server (client-side), `false` if it was created by a listener (server-side).
194                pub fn is_client(&self) -> bool {
195                    !self.is_server()
196                }
197            },
198            doc: $doc
199        );
200        impl FromRawHandle for $ty {
201            #[cfg(windows)]
202            unsafe fn from_raw_handle(handle: HANDLE) -> Self {
203                let pipeops = unsafe {
204                    // SAFETY: guaranteed via safety contract
205                    PipeOps::from_raw_handle(handle)
206                };
207
208                let is_server = pipeops.is_server().expect("\
209failed to determine if pipe was server-side or client-side during construction from raw handle");
210
211                // If the wrapper type tries to read incoming data as messages, that might break if
212                // the underlying pipe has no message boundaries. Let's check for that.
213                if Self::READ_MODE == Some(PipeMode::Messages) {
214                    let has_msg_boundaries = pipeops.does_pipe_have_message_boundaries().expect("\
215failed to determine whether the pipe preserves message boundaries");
216                    assert!(has_msg_boundaries, "\
217stream wrapper type uses a message-based read mode, but the underlying pipe does not preserve \
218message boundaries");
219                }
220
221                let instance = Instance::new(pipeops, is_server);
222                Self { instance }
223            }
224        }
225        impl IntoRawHandle for $ty {
226            #[cfg(windows)]
227            fn into_raw_handle(self) -> HANDLE {
228                assert!(self.is_client(),
229                    "cannot reclaim named pipe instance from server instancer");
230                let handle = self.ops().as_raw_handle();
231                handle
232            }
233        }
234        impl PipeStream for $ty {
235            const ROLE: PipeStreamRole = $role;
236            const WRITE_MODE: Option<PipeMode> = $write_mode;
237            const READ_MODE: Option<PipeMode> = $read_mode;
238        }
239    };
240    ($(
241        $ty:ident:
242            desired_access: $desired_access:expr,
243            role: $role:expr,
244            read_mode: $read_mode:expr,
245            write_mode: $write_mode:expr,
246            doc: $doc:tt
247    )+) => {
248        $(create_stream_type!(
249            $ty:
250            desired_access: $desired_access,
251            role: $role,
252            read_mode: $read_mode,
253            write_mode: $write_mode,
254            doc: $doc
255        );)+
256    };
257}
258create_stream_type! {
259    ByteReaderPipeStream:
260        desired_access: GENERIC_READ,
261        role: PipeStreamRole::Reader,
262        read_mode: Some(PipeMode::Bytes),
263        write_mode: None,
264        doc: "
265[Byte stream reader] for a named pipe.
266
267Created either by using `PipeListener` or by connecting to a named pipe server.
268
269[Byte stream reader]: https://doc.rust-lang.org/std/io/trait.Read.html
270"
271    ByteWriterPipeStream:
272        desired_access: GENERIC_WRITE,
273        role: PipeStreamRole::Writer,
274        read_mode: None,
275        write_mode: Some(PipeMode::Bytes),
276        doc: "
277[Byte stream writer] for a named pipe.
278
279Created either by using `PipeListener` or by connecting to a named pipe server.
280
281[Byte stream writer]: https://doc.rust-lang.org/std/io/trait.Write.html
282"
283    DuplexBytePipeStream:
284        desired_access: GENERIC_READ | GENERIC_WRITE,
285        role: PipeStreamRole::ReaderAndWriter,
286        read_mode: Some(PipeMode::Bytes),
287        write_mode: Some(PipeMode::Bytes),
288        doc: "
289Byte stream [reader] and [writer] for a named pipe.
290
291Created either by using `PipeListener` or by connecting to a named pipe server.
292
293[reader]: https://doc.rust-lang.org/std/io/trait.Read.html
294[writer]: https://doc.rust-lang.org/std/io/trait.Write.html
295"
296    MsgReaderPipeStream:
297        desired_access: GENERIC_READ,
298        role: PipeStreamRole::Reader,
299        read_mode: Some(PipeMode::Messages),
300        write_mode: None,
301        doc: "
302[Message stream reader] for a named pipe.
303
304Created either by using `PipeListener` or by connecting to a named pipe server.
305
306[Message stream reader]: https://doc.rust-lang.org/std/io/trait.Read.html
307"
308    MsgWriterPipeStream:
309        desired_access: GENERIC_WRITE,
310        role: PipeStreamRole::Writer,
311        read_mode: None,
312        write_mode: Some(PipeMode::Messages),
313        doc: "
314[Message stream writer] for a named pipe.
315
316Created either by using `PipeListener` or by connecting to a named pipe server.
317
318[Message stream writer]: https://doc.rust-lang.org/std/io/trait.Write.html
319"
320    DuplexMsgPipeStream:
321        desired_access: GENERIC_READ | GENERIC_WRITE,
322        role: PipeStreamRole::ReaderAndWriter,
323        read_mode: Some(PipeMode::Messages),
324        write_mode: Some(PipeMode::Messages),
325        doc: "
326Message stream [reader] and [writer] for a named pipe.
327
328Created either by using `PipeListener` or by connecting to a named pipe server.
329
330[reader]: https://doc.rust-lang.org/std/io/trait.Read.html
331[writer]: https://doc.rust-lang.org/std/io/trait.Write.html
332"
333}
334
335impl Read for ByteReaderPipeStream {
336    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
337        self.ops().read_bytes(buf)
338    }
339}
340
341impl Write for ByteWriterPipeStream {
342    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
343        self.ops().write(buf)
344    }
345    fn flush(&mut self) -> io::Result<()> {
346        self.ops().flush()
347    }
348}
349
350impl Read for DuplexBytePipeStream {
351    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
352        self.ops().read_bytes(buf)
353    }
354}
355impl Write for DuplexBytePipeStream {
356    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
357        self.ops().write(buf)
358    }
359    fn flush(&mut self) -> io::Result<()> {
360        self.ops().flush()
361    }
362}
363
364impl Read for MsgReaderPipeStream {
365    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
366        self.ops().read_bytes(buf)
367    }
368}
369impl ReliableReadMsg for MsgReaderPipeStream {
370    fn read_msg(&mut self, buf: &mut [u8]) -> io::Result<Result<usize, Vec<u8>>> {
371        self.ops().read_msg(buf)
372    }
373    fn try_read_msg(&mut self, buf: &mut [u8]) -> io::Result<Result<usize, usize>> {
374        self.ops().try_read_msg(buf)
375    }
376}
377
378impl Write for MsgWriterPipeStream {
379    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
380        if self.ops().write(buf)? == buf.len() {
381            Ok(buf.len())
382        } else {
383            Err(io::Error::new(io::ErrorKind::Other, PartialMsgWriteError))
384        }
385    }
386    fn flush(&mut self) -> io::Result<()> {
387        self.ops().flush()
388    }
389}
390
391impl Read for DuplexMsgPipeStream {
392    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
393        self.ops().read_bytes(buf)
394    }
395}
396impl ReliableReadMsg for DuplexMsgPipeStream {
397    fn read_msg(&mut self, buf: &mut [u8]) -> io::Result<Result<usize, Vec<u8>>> {
398        self.ops().read_msg(buf)
399    }
400    fn try_read_msg(&mut self, buf: &mut [u8]) -> io::Result<Result<usize, usize>> {
401        self.ops().try_read_msg(buf)
402    }
403}
404impl Write for DuplexMsgPipeStream {
405    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
406        if self.ops().write(buf)? == buf.len() {
407            Ok(buf.len())
408        } else {
409            Err(io::Error::new(io::ErrorKind::Other, PartialMsgWriteError))
410        }
411    }
412    fn flush(&mut self) -> io::Result<()> {
413        self.ops().flush()
414    }
415}
416
417/// Defines the properties of pipe stream types.
418///
419/// ## Why there are multiple types of pipe streams
420/// One of the similarities between Unix domain sockets and Windows named pipes is how both can be used in datagram mode and in byte stream mode, that is, like with sockets, Windows named pipes can both maintain the boundaries between packets or erase those boundaries – the specific behavior can be controlled both during pipe creation and during connection. The reader can still use the stream interface even if the writer maintains datagram boundaries, and vice versa: the system automatically disassembles the datagrams into a byte stream with virtually no cost.
421///
422/// The distinction between datagram-oriented connections and byte streams exists for symmetry with the standard library, where UDP and TCP sockets are represented by different types. The idea behind this is that by separating the two semantic types of sockets into two types, the distinction between those semantics can be enforced at compile time instead of using runtime errors to signal that, for example, a datagram read operation is attempted on a byte stream.
423///
424/// The fact that named pipes can have different data flow directions further increases the amount of various stream types. By restricting the implemented stream traits at compile time, named pipe streams can be used correctly in generic contexts unaware of named pipes without extra runtime checking for the correct pipe direction.
425pub trait PipeStream: AsRawHandle + IntoRawHandle + FromRawHandle + PipeStreamInternals {
426    /// The data stream flow direction for the pipe. See the [`PipeStreamRole`] enumeration for more on what this means.
427    const ROLE: PipeStreamRole;
428    /// The data stream mode for the pipe. If set to `PipeMode::Bytes`, message boundaries will broken and having `READ_MODE` at `PipeMode::Messages` would be a pipe creation error.
429    ///
430    /// For reader streams, this value has no meaning: if the reader stream belongs to the server (client sends data, server receives), then `READ_MODE` takes the role of this value; if the reader stream belongs to the client, there is no visible difference to how the server writes data since the client specifies its read mode itself anyway.
431    const WRITE_MODE: Option<PipeMode>;
432    /// The data stream mode used when reading from the pipe: if `WRITE_MODE` is `PipeMode::Messages` and `READ_MODE` is `PipeMode::Bytes`, the message boundaries will be destroyed when reading even though they are retained when written. See the `PipeMode` enumeration for more on what those modes mean.
433    ///
434    /// For writer streams, this value has no meaning: if the writer stream belongs to the server (server sends data, client receives), then the server doesn't read data at all and thus this does not affect anything; if the writer stream belongs to the client, then the client doesn't read anything and the value is meaningless as well.
435    const READ_MODE: Option<PipeMode>;
436}
437
438/// Tries to connect to the specified named pipe (the `\\.\pipe\` prefix is added automatically), returning a named pipe stream of the stream type provided via generic parameters. If there is no available server, returns immediately.
439///
440/// Since named pipes can work across multiple machines, an optional hostname can be supplied. Leave it at `None` if you're using named pipes on the local machine exclusively, which is most likely the case.
441#[deprecated(note = "\
442poor ergonomics: you can't use turbofish syntax due to `impl AsRef<OsStr>` parameters and you \
443have to use `None::<&OsStr>` instead of just `None` to provide an empty hostname")]
444pub fn connect<Stream: PipeStream>(
445    pipe_name: impl AsRef<OsStr>,
446    hostname: Option<impl AsRef<OsStr>>,
447) -> io::Result<Stream> {
448    let pipeops = _connect(
449        pipe_name.as_ref(),
450        hostname.as_ref().map(AsRef::as_ref),
451        Stream::READ_MODE.is_some(),
452        Stream::WRITE_MODE.is_some(),
453        WaitTimeout::DEFAULT,
454    )?;
455    let instance = Instance::create_non_taken(pipeops);
456    Ok(Stream::build(instance))
457}
458
459fn _connect(
460    pipe_name: &OsStr,
461    hostname: Option<&OsStr>,
462    read: bool,
463    write: bool,
464    timeout: WaitTimeout,
465) -> io::Result<PipeOps> {
466    let path = super::convert_path(pipe_name, hostname);
467    loop {
468        match connect_without_waiting(&path, read, write) {
469            Err(e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY as i32) => {
470                wait_for_server(&path, timeout)?;
471                continue;
472            }
473            els => return els,
474        }
475    }
476}
477
478fn connect_without_waiting(path: &[u16], read: bool, write: bool) -> io::Result<PipeOps> {
479    let (success, handle) = unsafe {
480        let handle = CreateFileW(
481            path.as_ptr() as *mut _,
482            {
483                let mut access_flags: DWORD = 0;
484                if read {
485                    access_flags |= GENERIC_READ;
486                }
487                if write {
488                    access_flags |= GENERIC_WRITE;
489                }
490                access_flags
491            },
492            FILE_SHARE_READ | FILE_SHARE_WRITE,
493            ptr::null_mut(),
494            OPEN_EXISTING,
495            0,
496            ptr::null_mut(),
497        );
498        (handle != INVALID_HANDLE_VALUE, handle)
499    };
500    if success {
501        unsafe {
502            // SAFETY: we just created this handle
503            Ok(PipeOps::from_raw_handle(handle))
504        }
505    } else {
506        Err(io::Error::last_os_error())
507    }
508}
509
510#[repr(transparent)] // #[repr(DWORD)]
511#[derive(Copy, Clone, Debug, PartialEq, Eq)]
512struct WaitTimeout(u32);
513impl WaitTimeout {
514    const DEFAULT: Self = Self(0x00000000);
515    //const FOREVER: Self = Self(0xffffffff);
516}
517impl From<WaitTimeout> for u32 {
518    fn from(x: WaitTimeout) -> Self {
519        x.0
520    }
521}
522impl Default for WaitTimeout {
523    fn default() -> Self {
524        Self::DEFAULT
525    }
526}
527fn wait_for_server(path: &[u16], timeout: WaitTimeout) -> io::Result<()> {
528    let success = unsafe { WaitNamedPipeW(path.as_ptr() as *mut _, timeout.0) != 0 };
529    if success {
530        Ok(())
531    } else {
532        Err(io::Error::last_os_error())
533    }
534}