computation_process/
collector.rs

1use crate::{Completable, Computable, DynGeneratable, Generatable, Incomplete};
2use std::marker::PhantomData;
3
4/// A [`Computable`] that collects all items from a [`Generatable`] into a collection.
5///
6/// This is useful for converting a generator/stream of items into a single collected result.
7/// The collection type must implement [`Default`] and [`Extend`].
8///
9/// # Example
10///
11/// ```rust
12/// use computation_process::{Generator, GeneratorStep, Completable, Computable, Collector, Stateful, Generatable};
13///
14/// struct RangeStep;
15///
16/// impl GeneratorStep<u32, u32, u32> for RangeStep {
17///     fn step(max: &u32, current: &mut u32) -> Completable<Option<u32>> {
18///         *current += 1;
19///         if *current <= *max {
20///             Ok(Some(*current))
21///         } else {
22///             Ok(None)
23///         }
24///     }
25/// }
26///
27/// let generator = Generator::<u32, u32, u32, RangeStep>::from_parts(3, 0);
28/// let mut collector: Collector<u32, Vec<u32>> = generator.dyn_generatable().into();
29/// let result = collector.compute().unwrap();
30/// assert_eq!(result, vec![1, 2, 3]);
31/// ```
32#[derive(Debug, Clone, PartialEq, Eq)]
33#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
34#[cfg_attr(
35    feature = "serde",
36    serde(
37        bound = "G: serde::Serialize + for<'a> serde::Deserialize<'a>, COLLECTION: serde::Serialize + for<'a> serde::Deserialize<'a>"
38    )
39)]
40pub struct Collector<ITEM, COLLECTION, G = DynGeneratable<ITEM>>
41where
42    COLLECTION: Default + Extend<ITEM>,
43    G: Generatable<ITEM>,
44{
45    generator: G,
46    collector: Option<COLLECTION>,
47    #[cfg_attr(feature = "serde", serde(skip))]
48    _phantom: PhantomData<ITEM>,
49}
50
51impl<ITEM, COLLECTION, G> Collector<ITEM, COLLECTION, G>
52where
53    COLLECTION: Default + Extend<ITEM>,
54    G: Generatable<ITEM>,
55{
56    /// Create a new collector for the given generator.
57    pub fn new(generator: G) -> Self {
58        Collector {
59            generator,
60            collector: Some(Default::default()),
61            _phantom: Default::default(),
62        }
63    }
64}
65
66impl<ITEM, COLLECTION: Default + Extend<ITEM>> From<DynGeneratable<ITEM>>
67    for Collector<ITEM, COLLECTION, DynGeneratable<ITEM>>
68{
69    fn from(value: DynGeneratable<ITEM>) -> Self {
70        Collector::new(value)
71    }
72}
73
74impl<ITEM, COLLECTION, G> Computable<COLLECTION> for Collector<ITEM, COLLECTION, G>
75where
76    COLLECTION: Default + Extend<ITEM>,
77    G: Generatable<ITEM>,
78{
79    fn try_compute(&mut self) -> Completable<COLLECTION> {
80        match self.generator.try_next() {
81            None => {
82                if let Some(collector) = self.collector.take() {
83                    Ok(collector)
84                } else {
85                    Err(Incomplete::Exhausted)
86                }
87            }
88            Some(Ok(item)) => {
89                if let Some(collector) = self.collector.as_mut() {
90                    collector.extend(std::iter::once(item));
91                    Err(Incomplete::Suspended)
92                } else {
93                    Err(Incomplete::Exhausted)
94                }
95            }
96            Some(Err(Incomplete::Suspended)) => Err(Incomplete::Suspended),
97            Some(Err(Incomplete::Cancelled(c))) => Err(Incomplete::Cancelled(c)),
98            Some(Err(Incomplete::Exhausted)) => Err(Incomplete::Exhausted),
99        }
100    }
101}
102
103#[cfg(test)]
104mod tests {
105    use super::*;
106    use crate::{Computable, Generatable, Incomplete};
107    use cancel_this::Cancellable;
108
109    struct TestGenerator {
110        items: Vec<i32>,
111        index: usize,
112    }
113
114    impl Iterator for TestGenerator {
115        type Item = Cancellable<i32>;
116
117        fn next(&mut self) -> Option<Self::Item> {
118            if self.index < self.items.len() {
119                let item = self.items[self.index];
120                self.index += 1;
121                Some(Ok(item))
122            } else {
123                None
124            }
125        }
126    }
127
128    impl Generatable<i32> for TestGenerator {
129        fn try_next(&mut self) -> Option<Completable<i32>> {
130            if self.index < self.items.len() {
131                let item = self.items[self.index];
132                self.index += 1;
133                Some(Ok(item))
134            } else {
135                None
136            }
137        }
138    }
139
140    #[test]
141    fn test_collector_from() {
142        let generator = TestGenerator {
143            items: vec![1, 2, 3],
144            index: 0,
145        };
146        let collector: Collector<i32, Vec<i32>> = generator.dyn_generatable().into();
147        // Should have initialized with Some(Default::default())
148        assert!(collector.collector.is_some());
149    }
150
151    #[test]
152    fn test_collector_basic() {
153        let generator = TestGenerator {
154            items: vec![1, 2, 3],
155            index: 0,
156        };
157        let mut collector: Collector<i32, Vec<i32>> = generator.dyn_generatable().into();
158
159        // First item
160        assert_eq!(collector.try_compute(), Err(Incomplete::Suspended));
161
162        // Second item
163        assert_eq!(collector.try_compute(), Err(Incomplete::Suspended));
164
165        // Third item
166        assert_eq!(collector.try_compute(), Err(Incomplete::Suspended));
167
168        // Collection complete
169        let result = collector.try_compute().unwrap();
170        assert_eq!(result, vec![1, 2, 3]);
171    }
172
173    #[test]
174    fn test_collector_compute() {
175        let generator = TestGenerator {
176            items: vec![10, 20, 30],
177            index: 0,
178        };
179        let mut collector: Collector<i32, Vec<i32>> = generator.dyn_generatable().into();
180
181        let result = collector.compute().unwrap();
182        assert_eq!(result, vec![10, 20, 30]);
183    }
184
185    #[test]
186    fn test_collector_empty() {
187        let generator = TestGenerator {
188            items: vec![],
189            index: 0,
190        };
191        let mut collector: Collector<i32, Vec<i32>> = generator.dyn_generatable().into();
192
193        let result = collector.try_compute().unwrap();
194        assert_eq!(result, Vec::<i32>::new());
195    }
196
197    #[test]
198    fn test_collector_single_item() {
199        let generator = TestGenerator {
200            items: vec![42],
201            index: 0,
202        };
203        let mut collector: Collector<i32, Vec<i32>> = generator.dyn_generatable().into();
204
205        assert_eq!(collector.try_compute(), Err(Incomplete::Suspended));
206        let result = collector.try_compute().unwrap();
207        assert_eq!(result, vec![42]);
208    }
209
210    #[test]
211    fn test_collector_with_hashset() {
212        let generator = TestGenerator {
213            items: vec![1, 2, 2, 3],
214            index: 0,
215        };
216        let mut collector: Collector<i32, std::collections::HashSet<i32>> =
217            generator.dyn_generatable().into();
218
219        // Collect all items
220        let result = loop {
221            match collector.try_compute() {
222                Ok(result) => break result,
223                Err(Incomplete::Suspended) => continue,
224                Err(e) => panic!("Unexpected error: {:?}", e),
225            }
226        };
227
228        assert_eq!(result.len(), 3); // HashSet deduplicates
229        assert!(result.contains(&1));
230        assert!(result.contains(&2));
231        assert!(result.contains(&3));
232    }
233
234    struct SuspendingGenerator {
235        items: Vec<i32>,
236        index: usize,
237        first_call: bool,
238    }
239
240    impl Iterator for SuspendingGenerator {
241        type Item = Cancellable<i32>;
242
243        fn next(&mut self) -> Option<Self::Item> {
244            if self.index < self.items.len() {
245                let item = self.items[self.index];
246                self.index += 1;
247                Some(Ok(item))
248            } else {
249                None
250            }
251        }
252    }
253
254    impl Generatable<i32> for SuspendingGenerator {
255        fn try_next(&mut self) -> Option<Completable<i32>> {
256            if self.index < self.items.len() {
257                // Suspend on the very first call, then return items normally
258                if self.first_call {
259                    self.first_call = false;
260                    Some(Err(Incomplete::Suspended))
261                } else {
262                    let item = self.items[self.index];
263                    self.index += 1;
264                    Some(Ok(item))
265                }
266            } else {
267                None
268            }
269        }
270    }
271
272    #[test]
273    fn test_collector_with_suspensions() {
274        let generator = SuspendingGenerator {
275            items: vec![1, 2, 3],
276            index: 0,
277            first_call: true,
278        };
279        let mut collector: Collector<i32, Vec<i32>> = generator.dyn_generatable().into();
280
281        // First call: generator suspends on the first call
282        assert_eq!(collector.try_compute(), Err(Incomplete::Suspended));
283
284        // Second call: generator returns the first item, collector adds it and suspends
285        assert_eq!(collector.try_compute(), Err(Incomplete::Suspended));
286
287        // Third call: generator returns the second item, collector adds it and suspends
288        assert_eq!(collector.try_compute(), Err(Incomplete::Suspended));
289
290        // Fourth call: generator returns the third item, collector adds it and suspends
291        assert_eq!(collector.try_compute(), Err(Incomplete::Suspended));
292
293        // Fifth call: generator returns None, collector completes
294        let result = collector.try_compute().unwrap();
295        assert_eq!(result, vec![1, 2, 3]);
296    }
297
298    #[test]
299    fn test_collector_exhausted_after_completion() {
300        let generator = TestGenerator {
301            items: vec![1],
302            index: 0,
303        };
304        let mut collector: Collector<i32, Vec<i32>> = generator.dyn_generatable().into();
305
306        // First call adds item and suspends
307        assert_eq!(collector.try_compute(), Err(Incomplete::Suspended));
308
309        // The second call completes
310        let _ = collector.try_compute().unwrap();
311
312        // Third call should return Exhausted
313        assert_eq!(collector.try_compute(), Err(Incomplete::Exhausted));
314    }
315
316    struct CancellingGenerator {
317        cancelled: bool,
318    }
319
320    impl Iterator for CancellingGenerator {
321        type Item = Cancellable<i32>;
322
323        fn next(&mut self) -> Option<Self::Item> {
324            None
325        }
326    }
327
328    impl Generatable<i32> for CancellingGenerator {
329        fn try_next(&mut self) -> Option<Completable<i32>> {
330            if !self.cancelled {
331                self.cancelled = true;
332                Some(Err(
333                    Incomplete::Cancelled(cancel_this::Cancelled::default()),
334                ))
335            } else {
336                None
337            }
338        }
339    }
340
341    #[test]
342    fn test_collector_cancellation() {
343        let generator = CancellingGenerator { cancelled: false };
344        let mut collector: Collector<i32, Vec<i32>> = generator.dyn_generatable().into();
345
346        let result = collector.try_compute();
347        assert!(matches!(result, Err(Incomplete::Cancelled(_))));
348    }
349}