Skip to main content

everything_ipc/wm/
mod.rs

1/*!
2Everything's window message IPC interface, supported by Everything v1.4+.
3
4The main API is [`EverythingClient`].
5
- Supports Everything v1.4 and v1.5 (including Alpha versions).
- Higher performance than Everything v1.4's official SDK:
  - Hot query time is about 30% shorter.
  - The blocking time when sending is 60% shorter for async queries.
- Supports both sync and async (Tokio) querying.
11
12## Examples
13```no_run
14// cargo add everything-ipc
15use everything_ipc::wm::{EverythingClient, RequestFlags, Sort};
16
17let everything = EverythingClient::new().expect("not available");
18
19let list = everything
20    .query_wait(r"C:\Windows\ *.exe")
21    .request_flags(RequestFlags::FileName | RequestFlags::Size | RequestFlags::Path)
22    .sort(Sort::SizeDescending)
23    .max_results(5)
24    .call()
25    .expect("query");
26
27println!("Found {} items:", list.len());
28println!("{:<25} {:>10}  {}", "Filename", "Size", "Path");
29for item in list.iter() {
30    // get_string() for String, get_str() for &U16CStr
31    let filename = item.get_string(RequestFlags::FileName).unwrap();
32    let path = item.get_str(RequestFlags::Path).unwrap().display();
33    let size = item.get_size(RequestFlags::Size).unwrap();
34    println!("{:<25} {:>10}  {}", filename, size, path);
35}
36println!("Total: {} items", list.total_len());
37/*
38Found 5 items:
39Filename                        Size  Path
40MRT.exe                    223939376  C:\Windows\System32
41MRT-KB890830.exe           133315992  C:\Windows\System32
42OneDriveSetup.exe           89771848  C:\Windows\WinSxS\amd64_microsoft-windows-onedrive-setup_31bf3856ad364e35_10.0.26100.5074_none_c1340e9ad5f0a5d0
43OneDriveSetup.exe           89771848  C:\Windows\System32
44OneDriveSetup.exe           60357040  C:\Windows\WinSxS\amd64_microsoft-windows-onedrive-setup_31bf3856ad364e35_10.0.26100.1_none_2233e98c8e9ce5f5
45Total: 5742 items
46*/
47```
48
49## References
50- [everything-cpp (IbEverythingLib)](https://github.com/Chaoses-Ib/IbEverythingLib/tree/master/everything-cpp)
51*/
52use std::{
53    mem,
54    sync::{atomic, mpsc},
55    time::Duration,
56};
57
58use bon::bon;
59use tracing::{debug, error, instrument, trace, warn};
60use windows::Win32::{
61    Foundation::{HWND, LPARAM, LRESULT, WPARAM},
62    System::DataExchange::COPYDATASTRUCT,
63    UI::{
64        Controls::WC_STATIC,
65        WindowsAndMessaging::{
66            CreateWindowExW, DefWindowProcW, DispatchMessageW, GWL_USERDATA, GWLP_WNDPROC,
67            GetMessageW, GetWindowLongPtrW, MSG, PostMessageW, ReplyMessage, SendMessageW,
68            SetWindowLongPtrW, WINDOW_EX_STYLE, WINDOW_STYLE, WM_APP, WM_COPYDATA, WM_QUIT,
69        },
70    },
71};
72
73use crate::IpcWindow;
74
75mod types;
76pub use types::*;
77mod ext;
78
/// Errors that can occur when querying Everything
#[derive(Debug, thiserror::Error)]
pub enum IpcError {
    /// No IPC window available
    ///
    /// Returned by the client constructors when looking up the Everything IPC
    /// window yields nothing.
    #[error("IPC window not found")]
    NoIpcWindow,

    /// Failed to create reply window
    ///
    /// Either the hidden window could not be created on the message loop
    /// thread, or that thread ended before reporting a window handle.
    #[error("failed to create reply window")]
    CreateReplyWindow,

    /// Failed to send query to Everything
    #[error("failed to send query to Everything")]
    Send,

    /// Query timed out waiting for response
    #[error("query timed out")]
    Timeout,

    /// Query-level failure described by a static message.
    ///
    /// NOTE(review): not constructed in this part of the file; presumably
    /// produced by the result-parsing code in the submodules — confirm.
    #[error("query: {0}")]
    Query(&'static str),
}
101
102// ==================== Reply Window ====================
103
104/*
105/// Window class name for Everything IPC reply windows
106const WINDOW_CLASS_NAME: &widestring::U16CStr = u16cstr!("everything_ipc::wm");
107
108/// Global flag to track if the window class has been registered
109static CLASS_REGISTERED: Once = Once::new();
110
111/// Register the window class globally (once per process)
112/// The class must be registered in the same thread that creates windows
113fn register_window_class() {
114    CLASS_REGISTERED.call_once(|| unsafe {
115        let wnd_class = WNDCLASSW {
116            lpfnWndProc: Some(reply_window_wndproc),
117            hInstance: get_current_module_handle().into(),
118            lpszClassName: PCWSTR(WINDOW_CLASS_NAME.as_ptr()),
119            style: Default::default(),
120            cbClsExtra: 0,
121            cbWndExtra: 0,
122            hIcon: Default::default(),
123            hCursor: Default::default(),
124            hbrBackground: Default::default(),
125            lpszMenuName: Default::default(),
126        };
127
128        let class_atom = RegisterClassW(&wnd_class);
129        if class_atom == 0 {
130            error!("Failed to register window class");
131        } else {
132            debug!(
133                "Registered window class {}",
134                WINDOW_CLASS_NAME.to_string_lossy()
135            );
136        }
137    });
138}
139*/
140
/// A hidden reply window that receives query responses from Everything
///
/// The window is created and serviced by a dedicated message loop thread
/// spawned in `ReplyWindow::new`; this struct only keeps the window handle
/// and the thread's join handle.
#[derive(Debug)]
struct ReplyWindow {
    /// Handle of the reply window. It is used from the owning thread to post
    /// messages (`post_message`) and handed out by `hwnd()`.
    hwnd: HWND,
    // Join handle of the message loop thread. It is always initialised by
    // `ReplyWindow::new`; `MaybeUninit` is only used so that `Drop` can move
    // the handle out of `&mut self` with `assume_init_read`.
    _thread: mem::MaybeUninit<std::thread::JoinHandle<()>>,
}

// SAFETY: `HWND` wraps a raw pointer, which is why these impls are not derived
// automatically. The handle is never dereferenced by this type: it is only
// passed to `PostMessageW` and copied out by `hwnd()`. All window state is
// touched by the window procedure on the message loop thread.
// NOTE(review): soundness relies on posting to a window from a thread other
// than its creator being permitted by Win32 — confirm against the
// `PostMessageW` documentation.
unsafe impl Send for ReplyWindow {}
// See the note above: shared references only allow posting messages to the
// window and copying the handle value.
unsafe impl Sync for ReplyWindow {}
159
/// Result from the message loop thread: the created window handle as usize
///
/// Sent exactly once over the start-up channel in `ReplyWindow::new`.
#[derive(Debug)]
struct MessageLoopResult {
    /// Raw value of the created `HWND`, or `0` when window creation failed.
    hwnd_usize: usize,
}
165
166impl ReplyWindow {
167    /// Create a new reply window - creates the window in the message loop thread
168    pub fn new(inner: Box<ClientInner>) -> Result<Self, IpcError> {
169        /*
170        // Register the window class (once per process)
171        // This must be called before creating any windows
172        register_window_class();
173        */
174
175        // Create a channel to receive the window handle from the message loop thread
176        let (tx, rx) = mpsc::channel::<MessageLoopResult>();
177
178        // Store the inner pointer for use in the message loop thread
179        // let inner_ptr_usize = inner_ptr as usize;
180
181        // Start the message loop in a separate thread
182        // The window class must be registered in the same thread where windows are created
183        let thread = std::thread::spawn(move || {
184            // Create the window in THIS thread (the message loop thread)
185            let hwnd = unsafe {
186                CreateWindowExW(
187                    WINDOW_EX_STYLE(0),
188                    // PCWSTR(WINDOW_CLASS_NAME.as_ptr()),
189                    WC_STATIC,
190                    None,
191                    WINDOW_STYLE(0),
192                    0,
193                    0,
194                    0,
195                    0,
196                    None,
197                    None,
198                    // Some(HINSTANCE::default()),
199                    None,
200                    None,
201                )
202            };
203
204            let hwnd = match hwnd.ok() {
205                Some(h) => h,
206                None => {
207                    debug!("Failed to create window in message loop thread");
208                    let _ = tx.send(MessageLoopResult { hwnd_usize: 0 });
209                    return;
210                }
211            };
212
213            // Send the window handle back to the caller as usize
214            if let Err(_) = tx.send(MessageLoopResult {
215                hwnd_usize: hwnd.0 as usize,
216            }) {
217                // _ = unsafe { DestroyWindow(hwnd) };
218                return;
219            }
220
221            debug!(?hwnd, "Created reply window");
222
223            // Set GWL_USERDATA to the EverythingInner pointer
224            let inner_ptr = Box::into_raw(inner);
225            unsafe { SetWindowLongPtrW(hwnd, GWL_USERDATA, inner_ptr as isize) };
226
227            unsafe {
228                SetWindowLongPtrW(
229                    hwnd,
230                    GWLP_WNDPROC,
231                    reply_window_wndproc as *const () as isize,
232                )
233            };
234
235            // Run the message loop
236            run_message_loop(hwnd);
237        });
238
239        // Wait for the window handle from the message loop thread
240        let result = rx.recv().map_err(|_| IpcError::CreateReplyWindow)?;
241        let MessageLoopResult { hwnd_usize } = result;
242
243        let hwnd = HWND(hwnd_usize as *mut _);
244        if hwnd.is_invalid() {
245            return Err(IpcError::CreateReplyWindow);
246        }
247
248        Ok(Self {
249            hwnd,
250            _thread: mem::MaybeUninit::new(thread),
251        })
252    }
253
254    /// Get the window handle
255    pub fn hwnd(&self) -> HWND {
256        self.hwnd
257    }
258
259    /// Send a message to this window
260    pub fn post_message(
261        &self,
262        msg: u32,
263        w_param: WPARAM,
264        l_param: LPARAM,
265    ) -> Result<(), windows::core::Error> {
266        unsafe { PostMessageW(Some(self.hwnd), msg, w_param, l_param) }
267    }
268
269    /// Post `WM_QUIT` to the reply window to signal the message loop to exit
270    ///
271    /// `WM_CLOSE` works too, but not `DestroyWindow()`.
272    pub fn quit(&self) {
273        let _ = self.post_message(WM_QUIT, WPARAM(0), LPARAM(0));
274    }
275}
276
277impl Drop for ReplyWindow {
278    fn drop(&mut self) {
279        // This must be done before waiting for the thread to ensure the thread
280        // can process the quit message
281        self.quit();
282
283        let _thread = unsafe { self._thread.assume_init_read() };
284        // Join the message loop thread if it exists
285        // if let Some(handle) = self.thread.take() {
286        //     let _ = handle.join();
287        // }
288        #[cfg(feature = "drop-join-thread")]
289        let _ = _thread.join();
290    }
291}
292
/// A query response received from Everything
///
/// NOTE(review): nothing in this part of the file constructs this type; the
/// window procedure builds a `QueryList` instead — confirm it is still used.
#[derive(Debug)]
pub struct QueryResponse {
    /// Reply identifier of the response.
    pub id: u32,
    /// Raw bytes of the response payload.
    pub data: Vec<u8>,
}
299
300/// Reply window procedure for handling WM_APP and WM_COPYDATA
301#[instrument(skip_all, fields(hwnd))]
302unsafe extern "system" fn reply_window_wndproc(
303    hwnd: HWND,
304    msg: u32,
305    w_param: WPARAM,
306    l_param: LPARAM,
307) -> LRESULT {
308    // dbg!(hwnd, msg, w_param, l_param);
309    match msg {
310        // Forward the request data to the IPC window
311        WM_APP => {
312            // WPARAM contains pointer to Box<Vec<u8>> (from PostMessageW)
313            // We take ownership of the Box, create a COPYDATASTRUCT, and forward it
314            let request_ptr = w_param.0 as *mut Vec<u8>;
315            let request = unsafe { Box::from_raw(request_ptr) };
316
317            // Get the EverythingInner pointer from GWL_USERDATA
318            let inner_ptr = unsafe { GetWindowLongPtrW(hwnd, GWL_USERDATA) };
319            if inner_ptr != 0 {
320                let inner = unsafe { &*(inner_ptr as *const ClientInner) };
321                let ipc_hwnd = inner.ipc_window.hwnd();
322
323                // Create COPYDATASTRUCT from the request data
324                let cds = COPYDATASTRUCT {
325                    dwData: EVERYTHING_IPC_COPYDATA_QUERY2W as usize,
326                    cbData: request.len() as u32,
327                    lpData: request.as_ptr() as *mut _,
328                };
329
330                // Get the raw pointer to the COPYDATASTRUCT
331                let cds_ptr = &cds as *const COPYDATASTRUCT;
332
333                // Send to IPC window synchronously
334                let r = unsafe {
335                    SendMessageW(
336                        ipc_hwnd,
337                        WM_COPYDATA,
338                        Some(WPARAM(hwnd.0 as usize)),
339                        Some(LPARAM(cds_ptr as isize)),
340                    )
341                };
342                if r.0 == 1 {
343                    trace!(?ipc_hwnd, ?r);
344                } else {
345                    warn!(?ipc_hwnd, ?r);
346                    // Drop the current query sender since the message failed to send
347                    // This will cause the query to Err on the client side
348                    drop(inner.take_current_query_sender());
349                }
350            }
351
352            // Request Vec is dropped here after SendMessageW returns
353
354            LRESULT(0)
355        }
356        // Response from Everything IPC window
357        WM_COPYDATA => {
358            let copydata = unsafe { &*(l_param.0 as *const COPYDATASTRUCT) };
359            // Do not assert that copydata->dwData == _EVERYTHING_COPYDATA_QUERYREPLY(0)
360            // The code in Everything's SDK is wrong. copydata->dwData is replyid and can be any value.
361            let id = copydata.dwData as u32;
362
363            // Get the EverythingInner pointer from GWL_USERDATA
364            let inner_ptr = unsafe { GetWindowLongPtrW(hwnd, GWL_USERDATA) } as *const ClientInner;
365            if inner_ptr.is_null() {
366                error!("No object found");
367                return LRESULT(0);
368            }
369
370            // Get the sender from the inner struct
371            let inner = unsafe { &*inner_ptr };
372            if let Some(sender) = inner.take_current_query_sender() {
373                if match &sender {
374                    QuerySender::Sync(_sender) => {
375                        // TODO: https://github.com/rust-lang/rust/issues/153668
376                        /*
377                        if sender.is_disconnected() {
378                            return LRESULT(1);
379                        }
380                        */
381                        false
382                    }
383                    #[cfg(feature = "tokio")]
384                    QuerySender::Tokio(sender) => sender.is_closed(),
385                } {
386                    return LRESULT(1);
387                }
388
389                // Convert to QueryList and send
390                // TODO: Callback for one less copy
391                let data = unsafe {
392                    std::slice::from_raw_parts(
393                        copydata.lpData as *const u8,
394                        copydata.cbData as usize,
395                    )
396                }
397                .into();
398                // Reply to Everything
399                _ = unsafe { ReplyMessage(LRESULT(1)) };
400                trace!(id, cbData = copydata.cbData, "WM_COPYDATA received");
401
402                let results = QueryList::new(id, data);
403                if match sender {
404                    QuerySender::Sync(sender) => sender.send(results).is_ok(),
405                    #[cfg(feature = "tokio")]
406                    QuerySender::Tokio(sender) => sender.send(results).is_ok(),
407                } {
408                    debug!(id, "Sent query response");
409                } else {
410                    warn!(id, "Failed to send query response");
411                }
412            } else {
413                warn!(id, "No pending query");
414            }
415
416            LRESULT(1)
417        }
418        _ => unsafe { DefWindowProcW(hwnd, msg, w_param, l_param) },
419    }
420}
421
422/// Message loop runner - runs in a separate thread
423/// Takes an HWND which is passed as usize for Send compatibility
424fn run_message_loop(hwnd: HWND) {
425    unsafe {
426        let mut msg: MSG = mem::zeroed();
427        let mut ret;
428        loop {
429            ret = GetMessageW(&mut msg, Some(hwnd), 0, 0);
430            if ret.0 <= 0 {
431                break;
432            }
433            DispatchMessageW(&mut msg);
434        }
435    }
436
437    // Cleanup
438    let inner_ptr = unsafe { GetWindowLongPtrW(hwnd, GWL_USERDATA) };
439    if inner_ptr != 0 {
440        drop(unsafe { Box::from_raw(inner_ptr as *mut ClientInner) });
441    }
442}
443
/// Wrapper for query senders (both sync and async)
///
/// Exactly one value is delivered per sender: the window procedure takes the
/// sender out of `ClientInner` and sends a single `QueryList` through it.
enum QuerySender {
    /// Sender half of the std channel returned by the blocking query API.
    Sync(mpsc::Sender<QueryList>),
    /// Sender half of the oneshot channel returned by the Tokio query API.
    #[cfg(feature = "tokio")]
    Tokio(tokio::sync::oneshot::Sender<QueryList>),
}
450
/// Inner state shared by Everything
///
/// Shared between the client (which stores the pending sender) and the reply
/// window procedure (which takes it when a response or a send failure occurs).
struct ClientInner {
    /// The Everything IPC window that queries are forwarded to.
    ipc_window: IpcWindow,
    /// The sender for the current (last) query
    /// Using Mutex for thread-safe mutable access
    /// `None` while no query is pending.
    current_query_sender: std::sync::Mutex<Option<QuerySender>>,
}
458
459impl ClientInner {
460    /// Safe guard to mitigate possible `PoisonError` panics.
461    pub fn take_current_query_sender(&self) -> Option<QuerySender> {
462        match self.current_query_sender.lock() {
463            Ok(mut sender) => sender.take(),
464            #[cfg(debug_assertions)]
465            Err(e) => Err(e).unwrap(),
466            #[cfg(not(debug_assertions))]
467            Err(e) => {
468                error!("poison");
469                // self.current_query_sender.clear_poison();
470                // e.into_inner()
471                None
472            }
473        }
474    }
475}
476
/**
Everything IPC client

See [`wm`](super::wm) for details.
*/
pub struct EverythingClient {
    /// Owned by [`ReplyWindow`] to avoid possible UAF of [`ClientInner`] after drop.
    ///
    /// The `'static` lifetime is produced by an unchecked lifetime extension in
    /// `IpcWindow::wm_client`; the allocation is actually freed by the message
    /// loop thread when its loop exits.
    /// NOTE(review): the reference is therefore only valid while that loop is
    /// running — confirm nothing can end the loop before this client is dropped.
    inner: &'static ClientInner,
    /// Hidden window (and its message loop thread) that receives the replies.
    reply_window: ReplyWindow,
}
487
488impl IpcWindow {
489    pub fn wm_client(&self) -> Result<EverythingClient, IpcError> {
490        // Create the inner state
491        let inner = Box::new(ClientInner {
492            ipc_window: self.clone(),
493            current_query_sender: std::sync::Mutex::new(None),
494        });
495        let inner_ref: &ClientInner = inner.as_ref();
496        let inner_ref: &'static ClientInner = unsafe { mem::transmute(inner_ref) };
497
498        // Create the reply window with a pointer to the inner state
499        // let inner_ptr = Arc::as_ptr(&inner) as *mut ClientInner;
500        let reply_window = ReplyWindow::new(inner)?;
501
502        let inner = inner_ref;
503        Ok(EverythingClient {
504            inner,
505            reply_window,
506        })
507    }
508}
509
510impl std::ops::Deref for EverythingClient {
511    type Target = IpcWindow;
512
513    fn deref(&self) -> &Self::Target {
514        self.ipc_window()
515    }
516}
517
518impl EverythingClient {
519    /// Create a new Everything client
520    pub fn new() -> Result<Self, IpcError> {
521        IpcWindow::new().ok_or(IpcError::NoIpcWindow)?.wm_client()
522    }
523
524    /// Create a new Everything client with instance name
525    pub fn with_instance(instance_name: Option<&str>) -> Result<Self, IpcError> {
526        IpcWindow::with_instance(instance_name)
527            .ok_or(IpcError::NoIpcWindow)?
528            .wm_client()
529    }
530
531    /// Get the IPC window for sending messages
532    fn ipc_window(&self) -> &IpcWindow {
533        &self.inner.ipc_window
534    }
535
536    /// Get the next query ID
537    fn next_id(&self) -> u32 {
538        static NEXT_ID: atomic::AtomicU32 = atomic::AtomicU32::new(0);
539        NEXT_ID.fetch_add(1, atomic::Ordering::SeqCst)
540    }
541
542    /// Send a query to Everything
543    fn query_send(
544        &self,
545        search: &str,
546        search_flags: SearchFlags,
547        request_flags: RequestFlags,
548        sort: Sort,
549        id: u32,
550        offset: u32,
551        max_results: Option<u32>,
552    ) -> bool {
553        let msg_hwnd = self.reply_window.hwnd();
554
555        // Build the query request using EverythingIpcQuery2 struct
556        let request = EverythingIpcQuery2::create(
557            msg_hwnd.0 as u32,
558            id,
559            search_flags.bits(),
560            offset,
561            max_results.unwrap_or(u32::MAX),
562            request_flags.bits(),
563            sort as u32,
564            search,
565        );
566
567        // Box the request Vec to keep it alive until the message is processed
568        let request_box = Box::new(request);
569        let request_ptr = Box::into_raw(request_box);
570
571        // available: SendMessageW (blocked), SendMessageTimeoutW (unstable)
572        // unavailable: PostMessageW, SendNotifyMessageW
573        // not tested: SendMessageCallbackW
574
575        // Use ReplyWindow::post_message to send WM_APP
576        // The reply window's wndproc will forward this to the IPC window
577        // WPARAM contains pointer to Box<Vec<u8>>, which owns the request data
578        match self
579            .reply_window
580            .post_message(WM_APP, WPARAM(request_ptr as usize), LPARAM(0))
581        {
582            Ok(_) => true,
583            Err(_) => {
584                // PostMessageW failed, free the request to avoid leak
585                let _ = unsafe { Box::from_raw(request_ptr) };
586                false
587            }
588        }
589    }
590}
591
592#[bon]
593impl EverythingClient {
594    /// Send a query to Everything
595    ///
596    /// # Important Note
597    /// Everything only handles one query per window at a time. Sending another query
598    /// when a query has not completed will cancel the old query.
599    ///
600    /// This method serializes queries by replacing the previous sender.
601    /// Callers should wait for the receiver before calling again.
602    ///
603    /// # Example
604    /// ```no_run
605    /// use std::time::Duration;
606    /// use everything_ipc::wm::{EverythingClient, RequestFlags};
607    ///
608    /// let everything = EverythingClient::new().unwrap();
609    ///
610    /// // These queries will be serialized (not sent concurrently)
611    /// let receiver1 = everything
612    ///     .query("search1")
613    ///     .request_flags(RequestFlags::FileName)
614    ///     .call()
615    ///     .unwrap();
616    /// let result1 = receiver1.recv_timeout(Duration::from_secs(5)).unwrap();
617    ///
618    /// // Now safe to send next query
619    /// let receiver2 = everything
620    ///     .query("search2")
621    ///     .request_flags(RequestFlags::FileName)
622    ///     .call()
623    ///     .unwrap();
624    /// let result2 = receiver2.recv_timeout(Duration::from_secs(5)).unwrap();
625    /// ```
626    #[instrument(skip_all)]
627    #[builder]
628    pub fn query(
629        &self,
630        #[builder(start_fn)] search: &str,
631        #[builder(default)] search_flags: SearchFlags,
632        request_flags: RequestFlags,
633        #[builder(default)] sort: Sort,
634        #[builder(default)] offset: u32,
635        max_results: Option<u32>,
636    ) -> Result<mpsc::Receiver<QueryList>, IpcError> {
637        let id = self.next_id();
638        debug!("generating query ID {}", id);
639
640        // Create a channel for the response
641        let (sender, receiver) = mpsc::channel::<QueryList>();
642
643        // Send the query first
644        let sent = self.query_send(
645            search,
646            search_flags,
647            request_flags,
648            sort,
649            id,
650            offset,
651            max_results,
652        );
653
654        if !sent {
655            warn!("failed to send query ID {}", id);
656            return Err(IpcError::Send);
657        }
658        debug!("query ID {} sent successfully", id);
659
660        // Store the sender (only one query at a time per Everything instance)
661        // Using Mutex for thread-safe mutable access
662        let old_sender = self
663            .inner
664            .current_query_sender
665            .lock()
666            .unwrap()
667            .replace(QuerySender::Sync(sender));
668        // Drop any previous sender that wasn't used - its receiver will fail
669        drop(old_sender);
670
671        Ok(receiver)
672    }
673
674    /// Send a query to Everything and wait for the result
675    ///
676    /// This method serializes queries to work around Everything's single-query-per-window limitation.
677    /// Only one query can be sent at a time per reply window.
678    #[instrument(skip_all)]
679    #[builder]
680    pub fn query_wait(
681        &self,
682        #[builder(start_fn)] search: &str,
683        #[builder(default)] search_flags: SearchFlags,
684        request_flags: RequestFlags,
685        #[builder(default)] sort: Sort,
686        #[builder(default)] offset: u32,
687        max_results: Option<u32>,
688        #[builder(default = Duration::from_millis(3000))] timeout: Duration,
689    ) -> Result<QueryList, IpcError> {
690        // Reuse query to send the query, then wait for the result
691        let receiver = self
692            .query(search)
693            .search_flags(search_flags)
694            .request_flags(request_flags)
695            .sort(sort)
696            .offset(offset)
697            .maybe_max_results(max_results)
698            .call()?;
699
700        // Wait for the response with timeout
701        match receiver.recv_timeout(timeout) {
702            Ok(results) => Ok(results),
703            Err(_) => {
704                warn!("query timed out");
705                Err(IpcError::Timeout)
706            }
707        }
708    }
709}
710
#[cfg(feature = "tokio")]
#[bon]
impl EverythingClient {
    /// Send a query to Everything asynchronously
    ///
    /// Returns a oneshot receiver that resolves to the [`QueryList`] once
    /// Everything has answered. The receiver fails when forwarding the query
    /// fails or when a later query replaces this one.
    ///
    /// # Important Note
    /// Everything only handles one query per window at a time. Sending another query
    /// when a query has not completed will cancel the old query.
    ///
    /// This method serializes queries by replacing the previous sender.
    /// Callers should await the receiver before calling again.
    ///
    /// # Errors
    /// [`IpcError::Send`] when the request could not be posted to the reply
    /// window.
    ///
    /// # Example
    /// ```no_run
    /// use everything_ipc::wm::{EverythingClient, RequestFlags};
    ///
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let everything = EverythingClient::new().unwrap();
    ///
    /// // These queries will be serialized (not sent concurrently)
    /// let receiver1 = everything
    ///     .query_tokio("search1")
    ///     .request_flags(RequestFlags::FileName)
    ///     .call()?;
    /// let result1 = receiver1.await?;
    ///
    /// // Now safe to send next query
    /// let receiver2 = everything
    ///     .query_tokio("search2")
    ///     .request_flags(RequestFlags::FileName)
    ///     .call()?;
    /// let result2 = receiver2.await?;
    /// # Ok(())
    /// # }
    /// ```
    #[instrument(skip_all)]
    #[builder]
    pub fn query_tokio(
        &self,
        #[builder(start_fn)] search: &str,
        #[builder(default)] search_flags: SearchFlags,
        request_flags: RequestFlags,
        #[builder(default)] sort: Sort,
        #[builder(default)] offset: u32,
        max_results: Option<u32>,
    ) -> Result<tokio::sync::oneshot::Receiver<QueryList>, IpcError> {
        let id = self.next_id();
        debug!("generating query ID {}", id);

        // Create a channel for the response
        let (sender, receiver) = tokio::sync::oneshot::channel::<QueryList>();

        // Hold the sender slot while the request is posted. The reply window
        // thread takes the sender as soon as a response (or a send failure)
        // arrives; if the slot were filled only after posting, a fast reply
        // could find it empty or still holding the previous query's sender.
        // Posting does not block and this function never awaits, so the std
        // mutex guard is held only briefly.
        // The slot is a plain `Option`, so a poisoned lock is safe to recover.
        let mut slot = self
            .inner
            .current_query_sender
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);

        let sent = self.query_send(
            search,
            search_flags,
            request_flags,
            sort,
            id,
            offset,
            max_results,
        );

        if !sent {
            // The slot is left untouched, so a previous pending query survives.
            warn!("failed to send query ID {}", id);
            return Err(IpcError::Send);
        }
        debug!("query ID {} sent successfully", id);

        // Store the sender (only one query at a time per Everything instance)
        let old_sender = slot.replace(QuerySender::Tokio(sender));
        drop(slot);
        // Drop any previous sender that wasn't used - its receiver will fail
        drop(old_sender);

        Ok(receiver)
    }

    /// Send a query to Everything asynchronously and wait for the result
    ///
    /// This method serializes queries to work around Everything's single-query-per-window limitation.
    /// Only one query can be sent at a time per reply window.
    ///
    /// `timeout` defaults to 3000 ms.
    ///
    /// # Errors
    /// - [`IpcError::Send`] when the request could not be posted, or when the
    ///   pending sender was dropped before a response arrived.
    /// - [`IpcError::Timeout`] when no response arrived within `timeout`.
    #[instrument(skip_all)]
    #[builder]
    pub async fn query_wait_tokio(
        &self,
        #[builder(start_fn)] search: &str,
        #[builder(default)] search_flags: SearchFlags,
        request_flags: RequestFlags,
        #[builder(default)] sort: Sort,
        #[builder(default)] offset: u32,
        max_results: Option<u32>,
        #[builder(default = Duration::from_millis(3000))] timeout: Duration,
    ) -> Result<QueryList, IpcError> {
        // Reuse query_tokio to send the query, then wait for the result
        let receiver = self
            .query_tokio(search)
            .search_flags(search_flags)
            .request_flags(request_flags)
            .sort(sort)
            .offset(offset)
            .maybe_max_results(max_results)
            .call()?;

        // Wait for the response with timeout
        match tokio::time::timeout(timeout, receiver).await {
            Ok(Ok(results)) => Ok(results),
            // The sender was dropped without delivering a result.
            Ok(Err(_)) => {
                warn!("query receiver error");
                Err(IpcError::Send)
            }
            // The timer elapsed first.
            Err(_) => {
                warn!("query timed out");
                Err(IpcError::Timeout)
            }
        }
    }
}
834
835#[cfg(test)]
836mod tests {
837    use super::*;
838
839    #[test_log::test]
840    #[test_log(default_log_filter = "trace")]
841    fn doc() {
842        let everything = EverythingClient::new().expect("not available");
843
844        let list = everything
845            .query_wait(r"C:\Windows\ *.exe")
846            .request_flags(RequestFlags::FileName | RequestFlags::Size | RequestFlags::Path)
847            .sort(Sort::SizeDescending)
848            .max_results(5)
849            .call()
850            .expect("query");
851
852        println!("Found {} items:", list.len());
853        println!("{:<25} {:>10}  {}", "Filename", "Size", "Path");
854        for item in list.iter() {
855            // get_string() for String, get_str() for &U16CStr
856            let filename = item.get_string(RequestFlags::FileName).unwrap();
857            let path = item.get_str(RequestFlags::Path).unwrap().display();
858            let size = item.get_size(RequestFlags::Size).unwrap();
859            println!("{:<25} {:>10}  {}", filename, size, path);
860        }
861        println!("Total: {} items", list.total_len());
862    }
863
864    #[test_log::test]
865    #[test_log(default_log_filter = "trace")]
866    fn query_empty_search() {
867        let everything = EverythingClient::new().unwrap();
868        let search = "";
869        let search_flags = SearchFlags::MatchCase;
870        let request_flags = RequestFlags::FileName | RequestFlags::Path;
871        let sort = Sort::NameAscending;
872
873        // Send query for first 5 items
874        let result =
875            everything.query_send(search, search_flags, request_flags, sort, 1000, 0, Some(5));
876
877        assert!(result, "Query should be sent successfully");
878    }
879
880    #[test_log::test]
881    #[test_log(default_log_filter = "trace")]
882    fn query_with_pattern() {
883        let everything = EverythingClient::new().unwrap();
884        let search = "test";
885        let search_flags = SearchFlags::MatchCase;
886        let request_flags = RequestFlags::FileName;
887        let sort = Sort::NameAscending;
888
889        let result =
890            everything.query_send(search, search_flags, request_flags, sort, 1001, 0, Some(10));
891
892        assert!(result, "Query should be sent successfully");
893    }
894
895    #[test]
896    fn query_with_full_path() {
897        let everything = EverythingClient::new().unwrap();
898        let search = "";
899        let search_flags = SearchFlags::MatchCase;
900        let request_flags =
901            RequestFlags::FullPathAndFileName | RequestFlags::Size | RequestFlags::DateModified;
902        let sort = Sort::NameAscending;
903
904        let result =
905            everything.query_send(search, search_flags, request_flags, sort, 1002, 0, Some(3));
906
907        assert!(result, "Query should be sent successfully");
908    }
909
910    #[test]
911    fn query_sort_by_size() {
912        let everything = EverythingClient::new().unwrap();
913        let search = "";
914        let search_flags = SearchFlags::MatchCase;
915        let request_flags = RequestFlags::FileName | RequestFlags::Size;
916        let sort = Sort::SizeAscending;
917
918        let result =
919            everything.query_send(search, search_flags, request_flags, sort, 1003, 0, Some(5));
920
921        assert!(result, "Query should be sent successfully");
922    }
923
924    #[test]
925    fn query_with_offset() {
926        let everything = EverythingClient::new().unwrap();
927        let search = "";
928        let search_flags = SearchFlags::MatchCase;
929        let request_flags = RequestFlags::FileName;
930        let sort = Sort::NameAscending;
931
932        // First query without offset
933        let result1 =
934            everything.query_send(search, search_flags, request_flags, sort, 1005, 0, Some(2));
935
936        assert!(result1, "First query should be sent successfully");
937
938        // Second query with offset
939        let result2 =
940            everything.query_send(search, search_flags, request_flags, sort, 1006, 2, Some(2));
941
942        assert!(
943            result2,
944            "Second query with offset should be sent successfully"
945        );
946    }
947
948    #[test]
949    fn query_everything() {
950        let everything = EverythingClient::new().unwrap();
951        let search = "test";
952        let search_flags = SearchFlags::MatchCase;
953        let request_flags = RequestFlags::FileName;
954        let sort = Sort::NameAscending;
955
956        let result = everything.query_send(
957            search,
958            search_flags,
959            request_flags,
960            sort,
961            everything.next_id(),
962            0,
963            Some(5),
964        );
965
966        assert!(result, "Query should be sent successfully");
967    }
968
969    #[test]
970    fn query_multiple_requests() {
971        let everything = EverythingClient::new().unwrap();
972        let search = "";
973        let search_flags = SearchFlags::MatchCase;
974        let request_flags = RequestFlags::FileName
975            | RequestFlags::Path
976            | RequestFlags::Size
977            | RequestFlags::DateModified
978            | RequestFlags::DateCreated;
979        let sort = Sort::NameAscending;
980
981        let result =
982            everything.query_send(search, search_flags, request_flags, sort, 1004, 0, Some(5));
983
984        assert!(result, "Query should be sent successfully");
985    }
986
987    #[test]
988    fn query_wait_empty() {
989        let everything = EverythingClient::new().unwrap();
990        let search = "";
991        let search_flags = SearchFlags::MatchCase;
992        let request_flags = RequestFlags::FileName;
993        let sort = Sort::NameAscending;
994
995        // Check if IPC is available
996        assert!(everything.is_ipc_available(), "IPC should be available");
997
998        let result = everything
999            .query_wait(search)
1000            .search_flags(search_flags)
1001            .request_flags(request_flags)
1002            .sort(sort)
1003            .offset(0)
1004            .max_results(10)
1005            .call();
1006        assert!(
1007            result.is_ok(),
1008            "query_wait should return Ok when Everything is available"
1009        );
1010    }
1011
1012    #[test_log::test]
1013    #[test_log(default_log_filter = "trace")]
1014    fn query_wait() {
1015        let everything = EverythingClient::new().unwrap();
1016        // Use empty string to get all files, which is more reliable than searching for "test"
1017        // which may not exist on the system
1018        let search = "test";
1019        // MATCH_ACCENTS is marked as abandoned in the C++ code, use MATCH_CASE instead
1020        let search_flags = SearchFlags::MatchCase;
1021        let request_flags = RequestFlags::FileName;
1022        let sort = Sort::NameAscending;
1023
1024        // Check if IPC is available
1025        assert!(everything.is_ipc_available(), "IPC should be available");
1026
1027        let result = everything
1028            .query_wait(search)
1029            .search_flags(search_flags)
1030            .request_flags(request_flags)
1031            .sort(sort)
1032            .offset(0)
1033            .max_results(10)
1034            .call();
1035        dbg!(&result);
1036        assert!(
1037            result.is_ok(),
1038            "query_wait should return Ok when Everything is available"
1039        );
1040        assert!(
1041            result.as_ref().is_ok_and(|r| r.total_len() > 0),
1042            "Expected found_num > 0, got: {:?}",
1043            result
1044        );
1045    }
1046
1047    #[test_log::test]
1048    #[test_log(default_log_filter = "trace")]
1049    fn query_wait_cancel() {
1050        let everything = EverythingClient::new().unwrap();
1051
1052        // Check if IPC is available
1053        assert!(everything.is_ipc_available(), "IPC should be available");
1054
1055        // Send multiple queries at once
1056        // Note: These will be serialized, so only the last one succeeds
1057        // The first two queries will be disconnected when the next query replaces their sender
1058        let searches = ["", "test", "rust"];
1059        let mut receivers = Vec::new();
1060
1061        for search in &searches {
1062            let search_flags = SearchFlags::MatchCase;
1063            let request_flags = RequestFlags::FileName;
1064            let sort = Sort::NameAscending;
1065            let receiver = everything
1066                .query(search)
1067                .search_flags(search_flags)
1068                .request_flags(request_flags)
1069                .sort(sort)
1070                .offset(0)
1071                .max_results(10)
1072                .call()
1073                .expect("query should succeed");
1074            receivers.push(receiver);
1075        }
1076
1077        // First query should fail (response rejected because sender was replaced)
1078        // When query 2 is sent, it replaces the sender for query 0
1079        // When the response comes back for query 0, the old sender is gone
1080        let result = receivers[0].recv_timeout(std::time::Duration::from_millis(3000));
1081        assert!(
1082            result.is_err(),
1083            "Query 0 should fail because sender was replaced (got: {:?})",
1084            result
1085        );
1086
1087        // Second query should succeed (it's the current sender when response arrives)
1088        let result = receivers[1].recv_timeout(std::time::Duration::from_millis(3000));
1089        assert!(
1090            result.is_err(),
1091            "Query 1 should fail because sender was replaced (got: {:?})",
1092            result
1093        );
1094
1095        // Last query should succeed
1096        let result = receivers[2].recv_timeout(std::time::Duration::from_millis(3000));
1097        let result = result.expect("Last query should succeed");
1098        dbg!(&result);
1099        assert!(
1100            result.total_len() > 0,
1101            "Last query should return valid results"
1102        );
1103    }
1104
1105    #[test_log::test]
1106    #[test_log(default_log_filter = "trace")]
1107    fn query_wait_parallel() {
1108        // Check if IPC is available
1109        let everything1 = EverythingClient::new().unwrap();
1110        let everything2 = EverythingClient::new().unwrap();
1111        let everything3 = EverythingClient::new().unwrap();
1112
1113        assert!(everything1.is_ipc_available(), "IPC should be available");
1114
1115        // Send multiple queries at once using separate Everything instances
1116        // Note: These won't cancel each other
1117        let receiver1 = everything1
1118            .query("")
1119            .search_flags(SearchFlags::MatchCase)
1120            .request_flags(RequestFlags::FileName)
1121            .sort(Sort::NameAscending)
1122            .offset(0)
1123            .max_results(10)
1124            .call()
1125            .expect("query should succeed");
1126        let receiver2 = everything2
1127            .query("test")
1128            .search_flags(SearchFlags::MatchCase)
1129            .request_flags(RequestFlags::FileName)
1130            .sort(Sort::NameAscending)
1131            .offset(0)
1132            .max_results(10)
1133            .call()
1134            .expect("query should succeed");
1135        let receiver3 = everything3
1136            .query("rust")
1137            .search_flags(SearchFlags::MatchCase)
1138            .request_flags(RequestFlags::FileName)
1139            .sort(Sort::NameAscending)
1140            .offset(0)
1141            .max_results(10)
1142            .call()
1143            .expect("query should succeed");
1144
1145        // Wait for all queries to complete
1146        for (i, receiver) in [receiver1, receiver2, receiver3].into_iter().enumerate() {
1147            let result = receiver.recv_timeout(std::time::Duration::from_millis(5000));
1148            let result = result.expect(&format!("Query {} timed out", i));
1149            dbg!(&result);
1150            assert!(result.len() > 0, "Query {} should return valid results", i);
1151        }
1152    }
1153}