Skip to main content

windows_erg/pipes/
enumerate.rs

1use std::collections::HashMap;
2use std::thread;
3use std::time::Duration;
4
5use windows::Wdk::Foundation::OBJECT_ATTRIBUTES;
6use windows::Wdk::Storage::FileSystem::{
7    FILE_DIRECTORY_FILE, FILE_DIRECTORY_INFORMATION, FILE_NON_DIRECTORY_FILE, FILE_OPEN,
8    FILE_OPEN_FOR_BACKUP_INTENT, FILE_PIPE_LOCAL_INFORMATION, FILE_SYNCHRONOUS_IO_NONALERT,
9    FileDirectoryInformation, FilePipeLocalInformation, NtCreateFile, NtQueryDirectoryFile,
10    NtQueryInformationFile,
11};
12use windows::Win32::Foundation::{HANDLE, RtlNtStatusToDosError, UNICODE_STRING};
13use windows::Win32::Storage::FileSystem::{
14    FILE_FLAGS_AND_ATTRIBUTES, FILE_LIST_DIRECTORY, FILE_READ_ATTRIBUTES, FILE_SHARE_DELETE,
15    FILE_SHARE_MODE, FILE_SHARE_READ, FILE_SHARE_WRITE,
16};
17use windows::Win32::System::IO::IO_STATUS_BLOCK;
18use windows::core::PWSTR;
19
20use crate::error::{Error, PipeError, PipeIoError, Result};
21use crate::utils::to_utf16_nul;
22
23use super::types::{
24    NamedPipeChange, NamedPipeInfo, NamedPipeLocalInfo, PipeName, filetime_to_system_time,
25};
26
// NT object path of the named-pipe directory, with the trailing backslash.
// Used both to open the directory itself and as the prefix of individual pipe paths.
const NAMED_PIPE_DIRECTORY_PATH: &str = r"\Device\NamedPipe\";
// Resource label attached to every error produced by this module.
const NAMED_PIPE_DIRECTORY_RESOURCE: &str = r"\Device\NamedPipe";
// Value placed in `OBJECT_ATTRIBUTES::Attributes` when opening the directory or a pipe.
const OBJ_CASE_INSENSITIVE: u32 = 0x0000_0040;
// Access bit OR-ed into the desired access of every `NtCreateFile` call below.
const SYNCHRONIZE_ACCESS: u32 = 0x0010_0000;
// NTSTATUS value treated as success by the status checks in this module.
const STATUS_SUCCESS: i32 = 0;
// NTSTATUS value that ends the directory enumeration loop.
const STATUS_NO_MORE_FILES: i32 = 0x8000_0006_u32 as i32;
// Size in bytes of the scratch buffer handed to each `NtQueryDirectoryFile` call.
const DIRECTORY_BUFFER_CAPACITY: usize = 64 * 1024;
34
35/// Stateful helper that diffs successive named-pipe snapshots.
36#[derive(Debug, Default)]
37pub struct NamedPipePoller {
38    known_pipes: HashMap<PipeName, NamedPipeInfo>,
39}
40
41impl NamedPipePoller {
42    /// Create a poller with an empty baseline.
43    pub fn new() -> Self {
44        Self::default()
45    }
46
47    /// Replace the current baseline with the latest snapshot and return detected changes.
48    pub fn poll(&mut self) -> Result<Vec<NamedPipeChange>> {
49        let current_pipes = list()?;
50        let mut current_map = HashMap::with_capacity(current_pipes.len());
51
52        for pipe in current_pipes {
53            current_map.insert(pipe.pipe_name.clone(), pipe);
54        }
55
56        let mut changes = Vec::new();
57
58        for (pipe_name, pipe_info) in &current_map {
59            if !self.known_pipes.contains_key(pipe_name) {
60                changes.push(NamedPipeChange::Appeared(pipe_info.clone()));
61            }
62        }
63
64        for (pipe_name, pipe_info) in &self.known_pipes {
65            if !current_map.contains_key(pipe_name) {
66                changes.push(NamedPipeChange::Removed(pipe_info.clone()));
67            }
68        }
69
70        changes.sort_by(|left, right| change_name(left).cmp(change_name(right)));
71        self.known_pipes = current_map;
72
73        Ok(changes)
74    }
75
76    /// Seed the baseline from the current snapshot without reporting any changes.
77    pub fn seed(&mut self) -> Result<usize> {
78        let current_pipes = list()?;
79        self.known_pipes = current_pipes
80            .into_iter()
81            .map(|pipe| (pipe.pipe_name.clone(), pipe))
82            .collect();
83        Ok(self.known_pipes.len())
84    }
85
86    /// Poll for a fixed number of rounds with a sleep interval between rounds.
87    pub fn poll_interval(
88        &mut self,
89        rounds: usize,
90        interval: Duration,
91    ) -> Result<Vec<Vec<NamedPipeChange>>> {
92        let mut snapshots = Vec::with_capacity(rounds);
93        for _ in 0..rounds {
94            thread::sleep(interval);
95            snapshots.push(self.poll()?);
96        }
97        Ok(snapshots)
98    }
99
100    /// Poll for a fixed number of rounds and invoke a callback for each round.
101    ///
102    /// Returns the total number of changes observed across all rounds.
103    pub fn poll_interval_with_callback<F>(
104        &mut self,
105        rounds: usize,
106        interval: Duration,
107        mut callback: F,
108    ) -> Result<usize>
109    where
110        F: FnMut(usize, &[NamedPipeChange]),
111    {
112        let mut total_changes = 0usize;
113        for round in 1..=rounds {
114            thread::sleep(interval);
115            let changes = self.poll()?;
116            total_changes += changes.len();
117            callback(round, &changes);
118        }
119        Ok(total_changes)
120    }
121}
122
123pub fn poll_interval(rounds: usize, interval: Duration) -> Result<Vec<Vec<NamedPipeChange>>> {
124    let mut poller = NamedPipePoller::new();
125    poller.seed()?;
126    poller.poll_interval(rounds, interval)
127}
128
129pub fn poll_interval_with_callback<F>(
130    rounds: usize,
131    interval: Duration,
132    callback: F,
133) -> Result<usize>
134where
135    F: FnMut(usize, &[NamedPipeChange]),
136{
137    let mut poller = NamedPipePoller::new();
138    poller.seed()?;
139    poller.poll_interval_with_callback(rounds, interval, callback)
140}
141
142pub fn list() -> Result<Vec<NamedPipeInfo>> {
143    let mut out_pipes = Vec::with_capacity(64);
144    list_with_buffer(&mut out_pipes)?;
145    Ok(out_pipes)
146}
147
148pub fn list_with_buffer(out_pipes: &mut Vec<NamedPipeInfo>) -> Result<usize> {
149    list_with_filter(out_pipes, |_| true)
150}
151
152pub fn list_with_filter<F>(out_pipes: &mut Vec<NamedPipeInfo>, filter: F) -> Result<usize>
153where
154    F: Fn(&NamedPipeInfo) -> bool,
155{
156    out_pipes.clear();
157
158    let directory_handle = open_named_pipe_directory()?;
159    let mut io_status = IO_STATUS_BLOCK::default();
160    let mut work_buffer = vec![0u8; DIRECTORY_BUFFER_CAPACITY];
161    let mut restart_scan = true;
162
163    loop {
164        let status = unsafe {
165            NtQueryDirectoryFile(
166                directory_handle.raw(),
167                HANDLE(std::ptr::null_mut()),
168                None,
169                None,
170                &mut io_status,
171                work_buffer.as_mut_ptr() as *mut _,
172                work_buffer.len() as u32,
173                FileDirectoryInformation,
174                false,
175                None,
176                restart_scan,
177            )
178        };
179
180        let status_code = status.0;
181        if status_code == STATUS_NO_MORE_FILES {
182            break;
183        }
184
185        if status_code != STATUS_SUCCESS {
186            return Err(pipe_directory_status_error(
187                "query named pipe directory",
188                status_code,
189            ));
190        }
191
192        let bytes_returned = io_status.Information;
193        if bytes_returned == 0 {
194            break;
195        }
196
197        parse_directory_entries(&work_buffer[..bytes_returned], out_pipes, &filter)?;
198        restart_scan = false;
199    }
200
201    out_pipes.sort_by(|left, right| left.pipe_name.as_str().cmp(right.pipe_name.as_str()));
202    Ok(out_pipes.len())
203}
204
205pub fn query_local_info(pipe_name: &PipeName) -> Result<NamedPipeLocalInfo> {
206    let relative_name = pipe_name
207        .as_str()
208        .strip_prefix(PipeName::PREFIX)
209        .ok_or_else(|| {
210            Error::Pipe(PipeError::Io(PipeIoError::new(
211                NAMED_PIPE_DIRECTORY_RESOURCE,
212                "derive relative pipe name",
213            )))
214        })?;
215
216    let relative_utf16: Vec<u16> = relative_name.encode_utf16().collect();
217    query_pipe_local_info(&relative_utf16)
218}
219
220fn parse_directory_entries<F>(
221    buffer: &[u8],
222    out_pipes: &mut Vec<NamedPipeInfo>,
223    filter: &F,
224) -> Result<()>
225where
226    F: Fn(&NamedPipeInfo) -> bool,
227{
228    let mut offset = 0usize;
229
230    while offset < buffer.len() {
231        let entry = unsafe { &*(buffer.as_ptr().add(offset) as *const FILE_DIRECTORY_INFORMATION) };
232
233        let name_len = (entry.FileNameLength / 2) as usize;
234        let name_slice = unsafe { std::slice::from_raw_parts(entry.FileName.as_ptr(), name_len) };
235        let relative_name = String::from_utf16_lossy(name_slice);
236
237        if !relative_name.is_empty() {
238            let pipe_name = PipeName::from_relative_name(&relative_name).map_err(|_| {
239                Error::Pipe(PipeError::Io(PipeIoError::new(
240                    NAMED_PIPE_DIRECTORY_RESOURCE,
241                    "parse named pipe directory entry",
242                )))
243            })?;
244
245            let pipe_info = NamedPipeInfo {
246                pipe_name,
247                relative_name,
248                creation_time: filetime_to_system_time(entry.CreationTime),
249                last_access_time: filetime_to_system_time(entry.LastAccessTime),
250                last_write_time: filetime_to_system_time(entry.LastWriteTime),
251                change_time: filetime_to_system_time(entry.ChangeTime),
252                end_of_file: entry.EndOfFile,
253                allocation_size: entry.AllocationSize,
254                file_attributes: entry.FileAttributes,
255                file_index: entry.FileIndex,
256                local_info: None,
257            };
258
259            if filter(&pipe_info) {
260                out_pipes.push(pipe_info);
261            }
262        }
263
264        if entry.NextEntryOffset == 0 {
265            break;
266        }
267
268        offset += entry.NextEntryOffset as usize;
269    }
270
271    Ok(())
272}
273
274fn query_pipe_local_info(relative_name_utf16: &[u16]) -> Result<NamedPipeLocalInfo> {
275    let pipe_handle = open_named_pipe_file(relative_name_utf16)?;
276    let mut io_status = IO_STATUS_BLOCK::default();
277    let mut local_info = FILE_PIPE_LOCAL_INFORMATION::default();
278
279    let status = unsafe {
280        NtQueryInformationFile(
281            pipe_handle.raw(),
282            &mut io_status,
283            &mut local_info as *mut _ as *mut _,
284            std::mem::size_of::<FILE_PIPE_LOCAL_INFORMATION>() as u32,
285            FilePipeLocalInformation,
286        )
287    };
288
289    if status.0 != STATUS_SUCCESS {
290        return Err(pipe_directory_status_error(
291            "query named pipe local information",
292            status.0,
293        ));
294    }
295
296    Ok(NamedPipeLocalInfo {
297        named_pipe_type: local_info.NamedPipeType,
298        named_pipe_configuration: local_info.NamedPipeConfiguration,
299        maximum_instances: local_info.MaximumInstances,
300        current_instances: local_info.CurrentInstances,
301        inbound_quota: local_info.InboundQuota,
302        read_data_available: local_info.ReadDataAvailable,
303        outbound_quota: local_info.OutboundQuota,
304        write_quota_available: local_info.WriteQuotaAvailable,
305        named_pipe_state: local_info.NamedPipeState,
306        named_pipe_end: local_info.NamedPipeEnd,
307    })
308}
309
310fn open_named_pipe_directory() -> Result<crate::utils::OwnedHandle> {
311    let mut nt_path_wide = to_utf16_nul(NAMED_PIPE_DIRECTORY_PATH);
312    let mut unicode_name = UNICODE_STRING {
313        Length: ((nt_path_wide.len() - 1) * 2) as u16,
314        MaximumLength: (nt_path_wide.len() * 2) as u16,
315        Buffer: PWSTR(nt_path_wide.as_mut_ptr()),
316    };
317    let object_attributes = OBJECT_ATTRIBUTES {
318        Length: std::mem::size_of::<OBJECT_ATTRIBUTES>() as u32,
319        RootDirectory: HANDLE(std::ptr::null_mut()),
320        ObjectName: &mut unicode_name,
321        Attributes: OBJ_CASE_INSENSITIVE,
322        SecurityDescriptor: std::ptr::null(),
323        SecurityQualityOfService: std::ptr::null(),
324    };
325    let mut io_status = IO_STATUS_BLOCK::default();
326    let mut directory_handle = HANDLE(std::ptr::null_mut());
327
328    let status = unsafe {
329        NtCreateFile(
330            &mut directory_handle,
331            windows::Win32::Storage::FileSystem::FILE_ACCESS_RIGHTS(
332                FILE_LIST_DIRECTORY.0 | SYNCHRONIZE_ACCESS,
333            ),
334            &object_attributes,
335            &mut io_status,
336            None,
337            FILE_FLAGS_AND_ATTRIBUTES(0),
338            FILE_SHARE_MODE(FILE_SHARE_READ.0 | FILE_SHARE_WRITE.0 | FILE_SHARE_DELETE.0),
339            FILE_OPEN,
340            FILE_DIRECTORY_FILE | FILE_SYNCHRONOUS_IO_NONALERT | FILE_OPEN_FOR_BACKUP_INTENT,
341            None,
342            0,
343        )
344    };
345
346    if status.0 != STATUS_SUCCESS {
347        return Err(pipe_directory_status_error(
348            "open named pipe directory",
349            status.0,
350        ));
351    }
352
353    Ok(crate::utils::OwnedHandle::new(directory_handle))
354}
355
356fn open_named_pipe_file(relative_name_utf16: &[u16]) -> Result<crate::utils::OwnedHandle> {
357    let mut nt_path_wide = Vec::with_capacity(
358        NAMED_PIPE_DIRECTORY_PATH.encode_utf16().count() + relative_name_utf16.len() + 1,
359    );
360    nt_path_wide.extend(NAMED_PIPE_DIRECTORY_PATH.encode_utf16());
361    nt_path_wide.extend_from_slice(relative_name_utf16);
362    nt_path_wide.push(0);
363
364    let mut unicode_name = UNICODE_STRING {
365        Length: ((nt_path_wide.len() - 1) * 2) as u16,
366        MaximumLength: (nt_path_wide.len() * 2) as u16,
367        Buffer: PWSTR(nt_path_wide.as_mut_ptr()),
368    };
369    let object_attributes = OBJECT_ATTRIBUTES {
370        Length: std::mem::size_of::<OBJECT_ATTRIBUTES>() as u32,
371        RootDirectory: HANDLE(std::ptr::null_mut()),
372        ObjectName: &mut unicode_name,
373        Attributes: OBJ_CASE_INSENSITIVE,
374        SecurityDescriptor: std::ptr::null(),
375        SecurityQualityOfService: std::ptr::null(),
376    };
377    let mut io_status = IO_STATUS_BLOCK::default();
378    let mut pipe_handle = HANDLE(std::ptr::null_mut());
379
380    let status = unsafe {
381        NtCreateFile(
382            &mut pipe_handle,
383            windows::Win32::Storage::FileSystem::FILE_ACCESS_RIGHTS(
384                FILE_READ_ATTRIBUTES.0 | SYNCHRONIZE_ACCESS,
385            ),
386            &object_attributes,
387            &mut io_status,
388            None,
389            FILE_FLAGS_AND_ATTRIBUTES(0),
390            FILE_SHARE_MODE(FILE_SHARE_READ.0 | FILE_SHARE_WRITE.0 | FILE_SHARE_DELETE.0),
391            FILE_OPEN,
392            FILE_NON_DIRECTORY_FILE | FILE_SYNCHRONOUS_IO_NONALERT,
393            None,
394            0,
395        )
396    };
397
398    if status.0 != STATUS_SUCCESS {
399        return Err(pipe_directory_status_error(
400            "open named pipe for local info",
401            status.0,
402        ));
403    }
404
405    Ok(crate::utils::OwnedHandle::new(pipe_handle))
406}
407
408fn change_name(change: &NamedPipeChange) -> &str {
409    match change {
410        NamedPipeChange::Appeared(info) | NamedPipeChange::Removed(info) => info.pipe_name.as_str(),
411    }
412}
413
414fn pipe_directory_status_error(operation: &'static str, status: i32) -> Error {
415    let error_code = unsafe { RtlNtStatusToDosError(windows::Win32::Foundation::NTSTATUS(status)) };
416    let mapped_code = if error_code == 0 {
417        status
418    } else {
419        error_code as i32
420    };
421
422    Error::Pipe(PipeError::Io(PipeIoError::with_code(
423        NAMED_PIPE_DIRECTORY_RESOURCE,
424        operation,
425        mapped_code,
426    )))
427}