Skip to main content

everything_ipc/pipe/
mod.rs

1/*!
2Everything's named pipe IPC interface, supported by Everything v1.5+.
3
4Compared to the [`wm`](crate::wm) IPC interface, this new pipe interface isn't quite great.
5So currently only [`get_folder_size()`](EverythingClient::get_folder_size) is implemented.
6If the need arises, other features can be added.
7
8[Everything 1.5 SDK - voidtools forum](https://www.voidtools.com/forum/viewtopic.php?t=15853)
9
10TODO: Batch IPC
11*/
12
13use bon::bon;
14use std::{io, mem, sync::Mutex, time::Duration};
15use thiserror::Error;
16use windows::{
17    Win32::{
18        Foundation::{
19            ERROR_IO_PENDING, ERROR_PIPE_BUSY, GENERIC_READ, GENERIC_WRITE, GetLastError, HANDLE,
20        },
21        Storage::FileSystem::{
22            CreateFileW, FILE_FLAG_OVERLAPPED, OPEN_EXISTING, ReadFile, WriteFile,
23        },
24        System::{
25            IO::GetOverlappedResult,
26            Threading::{CreateEventW, INFINITE, SetEvent, WaitForMultipleObjects},
27        },
28    },
29    core::PCWSTR,
30};
31
32use crate::windows::Handle;
33
/// Error type for Everything IPC operations
///
/// Variants are produced either locally (connection and transport problems)
/// or by translating a response code sent back over the pipe.
#[derive(Error, Debug)]
pub enum IpcError {
    /// A local resource could not be set up (pipe name, event objects), or
    /// the request payload length does not fit the 32-bit header size field.
    #[error("out of memory")]
    OutOfMemory,
    /// The pipe could not be opened for a reason other than being busy.
    /// Also used for a "not found" (404) response code.
    #[error("IPC pipe not found")]
    PipeNotFound,
    /// Opening the pipe failed with `ERROR_PIPE_BUSY`.
    #[error("pipe is busy")]
    PipeBusy,
    /// A read or write on the pipe failed or transferred zero bytes.
    #[error("connection disconnected")]
    Disconnected,
    /// Not produced by the code in this module.
    #[error("invalid parameter")]
    InvalidParameter,
    /// The response carried the "bad request" (400) code.
    #[error("bad request")]
    BadRequest,
    /// The response carried the "cancelled" (401) code.
    #[error("cancelled")]
    Cancelled,
    /// Not produced by the code in this module.
    #[error("property not found")]
    PropertyNotFound,
    /// The response carried the "out of memory" (500) code.
    #[error("server error")]
    Server,
    /// The response carried the "invalid command" (501) code.
    #[error("invalid command")]
    InvalidCommand,
    /// The response code was not recognised, or the reply payload was not
    /// usable for the requested operation.
    #[error("bad response")]
    BadResponse,
    /// Reported in connection with a reply payload larger than the receive
    /// buffer supplied by the caller.
    #[error("insufficient buffer")]
    InsufficientBuffer,
    /// The client's shutdown event was signaled while waiting for pipe I/O.
    #[error("shutdown")]
    Shutdown,
    /// Wrapper for a standard I/O error (convertible via `From<io::Error>`).
    #[error("io error: {0}")]
    Io(#[from] io::Error),
}
66
// ---- IPC pipe command codes ----

/// Command code sent in the request header by `get_folder_size`.
const COMMAND_GET_FOLDER_SIZE: u32 = 18;

// ---- IPC pipe response codes ----

/// Success.
const RESPONSE_OK: u32 = 200;
/// Also accepted as success when the reply header is validated.
const RESPONSE_OK_MORE_DATA: u32 = 100;
/// Mapped to `IpcError::BadRequest`.
const RESPONSE_ERROR_BAD_REQUEST: u32 = 400;
/// Mapped to `IpcError::Cancelled`.
const RESPONSE_ERROR_CANCELLED: u32 = 401;
/// Mapped to `IpcError::PipeNotFound`.
const RESPONSE_ERROR_NOT_FOUND: u32 = 404;
/// Mapped to `IpcError::Server`.
const RESPONSE_ERROR_OUT_OF_MEMORY: u32 = 500;
/// Mapped to `IpcError::InvalidCommand`.
const RESPONSE_ERROR_INVALID_COMMAND: u32 = 501;

/// Upper bound, in bytes, on a single overlapped read or write call (64 KiB).
const CHUNK_SIZE: usize = 64 * 1024;
81
/// Message header for IPC communication
///
/// The same fixed-size header precedes the payload in both directions; it is
/// written to and read from the pipe as raw bytes, hence `#[repr(C)]`.
#[repr(C)]
#[derive(Debug, Clone, Copy)]
pub struct Message {
    /// Command code in a request, response code in a reply.
    pub code: u32,
    /// Length in bytes of the payload that follows the header.
    pub size: u32,
}
89
/// Client structure for IPC communication
///
/// There can only be one client, you should drop this as early as possible.
///
/// See [`pipe`](super::pipe) for details.
#[derive(Debug)]
pub struct EverythingClient {
    /// Held for the duration of each request/response exchange so that
    /// exchanges on the same client do not interleave.
    mutex: Mutex<()>,
    /// Pipe handle opened for overlapped I/O; set once the pipe is connected.
    pipe_handle: Option<Handle>,
    /// Event attached to overlapped writes.
    send_event: Handle,
    /// Event attached to overlapped reads.
    recv_event: Handle,
    /// Event signaled by `shutdown()`; pending pipe I/O waits on it together
    /// with the send/receive event.
    shutdown_event: Handle,
}
103
104#[bon]
105impl EverythingClient {
106    /// Connect to the Everything IPC pipe, trying common instance names.
107    ///
108    /// This will first try to connect without an instance name (default),
109    /// and if that fails, try "1.5a".
110    #[builder]
111    pub async fn new(
112        instance_name: Option<&str>,
113        /// Automatic retry until timeout if the pipe is busy.
114        #[builder(default = Duration::from_secs(10))]
115        timeout: Duration,
116    ) -> Result<Self, IpcError> {
117        match Self::with_instance_timeout(instance_name, timeout).await {
118            Ok(client) => Ok(client),
119            Err(IpcError::PipeNotFound) if instance_name.is_none() => {
120                Self::with_instance_timeout(Some("1.5a"), timeout).await
121            }
122            Err(e) => Err(e),
123        }
124    }
125
126    /// Connect to the Everything IPC pipe with automatic retry until timeout.
127    ///
128    /// This wraps [new_auto] and retries on [PipeBusy] until the timeout.
129    ///
130    /// # Arguments
131    /// * `timeout` - The maximum time to wait for the pipe to become available.
132    async fn with_instance_timeout(
133        instance_name: Option<&str>,
134        timeout: Duration,
135    ) -> Result<Self, IpcError> {
136        let start = std::time::Instant::now();
137        loop {
138            match Self::try_new(instance_name) {
139                Ok(client) => return Ok(client),
140                Err(IpcError::PipeBusy) => {
141                    if start.elapsed() >= timeout {
142                        return Err(IpcError::PipeBusy);
143                    }
144                    tokio::time::sleep(Duration::from_millis(100)).await;
145                    continue;
146                }
147                Err(e) => return Err(e),
148            }
149        }
150    }
151
152    /// Connect to the Everything IPC pipe
153    /// `instance_name` can be None or empty to connect to the unnamed instance.
154    pub fn try_new(instance_name: Option<&str>) -> Result<Self, IpcError> {
155        unsafe {
156            let mut pipe_name = Vec::<u16>::new();
157            if !get_pipe_name(&mut pipe_name, instance_name.unwrap_or("")) {
158                return Err(IpcError::OutOfMemory);
159            }
160
161            let pipe_name_ptr = pipe_name.as_ptr();
162            let pipe_handle = match CreateFileW(
163                PCWSTR(pipe_name_ptr),
164                GENERIC_READ.0 | GENERIC_WRITE.0,
165                Default::default(),
166                None,
167                OPEN_EXISTING,
168                FILE_FLAG_OVERLAPPED,
169                None,
170            ) {
171                Ok(handle) => handle,
172                Err(_) => {
173                    let last_error = GetLastError();
174                    if last_error.0 == ERROR_PIPE_BUSY.0 {
175                        return Err(IpcError::PipeBusy);
176                    }
177                    return Err(IpcError::PipeNotFound);
178                }
179            };
180
181            let mut client = Self {
182                mutex: Mutex::new(()),
183                pipe_handle: None,
184                send_event: Handle::new(HANDLE::default()),
185                recv_event: Handle::new(HANDLE::default()),
186                shutdown_event: Handle::new(HANDLE::default()),
187            };
188
189            client.pipe_handle = Some(Handle::new(pipe_handle));
190
191            client.shutdown_event = Handle::new(match CreateEventW(None, true, false, None) {
192                Ok(handle) => handle,
193                Err(_) => return Err(IpcError::OutOfMemory),
194            });
195            if client.shutdown_event.is_null() {
196                return Err(IpcError::OutOfMemory);
197            }
198
199            client.send_event = Handle::new(match CreateEventW(None, true, false, None) {
200                Ok(handle) => handle,
201                Err(_) => return Err(IpcError::OutOfMemory),
202            });
203            if client.send_event.is_null() {
204                return Err(IpcError::OutOfMemory);
205            }
206
207            client.recv_event = Handle::new(match CreateEventW(None, true, false, None) {
208                Ok(handle) => handle,
209                Err(_) => return Err(IpcError::OutOfMemory),
210            });
211            if client.recv_event.is_null() {
212                return Err(IpcError::OutOfMemory);
213            }
214
215            Ok(client)
216        }
217    }
218
219    /// Shutdown this client
220    pub fn shutdown(&self) -> Result<(), IpcError> {
221        unsafe {
222            if !self.shutdown_event.is_null() {
223                let _ = SetEvent(self.shutdown_event.get());
224            }
225            Ok(())
226        }
227    }
228
229    /// IPC control operation
230    fn ioctrl(
231        &self,
232        code: u32,
233        in_data: *const u8,
234        in_size: usize,
235        out_data: *mut u8,
236        out_size: usize,
237        out_num_read: *mut usize,
238    ) -> Result<bool, IpcError> {
239        let _guard = self.mutex.lock();
240
241        self._send(code, in_data, in_size)?;
242
243        let mut recv_header = Message { code: 0, size: 0 };
244
245        let header_ok = self._recv_header(&mut recv_header)?;
246        if !header_ok {
247            return Ok(false);
248        }
249
250        let data_size = recv_header.size as usize;
251        if data_size > out_size {
252            if self._recv_skip(data_size).is_err() {
253                return Err(IpcError::InsufficientBuffer);
254            }
255            return Ok(false);
256        }
257
258        if self._recv_data(out_data, data_size).is_ok() {
259            if !out_num_read.is_null() {
260                unsafe {
261                    *out_num_read = data_size;
262                }
263            }
264            return Ok(true);
265        }
266
267        Ok(false)
268    }
269
270    /// Send data through IPC pipe
271    fn _send(&self, code: u32, in_data: *const u8, in_size: usize) -> Result<(), IpcError> {
272        if in_size as u64 > u32::MAX as u64 {
273            return Err(IpcError::OutOfMemory);
274        }
275
276        let send_message = Message {
277            code,
278            size: in_size as u32,
279        };
280
281        if !self.write_overlapped(
282            &send_message as *const _ as *const u8,
283            mem::size_of::<Message>(),
284        )? {
285            return Err(IpcError::Disconnected);
286        }
287
288        if !self.write_overlapped(in_data, in_size)? {
289            return Err(IpcError::Disconnected);
290        }
291
292        Ok(())
293    }
294
295    /// Write data with overlapped I/O
296    fn write_overlapped(&self, buf: *const u8, size: usize) -> Result<bool, IpcError> {
297        let mut overlapped: windows::Win32::System::IO::OVERLAPPED =
298            unsafe { mem::MaybeUninit::zeroed().assume_init() };
299        overlapped.hEvent = self.send_event.get();
300        overlapped.Anonymous.Anonymous.Offset = 0;
301        overlapped.Anonymous.Anonymous.OffsetHigh = 0;
302
303        let mut buf_ptr = buf;
304        let mut remaining = size;
305
306        while remaining > 0 {
307            let mut num_written: u32 = 0;
308            unsafe {
309                let chunk_size = std::cmp::min(remaining, CHUNK_SIZE) as u32;
310
311                let result = WriteFile(
312                    self.pipe_handle.as_ref().unwrap().get(),
313                    Some(std::slice::from_raw_parts(buf_ptr, chunk_size as usize)),
314                    Some(&mut num_written),
315                    Some(&mut overlapped),
316                );
317
318                match result {
319                    Ok(()) => {
320                        if num_written > 0 {
321                            buf_ptr = buf_ptr.add(num_written as usize);
322                            remaining -= num_written as usize;
323                            continue;
324                        } else {
325                            return Ok(false);
326                        }
327                    }
328                    Err(_) => {
329                        let last_error = GetLastError();
330
331                        if last_error.0 == ERROR_IO_PENDING.0 {
332                            let handles: [HANDLE; 2] =
333                                [self.shutdown_event.get(), self.send_event.get()];
334                            let wait_result = WaitForMultipleObjects(&handles, false, INFINITE);
335
336                            if wait_result.0 == windows::Win32::Foundation::WAIT_OBJECT_0.0 {
337                                return Err(IpcError::Shutdown);
338                            }
339
340                            match GetOverlappedResult(
341                                self.pipe_handle.as_ref().unwrap().get(),
342                                &overlapped,
343                                &mut num_written,
344                                false,
345                            ) {
346                                Ok(()) => {
347                                    if num_written > 0 {
348                                        buf_ptr = buf_ptr.add(num_written as usize);
349                                        remaining -= num_written as usize;
350                                        continue;
351                                    } else {
352                                        return Ok(false);
353                                    }
354                                }
355                                Err(_) => {
356                                    return Ok(false);
357                                }
358                            }
359                        } else {
360                            return Ok(false);
361                        }
362                    }
363                }
364            }
365        }
366
367        Ok(true)
368    }
369
370    /// Receive header from IPC pipe
371    fn _recv_header(&self, recv_header: &mut Message) -> Result<bool, IpcError> {
372        let data_size = mem::size_of::<Message>();
373        let recv_header_ptr = recv_header as *mut _ as *mut u8;
374
375        if self._recv_data(recv_header_ptr, data_size).is_err() {
376            return Ok(false);
377        }
378
379        if recv_header.code == RESPONSE_OK || recv_header.code == RESPONSE_OK_MORE_DATA {
380            return Ok(true);
381        }
382
383        // For error responses, skip the data payload and map to error type
384        if self._recv_skip(recv_header.size as usize).is_ok() {
385            match recv_header.code {
386                RESPONSE_ERROR_BAD_REQUEST => return Err(IpcError::BadRequest),
387                RESPONSE_ERROR_CANCELLED => return Err(IpcError::Cancelled),
388                RESPONSE_ERROR_NOT_FOUND => return Err(IpcError::PipeNotFound),
389                RESPONSE_ERROR_OUT_OF_MEMORY => return Err(IpcError::Server),
390                RESPONSE_ERROR_INVALID_COMMAND => return Err(IpcError::InvalidCommand),
391                _ => return Err(IpcError::BadResponse),
392            }
393        }
394
395        Ok(false)
396    }
397
398    /// Receive data from IPC pipe
399    fn _recv_data(&self, buf: *mut u8, buf_size: usize) -> Result<(), IpcError> {
400        let mut overlapped: windows::Win32::System::IO::OVERLAPPED =
401            unsafe { mem::MaybeUninit::zeroed().assume_init() };
402        overlapped.hEvent = self.recv_event.get();
403        overlapped.Anonymous.Anonymous.Offset = 0;
404        overlapped.Anonymous.Anonymous.OffsetHigh = 0;
405
406        let mut buf_ptr = buf;
407        let mut remaining = buf_size;
408
409        loop {
410            let chunk_size = std::cmp::min(remaining, CHUNK_SIZE);
411
412            unsafe {
413                if remaining == 0 {
414                    return Ok(());
415                }
416
417                let mut num_read: u32 = 0;
418                let result = ReadFile(
419                    self.pipe_handle.as_ref().unwrap().get(),
420                    Some(std::slice::from_raw_parts_mut(buf_ptr, chunk_size)),
421                    Some(&mut num_read),
422                    Some(&mut overlapped),
423                );
424
425                match result {
426                    Ok(()) => {
427                        if num_read > 0 {
428                            buf_ptr = buf_ptr.add(num_read as usize);
429                            remaining -= num_read as usize;
430                        } else {
431                            return Err(IpcError::Disconnected);
432                        }
433                    }
434                    Err(_) => {
435                        let last_error = GetLastError();
436
437                        if last_error.0 == ERROR_IO_PENDING.0 {
438                            let handles: [HANDLE; 2] =
439                                [self.shutdown_event.get(), self.recv_event.get()];
440                            let wait_result = WaitForMultipleObjects(&handles, false, INFINITE);
441
442                            if wait_result.0 == windows::Win32::Foundation::WAIT_OBJECT_0.0 {
443                                return Err(IpcError::Shutdown);
444                            }
445
446                            match GetOverlappedResult(
447                                self.pipe_handle.as_ref().unwrap().get(),
448                                &overlapped,
449                                &mut num_read,
450                                false,
451                            ) {
452                                Ok(()) => {
453                                    if num_read > 0 {
454                                        buf_ptr = buf_ptr.add(num_read as usize);
455                                        remaining -= num_read as usize;
456                                    } else {
457                                        return Err(IpcError::Disconnected);
458                                    }
459                                }
460                                Err(_) => {
461                                    return Err(IpcError::Disconnected);
462                                }
463                            }
464                        } else {
465                            return Err(IpcError::Disconnected);
466                        }
467                    }
468                }
469            }
470        }
471    }
472
473    /// Skip data from IPC pipe
474    fn _recv_skip(&self, size: usize) -> Result<(), IpcError> {
475        let mut buf = [0u8; 256];
476        let mut remaining = size;
477
478        loop {
479            if remaining == 0 {
480                return Ok(());
481            }
482
483            let recv_size = if remaining > 256 { 256 } else { remaining };
484
485            if self._recv_data(buf.as_mut_ptr(), recv_size).is_err() {
486                break;
487            }
488
489            remaining -= recv_size;
490        }
491
492        Err(IpcError::Disconnected)
493    }
494}
495
496impl EverythingClient {
497    /// Retrieves the indexed folder size.
498    ///
499    /// ## Remarks
500    /// Everything 1.5 will index folder sizes by default.
501    /// This can be disabled under Tools -> Options -> Indexes -> Index folder sizes.
502    ///
503    /// A case sensensitive search is performed first.
504    /// If no folder is found a case insensitive search is performed.
505    #[doc(alias = "get_folder_size_from_filename")]
506    pub fn get_folder_size(&self, path: &str) -> Result<u64, IpcError> {
507        let mut value: u64 = u64::MAX;
508        let mut num_read: usize = 0;
509
510        let result = self.ioctrl(
511            COMMAND_GET_FOLDER_SIZE,
512            path.as_ptr(),
513            path.len(),
514            &mut value as *mut u64 as *mut u8,
515            mem::size_of::<u64>(),
516            &mut num_read,
517        )?;
518
519        if result && num_read == mem::size_of::<u64>() && value != u64::MAX {
520            Ok(value)
521        } else {
522            Err(IpcError::BadResponse)
523        }
524    }
525}
526
/// Get pipe name with instance suffix if provided
///
/// Appends the NUL-terminated UTF-16 pipe name to `buf`:
/// `\\.\PIPE\Everything IPC` for the unnamed instance, or
/// `\\.\PIPE\Everything IPC (<instance_name>)` when `instance_name` is not
/// empty. Note the space *before* the opening parenthesis.
///
/// Existing contents of `buf` are kept. Always returns `true`; the `bool` is
/// kept so callers that check the result continue to work.
fn get_pipe_name(buf: &mut Vec<u16>, instance_name: &str) -> bool {
    const BASE: &str = r"\\.\PIPE\Everything IPC";

    buf.extend(BASE.encode_utf16());

    if !instance_name.is_empty() {
        buf.extend(" (".encode_utf16());
        buf.extend(instance_name.encode_utf16());
        buf.push(u16::from(b')'));
    }

    // Null terminator
    buf.push(0);
    true
}
552
#[cfg(test)]
mod tests {
    use super::*;

    /// Connecting with the builder defaults should succeed.
    #[tokio::test]
    async fn new() {
        let client = EverythingClient::builder().build().await.unwrap();
        drop(client);
    }

    /// Querying a well-known folder should report a plausible size.
    #[tokio::test]
    async fn get_folder_size() {
        let client = EverythingClient::builder().build().await.expect("connect");

        // The command may not be supported by all versions of Everything
        let size = client
            .get_folder_size(r"C:\Windows")
            .unwrap_or_else(|e| {
                panic!(
                    "Note: get_folder_size_from_filename_w returned error: {:?}",
                    e
                )
            });

        dbg!(size);
        assert!(size > 0, "should have a non-zero size");

        // at least 1MB
        let one_mib = 1024 * 1024;
        assert!(size > one_mib, "should be at least 1MB, got {} bytes", size);
    }
}