rust_queries_core/lazy_parallel.rs
1//! Parallel lazy query implementation using rayon for parallel processing.
2//!
3//! This module provides parallel lazy evaluation of queries, deferring execution
4//! until results are actually consumed, but using multiple threads for better performance.
5//!
6//! # Benefits
7//!
8//! - **Deferred execution**: No work until results needed
9//! - **Parallel processing**: Utilizes multiple CPU cores
10//! - **Iterator fusion**: Rust optimizes chained operations
11//! - **Early termination**: `.take()` stops as soon as enough items found
12//! - **Composable**: Build complex queries by composition
13//! - **Thread-safe**: All operations are Send + Sync
14//!
15//! # Example
16//!
17//! ```ignore
18//! // Nothing executes yet
19//! let query = LazyParallelQuery::new(&products)
20//! .where_(Product::price(), |&p| p < 100.0)
21//! .where_(Product::stock(), |&s| s > 0);
22//!
23//! // Parallel execution happens here
24//! let results: Vec<_> = query.collect_parallel();
25//! ```
26
27#[cfg(feature = "parallel")]
28use {
29 rayon::prelude::*,
30 key_paths_core::KeyPaths,
31 std::marker::PhantomData,
32 std::time::SystemTime,
33};
34
35// DateTime is used in doc comments and conditional compilation
36#[cfg(feature = "datetime")]
37#[allow(unused_imports)]
38use chrono::DateTime;
39
40/// A parallel lazy query builder that uses rayon for parallel processing.
41///
42/// Unlike the standard `LazyQuery`, `LazyParallelQuery` uses parallel iterators
43/// for better performance on large datasets while maintaining lazy evaluation.
44///
45/// # Type Parameters
46///
47/// * `'a` - The lifetime of the data being queried
48/// * `T` - The type of items in the collection
49///
50/// # Example
51///
52/// ```ignore
53/// let query = LazyParallelQuery::new(&products)
54/// .where_(Product::price(), |&p| p < 100.0)
55/// .and(Product::stock(), |&s| s > 0)
56/// .or(Product::category(), |c| c == "Premium")
57/// .collect_parallel();
58/// ```
59#[cfg(feature = "parallel")]
60pub struct LazyParallelQuery<'a, T: 'static + Send + Sync> {
61 data: &'a [T],
62 filter_groups: Vec<FilterGroup<'a, T>>,
63 _phantom: PhantomData<&'a T>,
64}
65
66/// Represents a group of filters with a logical operator for parallel queries.
67/// All filters must be Send + Sync for thread safety.
68#[cfg(feature = "parallel")]
69enum FilterGroup<'a, T: 'static + Send + Sync> {
70 And(Vec<Box<dyn Fn(&T) -> bool + Send + Sync + 'a>>),
71 Or(Vec<Box<dyn Fn(&T) -> bool + Send + Sync + 'a>>),
72}
73
74#[cfg(feature = "parallel")]
75impl<'a, T: 'static + Send + Sync> FilterGroup<'a, T> {
76 fn evaluate(&self, item: &T) -> bool {
77 match self {
78 FilterGroup::And(filters) => filters.iter().all(|f| f(item)),
79 FilterGroup::Or(filters) => filters.iter().any(|f| f(item)),
80 }
81 }
82}
83
84#[cfg(feature = "parallel")]
85impl<'a, T: 'static + Send + Sync> LazyParallelQuery<'a, T> {
86 /// Creates a new parallel lazy query from a slice.
87 ///
88 /// # Example
89 ///
90 /// ```ignore
91 /// let query = LazyParallelQuery::new(&products);
92 /// ```
93 pub fn new(data: &'a [T]) -> Self {
94 Self {
95 data,
96 filter_groups: Vec::new(),
97 _phantom: PhantomData,
98 }
99 }
100
101 /// Adds a filter predicate (lazy - not executed yet).
102 /// Multiple `where_` calls are implicitly ANDed together, unless the last group is an OR group,
103 /// in which case `where_` adds to the OR group.
104 ///
105 /// # Example
106 ///
107 /// ```ignore
108 /// let query = LazyParallelQuery::new(&products)
109 /// .where_(Product::price(), |&p| p < 100.0);
110 /// ```
111 pub fn where_<F>(mut self, path: KeyPaths<T, F>, predicate: impl Fn(&F) -> bool + Send + Sync + 'static) -> Self
112 where
113 F: 'static + Send + Sync,
114 {
115 let filter = Box::new(move |item: &T| {
116 path.get(item).map_or(false, |val| predicate(val))
117 });
118
119 // If the last group is an OR group, add to it; otherwise create/add to AND group
120 match self.filter_groups.last_mut() {
121 Some(FilterGroup::Or(filters)) => {
122 // Add to existing OR group
123 filters.push(filter);
124 }
125 Some(FilterGroup::And(filters)) => {
126 // Add to existing AND group
127 filters.push(filter);
128 }
129 None => {
130 // Create new AND group
131 self.filter_groups.push(FilterGroup::And(vec![filter]));
132 }
133 }
134
135 self
136 }
137
138 /// Adds a filter with AND logic (explicit AND operator).
139 /// This is equivalent to `where_` but makes the AND relationship explicit.
140 ///
141 /// # Example
142 ///
143 /// ```ignore
144 /// let query = LazyParallelQuery::new(&products)
145 /// .where_(Product::price(), |&p| p < 100.0)
146 /// .and(Product::stock(), |&s| s > 0);
147 /// ```
148 pub fn and<F>(mut self, path: KeyPaths<T, F>, predicate: impl Fn(&F) -> bool + Send + Sync + 'static) -> Self
149 where
150 F: 'static + Send + Sync,
151 {
152 let filter = Box::new(move |item: &T| {
153 path.get(item).map_or(false, |val| predicate(val))
154 });
155
156 match self.filter_groups.last_mut() {
157 Some(FilterGroup::And(filters)) => {
158 filters.push(filter);
159 }
160 _ => {
161 // If no previous group or previous was OR, start a new AND group
162 self.filter_groups.push(FilterGroup::And(vec![filter]));
163 }
164 }
165
166 self
167 }
168
169 /// Adds a filter with OR logic (explicit OR operator).
170 /// Items matching this filter OR any filters in the current OR group will pass.
171 ///
172 /// # Example
173 ///
174 /// ```ignore
175 /// let query = LazyParallelQuery::new(&products)
176 /// .where_(Product::price(), |&p| p < 100.0)
177 /// .or(Product::category(), |c| c == "Premium");
178 /// ```
179 pub fn or<F>(mut self, path: KeyPaths<T, F>, predicate: impl Fn(&F) -> bool + Send + Sync + 'static) -> Self
180 where
181 F: 'static + Send + Sync,
182 {
183 let filter = Box::new(move |item: &T| {
184 path.get(item).map_or(false, |val| predicate(val))
185 });
186
187 match self.filter_groups.last_mut() {
188 Some(FilterGroup::Or(filters)) => {
189 filters.push(filter);
190 }
191 _ => {
192 // If no previous group or previous was AND, start a new OR group
193 self.filter_groups.push(FilterGroup::Or(vec![filter]));
194 }
195 }
196
197 self
198 }
199
200 /// Evaluates all filter groups against an item.
201 /// Filter groups are evaluated as follows:
202 /// - AND groups: all filters in the group must pass
203 /// - OR groups: at least one filter in the group must pass
204 /// - Between groups: if there are both AND and OR groups, items must satisfy
205 /// either all AND groups OR at least one OR group (if OR groups exist)
206 fn evaluate_filters(&self, item: &T) -> bool {
207 let (and_groups, or_groups): (Vec<_>, Vec<_>) = self.filter_groups
208 .iter()
209 .partition(|group| matches!(group, FilterGroup::And(_)));
210
211 match (and_groups.is_empty(), or_groups.is_empty()) {
212 // Only AND groups: all must pass
213 (false, true) => and_groups.iter().all(|group| group.evaluate(item)),
214 // Only OR groups: at least one must pass
215 (true, false) => or_groups.iter().any(|group| group.evaluate(item)),
216 // Both AND and OR groups: (all AND pass) OR (any OR pass)
217 (false, false) => {
218 let all_and_pass = and_groups.iter().all(|group| group.evaluate(item));
219 let any_or_pass = or_groups.iter().any(|group| group.evaluate(item));
220 all_and_pass || any_or_pass
221 }
222 // No filters: everything passes
223 (true, true) => true,
224 }
225 }
226
227 /// Collects all items into a vector (terminal operation - executes query in parallel).
228 ///
229 /// # Example
230 ///
231 /// ```ignore
232 /// let results: Vec<&Product> = query.collect_parallel();
233 /// ```
234 pub fn collect_parallel(&self) -> Vec<&'a T> {
235 self.data
236 .par_iter()
237 .filter(|item| self.evaluate_filters(item))
238 .collect()
239 }
240
241 /// Gets the first item (terminal operation - executes until first match in parallel).
242 ///
243 /// # Example
244 ///
245 /// ```ignore
246 /// let first = query.first_parallel();
247 /// ```
248 pub fn first_parallel(&self) -> Option<&'a T> {
249 self.data
250 .par_iter()
251 .find_any(|item| self.evaluate_filters(item))
252 }
253
254 /// Counts items (terminal operation - executes query in parallel).
255 ///
256 /// # Example
257 ///
258 /// ```ignore
259 /// let count = query.count_parallel();
260 /// ```
261 pub fn count_parallel(&self) -> usize {
262 self.data
263 .par_iter()
264 .filter(|item| self.evaluate_filters(item))
265 .count()
266 }
267
268 /// Checks if any items match (terminal operation - short-circuits in parallel).
269 ///
270 /// # Example
271 ///
272 /// ```ignore
273 /// let exists = query.any_parallel();
274 /// ```
275 pub fn any_parallel(&self) -> bool {
276 self.data
277 .par_iter()
278 .any(|item| self.evaluate_filters(item))
279 }
280
281 /// Checks if all items match a predicate (terminal operation - short-circuits in parallel).
282 ///
283 /// # Example
284 ///
285 /// ```ignore
286 /// let all_positive = query.all_match_parallel(|item| item.value > 0);
287 /// ```
288 pub fn all_match_parallel<P>(&self, predicate: P) -> bool
289 where
290 P: Fn(&'a T) -> bool + Send + Sync,
291 {
292 self.data
293 .par_iter()
294 .filter(|item| self.evaluate_filters(item))
295 .all(predicate)
296 }
297
298 /// Executes a function for each item (terminal operation in parallel).
299 ///
300 /// # Example
301 ///
302 /// ```ignore
303 /// query.for_each_parallel(|item| println!("{:?}", item));
304 /// ```
305 pub fn for_each_parallel<F>(&self, f: F)
306 where
307 F: Fn(&'a T) + Send + Sync,
308 {
309 self.data
310 .par_iter()
311 .filter(|item| self.evaluate_filters(item))
312 .for_each(f)
313 }
314
315 /// Folds the iterator (terminal operation in parallel).
316 ///
317 /// Note: This is a simplified implementation that collects all items first.
318 /// For true parallel folding with custom reduction, consider using the sequential version
319 /// or implementing a custom parallel reduction strategy.
320 ///
321 /// # Example
322 ///
323 /// ```ignore
324 /// let sum = query.fold_parallel(0.0, |acc, item| acc + item.price);
325 /// ```
326 pub fn fold_parallel<B, F>(&self, init: B, f: F) -> B
327 where
328 B: Send + Sync,
329 F: Fn(B, &'a T) -> B + Send + Sync,
330 {
331 let items: Vec<&'a T> = self.data
332 .par_iter()
333 .filter(|item| self.evaluate_filters(item))
334 .collect();
335
336 items.into_iter().fold(init, f)
337 }
338
339 /// Finds an item matching a predicate (terminal - short-circuits in parallel).
340 ///
341 /// # Example
342 ///
343 /// ```ignore
344 /// let found = query.find_parallel(|item| item.id == 42);
345 /// ```
346 pub fn find_parallel<P>(&self, predicate: P) -> Option<&'a T>
347 where
348 P: Fn(&&'a T) -> bool + Send + Sync,
349 {
350 self.data
351 .par_iter()
352 .filter(|item| self.evaluate_filters(item))
353 .find_any(predicate)
354 }
355
356 /// Projects/selects a single field from results (parallel).
357 ///
358 /// # Arguments
359 ///
360 /// * `path` - The key-path to the field to select
361 ///
362 /// # Example
363 ///
364 /// ```ignore
365 /// let names = query.select_parallel(Product::name());
366 /// ```
367 pub fn select_parallel<F>(&self, path: KeyPaths<T, F>) -> Vec<F>
368 where
369 F: Clone + Send + Sync + 'static,
370 {
371 self.data
372 .par_iter()
373 .filter(|item| self.evaluate_filters(item))
374 .filter_map(|item| path.get(item).cloned())
375 .collect()
376 }
377
378 /// Maps each item through a transformation (parallel).
379 ///
380 /// # Example
381 ///
382 /// ```ignore
383 /// let prices = query.map_items_parallel(|p| p.price);
384 /// ```
385 pub fn map_items_parallel<F, O>(&self, f: F) -> Vec<O>
386 where
387 F: Fn(&'a T) -> O + Send + Sync,
388 O: Send,
389 {
390 self.data
391 .par_iter()
392 .filter(|item| self.evaluate_filters(item))
393 .map(f)
394 .collect()
395 }
396
397 /// Takes at most `n` items (parallel).
398 ///
399 /// Note: This collects all filtered items and then takes the first n.
400 /// For true lazy evaluation with take, use the sequential LazyQuery.
401 ///
402 /// # Example
403 ///
404 /// ```ignore
405 /// let first_10: Vec<_> = query.take_parallel(10);
406 /// ```
407 pub fn take_parallel(&self, n: usize) -> Vec<&'a T> {
408 let mut results: Vec<&'a T> = self.data
409 .par_iter()
410 .filter(|item| self.evaluate_filters(item))
411 .collect();
412 results.truncate(n);
413 results
414 }
415
416 /// Skips `n` items (parallel).
417 ///
418 /// Note: This collects all filtered items and then skips the first n.
419 /// For true lazy evaluation with skip, use the sequential LazyQuery.
420 ///
421 /// # Example
422 ///
423 /// ```ignore
424 /// let page_2: Vec<_> = query.skip_parallel(10);
425 /// ```
426 pub fn skip_parallel(&self, n: usize) -> Vec<&'a T> {
427 let results: Vec<&'a T> = self.data
428 .par_iter()
429 .filter(|item| self.evaluate_filters(item))
430 .collect();
431 results.into_iter().skip(n).collect()
432 }
433}
434
435// Aggregation operations (parallel)
436#[cfg(feature = "parallel")]
437impl<'a, T: 'static + Send + Sync> LazyParallelQuery<'a, T> {
438 /// Computes sum of a field (terminal operation in parallel).
439 ///
440 /// # Example
441 ///
442 /// ```ignore
443 /// let total: f64 = LazyParallelQuery::new(&products)
444 /// .sum_by_parallel(Product::price());
445 /// ```
446 pub fn sum_by_parallel<F>(&self, path: KeyPaths<T, F>) -> F
447 where
448 F: Clone + std::ops::Add<Output = F> + Default + Send + Sync + std::iter::Sum + 'static,
449 {
450 self.data
451 .par_iter()
452 .filter(|item| self.evaluate_filters(item))
453 .filter_map(|item| path.get(item).cloned())
454 .sum()
455 }
456
457 /// Computes average of a float field (terminal operation in parallel).
458 ///
459 /// # Example
460 ///
461 /// ```ignore
462 /// let avg = LazyParallelQuery::new(&products)
463 /// .avg_by_parallel(Product::price());
464 /// ```
465 pub fn avg_by_parallel(&self, path: KeyPaths<T, f64>) -> Option<f64>
466 where
467 T: Send + Sync,
468 {
469 let items: Vec<f64> = self
470 .data
471 .par_iter()
472 .filter(|item| self.evaluate_filters(item))
473 .filter_map(|item| path.get(item).cloned())
474 .collect();
475
476 if items.is_empty() {
477 None
478 } else {
479 Some(items.par_iter().sum::<f64>() / items.len() as f64)
480 }
481 }
482
483 /// Finds minimum value of a field (terminal operation in parallel).
484 ///
485 /// # Example
486 ///
487 /// ```ignore
488 /// let min = LazyParallelQuery::new(&products)
489 /// .min_by_parallel(Product::price());
490 /// ```
491 pub fn min_by_parallel<F>(&self, path: KeyPaths<T, F>) -> Option<F>
492 where
493 F: Ord + Clone + Send + Sync + 'static,
494 {
495 self.data
496 .par_iter()
497 .filter(|item| self.evaluate_filters(item))
498 .filter_map(|item| path.get(item).cloned())
499 .min()
500 }
501
502 /// Finds maximum value of a field (terminal operation in parallel).
503 ///
504 /// # Example
505 ///
506 /// ```ignore
507 /// let max = LazyParallelQuery::new(&products)
508 /// .max_by_parallel(Product::price());
509 /// ```
510 pub fn max_by_parallel<F>(&self, path: KeyPaths<T, F>) -> Option<F>
511 where
512 F: Ord + Clone + Send + Sync + 'static,
513 {
514 self.data
515 .par_iter()
516 .filter(|item| self.evaluate_filters(item))
517 .filter_map(|item| path.get(item).cloned())
518 .max()
519 }
520
521 /// Finds minimum float value (terminal operation in parallel).
522 pub fn min_by_float_parallel(&self, path: KeyPaths<T, f64>) -> Option<f64>
523 where
524 T: Send + Sync,
525 {
526 self.data
527 .par_iter()
528 .filter(|item| self.evaluate_filters(item))
529 .filter_map(|item| path.get(item).cloned())
530 .min_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal))
531 }
532
533 /// Finds maximum float value (terminal operation in parallel).
534 pub fn max_by_float_parallel(&self, path: KeyPaths<T, f64>) -> Option<f64>
535 where
536 T: Send + Sync,
537 {
538 self.data
539 .par_iter()
540 .filter(|item| self.evaluate_filters(item))
541 .filter_map(|item| path.get(item).cloned())
542 .max_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal))
543 }
544}
545
546// DateTime operations for SystemTime (parallel)
547#[cfg(feature = "parallel")]
548impl<'a, T: 'static + Send + Sync> LazyParallelQuery<'a, T> {
549 /// Filter by SystemTime being after a reference time (parallel).
550 ///
551 /// # Arguments
552 ///
553 /// * `path` - The key-path to the SystemTime field
554 /// * `reference` - The reference time to compare against
555 ///
556 /// # Example
557 ///
558 /// ```ignore
559 /// let recent = LazyParallelQuery::new(&events)
560 /// .where_after_systemtime_parallel(Event::timestamp(), cutoff_time);
561 /// ```
562 pub fn where_after_systemtime_parallel(self, path: KeyPaths<T, SystemTime>, reference: SystemTime) -> Self {
563 self.where_(path, move |time| time > &reference)
564 }
565
566 /// Filter by SystemTime being before a reference time (parallel).
567 ///
568 /// # Arguments
569 ///
570 /// * `path` - The key-path to the SystemTime field
571 /// * `reference` - The reference time to compare against
572 ///
573 /// # Example
574 ///
575 /// ```ignore
576 /// let old = LazyParallelQuery::new(&events)
577 /// .where_before_systemtime_parallel(Event::timestamp(), cutoff_time);
578 /// ```
579 pub fn where_before_systemtime_parallel(self, path: KeyPaths<T, SystemTime>, reference: SystemTime) -> Self {
580 self.where_(path, move |time| time < &reference)
581 }
582
583 /// Filter by SystemTime being between two times (inclusive, parallel).
584 ///
585 /// # Arguments
586 ///
587 /// * `path` - The key-path to the SystemTime field
588 /// * `start` - The start time
589 /// * `end` - The end time
590 ///
591 /// # Example
592 ///
593 /// ```ignore
594 /// let range = LazyParallelQuery::new(&events)
595 /// .where_between_systemtime_parallel(Event::timestamp(), start, end);
596 /// ```
597 pub fn where_between_systemtime_parallel(
598 self,
599 path: KeyPaths<T, SystemTime>,
600 start: SystemTime,
601 end: SystemTime,
602 ) -> Self {
603 self.where_(path, move |time| time >= &start && time <= &end)
604 }
605}
606
607// DateTime operations with chrono (only available with datetime feature, parallel)
608#[cfg(all(feature = "parallel", feature = "datetime"))]
609impl<'a, T: 'static + Send + Sync> LazyParallelQuery<'a, T> {
610 /// Filter by DateTime being after a reference time (parallel).
611 ///
612 /// # Arguments
613 ///
614 /// * `path` - The key-path to the DateTime field
615 /// * `reference` - The reference time to compare against
616 ///
617 /// # Example
618 ///
619 /// ```ignore
620 /// let recent = LazyParallelQuery::new(&events)
621 /// .where_after_parallel(Event::timestamp(), cutoff_time);
622 /// ```
623 pub fn where_after_parallel<Tz>(self, path: KeyPaths<T, DateTime<Tz>>, reference: DateTime<Tz>) -> Self
624 where
625 Tz: TimeZone + 'static,
626 Tz::Offset: std::fmt::Display + Send + Sync,
627 {
628 self.where_(path, move |time| time > &reference)
629 }
630
631 /// Filter by DateTime being before a reference time (parallel).
632 ///
633 /// # Arguments
634 ///
635 /// * `path` - The key-path to the DateTime field
636 /// * `reference` - The reference time to compare against
637 ///
638 /// # Example
639 ///
640 /// ```ignore
641 /// let old = LazyParallelQuery::new(&events)
642 /// .where_before_parallel(Event::timestamp(), cutoff_time);
643 /// ```
644 pub fn where_before_parallel<Tz>(self, path: KeyPaths<T, DateTime<Tz>>, reference: DateTime<Tz>) -> Self
645 where
646 Tz: TimeZone + 'static,
647 Tz::Offset: std::fmt::Display + Send + Sync,
648 {
649 self.where_(path, move |time| time < &reference)
650 }
651
652 /// Filter by DateTime being between two times (inclusive, parallel).
653 ///
654 /// # Arguments
655 ///
656 /// * `path` - The key-path to the DateTime field
657 /// * `start` - The start time
658 /// * `end` - The end time
659 ///
660 /// # Example
661 ///
662 /// ```ignore
663 /// let range = LazyParallelQuery::new(&events)
664 /// .where_between_parallel(Event::timestamp(), start, end);
665 /// ```
666 pub fn where_between_parallel<Tz>(
667 self,
668 path: KeyPaths<T, DateTime<Tz>>,
669 start: DateTime<Tz>,
670 end: DateTime<Tz>,
671 ) -> Self
672 where
673 Tz: TimeZone + 'static,
674 Tz::Offset: std::fmt::Display + Send + Sync,
675 {
676 self.where_(path, move |time| time >= &start && time <= &end)
677 }
678
679 /// Filter by DateTime being today (parallel).
680 ///
681 /// # Arguments
682 ///
683 /// * `path` - The key-path to the DateTime field
684 /// * `now` - The current DateTime to compare against
685 ///
686 /// # Example
687 ///
688 /// ```ignore
689 /// let today = LazyParallelQuery::new(&events)
690 /// .where_today_parallel(Event::timestamp(), Utc::now());
691 /// ```
692 pub fn where_today_parallel<Tz>(self, path: KeyPaths<T, DateTime<Tz>>, now: DateTime<Tz>) -> Self
693 where
694 Tz: TimeZone + 'static,
695 Tz::Offset: std::fmt::Display + Send + Sync,
696 {
697 self.where_(path, move |time| {
698 time.date_naive() == now.date_naive()
699 })
700 }
701
702 /// Filter by DateTime year (parallel).
703 ///
704 /// # Arguments
705 ///
706 /// * `path` - The key-path to the DateTime field
707 /// * `year` - The year to filter by
708 ///
709 /// # Example
710 ///
711 /// ```ignore
712 /// let this_year = LazyParallelQuery::new(&events)
713 /// .where_year_parallel(Event::timestamp(), 2024);
714 /// ```
715 pub fn where_year_parallel<Tz>(self, path: KeyPaths<T, DateTime<Tz>>, year: i32) -> Self
716 where
717 Tz: TimeZone + 'static,
718 Tz::Offset: std::fmt::Display + Send + Sync,
719 {
720 use chrono::Datelike;
721 self.where_(path, move |time| time.year() == year)
722 }
723
724 /// Filter by DateTime month (parallel).
725 ///
726 /// # Arguments
727 ///
728 /// * `path` - The key-path to the DateTime field
729 /// * `month` - The month to filter by (1-12)
730 ///
731 /// # Example
732 ///
733 /// ```ignore
734 /// let december = LazyParallelQuery::new(&events)
735 /// .where_month_parallel(Event::timestamp(), 12);
736 /// ```
737 pub fn where_month_parallel<Tz>(self, path: KeyPaths<T, DateTime<Tz>>, month: u32) -> Self
738 where
739 Tz: TimeZone + 'static,
740 Tz::Offset: std::fmt::Display + Send + Sync,
741 {
742 use chrono::Datelike;
743 self.where_(path, move |time| time.month() == month)
744 }
745
746 /// Filter by DateTime day (parallel).
747 ///
748 /// # Arguments
749 ///
750 /// * `path` - The key-path to the DateTime field
751 /// * `day` - The day to filter by (1-31)
752 ///
753 /// # Example
754 ///
755 /// ```ignore
756 /// let first = LazyParallelQuery::new(&events)
757 /// .where_day_parallel(Event::timestamp(), 1);
758 /// ```
759 pub fn where_day_parallel<Tz>(self, path: KeyPaths<T, DateTime<Tz>>, day: u32) -> Self
760 where
761 Tz: TimeZone + 'static,
762 Tz::Offset: std::fmt::Display + Send + Sync,
763 {
764 use chrono::Datelike;
765 self.where_(path, move |time| time.day() == day)
766 }
767
768 /// Filter by weekend dates (Saturday and Sunday, parallel).
769 ///
770 /// # Arguments
771 ///
772 /// * `path` - The key-path to the DateTime field
773 ///
774 /// # Example
775 ///
776 /// ```ignore
777 /// let weekend_events = LazyParallelQuery::new(&events)
778 /// .where_weekend_parallel(Event::timestamp());
779 /// ```
780 pub fn where_weekend_parallel<Tz>(self, path: KeyPaths<T, DateTime<Tz>>) -> Self
781 where
782 Tz: TimeZone + 'static,
783 Tz::Offset: std::fmt::Display + Send + Sync,
784 {
785 use chrono::Datelike;
786 self.where_(path, |time| {
787 let weekday = time.weekday().num_days_from_monday();
788 weekday >= 5
789 })
790 }
791
792 /// Filter by weekday dates (Monday through Friday, parallel).
793 ///
794 /// # Arguments
795 ///
796 /// * `path` - The key-path to the DateTime field
797 ///
798 /// # Example
799 ///
800 /// ```ignore
801 /// let weekday_events = LazyParallelQuery::new(&events)
802 /// .where_weekday_parallel(Event::timestamp());
803 /// ```
804 pub fn where_weekday_parallel<Tz>(self, path: KeyPaths<T, DateTime<Tz>>) -> Self
805 where
806 Tz: TimeZone + 'static,
807 Tz::Offset: std::fmt::Display + Send + Sync,
808 {
809 use chrono::Datelike;
810 self.where_(path, |time| {
811 let weekday = time.weekday().num_days_from_monday();
812 weekday < 5
813 })
814 }
815
816 /// Filter by business hours (9 AM - 5 PM, parallel).
817 ///
818 /// # Arguments
819 ///
820 /// * `path` - The key-path to the DateTime field
821 ///
822 /// # Example
823 ///
824 /// ```ignore
825 /// let business_hours = LazyParallelQuery::new(&events)
826 /// .where_business_hours_parallel(Event::timestamp());
827 /// ```
828 pub fn where_business_hours_parallel<Tz>(self, path: KeyPaths<T, DateTime<Tz>>) -> Self
829 where
830 Tz: TimeZone + 'static,
831 Tz::Offset: std::fmt::Display + Send + Sync,
832 {
833 use chrono::Timelike;
834 self.where_(path, |time| {
835 let hour = time.hour();
836 hour >= 9 && hour < 17
837 })
838 }
839}
840
841// i64 DateTime Aggregators (Unix timestamps in milliseconds, parallel)
842#[cfg(feature = "parallel")]
843impl<'a, T: 'static + Send + Sync> LazyParallelQuery<'a, T> {
844 /// Finds minimum i64 timestamp value (terminal operation in parallel).
845 ///
846 /// # Example
847 ///
848 /// ```ignore
849 /// let earliest = LazyParallelQuery::new(&events)
850 /// .min_timestamp_parallel(Event::created_at_r());
851 /// ```
852 pub fn min_timestamp_parallel(&self, path: KeyPaths<T, i64>) -> Option<i64>
853 where
854 T: Send + Sync,
855 {
856 self.data
857 .par_iter()
858 .filter(|item| self.evaluate_filters(item))
859 .filter_map(|item| path.get(item).cloned())
860 .min()
861 }
862
863 /// Finds maximum i64 timestamp value (terminal operation in parallel).
864 ///
865 /// # Example
866 ///
867 /// ```ignore
868 /// let latest = LazyParallelQuery::new(&events)
869 /// .max_timestamp_parallel(Event::created_at_r());
870 /// ```
871 pub fn max_timestamp_parallel(&self, path: KeyPaths<T, i64>) -> Option<i64>
872 where
873 T: Send + Sync,
874 {
875 self.data
876 .par_iter()
877 .filter(|item| self.evaluate_filters(item))
878 .filter_map(|item| path.get(item).cloned())
879 .max()
880 }
881
882 /// Computes average of i64 timestamp values (terminal operation in parallel).
883 ///
884 /// # Example
885 ///
886 /// ```ignore
887 /// let avg = LazyParallelQuery::new(&events)
888 /// .avg_timestamp_parallel(Event::created_at_r());
889 /// ```
890 pub fn avg_timestamp_parallel(&self, path: KeyPaths<T, i64>) -> Option<i64>
891 where
892 T: Send + Sync,
893 {
894 let items: Vec<i64> = self
895 .data
896 .par_iter()
897 .filter(|item| self.evaluate_filters(item))
898 .filter_map(|item| path.get(item).cloned())
899 .collect();
900
901 if items.is_empty() {
902 None
903 } else {
904 Some(items.par_iter().sum::<i64>() / items.len() as i64)
905 }
906 }
907
908 /// Computes sum of i64 timestamp values (terminal operation in parallel).
909 ///
910 /// # Example
911 ///
912 /// ```ignore
913 /// let total = LazyParallelQuery::new(&events)
914 /// .sum_timestamp_parallel(Event::created_at_r());
915 /// ```
916 pub fn sum_timestamp_parallel(&self, path: KeyPaths<T, i64>) -> i64
917 where
918 T: Send + Sync,
919 {
920 self.data
921 .par_iter()
922 .filter(|item| self.evaluate_filters(item))
923 .filter_map(|item| path.get(item).cloned())
924 .sum()
925 }
926
927 /// Counts i64 timestamp values (terminal operation in parallel).
928 ///
929 /// # Example
930 ///
931 /// ```ignore
932 /// let count = LazyParallelQuery::new(&events)
933 /// .count_timestamp_parallel(Event::created_at_r());
934 /// ```
935 pub fn count_timestamp_parallel(&self, path: KeyPaths<T, i64>) -> usize
936 where
937 T: Send + Sync,
938 {
939 self.data
940 .par_iter()
941 .filter(|item| self.evaluate_filters(item))
942 .filter(|item| path.get(item).is_some())
943 .count()
944 }
945
946 /// Filter by i64 timestamp being after a reference time (parallel).
947 ///
948 /// # Arguments
949 ///
950 /// * `path` - The key-path to the i64 timestamp field
951 /// * `reference` - The reference timestamp to compare against
952 ///
953 /// # Example
954 ///
955 /// ```ignore
956 /// let recent = LazyParallelQuery::new(&events)
957 /// .where_after_timestamp_parallel(Event::created_at_r(), cutoff_time);
958 /// ```
959 pub fn where_after_timestamp_parallel(self, path: KeyPaths<T, i64>, reference: i64) -> Self {
960 self.where_(path, move |timestamp| timestamp > &reference)
961 }
962
963 /// Filter by i64 timestamp being before a reference time (parallel).
964 ///
965 /// # Arguments
966 ///
967 /// * `path` - The key-path to the i64 timestamp field
968 /// * `reference` - The reference timestamp to compare against
969 ///
970 /// # Example
971 ///
972 /// ```ignore
973 /// let old = LazyParallelQuery::new(&events)
974 /// .where_before_timestamp_parallel(Event::created_at_r(), cutoff_time);
975 /// ```
976 pub fn where_before_timestamp_parallel(self, path: KeyPaths<T, i64>, reference: i64) -> Self {
977 self.where_(path, move |timestamp| timestamp < &reference)
978 }
979
980 /// Filter by i64 timestamp being between two times (inclusive, parallel).
981 ///
982 /// # Arguments
983 ///
984 /// * `path` - The key-path to the i64 timestamp field
985 /// * `start` - The start timestamp
986 /// * `end` - The end timestamp
987 ///
988 /// # Example
989 ///
990 /// ```ignore
991 /// let range = LazyParallelQuery::new(&events)
992 /// .where_between_timestamp_parallel(Event::created_at_r(), start, end);
993 /// ```
994 pub fn where_between_timestamp_parallel(
995 self,
996 path: KeyPaths<T, i64>,
997 start: i64,
998 end: i64,
999 ) -> Self {
1000 self.where_(path, move |timestamp| timestamp >= &start && timestamp <= &end)
1001 }
1002
1003 /// Filter by i64 timestamp being within the last N days (parallel).
1004 ///
1005 /// # Arguments
1006 ///
1007 /// * `path` - The key-path to the i64 timestamp field
1008 /// * `days` - Number of days to look back
1009 ///
1010 /// # Example
1011 ///
1012 /// ```ignore
1013 /// let recent = LazyParallelQuery::new(&events)
1014 /// .where_last_days_timestamp_parallel(Event::created_at_r(), 30);
1015 /// ```
1016 pub fn where_last_days_timestamp_parallel(self, path: KeyPaths<T, i64>, days: i64) -> Self {
1017 let now = chrono::Utc::now().timestamp_millis();
1018 let cutoff = now - (days * 24 * 60 * 60 * 1000); // Convert days to milliseconds
1019 self.where_after_timestamp_parallel(path, cutoff)
1020 }
1021
1022 /// Filter by i64 timestamp being within the next N days (parallel).
1023 ///
1024 /// # Arguments
1025 ///
1026 /// * `path` - The key-path to the i64 timestamp field
1027 /// * `days` - Number of days to look forward
1028 ///
1029 /// # Example
1030 ///
1031 /// ```ignore
1032 /// let upcoming = LazyParallelQuery::new(&events)
1033 /// .where_next_days_timestamp_parallel(Event::scheduled_at_r(), 7);
1034 /// ```
1035 pub fn where_next_days_timestamp_parallel(self, path: KeyPaths<T, i64>, days: i64) -> Self {
1036 let now = chrono::Utc::now().timestamp_millis();
1037 let cutoff = now + (days * 24 * 60 * 60 * 1000); // Convert days to milliseconds
1038 self.where_before_timestamp_parallel(path, cutoff)
1039 }
1040
1041 /// Filter by i64 timestamp being within the last N hours (parallel).
1042 ///
1043 /// # Arguments
1044 ///
1045 /// * `path` - The key-path to the i64 timestamp field
1046 /// * `hours` - Number of hours to look back
1047 ///
1048 /// # Example
1049 ///
1050 /// ```ignore
1051 /// let recent = LazyParallelQuery::new(&events)
1052 /// .where_last_hours_timestamp_parallel(Event::created_at_r(), 24);
1053 /// ```
1054 pub fn where_last_hours_timestamp_parallel(self, path: KeyPaths<T, i64>, hours: i64) -> Self {
1055 let now = chrono::Utc::now().timestamp_millis();
1056 let cutoff = now - (hours * 60 * 60 * 1000); // Convert hours to milliseconds
1057 self.where_after_timestamp_parallel(path, cutoff)
1058 }
1059
1060 /// Filter by i64 timestamp being within the next N hours (parallel).
1061 ///
1062 /// # Arguments
1063 ///
1064 /// * `path` - The key-path to the i64 timestamp field
1065 /// * `hours` - Number of hours to look forward
1066 ///
1067 /// # Example
1068 ///
1069 /// ```ignore
1070 /// let upcoming = LazyParallelQuery::new(&events)
1071 /// .where_next_hours_timestamp_parallel(Event::scheduled_at_r(), 2);
1072 /// ```
1073 pub fn where_next_hours_timestamp_parallel(self, path: KeyPaths<T, i64>, hours: i64) -> Self {
1074 let now = chrono::Utc::now().timestamp_millis();
1075 let cutoff = now + (hours * 60 * 60 * 1000); // Convert hours to milliseconds
1076 self.where_before_timestamp_parallel(path, cutoff)
1077 }
1078
1079 /// Filter by i64 timestamp being within the last N minutes (parallel).
1080 ///
1081 /// # Arguments
1082 ///
1083 /// * `path` - The key-path to the i64 timestamp field
1084 /// * `minutes` - Number of minutes to look back
1085 ///
1086 /// # Example
1087 ///
1088 /// ```ignore
1089 /// let recent = LazyParallelQuery::new(&events)
1090 /// .where_last_minutes_timestamp_parallel(Event::created_at_r(), 60);
1091 /// ```
1092 pub fn where_last_minutes_timestamp_parallel(self, path: KeyPaths<T, i64>, minutes: i64) -> Self {
1093 let now = chrono::Utc::now().timestamp_millis();
1094 let cutoff = now - (minutes * 60 * 1000); // Convert minutes to milliseconds
1095 self.where_after_timestamp_parallel(path, cutoff)
1096 }
1097
1098 /// Filter by i64 timestamp being within the next N minutes (parallel).
1099 ///
1100 /// # Arguments
1101 ///
1102 /// * `path` - The key-path to the i64 timestamp field
1103 /// * `minutes` - Number of minutes to look forward
1104 ///
1105 /// # Example
1106 ///
1107 /// ```ignore
1108 /// let upcoming = LazyParallelQuery::new(&events)
1109 /// .where_next_minutes_timestamp_parallel(Event::scheduled_at_r(), 30);
1110 /// ```
1111 pub fn where_next_minutes_timestamp_parallel(self, path: KeyPaths<T, i64>, minutes: i64) -> Self {
1112 let now = chrono::Utc::now().timestamp_millis();
1113 let cutoff = now + (minutes * 60 * 1000); // Convert minutes to milliseconds
1114 self.where_before_timestamp_parallel(path, cutoff)
1115 }
1116}
1117
1118#[cfg(test)]
1119mod tests {
1120 use super::*;
1121 use crate::ext::LazyParallelQueryExt;
1122 use key_paths_derive::Keypath;
1123
1124 #[derive(Debug, Clone, PartialEq, Keypath)]
1125 struct Product {
1126 id: u32,
1127 name: String,
1128 price: f64,
1129 category: String,
1130 stock: u32,
1131 }
1132
1133 fn create_test_products() -> Vec<Product> {
1134 vec![
1135 Product {
1136 id: 1,
1137 name: "Laptop".to_string(),
1138 price: 999.99,
1139 category: "Electronics".to_string(),
1140 stock: 5,
1141 },
1142 Product {
1143 id: 2,
1144 name: "Mouse".to_string(),
1145 price: 29.99,
1146 category: "Electronics".to_string(),
1147 stock: 50,
1148 },
1149 Product {
1150 id: 3,
1151 name: "Keyboard".to_string(),
1152 price: 79.99,
1153 category: "Electronics".to_string(),
1154 stock: 30,
1155 },
1156 Product {
1157 id: 4,
1158 name: "Desk Chair".to_string(),
1159 price: 199.99,
1160 category: "Furniture".to_string(),
1161 stock: 8,
1162 },
1163 ]
1164 }
1165
1166 #[test]
1167 fn test_parallel_and_operator() {
1168 let products = create_test_products();
1169
1170 let results: Vec<_> = products
1171 .lazy_parallel_query()
1172 .where_(Product::price(), |&p| p < 100.0)
1173 .and(Product::stock(), |&s| s > 10)
1174 .collect_parallel();
1175
1176 // Should find: Mouse and Keyboard
1177 assert_eq!(results.len(), 2);
1178 assert!(results.iter().any(|p| p.name == "Mouse"));
1179 assert!(results.iter().any(|p| p.name == "Keyboard"));
1180 }
1181
1182 #[test]
1183 fn test_parallel_or_operator() {
1184 let products = create_test_products();
1185
1186 let results: Vec<_> = products
1187 .lazy_parallel_query()
1188 .where_(Product::price(), |&p| p < 50.0)
1189 .or(Product::category(), |c| c == "Furniture")
1190 .collect_parallel();
1191
1192 // Should find: Mouse (price < 50) and Desk Chair (Furniture)
1193 assert_eq!(results.len(), 2);
1194 assert!(results.iter().any(|p| p.name == "Mouse"));
1195 assert!(results.iter().any(|p| p.name == "Desk Chair"));
1196 }
1197
1198 #[test]
1199 fn test_parallel_complex_and_or() {
1200 let products = create_test_products();
1201
1202 // (price < 100 AND stock > 10) OR (category == "Furniture")
1203 let results: Vec<_> = products
1204 .lazy_parallel_query()
1205 .where_(Product::price(), |&p| p < 100.0)
1206 .and(Product::stock(), |&s| s > 10)
1207 .or(Product::category(), |c| c == "Furniture")
1208 .collect_parallel();
1209
1210 // Should find: Mouse, Keyboard (from AND group), and Desk Chair (from OR group)
1211 assert_eq!(results.len(), 3);
1212 assert!(results.iter().any(|p| p.name == "Mouse"));
1213 assert!(results.iter().any(|p| p.name == "Keyboard"));
1214 assert!(results.iter().any(|p| p.name == "Desk Chair"));
1215 }
1216}
1217
1218/// Extension trait for easy access to parallel lazy queries.
1219///
1220/// This trait provides a convenient method to create parallel lazy queries
1221/// from collections that implement the required traits.
1222///
1223/// # Example
1224///
1225/// ```ignore
1226/// use rust_queries_core::LazyParallelQueryExt;
1227///
1228/// let products = vec![/* ... */];
1229/// let query = products.lazy_parallel_query()
1230/// .where_(Product::price(), |&p| p < 100.0)
1231/// .collect_parallel();
1232/// ```
1233#[cfg(feature = "parallel")]
1234pub trait LazyParallelQueryExt<T: 'static + Send + Sync> {
1235 /// Creates a new parallel lazy query from the collection.
1236 ///
1237 /// # Example
1238 ///
1239 /// ```ignore
1240 /// let query = products.lazy_parallel_query();
1241 /// ```
1242 fn lazy_parallel_query(&self) -> LazyParallelQuery<T>;
1243}
1244
1245#[cfg(feature = "parallel")]
1246impl<T: 'static + Send + Sync> LazyParallelQueryExt<T> for [T] {
1247 fn lazy_parallel_query(&self) -> LazyParallelQuery<T> {
1248 LazyParallelQuery::new(self)
1249 }
1250}
1251
1252#[cfg(feature = "parallel")]
1253impl<T: 'static + Send + Sync> LazyParallelQueryExt<T> for Vec<T> {
1254 fn lazy_parallel_query(&self) -> LazyParallelQuery<T> {
1255 LazyParallelQuery::new(self)
1256 }
1257}