Skip to main content

rocketmq_admin_core/core/
concurrent.rs

1// Copyright 2023 The RocketMQ Rust Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Concurrent query utilities for performance optimization
16
17use std::future::Future;
18
19use futures::stream::FuturesUnordered;
20use futures::StreamExt;
21
22use crate::core::RocketMQResult;
23
24/// Execute multiple async operations concurrently
25///
26/// # Example
27/// ```rust,ignore
28/// let results = concurrent_query(topics.iter().map(|topic| {
29///     let admin = admin.clone();
30///     let topic = topic.clone();
31///     async move {
32///         admin.examine_topic_route_info(topic).await
33///     }
34/// })).await;
35/// ```
36pub async fn concurrent_query<F, T>(queries: impl Iterator<Item = F>) -> Vec<RocketMQResult<T>>
37where
38    F: Future<Output = RocketMQResult<T>>,
39{
40    let mut futures = queries.collect::<FuturesUnordered<_>>();
41    let mut results = Vec::new();
42
43    while let Some(result) = futures.next().await {
44        results.push(result);
45    }
46
47    results
48}
49
50/// Execute queries concurrently with a maximum concurrency limit
51///
52/// This prevents overwhelming the server with too many simultaneous connections.
53pub async fn concurrent_query_limited<F, T>(
54    queries: impl Iterator<Item = F>,
55    max_concurrent: usize,
56) -> Vec<RocketMQResult<T>>
57where
58    F: Future<Output = RocketMQResult<T>>,
59{
60    let mut futures = FuturesUnordered::new();
61    let mut queries = queries.peekable();
62    let mut results = Vec::new();
63
64    // Fill initial batch
65    for _ in 0..max_concurrent {
66        if let Some(query) = queries.next() {
67            futures.push(query);
68        } else {
69            break;
70        }
71    }
72
73    // Process results and add new queries
74    while let Some(result) = futures.next().await {
75        results.push(result);
76
77        // Add next query if available
78        if let Some(query) = queries.next() {
79            futures.push(query);
80        }
81    }
82
83    results
84}
85
86/// Batch process items with concurrent queries
87///
88/// Splits items into batches and processes each batch concurrently.
89pub async fn batch_query<I, F, T>(items: Vec<I>, batch_size: usize, query_fn: impl Fn(I) -> F) -> Vec<RocketMQResult<T>>
90where
91    F: Future<Output = RocketMQResult<T>>,
92    I: Clone,
93{
94    let mut all_results = Vec::new();
95
96    for chunk in items.chunks(batch_size) {
97        let queries = chunk.iter().cloned().map(&query_fn);
98        let batch_results = concurrent_query(queries).await;
99        all_results.extend(batch_results);
100    }
101
102    all_results
103}
104
105#[cfg(test)]
106mod tests {
107    use super::*;
108
109    #[tokio::test]
110    async fn test_concurrent_query() {
111        let queries = (0..5).map(|i| async move { Ok(i) });
112        let results = concurrent_query(queries).await;
113
114        assert_eq!(results.len(), 5);
115        assert!(results.iter().all(|r| r.is_ok()));
116    }
117
118    #[tokio::test]
119    async fn test_concurrent_query_limited() {
120        let queries = (0..10).map(|i| async move { Ok(i) });
121        let results = concurrent_query_limited(queries, 3).await;
122
123        assert_eq!(results.len(), 10);
124        assert!(results.iter().all(|r| r.is_ok()));
125    }
126
127    #[tokio::test]
128    async fn test_batch_query() {
129        let items: Vec<i32> = (0..10).collect();
130        let results = batch_query(items, 3, |i| async move { Ok(i * 2) }).await;
131
132        assert_eq!(results.len(), 10);
133        assert!(results.iter().all(|r| r.is_ok()));
134    }
135}