computation_process/
collector.rs

1use crate::{Completable, Computable, DynGeneratable, Generatable, Incomplete};
2use serde::{Deserialize, Serialize};
3use std::marker::PhantomData;
4
5/// A [`Computable`] that collects all items from a [`Generatable`] into a collection.
6///
7/// This is useful for converting a generator/stream of items into a single collected result.
8/// The collection type must implement [`Default`] and [`Extend`].
9///
10/// # Example
11///
12/// ```rust
13/// use computation_process::{Generator, GeneratorStep, Completable, Computable, Collector, Stateful, Generatable};
14///
15/// struct RangeStep;
16///
17/// impl GeneratorStep<u32, u32, u32> for RangeStep {
18///     fn step(max: &u32, current: &mut u32) -> Completable<Option<u32>> {
19///         *current += 1;
20///         if *current <= *max {
21///             Ok(Some(*current))
22///         } else {
23///             Ok(None)
24///         }
25///     }
26/// }
27///
28/// let generator = Generator::<u32, u32, u32, RangeStep>::from_parts(3, 0);
29/// let mut collector: Collector<u32, Vec<u32>> = generator.dyn_generatable().into();
30/// let result = collector.compute().unwrap();
31/// assert_eq!(result, vec![1, 2, 3]);
32/// ```
33#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
34#[serde(
35    bound = "G: Serialize + for<'a> Deserialize<'a>, COLLECTION: Serialize + for<'a> Deserialize<'a>"
36)]
37pub struct Collector<ITEM, COLLECTION, G = DynGeneratable<ITEM>>
38where
39    COLLECTION: Default + Extend<ITEM>,
40    G: Generatable<ITEM>,
41{
42    generator: G,
43    collector: Option<COLLECTION>,
44    #[serde(skip)]
45    _phantom: PhantomData<ITEM>,
46}
47
48impl<ITEM, COLLECTION, G> Collector<ITEM, COLLECTION, G>
49where
50    COLLECTION: Default + Extend<ITEM>,
51    G: Generatable<ITEM>,
52{
53    /// Create a new collector for the given generator.
54    pub fn new(generator: G) -> Self {
55        Collector {
56            generator,
57            collector: Some(Default::default()),
58            _phantom: Default::default(),
59        }
60    }
61}
62
63impl<ITEM, COLLECTION: Default + Extend<ITEM>> From<DynGeneratable<ITEM>>
64    for Collector<ITEM, COLLECTION, DynGeneratable<ITEM>>
65{
66    fn from(value: DynGeneratable<ITEM>) -> Self {
67        Collector::new(value)
68    }
69}
70
71impl<ITEM, COLLECTION, G> Computable<COLLECTION> for Collector<ITEM, COLLECTION, G>
72where
73    COLLECTION: Default + Extend<ITEM>,
74    G: Generatable<ITEM>,
75{
76    fn try_compute(&mut self) -> Completable<COLLECTION> {
77        match self.generator.try_next() {
78            None => {
79                if let Some(collector) = self.collector.take() {
80                    Ok(collector)
81                } else {
82                    Err(Incomplete::Exhausted)
83                }
84            }
85            Some(Ok(item)) => {
86                if let Some(collector) = self.collector.as_mut() {
87                    collector.extend(std::iter::once(item));
88                    Err(Incomplete::Suspended)
89                } else {
90                    Err(Incomplete::Exhausted)
91                }
92            }
93            Some(Err(Incomplete::Suspended)) => Err(Incomplete::Suspended),
94            Some(Err(Incomplete::Cancelled(c))) => Err(Incomplete::Cancelled(c)),
95            Some(Err(Incomplete::Exhausted)) => Err(Incomplete::Exhausted),
96        }
97    }
98}
99
100#[cfg(test)]
101mod tests {
102    use super::*;
103    use crate::{Computable, Generatable, Incomplete};
104    use cancel_this::Cancellable;
105
106    struct TestGenerator {
107        items: Vec<i32>,
108        index: usize,
109    }
110
111    impl Iterator for TestGenerator {
112        type Item = Cancellable<i32>;
113
114        fn next(&mut self) -> Option<Self::Item> {
115            if self.index < self.items.len() {
116                let item = self.items[self.index];
117                self.index += 1;
118                Some(Ok(item))
119            } else {
120                None
121            }
122        }
123    }
124
125    impl Generatable<i32> for TestGenerator {
126        fn try_next(&mut self) -> Option<Completable<i32>> {
127            if self.index < self.items.len() {
128                let item = self.items[self.index];
129                self.index += 1;
130                Some(Ok(item))
131            } else {
132                None
133            }
134        }
135    }
136
137    #[test]
138    fn test_collector_from() {
139        let generator = TestGenerator {
140            items: vec![1, 2, 3],
141            index: 0,
142        };
143        let collector: Collector<i32, Vec<i32>> = generator.dyn_generatable().into();
144        // Should have initialized with Some(Default::default())
145        assert!(collector.collector.is_some());
146    }
147
148    #[test]
149    fn test_collector_basic() {
150        let generator = TestGenerator {
151            items: vec![1, 2, 3],
152            index: 0,
153        };
154        let mut collector: Collector<i32, Vec<i32>> = generator.dyn_generatable().into();
155
156        // First item
157        assert_eq!(collector.try_compute(), Err(Incomplete::Suspended));
158
159        // Second item
160        assert_eq!(collector.try_compute(), Err(Incomplete::Suspended));
161
162        // Third item
163        assert_eq!(collector.try_compute(), Err(Incomplete::Suspended));
164
165        // Collection complete
166        let result = collector.try_compute().unwrap();
167        assert_eq!(result, vec![1, 2, 3]);
168    }
169
170    #[test]
171    fn test_collector_compute() {
172        let generator = TestGenerator {
173            items: vec![10, 20, 30],
174            index: 0,
175        };
176        let mut collector: Collector<i32, Vec<i32>> = generator.dyn_generatable().into();
177
178        let result = collector.compute().unwrap();
179        assert_eq!(result, vec![10, 20, 30]);
180    }
181
182    #[test]
183    fn test_collector_empty() {
184        let generator = TestGenerator {
185            items: vec![],
186            index: 0,
187        };
188        let mut collector: Collector<i32, Vec<i32>> = generator.dyn_generatable().into();
189
190        let result = collector.try_compute().unwrap();
191        assert_eq!(result, Vec::<i32>::new());
192    }
193
194    #[test]
195    fn test_collector_single_item() {
196        let generator = TestGenerator {
197            items: vec![42],
198            index: 0,
199        };
200        let mut collector: Collector<i32, Vec<i32>> = generator.dyn_generatable().into();
201
202        assert_eq!(collector.try_compute(), Err(Incomplete::Suspended));
203        let result = collector.try_compute().unwrap();
204        assert_eq!(result, vec![42]);
205    }
206
207    #[test]
208    fn test_collector_with_hashset() {
209        let generator = TestGenerator {
210            items: vec![1, 2, 2, 3],
211            index: 0,
212        };
213        let mut collector: Collector<i32, std::collections::HashSet<i32>> =
214            generator.dyn_generatable().into();
215
216        // Collect all items
217        let result = loop {
218            match collector.try_compute() {
219                Ok(result) => break result,
220                Err(Incomplete::Suspended) => continue,
221                Err(e) => panic!("Unexpected error: {:?}", e),
222            }
223        };
224
225        assert_eq!(result.len(), 3); // HashSet deduplicates
226        assert!(result.contains(&1));
227        assert!(result.contains(&2));
228        assert!(result.contains(&3));
229    }
230
231    struct SuspendingGenerator {
232        items: Vec<i32>,
233        index: usize,
234        first_call: bool,
235    }
236
237    impl Iterator for SuspendingGenerator {
238        type Item = Cancellable<i32>;
239
240        fn next(&mut self) -> Option<Self::Item> {
241            if self.index < self.items.len() {
242                let item = self.items[self.index];
243                self.index += 1;
244                Some(Ok(item))
245            } else {
246                None
247            }
248        }
249    }
250
251    impl Generatable<i32> for SuspendingGenerator {
252        fn try_next(&mut self) -> Option<Completable<i32>> {
253            if self.index < self.items.len() {
254                // Suspend on the very first call, then return items normally
255                if self.first_call {
256                    self.first_call = false;
257                    Some(Err(Incomplete::Suspended))
258                } else {
259                    let item = self.items[self.index];
260                    self.index += 1;
261                    Some(Ok(item))
262                }
263            } else {
264                None
265            }
266        }
267    }
268
269    #[test]
270    fn test_collector_with_suspensions() {
271        let generator = SuspendingGenerator {
272            items: vec![1, 2, 3],
273            index: 0,
274            first_call: true,
275        };
276        let mut collector: Collector<i32, Vec<i32>> = generator.dyn_generatable().into();
277
278        // First call: generator suspends on the first call
279        assert_eq!(collector.try_compute(), Err(Incomplete::Suspended));
280
281        // Second call: generator returns the first item, collector adds it and suspends
282        assert_eq!(collector.try_compute(), Err(Incomplete::Suspended));
283
284        // Third call: generator returns the second item, collector adds it and suspends
285        assert_eq!(collector.try_compute(), Err(Incomplete::Suspended));
286
287        // Fourth call: generator returns the third item, collector adds it and suspends
288        assert_eq!(collector.try_compute(), Err(Incomplete::Suspended));
289
290        // Fifth call: generator returns None, collector completes
291        let result = collector.try_compute().unwrap();
292        assert_eq!(result, vec![1, 2, 3]);
293    }
294
295    #[test]
296    fn test_collector_exhausted_after_completion() {
297        let generator = TestGenerator {
298            items: vec![1],
299            index: 0,
300        };
301        let mut collector: Collector<i32, Vec<i32>> = generator.dyn_generatable().into();
302
303        // First call adds item and suspends
304        assert_eq!(collector.try_compute(), Err(Incomplete::Suspended));
305
306        // The second call completes
307        let _ = collector.try_compute().unwrap();
308
309        // Third call should return Exhausted
310        assert_eq!(collector.try_compute(), Err(Incomplete::Exhausted));
311    }
312
313    struct CancellingGenerator {
314        cancelled: bool,
315    }
316
317    impl Iterator for CancellingGenerator {
318        type Item = Cancellable<i32>;
319
320        fn next(&mut self) -> Option<Self::Item> {
321            None
322        }
323    }
324
325    impl Generatable<i32> for CancellingGenerator {
326        fn try_next(&mut self) -> Option<Completable<i32>> {
327            if !self.cancelled {
328                self.cancelled = true;
329                Some(Err(
330                    Incomplete::Cancelled(cancel_this::Cancelled::default()),
331                ))
332            } else {
333                None
334            }
335        }
336    }
337
338    #[test]
339    fn test_collector_cancellation() {
340        let generator = CancellingGenerator { cancelled: false };
341        let mut collector: Collector<i32, Vec<i32>> = generator.dyn_generatable().into();
342
343        let result = collector.try_compute();
344        assert!(matches!(result, Err(Incomplete::Cancelled(_))));
345    }
346}