fluxus_core/source/
generator.rs

1use crate::models::{Record, StreamResult};
2use async_trait::async_trait;
3use std::marker::PhantomData;
4
5use super::Source;
6
/// A source that generates test data.
///
/// Wraps a closure that is invoked once per requested item. The closure
/// returns `Some(value)` for each item it produces and `None` when it has
/// nothing (more) to produce.
pub struct GeneratorSource<T, F>
where
    F: FnMut() -> Option<T> + Send,
{
    /// Closure called to produce the next value.
    generator: F,
    /// Ties the otherwise-unused item type `T` to the struct.
    _phantom: PhantomData<T>,
}

impl<T, F> GeneratorSource<T, F>
where
    F: FnMut() -> Option<T> + Send,
{
    /// Create a new generator source from the given closure.
    pub fn new(generator: F) -> Self {
        Self {
            generator,
            _phantom: PhantomData,
        }
    }

    /// Create a counting source that generates the numbers from `start` to
    /// `end`, both inclusive, in ascending order.
    ///
    /// Once `end` has been produced the generator keeps returning `None`.
    /// If `start > end` the source is empty from the outset.
    ///
    /// Each returned source owns its own position, so several counters can be
    /// used side by side (or on different threads) without affecting one
    /// another.
    ///
    /// Note: this associated function lives on the generic `impl`, and the
    /// impl's own `T`/`F` parameters do not appear in its signature, so they
    /// may need to be named explicitly at the call site.
    pub fn counter(start: i64, end: i64) -> GeneratorSource<i64, impl FnMut() -> Option<i64>> {
        // The range is moved into the closure, so the counting state is
        // per-instance. `RangeInclusive` also stops cleanly at `i64::MAX`
        // instead of overflowing on the final increment.
        let mut remaining = start..=end;
        GeneratorSource::new(move || remaining.next())
    }
}
48
49#[async_trait]
50impl<T, F> Source<T> for GeneratorSource<T, F>
51where
52    T: Send,
53    F: FnMut() -> Option<T> + Send + Sync,
54{
55    async fn init(&mut self) -> StreamResult<()> {
56        Ok(())
57    }
58
59    async fn next(&mut self) -> StreamResult<Option<Record<T>>> {
60        Ok((self.generator)().map(Record::new))
61    }
62
63    async fn close(&mut self) -> StreamResult<()> {
64        Ok(())
65    }
66}