rust_queries_core/
lazy_parallel.rs

1//! Parallel lazy query implementation using rayon for parallel processing.
2//!
3//! This module provides parallel lazy evaluation of queries, deferring execution
4//! until results are actually consumed, but using multiple threads for better performance.
5//!
6//! # Benefits
7//!
8//! - **Deferred execution**: No work until results needed
9//! - **Parallel processing**: Utilizes multiple CPU cores
10//! - **Iterator fusion**: Rust optimizes chained operations
//! - **Early termination**: short-circuiting terminals (`first_parallel`, `any_parallel`, `find_parallel`) can stop once a match is found
12//! - **Composable**: Build complex queries by composition
13//! - **Thread-safe**: All operations are Send + Sync
14//!
15//! # Example
16//!
17//! ```ignore
18//! // Nothing executes yet
19//! let query = LazyParallelQuery::new(&products)
20//!     .where_(Product::price(), |&p| p < 100.0)
21//!     .where_(Product::stock(), |&s| s > 0);
22//!
23//! // Parallel execution happens here
24//! let results: Vec<_> = query.collect_parallel();
25//! ```
26
27#[cfg(feature = "parallel")]
28use {
29    rayon::prelude::*,
30    key_paths_core::KeyPaths,
31    std::marker::PhantomData,
32    std::time::SystemTime,
33};
34
35// DateTime is used in doc comments and conditional compilation
36#[cfg(feature = "datetime")]
37#[allow(unused_imports)]
38use chrono::DateTime;
39
/// Lazily built query over a borrowed slice whose terminal operations run on
/// rayon parallel iterators.
///
/// The builder methods (`where_`, `and`, `or`) only record boxed predicates; the
/// slice is not scanned until a terminal method such as `collect_parallel` runs.
///
/// # Type Parameters
///
/// * `'a` - lifetime of the borrowed slice being queried
/// * `T` - element type of the slice; must be `'static + Send + Sync`
///
/// # Example
///
/// ```ignore
/// let cheap_or_premium = LazyParallelQuery::new(&products)
///     .where_(Product::price(), |&p| p < 100.0)
///     .and(Product::stock(), |&s| s > 0)
///     .or(Product::category(), |c| c == "Premium")
///     .collect_parallel();
/// ```
#[cfg(feature = "parallel")]
pub struct LazyParallelQuery<'a, T>
where
    T: 'static + Send + Sync,
{
    /// Slice the query reads from.
    data: &'a [T],
    /// Recorded filter groups, in the order the builder calls created them.
    filter_groups: Vec<FilterGroup<'a, T>>,
    /// Marker for the `&'a T` borrow.
    _phantom: PhantomData<&'a T>,
}
65
/// A run of predicates that share one logical operator.
///
/// Each predicate is boxed as `Fn(&T) -> bool + Send + Sync` so it can be
/// called from multiple threads.
#[cfg(feature = "parallel")]
enum FilterGroup<'a, T>
where
    T: 'static + Send + Sync,
{
    /// Passes only when every contained predicate returns `true`.
    And(Vec<Box<dyn Fn(&T) -> bool + Send + Sync + 'a>>),
    /// Passes when at least one contained predicate returns `true`.
    Or(Vec<Box<dyn Fn(&T) -> bool + Send + Sync + 'a>>),
}
73
#[cfg(feature = "parallel")]
impl<'a, T> FilterGroup<'a, T>
where
    T: 'static + Send + Sync,
{
    /// Applies this group's predicates to `item`.
    ///
    /// An `And` group passes when all predicates hold (an empty group passes);
    /// an `Or` group passes when any predicate holds (an empty group fails).
    /// Evaluation stops at the first predicate that decides the outcome.
    fn evaluate(&self, item: &T) -> bool {
        match self {
            Self::And(predicates) => predicates.iter().all(|predicate| predicate(item)),
            Self::Or(predicates) => predicates.iter().any(|predicate| predicate(item)),
        }
    }
}
83
84#[cfg(feature = "parallel")]
85impl<'a, T: 'static + Send + Sync> LazyParallelQuery<'a, T> {
86    /// Creates a new parallel lazy query from a slice.
87    ///
88    /// # Example
89    ///
90    /// ```ignore
91    /// let query = LazyParallelQuery::new(&products);
92    /// ```
93    pub fn new(data: &'a [T]) -> Self {
94        Self {
95            data,
96            filter_groups: Vec::new(),
97            _phantom: PhantomData,
98        }
99    }
100
101    /// Adds a filter predicate (lazy - not executed yet).
102    /// Multiple `where_` calls are implicitly ANDed together, unless the last group is an OR group,
103    /// in which case `where_` adds to the OR group.
104    ///
105    /// # Example
106    ///
107    /// ```ignore
108    /// let query = LazyParallelQuery::new(&products)
109    ///     .where_(Product::price(), |&p| p < 100.0);
110    /// ```
111    pub fn where_<F>(mut self, path: KeyPaths<T, F>, predicate: impl Fn(&F) -> bool + Send + Sync + 'static) -> Self
112    where
113        F: 'static + Send + Sync,
114    {
115        let filter = Box::new(move |item: &T| {
116            path.get(item).map_or(false, |val| predicate(val))
117        });
118
119        // If the last group is an OR group, add to it; otherwise create/add to AND group
120        match self.filter_groups.last_mut() {
121            Some(FilterGroup::Or(filters)) => {
122                // Add to existing OR group
123                filters.push(filter);
124            }
125            Some(FilterGroup::And(filters)) => {
126                // Add to existing AND group
127                filters.push(filter);
128            }
129            None => {
130                // Create new AND group
131                self.filter_groups.push(FilterGroup::And(vec![filter]));
132            }
133        }
134        
135        self
136    }
137
138    /// Adds a filter with AND logic (explicit AND operator).
139    /// This is equivalent to `where_` but makes the AND relationship explicit.
140    ///
141    /// # Example
142    ///
143    /// ```ignore
144    /// let query = LazyParallelQuery::new(&products)
145    ///     .where_(Product::price(), |&p| p < 100.0)
146    ///     .and(Product::stock(), |&s| s > 0);
147    /// ```
148    pub fn and<F>(mut self, path: KeyPaths<T, F>, predicate: impl Fn(&F) -> bool + Send + Sync + 'static) -> Self
149    where
150        F: 'static + Send + Sync,
151    {
152        let filter = Box::new(move |item: &T| {
153            path.get(item).map_or(false, |val| predicate(val))
154        });
155        
156        match self.filter_groups.last_mut() {
157            Some(FilterGroup::And(filters)) => {
158                filters.push(filter);
159            }
160            _ => {
161                // If no previous group or previous was OR, start a new AND group
162                self.filter_groups.push(FilterGroup::And(vec![filter]));
163            }
164        }
165        
166        self
167    }
168
169    /// Adds a filter with OR logic (explicit OR operator).
170    /// Items matching this filter OR any filters in the current OR group will pass.
171    ///
172    /// # Example
173    ///
174    /// ```ignore
175    /// let query = LazyParallelQuery::new(&products)
176    ///     .where_(Product::price(), |&p| p < 100.0)
177    ///     .or(Product::category(), |c| c == "Premium");
178    /// ```
179    pub fn or<F>(mut self, path: KeyPaths<T, F>, predicate: impl Fn(&F) -> bool + Send + Sync + 'static) -> Self
180    where
181        F: 'static + Send + Sync,
182    {
183        let filter = Box::new(move |item: &T| {
184            path.get(item).map_or(false, |val| predicate(val))
185        });
186        
187        match self.filter_groups.last_mut() {
188            Some(FilterGroup::Or(filters)) => {
189                filters.push(filter);
190            }
191            _ => {
192                // If no previous group or previous was AND, start a new OR group
193                self.filter_groups.push(FilterGroup::Or(vec![filter]));
194            }
195        }
196        
197        self
198    }
199
200    /// Evaluates all filter groups against an item.
201    /// Filter groups are evaluated as follows:
202    /// - AND groups: all filters in the group must pass
203    /// - OR groups: at least one filter in the group must pass
204    /// - Between groups: if there are both AND and OR groups, items must satisfy
205    ///   either all AND groups OR at least one OR group (if OR groups exist)
206    fn evaluate_filters(&self, item: &T) -> bool {
207        let (and_groups, or_groups): (Vec<_>, Vec<_>) = self.filter_groups
208            .iter()
209            .partition(|group| matches!(group, FilterGroup::And(_)));
210        
211        match (and_groups.is_empty(), or_groups.is_empty()) {
212            // Only AND groups: all must pass
213            (false, true) => and_groups.iter().all(|group| group.evaluate(item)),
214            // Only OR groups: at least one must pass
215            (true, false) => or_groups.iter().any(|group| group.evaluate(item)),
216            // Both AND and OR groups: (all AND pass) OR (any OR pass)
217            (false, false) => {
218                let all_and_pass = and_groups.iter().all(|group| group.evaluate(item));
219                let any_or_pass = or_groups.iter().any(|group| group.evaluate(item));
220                all_and_pass || any_or_pass
221            }
222            // No filters: everything passes
223            (true, true) => true,
224        }
225    }
226
227    /// Collects all items into a vector (terminal operation - executes query in parallel).
228    ///
229    /// # Example
230    ///
231    /// ```ignore
232    /// let results: Vec<&Product> = query.collect_parallel();
233    /// ```
234    pub fn collect_parallel(&self) -> Vec<&'a T> {
235        self.data
236            .par_iter()
237            .filter(|item| self.evaluate_filters(item))
238            .collect()
239    }
240
241    /// Gets the first item (terminal operation - executes until first match in parallel).
242    ///
243    /// # Example
244    ///
245    /// ```ignore
246    /// let first = query.first_parallel();
247    /// ```
248    pub fn first_parallel(&self) -> Option<&'a T> {
249        self.data
250            .par_iter()
251            .find_any(|item| self.evaluate_filters(item))
252    }
253
254    /// Counts items (terminal operation - executes query in parallel).
255    ///
256    /// # Example
257    ///
258    /// ```ignore
259    /// let count = query.count_parallel();
260    /// ```
261    pub fn count_parallel(&self) -> usize {
262        self.data
263            .par_iter()
264            .filter(|item| self.evaluate_filters(item))
265            .count()
266    }
267
268    /// Checks if any items match (terminal operation - short-circuits in parallel).
269    ///
270    /// # Example
271    ///
272    /// ```ignore
273    /// let exists = query.any_parallel();
274    /// ```
275    pub fn any_parallel(&self) -> bool {
276        self.data
277            .par_iter()
278            .any(|item| self.evaluate_filters(item))
279    }
280
281    /// Checks if all items match a predicate (terminal operation - short-circuits in parallel).
282    ///
283    /// # Example
284    ///
285    /// ```ignore
286    /// let all_positive = query.all_match_parallel(|item| item.value > 0);
287    /// ```
288    pub fn all_match_parallel<P>(&self, predicate: P) -> bool
289    where
290        P: Fn(&'a T) -> bool + Send + Sync,
291    {
292        self.data
293            .par_iter()
294            .filter(|item| self.evaluate_filters(item))
295            .all(predicate)
296    }
297
298    /// Executes a function for each item (terminal operation in parallel).
299    ///
300    /// # Example
301    ///
302    /// ```ignore
303    /// query.for_each_parallel(|item| println!("{:?}", item));
304    /// ```
305    pub fn for_each_parallel<F>(&self, f: F)
306    where
307        F: Fn(&'a T) + Send + Sync,
308    {
309        self.data
310            .par_iter()
311            .filter(|item| self.evaluate_filters(item))
312            .for_each(f)
313    }
314
315    /// Folds the iterator (terminal operation in parallel).
316    ///
317    /// Note: This is a simplified implementation that collects all items first.
318    /// For true parallel folding with custom reduction, consider using the sequential version
319    /// or implementing a custom parallel reduction strategy.
320    ///
321    /// # Example
322    ///
323    /// ```ignore
324    /// let sum = query.fold_parallel(0.0, |acc, item| acc + item.price);
325    /// ```
326    pub fn fold_parallel<B, F>(&self, init: B, f: F) -> B
327    where
328        B: Send + Sync,
329        F: Fn(B, &'a T) -> B + Send + Sync,
330    {
331        let items: Vec<&'a T> = self.data
332            .par_iter()
333            .filter(|item| self.evaluate_filters(item))
334            .collect();
335        
336        items.into_iter().fold(init, f)
337    }
338
339    /// Finds an item matching a predicate (terminal - short-circuits in parallel).
340    ///
341    /// # Example
342    ///
343    /// ```ignore
344    /// let found = query.find_parallel(|item| item.id == 42);
345    /// ```
346    pub fn find_parallel<P>(&self, predicate: P) -> Option<&'a T>
347    where
348        P: Fn(&&'a T) -> bool + Send + Sync,
349    {
350        self.data
351            .par_iter()
352            .filter(|item| self.evaluate_filters(item))
353            .find_any(predicate)
354    }
355
356    /// Projects/selects a single field from results (parallel).
357    ///
358    /// # Arguments
359    ///
360    /// * `path` - The key-path to the field to select
361    ///
362    /// # Example
363    ///
364    /// ```ignore
365    /// let names = query.select_parallel(Product::name());
366    /// ```
367    pub fn select_parallel<F>(&self, path: KeyPaths<T, F>) -> Vec<F>
368    where
369        F: Clone + Send + Sync + 'static,
370    {
371        self.data
372            .par_iter()
373            .filter(|item| self.evaluate_filters(item))
374            .filter_map(|item| path.get(item).cloned())
375            .collect()
376    }
377
378    /// Maps each item through a transformation (parallel).
379    ///
380    /// # Example
381    ///
382    /// ```ignore
383    /// let prices = query.map_items_parallel(|p| p.price);
384    /// ```
385    pub fn map_items_parallel<F, O>(&self, f: F) -> Vec<O>
386    where
387        F: Fn(&'a T) -> O + Send + Sync,
388        O: Send,
389    {
390        self.data
391            .par_iter()
392            .filter(|item| self.evaluate_filters(item))
393            .map(f)
394            .collect()
395    }
396
397    /// Takes at most `n` items (parallel).
398    ///
399    /// Note: This collects all filtered items and then takes the first n.
400    /// For true lazy evaluation with take, use the sequential LazyQuery.
401    ///
402    /// # Example
403    ///
404    /// ```ignore
405    /// let first_10: Vec<_> = query.take_parallel(10);
406    /// ```
407    pub fn take_parallel(&self, n: usize) -> Vec<&'a T> {
408        let mut results: Vec<&'a T> = self.data
409            .par_iter()
410            .filter(|item| self.evaluate_filters(item))
411            .collect();
412        results.truncate(n);
413        results
414    }
415
416    /// Skips `n` items (parallel).
417    ///
418    /// Note: This collects all filtered items and then skips the first n.
419    /// For true lazy evaluation with skip, use the sequential LazyQuery.
420    ///
421    /// # Example
422    ///
423    /// ```ignore
424    /// let page_2: Vec<_> = query.skip_parallel(10);
425    /// ```
426    pub fn skip_parallel(&self, n: usize) -> Vec<&'a T> {
427        let results: Vec<&'a T> = self.data
428            .par_iter()
429            .filter(|item| self.evaluate_filters(item))
430            .collect();
431        results.into_iter().skip(n).collect()
432    }
433}
434
435// Aggregation operations (parallel)
436#[cfg(feature = "parallel")]
437impl<'a, T: 'static + Send + Sync> LazyParallelQuery<'a, T> {
438    /// Computes sum of a field (terminal operation in parallel).
439    ///
440    /// # Example
441    ///
442    /// ```ignore
443    /// let total: f64 = LazyParallelQuery::new(&products)
444    ///     .sum_by_parallel(Product::price());
445    /// ```
446    pub fn sum_by_parallel<F>(&self, path: KeyPaths<T, F>) -> F
447    where
448        F: Clone + std::ops::Add<Output = F> + Default + Send + Sync + std::iter::Sum + 'static,
449    {
450        self.data
451            .par_iter()
452            .filter(|item| self.evaluate_filters(item))
453            .filter_map(|item| path.get(item).cloned())
454            .sum()
455    }
456
457    /// Computes average of a float field (terminal operation in parallel).
458    ///
459    /// # Example
460    ///
461    /// ```ignore
462    /// let avg = LazyParallelQuery::new(&products)
463    ///     .avg_by_parallel(Product::price());
464    /// ```
465    pub fn avg_by_parallel(&self, path: KeyPaths<T, f64>) -> Option<f64>
466    where
467        T: Send + Sync,
468    {
469        let items: Vec<f64> = self
470            .data
471            .par_iter()
472            .filter(|item| self.evaluate_filters(item))
473            .filter_map(|item| path.get(item).cloned())
474            .collect();
475
476        if items.is_empty() {
477            None
478        } else {
479            Some(items.par_iter().sum::<f64>() / items.len() as f64)
480        }
481    }
482
483    /// Finds minimum value of a field (terminal operation in parallel).
484    ///
485    /// # Example
486    ///
487    /// ```ignore
488    /// let min = LazyParallelQuery::new(&products)
489    ///     .min_by_parallel(Product::price());
490    /// ```
491    pub fn min_by_parallel<F>(&self, path: KeyPaths<T, F>) -> Option<F>
492    where
493        F: Ord + Clone + Send + Sync + 'static,
494    {
495        self.data
496            .par_iter()
497            .filter(|item| self.evaluate_filters(item))
498            .filter_map(|item| path.get(item).cloned())
499            .min()
500    }
501
502    /// Finds maximum value of a field (terminal operation in parallel).
503    ///
504    /// # Example
505    ///
506    /// ```ignore
507    /// let max = LazyParallelQuery::new(&products)
508    ///     .max_by_parallel(Product::price());
509    /// ```
510    pub fn max_by_parallel<F>(&self, path: KeyPaths<T, F>) -> Option<F>
511    where
512        F: Ord + Clone + Send + Sync + 'static,
513    {
514        self.data
515            .par_iter()
516            .filter(|item| self.evaluate_filters(item))
517            .filter_map(|item| path.get(item).cloned())
518            .max()
519    }
520
521    /// Finds minimum float value (terminal operation in parallel).
522    pub fn min_by_float_parallel(&self, path: KeyPaths<T, f64>) -> Option<f64>
523    where
524        T: Send + Sync,
525    {
526        self.data
527            .par_iter()
528            .filter(|item| self.evaluate_filters(item))
529            .filter_map(|item| path.get(item).cloned())
530            .min_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal))
531    }
532
533    /// Finds maximum float value (terminal operation in parallel).
534    pub fn max_by_float_parallel(&self, path: KeyPaths<T, f64>) -> Option<f64>
535    where
536        T: Send + Sync,
537    {
538        self.data
539            .par_iter()
540            .filter(|item| self.evaluate_filters(item))
541            .filter_map(|item| path.get(item).cloned())
542            .max_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal))
543    }
544}
545
546// DateTime operations for SystemTime (parallel)
547#[cfg(feature = "parallel")]
548impl<'a, T: 'static + Send + Sync> LazyParallelQuery<'a, T> {
549    /// Filter by SystemTime being after a reference time (parallel).
550    ///
551    /// # Arguments
552    ///
553    /// * `path` - The key-path to the SystemTime field
554    /// * `reference` - The reference time to compare against
555    ///
556    /// # Example
557    ///
558    /// ```ignore
559    /// let recent = LazyParallelQuery::new(&events)
560    ///     .where_after_systemtime_parallel(Event::timestamp(), cutoff_time);
561    /// ```
562    pub fn where_after_systemtime_parallel(self, path: KeyPaths<T, SystemTime>, reference: SystemTime) -> Self {
563        self.where_(path, move |time| time > &reference)
564    }
565
566    /// Filter by SystemTime being before a reference time (parallel).
567    ///
568    /// # Arguments
569    ///
570    /// * `path` - The key-path to the SystemTime field
571    /// * `reference` - The reference time to compare against
572    ///
573    /// # Example
574    ///
575    /// ```ignore
576    /// let old = LazyParallelQuery::new(&events)
577    ///     .where_before_systemtime_parallel(Event::timestamp(), cutoff_time);
578    /// ```
579    pub fn where_before_systemtime_parallel(self, path: KeyPaths<T, SystemTime>, reference: SystemTime) -> Self {
580        self.where_(path, move |time| time < &reference)
581    }
582
583    /// Filter by SystemTime being between two times (inclusive, parallel).
584    ///
585    /// # Arguments
586    ///
587    /// * `path` - The key-path to the SystemTime field
588    /// * `start` - The start time
589    /// * `end` - The end time
590    ///
591    /// # Example
592    ///
593    /// ```ignore
594    /// let range = LazyParallelQuery::new(&events)
595    ///     .where_between_systemtime_parallel(Event::timestamp(), start, end);
596    /// ```
597    pub fn where_between_systemtime_parallel(
598        self,
599        path: KeyPaths<T, SystemTime>,
600        start: SystemTime,
601        end: SystemTime,
602    ) -> Self {
603        self.where_(path, move |time| time >= &start && time <= &end)
604    }
605}
606
607// DateTime operations with chrono (only available with datetime feature, parallel)
608#[cfg(all(feature = "parallel", feature = "datetime"))]
609impl<'a, T: 'static + Send + Sync> LazyParallelQuery<'a, T> {
610    /// Filter by DateTime being after a reference time (parallel).
611    ///
612    /// # Arguments
613    ///
614    /// * `path` - The key-path to the DateTime field
615    /// * `reference` - The reference time to compare against
616    ///
617    /// # Example
618    ///
619    /// ```ignore
620    /// let recent = LazyParallelQuery::new(&events)
621    ///     .where_after_parallel(Event::timestamp(), cutoff_time);
622    /// ```
623    pub fn where_after_parallel<Tz>(self, path: KeyPaths<T, DateTime<Tz>>, reference: DateTime<Tz>) -> Self
624    where
625        Tz: TimeZone + 'static,
626        Tz::Offset: std::fmt::Display + Send + Sync,
627    {
628        self.where_(path, move |time| time > &reference)
629    }
630
631    /// Filter by DateTime being before a reference time (parallel).
632    ///
633    /// # Arguments
634    ///
635    /// * `path` - The key-path to the DateTime field
636    /// * `reference` - The reference time to compare against
637    ///
638    /// # Example
639    ///
640    /// ```ignore
641    /// let old = LazyParallelQuery::new(&events)
642    ///     .where_before_parallel(Event::timestamp(), cutoff_time);
643    /// ```
644    pub fn where_before_parallel<Tz>(self, path: KeyPaths<T, DateTime<Tz>>, reference: DateTime<Tz>) -> Self
645    where
646        Tz: TimeZone + 'static,
647        Tz::Offset: std::fmt::Display + Send + Sync,
648    {
649        self.where_(path, move |time| time < &reference)
650    }
651
652    /// Filter by DateTime being between two times (inclusive, parallel).
653    ///
654    /// # Arguments
655    ///
656    /// * `path` - The key-path to the DateTime field
657    /// * `start` - The start time
658    /// * `end` - The end time
659    ///
660    /// # Example
661    ///
662    /// ```ignore
663    /// let range = LazyParallelQuery::new(&events)
664    ///     .where_between_parallel(Event::timestamp(), start, end);
665    /// ```
666    pub fn where_between_parallel<Tz>(
667        self,
668        path: KeyPaths<T, DateTime<Tz>>,
669        start: DateTime<Tz>,
670        end: DateTime<Tz>,
671    ) -> Self
672    where
673        Tz: TimeZone + 'static,
674        Tz::Offset: std::fmt::Display + Send + Sync,
675    {
676        self.where_(path, move |time| time >= &start && time <= &end)
677    }
678
679    /// Filter by DateTime being today (parallel).
680    ///
681    /// # Arguments
682    ///
683    /// * `path` - The key-path to the DateTime field
684    /// * `now` - The current DateTime to compare against
685    ///
686    /// # Example
687    ///
688    /// ```ignore
689    /// let today = LazyParallelQuery::new(&events)
690    ///     .where_today_parallel(Event::timestamp(), Utc::now());
691    /// ```
692    pub fn where_today_parallel<Tz>(self, path: KeyPaths<T, DateTime<Tz>>, now: DateTime<Tz>) -> Self
693    where
694        Tz: TimeZone + 'static,
695        Tz::Offset: std::fmt::Display + Send + Sync,
696    {
697        self.where_(path, move |time| {
698            time.date_naive() == now.date_naive()
699        })
700    }
701
702    /// Filter by DateTime year (parallel).
703    ///
704    /// # Arguments
705    ///
706    /// * `path` - The key-path to the DateTime field
707    /// * `year` - The year to filter by
708    ///
709    /// # Example
710    ///
711    /// ```ignore
712    /// let this_year = LazyParallelQuery::new(&events)
713    ///     .where_year_parallel(Event::timestamp(), 2024);
714    /// ```
715    pub fn where_year_parallel<Tz>(self, path: KeyPaths<T, DateTime<Tz>>, year: i32) -> Self
716    where
717        Tz: TimeZone + 'static,
718        Tz::Offset: std::fmt::Display + Send + Sync,
719    {
720        use chrono::Datelike;
721        self.where_(path, move |time| time.year() == year)
722    }
723
724    /// Filter by DateTime month (parallel).
725    ///
726    /// # Arguments
727    ///
728    /// * `path` - The key-path to the DateTime field
729    /// * `month` - The month to filter by (1-12)
730    ///
731    /// # Example
732    ///
733    /// ```ignore
734    /// let december = LazyParallelQuery::new(&events)
735    ///     .where_month_parallel(Event::timestamp(), 12);
736    /// ```
737    pub fn where_month_parallel<Tz>(self, path: KeyPaths<T, DateTime<Tz>>, month: u32) -> Self
738    where
739        Tz: TimeZone + 'static,
740        Tz::Offset: std::fmt::Display + Send + Sync,
741    {
742        use chrono::Datelike;
743        self.where_(path, move |time| time.month() == month)
744    }
745
746    /// Filter by DateTime day (parallel).
747    ///
748    /// # Arguments
749    ///
750    /// * `path` - The key-path to the DateTime field
751    /// * `day` - The day to filter by (1-31)
752    ///
753    /// # Example
754    ///
755    /// ```ignore
756    /// let first = LazyParallelQuery::new(&events)
757    ///     .where_day_parallel(Event::timestamp(), 1);
758    /// ```
759    pub fn where_day_parallel<Tz>(self, path: KeyPaths<T, DateTime<Tz>>, day: u32) -> Self
760    where
761        Tz: TimeZone + 'static,
762        Tz::Offset: std::fmt::Display + Send + Sync,
763    {
764        use chrono::Datelike;
765        self.where_(path, move |time| time.day() == day)
766    }
767
768    /// Filter by weekend dates (Saturday and Sunday, parallel).
769    ///
770    /// # Arguments
771    ///
772    /// * `path` - The key-path to the DateTime field
773    ///
774    /// # Example
775    ///
776    /// ```ignore
777    /// let weekend_events = LazyParallelQuery::new(&events)
778    ///     .where_weekend_parallel(Event::timestamp());
779    /// ```
780    pub fn where_weekend_parallel<Tz>(self, path: KeyPaths<T, DateTime<Tz>>) -> Self
781    where
782        Tz: TimeZone + 'static,
783        Tz::Offset: std::fmt::Display + Send + Sync,
784    {
785        use chrono::Datelike;
786        self.where_(path, |time| {
787            let weekday = time.weekday().num_days_from_monday();
788            weekday >= 5
789        })
790    }
791
792    /// Filter by weekday dates (Monday through Friday, parallel).
793    ///
794    /// # Arguments
795    ///
796    /// * `path` - The key-path to the DateTime field
797    ///
798    /// # Example
799    ///
800    /// ```ignore
801    /// let weekday_events = LazyParallelQuery::new(&events)
802    ///     .where_weekday_parallel(Event::timestamp());
803    /// ```
804    pub fn where_weekday_parallel<Tz>(self, path: KeyPaths<T, DateTime<Tz>>) -> Self
805    where
806        Tz: TimeZone + 'static,
807        Tz::Offset: std::fmt::Display + Send + Sync,
808    {
809        use chrono::Datelike;
810        self.where_(path, |time| {
811            let weekday = time.weekday().num_days_from_monday();
812            weekday < 5
813        })
814    }
815
816    /// Filter by business hours (9 AM - 5 PM, parallel).
817    ///
818    /// # Arguments
819    ///
820    /// * `path` - The key-path to the DateTime field
821    ///
822    /// # Example
823    ///
824    /// ```ignore
825    /// let business_hours = LazyParallelQuery::new(&events)
826    ///     .where_business_hours_parallel(Event::timestamp());
827    /// ```
828    pub fn where_business_hours_parallel<Tz>(self, path: KeyPaths<T, DateTime<Tz>>) -> Self
829    where
830        Tz: TimeZone + 'static,
831        Tz::Offset: std::fmt::Display + Send + Sync,
832    {
833        use chrono::Timelike;
834        self.where_(path, |time| {
835            let hour = time.hour();
836            hour >= 9 && hour < 17
837        })
838    }
839}
840
841// i64 DateTime Aggregators (Unix timestamps in milliseconds, parallel)
842#[cfg(feature = "parallel")]
843impl<'a, T: 'static + Send + Sync> LazyParallelQuery<'a, T> {
844    /// Finds minimum i64 timestamp value (terminal operation in parallel).
845    ///
846    /// # Example
847    ///
848    /// ```ignore
849    /// let earliest = LazyParallelQuery::new(&events)
850    ///     .min_timestamp_parallel(Event::created_at_r());
851    /// ```
852    pub fn min_timestamp_parallel(&self, path: KeyPaths<T, i64>) -> Option<i64>
853    where
854        T: Send + Sync,
855    {
856        self.data
857            .par_iter()
858            .filter(|item| self.evaluate_filters(item))
859            .filter_map(|item| path.get(item).cloned())
860            .min()
861    }
862
863    /// Finds maximum i64 timestamp value (terminal operation in parallel).
864    ///
865    /// # Example
866    ///
867    /// ```ignore
868    /// let latest = LazyParallelQuery::new(&events)
869    ///     .max_timestamp_parallel(Event::created_at_r());
870    /// ```
871    pub fn max_timestamp_parallel(&self, path: KeyPaths<T, i64>) -> Option<i64>
872    where
873        T: Send + Sync,
874    {
875        self.data
876            .par_iter()
877            .filter(|item| self.evaluate_filters(item))
878            .filter_map(|item| path.get(item).cloned())
879            .max()
880    }
881
882    /// Computes average of i64 timestamp values (terminal operation in parallel).
883    ///
884    /// # Example
885    ///
886    /// ```ignore
887    /// let avg = LazyParallelQuery::new(&events)
888    ///     .avg_timestamp_parallel(Event::created_at_r());
889    /// ```
890    pub fn avg_timestamp_parallel(&self, path: KeyPaths<T, i64>) -> Option<i64>
891    where
892        T: Send + Sync,
893    {
894        let items: Vec<i64> = self
895            .data
896            .par_iter()
897            .filter(|item| self.evaluate_filters(item))
898            .filter_map(|item| path.get(item).cloned())
899            .collect();
900
901        if items.is_empty() {
902            None
903        } else {
904            Some(items.par_iter().sum::<i64>() / items.len() as i64)
905        }
906    }
907
908    /// Computes sum of i64 timestamp values (terminal operation in parallel).
909    ///
910    /// # Example
911    ///
912    /// ```ignore
913    /// let total = LazyParallelQuery::new(&events)
914    ///     .sum_timestamp_parallel(Event::created_at_r());
915    /// ```
916    pub fn sum_timestamp_parallel(&self, path: KeyPaths<T, i64>) -> i64
917    where
918        T: Send + Sync,
919    {
920        self.data
921            .par_iter()
922            .filter(|item| self.evaluate_filters(item))
923            .filter_map(|item| path.get(item).cloned())
924            .sum()
925    }
926
927    /// Counts i64 timestamp values (terminal operation in parallel).
928    ///
929    /// # Example
930    ///
931    /// ```ignore
932    /// let count = LazyParallelQuery::new(&events)
933    ///     .count_timestamp_parallel(Event::created_at_r());
934    /// ```
935    pub fn count_timestamp_parallel(&self, path: KeyPaths<T, i64>) -> usize
936    where
937        T: Send + Sync,
938    {
939        self.data
940            .par_iter()
941            .filter(|item| self.evaluate_filters(item))
942            .filter(|item| path.get(item).is_some())
943            .count()
944    }
945
946    /// Filter by i64 timestamp being after a reference time (parallel).
947    ///
948    /// # Arguments
949    ///
950    /// * `path` - The key-path to the i64 timestamp field
951    /// * `reference` - The reference timestamp to compare against
952    ///
953    /// # Example
954    ///
955    /// ```ignore
956    /// let recent = LazyParallelQuery::new(&events)
957    ///     .where_after_timestamp_parallel(Event::created_at_r(), cutoff_time);
958    /// ```
959    pub fn where_after_timestamp_parallel(self, path: KeyPaths<T, i64>, reference: i64) -> Self {
960        self.where_(path, move |timestamp| timestamp > &reference)
961    }
962
963    /// Filter by i64 timestamp being before a reference time (parallel).
964    ///
965    /// # Arguments
966    ///
967    /// * `path` - The key-path to the i64 timestamp field
968    /// * `reference` - The reference timestamp to compare against
969    ///
970    /// # Example
971    ///
972    /// ```ignore
973    /// let old = LazyParallelQuery::new(&events)
974    ///     .where_before_timestamp_parallel(Event::created_at_r(), cutoff_time);
975    /// ```
976    pub fn where_before_timestamp_parallel(self, path: KeyPaths<T, i64>, reference: i64) -> Self {
977        self.where_(path, move |timestamp| timestamp < &reference)
978    }
979
980    /// Filter by i64 timestamp being between two times (inclusive, parallel).
981    ///
982    /// # Arguments
983    ///
984    /// * `path` - The key-path to the i64 timestamp field
985    /// * `start` - The start timestamp
986    /// * `end` - The end timestamp
987    ///
988    /// # Example
989    ///
990    /// ```ignore
991    /// let range = LazyParallelQuery::new(&events)
992    ///     .where_between_timestamp_parallel(Event::created_at_r(), start, end);
993    /// ```
994    pub fn where_between_timestamp_parallel(
995        self,
996        path: KeyPaths<T, i64>,
997        start: i64,
998        end: i64,
999    ) -> Self {
1000        self.where_(path, move |timestamp| timestamp >= &start && timestamp <= &end)
1001    }
1002
1003    /// Filter by i64 timestamp being within the last N days (parallel).
1004    ///
1005    /// # Arguments
1006    ///
1007    /// * `path` - The key-path to the i64 timestamp field
1008    /// * `days` - Number of days to look back
1009    ///
1010    /// # Example
1011    ///
1012    /// ```ignore
1013    /// let recent = LazyParallelQuery::new(&events)
1014    ///     .where_last_days_timestamp_parallel(Event::created_at_r(), 30);
1015    /// ```
1016    pub fn where_last_days_timestamp_parallel(self, path: KeyPaths<T, i64>, days: i64) -> Self {
1017        let now = chrono::Utc::now().timestamp_millis();
1018        let cutoff = now - (days * 24 * 60 * 60 * 1000); // Convert days to milliseconds
1019        self.where_after_timestamp_parallel(path, cutoff)
1020    }
1021
1022    /// Filter by i64 timestamp being within the next N days (parallel).
1023    ///
1024    /// # Arguments
1025    ///
1026    /// * `path` - The key-path to the i64 timestamp field
1027    /// * `days` - Number of days to look forward
1028    ///
1029    /// # Example
1030    ///
1031    /// ```ignore
1032    /// let upcoming = LazyParallelQuery::new(&events)
1033    ///     .where_next_days_timestamp_parallel(Event::scheduled_at_r(), 7);
1034    /// ```
1035    pub fn where_next_days_timestamp_parallel(self, path: KeyPaths<T, i64>, days: i64) -> Self {
1036        let now = chrono::Utc::now().timestamp_millis();
1037        let cutoff = now + (days * 24 * 60 * 60 * 1000); // Convert days to milliseconds
1038        self.where_before_timestamp_parallel(path, cutoff)
1039    }
1040
1041    /// Filter by i64 timestamp being within the last N hours (parallel).
1042    ///
1043    /// # Arguments
1044    ///
1045    /// * `path` - The key-path to the i64 timestamp field
1046    /// * `hours` - Number of hours to look back
1047    ///
1048    /// # Example
1049    ///
1050    /// ```ignore
1051    /// let recent = LazyParallelQuery::new(&events)
1052    ///     .where_last_hours_timestamp_parallel(Event::created_at_r(), 24);
1053    /// ```
1054    pub fn where_last_hours_timestamp_parallel(self, path: KeyPaths<T, i64>, hours: i64) -> Self {
1055        let now = chrono::Utc::now().timestamp_millis();
1056        let cutoff = now - (hours * 60 * 60 * 1000); // Convert hours to milliseconds
1057        self.where_after_timestamp_parallel(path, cutoff)
1058    }
1059
1060    /// Filter by i64 timestamp being within the next N hours (parallel).
1061    ///
1062    /// # Arguments
1063    ///
1064    /// * `path` - The key-path to the i64 timestamp field
1065    /// * `hours` - Number of hours to look forward
1066    ///
1067    /// # Example
1068    ///
1069    /// ```ignore
1070    /// let upcoming = LazyParallelQuery::new(&events)
1071    ///     .where_next_hours_timestamp_parallel(Event::scheduled_at_r(), 2);
1072    /// ```
1073    pub fn where_next_hours_timestamp_parallel(self, path: KeyPaths<T, i64>, hours: i64) -> Self {
1074        let now = chrono::Utc::now().timestamp_millis();
1075        let cutoff = now + (hours * 60 * 60 * 1000); // Convert hours to milliseconds
1076        self.where_before_timestamp_parallel(path, cutoff)
1077    }
1078
1079    /// Filter by i64 timestamp being within the last N minutes (parallel).
1080    ///
1081    /// # Arguments
1082    ///
1083    /// * `path` - The key-path to the i64 timestamp field
1084    /// * `minutes` - Number of minutes to look back
1085    ///
1086    /// # Example
1087    ///
1088    /// ```ignore
1089    /// let recent = LazyParallelQuery::new(&events)
1090    ///     .where_last_minutes_timestamp_parallel(Event::created_at_r(), 60);
1091    /// ```
1092    pub fn where_last_minutes_timestamp_parallel(self, path: KeyPaths<T, i64>, minutes: i64) -> Self {
1093        let now = chrono::Utc::now().timestamp_millis();
1094        let cutoff = now - (minutes * 60 * 1000); // Convert minutes to milliseconds
1095        self.where_after_timestamp_parallel(path, cutoff)
1096    }
1097
1098    /// Filter by i64 timestamp being within the next N minutes (parallel).
1099    ///
1100    /// # Arguments
1101    ///
1102    /// * `path` - The key-path to the i64 timestamp field
1103    /// * `minutes` - Number of minutes to look forward
1104    ///
1105    /// # Example
1106    ///
1107    /// ```ignore
1108    /// let upcoming = LazyParallelQuery::new(&events)
1109    ///     .where_next_minutes_timestamp_parallel(Event::scheduled_at_r(), 30);
1110    /// ```
1111    pub fn where_next_minutes_timestamp_parallel(self, path: KeyPaths<T, i64>, minutes: i64) -> Self {
1112        let now = chrono::Utc::now().timestamp_millis();
1113        let cutoff = now + (minutes * 60 * 1000); // Convert minutes to milliseconds
1114        self.where_before_timestamp_parallel(path, cutoff)
1115    }
1116}
1117
// Everything exercised here is only compiled with the `parallel` feature, so the
// test module is gated on it as well; otherwise `cargo test` without that
// feature would fail to build.
#[cfg(all(test, feature = "parallel"))]
mod tests {
    use super::*;
    use crate::ext::LazyParallelQueryExt;
    use key_paths_derive::Keypath;

    /// Minimal catalogue row used as the query fixture.
    #[derive(Debug, Clone, PartialEq, Keypath)]
    struct Product {
        id: u32,
        name: String,
        price: f64,
        category: String,
        stock: u32,
    }

    /// Builds a single fixture row from borrowed string data.
    fn product(id: u32, name: &str, price: f64, category: &str, stock: u32) -> Product {
        Product {
            id,
            name: name.to_string(),
            price,
            category: category.to_string(),
            stock,
        }
    }

    /// Four products: three in "Electronics" and one in "Furniture".
    fn create_test_products() -> Vec<Product> {
        vec![
            product(1, "Laptop", 999.99, "Electronics", 5),
            product(2, "Mouse", 29.99, "Electronics", 50),
            product(3, "Keyboard", 79.99, "Electronics", 30),
            product(4, "Desk Chair", 199.99, "Furniture", 8),
        ]
    }

    #[test]
    fn test_parallel_and_operator() {
        let products = create_test_products();

        let results: Vec<_> = products
            .lazy_parallel_query()
            .where_(Product::price(), |&p| p < 100.0)
            .and(Product::stock(), |&s| s > 10)
            .collect_parallel();

        // Should find: Mouse and Keyboard
        assert_eq!(results.len(), 2);
        assert!(results.iter().any(|p| p.name == "Mouse"));
        assert!(results.iter().any(|p| p.name == "Keyboard"));
    }

    #[test]
    fn test_parallel_or_operator() {
        let products = create_test_products();

        let results: Vec<_> = products
            .lazy_parallel_query()
            .where_(Product::price(), |&p| p < 50.0)
            .or(Product::category(), |c| c == "Furniture")
            .collect_parallel();

        // Should find: Mouse (price < 50) and Desk Chair (Furniture)
        assert_eq!(results.len(), 2);
        assert!(results.iter().any(|p| p.name == "Mouse"));
        assert!(results.iter().any(|p| p.name == "Desk Chair"));
    }

    #[test]
    fn test_parallel_complex_and_or() {
        let products = create_test_products();

        // (price < 100 AND stock > 10) OR (category == "Furniture")
        let results: Vec<_> = products
            .lazy_parallel_query()
            .where_(Product::price(), |&p| p < 100.0)
            .and(Product::stock(), |&s| s > 10)
            .or(Product::category(), |c| c == "Furniture")
            .collect_parallel();

        // Should find: Mouse, Keyboard (from AND group), and Desk Chair (from OR group)
        assert_eq!(results.len(), 3);
        assert!(results.iter().any(|p| p.name == "Mouse"));
        assert!(results.iter().any(|p| p.name == "Keyboard"));
        assert!(results.iter().any(|p| p.name == "Desk Chair"));
    }
}
1217
/// Extension trait that adds a `lazy_parallel_query()` entry point to
/// collections.
///
/// Implementing types hand out a [`LazyParallelQuery`] over their elements, so
/// a query can be started with method syntax instead of calling
/// `LazyParallelQuery::new` directly.
///
/// # Example
///
/// ```ignore
/// use rust_queries_core::LazyParallelQueryExt;
///
/// let products = vec![/* ... */];
/// let query = products.lazy_parallel_query()
///     .where_(Product::price(), |&p| p < 100.0)
///     .collect_parallel();
/// ```
#[cfg(feature = "parallel")]
pub trait LazyParallelQueryExt<T>
where
    T: 'static + Send + Sync,
{
    /// Starts a parallel lazy query over the elements of `self`.
    ///
    /// The returned query borrows from `self`.
    ///
    /// # Example
    ///
    /// ```ignore
    /// let query = products.lazy_parallel_query();
    /// ```
    fn lazy_parallel_query(&self) -> LazyParallelQuery<'_, T>;
}
1244
/// Slices can start a parallel lazy query over their own elements.
#[cfg(feature = "parallel")]
impl<T> LazyParallelQueryExt<T> for [T]
where
    T: 'static + Send + Sync,
{
    fn lazy_parallel_query(&self) -> LazyParallelQuery<'_, T> {
        let items: &[T] = self;
        LazyParallelQuery::new(items)
    }
}
1251
/// Vectors can start a parallel lazy query over their elements, viewed as a
/// slice.
#[cfg(feature = "parallel")]
impl<T> LazyParallelQueryExt<T> for Vec<T>
where
    T: 'static + Send + Sync,
{
    fn lazy_parallel_query(&self) -> LazyParallelQuery<'_, T> {
        LazyParallelQuery::new(self.as_slice())
    }
}