Skip to main content

everything_ipc/wm/
mod.rs

1/*!
2Everything's window message IPC interface, supported by Everything v1.4+.
3
4The main API is [`EverythingClient`].
5
6- Support Everything v1.4 and v1.5 (including Alpha version).
7- Higher performance than Everything v1.4's official SDK:
8  - Hot query time is about 30% shorter.
9  - Sending blocking time is 60% shorter for async queries.
10- Support both sync and async (Tokio) querying.
11
12## Examples
13```no_run
14// cargo add everything-ipc
15use everything_ipc::wm::{EverythingClient, RequestFlags, Sort};
16
17let everything = EverythingClient::new().expect("not available");
18
19let list = everything
20    .query_wait(r"C:\Windows\ *.exe")
21    .request_flags(RequestFlags::FileName | RequestFlags::Size | RequestFlags::Path)
22    .sort(Sort::SizeDescending)
23    .max_results(5)
24    .call()
25    .expect("query");
26
27println!("Found {} items:", list.len());
28println!("{:<25} {:>10}  {}", "Filename", "Size", "Path");
29for item in list.iter() {
30    // get_string() for String, get_str() for &U16CStr
31    let filename = item.get_string(RequestFlags::FileName).unwrap();
32    let path = item.get_str(RequestFlags::Path).unwrap().display();
33    let size = item.get_size(RequestFlags::Size).unwrap();
34    println!("{:<25} {:>10}  {}", filename, size, path);
35}
36println!("Total: {} items", list.total_len());
37/*
38Found 5 items:
39Filename                        Size  Path
40MRT.exe                    223939376  C:\Windows\System32
41MRT-KB890830.exe           133315992  C:\Windows\System32
42OneDriveSetup.exe           89771848  C:\Windows\WinSxS\amd64_microsoft-windows-onedrive-setup_31bf3856ad364e35_10.0.26100.5074_none_c1340e9ad5f0a5d0
43OneDriveSetup.exe           89771848  C:\Windows\System32
44OneDriveSetup.exe           60357040  C:\Windows\WinSxS\amd64_microsoft-windows-onedrive-setup_31bf3856ad364e35_10.0.26100.1_none_2233e98c8e9ce5f5
45Total: 5742 items
46*/
47```
48
49## References
50- [everything-cpp (IbEverythingLib)](https://github.com/Chaoses-Ib/IbEverythingLib/tree/master/everything-cpp)
51*/
52use std::{
53    mem,
54    sync::{Arc, Once, atomic, mpsc},
55    time::Duration,
56};
57
58use bon::bon;
59use tracing::{debug, error, instrument, trace, warn};
60use widestring::u16cstr;
61use windows::{
62    Win32::{
63        Foundation::{HINSTANCE, HWND, LPARAM, LRESULT, WPARAM},
64        System::DataExchange::COPYDATASTRUCT,
65        UI::WindowsAndMessaging::{
66            CreateWindowExW, DefWindowProcW, DispatchMessageW, GWL_USERDATA, GetMessageW,
67            GetWindowLongPtrW, MSG, PostMessageW, RegisterClassW, ReplyMessage, SendMessageW,
68            SetWindowLongPtrW, WINDOW_EX_STYLE, WINDOW_STYLE, WM_APP, WM_COPYDATA, WM_QUIT,
69            WNDCLASSW,
70        },
71    },
72    core::PCWSTR,
73};
74
75use crate::{IpcWindow, windows::get_current_module_handle};
76
77mod types;
78pub use types::*;
79mod ext;
80
81/// Errors that can occur when querying Everything
82#[derive(Debug, thiserror::Error)]
83pub enum IpcError {
84    /// No IPC window available
85    #[error("IPC window not found")]
86    NoIpcWindow,
87
88    /// Failed to create reply window
89    #[error("failed to create reply window")]
90    CreateReplyWindow,
91
92    /// Failed to send query to Everything
93    #[error("failed to send query to Everything")]
94    Send,
95
96    /// Query timed out waiting for response
97    #[error("query timed out")]
98    Timeout,
99
100    #[error("query: {0}")]
101    Query(&'static str),
102}
103
104// ==================== Reply Window ====================
105
106/// Window class name for Everything IPC reply windows
107const WINDOW_CLASS_NAME: &widestring::U16CStr = u16cstr!("everything_ipc::wm");
108
109/// Global flag to track if the window class has been registered
110static CLASS_REGISTERED: Once = Once::new();
111
112/// Register the window class globally (once per process)
113/// The class must be registered in the same thread that creates windows
114fn register_window_class() {
115    CLASS_REGISTERED.call_once(|| unsafe {
116        let wnd_class = WNDCLASSW {
117            lpfnWndProc: Some(reply_window_wndproc),
118            hInstance: get_current_module_handle().into(),
119            lpszClassName: PCWSTR(WINDOW_CLASS_NAME.as_ptr()),
120            style: Default::default(),
121            cbClsExtra: 0,
122            cbWndExtra: 0,
123            hIcon: Default::default(),
124            hCursor: Default::default(),
125            hbrBackground: Default::default(),
126            lpszMenuName: Default::default(),
127        };
128
129        let class_atom = RegisterClassW(&wnd_class);
130        if class_atom == 0 {
131            error!("Failed to register window class");
132        } else {
133            debug!(
134                "Registered window class {}",
135                WINDOW_CLASS_NAME.to_string_lossy()
136            );
137        }
138    });
139}
140
141/// A hidden reply window that receives query responses from Everything
142#[derive(Debug)]
143struct ReplyWindow {
144    hwnd: HWND,
145    // The thread handle is Send because we only use it to join the thread
146    // This is safe because we only join the thread in Drop
147    _thread: std::thread::JoinHandle<()>,
148}
149
150// SAFETY: The JoinHandle is only used to join the thread in Drop.
151// The thread is created in ReplyWindow::new and only runs the message loop,
152// which doesn't access any thread-local state. The HWND is stored as a raw
153// pointer internally and is only accessed on the message loop thread.
154// ReplyWindow is Send because we move the window creation into the thread.
155unsafe impl Send for ReplyWindow {}
156// ReplyWindow is Sync because the message loop is the only thread that
157// accesses the HWND, and all access is through the single message loop thread.
158unsafe impl Sync for ReplyWindow {}
159
/// Value reported by the message loop thread once window creation has been attempted
#[derive(Debug)]
struct MessageLoopResult {
    /// The created window handle as an integer, or 0 if creation failed
    hwnd_usize: usize,
}
165
166impl ReplyWindow {
167    /// Create a new reply window - creates the window in the message loop thread
168    pub fn new(inner_ptr: *mut ClientInner) -> Result<Self, IpcError> {
169        // Register the window class (once per process)
170        // This must be called before creating any windows
171        register_window_class();
172
173        // Create a channel to receive the window handle from the message loop thread
174        let (tx, rx) = mpsc::channel::<MessageLoopResult>();
175
176        // Store the inner pointer for use in the message loop thread
177        let inner_ptr_usize = inner_ptr as usize;
178
179        // Start the message loop in a separate thread
180        // The window class must be registered in the same thread where windows are created
181        let thread = std::thread::spawn(move || {
182            // Create the window in THIS thread (the message loop thread)
183            let hwnd = unsafe {
184                CreateWindowExW(
185                    WINDOW_EX_STYLE(0),
186                    PCWSTR(WINDOW_CLASS_NAME.as_ptr()),
187                    None,
188                    WINDOW_STYLE(0),
189                    0,
190                    0,
191                    0,
192                    0,
193                    None,
194                    None,
195                    Some(HINSTANCE::default()),
196                    None,
197                )
198            };
199
200            let hwnd = match hwnd.ok() {
201                Some(h) => h,
202                None => {
203                    debug!("Failed to create window in message loop thread");
204                    let _ = tx.send(MessageLoopResult { hwnd_usize: 0 });
205                    return;
206                }
207            };
208
209            // Send the window handle back to the caller as usize
210            if let Err(_) = tx.send(MessageLoopResult {
211                hwnd_usize: hwnd.0 as usize,
212            }) {
213                // _ = unsafe { DestroyWindow(hwnd) };
214                return;
215            }
216
217            debug!(?hwnd, "Created reply window");
218
219            // Set GWL_USERDATA to the EverythingInner pointer
220            unsafe { SetWindowLongPtrW(hwnd, GWL_USERDATA, inner_ptr_usize as isize) };
221
222            // Run the message loop
223            run_message_loop(hwnd);
224        });
225
226        // Wait for the window handle from the message loop thread
227        let result = rx.recv().map_err(|_| IpcError::CreateReplyWindow)?;
228        let MessageLoopResult { hwnd_usize } = result;
229
230        let hwnd = HWND(hwnd_usize as *mut _);
231        if hwnd.is_invalid() {
232            return Err(IpcError::CreateReplyWindow);
233        }
234
235        Ok(Self {
236            hwnd,
237            _thread: thread,
238        })
239    }
240
241    /// Get the window handle
242    pub fn hwnd(&self) -> HWND {
243        self.hwnd
244    }
245
246    /// Send a message to this window
247    pub fn post_message(
248        &self,
249        msg: u32,
250        w_param: WPARAM,
251        l_param: LPARAM,
252    ) -> Result<(), windows::core::Error> {
253        unsafe { PostMessageW(Some(self.hwnd), msg, w_param, l_param) }
254    }
255
256    /// Post `WM_QUIT` to the reply window to signal the message loop to exit
257    pub fn quit(&self) {
258        let _ = self.post_message(WM_QUIT, WPARAM(0), LPARAM(0));
259    }
260}
261
262impl Drop for ReplyWindow {
263    fn drop(&mut self) {
264        // This must be done before waiting for the thread to ensure the thread
265        // can process the quit message
266        self.quit();
267
268        // Join the message loop thread if it exists
269        // if let Some(handle) = self.thread.take() {
270        //     let _ = handle.join();
271        // }
272    }
273}
274
/// A query response received from Everything
#[derive(Debug)]
pub struct QueryResponse {
    /// Identifier carried by the response
    pub id: u32,
    /// Raw response bytes
    pub data: Vec<u8>,
}
281
282/// Reply window procedure for handling WM_APP and WM_COPYDATA
283#[instrument(skip_all, fields(hwnd))]
284unsafe extern "system" fn reply_window_wndproc(
285    hwnd: HWND,
286    msg: u32,
287    w_param: WPARAM,
288    l_param: LPARAM,
289) -> LRESULT {
290    match msg {
291        // Forward the request data to the IPC window
292        WM_APP => {
293            // WPARAM contains pointer to Box<Vec<u8>> (from PostMessageW)
294            // We take ownership of the Box, create a COPYDATASTRUCT, and forward it
295            let request_ptr = w_param.0 as *mut Vec<u8>;
296            let request = unsafe { Box::from_raw(request_ptr) };
297
298            // Get the EverythingInner pointer from GWL_USERDATA
299            let inner_ptr = unsafe { GetWindowLongPtrW(hwnd, GWL_USERDATA) };
300            if inner_ptr != 0 {
301                let inner = unsafe { &*(inner_ptr as *const ClientInner) };
302                let ipc_hwnd = inner.ipc_window.hwnd();
303
304                // Create COPYDATASTRUCT from the request data
305                let cds = COPYDATASTRUCT {
306                    dwData: EVERYTHING_IPC_COPYDATA_QUERY2W as usize,
307                    cbData: request.len() as u32,
308                    lpData: request.as_ptr() as *mut _,
309                };
310
311                // Get the raw pointer to the COPYDATASTRUCT
312                let cds_ptr = &cds as *const COPYDATASTRUCT;
313
314                // Send to IPC window synchronously
315                let r = unsafe {
316                    SendMessageW(
317                        ipc_hwnd,
318                        WM_COPYDATA,
319                        Some(WPARAM(hwnd.0 as usize)),
320                        Some(LPARAM(cds_ptr as isize)),
321                    )
322                };
323                if r.0 == 1 {
324                    trace!(?ipc_hwnd, ?r);
325                } else {
326                    warn!(?ipc_hwnd, ?r);
327                    // Drop the current query sender since the message failed to send
328                    // This will cause the query to Err on the client side
329                    drop(inner.current_query_sender.lock().unwrap().take());
330                }
331            }
332
333            // Request Vec is dropped here after SendMessageW returns
334
335            LRESULT(0)
336        }
337        // Response from Everything IPC window
338        WM_COPYDATA => {
339            let copydata = unsafe { &*(l_param.0 as *const COPYDATASTRUCT) };
340            // Do not assert that copydata->dwData == _EVERYTHING_COPYDATA_QUERYREPLY(0)
341            // The code in Everything's SDK is wrong. copydata->dwData is replyid and can be any value.
342            let id = copydata.dwData as u32;
343
344            // Get the EverythingInner pointer from GWL_USERDATA
345            let inner_ptr = unsafe { GetWindowLongPtrW(hwnd, GWL_USERDATA) } as *const ClientInner;
346            if inner_ptr.is_null() {
347                error!("No object found");
348                return LRESULT(0);
349            }
350
351            // Get the sender from the inner struct
352            let inner = unsafe { &*inner_ptr };
353            if let Some(sender) = inner.current_query_sender.lock().unwrap().take() {
354                if match &sender {
355                    QuerySender::Sync(_sender) => {
356                        // TODO: https://github.com/rust-lang/rust/issues/153668
357                        /*
358                        if sender.is_disconnected() {
359                            return LRESULT(1);
360                        }
361                        */
362                        false
363                    }
364                    #[cfg(feature = "tokio")]
365                    QuerySender::Tokio(sender) => sender.is_closed(),
366                } {
367                    return LRESULT(1);
368                }
369
370                // Convert to QueryList and send
371                // TODO: Callback for one less copy
372                let data = unsafe {
373                    std::slice::from_raw_parts(
374                        copydata.lpData as *const u8,
375                        copydata.cbData as usize,
376                    )
377                }
378                .into();
379                // Reply to Everything
380                _ = unsafe { ReplyMessage(LRESULT(1)) };
381                trace!(id, cbData = copydata.cbData, "WM_COPYDATA received");
382
383                let results = QueryList::new(id, data);
384                if match sender {
385                    QuerySender::Sync(sender) => sender.send(results).is_ok(),
386                    #[cfg(feature = "tokio")]
387                    QuerySender::Tokio(sender) => sender.send(results).is_ok(),
388                } {
389                    debug!(id, "Sent query response");
390                } else {
391                    warn!(id, "Failed to send query response");
392                }
393            } else {
394                warn!(id, "No pending query");
395            }
396
397            LRESULT(1)
398        }
399        _ => unsafe { DefWindowProcW(hwnd, msg, w_param, l_param) },
400    }
401}
402
403/// Message loop runner - runs in a separate thread
404/// Takes an HWND which is passed as usize for Send compatibility
405fn run_message_loop(hwnd: HWND) {
406    unsafe {
407        let mut msg: MSG = mem::zeroed();
408        let mut ret;
409        loop {
410            ret = GetMessageW(&mut msg, Some(hwnd), 0, 0);
411            if ret.0 <= 0 {
412                break;
413            }
414            DispatchMessageW(&mut msg);
415        }
416    }
417}
418
419/// Wrapper for query senders (both sync and async)
420enum QuerySender {
421    Sync(mpsc::Sender<QueryList>),
422    #[cfg(feature = "tokio")]
423    Tokio(tokio::sync::oneshot::Sender<QueryList>),
424}
425
426/// Inner state shared by Everything
427struct ClientInner {
428    ipc_window: IpcWindow,
429    /// The sender for the current (last) query
430    /// Using Mutex for thread-safe mutable access
431    current_query_sender: std::sync::Mutex<Option<QuerySender>>,
432}
433
434/**
435Everything IPC client
436
437See [`wm`](super::wm) for details.
438*/
439pub struct EverythingClient {
440    inner: Arc<ClientInner>,
441    reply_window: ReplyWindow,
442}
443
444impl IpcWindow {
445    pub fn wm_client(&self) -> Result<EverythingClient, IpcError> {
446        // Create the inner state
447        let inner = Arc::new(ClientInner {
448            ipc_window: self.clone(),
449            current_query_sender: std::sync::Mutex::new(None),
450        });
451
452        // Create the reply window with a pointer to the inner state
453        let inner_ptr = Arc::as_ptr(&inner) as *mut ClientInner;
454        let reply_window = ReplyWindow::new(inner_ptr)?;
455
456        Ok(EverythingClient {
457            inner,
458            reply_window,
459        })
460    }
461}
462
463impl std::ops::Deref for EverythingClient {
464    type Target = IpcWindow;
465
466    fn deref(&self) -> &Self::Target {
467        self.ipc_window()
468    }
469}
470
471impl EverythingClient {
472    /// Create a new Everything client
473    pub fn new() -> Result<Self, IpcError> {
474        IpcWindow::new().ok_or(IpcError::NoIpcWindow)?.wm_client()
475    }
476
477    /// Create a new Everything client with instance name
478    pub fn with_instance(instance_name: Option<&str>) -> Result<Self, IpcError> {
479        IpcWindow::with_instance(instance_name)
480            .ok_or(IpcError::NoIpcWindow)?
481            .wm_client()
482    }
483
484    /// Get the IPC window for sending messages
485    fn ipc_window(&self) -> &IpcWindow {
486        &self.inner.ipc_window
487    }
488
489    /// Get the next query ID
490    fn next_id(&self) -> u32 {
491        static NEXT_ID: atomic::AtomicU32 = atomic::AtomicU32::new(0);
492        NEXT_ID.fetch_add(1, atomic::Ordering::SeqCst)
493    }
494
495    /// Send a query to Everything
496    fn query_send(
497        &self,
498        search: &str,
499        search_flags: SearchFlags,
500        request_flags: RequestFlags,
501        sort: Sort,
502        id: u32,
503        offset: u32,
504        max_results: Option<u32>,
505    ) -> bool {
506        let msg_hwnd = self.reply_window.hwnd();
507
508        // Build the query request using EverythingIpcQuery2 struct
509        let request = EverythingIpcQuery2::create(
510            msg_hwnd.0 as u32,
511            id,
512            search_flags.bits(),
513            offset,
514            max_results.unwrap_or(u32::MAX),
515            request_flags.bits(),
516            sort as u32,
517            search,
518        );
519
520        // Box the request Vec to keep it alive until the message is processed
521        let request_box = Box::new(request);
522        let request_ptr = Box::into_raw(request_box);
523
524        // available: SendMessageW (blocked), SendMessageTimeoutW (unstable)
525        // unavailable: PostMessageW, SendNotifyMessageW
526        // not tested: SendMessageCallbackW
527
528        // Use ReplyWindow::post_message to send WM_APP
529        // The reply window's wndproc will forward this to the IPC window
530        // WPARAM contains pointer to Box<Vec<u8>>, which owns the request data
531        match self
532            .reply_window
533            .post_message(WM_APP, WPARAM(request_ptr as usize), LPARAM(0))
534        {
535            Ok(_) => true,
536            Err(_) => {
537                // PostMessageW failed, free the request to avoid leak
538                let _ = unsafe { Box::from_raw(request_ptr) };
539                false
540            }
541        }
542    }
543}
544
545#[bon]
546impl EverythingClient {
547    /// Send a query to Everything
548    ///
549    /// # Important Note
550    /// Everything only handles one query per window at a time. Sending another query
551    /// when a query has not completed will cancel the old query.
552    ///
553    /// This method serializes queries by replacing the previous sender.
554    /// Callers should wait for the receiver before calling again.
555    ///
556    /// # Example
557    /// ```no_run
558    /// use std::time::Duration;
559    /// use everything_ipc::wm::{EverythingClient, RequestFlags};
560    ///
561    /// let everything = EverythingClient::new().unwrap();
562    ///
563    /// // These queries will be serialized (not sent concurrently)
564    /// let receiver1 = everything
565    ///     .query("search1")
566    ///     .request_flags(RequestFlags::FileName)
567    ///     .call()
568    ///     .unwrap();
569    /// let result1 = receiver1.recv_timeout(Duration::from_secs(5)).unwrap();
570    ///
571    /// // Now safe to send next query
572    /// let receiver2 = everything
573    ///     .query("search2")
574    ///     .request_flags(RequestFlags::FileName)
575    ///     .call()
576    ///     .unwrap();
577    /// let result2 = receiver2.recv_timeout(Duration::from_secs(5)).unwrap();
578    /// ```
579    #[instrument(skip_all)]
580    #[builder]
581    pub fn query(
582        &self,
583        #[builder(start_fn)] search: &str,
584        #[builder(default)] search_flags: SearchFlags,
585        request_flags: RequestFlags,
586        #[builder(default)] sort: Sort,
587        #[builder(default)] offset: u32,
588        max_results: Option<u32>,
589    ) -> Result<mpsc::Receiver<QueryList>, IpcError> {
590        let id = self.next_id();
591        debug!("generating query ID {}", id);
592
593        // Create a channel for the response
594        let (sender, receiver) = mpsc::channel::<QueryList>();
595
596        // Send the query first
597        let sent = self.query_send(
598            search,
599            search_flags,
600            request_flags,
601            sort,
602            id,
603            offset,
604            max_results,
605        );
606
607        if !sent {
608            warn!("failed to send query ID {}", id);
609            return Err(IpcError::Send);
610        }
611        debug!("query ID {} sent successfully", id);
612
613        // Store the sender (only one query at a time per Everything instance)
614        // Using Mutex for thread-safe mutable access
615        let old_sender = self
616            .inner
617            .current_query_sender
618            .lock()
619            .unwrap()
620            .replace(QuerySender::Sync(sender));
621        // Drop any previous sender that wasn't used - its receiver will fail
622        drop(old_sender);
623
624        Ok(receiver)
625    }
626
627    /// Send a query to Everything and wait for the result
628    ///
629    /// This method serializes queries to work around Everything's single-query-per-window limitation.
630    /// Only one query can be sent at a time per reply window.
631    #[instrument(skip_all)]
632    #[builder]
633    pub fn query_wait(
634        &self,
635        #[builder(start_fn)] search: &str,
636        #[builder(default)] search_flags: SearchFlags,
637        request_flags: RequestFlags,
638        #[builder(default)] sort: Sort,
639        #[builder(default)] offset: u32,
640        max_results: Option<u32>,
641        #[builder(default = Duration::from_millis(3000))] timeout: Duration,
642    ) -> Result<QueryList, IpcError> {
643        // Reuse query to send the query, then wait for the result
644        let receiver = self
645            .query(search)
646            .search_flags(search_flags)
647            .request_flags(request_flags)
648            .sort(sort)
649            .offset(offset)
650            .maybe_max_results(max_results)
651            .call()?;
652
653        // Wait for the response with timeout
654        match receiver.recv_timeout(timeout) {
655            Ok(results) => Ok(results),
656            Err(_) => {
657                warn!("query timed out");
658                Err(IpcError::Timeout)
659            }
660        }
661    }
662}
663
664#[cfg(feature = "tokio")]
665#[bon]
666impl EverythingClient {
667    /// Send a query to Everything asynchronously
668    ///
669    /// # Important Note
670    /// Everything only handles one query per window at a time. Sending another query
671    /// when a query has not completed will cancel the old query.
672    ///
673    /// This method serializes queries by replacing the previous sender.
674    /// Callers should await the receiver before calling again.
675    ///
676    /// # Example
677    /// ```no_run
678    /// use everything_ipc::wm::{EverythingClient, RequestFlags};
679    ///
680    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
681    /// let everything = EverythingClient::new().unwrap();
682    ///
683    /// // These queries will be serialized (not sent concurrently)
684    /// let receiver1 = everything
685    ///     .query_tokio("search1")
686    ///     .request_flags(RequestFlags::FileName)
687    ///     .call()?;
688    /// let result1 = receiver1.await?;
689    ///
690    /// // Now safe to send next query
691    /// let receiver2 = everything
692    ///     .query_tokio("search2")
693    ///     .request_flags(RequestFlags::FileName)
694    ///     .call()?;
695    /// let result2 = receiver2.await?;
696    /// # Ok(())
697    /// # }
698    /// ```
699    #[instrument(skip_all)]
700    #[builder]
701    pub fn query_tokio(
702        &self,
703        #[builder(start_fn)] search: &str,
704        #[builder(default)] search_flags: SearchFlags,
705        request_flags: RequestFlags,
706        #[builder(default)] sort: Sort,
707        #[builder(default)] offset: u32,
708        max_results: Option<u32>,
709    ) -> Result<tokio::sync::oneshot::Receiver<QueryList>, IpcError> {
710        let id = self.next_id();
711        debug!("generating query ID {}", id);
712
713        // Create a channel for the response
714        let (sender, receiver) = tokio::sync::oneshot::channel::<QueryList>();
715
716        // Send the query first
717        let sent = self.query_send(
718            search,
719            search_flags,
720            request_flags,
721            sort,
722            id,
723            offset,
724            max_results,
725        );
726
727        if !sent {
728            warn!("failed to send query ID {}", id);
729            return Err(IpcError::Send);
730        }
731        debug!("query ID {} sent successfully", id);
732
733        // Store the sender (only one query at a time per Everything instance)
734        // Using Mutex for thread-safe mutable access
735        let old_sender = self
736            .inner
737            .current_query_sender
738            .lock()
739            .unwrap()
740            .replace(QuerySender::Tokio(sender));
741        // Drop any previous sender that wasn't used - its receiver will fail
742        drop(old_sender);
743
744        Ok(receiver)
745    }
746
747    /// Send a query to Everything asynchronously and wait for the result
748    ///
749    /// This method serializes queries to work around Everything's single-query-per-window limitation.
750    /// Only one query can be sent at a time per reply window.
751    #[instrument(skip_all)]
752    #[builder]
753    pub async fn query_wait_tokio(
754        &self,
755        #[builder(start_fn)] search: &str,
756        #[builder(default)] search_flags: SearchFlags,
757        request_flags: RequestFlags,
758        #[builder(default)] sort: Sort,
759        #[builder(default)] offset: u32,
760        max_results: Option<u32>,
761        #[builder(default = Duration::from_millis(3000))] timeout: Duration,
762    ) -> Result<QueryList, IpcError> {
763        // Reuse query_async to send the query, then wait for the result
764        let receiver = self
765            .query_tokio(search)
766            .search_flags(search_flags)
767            .request_flags(request_flags)
768            .sort(sort)
769            .offset(offset)
770            .maybe_max_results(max_results)
771            .call()?;
772
773        // Wait for the response with timeout
774        match tokio::time::timeout(timeout, receiver).await {
775            Ok(Ok(results)) => Ok(results),
776            Ok(Err(_)) => {
777                warn!("query receiver error");
778                Err(IpcError::Send)
779            }
780            Err(_) => {
781                warn!("query timed out");
782                Err(IpcError::Timeout)
783            }
784        }
785    }
786}
787
788#[cfg(test)]
789mod tests {
790    use super::*;
791
792    #[test_log::test]
793    #[test_log(default_log_filter = "trace")]
794    fn doc() {
795        let everything = EverythingClient::new().expect("not available");
796
797        let list = everything
798            .query_wait(r"C:\Windows\ *.exe")
799            .request_flags(RequestFlags::FileName | RequestFlags::Size | RequestFlags::Path)
800            .sort(Sort::SizeDescending)
801            .max_results(5)
802            .call()
803            .expect("query");
804
805        println!("Found {} items:", list.len());
806        println!("{:<25} {:>10}  {}", "Filename", "Size", "Path");
807        for item in list.iter() {
808            // get_string() for String, get_str() for &U16CStr
809            let filename = item.get_string(RequestFlags::FileName).unwrap();
810            let path = item.get_str(RequestFlags::Path).unwrap().display();
811            let size = item.get_size(RequestFlags::Size).unwrap();
812            println!("{:<25} {:>10}  {}", filename, size, path);
813        }
814        println!("Total: {} items", list.total_len());
815    }
816
817    #[test_log::test]
818    #[test_log(default_log_filter = "trace")]
819    fn query_empty_search() {
820        let everything = EverythingClient::new().unwrap();
821        let search = "";
822        let search_flags = SearchFlags::MatchCase;
823        let request_flags = RequestFlags::FileName | RequestFlags::Path;
824        let sort = Sort::NameAscending;
825
826        // Send query for first 5 items
827        let result =
828            everything.query_send(search, search_flags, request_flags, sort, 1000, 0, Some(5));
829
830        assert!(result, "Query should be sent successfully");
831    }
832
833    #[test_log::test]
834    #[test_log(default_log_filter = "trace")]
835    fn query_with_pattern() {
836        let everything = EverythingClient::new().unwrap();
837        let search = "test";
838        let search_flags = SearchFlags::MatchCase;
839        let request_flags = RequestFlags::FileName;
840        let sort = Sort::NameAscending;
841
842        let result =
843            everything.query_send(search, search_flags, request_flags, sort, 1001, 0, Some(10));
844
845        assert!(result, "Query should be sent successfully");
846    }
847
848    #[test]
849    fn query_with_full_path() {
850        let everything = EverythingClient::new().unwrap();
851        let search = "";
852        let search_flags = SearchFlags::MatchCase;
853        let request_flags =
854            RequestFlags::FullPathAndFileName | RequestFlags::Size | RequestFlags::DateModified;
855        let sort = Sort::NameAscending;
856
857        let result =
858            everything.query_send(search, search_flags, request_flags, sort, 1002, 0, Some(3));
859
860        assert!(result, "Query should be sent successfully");
861    }
862
863    #[test]
864    fn query_sort_by_size() {
865        let everything = EverythingClient::new().unwrap();
866        let search = "";
867        let search_flags = SearchFlags::MatchCase;
868        let request_flags = RequestFlags::FileName | RequestFlags::Size;
869        let sort = Sort::SizeAscending;
870
871        let result =
872            everything.query_send(search, search_flags, request_flags, sort, 1003, 0, Some(5));
873
874        assert!(result, "Query should be sent successfully");
875    }
876
877    #[test]
878    fn query_with_offset() {
879        let everything = EverythingClient::new().unwrap();
880        let search = "";
881        let search_flags = SearchFlags::MatchCase;
882        let request_flags = RequestFlags::FileName;
883        let sort = Sort::NameAscending;
884
885        // First query without offset
886        let result1 =
887            everything.query_send(search, search_flags, request_flags, sort, 1005, 0, Some(2));
888
889        assert!(result1, "First query should be sent successfully");
890
891        // Second query with offset
892        let result2 =
893            everything.query_send(search, search_flags, request_flags, sort, 1006, 2, Some(2));
894
895        assert!(
896            result2,
897            "Second query with offset should be sent successfully"
898        );
899    }
900
901    #[test]
902    fn query_everything() {
903        let everything = EverythingClient::new().unwrap();
904        let search = "test";
905        let search_flags = SearchFlags::MatchCase;
906        let request_flags = RequestFlags::FileName;
907        let sort = Sort::NameAscending;
908
909        let result = everything.query_send(
910            search,
911            search_flags,
912            request_flags,
913            sort,
914            everything.next_id(),
915            0,
916            Some(5),
917        );
918
919        assert!(result, "Query should be sent successfully");
920    }
921
922    #[test]
923    fn query_multiple_requests() {
924        let everything = EverythingClient::new().unwrap();
925        let search = "";
926        let search_flags = SearchFlags::MatchCase;
927        let request_flags = RequestFlags::FileName
928            | RequestFlags::Path
929            | RequestFlags::Size
930            | RequestFlags::DateModified
931            | RequestFlags::DateCreated;
932        let sort = Sort::NameAscending;
933
934        let result =
935            everything.query_send(search, search_flags, request_flags, sort, 1004, 0, Some(5));
936
937        assert!(result, "Query should be sent successfully");
938    }
939
940    #[test]
941    fn query_wait_empty() {
942        let everything = EverythingClient::new().unwrap();
943        let search = "";
944        let search_flags = SearchFlags::MatchCase;
945        let request_flags = RequestFlags::FileName;
946        let sort = Sort::NameAscending;
947
948        // Check if IPC is available
949        assert!(everything.is_ipc_available(), "IPC should be available");
950
951        let result = everything
952            .query_wait(search)
953            .search_flags(search_flags)
954            .request_flags(request_flags)
955            .sort(sort)
956            .offset(0)
957            .max_results(10)
958            .call();
959        assert!(
960            result.is_ok(),
961            "query_wait should return Ok when Everything is available"
962        );
963    }
964
965    #[test_log::test]
966    #[test_log(default_log_filter = "trace")]
967    fn query_wait() {
968        let everything = EverythingClient::new().unwrap();
969        // Use empty string to get all files, which is more reliable than searching for "test"
970        // which may not exist on the system
971        let search = "test";
972        // MATCH_ACCENTS is marked as abandoned in the C++ code, use MATCH_CASE instead
973        let search_flags = SearchFlags::MatchCase;
974        let request_flags = RequestFlags::FileName;
975        let sort = Sort::NameAscending;
976
977        // Check if IPC is available
978        assert!(everything.is_ipc_available(), "IPC should be available");
979
980        let result = everything
981            .query_wait(search)
982            .search_flags(search_flags)
983            .request_flags(request_flags)
984            .sort(sort)
985            .offset(0)
986            .max_results(10)
987            .call();
988        dbg!(&result);
989        assert!(
990            result.is_ok(),
991            "query_wait should return Ok when Everything is available"
992        );
993        assert!(
994            result.as_ref().is_ok_and(|r| r.total_len() > 0),
995            "Expected found_num > 0, got: {:?}",
996            result
997        );
998    }
999
1000    #[test_log::test]
1001    #[test_log(default_log_filter = "trace")]
1002    fn query_wait_cancel() {
1003        let everything = EverythingClient::new().unwrap();
1004
1005        // Check if IPC is available
1006        assert!(everything.is_ipc_available(), "IPC should be available");
1007
1008        // Send multiple queries at once
1009        // Note: These will be serialized, so only the last one succeeds
1010        // The first two queries will be disconnected when the next query replaces their sender
1011        let searches = ["", "test", "rust"];
1012        let mut receivers = Vec::new();
1013
1014        for search in &searches {
1015            let search_flags = SearchFlags::MatchCase;
1016            let request_flags = RequestFlags::FileName;
1017            let sort = Sort::NameAscending;
1018            let receiver = everything
1019                .query(search)
1020                .search_flags(search_flags)
1021                .request_flags(request_flags)
1022                .sort(sort)
1023                .offset(0)
1024                .max_results(10)
1025                .call()
1026                .expect("query should succeed");
1027            receivers.push(receiver);
1028        }
1029
1030        // First query should fail (response rejected because sender was replaced)
1031        // When query 2 is sent, it replaces the sender for query 0
1032        // When the response comes back for query 0, the old sender is gone
1033        let result = receivers[0].recv_timeout(std::time::Duration::from_millis(3000));
1034        assert!(
1035            result.is_err(),
1036            "Query 0 should fail because sender was replaced (got: {:?})",
1037            result
1038        );
1039
1040        // Second query should succeed (it's the current sender when response arrives)
1041        let result = receivers[1].recv_timeout(std::time::Duration::from_millis(3000));
1042        assert!(
1043            result.is_err(),
1044            "Query 1 should fail because sender was replaced (got: {:?})",
1045            result
1046        );
1047
1048        // Last query should succeed
1049        let result = receivers[2].recv_timeout(std::time::Duration::from_millis(3000));
1050        let result = result.expect("Last query should succeed");
1051        dbg!(&result);
1052        assert!(
1053            result.total_len() > 0,
1054            "Last query should return valid results"
1055        );
1056    }
1057
1058    #[test_log::test]
1059    #[test_log(default_log_filter = "trace")]
1060    fn query_wait_parallel() {
1061        // Check if IPC is available
1062        let everything1 = EverythingClient::new().unwrap();
1063        let everything2 = EverythingClient::new().unwrap();
1064        let everything3 = EverythingClient::new().unwrap();
1065
1066        assert!(everything1.is_ipc_available(), "IPC should be available");
1067
1068        // Send multiple queries at once using separate Everything instances
1069        // Note: These won't cancel each other
1070        let receiver1 = everything1
1071            .query("")
1072            .search_flags(SearchFlags::MatchCase)
1073            .request_flags(RequestFlags::FileName)
1074            .sort(Sort::NameAscending)
1075            .offset(0)
1076            .max_results(10)
1077            .call()
1078            .expect("query should succeed");
1079        let receiver2 = everything2
1080            .query("test")
1081            .search_flags(SearchFlags::MatchCase)
1082            .request_flags(RequestFlags::FileName)
1083            .sort(Sort::NameAscending)
1084            .offset(0)
1085            .max_results(10)
1086            .call()
1087            .expect("query should succeed");
1088        let receiver3 = everything3
1089            .query("rust")
1090            .search_flags(SearchFlags::MatchCase)
1091            .request_flags(RequestFlags::FileName)
1092            .sort(Sort::NameAscending)
1093            .offset(0)
1094            .max_results(10)
1095            .call()
1096            .expect("query should succeed");
1097
1098        // Wait for all queries to complete
1099        for (i, receiver) in [receiver1, receiver2, receiver3].into_iter().enumerate() {
1100            let result = receiver.recv_timeout(std::time::Duration::from_millis(5000));
1101            let result = result.expect(&format!("Query {} timed out", i));
1102            dbg!(&result);
1103            assert!(result.len() > 0, "Query {} should return valid results", i);
1104        }
1105    }
1106}