// agent_chain_core/utils/iter.rs

//! Utilities for working with iterators.
//!
//! Adapted from langchain_core/utils/iter.py

5use std::collections::VecDeque;
6use std::sync::{Arc, Mutex};
7
/// A dummy lock that provides a lock-like interface but no protection.
///
/// Useful as a stand-in wherever a lock is expected but no synchronization
/// is actually needed.
#[derive(Debug, Clone, Copy)]
pub struct NoLock;

impl NoLock {
    /// Create a new `NoLock`.
    pub const fn new() -> Self {
        Self
    }

    /// "Acquire" the lock.
    ///
    /// This never blocks, and the returned guard protects nothing.
    pub fn lock(&self) -> NoLockGuard {
        NoLockGuard
    }
}

impl Default for NoLock {
    fn default() -> Self {
        Self::new()
    }
}

/// The guard returned by [`NoLock::lock`]. Holding or dropping it has no effect.
#[derive(Debug)]
pub struct NoLockGuard;
33
/// Split an iterable into consecutive `Vec` batches of at most `size` items.
///
/// # Arguments
///
/// * `size` - Maximum number of items per batch. `None` means "no limit": all
///   items are returned together as a single batch. `Some(0)` yields no batches.
/// * `iterable` - The items to batch.
///
/// # Returns
///
/// A lazy [`BatchIterator`]. An empty input produces no batches; an empty
/// `Vec` is never yielded.
///
/// # Example
///
/// ```
/// use agent_chain_core::utils::iter::batch_iterate;
///
/// let items = vec![1, 2, 3, 4, 5];
/// let batches: Vec<Vec<i32>> = batch_iterate(Some(2), items).collect();
/// assert_eq!(batches, vec![vec![1, 2], vec![3, 4], vec![5]]);
/// ```
pub fn batch_iterate<T, I>(size: Option<usize>, iterable: I) -> BatchIterator<T, I::IntoIter>
where
    I: IntoIterator<Item = T>,
{
    let iter = iterable.into_iter();
    BatchIterator { size, iter }
}

/// Lazy iterator returned by [`batch_iterate`].
///
/// It yields `Vec` batches pulled from an underlying iterator.
pub struct BatchIterator<T, I>
where
    I: Iterator<Item = T>,
{
    /// Maximum batch length; `None` drains everything into one batch.
    size: Option<usize>,
    /// Where the items come from.
    iter: I,
}

impl<T, I> Iterator for BatchIterator<T, I>
where
    I: Iterator<Item = T>,
{
    type Item = Vec<T>;

    fn next(&mut self) -> Option<Self::Item> {
        let source = self.iter.by_ref();
        let batch: Vec<T> = if let Some(limit) = self.size {
            source.take(limit).collect()
        } else {
            source.collect()
        };
        // An empty batch means the source is exhausted (or `limit` is 0).
        (!batch.is_empty()).then_some(batch)
    }
}
92
93/// Create `n` separate iterators over an iterable.
94///
95/// This splits a single iterable into multiple iterators, each providing
96/// the same items in the same order. All child iterators may advance separately
97/// but share the same items from the source -- when the most advanced iterator
98/// retrieves an item, it is buffered until the least advanced iterator has
99/// yielded it as well.
100///
101/// # Arguments
102///
103/// * `iterable` - The iterable to split.
104/// * `n` - The number of iterators to create.
105///
106/// # Returns
107///
108/// A `Tee` containing `n` child iterators.
109///
110/// # Example
111///
112/// ```
113/// use agent_chain_core::utils::iter::tee;
114///
115/// let items = vec![1, 2, 3];
116/// let t = tee(items, 2);
117/// // Now t contains 2 iterators that will each yield 1, 2, 3
118/// ```
119pub fn tee<T, I>(iterable: I, n: usize) -> Tee<T>
120where
121    T: Clone,
122    I: IntoIterator<Item = T>,
123    <I as IntoIterator>::IntoIter: Send + 'static,
124{
125    Tee::new(iterable, n)
126}
127
/// A tee implementation that creates multiple iterators from a single source.
///
/// All child handles share one source iterator (behind a `Mutex`) plus one
/// item buffer per child index. When a child pulls a new item from the source,
/// a clone is appended to the buffer of every *other* index that still has a
/// live handle. Once every handle for an index has been dropped, that index's
/// buffer is freed and no longer filled. Abandoned children therefore do not
/// make memory grow without bound.
pub struct Tee<T> {
    source: Arc<Mutex<TeeSource<T>>>,
    children: Vec<TeeChild<T>>,
}

/// State shared by a [`Tee`] and all of its [`TeeChild`] handles.
struct TeeSource<T> {
    /// The iterator being split.
    iter: Box<dyn Iterator<Item = T> + Send>,
    /// Items already pulled from `iter` that child index `i` has not yet yielded.
    buffers: Vec<VecDeque<T>>,
    /// Number of live `TeeChild` handles for each child index.
    handles: Vec<usize>,
}

/// Lock the shared tee state for handle bookkeeping.
///
/// A poisoned mutex is recovered rather than propagated. The handle counts are
/// plain integer updates that a panic elsewhere cannot leave half-done, and
/// this function is called from `Drop`, which must not panic.
fn lock_tee_source<T>(
    source: &Mutex<TeeSource<T>>,
) -> std::sync::MutexGuard<'_, TeeSource<T>> {
    source.lock().unwrap_or_else(std::sync::PoisonError::into_inner)
}

impl<T> Tee<T>
where
    T: Clone,
{
    /// Create a new `Tee` with `n` child iterators.
    ///
    /// Each child starts at the beginning of `iterable`.
    pub fn new<I>(iterable: I, n: usize) -> Self
    where
        I: IntoIterator<Item = T>,
        <I as IntoIterator>::IntoIter: Send + 'static,
    {
        let iter: Box<dyn Iterator<Item = T> + Send> = Box::new(iterable.into_iter());
        let buffers: Vec<VecDeque<T>> = (0..n).map(|_| VecDeque::new()).collect();
        // Each child created below holds exactly one handle on its index.
        let handles = vec![1; n];

        let source = Arc::new(Mutex::new(TeeSource {
            iter,
            buffers,
            handles,
        }));

        let children: Vec<TeeChild<T>> = (0..n)
            .map(|index| TeeChild {
                source: Arc::clone(&source),
                index,
            })
            .collect();

        Self { source, children }
    }

    /// Get the number of child iterators.
    pub fn len(&self) -> usize {
        self.children.len()
    }

    /// Check whether the tee has no child iterators.
    pub fn is_empty(&self) -> bool {
        self.children.is_empty()
    }

    /// Get an additional handle on the child iterator at `index`.
    ///
    /// The returned handle shares its buffer, and therefore its position, with
    /// the child stored in this tee at the same index. An item yielded by one
    /// of them is not yielded again by the other. Returns `None` if
    /// `index >= self.len()`.
    pub fn get(&self, index: usize) -> Option<TeeChild<T>> {
        if index >= self.children.len() {
            return None;
        }
        lock_tee_source(&self.source).handles[index] += 1;
        Some(TeeChild {
            source: Arc::clone(&self.source),
            index,
        })
    }

    /// Consume the tee and return all child iterators.
    pub fn into_children(self) -> Vec<TeeChild<T>> {
        self.children
    }
}

/// A child iterator of a [`Tee`].
///
/// Cloning a `TeeChild` (or calling [`Tee::get`]) creates another handle on
/// the *same* child index. The handles share one buffer, so each buffered item
/// is yielded by only one of them.
pub struct TeeChild<T> {
    source: Arc<Mutex<TeeSource<T>>>,
    index: usize,
}

impl<T> Clone for TeeChild<T> {
    fn clone(&self) -> Self {
        lock_tee_source(&self.source).handles[self.index] += 1;
        Self {
            source: Arc::clone(&self.source),
            index: self.index,
        }
    }
}

impl<T> Drop for TeeChild<T> {
    /// Release this handle. If it was the last handle on its index, free that
    /// index's buffer so it stops accumulating items.
    fn drop(&mut self) {
        let mut source = lock_tee_source(&self.source);
        source.handles[self.index] -= 1;
        if source.handles[self.index] != 0 {
            return;
        }
        let released = std::mem::take(&mut source.buffers[self.index]);
        // Unlock before dropping the buffered items, in case their destructors
        // touch this tee again.
        drop(source);
        drop(released);
    }
}

impl<T: Clone> Iterator for TeeChild<T> {
    type Item = T;

    fn next(&mut self) -> Option<Self::Item> {
        // A poisoned lock (a panic while another handle held it) ends
        // iteration rather than panicking here.
        let mut guard = self.source.lock().ok()?;
        let source = &mut *guard;

        // Serve items other children already pulled for us first.
        if let Some(item) = source.buffers.get_mut(self.index)?.pop_front() {
            return Some(item);
        }

        let item = source.iter.next()?;
        // Share the new item only with indices that can still be read.
        // Buffering for indices whose handles were all dropped would grow forever.
        for (i, (buffer, &live)) in source
            .buffers
            .iter_mut()
            .zip(&source.handles)
            .enumerate()
        {
            if i != self.index && live > 0 {
                buffer.push_back(item.clone());
            }
        }
        Some(item)
    }
}
229
230/// A safe version of tee that ensures thread safety.
231pub fn safetee<T, I>(iterable: I, n: usize) -> Tee<T>
232where
233    T: Clone,
234    I: IntoIterator<Item = T>,
235    <I as IntoIterator>::IntoIter: Send + 'static,
236{
237    tee(iterable, n)
238}
239
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_batch_iterate() {
        let items = vec![1, 2, 3, 4, 5];
        let batches: Vec<Vec<i32>> = batch_iterate(Some(2), items).collect();
        assert_eq!(batches, vec![vec![1, 2], vec![3, 4], vec![5]]);
    }

    #[test]
    fn test_batch_iterate_exact() {
        let items = vec![1, 2, 3, 4];
        let batches: Vec<Vec<i32>> = batch_iterate(Some(2), items).collect();
        assert_eq!(batches, vec![vec![1, 2], vec![3, 4]]);
    }

    #[test]
    fn test_batch_iterate_empty() {
        let items: Vec<i32> = vec![];
        let batches: Vec<Vec<i32>> = batch_iterate(Some(2), items).collect();
        assert!(batches.is_empty());
    }

    #[test]
    fn test_batch_iterate_none_size() {
        let items = vec![1, 2, 3, 4, 5];
        let batches: Vec<Vec<i32>> = batch_iterate(None, items).collect();
        assert_eq!(batches, vec![vec![1, 2, 3, 4, 5]]);
    }

    #[test]
    fn test_batch_iterate_zero_size_yields_nothing() {
        // `take(0)` produces an empty batch, which ends the iteration.
        let batches: Vec<Vec<i32>> = batch_iterate(Some(0), vec![1, 2, 3]).collect();
        assert!(batches.is_empty());
    }

    #[test]
    fn test_tee_basic() {
        let items = vec![1, 2, 3];
        let t = tee(items, 2);

        assert_eq!(t.len(), 2);

        let children = t.into_children();
        let first: Vec<i32> = children[0].clone().collect();
        let second: Vec<i32> = children[1].clone().collect();

        assert_eq!(first, vec![1, 2, 3]);
        assert_eq!(second, vec![1, 2, 3]);
    }

    #[test]
    fn test_tee_interleaved() {
        let items = vec![1, 2, 3];
        let t = tee(items, 2);
        let mut children = t.into_children();

        assert_eq!(children[0].next(), Some(1));
        assert_eq!(children[1].next(), Some(1));
        assert_eq!(children[0].next(), Some(2));
        assert_eq!(children[0].next(), Some(3));
        assert_eq!(children[1].next(), Some(2));
        assert_eq!(children[1].next(), Some(3));
        assert_eq!(children[0].next(), None);
        assert_eq!(children[1].next(), None);
    }

    #[test]
    fn test_tee_zero_children() {
        let t = tee(vec![1, 2, 3], 0);
        assert!(t.is_empty());
        assert!(t.get(0).is_none());
    }

    #[test]
    fn test_tee_get_shares_buffer_with_stored_child() {
        let t = tee(vec![1, 2, 3], 2);
        assert!(t.get(2).is_none());
        let mut handle = t.get(1).unwrap();
        let mut children = t.into_children();

        assert_eq!(children[0].next(), Some(1));
        // `handle` consumes index 1's buffered copy of `1` ...
        assert_eq!(handle.next(), Some(1));
        // ... so the stored child at index 1 continues with `2`.
        assert_eq!(children[1].next(), Some(2));
        assert_eq!(children[0].next(), Some(2));
    }

    #[test]
    fn test_safetee_matches_tee() {
        let children = safetee(vec!["a", "b"], 3).into_children();
        assert_eq!(children.len(), 3);
        for child in children {
            assert_eq!(child.collect::<Vec<_>>(), vec!["a", "b"]);
        }
    }

    #[test]
    fn test_no_lock() {
        let lock = NoLock::new();
        let _guard = lock.lock();
    }
}