rocketmq_admin_core/core/
concurrent.rs1use std::future::Future;
18
19use futures::stream::FuturesUnordered;
20use futures::StreamExt;
21
22use crate::core::RocketMQResult;
23
24pub async fn concurrent_query<F, T>(queries: impl Iterator<Item = F>) -> Vec<RocketMQResult<T>>
37where
38 F: Future<Output = RocketMQResult<T>>,
39{
40 let mut futures = queries.collect::<FuturesUnordered<_>>();
41 let mut results = Vec::new();
42
43 while let Some(result) = futures.next().await {
44 results.push(result);
45 }
46
47 results
48}
49
50pub async fn concurrent_query_limited<F, T>(
54 queries: impl Iterator<Item = F>,
55 max_concurrent: usize,
56) -> Vec<RocketMQResult<T>>
57where
58 F: Future<Output = RocketMQResult<T>>,
59{
60 let mut futures = FuturesUnordered::new();
61 let mut queries = queries.peekable();
62 let mut results = Vec::new();
63
64 for _ in 0..max_concurrent {
66 if let Some(query) = queries.next() {
67 futures.push(query);
68 } else {
69 break;
70 }
71 }
72
73 while let Some(result) = futures.next().await {
75 results.push(result);
76
77 if let Some(query) = queries.next() {
79 futures.push(query);
80 }
81 }
82
83 results
84}
85
86pub async fn batch_query<I, F, T>(items: Vec<I>, batch_size: usize, query_fn: impl Fn(I) -> F) -> Vec<RocketMQResult<T>>
90where
91 F: Future<Output = RocketMQResult<T>>,
92 I: Clone,
93{
94 let mut all_results = Vec::new();
95
96 for chunk in items.chunks(batch_size) {
97 let queries = chunk.iter().cloned().map(&query_fn);
98 let batch_results = concurrent_query(queries).await;
99 all_results.extend(batch_results);
100 }
101
102 all_results
103}
104
105#[cfg(test)]
106mod tests {
107 use super::*;
108
109 #[tokio::test]
110 async fn test_concurrent_query() {
111 let queries = (0..5).map(|i| async move { Ok(i) });
112 let results = concurrent_query(queries).await;
113
114 assert_eq!(results.len(), 5);
115 assert!(results.iter().all(|r| r.is_ok()));
116 }
117
118 #[tokio::test]
119 async fn test_concurrent_query_limited() {
120 let queries = (0..10).map(|i| async move { Ok(i) });
121 let results = concurrent_query_limited(queries, 3).await;
122
123 assert_eq!(results.len(), 10);
124 assert!(results.iter().all(|r| r.is_ok()));
125 }
126
127 #[tokio::test]
128 async fn test_batch_query() {
129 let items: Vec<i32> = (0..10).collect();
130 let results = batch_query(items, 3, |i| async move { Ok(i * 2) }).await;
131
132 assert_eq!(results.len(), 10);
133 assert!(results.iter().all(|r| r.is_ok()));
134 }
135}