Skip to main content

everything_ipc/wm/
mod.rs

1/*!
2Everything's window message IPC interface, supported by Everything v1.4+.
3
4The main API is [`EverythingClient`].
5
6- Support Everything v1.4 and v1.5 (including Alpha version).
7- Higher performance than Everything v1.4's official SDK:
8  - Hot query time is about 30% shorter.
9  - Sending blocking time is 60% shorter for async queries.
10- Support both sync and async (Tokio) querying.
11
12## Examples
13```no_run
14// cargo add everything-ipc
15use everything_ipc::wm::{EverythingClient, RequestFlags, Sort};
16
17let everything = EverythingClient::new().expect("not available");
18
19let list = everything
20    .query_wait(r"C:\Windows\ *.exe")
21    .request_flags(RequestFlags::FileName | RequestFlags::Size | RequestFlags::Path)
22    .sort(Sort::SizeDescending)
23    .max_results(5)
24    .call()
25    .expect("query");
26
27println!("Found {} items:", list.len());
28println!("{:<25} {:>10}  {}", "Filename", "Size", "Path");
29for item in list.iter() {
30    // get_string() for String, get_str() for &U16CStr
31    let filename = item.get_string(RequestFlags::FileName).unwrap();
32    let path = item.get_str(RequestFlags::Path).unwrap().display();
33    let size = item.get_size(RequestFlags::Size).unwrap();
34    println!("{:<25} {:>10}  {}", filename, size, path);
35}
36println!("Total: {} items", list.total_len());
37/*
38Found 5 items:
39Filename                        Size  Path
40MRT.exe                    223939376  C:\Windows\System32
41MRT-KB890830.exe           133315992  C:\Windows\System32
42OneDriveSetup.exe           89771848  C:\Windows\WinSxS\amd64_microsoft-windows-onedrive-setup_31bf3856ad364e35_10.0.26100.5074_none_c1340e9ad5f0a5d0
43OneDriveSetup.exe           89771848  C:\Windows\System32
44OneDriveSetup.exe           60357040  C:\Windows\WinSxS\amd64_microsoft-windows-onedrive-setup_31bf3856ad364e35_10.0.26100.1_none_2233e98c8e9ce5f5
45Total: 5742 items
46*/
47```
48
49## References
50- [everything-cpp (IbEverythingLib)](https://github.com/Chaoses-Ib/IbEverythingLib/tree/master/everything-cpp)
51*/
52use std::{
53    mem,
54    sync::{Arc, atomic, mpsc},
55    time::Duration,
56};
57
58use bon::bon;
59use tracing::{debug, error, instrument, trace, warn};
60use windows::Win32::{
61    Foundation::{HWND, LPARAM, LRESULT, WPARAM},
62    System::DataExchange::COPYDATASTRUCT,
63    UI::{
64        Controls::WC_STATIC,
65        WindowsAndMessaging::{
66            CreateWindowExW, DefWindowProcW, DispatchMessageW, GWL_USERDATA, GWLP_WNDPROC,
67            GetMessageW, GetWindowLongPtrW, MSG, PostMessageW, ReplyMessage, SendMessageW,
68            SetWindowLongPtrW, WINDOW_EX_STYLE, WINDOW_STYLE, WM_APP, WM_COPYDATA, WM_QUIT,
69        },
70    },
71};
72
73use crate::IpcWindow;
74
75mod shared;
76mod types;
77pub use types::*;
78mod ext;
79
80/// Errors that can occur when querying Everything
81#[derive(Debug, thiserror::Error)]
82pub enum IpcError {
83    /// No IPC window available
84    #[error("IPC window not found")]
85    NoIpcWindow,
86
87    /// Failed to create reply window
88    #[error("failed to create reply window")]
89    CreateReplyWindow,
90
91    /// Failed to send query to Everything
92    #[error("failed to send query to Everything")]
93    Send,
94
95    /// Query timed out waiting for response
96    #[error("query timed out")]
97    Timeout,
98
99    #[error("query: {0}")]
100    Query(&'static str),
101}
102
103// ==================== Reply Window ====================
104
105/*
106/// Window class name for Everything IPC reply windows
107const WINDOW_CLASS_NAME: &widestring::U16CStr = u16cstr!("everything_ipc::wm");
108
109/// Global flag to track if the window class has been registered
110static CLASS_REGISTERED: Once = Once::new();
111
112/// Register the window class globally (once per process)
113/// The class must be registered in the same thread that creates windows
114fn register_window_class() {
115    CLASS_REGISTERED.call_once(|| unsafe {
116        let wnd_class = WNDCLASSW {
117            lpfnWndProc: Some(reply_window_wndproc),
118            hInstance: get_current_module_handle().into(),
119            lpszClassName: PCWSTR(WINDOW_CLASS_NAME.as_ptr()),
120            style: Default::default(),
121            cbClsExtra: 0,
122            cbWndExtra: 0,
123            hIcon: Default::default(),
124            hCursor: Default::default(),
125            hbrBackground: Default::default(),
126            lpszMenuName: Default::default(),
127        };
128
129        let class_atom = RegisterClassW(&wnd_class);
130        if class_atom == 0 {
131            error!("Failed to register window class");
132        } else {
133            debug!(
134                "Registered window class {}",
135                WINDOW_CLASS_NAME.to_string_lossy()
136            );
137        }
138    });
139}
140*/
141
142/// A hidden reply window that receives query responses from Everything
143#[derive(Debug)]
144struct ReplyWindow {
145    hwnd: HWND,
146    // The thread handle is Send because we only use it to join the thread
147    // This is safe because we only join the thread in Drop
148    /// `mem::MaybeUninit` would be enough if we don't need `quit_join_thread()`.
149    thread: Option<std::thread::JoinHandle<()>>,
150}
151
152// SAFETY: The JoinHandle is only used to join the thread in Drop.
153// The thread is created in ReplyWindow::new and only runs the message loop,
154// which doesn't access any thread-local state. The HWND is stored as a raw
155// pointer internally and is only accessed on the message loop thread.
156// ReplyWindow is Send because we move the window creation into the thread.
157unsafe impl Send for ReplyWindow {}
158// ReplyWindow is Sync because the message loop is the only thread that
159// accesses the HWND, and all access is through the single message loop thread.
160unsafe impl Sync for ReplyWindow {}
161
/// Message sent from the message-loop thread back to [`ReplyWindow::new`].
#[derive(Debug)]
struct MessageLoopResult {
    /// The created window handle cast to `usize` (raw pointers are not `Send`);
    /// `0` reports that window creation failed.
    hwnd_usize: usize,
}
167
168impl ReplyWindow {
169    /// Create a new reply window - creates the window in the message loop thread
170    ///
171    /// `Box` would be enough if we don't need `quit_join_thread()`.
172    pub fn new(inner: Arc<ClientInner>) -> Result<Self, IpcError> {
173        /*
174        // Register the window class (once per process)
175        // This must be called before creating any windows
176        register_window_class();
177        */
178
179        // Create a channel to receive the window handle from the message loop thread
180        let (tx, rx) = mpsc::channel::<MessageLoopResult>();
181
182        // Store the inner pointer for use in the message loop thread
183        // let inner_ptr_usize = inner_ptr as usize;
184
185        // Start the message loop in a separate thread
186        // The window class must be registered in the same thread where windows are created
187        let thread = std::thread::spawn(move || {
188            // Create the window in THIS thread (the message loop thread)
189            let hwnd = unsafe {
190                CreateWindowExW(
191                    WINDOW_EX_STYLE(0),
192                    // PCWSTR(WINDOW_CLASS_NAME.as_ptr()),
193                    WC_STATIC,
194                    None,
195                    WINDOW_STYLE(0),
196                    0,
197                    0,
198                    0,
199                    0,
200                    None,
201                    None,
202                    // Some(HINSTANCE::default()),
203                    None,
204                    None,
205                )
206            };
207
208            let hwnd = match hwnd.ok() {
209                Some(h) => h,
210                None => {
211                    debug!("Failed to create window in message loop thread");
212                    let _ = tx.send(MessageLoopResult { hwnd_usize: 0 });
213                    return;
214                }
215            };
216
217            // Send the window handle back to the caller as usize
218            if let Err(_) = tx.send(MessageLoopResult {
219                hwnd_usize: hwnd.0 as usize,
220            }) {
221                // _ = unsafe { DestroyWindow(hwnd) };
222                return;
223            }
224
225            debug!(?hwnd, "Created reply window");
226
227            // Set GWL_USERDATA to the EverythingInner pointer
228            let inner_ptr = Arc::into_raw(inner);
229            unsafe { SetWindowLongPtrW(hwnd, GWL_USERDATA, inner_ptr as isize) };
230
231            unsafe {
232                SetWindowLongPtrW(
233                    hwnd,
234                    GWLP_WNDPROC,
235                    reply_window_wndproc as *const () as isize,
236                )
237            };
238
239            // Run the message loop
240            run_message_loop(hwnd);
241        });
242
243        // Wait for the window handle from the message loop thread
244        let result = rx.recv().map_err(|_| IpcError::CreateReplyWindow)?;
245        let MessageLoopResult { hwnd_usize } = result;
246
247        let hwnd = HWND(hwnd_usize as *mut _);
248        if hwnd.is_invalid() {
249            return Err(IpcError::CreateReplyWindow);
250        }
251
252        Ok(Self {
253            hwnd,
254            // _thread: mem::MaybeUninit::new(thread),
255            thread: Some(thread),
256        })
257    }
258
259    /// Get the window handle
260    pub fn hwnd(&self) -> HWND {
261        self.hwnd
262    }
263
264    /// Send a message to this window
265    pub fn post_message(
266        &self,
267        msg: u32,
268        w_param: WPARAM,
269        l_param: LPARAM,
270    ) -> Result<(), windows::core::Error> {
271        unsafe { PostMessageW(Some(self.hwnd), msg, w_param, l_param) }
272    }
273
274    /// Post `WM_QUIT` to the reply window to signal the message loop to exit
275    ///
276    /// `WM_CLOSE` works too, but not `DestroyWindow()`.
277    pub fn quit(&self) {
278        let _ = self.post_message(WM_QUIT, WPARAM(0), LPARAM(0));
279    }
280
281    pub fn quit_join_thread(&mut self) {
282        if let Some(thread) = self.thread.take() {
283            self.quit();
284            _ = thread.join();
285        }
286    }
287}
288
289impl Drop for ReplyWindow {
290    fn drop(&mut self) {
291        // This must be done before waiting for the thread to ensure the thread
292        // can process the quit message
293        self.quit();
294
295        /*
296        let _thread = unsafe { self._thread.assume_init_read() };
297        // Join the message loop thread if it exists
298        // if let Some(handle) = self.thread.take() {
299        //     let _ = handle.join();
300        // }
301        #[cfg(feature = "drop-join-thread")]
302        let _ = _thread.join();
303        */
304        #[cfg(feature = "drop-join-thread")]
305        if let Some(thread) = self.thread.take() {
306            _ = thread.join();
307        }
308    }
309}
310
/// A raw query response received from Everything.
///
/// NOTE(review): nothing in the visible part of this module constructs this
/// type; replies are delivered as `QueryList` instead — confirm whether it is
/// still needed.
#[derive(Debug)]
pub struct QueryResponse {
    /// NOTE(review): presumably the reply ID of the query — confirm.
    pub id: u32,
    /// NOTE(review): presumably the raw reply bytes — confirm.
    pub data: Vec<u8>,
}
317
318/// Reply window procedure for handling WM_APP and WM_COPYDATA
319#[instrument(skip_all, fields(hwnd))]
320unsafe extern "system" fn reply_window_wndproc(
321    hwnd: HWND,
322    msg: u32,
323    w_param: WPARAM,
324    l_param: LPARAM,
325) -> LRESULT {
326    // dbg!(hwnd, msg, w_param, l_param);
327    match msg {
328        // Forward the request data to the IPC window
329        WM_APP => {
330            // WPARAM contains pointer to Box<Vec<u8>> (from PostMessageW)
331            // We take ownership of the Box, create a COPYDATASTRUCT, and forward it
332            let request_ptr = w_param.0 as *mut Vec<u8>;
333            let request = unsafe { Box::from_raw(request_ptr) };
334
335            // Get the EverythingInner pointer from GWL_USERDATA
336            let inner_ptr = unsafe { GetWindowLongPtrW(hwnd, GWL_USERDATA) };
337            if inner_ptr != 0 {
338                let inner = unsafe { &*(inner_ptr as *const ClientInner) };
339                let ipc_hwnd = inner.ipc_window.hwnd();
340
341                // Create COPYDATASTRUCT from the request data
342                let cds = COPYDATASTRUCT {
343                    dwData: EVERYTHING_IPC_COPYDATA_QUERY2W as usize,
344                    cbData: request.len() as u32,
345                    lpData: request.as_ptr() as *mut _,
346                };
347
348                // Get the raw pointer to the COPYDATASTRUCT
349                let cds_ptr = &cds as *const COPYDATASTRUCT;
350
351                // Send to IPC window synchronously
352                let r = unsafe {
353                    SendMessageW(
354                        ipc_hwnd,
355                        WM_COPYDATA,
356                        Some(WPARAM(hwnd.0 as usize)),
357                        Some(LPARAM(cds_ptr as isize)),
358                    )
359                };
360                if r.0 == 1 {
361                    trace!(?ipc_hwnd, ?r);
362                } else {
363                    warn!(?ipc_hwnd, ?r);
364                    // Drop the current query sender since the message failed to send
365                    // This will cause the query to Err on the client side
366                    drop(inner.take_current_query_sender());
367                }
368            }
369
370            // Request Vec is dropped here after SendMessageW returns
371
372            LRESULT(0)
373        }
374        // Response from Everything IPC window
375        WM_COPYDATA => {
376            let copydata = unsafe { &*(l_param.0 as *const COPYDATASTRUCT) };
377            // Do not assert that copydata->dwData == _EVERYTHING_COPYDATA_QUERYREPLY(0)
378            // The code in Everything's SDK is wrong. copydata->dwData is replyid and can be any value.
379            let id = copydata.dwData as u32;
380
381            // Get the EverythingInner pointer from GWL_USERDATA
382            let inner_ptr = unsafe { GetWindowLongPtrW(hwnd, GWL_USERDATA) } as *const ClientInner;
383            if inner_ptr.is_null() {
384                error!("No object found");
385                return LRESULT(0);
386            }
387
388            // Get the sender from the inner struct
389            let inner = unsafe { &*inner_ptr };
390            if let Some(sender) = inner.take_current_query_sender() {
391                if match &sender {
392                    QuerySender::Sync(_sender) => {
393                        // TODO: https://github.com/rust-lang/rust/issues/153668
394                        /*
395                        if sender.is_disconnected() {
396                            return LRESULT(1);
397                        }
398                        */
399                        false
400                    }
401                    #[cfg(feature = "tokio")]
402                    QuerySender::Tokio(sender) => sender.is_closed(),
403                } {
404                    return LRESULT(1);
405                }
406
407                // Convert to QueryList and send
408                // TODO: Callback for one less copy
409                let data = unsafe {
410                    std::slice::from_raw_parts(
411                        copydata.lpData as *const u8,
412                        copydata.cbData as usize,
413                    )
414                }
415                .into();
416                // Reply to Everything
417                _ = unsafe { ReplyMessage(LRESULT(1)) };
418                trace!(id, cbData = copydata.cbData, "WM_COPYDATA received");
419
420                let results = QueryList::new(id, data);
421                if match sender {
422                    QuerySender::Sync(sender) => sender.send(results).is_ok(),
423                    #[cfg(feature = "tokio")]
424                    QuerySender::Tokio(sender) => sender.send(results).is_ok(),
425                } {
426                    debug!(id, "Sent query response");
427                } else {
428                    warn!(id, "Failed to send query response");
429                }
430            } else {
431                warn!(id, "No pending query");
432            }
433
434            LRESULT(1)
435        }
436        _ => unsafe { DefWindowProcW(hwnd, msg, w_param, l_param) },
437    }
438}
439
440/// Message loop runner - runs in a separate thread
441/// Takes an HWND which is passed as usize for Send compatibility
442fn run_message_loop(hwnd: HWND) {
443    unsafe {
444        let mut msg: MSG = mem::zeroed();
445        let mut ret;
446        loop {
447            ret = GetMessageW(&mut msg, Some(hwnd), 0, 0);
448            if ret.0 <= 0 {
449                break;
450            }
451            DispatchMessageW(&mut msg);
452        }
453    }
454
455    // Cleanup
456    let inner_ptr = unsafe { GetWindowLongPtrW(hwnd, GWL_USERDATA) };
457    if inner_ptr != 0 {
458        drop(unsafe { Arc::from_raw(inner_ptr as *mut ClientInner) });
459    }
460}
461
462/// Wrapper for query senders (both sync and async)
463enum QuerySender {
464    Sync(mpsc::Sender<QueryList>),
465    #[cfg(feature = "tokio")]
466    Tokio(tokio::sync::oneshot::Sender<QueryList>),
467}
468
/// Lock serializing `query_wait()` calls; the std mutex without the `tokio` feature.
#[cfg(not(feature = "tokio"))]
type QueryLock = std::sync::Mutex<()>;
/// Lock serializing `query_wait*()` calls; Tokio's mutex so that it can also
/// be awaited from `query_wait_tokio()`.
#[cfg(feature = "tokio")]
type QueryLock = tokio::sync::Mutex<()>;
473
474/// Inner state shared by Everything
475struct ClientInner {
476    ipc_window: IpcWindow,
477    /// The sender for the current (last) query
478    /// Using Mutex for thread-safe mutable access
479    current_query_sender: std::sync::Mutex<Option<QuerySender>>,
480
481    /// For `query_wait()` from multi threads.
482    ///
483    /// TODO: Single thread version without this?
484    query_lock: QueryLock,
485}
486
487impl ClientInner {
488    /// Safe guard to mitigate possible `PoisonError` panics.
489    pub fn take_current_query_sender(&self) -> Option<QuerySender> {
490        match self.current_query_sender.lock() {
491            Ok(mut sender) => sender.take(),
492            #[cfg(debug_assertions)]
493            Err(e) => Err(e).unwrap(),
494            #[cfg(not(debug_assertions))]
495            Err(e) => {
496                error!("poison");
497                // self.current_query_sender.clear_poison();
498                // e.into_inner()
499                None
500            }
501        }
502    }
503}
504
505/**
506Everything IPC client
507
508See [`wm`](super::wm) for details.
509*/
510pub struct EverythingClient {
511    /// Owned by [`ReplyWindow`] to avoid possible UAF of [`ClientInner`] after drop.
512    inner: Arc<ClientInner>,
513    reply_window: ReplyWindow,
514}
515
516impl IpcWindow {
517    pub fn wm_client(&self) -> Result<EverythingClient, IpcError> {
518        // Create the inner state
519        let inner = Arc::new(ClientInner {
520            ipc_window: self.clone(),
521            current_query_sender: std::sync::Mutex::new(None),
522            query_lock: Default::default(),
523        });
524        /*
525        let inner_ref: &ClientInner = inner.as_ref();
526        let inner_ref: &'static ClientInner = unsafe { mem::transmute(inner_ref) };
527        */
528
529        // Create the reply window with a pointer to the inner state
530        // let inner_ptr = Arc::as_ptr(&inner) as *mut ClientInner;
531        let reply_window = ReplyWindow::new(inner.clone())?;
532
533        // let inner = inner_ref;
534        Ok(EverythingClient {
535            inner,
536            reply_window,
537        })
538    }
539}
540
541impl std::ops::Deref for EverythingClient {
542    type Target = IpcWindow;
543
544    fn deref(&self) -> &Self::Target {
545        self.ipc_window()
546    }
547}
548
549impl EverythingClient {
550    /// Create a new Everything client
551    pub fn new() -> Result<Self, IpcError> {
552        IpcWindow::new().ok_or(IpcError::NoIpcWindow)?.wm_client()
553    }
554
555    /// Create a new Everything client with instance name
556    pub fn with_instance(instance_name: Option<&str>) -> Result<Self, IpcError> {
557        IpcWindow::with_instance(instance_name)
558            .ok_or(IpcError::NoIpcWindow)?
559            .wm_client()
560    }
561
562    /// Get the IPC window for sending messages
563    fn ipc_window(&self) -> &IpcWindow {
564        &self.inner.ipc_window
565    }
566
567    /// Get the next query ID
568    fn next_id(&self) -> u32 {
569        static NEXT_ID: atomic::AtomicU32 = atomic::AtomicU32::new(0);
570        NEXT_ID.fetch_add(1, atomic::Ordering::SeqCst)
571    }
572
573    /// Send a query to Everything
574    fn query_send(
575        &self,
576        search: &str,
577        search_flags: SearchFlags,
578        request_flags: RequestFlags,
579        sort: Sort,
580        id: u32,
581        offset: u32,
582        max_results: Option<u32>,
583    ) -> bool {
584        let msg_hwnd = self.reply_window.hwnd();
585
586        // Build the query request using EverythingIpcQuery2 struct
587        let request = EverythingIpcQuery2::create(
588            msg_hwnd.0 as u32,
589            id,
590            search_flags.bits(),
591            offset,
592            max_results.unwrap_or(u32::MAX),
593            request_flags.bits(),
594            sort as u32,
595            search,
596        );
597
598        // Box the request Vec to keep it alive until the message is processed
599        let request_box = Box::new(request);
600        let request_ptr = Box::into_raw(request_box);
601
602        // available: SendMessageW (blocked), SendMessageTimeoutW (unstable)
603        // unavailable: PostMessageW, SendNotifyMessageW
604        // not tested: SendMessageCallbackW
605
606        // Use ReplyWindow::post_message to send WM_APP
607        // The reply window's wndproc will forward this to the IPC window
608        // WPARAM contains pointer to Box<Vec<u8>>, which owns the request data
609        match self
610            .reply_window
611            .post_message(WM_APP, WPARAM(request_ptr as usize), LPARAM(0))
612        {
613            Ok(_) => true,
614            Err(_) => {
615                // PostMessageW failed, free the request to avoid leak
616                let _ = unsafe { Box::from_raw(request_ptr) };
617                false
618            }
619        }
620    }
621}
622
623#[bon]
624impl EverythingClient {
625    /// Send a query to Everything
626    ///
627    /// # Important Note
628    /// Everything only handles one query per window at a time. Sending another query
629    /// when a query has not completed will cancel the old query.
630    ///
631    /// This method serializes queries by replacing the previous sender.
632    /// Callers should wait for the receiver before calling again.
633    ///
634    /// # Example
635    /// ```no_run
636    /// use std::time::Duration;
637    /// use everything_ipc::wm::{EverythingClient, RequestFlags};
638    ///
639    /// let everything = EverythingClient::new().unwrap();
640    ///
641    /// // These queries will be serialized (not sent concurrently)
642    /// let receiver1 = everything
643    ///     .query("search1")
644    ///     .request_flags(RequestFlags::FileName)
645    ///     .call()
646    ///     .unwrap();
647    /// let result1 = receiver1.recv_timeout(Duration::from_secs(5)).unwrap();
648    ///
649    /// // Now safe to send next query
650    /// let receiver2 = everything
651    ///     .query("search2")
652    ///     .request_flags(RequestFlags::FileName)
653    ///     .call()
654    ///     .unwrap();
655    /// let result2 = receiver2.recv_timeout(Duration::from_secs(5)).unwrap();
656    /// ```
657    #[instrument(skip_all)]
658    #[builder]
659    pub fn query(
660        &self,
661        #[builder(start_fn)] search: &str,
662        #[builder(default)] search_flags: SearchFlags,
663        request_flags: RequestFlags,
664        #[builder(default)] sort: Sort,
665        #[builder(default)] offset: u32,
666        max_results: Option<u32>,
667        // #[builder(setters(vis = ""))] current_query_sender: Option<&mut Option<QuerySender>>,
668    ) -> Result<mpsc::Receiver<QueryList>, IpcError> {
669        let id = self.next_id();
670        debug!("generating query ID {}", id);
671
672        // Create a channel for the response
673        let (sender, receiver) = mpsc::channel::<QueryList>();
674
675        // Send the query first
676        let sent = self.query_send(
677            search,
678            search_flags,
679            request_flags,
680            sort,
681            id,
682            offset,
683            max_results,
684        );
685
686        if !sent {
687            warn!("failed to send query ID {}", id);
688            return Err(IpcError::Send);
689        }
690        debug!("query ID {} sent successfully", id);
691
692        // Store the sender (only one query at a time per Everything instance)
693        // Using Mutex for thread-safe mutable access
694        /*
695        let old_sender = match current_query_sender {
696            Some(current_query_sender) => current_query_sender.replace(QuerySender::Sync(sender)),
697            None => self
698                .inner
699                .current_query_sender
700                .lock()
701                .unwrap()
702                .replace(QuerySender::Sync(sender)),
703        };
704        */
705        let old_sender = self
706            .inner
707            .current_query_sender
708            .lock()
709            .unwrap()
710            .replace(QuerySender::Sync(sender));
711        // Drop any previous sender that wasn't used - its receiver will fail
712        drop(old_sender);
713
714        Ok(receiver)
715    }
716
717    /// Send a query to Everything and wait for the result
718    ///
719    /// This method serializes queries to work around Everything's single-query-per-window limitation.
720    /// Only one query can be sent at a time per reply window.
721    #[instrument(skip_all)]
722    #[builder]
723    pub fn query_wait(
724        &self,
725        #[builder(start_fn)] search: &str,
726        #[builder(default)] search_flags: SearchFlags,
727        request_flags: RequestFlags,
728        #[builder(default)] sort: Sort,
729        #[builder(default)] offset: u32,
730        max_results: Option<u32>,
731        #[builder(default = Duration::from_millis(3000))] timeout: Duration,
732    ) -> Result<QueryList, IpcError> {
733        // let mut current_query_sender = self.inner.current_query_sender.lock().unwrap();
734        #[cfg(not(feature = "tokio"))]
735        let _lock = self.inner.query_lock.lock().unwrap();
736        #[cfg(feature = "tokio")]
737        let _lock = self.inner.query_lock.blocking_lock();
738
739        // Reuse query to send the query, then wait for the result
740        let receiver = self
741            .query(search)
742            .search_flags(search_flags)
743            .request_flags(request_flags)
744            .sort(sort)
745            .offset(offset)
746            .maybe_max_results(max_results)
747            // .current_query_sender(&mut current_query_sender)
748            .call()?;
749
750        // Wait for the response with timeout
751        match receiver.recv_timeout(timeout) {
752            Ok(results) => Ok(results),
753            Err(_) => {
754                warn!("query timed out");
755                Err(IpcError::Timeout)
756            }
757        }
758    }
759}
760
#[cfg(feature = "tokio")]
#[bon]
impl EverythingClient {
    /// Send a query to Everything asynchronously
    ///
    /// # Important Note
    /// Everything only handles one query per window at a time. Sending another query
    /// when a query has not completed will cancel the old query.
    ///
    /// This method serializes queries by replacing the previous sender.
    /// Callers should await the receiver before calling again.
    ///
    /// # Errors
    /// Returns [`IpcError::Send`] if the request could not be posted to the
    /// reply window.
    ///
    /// # Panics
    /// Panics if the mutex guarding the current query sender is poisoned.
    ///
    /// # Example
    /// ```no_run
    /// use everything_ipc::wm::{EverythingClient, RequestFlags};
    ///
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let everything = EverythingClient::new().unwrap();
    ///
    /// // These queries will be serialized (not sent concurrently)
    /// let receiver1 = everything
    ///     .query_tokio("search1")
    ///     .request_flags(RequestFlags::FileName)
    ///     .call()?;
    /// let result1 = receiver1.await?;
    ///
    /// // Now safe to send next query
    /// let receiver2 = everything
    ///     .query_tokio("search2")
    ///     .request_flags(RequestFlags::FileName)
    ///     .call()?;
    /// let result2 = receiver2.await?;
    /// # Ok(())
    /// # }
    /// ```
    #[instrument(skip_all)]
    #[builder]
    pub fn query_tokio(
        &self,
        #[builder(start_fn)] search: &str,
        #[builder(default)] search_flags: SearchFlags,
        request_flags: RequestFlags,
        #[builder(default)] sort: Sort,
        #[builder(default)] offset: u32,
        max_results: Option<u32>,
    ) -> Result<tokio::sync::oneshot::Receiver<QueryList>, IpcError> {
        let id = self.next_id();
        debug!("generating query ID {}", id);

        // Create a channel for the response
        let (sender, receiver) = tokio::sync::oneshot::channel::<QueryList>();

        // Keep the sender slot locked from before the request is posted until
        // the new sender is stored. The reply window's thread takes the sender
        // through the same mutex, so it cannot handle a reply (or a forwarding
        // failure) for this query before the sender is in place. This is a
        // std mutex in a non-async function, so no `.await` happens while it
        // is held, and posting only enqueues the message.
        let mut current = self.inner.current_query_sender.lock().unwrap();

        let sent = self.query_send(
            search,
            search_flags,
            request_flags,
            sort,
            id,
            offset,
            max_results,
        );

        if !sent {
            // The previously stored sender (if any) is left untouched.
            warn!("failed to send query ID {}", id);
            return Err(IpcError::Send);
        }
        debug!("query ID {} sent successfully", id);

        // Store the sender (only one query at a time per Everything instance)
        let old_sender = current.replace(QuerySender::Tokio(sender));
        drop(current);
        // Drop any previous sender that wasn't used - its receiver will fail
        drop(old_sender);

        Ok(receiver)
    }

    /// Send a query to Everything asynchronously and wait for the result
    ///
    /// This method serializes queries to work around Everything's single-query-per-window limitation.
    /// Only one query can be sent at a time per reply window.
    ///
    /// `timeout` defaults to 3000 ms.
    ///
    /// # Errors
    /// - [`IpcError::Send`] if the request could not be posted, or if the
    ///   reply channel was closed before a result arrived.
    /// - [`IpcError::Timeout`] if no result arrived within `timeout`.
    #[instrument(skip_all)]
    #[builder]
    pub async fn query_wait_tokio(
        &self,
        #[builder(start_fn)] search: &str,
        #[builder(default)] search_flags: SearchFlags,
        request_flags: RequestFlags,
        #[builder(default)] sort: Sort,
        #[builder(default)] offset: u32,
        max_results: Option<u32>,
        #[builder(default = Duration::from_millis(3000))] timeout: Duration,
    ) -> Result<QueryList, IpcError> {
        // Serialize concurrent waiters; Tokio's mutex may be held across `.await`.
        let _lock = self.inner.query_lock.lock().await;

        // Reuse query_tokio to send the query, then wait for the result
        let receiver = self
            .query_tokio(search)
            .search_flags(search_flags)
            .request_flags(request_flags)
            .sort(sort)
            .offset(offset)
            .maybe_max_results(max_results)
            .call()?;

        // Wait for the response with timeout
        match tokio::time::timeout(timeout, receiver).await {
            Ok(Ok(results)) => Ok(results),
            Ok(Err(_)) => {
                // The sender was dropped without delivering a result.
                warn!("query receiver error");
                Err(IpcError::Send)
            }
            Err(_) => {
                warn!("query timed out");
                Err(IpcError::Timeout)
            }
        }
    }
}
886
887#[cfg(test)]
888mod tests {
889    use super::*;
890
891    #[test_log::test]
892    #[test_log(default_log_filter = "trace")]
893    fn doc() {
894        let everything = EverythingClient::new().expect("not available");
895
896        let list = everything
897            .query_wait(r"C:\Windows\ *.exe")
898            .request_flags(RequestFlags::FileName | RequestFlags::Size | RequestFlags::Path)
899            .sort(Sort::SizeDescending)
900            .max_results(5)
901            .call()
902            .expect("query");
903
904        println!("Found {} items:", list.len());
905        println!("{:<25} {:>10}  {}", "Filename", "Size", "Path");
906        for item in list.iter() {
907            // get_string() for String, get_str() for &U16CStr
908            let filename = item.get_string(RequestFlags::FileName).unwrap();
909            let path = item.get_str(RequestFlags::Path).unwrap().display();
910            let size = item.get_size(RequestFlags::Size).unwrap();
911            println!("{:<25} {:>10}  {}", filename, size, path);
912        }
913        println!("Total: {} items", list.total_len());
914    }
915
916    #[test_log::test]
917    #[test_log(default_log_filter = "trace")]
918    fn query_empty_search() {
919        let everything = EverythingClient::new().unwrap();
920        let search = "";
921        let search_flags = SearchFlags::MatchCase;
922        let request_flags = RequestFlags::FileName | RequestFlags::Path;
923        let sort = Sort::NameAscending;
924
925        // Send query for first 5 items
926        let result =
927            everything.query_send(search, search_flags, request_flags, sort, 1000, 0, Some(5));
928
929        assert!(result, "Query should be sent successfully");
930    }
931
932    #[test_log::test]
933    #[test_log(default_log_filter = "trace")]
934    fn query_with_pattern() {
935        let everything = EverythingClient::new().unwrap();
936        let search = "test";
937        let search_flags = SearchFlags::MatchCase;
938        let request_flags = RequestFlags::FileName;
939        let sort = Sort::NameAscending;
940
941        let result =
942            everything.query_send(search, search_flags, request_flags, sort, 1001, 0, Some(10));
943
944        assert!(result, "Query should be sent successfully");
945    }
946
947    #[test]
948    fn query_with_full_path() {
949        let everything = EverythingClient::new().unwrap();
950        let search = "";
951        let search_flags = SearchFlags::MatchCase;
952        let request_flags =
953            RequestFlags::FullPathAndFileName | RequestFlags::Size | RequestFlags::DateModified;
954        let sort = Sort::NameAscending;
955
956        let result =
957            everything.query_send(search, search_flags, request_flags, sort, 1002, 0, Some(3));
958
959        assert!(result, "Query should be sent successfully");
960    }
961
962    #[test]
963    fn query_sort_by_size() {
964        let everything = EverythingClient::new().unwrap();
965        let search = "";
966        let search_flags = SearchFlags::MatchCase;
967        let request_flags = RequestFlags::FileName | RequestFlags::Size;
968        let sort = Sort::SizeAscending;
969
970        let result =
971            everything.query_send(search, search_flags, request_flags, sort, 1003, 0, Some(5));
972
973        assert!(result, "Query should be sent successfully");
974    }
975
976    #[test]
977    fn query_with_offset() {
978        let everything = EverythingClient::new().unwrap();
979        let search = "";
980        let search_flags = SearchFlags::MatchCase;
981        let request_flags = RequestFlags::FileName;
982        let sort = Sort::NameAscending;
983
984        // First query without offset
985        let result1 =
986            everything.query_send(search, search_flags, request_flags, sort, 1005, 0, Some(2));
987
988        assert!(result1, "First query should be sent successfully");
989
990        // Second query with offset
991        let result2 =
992            everything.query_send(search, search_flags, request_flags, sort, 1006, 2, Some(2));
993
994        assert!(
995            result2,
996            "Second query with offset should be sent successfully"
997        );
998    }
999
1000    #[test]
1001    fn query_everything() {
1002        let everything = EverythingClient::new().unwrap();
1003        let search = "test";
1004        let search_flags = SearchFlags::MatchCase;
1005        let request_flags = RequestFlags::FileName;
1006        let sort = Sort::NameAscending;
1007
1008        let result = everything.query_send(
1009            search,
1010            search_flags,
1011            request_flags,
1012            sort,
1013            everything.next_id(),
1014            0,
1015            Some(5),
1016        );
1017
1018        assert!(result, "Query should be sent successfully");
1019    }
1020
1021    #[test]
1022    fn query_multiple_requests() {
1023        let everything = EverythingClient::new().unwrap();
1024        let search = "";
1025        let search_flags = SearchFlags::MatchCase;
1026        let request_flags = RequestFlags::FileName
1027            | RequestFlags::Path
1028            | RequestFlags::Size
1029            | RequestFlags::DateModified
1030            | RequestFlags::DateCreated;
1031        let sort = Sort::NameAscending;
1032
1033        let result =
1034            everything.query_send(search, search_flags, request_flags, sort, 1004, 0, Some(5));
1035
1036        assert!(result, "Query should be sent successfully");
1037    }
1038
1039    #[test]
1040    fn query_wait_empty() {
1041        let everything = EverythingClient::new().unwrap();
1042        let search = "";
1043        let search_flags = SearchFlags::MatchCase;
1044        let request_flags = RequestFlags::FileName;
1045        let sort = Sort::NameAscending;
1046
1047        // Check if IPC is available
1048        assert!(everything.is_ipc_available(), "IPC should be available");
1049
1050        let result = everything
1051            .query_wait(search)
1052            .search_flags(search_flags)
1053            .request_flags(request_flags)
1054            .sort(sort)
1055            .offset(0)
1056            .max_results(10)
1057            .call();
1058        assert!(
1059            result.is_ok(),
1060            "query_wait should return Ok when Everything is available"
1061        );
1062    }
1063
1064    #[test_log::test]
1065    #[test_log(default_log_filter = "trace")]
1066    fn query_wait() {
1067        let everything = EverythingClient::new().unwrap();
1068        // Use empty string to get all files, which is more reliable than searching for "test"
1069        // which may not exist on the system
1070        let search = "test";
1071        // MATCH_ACCENTS is marked as abandoned in the C++ code, use MATCH_CASE instead
1072        let search_flags = SearchFlags::MatchCase;
1073        let request_flags = RequestFlags::FileName;
1074        let sort = Sort::NameAscending;
1075
1076        // Check if IPC is available
1077        assert!(everything.is_ipc_available(), "IPC should be available");
1078
1079        let result = everything
1080            .query_wait(search)
1081            .search_flags(search_flags)
1082            .request_flags(request_flags)
1083            .sort(sort)
1084            .offset(0)
1085            .max_results(10)
1086            .call();
1087        dbg!(&result);
1088        assert!(
1089            result.is_ok(),
1090            "query_wait should return Ok when Everything is available"
1091        );
1092        assert!(
1093            result.as_ref().is_ok_and(|r| r.total_len() > 0),
1094            "Expected found_num > 0, got: {:?}",
1095            result
1096        );
1097    }
1098
1099    #[test_log::test]
1100    #[test_log(default_log_filter = "trace")]
1101    fn query_wait_cancel() {
1102        let everything = EverythingClient::new().unwrap();
1103
1104        // Check if IPC is available
1105        assert!(everything.is_ipc_available(), "IPC should be available");
1106
1107        // Send multiple queries at once
1108        // Note: These will be serialized, so only the last one succeeds
1109        // The first two queries will be disconnected when the next query replaces their sender
1110        let searches = ["", "test", "rust"];
1111        let mut receivers = Vec::new();
1112
1113        for search in &searches {
1114            let search_flags = SearchFlags::MatchCase;
1115            let request_flags = RequestFlags::FileName;
1116            let sort = Sort::NameAscending;
1117            let receiver = everything
1118                .query(search)
1119                .search_flags(search_flags)
1120                .request_flags(request_flags)
1121                .sort(sort)
1122                .offset(0)
1123                .max_results(10)
1124                .call()
1125                .expect("query should succeed");
1126            receivers.push(receiver);
1127        }
1128
1129        // First query should fail (response rejected because sender was replaced)
1130        // When query 2 is sent, it replaces the sender for query 0
1131        // When the response comes back for query 0, the old sender is gone
1132        let result = receivers[0].recv_timeout(std::time::Duration::from_millis(3000));
1133        assert!(
1134            result.is_err(),
1135            "Query 0 should fail because sender was replaced (got: {:?})",
1136            result
1137        );
1138
1139        // Second query should succeed (it's the current sender when response arrives)
1140        let result = receivers[1].recv_timeout(std::time::Duration::from_millis(3000));
1141        assert!(
1142            result.is_err(),
1143            "Query 1 should fail because sender was replaced (got: {:?})",
1144            result
1145        );
1146
1147        // Last query should succeed
1148        let result = receivers[2].recv_timeout(std::time::Duration::from_millis(3000));
1149        let result = result.expect("Last query should succeed");
1150        dbg!(&result);
1151        assert!(
1152            result.total_len() > 0,
1153            "Last query should return valid results"
1154        );
1155    }
1156
1157    #[test_log::test]
1158    #[test_log(default_log_filter = "trace")]
1159    fn query_wait_parallel() {
1160        // Check if IPC is available
1161        let everything1 = EverythingClient::new().unwrap();
1162        let everything2 = EverythingClient::new().unwrap();
1163        let everything3 = EverythingClient::new().unwrap();
1164
1165        assert!(everything1.is_ipc_available(), "IPC should be available");
1166
1167        // Send multiple queries at once using separate Everything instances
1168        // Note: These won't cancel each other
1169        let receiver1 = everything1
1170            .query("")
1171            .search_flags(SearchFlags::MatchCase)
1172            .request_flags(RequestFlags::FileName)
1173            .sort(Sort::NameAscending)
1174            .offset(0)
1175            .max_results(10)
1176            .call()
1177            .expect("query should succeed");
1178        let receiver2 = everything2
1179            .query("test")
1180            .search_flags(SearchFlags::MatchCase)
1181            .request_flags(RequestFlags::FileName)
1182            .sort(Sort::NameAscending)
1183            .offset(0)
1184            .max_results(10)
1185            .call()
1186            .expect("query should succeed");
1187        let receiver3 = everything3
1188            .query("rust")
1189            .search_flags(SearchFlags::MatchCase)
1190            .request_flags(RequestFlags::FileName)
1191            .sort(Sort::NameAscending)
1192            .offset(0)
1193            .max_results(10)
1194            .call()
1195            .expect("query should succeed");
1196
1197        // Wait for all queries to complete
1198        for (i, receiver) in [receiver1, receiver2, receiver3].into_iter().enumerate() {
1199            let result = receiver.recv_timeout(std::time::Duration::from_millis(5000));
1200            let result = result.expect(&format!("Query {} timed out", i));
1201            dbg!(&result);
1202            assert!(result.len() > 0, "Query {} should return valid results", i);
1203        }
1204    }
1205}