1#[cfg(feature = "async")]
9use futures::future::{join_all, Future};
10#[cfg(feature = "async")]
11use crate::utils::{LodashError, Result};
12
13#[cfg(feature = "async")]
/// An asynchronous yes/no test over borrowed values of type `T`.
///
/// Any `Fn(&T) -> Fut` whose future resolves to `bool` implements this trait through the
/// blanket impl in this module, so plain closures and functions can be used directly.
pub trait AsyncPredicate<T> {
    /// The future handed back by [`AsyncPredicate::apply`]; it resolves to the boolean result.
    ///
    /// This is a single type with no lifetime parameter, so the future cannot borrow from the
    /// `value` passed to `apply` — copy or clone what it needs before building the future.
    type Output: Future<Output = bool>;

    /// Builds the future that evaluates this predicate against `value`.
    fn apply(&self, value: &T) -> Self::Output;
}
22
23#[cfg(feature = "async")]
24impl<T, F, Fut> AsyncPredicate<T> for F
25where
26 F: Fn(&T) -> Fut,
27 Fut: Future<Output = bool>,
28{
29 type Output = Fut;
30
31 fn apply(&self, value: &T) -> Self::Output {
32 self(value)
33 }
34}
35
36#[cfg(feature = "async")]
/// An asynchronous transformation from a borrowed `T` to an owned `U`.
///
/// Any `Fn(&T) -> Fut` whose future resolves to `U` implements this trait through the blanket
/// impl in this module.
pub trait AsyncMapper<T, U> {
    /// The future handed back by [`AsyncMapper::apply`]; it resolves to the mapped `U`.
    ///
    /// This is a single type with no lifetime parameter, so the future cannot borrow from the
    /// `value` passed to `apply`.
    type Output: Future<Output = U>;

    /// Builds the future that maps `value` to a `U`.
    fn apply(&self, value: &T) -> Self::Output;
}
45
46#[cfg(feature = "async")]
47impl<T, U, F, Fut> AsyncMapper<T, U> for F
48where
49 F: Fn(&T) -> Fut,
50 Fut: Future<Output = U>,
51{
52 type Output = Fut;
53
54 fn apply(&self, value: &T) -> Self::Output {
55 self(value)
56 }
57}
58
59#[cfg(feature = "async")]
/// An asynchronous combining step that takes an owned accumulator `U` and a borrowed `T`.
///
/// Any `Fn(U, &T) -> Fut` whose future resolves to `U` implements this trait through the
/// blanket impl in this module.
pub trait AsyncReducer<T, U> {
    /// The future handed back by [`AsyncReducer::apply`]; it resolves to a `U`
    /// (presumably the updated accumulator — the trait itself does not enforce that).
    ///
    /// This is a single type with no lifetime parameter, so the future cannot borrow from the
    /// `value` passed to `apply`.
    type Output: Future<Output = U>;

    /// Builds the future that combines `acc` (taken by value) with `value`.
    fn apply(&self, acc: U, value: &T) -> Self::Output;
}
68
69#[cfg(feature = "async")]
70impl<T, U, F, Fut> AsyncReducer<T, U> for F
71where
72 F: Fn(U, &T) -> Fut,
73 Fut: Future<Output = U>,
74{
75 type Output = Fut;
76
77 fn apply(&self, acc: U, value: &T) -> Self::Output {
78 self(acc, value)
79 }
80}
81
82#[cfg(feature = "async")]
83pub async fn execute_parallel<T, F, Fut, R>(
85 items: &[T],
86 operation: F,
87) -> Result<Vec<R>>
88where
89 F: Fn(&T) -> Fut,
90 Fut: Future<Output = R>,
91{
92 let futures = items.iter().map(|item| operation(item));
93 let results = join_all(futures).await;
94 Ok(results)
95}
96
97#[cfg(feature = "async")]
98pub async fn execute_sequential<T, F, Fut, R>(
100 items: &[T],
101 operation: F,
102) -> Result<Vec<R>>
103where
104 F: Fn(&T) -> Fut,
105 Fut: Future<Output = R>,
106{
107 let mut results = Vec::with_capacity(items.len());
108
109 for item in items {
110 let result = operation(item).await;
111 results.push(result);
112 }
113
114 Ok(results)
115}
116
117#[cfg(feature = "async")]
118pub async fn execute_with_concurrency<T, F, Fut, R>(
120 items: &[T],
121 operation: F,
122 concurrency: usize,
123) -> Result<Vec<R>>
124where
125 F: Fn(&T) -> Fut,
126 Fut: Future<Output = R>,
127{
128 if concurrency == 0 {
129 return Err(LodashError::invalid_input("Concurrency must be greater than 0"));
130 }
131
132 let mut results = Vec::with_capacity(items.len());
133 let mut chunks = items.chunks(concurrency);
134
135 while let Some(chunk) = chunks.next() {
136 let futures = chunk.iter().map(|item| operation(item));
137 let chunk_results = join_all(futures).await;
138 results.extend(chunk_results);
139 }
140
141 Ok(results)
142}
143
#[cfg(test)]
#[cfg(feature = "async")]
mod tests {
    use super::*;

    // Two conventions used by every test below:
    //
    // * Closures copy the `i32` out of the reference *before* entering the `async move` block.
    //   The traits and helpers require `Fn(&T) -> Fut` with a single `Fut` type, so the future
    //   must not borrow from the argument.
    // * Trait methods are called through the trait path (`AsyncPredicate::apply(..)`) instead
    //   of `.apply(..)`. A closure returning `Future<Output = bool>` satisfies both
    //   `AsyncPredicate<T>` and `AsyncMapper<T, bool>`, which makes method syntax ambiguous.

    #[tokio::test]
    async fn test_async_predicate() {
        let pred = |x: &i32| {
            let x = *x;
            async move { x > 5 }
        };
        assert!(AsyncPredicate::apply(&pred, &6).await);
        assert!(!AsyncPredicate::apply(&pred, &4).await);
    }

    #[tokio::test]
    async fn test_async_mapper() {
        let mapper = |x: &i32| {
            let x = *x;
            async move { x * 2 }
        };
        assert_eq!(AsyncMapper::apply(&mapper, &3).await, 6);
    }

    #[tokio::test]
    async fn test_async_reducer() {
        let reducer = |acc: i32, x: &i32| {
            let x = *x;
            async move { acc + x }
        };
        assert_eq!(AsyncReducer::apply(&reducer, 5, &3).await, 8);
    }

    #[tokio::test]
    async fn test_execute_parallel() {
        let items = vec![1, 2, 3, 4, 5];
        let operation = |x: &i32| {
            let x = *x;
            async move { x * 2 }
        };

        let results = execute_parallel(&items, operation).await.unwrap();
        assert_eq!(results, vec![2, 4, 6, 8, 10]);
    }

    #[tokio::test]
    async fn test_execute_sequential() {
        let items = vec![1, 2, 3, 4, 5];
        let operation = |x: &i32| {
            let x = *x;
            async move { x * 2 }
        };

        let results = execute_sequential(&items, operation).await.unwrap();
        assert_eq!(results, vec![2, 4, 6, 8, 10]);
    }

    #[tokio::test]
    async fn test_execute_with_concurrency() {
        // Eight items with a batch size of three exercises a short final batch.
        let items = vec![1, 2, 3, 4, 5, 6, 7, 8];
        let operation = |x: &i32| {
            let x = *x;
            async move { x * 2 }
        };

        let results = execute_with_concurrency(&items, operation, 3).await.unwrap();
        assert_eq!(results, vec![2, 4, 6, 8, 10, 12, 14, 16]);
    }

    #[tokio::test]
    async fn test_execute_with_concurrency_zero() {
        let items = vec![1, 2, 3];
        let operation = |x: &i32| {
            let x = *x;
            async move { x * 2 }
        };

        let result = execute_with_concurrency(&items, operation, 0).await;
        assert!(result.is_err());
    }
}