interprocess_docfix/os/windows/named_pipe/
stream.rs1use crate::os::windows::{
2 imports::*,
3 named_pipe::{PipeMode, PipeOps, PipeStreamInternals, PipeStreamRole},
4 AsRawHandle, FromRawHandle, IntoRawHandle,
5};
6use crate::{PartialMsgWriteError, ReliableReadMsg};
7use std::{
8 ffi::OsStr,
9 fmt::{self, Debug, Formatter},
10 io::{self, Read, Write},
11 mem::ManuallyDrop,
12 ptr,
13};
14
15mod inst {
16 use super::*;
17 pub struct Instance {
19 ops: PipeOps,
20 is_server: bool,
21 }
22 impl Instance {
23 pub fn create_non_taken(ops: PipeOps) -> Self {
24 Self::new(ops, false)
25 }
26 pub fn new(ops: PipeOps, is_server: bool) -> Self {
27 Self { ops, is_server }
28 }
29 pub fn instance(&self) -> &PipeOps {
30 &self.ops
31 }
32 pub fn is_server(&self) -> bool {
33 self.is_server
34 }
35 pub fn is_split(&self) -> bool {
36 false
38 }
39 }
40}
41pub(super) use inst::*;
42
// Generates the parts shared by every pipe stream type: the struct itself, its common
// inherent methods, the sealed/internal trait impls, `Drop`, `AsRawHandle` and `Debug`.
// `extra_methods` is spliced verbatim into the inherent `impl`; it is expected to provide
// `is_server()`, which the generated `Drop` impl calls.
macro_rules! create_stream_type_base {
    (
        $ty:ident:
        extra_methods: {$($extra_methods:tt)*},
        doc: $doc:tt
    ) => {
        #[doc = $doc]
        pub struct $ty {
            instance: Instance,
        }

        impl $ty {
            $($extra_methods)*

            /// Borrows the low-level pipe operations object backing this stream.
            fn ops(&self) -> &PipeOps {
                self.instance.instance()
            }

            /// Reports whether the underlying instance has been split.
            fn is_split(&self) -> bool {
                self.instance.is_split()
            }

            /// Queries the process identifier of the client end of the pipe.
            pub fn client_process_id(&self) -> io::Result<u32> {
                self.ops().get_client_process_id()
            }

            /// Queries the session identifier of the client end of the pipe.
            pub fn client_session_id(&self) -> io::Result<u32> {
                self.ops().get_client_session_id()
            }

            /// Queries the process identifier of the server end of the pipe.
            pub fn server_process_id(&self) -> io::Result<u32> {
                self.ops().get_server_process_id()
            }

            /// Queries the session identifier of the server end of the pipe.
            pub fn server_session_id(&self) -> io::Result<u32> {
                self.ops().get_server_session_id()
            }

            /// Disconnects the pipe right away, consuming the stream, without running the
            /// stream's regular `Drop` logic.
            ///
            /// # Errors
            /// Fails with `ErrorKind::Other` if the stream has been split, and forwards any
            /// error reported by the disconnect operation itself. In both cases the stream
            /// is dropped normally.
            pub fn disconnect_without_flushing(self) -> io::Result<()> {
                if self.is_split() {
                    let refusal = io::Error::new(
                        io::ErrorKind::Other,
                        "cannot abruptly disconnect a pipe stream which has been split",
                    );
                    return Err(refusal);
                }
                self.ops().disconnect()?;
                // Inhibit `Drop for $ty` (which would attempt a server-side disconnect a
                // second time) while still releasing the instance itself.
                let this = ManuallyDrop::new(self);
                // SAFETY: `this` is never dropped or accessed again, so the instance is
                // moved out exactly once.
                let released = unsafe { ptr::read(&this.instance) };
                drop(released);
                Ok(())
            }
        }

        #[doc(hidden)]
        impl crate::Sealed for $ty {}

        #[doc(hidden)]
        impl PipeStreamInternals for $ty {
            #[cfg(windows)]
            fn build(instance: Instance) -> Self {
                Self { instance }
            }
        }

        impl Drop for $ty {
            fn drop(&mut self) {
                // Only an unsplit, server-side stream disconnects on drop; the outcome is
                // deliberately ignored because `drop` has no way to report it.
                if !self.is_split() && self.is_server() {
                    let _ = self.ops().server_drop_disconnect();
                }
            }
        }

        impl AsRawHandle for $ty {
            #[cfg(windows)]
            fn as_raw_handle(&self) -> HANDLE {
                self.ops().as_raw_handle()
            }
        }

        impl Debug for $ty {
            fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
                let raw = self.as_raw_handle();
                f.debug_struct(stringify!($ty))
                    .field("handle", &raw)
                    .finish()
            }
        }
    };
}
134
// Generates a complete pipe stream type on top of `create_stream_type_base!`: connection
// constructors, non-blocking toggle, role queries, raw handle conversions and the
// `PipeStream` impl carrying the type's role and read/write modes.
//
// The second arm accepts several definitions at once and forwards each one to the first
// arm. `desired_access` is accepted for every type but is not used by the expansion; the
// access mask is derived from the read/write modes when connecting.
macro_rules! create_stream_type {
    (
        $ty:ident:
        desired_access: $desired_access:expr,
        role: $role:expr,
        read_mode: $read_mode:expr,
        write_mode: $write_mode:expr,
        doc: $doc:tt
    ) => {
        create_stream_type_base!(
            $ty:
            extra_methods: {
                /// Connects to a named pipe with the given name on the local machine,
                /// producing a client-side stream.
                ///
                /// # Errors
                /// Forwards the OS error if the pipe cannot be opened or if waiting for a
                /// free server instance fails.
                pub fn connect(name: impl AsRef<OsStr>) -> io::Result<Self> {
                    Self::_connect(name.as_ref())
                }
                fn _connect(name: &OsStr) -> io::Result<Self> {
                    let pipeops = _connect(
                        name,
                        None,
                        Self::READ_MODE.is_some(),
                        Self::WRITE_MODE.is_some(),
                        WaitTimeout::DEFAULT,
                    )?;
                    Ok(Self { instance: Instance::create_non_taken(pipeops) })
                }
                /// Connects to a named pipe with the given name on the given remote host,
                /// producing a client-side stream.
                ///
                /// # Errors
                /// Forwards the OS error if the pipe cannot be opened or if waiting for a
                /// free server instance fails.
                pub fn connect_to_remote(
                    pipe_name: impl AsRef<OsStr>,
                    hostname: impl AsRef<OsStr>,
                ) -> io::Result<Self> {
                    Self::_connect_to_remote(pipe_name.as_ref(), hostname.as_ref())
                }
                fn _connect_to_remote(pipe_name: &OsStr, hostname: &OsStr) -> io::Result<Self> {
                    let pipeops = _connect(
                        pipe_name,
                        Some(hostname),
                        Self::READ_MODE.is_some(),
                        Self::WRITE_MODE.is_some(),
                        WaitTimeout::DEFAULT,
                    )?;
                    Ok(Self { instance: Instance::create_non_taken(pipeops) })
                }
                /// Switches the stream between blocking and non-blocking operation.
                pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
                    // SAFETY: the handle is borrowed from `self`, which stays alive for
                    // the duration of the call.
                    // NOTE(review): confirm against the safety contract of
                    // `set_nonblocking_for_stream`, which is not visible here.
                    unsafe {
                        super::set_nonblocking_for_stream(
                            self.as_raw_handle(),
                            Self::READ_MODE,
                            nonblocking,
                        )
                    }
                }
                /// Returns `true` if this stream is the server end of the pipe.
                pub fn is_server(&self) -> bool {
                    self.instance.is_server()
                }
                /// Returns `true` if this stream is the client end of the pipe.
                pub fn is_client(&self) -> bool {
                    !self.is_server()
                }
            },
            doc: $doc
        );
        impl FromRawHandle for $ty {
            /// Wraps an existing named pipe handle in this stream type.
            ///
            /// # Panics
            /// Panics if the side of the pipe (server or client) cannot be determined, or
            /// if the stream type reads in message mode and the pipe either cannot be
            /// queried for message boundaries or does not preserve them.
            #[cfg(windows)]
            unsafe fn from_raw_handle(handle: HANDLE) -> Self {
                let pipeops = unsafe {
                    // SAFETY: forwarded to the caller, who guarantees a valid, owned
                    // pipe handle.
                    PipeOps::from_raw_handle(handle)
                };

                let is_server = pipeops.is_server().expect("\
failed to determine if pipe was server-side or client-side during construction from raw handle");

                // A message-mode reader on top of a byte pipe could never observe message
                // boundaries, so refuse to construct one.
                if Self::READ_MODE == Some(PipeMode::Messages) {
                    let has_msg_boundaries = pipeops.does_pipe_have_message_boundaries().expect("\
failed to determine whether the pipe preserves message boundaries");
                    assert!(has_msg_boundaries, "\
stream wrapper type uses a message-based read mode, but the underlying pipe does not preserve \
message boundaries");
                }

                let instance = Instance::new(pipeops, is_server);
                Self { instance }
            }
        }
        impl IntoRawHandle for $ty {
            /// Releases ownership of the underlying handle to the caller.
            ///
            /// # Panics
            /// Panics if called on a server-side stream.
            #[cfg(windows)]
            fn into_raw_handle(self) -> HANDLE {
                assert!(self.is_client(),
                    "cannot reclaim named pipe instance from server instancer");
                // Ownership of the handle moves to the caller, so the stream's destructors
                // must not run: letting `self` drop here would hand back a handle whose
                // owner has already been destroyed.
                let this = ManuallyDrop::new(self);
                this.ops().as_raw_handle()
            }
        }
        impl PipeStream for $ty {
            const ROLE: PipeStreamRole = $role;
            const WRITE_MODE: Option<PipeMode> = $write_mode;
            const READ_MODE: Option<PipeMode> = $read_mode;
        }
    };
    ($(
        $ty:ident:
        desired_access: $desired_access:expr,
        role: $role:expr,
        read_mode: $read_mode:expr,
        write_mode: $write_mode:expr,
        doc: $doc:tt
    )+) => {
        $(create_stream_type!(
            $ty:
            desired_access: $desired_access,
            role: $role,
            read_mode: $read_mode,
            write_mode: $write_mode,
            doc: $doc
        );)+
    };
}
258create_stream_type! {
259 ByteReaderPipeStream:
260 desired_access: GENERIC_READ,
261 role: PipeStreamRole::Reader,
262 read_mode: Some(PipeMode::Bytes),
263 write_mode: None,
264 doc: "
265[Byte stream reader] for a named pipe.
266
267Created either by using `PipeListener` or by connecting to a named pipe server.
268
269[Byte stream reader]: https://doc.rust-lang.org/std/io/trait.Read.html
270"
271 ByteWriterPipeStream:
272 desired_access: GENERIC_WRITE,
273 role: PipeStreamRole::Writer,
274 read_mode: None,
275 write_mode: Some(PipeMode::Bytes),
276 doc: "
277[Byte stream writer] for a named pipe.
278
279Created either by using `PipeListener` or by connecting to a named pipe server.
280
281[Byte stream writer]: https://doc.rust-lang.org/std/io/trait.Write.html
282"
283 DuplexBytePipeStream:
284 desired_access: GENERIC_READ | GENERIC_WRITE,
285 role: PipeStreamRole::ReaderAndWriter,
286 read_mode: Some(PipeMode::Bytes),
287 write_mode: Some(PipeMode::Bytes),
288 doc: "
289Byte stream [reader] and [writer] for a named pipe.
290
291Created either by using `PipeListener` or by connecting to a named pipe server.
292
293[reader]: https://doc.rust-lang.org/std/io/trait.Read.html
294[writer]: https://doc.rust-lang.org/std/io/trait.Write.html
295"
296 MsgReaderPipeStream:
297 desired_access: GENERIC_READ,
298 role: PipeStreamRole::Reader,
299 read_mode: Some(PipeMode::Messages),
300 write_mode: None,
301 doc: "
302[Message stream reader] for a named pipe.
303
304Created either by using `PipeListener` or by connecting to a named pipe server.
305
306[Message stream reader]: https://doc.rust-lang.org/std/io/trait.Read.html
307"
308 MsgWriterPipeStream:
309 desired_access: GENERIC_WRITE,
310 role: PipeStreamRole::Writer,
311 read_mode: None,
312 write_mode: Some(PipeMode::Messages),
313 doc: "
314[Message stream writer] for a named pipe.
315
316Created either by using `PipeListener` or by connecting to a named pipe server.
317
318[Message stream writer]: https://doc.rust-lang.org/std/io/trait.Write.html
319"
320 DuplexMsgPipeStream:
321 desired_access: GENERIC_READ | GENERIC_WRITE,
322 role: PipeStreamRole::ReaderAndWriter,
323 read_mode: Some(PipeMode::Messages),
324 write_mode: Some(PipeMode::Messages),
325 doc: "
326Message stream [reader] and [writer] for a named pipe.
327
328Created either by using `PipeListener` or by connecting to a named pipe server.
329
330[reader]: https://doc.rust-lang.org/std/io/trait.Read.html
331[writer]: https://doc.rust-lang.org/std/io/trait.Write.html
332"
333}
334
335impl Read for ByteReaderPipeStream {
336 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
337 self.ops().read_bytes(buf)
338 }
339}
340
341impl Write for ByteWriterPipeStream {
342 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
343 self.ops().write(buf)
344 }
345 fn flush(&mut self) -> io::Result<()> {
346 self.ops().flush()
347 }
348}
349
350impl Read for DuplexBytePipeStream {
351 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
352 self.ops().read_bytes(buf)
353 }
354}
355impl Write for DuplexBytePipeStream {
356 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
357 self.ops().write(buf)
358 }
359 fn flush(&mut self) -> io::Result<()> {
360 self.ops().flush()
361 }
362}
363
364impl Read for MsgReaderPipeStream {
365 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
366 self.ops().read_bytes(buf)
367 }
368}
369impl ReliableReadMsg for MsgReaderPipeStream {
370 fn read_msg(&mut self, buf: &mut [u8]) -> io::Result<Result<usize, Vec<u8>>> {
371 self.ops().read_msg(buf)
372 }
373 fn try_read_msg(&mut self, buf: &mut [u8]) -> io::Result<Result<usize, usize>> {
374 self.ops().try_read_msg(buf)
375 }
376}
377
378impl Write for MsgWriterPipeStream {
379 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
380 if self.ops().write(buf)? == buf.len() {
381 Ok(buf.len())
382 } else {
383 Err(io::Error::new(io::ErrorKind::Other, PartialMsgWriteError))
384 }
385 }
386 fn flush(&mut self) -> io::Result<()> {
387 self.ops().flush()
388 }
389}
390
391impl Read for DuplexMsgPipeStream {
392 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
393 self.ops().read_bytes(buf)
394 }
395}
396impl ReliableReadMsg for DuplexMsgPipeStream {
397 fn read_msg(&mut self, buf: &mut [u8]) -> io::Result<Result<usize, Vec<u8>>> {
398 self.ops().read_msg(buf)
399 }
400 fn try_read_msg(&mut self, buf: &mut [u8]) -> io::Result<Result<usize, usize>> {
401 self.ops().try_read_msg(buf)
402 }
403}
404impl Write for DuplexMsgPipeStream {
405 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
406 if self.ops().write(buf)? == buf.len() {
407 Ok(buf.len())
408 } else {
409 Err(io::Error::new(io::ErrorKind::Other, PartialMsgWriteError))
410 }
411 }
412 fn flush(&mut self) -> io::Result<()> {
413 self.ops().flush()
414 }
415}
416
417pub trait PipeStream: AsRawHandle + IntoRawHandle + FromRawHandle + PipeStreamInternals {
426 const ROLE: PipeStreamRole;
428 const WRITE_MODE: Option<PipeMode>;
432 const READ_MODE: Option<PipeMode>;
436}
437
438#[deprecated(note = "\
442poor ergonomics: you can't use turbofish syntax due to `impl AsRef<OsStr>` parameters and you \
443have to use `None::<&OsStr>` instead of just `None` to provide an empty hostname")]
444pub fn connect<Stream: PipeStream>(
445 pipe_name: impl AsRef<OsStr>,
446 hostname: Option<impl AsRef<OsStr>>,
447) -> io::Result<Stream> {
448 let pipeops = _connect(
449 pipe_name.as_ref(),
450 hostname.as_ref().map(AsRef::as_ref),
451 Stream::READ_MODE.is_some(),
452 Stream::WRITE_MODE.is_some(),
453 WaitTimeout::DEFAULT,
454 )?;
455 let instance = Instance::create_non_taken(pipeops);
456 Ok(Stream::build(instance))
457}
458
459fn _connect(
460 pipe_name: &OsStr,
461 hostname: Option<&OsStr>,
462 read: bool,
463 write: bool,
464 timeout: WaitTimeout,
465) -> io::Result<PipeOps> {
466 let path = super::convert_path(pipe_name, hostname);
467 loop {
468 match connect_without_waiting(&path, read, write) {
469 Err(e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY as i32) => {
470 wait_for_server(&path, timeout)?;
471 continue;
472 }
473 els => return els,
474 }
475 }
476}
477
478fn connect_without_waiting(path: &[u16], read: bool, write: bool) -> io::Result<PipeOps> {
479 let (success, handle) = unsafe {
480 let handle = CreateFileW(
481 path.as_ptr() as *mut _,
482 {
483 let mut access_flags: DWORD = 0;
484 if read {
485 access_flags |= GENERIC_READ;
486 }
487 if write {
488 access_flags |= GENERIC_WRITE;
489 }
490 access_flags
491 },
492 FILE_SHARE_READ | FILE_SHARE_WRITE,
493 ptr::null_mut(),
494 OPEN_EXISTING,
495 0,
496 ptr::null_mut(),
497 );
498 (handle != INVALID_HANDLE_VALUE, handle)
499 };
500 if success {
501 unsafe {
502 Ok(PipeOps::from_raw_handle(handle))
504 }
505 } else {
506 Err(io::Error::last_os_error())
507 }
508}
509
/// Timeout value passed to `WaitNamedPipeW` when waiting for a free server instance.
#[repr(transparent)]
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
struct WaitTimeout(u32);

impl WaitTimeout {
    /// The timeout used by all connection routines in this module.
    // NOTE(review): zero presumably corresponds to WinAPI's `NMPWAIT_USE_DEFAULT_WAIT` —
    // confirm against the `WaitNamedPipeW` documentation.
    const DEFAULT: Self = WaitTimeout(0);
}

impl From<WaitTimeout> for u32 {
    /// Unwraps the raw timeout value.
    fn from(timeout: WaitTimeout) -> Self {
        let WaitTimeout(raw) = timeout;
        raw
    }
}

impl Default for WaitTimeout {
    /// Same as [`WaitTimeout::DEFAULT`].
    fn default() -> Self {
        WaitTimeout::DEFAULT
    }
}
527fn wait_for_server(path: &[u16], timeout: WaitTimeout) -> io::Result<()> {
528 let success = unsafe { WaitNamedPipeW(path.as_ptr() as *mut _, timeout.0) != 0 };
529 if success {
530 Ok(())
531 } else {
532 Err(io::Error::last_os_error())
533 }
534}