1use std::collections::HashMap;
2use std::thread;
3use std::time::Duration;
4
5use windows::Wdk::Foundation::OBJECT_ATTRIBUTES;
6use windows::Wdk::Storage::FileSystem::{
7 FILE_DIRECTORY_FILE, FILE_DIRECTORY_INFORMATION, FILE_NON_DIRECTORY_FILE, FILE_OPEN,
8 FILE_OPEN_FOR_BACKUP_INTENT, FILE_PIPE_LOCAL_INFORMATION, FILE_SYNCHRONOUS_IO_NONALERT,
9 FileDirectoryInformation, FilePipeLocalInformation, NtCreateFile, NtQueryDirectoryFile,
10 NtQueryInformationFile,
11};
12use windows::Win32::Foundation::{HANDLE, RtlNtStatusToDosError, UNICODE_STRING};
13use windows::Win32::Storage::FileSystem::{
14 FILE_FLAGS_AND_ATTRIBUTES, FILE_LIST_DIRECTORY, FILE_READ_ATTRIBUTES, FILE_SHARE_DELETE,
15 FILE_SHARE_MODE, FILE_SHARE_READ, FILE_SHARE_WRITE,
16};
17use windows::Win32::System::IO::IO_STATUS_BLOCK;
18use windows::core::PWSTR;
19
20use crate::error::{Error, PipeError, PipeIoError, Result};
21use crate::utils::to_utf16_nul;
22
23use super::types::{
24 NamedPipeChange, NamedPipeInfo, NamedPipeLocalInfo, PipeName, filetime_to_system_time,
25};
26
/// NT path opened to enumerate pipes (note the trailing backslash); also used
/// as the prefix when building the path of an individual pipe.
const NAMED_PIPE_DIRECTORY_PATH: &str = r"\Device\NamedPipe\";
/// Resource label attached to every error produced by this module.
const NAMED_PIPE_DIRECTORY_RESOURCE: &str = r"\Device\NamedPipe";
/// Flag stored in `OBJECT_ATTRIBUTES::Attributes` for every open in this module.
const OBJ_CASE_INSENSITIVE: u32 = 0x0000_0040;
/// Access bit OR-ed into the desired access of every `NtCreateFile` call here.
const SYNCHRONIZE_ACCESS: u32 = 0x0010_0000;
/// NTSTATUS value that the calls in this module treat as success.
const STATUS_SUCCESS: i32 = 0;
/// NTSTATUS value that ends the directory enumeration loop without an error.
const STATUS_NO_MORE_FILES: i32 = 0x8000_0006_u32 as i32;
/// Size in bytes of the scratch buffer handed to each directory query.
const DIRECTORY_BUFFER_CAPACITY: usize = 64 * 1024;
34
/// Remembers the named pipes seen by the last `seed`/`poll` so that the next
/// poll can report which pipes appeared and which were removed.
#[derive(Debug, Default)]
pub struct NamedPipePoller {
    /// Most recent snapshot, keyed by pipe name.
    known_pipes: HashMap<PipeName, NamedPipeInfo>,
}
40
41impl NamedPipePoller {
42 pub fn new() -> Self {
44 Self::default()
45 }
46
47 pub fn poll(&mut self) -> Result<Vec<NamedPipeChange>> {
49 let current_pipes = list()?;
50 let mut current_map = HashMap::with_capacity(current_pipes.len());
51
52 for pipe in current_pipes {
53 current_map.insert(pipe.pipe_name.clone(), pipe);
54 }
55
56 let mut changes = Vec::new();
57
58 for (pipe_name, pipe_info) in ¤t_map {
59 if !self.known_pipes.contains_key(pipe_name) {
60 changes.push(NamedPipeChange::Appeared(pipe_info.clone()));
61 }
62 }
63
64 for (pipe_name, pipe_info) in &self.known_pipes {
65 if !current_map.contains_key(pipe_name) {
66 changes.push(NamedPipeChange::Removed(pipe_info.clone()));
67 }
68 }
69
70 changes.sort_by(|left, right| change_name(left).cmp(change_name(right)));
71 self.known_pipes = current_map;
72
73 Ok(changes)
74 }
75
76 pub fn seed(&mut self) -> Result<usize> {
78 let current_pipes = list()?;
79 self.known_pipes = current_pipes
80 .into_iter()
81 .map(|pipe| (pipe.pipe_name.clone(), pipe))
82 .collect();
83 Ok(self.known_pipes.len())
84 }
85
86 pub fn poll_interval(
88 &mut self,
89 rounds: usize,
90 interval: Duration,
91 ) -> Result<Vec<Vec<NamedPipeChange>>> {
92 let mut snapshots = Vec::with_capacity(rounds);
93 for _ in 0..rounds {
94 thread::sleep(interval);
95 snapshots.push(self.poll()?);
96 }
97 Ok(snapshots)
98 }
99
100 pub fn poll_interval_with_callback<F>(
104 &mut self,
105 rounds: usize,
106 interval: Duration,
107 mut callback: F,
108 ) -> Result<usize>
109 where
110 F: FnMut(usize, &[NamedPipeChange]),
111 {
112 let mut total_changes = 0usize;
113 for round in 1..=rounds {
114 thread::sleep(interval);
115 let changes = self.poll()?;
116 total_changes += changes.len();
117 callback(round, &changes);
118 }
119 Ok(total_changes)
120 }
121}
122
123pub fn poll_interval(rounds: usize, interval: Duration) -> Result<Vec<Vec<NamedPipeChange>>> {
124 let mut poller = NamedPipePoller::new();
125 poller.seed()?;
126 poller.poll_interval(rounds, interval)
127}
128
129pub fn poll_interval_with_callback<F>(
130 rounds: usize,
131 interval: Duration,
132 callback: F,
133) -> Result<usize>
134where
135 F: FnMut(usize, &[NamedPipeChange]),
136{
137 let mut poller = NamedPipePoller::new();
138 poller.seed()?;
139 poller.poll_interval_with_callback(rounds, interval, callback)
140}
141
142pub fn list() -> Result<Vec<NamedPipeInfo>> {
143 let mut out_pipes = Vec::with_capacity(64);
144 list_with_buffer(&mut out_pipes)?;
145 Ok(out_pipes)
146}
147
148pub fn list_with_buffer(out_pipes: &mut Vec<NamedPipeInfo>) -> Result<usize> {
149 list_with_filter(out_pipes, |_| true)
150}
151
152pub fn list_with_filter<F>(out_pipes: &mut Vec<NamedPipeInfo>, filter: F) -> Result<usize>
153where
154 F: Fn(&NamedPipeInfo) -> bool,
155{
156 out_pipes.clear();
157
158 let directory_handle = open_named_pipe_directory()?;
159 let mut io_status = IO_STATUS_BLOCK::default();
160 let mut work_buffer = vec![0u8; DIRECTORY_BUFFER_CAPACITY];
161 let mut restart_scan = true;
162
163 loop {
164 let status = unsafe {
165 NtQueryDirectoryFile(
166 directory_handle.raw(),
167 HANDLE(std::ptr::null_mut()),
168 None,
169 None,
170 &mut io_status,
171 work_buffer.as_mut_ptr() as *mut _,
172 work_buffer.len() as u32,
173 FileDirectoryInformation,
174 false,
175 None,
176 restart_scan,
177 )
178 };
179
180 let status_code = status.0;
181 if status_code == STATUS_NO_MORE_FILES {
182 break;
183 }
184
185 if status_code != STATUS_SUCCESS {
186 return Err(pipe_directory_status_error(
187 "query named pipe directory",
188 status_code,
189 ));
190 }
191
192 let bytes_returned = io_status.Information;
193 if bytes_returned == 0 {
194 break;
195 }
196
197 parse_directory_entries(&work_buffer[..bytes_returned], out_pipes, &filter)?;
198 restart_scan = false;
199 }
200
201 out_pipes.sort_by(|left, right| left.pipe_name.as_str().cmp(right.pipe_name.as_str()));
202 Ok(out_pipes.len())
203}
204
205pub fn query_local_info(pipe_name: &PipeName) -> Result<NamedPipeLocalInfo> {
206 let relative_name = pipe_name
207 .as_str()
208 .strip_prefix(PipeName::PREFIX)
209 .ok_or_else(|| {
210 Error::Pipe(PipeError::Io(PipeIoError::new(
211 NAMED_PIPE_DIRECTORY_RESOURCE,
212 "derive relative pipe name",
213 )))
214 })?;
215
216 let relative_utf16: Vec<u16> = relative_name.encode_utf16().collect();
217 query_pipe_local_info(&relative_utf16)
218}
219
220fn parse_directory_entries<F>(
221 buffer: &[u8],
222 out_pipes: &mut Vec<NamedPipeInfo>,
223 filter: &F,
224) -> Result<()>
225where
226 F: Fn(&NamedPipeInfo) -> bool,
227{
228 let mut offset = 0usize;
229
230 while offset < buffer.len() {
231 let entry = unsafe { &*(buffer.as_ptr().add(offset) as *const FILE_DIRECTORY_INFORMATION) };
232
233 let name_len = (entry.FileNameLength / 2) as usize;
234 let name_slice = unsafe { std::slice::from_raw_parts(entry.FileName.as_ptr(), name_len) };
235 let relative_name = String::from_utf16_lossy(name_slice);
236
237 if !relative_name.is_empty() {
238 let pipe_name = PipeName::from_relative_name(&relative_name).map_err(|_| {
239 Error::Pipe(PipeError::Io(PipeIoError::new(
240 NAMED_PIPE_DIRECTORY_RESOURCE,
241 "parse named pipe directory entry",
242 )))
243 })?;
244
245 let pipe_info = NamedPipeInfo {
246 pipe_name,
247 relative_name,
248 creation_time: filetime_to_system_time(entry.CreationTime),
249 last_access_time: filetime_to_system_time(entry.LastAccessTime),
250 last_write_time: filetime_to_system_time(entry.LastWriteTime),
251 change_time: filetime_to_system_time(entry.ChangeTime),
252 end_of_file: entry.EndOfFile,
253 allocation_size: entry.AllocationSize,
254 file_attributes: entry.FileAttributes,
255 file_index: entry.FileIndex,
256 local_info: None,
257 };
258
259 if filter(&pipe_info) {
260 out_pipes.push(pipe_info);
261 }
262 }
263
264 if entry.NextEntryOffset == 0 {
265 break;
266 }
267
268 offset += entry.NextEntryOffset as usize;
269 }
270
271 Ok(())
272}
273
274fn query_pipe_local_info(relative_name_utf16: &[u16]) -> Result<NamedPipeLocalInfo> {
275 let pipe_handle = open_named_pipe_file(relative_name_utf16)?;
276 let mut io_status = IO_STATUS_BLOCK::default();
277 let mut local_info = FILE_PIPE_LOCAL_INFORMATION::default();
278
279 let status = unsafe {
280 NtQueryInformationFile(
281 pipe_handle.raw(),
282 &mut io_status,
283 &mut local_info as *mut _ as *mut _,
284 std::mem::size_of::<FILE_PIPE_LOCAL_INFORMATION>() as u32,
285 FilePipeLocalInformation,
286 )
287 };
288
289 if status.0 != STATUS_SUCCESS {
290 return Err(pipe_directory_status_error(
291 "query named pipe local information",
292 status.0,
293 ));
294 }
295
296 Ok(NamedPipeLocalInfo {
297 named_pipe_type: local_info.NamedPipeType,
298 named_pipe_configuration: local_info.NamedPipeConfiguration,
299 maximum_instances: local_info.MaximumInstances,
300 current_instances: local_info.CurrentInstances,
301 inbound_quota: local_info.InboundQuota,
302 read_data_available: local_info.ReadDataAvailable,
303 outbound_quota: local_info.OutboundQuota,
304 write_quota_available: local_info.WriteQuotaAvailable,
305 named_pipe_state: local_info.NamedPipeState,
306 named_pipe_end: local_info.NamedPipeEnd,
307 })
308}
309
310fn open_named_pipe_directory() -> Result<crate::utils::OwnedHandle> {
311 let mut nt_path_wide = to_utf16_nul(NAMED_PIPE_DIRECTORY_PATH);
312 let mut unicode_name = UNICODE_STRING {
313 Length: ((nt_path_wide.len() - 1) * 2) as u16,
314 MaximumLength: (nt_path_wide.len() * 2) as u16,
315 Buffer: PWSTR(nt_path_wide.as_mut_ptr()),
316 };
317 let object_attributes = OBJECT_ATTRIBUTES {
318 Length: std::mem::size_of::<OBJECT_ATTRIBUTES>() as u32,
319 RootDirectory: HANDLE(std::ptr::null_mut()),
320 ObjectName: &mut unicode_name,
321 Attributes: OBJ_CASE_INSENSITIVE,
322 SecurityDescriptor: std::ptr::null(),
323 SecurityQualityOfService: std::ptr::null(),
324 };
325 let mut io_status = IO_STATUS_BLOCK::default();
326 let mut directory_handle = HANDLE(std::ptr::null_mut());
327
328 let status = unsafe {
329 NtCreateFile(
330 &mut directory_handle,
331 windows::Win32::Storage::FileSystem::FILE_ACCESS_RIGHTS(
332 FILE_LIST_DIRECTORY.0 | SYNCHRONIZE_ACCESS,
333 ),
334 &object_attributes,
335 &mut io_status,
336 None,
337 FILE_FLAGS_AND_ATTRIBUTES(0),
338 FILE_SHARE_MODE(FILE_SHARE_READ.0 | FILE_SHARE_WRITE.0 | FILE_SHARE_DELETE.0),
339 FILE_OPEN,
340 FILE_DIRECTORY_FILE | FILE_SYNCHRONOUS_IO_NONALERT | FILE_OPEN_FOR_BACKUP_INTENT,
341 None,
342 0,
343 )
344 };
345
346 if status.0 != STATUS_SUCCESS {
347 return Err(pipe_directory_status_error(
348 "open named pipe directory",
349 status.0,
350 ));
351 }
352
353 Ok(crate::utils::OwnedHandle::new(directory_handle))
354}
355
356fn open_named_pipe_file(relative_name_utf16: &[u16]) -> Result<crate::utils::OwnedHandle> {
357 let mut nt_path_wide = Vec::with_capacity(
358 NAMED_PIPE_DIRECTORY_PATH.encode_utf16().count() + relative_name_utf16.len() + 1,
359 );
360 nt_path_wide.extend(NAMED_PIPE_DIRECTORY_PATH.encode_utf16());
361 nt_path_wide.extend_from_slice(relative_name_utf16);
362 nt_path_wide.push(0);
363
364 let mut unicode_name = UNICODE_STRING {
365 Length: ((nt_path_wide.len() - 1) * 2) as u16,
366 MaximumLength: (nt_path_wide.len() * 2) as u16,
367 Buffer: PWSTR(nt_path_wide.as_mut_ptr()),
368 };
369 let object_attributes = OBJECT_ATTRIBUTES {
370 Length: std::mem::size_of::<OBJECT_ATTRIBUTES>() as u32,
371 RootDirectory: HANDLE(std::ptr::null_mut()),
372 ObjectName: &mut unicode_name,
373 Attributes: OBJ_CASE_INSENSITIVE,
374 SecurityDescriptor: std::ptr::null(),
375 SecurityQualityOfService: std::ptr::null(),
376 };
377 let mut io_status = IO_STATUS_BLOCK::default();
378 let mut pipe_handle = HANDLE(std::ptr::null_mut());
379
380 let status = unsafe {
381 NtCreateFile(
382 &mut pipe_handle,
383 windows::Win32::Storage::FileSystem::FILE_ACCESS_RIGHTS(
384 FILE_READ_ATTRIBUTES.0 | SYNCHRONIZE_ACCESS,
385 ),
386 &object_attributes,
387 &mut io_status,
388 None,
389 FILE_FLAGS_AND_ATTRIBUTES(0),
390 FILE_SHARE_MODE(FILE_SHARE_READ.0 | FILE_SHARE_WRITE.0 | FILE_SHARE_DELETE.0),
391 FILE_OPEN,
392 FILE_NON_DIRECTORY_FILE | FILE_SYNCHRONOUS_IO_NONALERT,
393 None,
394 0,
395 )
396 };
397
398 if status.0 != STATUS_SUCCESS {
399 return Err(pipe_directory_status_error(
400 "open named pipe for local info",
401 status.0,
402 ));
403 }
404
405 Ok(crate::utils::OwnedHandle::new(pipe_handle))
406}
407
408fn change_name(change: &NamedPipeChange) -> &str {
409 match change {
410 NamedPipeChange::Appeared(info) | NamedPipeChange::Removed(info) => info.pipe_name.as_str(),
411 }
412}
413
414fn pipe_directory_status_error(operation: &'static str, status: i32) -> Error {
415 let error_code = unsafe { RtlNtStatusToDosError(windows::Win32::Foundation::NTSTATUS(status)) };
416 let mapped_code = if error_code == 0 {
417 status
418 } else {
419 error_code as i32
420 };
421
422 Error::Pipe(PipeError::Io(PipeIoError::with_code(
423 NAMED_PIPE_DIRECTORY_RESOURCE,
424 operation,
425 mapped_code,
426 )))
427}