rust_lodash/utils/
async_support.rs

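/*!
Async support utilities for Lodash-RS.

This module provides async/await support for collection operations,
enabling non-blocking operations on large datasets. It defines traits for
async predicates, mappers and reducers, plus helpers that run an async
operation over a slice concurrently, sequentially, or in bounded batches.
*/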
7
8#[cfg(feature = "async")]
9use futures::future::{join_all, Future};
10#[cfg(feature = "async")]
11use crate::utils::{LodashError, Result};
12
13#[cfg(feature = "async")]
/// Trait for async predicate functions.
///
/// An implementor inspects a borrowed value and hands back a future that
/// resolves to `true` or `false`.
pub trait AsyncPredicate<T> {
    /// The future returned by [`apply`](AsyncPredicate::apply); it resolves
    /// to the predicate's `bool` verdict.
    type Output: Future<Output = bool>;

    /// Apply the predicate to the given value.
    ///
    /// The value is only borrowed for the duration of the call; the result
    /// becomes available by awaiting the returned future.
    fn apply(&self, value: &T) -> Self::Output;
}
22
#[cfg(feature = "async")]
/// Blanket implementation: any `Fn(&T)` callable whose returned future
/// resolves to `bool` can be used as an [`AsyncPredicate`].
///
/// Note: such a callable also matches the blanket impl of `AsyncMapper`
/// (with `U = bool`), so `callable.apply(..)` may need fully-qualified
/// syntax when both traits are in scope.
impl<T, Func, Pending> AsyncPredicate<T> for Func
where
    Pending: Future<Output = bool>,
    Func: Fn(&T) -> Pending,
{
    type Output = Pending;

    fn apply(&self, value: &T) -> Pending {
        // Forward straight to the wrapped callable.
        (self)(value)
    }
}
35
36#[cfg(feature = "async")]
/// Trait for async mapper functions.
///
/// An implementor turns a borrowed `T` into a future that resolves to a `U`.
pub trait AsyncMapper<T, U> {
    /// The future returned by [`apply`](AsyncMapper::apply); it resolves to
    /// the mapped value of type `U`.
    type Output: Future<Output = U>;

    /// Apply the mapper to the given value.
    ///
    /// The value is only borrowed for the duration of the call; the mapped
    /// result becomes available by awaiting the returned future.
    fn apply(&self, value: &T) -> Self::Output;
}
45
#[cfg(feature = "async")]
/// Blanket implementation: any `Fn(&T)` callable whose returned future
/// resolves to `U` can be used as an [`AsyncMapper`].
///
/// Note: a callable whose future resolves to `bool` also matches the blanket
/// impl of `AsyncPredicate`, so `callable.apply(..)` may need fully-qualified
/// syntax when both traits are in scope.
impl<T, U, Func, Pending> AsyncMapper<T, U> for Func
where
    Pending: Future<Output = U>,
    Func: Fn(&T) -> Pending,
{
    type Output = Pending;

    fn apply(&self, value: &T) -> Pending {
        // Forward straight to the wrapped callable.
        (self)(value)
    }
}
58
59#[cfg(feature = "async")]
/// Trait for async reducer functions.
///
/// An implementor combines an owned accumulator of type `U` with a borrowed
/// `T` and hands back a future that resolves to a `U`.
pub trait AsyncReducer<T, U> {
    /// The future returned by [`apply`](AsyncReducer::apply); it resolves to
    /// a value of the accumulator type `U`.
    type Output: Future<Output = U>;

    /// Apply the reducer to the accumulator and value.
    ///
    /// The accumulator is taken by value while `value` is only borrowed for
    /// the duration of the call.
    fn apply(&self, acc: U, value: &T) -> Self::Output;
}
68
#[cfg(feature = "async")]
/// Blanket implementation: any `Fn(U, &T)` callable whose returned future
/// resolves to `U` can be used as an [`AsyncReducer`].
impl<T, U, Func, Pending> AsyncReducer<T, U> for Func
where
    Pending: Future<Output = U>,
    Func: Fn(U, &T) -> Pending,
{
    type Output = Pending;

    fn apply(&self, acc: U, value: &T) -> Pending {
        // Forward straight to the wrapped callable.
        (self)(acc, value)
    }
}
81
#[cfg(feature = "async")]
/// Run `operation` on every item concurrently and collect the outputs.
///
/// One future is created per element of `items` and all of them are handed
/// to `join_all`, which drives them together inside this single future. No
/// tasks or threads are spawned here, so despite the name the work is
/// interleaved rather than spread over threads by this function itself.
///
/// # Returns
///
/// `Ok` with one output per item, in the same order as `items` (`join_all`
/// preserves input order). An empty slice yields an empty vector. This
/// function never produces an `Err` on its own; the `Result` wrapper matches
/// the signatures of the other execution helpers in this module.
pub async fn execute_parallel<T, F, Fut, R>(
    items: &[T],
    operation: F,
) -> Result<Vec<R>>
where
    F: Fn(&T) -> Fut,
    Fut: Future<Output = R>,
{
    // `&operation` is itself callable, so no wrapping closure is needed.
    Ok(join_all(items.iter().map(&operation)).await)
}
96
#[cfg(feature = "async")]
/// Run `operation` on each item one after another and collect the outputs.
///
/// The future for an item is created and awaited to completion before the
/// next item is touched, so at most one operation is in flight at a time.
///
/// # Returns
///
/// `Ok` with one output per item, in the same order as `items`.
pub async fn execute_sequential<T, F, Fut, R>(items: &[T], operation: F) -> Result<Vec<R>>
where
    F: Fn(&T) -> Fut,
    Fut: Future<Output = R>,
{
    // The final length is known up front, so allocate exactly once.
    let mut outputs = Vec::with_capacity(items.len());

    for entry in items.iter() {
        outputs.push(operation(entry).await);
    }

    Ok(outputs)
}
116
#[cfg(feature = "async")]
/// Run `operation` on every item with at most `concurrency` futures in flight.
///
/// The slice is processed in consecutive batches of up to `concurrency`
/// items. All futures of a batch are driven together via `join_all`, and the
/// next batch only starts once every future of the current batch has
/// finished, so one slow item delays the start of the following batch.
///
/// # Returns
///
/// `Ok` with one output per item, in the same order as `items`.
///
/// # Errors
///
/// Returns the error built by `LodashError::invalid_input` when
/// `concurrency` is `0`.
pub async fn execute_with_concurrency<T, F, Fut, R>(
    items: &[T],
    operation: F,
    concurrency: usize,
) -> Result<Vec<R>>
where
    F: Fn(&T) -> Fut,
    Fut: Future<Output = R>,
{
    // `slice::chunks` panics on a chunk size of 0, so reject it up front.
    if concurrency == 0 {
        return Err(LodashError::invalid_input("Concurrency must be greater than 0"));
    }

    let mut results = Vec::with_capacity(items.len());

    for chunk in items.chunks(concurrency) {
        // `&operation` is itself callable, so no wrapping closure is needed.
        results.extend(join_all(chunk.iter().map(&operation)).await);
    }

    Ok(results)
}
143
#[cfg(all(test, feature = "async"))]
mod tests {
    //! Unit tests for the async traits and the execution helpers.
    //!
    //! Two conventions are used throughout:
    //!
    //! * Closures copy the value out of the reference *before* building the
    //!   async block. The bounds `F: Fn(&T) -> Fut` name one `Fut` type for
    //!   every input lifetime, so the returned future must not borrow the
    //!   argument.
    //! * Trait methods are called with fully-qualified syntax. A closure can
    //!   match more than one blanket impl providing `apply`, which makes
    //!   `closure.apply(..)` ambiguous.

    use super::*;

    #[tokio::test]
    async fn test_async_predicate() {
        let pred = |x: &i32| {
            let x = *x;
            async move { x > 5 }
        };
        assert!(AsyncPredicate::<i32>::apply(&pred, &6).await);
        assert!(!AsyncPredicate::<i32>::apply(&pred, &4).await);
    }

    #[tokio::test]
    async fn test_async_mapper() {
        let mapper = |x: &i32| {
            let x = *x;
            async move { x * 2 }
        };
        assert_eq!(AsyncMapper::<i32, i32>::apply(&mapper, &3).await, 6);
    }

    #[tokio::test]
    async fn test_async_reducer() {
        let reducer = |acc: i32, x: &i32| {
            let x = *x;
            async move { acc + x }
        };
        assert_eq!(AsyncReducer::<i32, i32>::apply(&reducer, 5, &3).await, 8);
    }

    #[tokio::test]
    async fn test_execute_parallel() {
        let items = vec![1, 2, 3, 4, 5];
        let operation = |x: &i32| {
            let x = *x;
            async move { x * 2 }
        };

        let results = execute_parallel(&items, operation).await.unwrap();
        assert_eq!(results, vec![2, 4, 6, 8, 10]);
    }

    #[tokio::test]
    async fn test_execute_sequential() {
        let items = vec![1, 2, 3, 4, 5];
        let operation = |x: &i32| {
            let x = *x;
            async move { x * 2 }
        };

        let results = execute_sequential(&items, operation).await.unwrap();
        assert_eq!(results, vec![2, 4, 6, 8, 10]);
    }

    #[tokio::test]
    async fn test_execute_with_concurrency() {
        // 8 items with a limit of 3 exercises two full batches and a short one.
        let items = vec![1, 2, 3, 4, 5, 6, 7, 8];
        let operation = |x: &i32| {
            let x = *x;
            async move { x * 2 }
        };

        let results = execute_with_concurrency(&items, operation, 3).await.unwrap();
        assert_eq!(results, vec![2, 4, 6, 8, 10, 12, 14, 16]);
    }

    #[tokio::test]
    async fn test_execute_with_concurrency_zero() {
        let items = vec![1, 2, 3];
        let operation = |x: &i32| {
            let x = *x;
            async move { x * 2 }
        };

        let result = execute_with_concurrency(&items, operation, 0).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_execute_with_concurrency_empty_input() {
        let items: Vec<i32> = Vec::new();
        let operation = |x: &i32| {
            let x = *x;
            async move { x * 2 }
        };

        let results = execute_with_concurrency(&items, operation, 2).await.unwrap();
        assert!(results.is_empty());
    }
}