Skip to main content

windows_erg/pipes/
server.rs

1use std::borrow::Cow;
2use std::io;
3use std::path::PathBuf;
4use std::time::Duration;
5
6use windows::Win32::Foundation::{
7    ERROR_IO_PENDING, ERROR_OPERATION_ABORTED, ERROR_PIPE_CONNECTED, GetLastError, WAIT_FAILED,
8    WAIT_OBJECT_0, WAIT_TIMEOUT,
9};
10use windows::Win32::Storage::FileSystem::{
11    FILE_FLAG_OVERLAPPED, FILE_FLAGS_AND_ATTRIBUTES, PIPE_ACCESS_DUPLEX, PIPE_ACCESS_INBOUND,
12    PIPE_ACCESS_OUTBOUND, ReadFile, WriteFile,
13};
14use windows::Win32::System::IO::{CancelIoEx, GetOverlappedResult, OVERLAPPED};
15use windows::Win32::System::Pipes::{
16    ConnectNamedPipe, CreateNamedPipeW, DisconnectNamedPipe, GetNamedPipeClientProcessId,
17    NAMED_PIPE_MODE, PIPE_READMODE_BYTE, PIPE_READMODE_MESSAGE, PIPE_REJECT_REMOTE_CLIENTS,
18    PIPE_TYPE_BYTE, PIPE_TYPE_MESSAGE, PIPE_UNLIMITED_INSTANCES, PIPE_WAIT,
19};
20use windows::Win32::System::Threading::WaitForMultipleObjects;
21use windows::core::PCWSTR;
22
23use crate::error::{
24    AccessDeniedError, InvalidParameterError, PipeConnectError, PipeError, PipeTimeoutError,
25};
26use crate::process::{Process, ProcessId};
27use crate::utils::to_utf16_nul;
28use crate::wait::Wait;
29use crate::{Error, Result};
30
31use super::error_map::map_pipe_windows_error;
32use super::security_attrs::NativePipeSecurityAttributes;
33use super::types::{
34    NamedPipeOpenMode, NamedPipeType, PipeName, PipeSecurityOptions, PipeServerEndpoint,
35};
36
37/// Builder for creating a named pipe server configuration.
38#[derive(Debug, Clone)]
39pub struct NamedPipeServerBuilder {
40    pipe_name: Option<PipeName>,
41    open_mode: NamedPipeOpenMode,
42    pipe_type: NamedPipeType,
43    max_instances: u8,
44    out_buffer_size: u32,
45    in_buffer_size: u32,
46    default_timeout: Duration,
47    security: PipeSecurityOptions,
48    allowed_executables: Vec<PathBuf>,
49}
50
51impl NamedPipeServerBuilder {
52    /// Create a new named pipe server builder.
53    pub fn new() -> Self {
54        Self {
55            pipe_name: None,
56            open_mode: NamedPipeOpenMode::Duplex,
57            pipe_type: NamedPipeType::Byte,
58            max_instances: 1,
59            out_buffer_size: 4096,
60            in_buffer_size: 4096,
61            default_timeout: Duration::from_secs(5),
62            security: PipeSecurityOptions::default(),
63            allowed_executables: Vec::new(),
64        }
65    }
66
67    /// Set the named pipe path.
68    pub fn pipe_name(mut self, pipe_name: PipeName) -> Self {
69        self.pipe_name = Some(pipe_name);
70        self
71    }
72
73    /// Set the open direction.
74    pub fn open_mode(mut self, open_mode: NamedPipeOpenMode) -> Self {
75        self.open_mode = open_mode;
76        self
77    }
78
79    /// Set byte/message semantics.
80    pub fn pipe_type(mut self, pipe_type: NamedPipeType) -> Self {
81        self.pipe_type = pipe_type;
82        self
83    }
84
85    /// Set number of server instances for this pipe name.
86    pub fn max_instances(mut self, max_instances: u8) -> Self {
87        self.max_instances = max_instances;
88        self
89    }
90
91    /// Set outbound buffer size.
92    pub fn out_buffer_size(mut self, out_buffer_size: u32) -> Self {
93        self.out_buffer_size = out_buffer_size;
94        self
95    }
96
97    /// Set inbound buffer size.
98    pub fn in_buffer_size(mut self, in_buffer_size: u32) -> Self {
99        self.in_buffer_size = in_buffer_size;
100        self
101    }
102
103    /// Set default timeout.
104    pub fn default_timeout(mut self, default_timeout: Duration) -> Self {
105        self.default_timeout = default_timeout;
106        self
107    }
108
109    /// Set raw security options.
110    pub fn security(mut self, security: PipeSecurityOptions) -> Self {
111        self.security = security;
112        self
113    }
114
115    /// Restrict connections to processes whose executable path matches one of the given paths.
116    ///
117    /// The comparison is case-insensitive. If no paths are added (the default), all processes
118    /// are allowed to connect.
119    pub fn allow_executable(mut self, path: impl Into<PathBuf>) -> Self {
120        self.allowed_executables.push(path.into());
121        self
122    }
123
124    /// Remove a previously added executable path from the allow-list.
125    ///
126    /// The comparison is case-insensitive. Does nothing if the path is not present.
127    pub fn remove_executable(mut self, path: impl Into<PathBuf>) -> Self {
128        let path = path.into();
129        self.allowed_executables.retain(|p| {
130            !p.as_os_str()
131                .to_string_lossy()
132                .eq_ignore_ascii_case(&path.as_os_str().to_string_lossy())
133        });
134        self
135    }
136
137    /// Build a named pipe server configuration.
138    pub fn build(self) -> Result<NamedPipeServerConfig> {
139        let pipe_name = self.pipe_name.ok_or_else(|| {
140            Error::InvalidParameter(InvalidParameterError::new(
141                "pipe_name",
142                "Pipe name must be specified",
143            ))
144        })?;
145
146        if self.max_instances == 0 {
147            return Err(Error::InvalidParameter(InvalidParameterError::new(
148                "max_instances",
149                "max_instances must be at least 1",
150            )));
151        }
152
153        Ok(NamedPipeServerConfig {
154            pipe_name,
155            open_mode: self.open_mode,
156            pipe_type: self.pipe_type,
157            max_instances: self.max_instances,
158            out_buffer_size: self.out_buffer_size,
159            in_buffer_size: self.in_buffer_size,
160            default_timeout: self.default_timeout,
161            security: self.security,
162            allowed_executables: self.allowed_executables,
163        })
164    }
165}
166
167impl Default for NamedPipeServerBuilder {
168    fn default() -> Self {
169        Self::new()
170    }
171}
172
173/// Named pipe server runtime configuration.
174#[derive(Debug)]
175pub struct NamedPipeServerConfig {
176    pipe_name: PipeName,
177    open_mode: NamedPipeOpenMode,
178    pipe_type: NamedPipeType,
179    max_instances: u8,
180    out_buffer_size: u32,
181    in_buffer_size: u32,
182    default_timeout: Duration,
183    security: PipeSecurityOptions,
184    allowed_executables: Vec<PathBuf>,
185}
186
187impl NamedPipeServerConfig {
188    /// Create a new builder.
189    pub fn builder() -> NamedPipeServerBuilder {
190        NamedPipeServerBuilder::new()
191    }
192
193    /// Create a named pipe server instance.
194    pub fn create(&self) -> Result<NamedPipeServer> {
195        let name_wide = to_utf16_nul(self.pipe_name.as_str());
196        let open_mode = to_server_open_mode(self.open_mode);
197        let pipe_mode = to_pipe_mode(self.pipe_type);
198        let max_instances = if self.max_instances == u8::MAX {
199            PIPE_UNLIMITED_INSTANCES
200        } else {
201            self.max_instances as u32
202        };
203
204        let default_timeout_ms = self.default_timeout.as_millis().min(u32::MAX as u128) as u32;
205        let security_attributes =
206            NativePipeSecurityAttributes::from_options(&self.security, self.pipe_name.as_str())?;
207
208        let raw_handle = unsafe {
209            CreateNamedPipeW(
210                PCWSTR(name_wide.as_ptr()),
211                open_mode,
212                pipe_mode,
213                max_instances,
214                self.out_buffer_size,
215                self.in_buffer_size,
216                default_timeout_ms,
217                security_attributes.as_option_ptr(),
218            )
219        };
220
221        if raw_handle.is_invalid() {
222            let code = unsafe { GetLastError().0 as i32 };
223            return Err(map_pipe_windows_error(
224                "create",
225                Some(&self.pipe_name),
226                code,
227            ));
228        }
229
230        Ok(NamedPipeServer {
231            endpoint: PipeServerEndpoint::from_raw(
232                raw_handle,
233                true,
234                self.pipe_name.clone(),
235                self.open_mode,
236                self.pipe_type,
237            ),
238            default_timeout: self.default_timeout,
239            allowed_executables: self.allowed_executables.clone(),
240        })
241    }
242
243    /// Return pipe name.
244    pub fn pipe_name(&self) -> &PipeName {
245        &self.pipe_name
246    }
247
248    /// Return open mode.
249    pub fn open_mode(&self) -> NamedPipeOpenMode {
250        self.open_mode
251    }
252
253    /// Return pipe type.
254    pub fn pipe_type(&self) -> NamedPipeType {
255        self.pipe_type
256    }
257
258    /// Return configured max instances.
259    pub fn max_instances(&self) -> u8 {
260        self.max_instances
261    }
262
263    /// Return configured outbound buffer size.
264    pub fn out_buffer_size(&self) -> u32 {
265        self.out_buffer_size
266    }
267
268    /// Return configured inbound buffer size.
269    pub fn in_buffer_size(&self) -> u32 {
270        self.in_buffer_size
271    }
272
273    /// Return default timeout.
274    pub fn default_timeout(&self) -> Duration {
275        self.default_timeout
276    }
277
278    /// Return security options.
279    pub fn security(&self) -> PipeSecurityOptions {
280        self.security.clone()
281    }
282}
283
284/// A connected or connectable named pipe server instance.
285#[derive(Debug)]
286pub struct NamedPipeServer {
287    endpoint: PipeServerEndpoint,
288    default_timeout: Duration,
289    allowed_executables: Vec<PathBuf>,
290}
291
292impl NamedPipeServer {
293    /// Return the underlying endpoint.
294    pub fn endpoint(&self) -> &PipeServerEndpoint {
295        &self.endpoint
296    }
297
298    /// Return the configured default timeout.
299    pub fn default_timeout(&self) -> Duration {
300        self.default_timeout
301    }
302
303    /// Add an executable path to the allow-list.
304    ///
305    /// The comparison is case-insensitive. If no paths are in the allow-list (the default),
306    /// all processes are allowed to connect.
307    pub fn allow_executable(&mut self, path: impl Into<PathBuf>) {
308        self.allowed_executables.push(path.into());
309    }
310
311    /// Remove an executable path from the allow-list.
312    ///
313    /// The comparison is case-insensitive. Does nothing if the path is not present.
314    pub fn remove_executable(&mut self, path: impl Into<PathBuf>) {
315        let path = path.into();
316        self.allowed_executables.retain(|p| {
317            !p.as_os_str()
318                .to_string_lossy()
319                .eq_ignore_ascii_case(&path.as_os_str().to_string_lossy())
320        });
321    }
322
323    /// Block until a client connects to this instance.
324    ///
325    /// If an executable allow-list was configured via [`NamedPipeServerBuilder::allow_executable`],
326    /// the connecting process's image path is checked against the list. If it does not match,
327    /// the connection is immediately disconnected and an [`Error::AccessDenied`] error is returned.
328    /// An empty allow-list (the default) permits all processes to connect.
329    pub fn connect(&self) -> Result<()> {
330        let result = unsafe { ConnectNamedPipe(self.endpoint.raw_handle(), None) };
331        if result.is_err() {
332            let code = unsafe { GetLastError().0 as i32 };
333            if code != ERROR_PIPE_CONNECTED.0 as i32 {
334                return Err(map_pipe_windows_error(
335                    "connect",
336                    Some(self.endpoint.pipe_name()),
337                    code,
338                ));
339            }
340        }
341
342        self.validate_connected_client()?;
343        Ok(())
344    }
345
346    /// Block until a client connects to this instance or the timeout elapses.
347    ///
348    /// This method returns [`Error::Pipe(PipeError::Timeout)`] if no client connection is
349    /// completed within the provided timeout.
350    pub fn connect_with_timeout(&self, timeout: Duration) -> Result<()> {
351        let wait = Wait::manual_reset(false)?;
352        self.connect_with_wait_timeout(&wait, timeout)
353    }
354
355    /// Block until a client connects or an external wait handle is signaled
356    ///
357    /// If `wait` is signaled first, this method cancels the pending connect operation and
358    /// returns [`Error::Pipe(PipeError::Connect)`] with interruption context.
359    pub fn connect_with_wait(&self, wait: &Wait) -> Result<()> {
360        self.connect_with_wait_timeout(wait, Duration::MAX)
361    }
362
363    /// Block until a client connects, an external wait handle is signaled, or timeout elapses.
364    ///
365    /// If `wait` is signaled first, this method cancels the pending connect operation and
366    /// returns [`Error::Pipe(PipeError::Connect)`] with interruption context.
367    pub fn connect_with_wait_timeout(&self, wait: &Wait, timeout: Duration) -> Result<()> {
368        let connect_event = Wait::manual_reset(false)?;
369        let mut overlapped = OVERLAPPED {
370            hEvent: connect_event.raw_handle(),
371            ..Default::default()
372        };
373
374        let mut connect_code: Option<i32> = None;
375        let result = unsafe { ConnectNamedPipe(self.endpoint.raw_handle(), Some(&mut overlapped)) };
376        if result.is_err() {
377            let code = unsafe { GetLastError().0 as i32 };
378            connect_code = Some(code);
379            if code != ERROR_IO_PENDING.0 as i32 && code != ERROR_PIPE_CONNECTED.0 as i32 {
380                return Err(map_pipe_windows_error(
381                    "connect",
382                    Some(self.endpoint.pipe_name()),
383                    code,
384                ));
385            }
386        }
387
388        if result.is_ok() || connect_code == Some(ERROR_PIPE_CONNECTED.0 as i32) {
389            self.validate_connected_client()?;
390            return Ok(());
391        }
392
393        let handles = [connect_event.raw_handle(), wait.raw_handle()];
394        let wait_result =
395            unsafe { WaitForMultipleObjects(&handles, false, duration_to_wait_ms(timeout)) };
396
397        if wait_result == WAIT_OBJECT_0 {
398            let mut transferred = 0u32;
399            unsafe {
400                GetOverlappedResult(
401                    self.endpoint.raw_handle(),
402                    &overlapped,
403                    &mut transferred,
404                    false,
405                )
406            }
407            .map_err(|_| {
408                let code = unsafe { GetLastError().0 as i32 };
409                map_pipe_windows_error("connect", Some(self.endpoint.pipe_name()), code)
410            })?;
411
412            self.validate_connected_client()?;
413            return Ok(());
414        }
415
416        if wait_result == windows::Win32::Foundation::WAIT_EVENT(WAIT_OBJECT_0.0 + 1) {
417            let _ = unsafe { CancelIoEx(self.endpoint.raw_handle(), Some(&overlapped)) };
418            return Err(Error::Pipe(PipeError::Connect(
419                PipeConnectError::new(Cow::Owned(self.endpoint.pipe_name().as_str().to_owned()))
420                    .with_context("connect interrupted by wait handle signal")
421                    .with_code(ERROR_OPERATION_ABORTED.0 as i32),
422            )));
423        }
424
425        if wait_result == WAIT_TIMEOUT {
426            let _ = unsafe { CancelIoEx(self.endpoint.raw_handle(), Some(&overlapped)) };
427            return Err(Error::Pipe(PipeError::Timeout(PipeTimeoutError::new(
428                Cow::Owned(self.endpoint.pipe_name().as_str().to_owned()),
429                Cow::Borrowed("connect"),
430            ))));
431        }
432
433        let _ = unsafe { CancelIoEx(self.endpoint.raw_handle(), Some(&overlapped)) };
434        if wait_result == WAIT_FAILED {
435            let code = unsafe { GetLastError().0 as i32 };
436            return Err(map_pipe_windows_error(
437                "connect",
438                Some(self.endpoint.pipe_name()),
439                code,
440            ));
441        }
442
443        Err(map_pipe_windows_error(
444            "connect",
445            Some(self.endpoint.pipe_name()),
446            wait_result.0 as i32,
447        ))
448    }
449
450    fn validate_connected_client(&self) -> Result<()> {
451        if !self.allowed_executables.is_empty()
452            && let Err(e) = self.check_client_executable()
453        {
454            let _ = self.disconnect();
455            return Err(e);
456        }
457        Ok(())
458    }
459
460    /// Retrieve the connecting client's executable path and verify it is on the allow-list.
461    fn check_client_executable(&self) -> Result<()> {
462        let pipe_name = Cow::Owned(self.endpoint.pipe_name().as_str().to_owned());
463        let mut pid: u32 = 0;
464        let ok = unsafe { GetNamedPipeClientProcessId(self.endpoint.raw_handle(), &mut pid) };
465        if ok.is_err() {
466            return Err(Error::AccessDenied(AccessDeniedError::with_reason(
467                pipe_name,
468                "connect",
469                "could not determine client process id",
470            )));
471        }
472
473        let client_path = match Process::open(ProcessId::new(pid)) {
474            Ok(proc) => match proc.path() {
475                Ok(p) => p,
476                Err(_) => {
477                    return Err(Error::AccessDenied(AccessDeniedError::with_reason(
478                        pipe_name,
479                        "connect",
480                        "could not retrieve client executable path",
481                    )));
482                }
483            },
484            Err(_) => {
485                return Err(Error::AccessDenied(AccessDeniedError::with_reason(
486                    pipe_name,
487                    "connect",
488                    "could not open client process",
489                )));
490            }
491        };
492
493        let allowed = self.allowed_executables.iter().any(|allowed| {
494            allowed
495                .as_os_str()
496                .to_string_lossy()
497                .eq_ignore_ascii_case(&client_path.as_os_str().to_string_lossy())
498        });
499
500        if allowed {
501            Ok(())
502        } else {
503            Err(Error::AccessDenied(AccessDeniedError::with_reason(
504                pipe_name,
505                "connect",
506                Cow::Owned(format!(
507                    "client executable '{}' is not in the allow-list",
508                    client_path.display()
509                )),
510            )))
511        }
512    }
513
514    /// Disconnect the currently connected client.
515    pub fn disconnect(&self) -> Result<()> {
516        unsafe { DisconnectNamedPipe(self.endpoint.raw_handle()) }.map_err(|_| {
517            let code = unsafe { GetLastError().0 as i32 };
518            map_pipe_windows_error("disconnect", Some(self.endpoint.pipe_name()), code)
519        })
520    }
521}
522
/// Convert a timeout into the millisecond value expected by the Win32 wait functions.
///
/// Fractions of a millisecond are rounded up so that a non-zero timeout never becomes a
/// zero-length wait (which would return immediately). Values that do not fit in a `u32`
/// saturate to `u32::MAX`; this is how `Duration::MAX` is turned into the largest wait value.
fn duration_to_wait_ms(timeout: Duration) -> u32 {
    let whole_ms = timeout.as_millis();
    // `as_millis` truncates; detect a leftover sub-millisecond part and round up by one.
    let has_fraction = timeout.subsec_nanos() % 1_000_000 != 0;
    let rounded_ms = whole_ms.saturating_add(u128::from(has_fraction));
    u32::try_from(rounded_ms).unwrap_or(u32::MAX)
}
526
527impl io::Read for NamedPipeServer {
528    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
529        let mut read = 0u32;
530        unsafe { ReadFile(self.endpoint.raw_handle(), Some(buf), Some(&mut read), None) }
531            .map_err(|e| io::Error::from_raw_os_error(e.code().0))?;
532        Ok(read as usize)
533    }
534}
535
536impl io::Write for NamedPipeServer {
537    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
538        let mut written = 0u32;
539        unsafe {
540            WriteFile(
541                self.endpoint.raw_handle(),
542                Some(buf),
543                Some(&mut written),
544                None,
545            )
546        }
547        .map_err(|e| io::Error::from_raw_os_error(e.code().0))?;
548        Ok(written as usize)
549    }
550
551    fn flush(&mut self) -> io::Result<()> {
552        Ok(())
553    }
554}
555
556fn to_server_open_mode(open_mode: NamedPipeOpenMode) -> FILE_FLAGS_AND_ATTRIBUTES {
557    match open_mode {
558        NamedPipeOpenMode::Inbound => {
559            FILE_FLAGS_AND_ATTRIBUTES(PIPE_ACCESS_INBOUND.0 | FILE_FLAG_OVERLAPPED.0)
560        }
561        NamedPipeOpenMode::Outbound => {
562            FILE_FLAGS_AND_ATTRIBUTES(PIPE_ACCESS_OUTBOUND.0 | FILE_FLAG_OVERLAPPED.0)
563        }
564        NamedPipeOpenMode::Duplex => {
565            FILE_FLAGS_AND_ATTRIBUTES(PIPE_ACCESS_DUPLEX.0 | FILE_FLAG_OVERLAPPED.0)
566        }
567    }
568}
569
570fn to_pipe_mode(pipe_type: NamedPipeType) -> NAMED_PIPE_MODE {
571    match pipe_type {
572        NamedPipeType::Byte => NAMED_PIPE_MODE(
573            PIPE_TYPE_BYTE.0 | PIPE_READMODE_BYTE.0 | PIPE_WAIT.0 | PIPE_REJECT_REMOTE_CLIENTS.0,
574        ),
575        NamedPipeType::Message => NAMED_PIPE_MODE(
576            PIPE_TYPE_MESSAGE.0
577                | PIPE_READMODE_MESSAGE.0
578                | PIPE_WAIT.0
579                | PIPE_REJECT_REMOTE_CLIENTS.0,
580        ),
581    }
582}