everything_ipc/wm/mod.rs
1/*!
2Everything's window message IPC interface, supported by Everything v1.4+.
3
4The main API is [`EverythingClient`].
5
6- Support Everything v1.4 and v1.5 (including Alpha version).
7- Higher performance than Everything v1.4's official SDK:
8 - Hot query time is about 30% shorter.
9 - Sending blocking time is 60% shorter for async queries.
10- Support both sync and async (Tokio) querying.
11
12## Examples
13```no_run
14// cargo add everything-ipc
15use everything_ipc::wm::{EverythingClient, RequestFlags, Sort};
16
17let everything = EverythingClient::new().expect("not available");
18
19let list = everything
20 .query_wait(r"C:\Windows\ *.exe")
21 .request_flags(RequestFlags::FileName | RequestFlags::Size | RequestFlags::Path)
22 .sort(Sort::SizeDescending)
23 .max_results(5)
24 .call()
25 .expect("query");
26
27println!("Found {} items:", list.len());
28println!("{:<25} {:>10} {}", "Filename", "Size", "Path");
29for item in list.iter() {
30 // get_string() for String, get_str() for &U16CStr
31 let filename = item.get_string(RequestFlags::FileName).unwrap();
32 let path = item.get_str(RequestFlags::Path).unwrap().display();
33 let size = item.get_size(RequestFlags::Size).unwrap();
34 println!("{:<25} {:>10} {}", filename, size, path);
35}
36println!("Total: {} items", list.total_len());
37/*
38Found 5 items:
39Filename Size Path
40MRT.exe 223939376 C:\Windows\System32
41MRT-KB890830.exe 133315992 C:\Windows\System32
42OneDriveSetup.exe 89771848 C:\Windows\WinSxS\amd64_microsoft-windows-onedrive-setup_31bf3856ad364e35_10.0.26100.5074_none_c1340e9ad5f0a5d0
43OneDriveSetup.exe 89771848 C:\Windows\System32
44OneDriveSetup.exe 60357040 C:\Windows\WinSxS\amd64_microsoft-windows-onedrive-setup_31bf3856ad364e35_10.0.26100.1_none_2233e98c8e9ce5f5
45Total: 5742 items
46*/
47```
48
49## References
50- [everything-cpp (IbEverythingLib)](https://github.com/Chaoses-Ib/IbEverythingLib/tree/master/everything-cpp)
51*/
52use std::{
53 mem,
54 sync::{Arc, atomic, mpsc},
55 time::Duration,
56};
57
58use bon::bon;
59use tracing::{debug, error, instrument, trace, warn};
60use windows::Win32::{
61 Foundation::{HWND, LPARAM, LRESULT, WPARAM},
62 System::DataExchange::COPYDATASTRUCT,
63 UI::{
64 Controls::WC_STATIC,
65 WindowsAndMessaging::{
66 CreateWindowExW, DefWindowProcW, DispatchMessageW, GWL_USERDATA, GWLP_WNDPROC,
67 GetMessageW, GetWindowLongPtrW, MSG, PostMessageW, ReplyMessage, SendMessageW,
68 SetWindowLongPtrW, WINDOW_EX_STYLE, WINDOW_STYLE, WM_APP, WM_COPYDATA, WM_QUIT,
69 },
70 },
71};
72
73use crate::IpcWindow;
74
75mod shared;
76mod types;
77pub use types::*;
78mod ext;
79
/// Errors that can occur when querying Everything
///
/// Returned by the client constructors and by the `query*` methods of this module.
#[derive(Debug, thiserror::Error)]
pub enum IpcError {
    /// No IPC window available
    #[error("IPC window not found")]
    NoIpcWindow,

    /// Failed to create reply window
    #[error("failed to create reply window")]
    CreateReplyWindow,

    /// Failed to send query to Everything
    #[error("failed to send query to Everything")]
    Send,

    /// Query timed out waiting for response
    #[error("query timed out")]
    Timeout,

    /// A query-level failure described by a static message.
    ///
    /// NOTE(review): nothing in this part of the file constructs this variant;
    /// presumably it is produced by the result handling code — confirm.
    #[error("query: {0}")]
    Query(&'static str),
}
102
103// ==================== Reply Window ====================
104
105/*
106/// Window class name for Everything IPC reply windows
107const WINDOW_CLASS_NAME: &widestring::U16CStr = u16cstr!("everything_ipc::wm");
108
109/// Global flag to track if the window class has been registered
110static CLASS_REGISTERED: Once = Once::new();
111
112/// Register the window class globally (once per process)
113/// The class must be registered in the same thread that creates windows
114fn register_window_class() {
115 CLASS_REGISTERED.call_once(|| unsafe {
116 let wnd_class = WNDCLASSW {
117 lpfnWndProc: Some(reply_window_wndproc),
118 hInstance: get_current_module_handle().into(),
119 lpszClassName: PCWSTR(WINDOW_CLASS_NAME.as_ptr()),
120 style: Default::default(),
121 cbClsExtra: 0,
122 cbWndExtra: 0,
123 hIcon: Default::default(),
124 hCursor: Default::default(),
125 hbrBackground: Default::default(),
126 lpszMenuName: Default::default(),
127 };
128
129 let class_atom = RegisterClassW(&wnd_class);
130 if class_atom == 0 {
131 error!("Failed to register window class");
132 } else {
133 debug!(
134 "Registered window class {}",
135 WINDOW_CLASS_NAME.to_string_lossy()
136 );
137 }
138 });
139}
140*/
141
/// A hidden reply window that receives query responses from Everything
///
/// The window itself is created and pumped by a dedicated thread spawned in
/// `ReplyWindow::new`; this struct only keeps the window handle and the
/// thread's join handle.
#[derive(Debug)]
struct ReplyWindow {
    /// Handle of the reply window, used to post messages to it.
    hwnd: HWND,
    /// Join handle of the message loop thread.
    ///
    /// Kept in an `Option` so that `quit_join_thread()` and `Drop` can `take()` it.
    /// `mem::MaybeUninit` would be enough if we don't need `quit_join_thread()`.
    thread: Option<std::thread::JoinHandle<()>>,
}
151
// SAFETY: `HWND` wraps a raw pointer, which is why these impls are needed at all.
// `ReplyWindow` never dereferences that pointer: outside the message loop thread
// the handle is only copied out by `hwnd()` and passed to `PostMessageW`.
// NOTE(review): this relies on Win32 allowing messages to be posted to a window
// owned by another thread — confirm against the `PostMessageW` documentation.
unsafe impl Send for ReplyWindow {}
// SAFETY: the `&self` methods (`hwnd()`, `post_message()`, `quit()`) only read the
// handle value and there is no interior mutability, so sharing references between
// threads rests on the same assumption as `Send` above.
unsafe impl Sync for ReplyWindow {}
161
/// Result from the message loop thread: the created window handle as usize
///
/// `ReplyWindow::new` sends `hwnd_usize: 0` when window creation failed.
#[derive(Debug)]
struct MessageLoopResult {
    /// The raw `HWND` value cast to `usize`, or 0 on failure.
    hwnd_usize: usize,
}
167
168impl ReplyWindow {
169 /// Create a new reply window - creates the window in the message loop thread
170 ///
171 /// `Box` would be enough if we don't need `quit_join_thread()`.
172 pub fn new(inner: Arc<ClientInner>) -> Result<Self, IpcError> {
173 /*
174 // Register the window class (once per process)
175 // This must be called before creating any windows
176 register_window_class();
177 */
178
179 // Create a channel to receive the window handle from the message loop thread
180 let (tx, rx) = mpsc::channel::<MessageLoopResult>();
181
182 // Store the inner pointer for use in the message loop thread
183 // let inner_ptr_usize = inner_ptr as usize;
184
185 // Start the message loop in a separate thread
186 // The window class must be registered in the same thread where windows are created
187 let thread = std::thread::spawn(move || {
188 // Create the window in THIS thread (the message loop thread)
189 let hwnd = unsafe {
190 CreateWindowExW(
191 WINDOW_EX_STYLE(0),
192 // PCWSTR(WINDOW_CLASS_NAME.as_ptr()),
193 WC_STATIC,
194 None,
195 WINDOW_STYLE(0),
196 0,
197 0,
198 0,
199 0,
200 None,
201 None,
202 // Some(HINSTANCE::default()),
203 None,
204 None,
205 )
206 };
207
208 let hwnd = match hwnd.ok() {
209 Some(h) => h,
210 None => {
211 debug!("Failed to create window in message loop thread");
212 let _ = tx.send(MessageLoopResult { hwnd_usize: 0 });
213 return;
214 }
215 };
216
217 // Send the window handle back to the caller as usize
218 if let Err(_) = tx.send(MessageLoopResult {
219 hwnd_usize: hwnd.0 as usize,
220 }) {
221 // _ = unsafe { DestroyWindow(hwnd) };
222 return;
223 }
224
225 debug!(?hwnd, "Created reply window");
226
227 // Set GWL_USERDATA to the EverythingInner pointer
228 let inner_ptr = Arc::into_raw(inner);
229 unsafe { SetWindowLongPtrW(hwnd, GWL_USERDATA, inner_ptr as isize) };
230
231 unsafe {
232 SetWindowLongPtrW(
233 hwnd,
234 GWLP_WNDPROC,
235 reply_window_wndproc as *const () as isize,
236 )
237 };
238
239 // Run the message loop
240 run_message_loop(hwnd);
241 });
242
243 // Wait for the window handle from the message loop thread
244 let result = rx.recv().map_err(|_| IpcError::CreateReplyWindow)?;
245 let MessageLoopResult { hwnd_usize } = result;
246
247 let hwnd = HWND(hwnd_usize as *mut _);
248 if hwnd.is_invalid() {
249 return Err(IpcError::CreateReplyWindow);
250 }
251
252 Ok(Self {
253 hwnd,
254 // _thread: mem::MaybeUninit::new(thread),
255 thread: Some(thread),
256 })
257 }
258
259 /// Get the window handle
260 pub fn hwnd(&self) -> HWND {
261 self.hwnd
262 }
263
264 /// Send a message to this window
265 pub fn post_message(
266 &self,
267 msg: u32,
268 w_param: WPARAM,
269 l_param: LPARAM,
270 ) -> Result<(), windows::core::Error> {
271 unsafe { PostMessageW(Some(self.hwnd), msg, w_param, l_param) }
272 }
273
274 /// Post `WM_QUIT` to the reply window to signal the message loop to exit
275 ///
276 /// `WM_CLOSE` works too, but not `DestroyWindow()`.
277 pub fn quit(&self) {
278 let _ = self.post_message(WM_QUIT, WPARAM(0), LPARAM(0));
279 }
280
281 pub fn quit_join_thread(&mut self) {
282 if let Some(thread) = self.thread.take() {
283 self.quit();
284 _ = thread.join();
285 }
286 }
287}
288
289impl Drop for ReplyWindow {
290 fn drop(&mut self) {
291 // This must be done before waiting for the thread to ensure the thread
292 // can process the quit message
293 self.quit();
294
295 /*
296 let _thread = unsafe { self._thread.assume_init_read() };
297 // Join the message loop thread if it exists
298 // if let Some(handle) = self.thread.take() {
299 // let _ = handle.join();
300 // }
301 #[cfg(feature = "drop-join-thread")]
302 let _ = _thread.join();
303 */
304 #[cfg(feature = "drop-join-thread")]
305 if let Some(thread) = self.thread.take() {
306 _ = thread.join();
307 }
308 }
309}
310
/// A query response received from Everything
///
/// NOTE(review): nothing in this part of the file constructs this type (the
/// window procedure builds a `QueryList` instead) — confirm it is still needed.
#[derive(Debug)]
pub struct QueryResponse {
    /// Reply identifier — presumably the `dwData` of the reply message; TODO confirm.
    pub id: u32,
    /// Raw response bytes.
    pub data: Vec<u8>,
}
317
318/// Reply window procedure for handling WM_APP and WM_COPYDATA
319#[instrument(skip_all, fields(hwnd))]
320unsafe extern "system" fn reply_window_wndproc(
321 hwnd: HWND,
322 msg: u32,
323 w_param: WPARAM,
324 l_param: LPARAM,
325) -> LRESULT {
326 // dbg!(hwnd, msg, w_param, l_param);
327 match msg {
328 // Forward the request data to the IPC window
329 WM_APP => {
330 // WPARAM contains pointer to Box<Vec<u8>> (from PostMessageW)
331 // We take ownership of the Box, create a COPYDATASTRUCT, and forward it
332 let request_ptr = w_param.0 as *mut Vec<u8>;
333 let request = unsafe { Box::from_raw(request_ptr) };
334
335 // Get the EverythingInner pointer from GWL_USERDATA
336 let inner_ptr = unsafe { GetWindowLongPtrW(hwnd, GWL_USERDATA) };
337 if inner_ptr != 0 {
338 let inner = unsafe { &*(inner_ptr as *const ClientInner) };
339 let ipc_hwnd = inner.ipc_window.hwnd();
340
341 // Create COPYDATASTRUCT from the request data
342 let cds = COPYDATASTRUCT {
343 dwData: EVERYTHING_IPC_COPYDATA_QUERY2W as usize,
344 cbData: request.len() as u32,
345 lpData: request.as_ptr() as *mut _,
346 };
347
348 // Get the raw pointer to the COPYDATASTRUCT
349 let cds_ptr = &cds as *const COPYDATASTRUCT;
350
351 // Send to IPC window synchronously
352 let r = unsafe {
353 SendMessageW(
354 ipc_hwnd,
355 WM_COPYDATA,
356 Some(WPARAM(hwnd.0 as usize)),
357 Some(LPARAM(cds_ptr as isize)),
358 )
359 };
360 if r.0 == 1 {
361 trace!(?ipc_hwnd, ?r);
362 } else {
363 warn!(?ipc_hwnd, ?r);
364 // Drop the current query sender since the message failed to send
365 // This will cause the query to Err on the client side
366 drop(inner.take_current_query_sender());
367 }
368 }
369
370 // Request Vec is dropped here after SendMessageW returns
371
372 LRESULT(0)
373 }
374 // Response from Everything IPC window
375 WM_COPYDATA => {
376 let copydata = unsafe { &*(l_param.0 as *const COPYDATASTRUCT) };
377 // Do not assert that copydata->dwData == _EVERYTHING_COPYDATA_QUERYREPLY(0)
378 // The code in Everything's SDK is wrong. copydata->dwData is replyid and can be any value.
379 let id = copydata.dwData as u32;
380
381 // Get the EverythingInner pointer from GWL_USERDATA
382 let inner_ptr = unsafe { GetWindowLongPtrW(hwnd, GWL_USERDATA) } as *const ClientInner;
383 if inner_ptr.is_null() {
384 error!("No object found");
385 return LRESULT(0);
386 }
387
388 // Get the sender from the inner struct
389 let inner = unsafe { &*inner_ptr };
390 if let Some(sender) = inner.take_current_query_sender() {
391 if match &sender {
392 QuerySender::Sync(_sender) => {
393 // TODO: https://github.com/rust-lang/rust/issues/153668
394 /*
395 if sender.is_disconnected() {
396 return LRESULT(1);
397 }
398 */
399 false
400 }
401 #[cfg(feature = "tokio")]
402 QuerySender::Tokio(sender) => sender.is_closed(),
403 } {
404 return LRESULT(1);
405 }
406
407 // Convert to QueryList and send
408 // TODO: Callback for one less copy
409 let data = unsafe {
410 std::slice::from_raw_parts(
411 copydata.lpData as *const u8,
412 copydata.cbData as usize,
413 )
414 }
415 .into();
416 // Reply to Everything
417 _ = unsafe { ReplyMessage(LRESULT(1)) };
418 trace!(id, cbData = copydata.cbData, "WM_COPYDATA received");
419
420 let results = QueryList::new(id, data);
421 if match sender {
422 QuerySender::Sync(sender) => sender.send(results).is_ok(),
423 #[cfg(feature = "tokio")]
424 QuerySender::Tokio(sender) => sender.send(results).is_ok(),
425 } {
426 debug!(id, "Sent query response");
427 } else {
428 warn!(id, "Failed to send query response");
429 }
430 } else {
431 warn!(id, "No pending query");
432 }
433
434 LRESULT(1)
435 }
436 _ => unsafe { DefWindowProcW(hwnd, msg, w_param, l_param) },
437 }
438}
439
440/// Message loop runner - runs in a separate thread
441/// Takes an HWND which is passed as usize for Send compatibility
442fn run_message_loop(hwnd: HWND) {
443 unsafe {
444 let mut msg: MSG = mem::zeroed();
445 let mut ret;
446 loop {
447 ret = GetMessageW(&mut msg, Some(hwnd), 0, 0);
448 if ret.0 <= 0 {
449 break;
450 }
451 DispatchMessageW(&mut msg);
452 }
453 }
454
455 // Cleanup
456 let inner_ptr = unsafe { GetWindowLongPtrW(hwnd, GWL_USERDATA) };
457 if inner_ptr != 0 {
458 drop(unsafe { Arc::from_raw(inner_ptr as *mut ClientInner) });
459 }
460}
461
/// Wrapper for query senders (both sync and async)
///
/// Stored in `ClientInner::current_query_sender` while a query is pending; the
/// reply window procedure takes it out to deliver the `QueryList`.
enum QuerySender {
    /// Sender created by `EverythingClient::query`.
    Sync(mpsc::Sender<QueryList>),
    /// Sender created by `EverythingClient::query_tokio`.
    #[cfg(feature = "tokio")]
    Tokio(tokio::sync::oneshot::Sender<QueryList>),
}
468
/// Lock that serializes `query_wait()` calls; a standard mutex when the `tokio`
/// feature is disabled.
#[cfg(not(feature = "tokio"))]
type QueryLock = std::sync::Mutex<()>;
/// Lock that serializes `query_wait()` / `query_wait_tokio()` calls; a Tokio mutex
/// so that it can also be awaited from async code.
#[cfg(feature = "tokio")]
type QueryLock = tokio::sync::Mutex<()>;
473
/// Inner state shared by Everything
///
/// Shared between `EverythingClient` and the reply window thread, which reaches it
/// through the pointer stored in the window's `GWL_USERDATA`.
struct ClientInner {
    /// The Everything IPC window that requests are forwarded to.
    ipc_window: IpcWindow,
    /// The sender for the current (last) query
    /// Using Mutex for thread-safe mutable access
    current_query_sender: std::sync::Mutex<Option<QuerySender>>,

    /// For `query_wait()` from multi threads.
    ///
    /// TODO: Single thread version without this?
    query_lock: QueryLock,
}
486
487impl ClientInner {
488 /// Safe guard to mitigate possible `PoisonError` panics.
489 pub fn take_current_query_sender(&self) -> Option<QuerySender> {
490 match self.current_query_sender.lock() {
491 Ok(mut sender) => sender.take(),
492 #[cfg(debug_assertions)]
493 Err(e) => Err(e).unwrap(),
494 #[cfg(not(debug_assertions))]
495 Err(e) => {
496 error!("poison");
497 // self.current_query_sender.clear_poison();
498 // e.into_inner()
499 None
500 }
501 }
502 }
503}
504
/**
Everything IPC client

Create one with [`EverythingClient::new`], [`EverythingClient::with_instance`] or
[`IpcWindow::wm_client`]. The client dereferences to its [`IpcWindow`].

See [`wm`](super::wm) for details.
*/
pub struct EverythingClient {
    /// Shared state. The reply window thread holds its own reference to it
    /// to avoid possible UAF of [`ClientInner`] after drop.
    inner: Arc<ClientInner>,
    /// The hidden window (and its thread) that forwards requests and receives replies.
    reply_window: ReplyWindow,
}
515
516impl IpcWindow {
517 pub fn wm_client(&self) -> Result<EverythingClient, IpcError> {
518 // Create the inner state
519 let inner = Arc::new(ClientInner {
520 ipc_window: self.clone(),
521 current_query_sender: std::sync::Mutex::new(None),
522 query_lock: Default::default(),
523 });
524 /*
525 let inner_ref: &ClientInner = inner.as_ref();
526 let inner_ref: &'static ClientInner = unsafe { mem::transmute(inner_ref) };
527 */
528
529 // Create the reply window with a pointer to the inner state
530 // let inner_ptr = Arc::as_ptr(&inner) as *mut ClientInner;
531 let reply_window = ReplyWindow::new(inner.clone())?;
532
533 // let inner = inner_ref;
534 Ok(EverythingClient {
535 inner,
536 reply_window,
537 })
538 }
539}
540
541impl std::ops::Deref for EverythingClient {
542 type Target = IpcWindow;
543
544 fn deref(&self) -> &Self::Target {
545 self.ipc_window()
546 }
547}
548
549impl EverythingClient {
550 /// Create a new Everything client
551 pub fn new() -> Result<Self, IpcError> {
552 IpcWindow::new().ok_or(IpcError::NoIpcWindow)?.wm_client()
553 }
554
555 /// Create a new Everything client with instance name
556 pub fn with_instance(instance_name: Option<&str>) -> Result<Self, IpcError> {
557 IpcWindow::with_instance(instance_name)
558 .ok_or(IpcError::NoIpcWindow)?
559 .wm_client()
560 }
561
562 /// Get the IPC window for sending messages
563 fn ipc_window(&self) -> &IpcWindow {
564 &self.inner.ipc_window
565 }
566
567 /// Get the next query ID
568 fn next_id(&self) -> u32 {
569 static NEXT_ID: atomic::AtomicU32 = atomic::AtomicU32::new(0);
570 NEXT_ID.fetch_add(1, atomic::Ordering::SeqCst)
571 }
572
573 /// Send a query to Everything
574 fn query_send(
575 &self,
576 search: &str,
577 search_flags: SearchFlags,
578 request_flags: RequestFlags,
579 sort: Sort,
580 id: u32,
581 offset: u32,
582 max_results: Option<u32>,
583 ) -> bool {
584 let msg_hwnd = self.reply_window.hwnd();
585
586 // Build the query request using EverythingIpcQuery2 struct
587 let request = EverythingIpcQuery2::create(
588 msg_hwnd.0 as u32,
589 id,
590 search_flags.bits(),
591 offset,
592 max_results.unwrap_or(u32::MAX),
593 request_flags.bits(),
594 sort as u32,
595 search,
596 );
597
598 // Box the request Vec to keep it alive until the message is processed
599 let request_box = Box::new(request);
600 let request_ptr = Box::into_raw(request_box);
601
602 // available: SendMessageW (blocked), SendMessageTimeoutW (unstable)
603 // unavailable: PostMessageW, SendNotifyMessageW
604 // not tested: SendMessageCallbackW
605
606 // Use ReplyWindow::post_message to send WM_APP
607 // The reply window's wndproc will forward this to the IPC window
608 // WPARAM contains pointer to Box<Vec<u8>>, which owns the request data
609 match self
610 .reply_window
611 .post_message(WM_APP, WPARAM(request_ptr as usize), LPARAM(0))
612 {
613 Ok(_) => true,
614 Err(_) => {
615 // PostMessageW failed, free the request to avoid leak
616 let _ = unsafe { Box::from_raw(request_ptr) };
617 false
618 }
619 }
620 }
621}
622
623#[bon]
624impl EverythingClient {
625 /// Send a query to Everything
626 ///
627 /// # Important Note
628 /// Everything only handles one query per window at a time. Sending another query
629 /// when a query has not completed will cancel the old query.
630 ///
631 /// This method serializes queries by replacing the previous sender.
632 /// Callers should wait for the receiver before calling again.
633 ///
634 /// # Example
635 /// ```no_run
636 /// use std::time::Duration;
637 /// use everything_ipc::wm::{EverythingClient, RequestFlags};
638 ///
639 /// let everything = EverythingClient::new().unwrap();
640 ///
641 /// // These queries will be serialized (not sent concurrently)
642 /// let receiver1 = everything
643 /// .query("search1")
644 /// .request_flags(RequestFlags::FileName)
645 /// .call()
646 /// .unwrap();
647 /// let result1 = receiver1.recv_timeout(Duration::from_secs(5)).unwrap();
648 ///
649 /// // Now safe to send next query
650 /// let receiver2 = everything
651 /// .query("search2")
652 /// .request_flags(RequestFlags::FileName)
653 /// .call()
654 /// .unwrap();
655 /// let result2 = receiver2.recv_timeout(Duration::from_secs(5)).unwrap();
656 /// ```
657 #[instrument(skip_all)]
658 #[builder]
659 pub fn query(
660 &self,
661 #[builder(start_fn)] search: &str,
662 #[builder(default)] search_flags: SearchFlags,
663 request_flags: RequestFlags,
664 #[builder(default)] sort: Sort,
665 #[builder(default)] offset: u32,
666 max_results: Option<u32>,
667 // #[builder(setters(vis = ""))] current_query_sender: Option<&mut Option<QuerySender>>,
668 ) -> Result<mpsc::Receiver<QueryList>, IpcError> {
669 let id = self.next_id();
670 debug!("generating query ID {}", id);
671
672 // Create a channel for the response
673 let (sender, receiver) = mpsc::channel::<QueryList>();
674
675 // Send the query first
676 let sent = self.query_send(
677 search,
678 search_flags,
679 request_flags,
680 sort,
681 id,
682 offset,
683 max_results,
684 );
685
686 if !sent {
687 warn!("failed to send query ID {}", id);
688 return Err(IpcError::Send);
689 }
690 debug!("query ID {} sent successfully", id);
691
692 // Store the sender (only one query at a time per Everything instance)
693 // Using Mutex for thread-safe mutable access
694 /*
695 let old_sender = match current_query_sender {
696 Some(current_query_sender) => current_query_sender.replace(QuerySender::Sync(sender)),
697 None => self
698 .inner
699 .current_query_sender
700 .lock()
701 .unwrap()
702 .replace(QuerySender::Sync(sender)),
703 };
704 */
705 let old_sender = self
706 .inner
707 .current_query_sender
708 .lock()
709 .unwrap()
710 .replace(QuerySender::Sync(sender));
711 // Drop any previous sender that wasn't used - its receiver will fail
712 drop(old_sender);
713
714 Ok(receiver)
715 }
716
717 /// Send a query to Everything and wait for the result
718 ///
719 /// This method serializes queries to work around Everything's single-query-per-window limitation.
720 /// Only one query can be sent at a time per reply window.
721 #[instrument(skip_all)]
722 #[builder]
723 pub fn query_wait(
724 &self,
725 #[builder(start_fn)] search: &str,
726 #[builder(default)] search_flags: SearchFlags,
727 request_flags: RequestFlags,
728 #[builder(default)] sort: Sort,
729 #[builder(default)] offset: u32,
730 max_results: Option<u32>,
731 #[builder(default = Duration::from_millis(3000))] timeout: Duration,
732 ) -> Result<QueryList, IpcError> {
733 // let mut current_query_sender = self.inner.current_query_sender.lock().unwrap();
734 #[cfg(not(feature = "tokio"))]
735 let _lock = self.inner.query_lock.lock().unwrap();
736 #[cfg(feature = "tokio")]
737 let _lock = self.inner.query_lock.blocking_lock();
738
739 // Reuse query to send the query, then wait for the result
740 let receiver = self
741 .query(search)
742 .search_flags(search_flags)
743 .request_flags(request_flags)
744 .sort(sort)
745 .offset(offset)
746 .maybe_max_results(max_results)
747 // .current_query_sender(&mut current_query_sender)
748 .call()?;
749
750 // Wait for the response with timeout
751 match receiver.recv_timeout(timeout) {
752 Ok(results) => Ok(results),
753 Err(_) => {
754 warn!("query timed out");
755 Err(IpcError::Timeout)
756 }
757 }
758 }
759}
760
#[cfg(feature = "tokio")]
#[bon]
impl EverythingClient {
    /// Send a query to Everything asynchronously
    ///
    /// Returns a oneshot receiver that resolves to the [`QueryList`] once
    /// Everything replies.
    ///
    /// # Important Note
    /// Everything only handles one query per window at a time. Sending another query
    /// when a query has not completed will cancel the old query.
    ///
    /// This method serializes queries by replacing the previous sender.
    /// Callers should await the receiver before calling again.
    ///
    /// # Errors
    /// Returns [`IpcError::Send`] if the request could not be posted to the reply window.
    ///
    /// # Example
    /// ```no_run
    /// use everything_ipc::wm::{EverythingClient, RequestFlags};
    ///
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let everything = EverythingClient::new().unwrap();
    ///
    /// // These queries will be serialized (not sent concurrently)
    /// let receiver1 = everything
    ///     .query_tokio("search1")
    ///     .request_flags(RequestFlags::FileName)
    ///     .call()?;
    /// let result1 = receiver1.await?;
    ///
    /// // Now safe to send next query
    /// let receiver2 = everything
    ///     .query_tokio("search2")
    ///     .request_flags(RequestFlags::FileName)
    ///     .call()?;
    /// let result2 = receiver2.await?;
    /// # Ok(())
    /// # }
    /// ```
    #[instrument(skip_all)]
    #[builder]
    pub fn query_tokio(
        &self,
        #[builder(start_fn)] search: &str,
        #[builder(default)] search_flags: SearchFlags,
        request_flags: RequestFlags,
        #[builder(default)] sort: Sort,
        #[builder(default)] offset: u32,
        max_results: Option<u32>,
    ) -> Result<tokio::sync::oneshot::Receiver<QueryList>, IpcError> {
        let id = self.next_id();
        debug!("generating query ID {}", id);

        // Create a channel for the response
        let (sender, receiver) = tokio::sync::oneshot::channel::<QueryList>();

        // Hold the sender slot while the request is posted. The reply window thread
        // takes the sender from this slot, both when the reply arrives and when
        // forwarding fails; without the lock it could look before the sender is
        // stored and the query would only end by timing out. Posting does not wait
        // for the reply window thread, and nothing is awaited while the guard is held.
        let mut current = self.inner.current_query_sender.lock().unwrap();

        let sent = self.query_send(
            search,
            search_flags,
            request_flags,
            sort,
            id,
            offset,
            max_results,
        );
        if !sent {
            // Leave the previously stored sender untouched
            drop(current);
            warn!("failed to send query ID {}", id);
            return Err(IpcError::Send);
        }

        // Store the sender (only one query at a time per Everything instance)
        let old_sender = current.replace(QuerySender::Tokio(sender));
        drop(current);
        // Drop any previous sender that wasn't used - its receiver will fail
        drop(old_sender);
        debug!("query ID {} sent successfully", id);

        Ok(receiver)
    }

    /// Send a query to Everything asynchronously and wait for the result
    ///
    /// This method serializes queries to work around Everything's single-query-per-window limitation.
    /// Only one query can be sent at a time per reply window.
    ///
    /// # Errors
    /// - [`IpcError::Send`] if the request could not be posted, or if the query was
    ///   abandoned before a reply arrived (its sender was dropped).
    /// - [`IpcError::Timeout`] if no reply arrived within `timeout` (3 seconds by default).
    #[instrument(skip_all)]
    #[builder]
    pub async fn query_wait_tokio(
        &self,
        #[builder(start_fn)] search: &str,
        #[builder(default)] search_flags: SearchFlags,
        request_flags: RequestFlags,
        #[builder(default)] sort: Sort,
        #[builder(default)] offset: u32,
        max_results: Option<u32>,
        #[builder(default = Duration::from_millis(3000))] timeout: Duration,
    ) -> Result<QueryList, IpcError> {
        // Only one waiting query at a time
        let _lock = self.inner.query_lock.lock().await;

        // Reuse query_tokio to send the query, then wait for the result
        let receiver = self
            .query_tokio(search)
            .search_flags(search_flags)
            .request_flags(request_flags)
            .sort(sort)
            .offset(offset)
            .maybe_max_results(max_results)
            .call()?;

        // Wait for the response with timeout
        match tokio::time::timeout(timeout, receiver).await {
            Ok(Ok(results)) => Ok(results),
            // The sender was dropped without a reply
            Ok(Err(_)) => {
                warn!("query receiver error");
                Err(IpcError::Send)
            }
            Err(_) => {
                warn!("query timed out");
                Err(IpcError::Timeout)
            }
        }
    }
}
886
887#[cfg(test)]
888mod tests {
889 use super::*;
890
891 #[test_log::test]
892 #[test_log(default_log_filter = "trace")]
893 fn doc() {
894 let everything = EverythingClient::new().expect("not available");
895
896 let list = everything
897 .query_wait(r"C:\Windows\ *.exe")
898 .request_flags(RequestFlags::FileName | RequestFlags::Size | RequestFlags::Path)
899 .sort(Sort::SizeDescending)
900 .max_results(5)
901 .call()
902 .expect("query");
903
904 println!("Found {} items:", list.len());
905 println!("{:<25} {:>10} {}", "Filename", "Size", "Path");
906 for item in list.iter() {
907 // get_string() for String, get_str() for &U16CStr
908 let filename = item.get_string(RequestFlags::FileName).unwrap();
909 let path = item.get_str(RequestFlags::Path).unwrap().display();
910 let size = item.get_size(RequestFlags::Size).unwrap();
911 println!("{:<25} {:>10} {}", filename, size, path);
912 }
913 println!("Total: {} items", list.total_len());
914 }
915
916 #[test_log::test]
917 #[test_log(default_log_filter = "trace")]
918 fn query_empty_search() {
919 let everything = EverythingClient::new().unwrap();
920 let search = "";
921 let search_flags = SearchFlags::MatchCase;
922 let request_flags = RequestFlags::FileName | RequestFlags::Path;
923 let sort = Sort::NameAscending;
924
925 // Send query for first 5 items
926 let result =
927 everything.query_send(search, search_flags, request_flags, sort, 1000, 0, Some(5));
928
929 assert!(result, "Query should be sent successfully");
930 }
931
932 #[test_log::test]
933 #[test_log(default_log_filter = "trace")]
934 fn query_with_pattern() {
935 let everything = EverythingClient::new().unwrap();
936 let search = "test";
937 let search_flags = SearchFlags::MatchCase;
938 let request_flags = RequestFlags::FileName;
939 let sort = Sort::NameAscending;
940
941 let result =
942 everything.query_send(search, search_flags, request_flags, sort, 1001, 0, Some(10));
943
944 assert!(result, "Query should be sent successfully");
945 }
946
947 #[test]
948 fn query_with_full_path() {
949 let everything = EverythingClient::new().unwrap();
950 let search = "";
951 let search_flags = SearchFlags::MatchCase;
952 let request_flags =
953 RequestFlags::FullPathAndFileName | RequestFlags::Size | RequestFlags::DateModified;
954 let sort = Sort::NameAscending;
955
956 let result =
957 everything.query_send(search, search_flags, request_flags, sort, 1002, 0, Some(3));
958
959 assert!(result, "Query should be sent successfully");
960 }
961
962 #[test]
963 fn query_sort_by_size() {
964 let everything = EverythingClient::new().unwrap();
965 let search = "";
966 let search_flags = SearchFlags::MatchCase;
967 let request_flags = RequestFlags::FileName | RequestFlags::Size;
968 let sort = Sort::SizeAscending;
969
970 let result =
971 everything.query_send(search, search_flags, request_flags, sort, 1003, 0, Some(5));
972
973 assert!(result, "Query should be sent successfully");
974 }
975
976 #[test]
977 fn query_with_offset() {
978 let everything = EverythingClient::new().unwrap();
979 let search = "";
980 let search_flags = SearchFlags::MatchCase;
981 let request_flags = RequestFlags::FileName;
982 let sort = Sort::NameAscending;
983
984 // First query without offset
985 let result1 =
986 everything.query_send(search, search_flags, request_flags, sort, 1005, 0, Some(2));
987
988 assert!(result1, "First query should be sent successfully");
989
990 // Second query with offset
991 let result2 =
992 everything.query_send(search, search_flags, request_flags, sort, 1006, 2, Some(2));
993
994 assert!(
995 result2,
996 "Second query with offset should be sent successfully"
997 );
998 }
999
1000 #[test]
1001 fn query_everything() {
1002 let everything = EverythingClient::new().unwrap();
1003 let search = "test";
1004 let search_flags = SearchFlags::MatchCase;
1005 let request_flags = RequestFlags::FileName;
1006 let sort = Sort::NameAscending;
1007
1008 let result = everything.query_send(
1009 search,
1010 search_flags,
1011 request_flags,
1012 sort,
1013 everything.next_id(),
1014 0,
1015 Some(5),
1016 );
1017
1018 assert!(result, "Query should be sent successfully");
1019 }
1020
1021 #[test]
1022 fn query_multiple_requests() {
1023 let everything = EverythingClient::new().unwrap();
1024 let search = "";
1025 let search_flags = SearchFlags::MatchCase;
1026 let request_flags = RequestFlags::FileName
1027 | RequestFlags::Path
1028 | RequestFlags::Size
1029 | RequestFlags::DateModified
1030 | RequestFlags::DateCreated;
1031 let sort = Sort::NameAscending;
1032
1033 let result =
1034 everything.query_send(search, search_flags, request_flags, sort, 1004, 0, Some(5));
1035
1036 assert!(result, "Query should be sent successfully");
1037 }
1038
1039 #[test]
1040 fn query_wait_empty() {
1041 let everything = EverythingClient::new().unwrap();
1042 let search = "";
1043 let search_flags = SearchFlags::MatchCase;
1044 let request_flags = RequestFlags::FileName;
1045 let sort = Sort::NameAscending;
1046
1047 // Check if IPC is available
1048 assert!(everything.is_ipc_available(), "IPC should be available");
1049
1050 let result = everything
1051 .query_wait(search)
1052 .search_flags(search_flags)
1053 .request_flags(request_flags)
1054 .sort(sort)
1055 .offset(0)
1056 .max_results(10)
1057 .call();
1058 assert!(
1059 result.is_ok(),
1060 "query_wait should return Ok when Everything is available"
1061 );
1062 }
1063
1064 #[test_log::test]
1065 #[test_log(default_log_filter = "trace")]
1066 fn query_wait() {
1067 let everything = EverythingClient::new().unwrap();
1068 // Use empty string to get all files, which is more reliable than searching for "test"
1069 // which may not exist on the system
1070 let search = "test";
1071 // MATCH_ACCENTS is marked as abandoned in the C++ code, use MATCH_CASE instead
1072 let search_flags = SearchFlags::MatchCase;
1073 let request_flags = RequestFlags::FileName;
1074 let sort = Sort::NameAscending;
1075
1076 // Check if IPC is available
1077 assert!(everything.is_ipc_available(), "IPC should be available");
1078
1079 let result = everything
1080 .query_wait(search)
1081 .search_flags(search_flags)
1082 .request_flags(request_flags)
1083 .sort(sort)
1084 .offset(0)
1085 .max_results(10)
1086 .call();
1087 dbg!(&result);
1088 assert!(
1089 result.is_ok(),
1090 "query_wait should return Ok when Everything is available"
1091 );
1092 assert!(
1093 result.as_ref().is_ok_and(|r| r.total_len() > 0),
1094 "Expected found_num > 0, got: {:?}",
1095 result
1096 );
1097 }
1098
1099 #[test_log::test]
1100 #[test_log(default_log_filter = "trace")]
1101 fn query_wait_cancel() {
1102 let everything = EverythingClient::new().unwrap();
1103
1104 // Check if IPC is available
1105 assert!(everything.is_ipc_available(), "IPC should be available");
1106
1107 // Send multiple queries at once
1108 // Note: These will be serialized, so only the last one succeeds
1109 // The first two queries will be disconnected when the next query replaces their sender
1110 let searches = ["", "test", "rust"];
1111 let mut receivers = Vec::new();
1112
1113 for search in &searches {
1114 let search_flags = SearchFlags::MatchCase;
1115 let request_flags = RequestFlags::FileName;
1116 let sort = Sort::NameAscending;
1117 let receiver = everything
1118 .query(search)
1119 .search_flags(search_flags)
1120 .request_flags(request_flags)
1121 .sort(sort)
1122 .offset(0)
1123 .max_results(10)
1124 .call()
1125 .expect("query should succeed");
1126 receivers.push(receiver);
1127 }
1128
1129 // First query should fail (response rejected because sender was replaced)
1130 // When query 2 is sent, it replaces the sender for query 0
1131 // When the response comes back for query 0, the old sender is gone
1132 let result = receivers[0].recv_timeout(std::time::Duration::from_millis(3000));
1133 assert!(
1134 result.is_err(),
1135 "Query 0 should fail because sender was replaced (got: {:?})",
1136 result
1137 );
1138
1139 // Second query should succeed (it's the current sender when response arrives)
1140 let result = receivers[1].recv_timeout(std::time::Duration::from_millis(3000));
1141 assert!(
1142 result.is_err(),
1143 "Query 1 should fail because sender was replaced (got: {:?})",
1144 result
1145 );
1146
1147 // Last query should succeed
1148 let result = receivers[2].recv_timeout(std::time::Duration::from_millis(3000));
1149 let result = result.expect("Last query should succeed");
1150 dbg!(&result);
1151 assert!(
1152 result.total_len() > 0,
1153 "Last query should return valid results"
1154 );
1155 }
1156
1157 #[test_log::test]
1158 #[test_log(default_log_filter = "trace")]
1159 fn query_wait_parallel() {
1160 // Check if IPC is available
1161 let everything1 = EverythingClient::new().unwrap();
1162 let everything2 = EverythingClient::new().unwrap();
1163 let everything3 = EverythingClient::new().unwrap();
1164
1165 assert!(everything1.is_ipc_available(), "IPC should be available");
1166
1167 // Send multiple queries at once using separate Everything instances
1168 // Note: These won't cancel each other
1169 let receiver1 = everything1
1170 .query("")
1171 .search_flags(SearchFlags::MatchCase)
1172 .request_flags(RequestFlags::FileName)
1173 .sort(Sort::NameAscending)
1174 .offset(0)
1175 .max_results(10)
1176 .call()
1177 .expect("query should succeed");
1178 let receiver2 = everything2
1179 .query("test")
1180 .search_flags(SearchFlags::MatchCase)
1181 .request_flags(RequestFlags::FileName)
1182 .sort(Sort::NameAscending)
1183 .offset(0)
1184 .max_results(10)
1185 .call()
1186 .expect("query should succeed");
1187 let receiver3 = everything3
1188 .query("rust")
1189 .search_flags(SearchFlags::MatchCase)
1190 .request_flags(RequestFlags::FileName)
1191 .sort(Sort::NameAscending)
1192 .offset(0)
1193 .max_results(10)
1194 .call()
1195 .expect("query should succeed");
1196
1197 // Wait for all queries to complete
1198 for (i, receiver) in [receiver1, receiver2, receiver3].into_iter().enumerate() {
1199 let result = receiver.recv_timeout(std::time::Duration::from_millis(5000));
1200 let result = result.expect(&format!("Query {} timed out", i));
1201 dbg!(&result);
1202 assert!(result.len() > 0, "Query {} should return valid results", i);
1203 }
1204 }
1205}