fluxus_core/source/
generator.rs1use crate::models::{Record, StreamResult};
2use async_trait::async_trait;
3use std::marker::PhantomData;
4
5use super::Source;
6
/// A source that produces its items by repeatedly calling a closure.
///
/// The closure returns `Some(item)` for each item it yields and `None` when
/// it has nothing to yield.
pub struct GeneratorSource<T, F>
where
    F: FnMut() -> Option<T> + Send,
{
    /// Closure invoked once per requested item.
    generator: F,
    /// Ties the item type `T` to the struct without storing a value of it.
    _phantom: PhantomData<T>,
}

impl<T, F> GeneratorSource<T, F>
where
    F: FnMut() -> Option<T> + Send,
{
    /// Wraps `generator` in a new source.
    pub fn new(generator: F) -> Self {
        Self {
            generator,
            _phantom: PhantomData,
        }
    }

    /// Builds a source that yields every integer from `start` to `end`,
    /// both inclusive, and `None` on every call after that.
    ///
    /// If `start > end` the source yields nothing.
    ///
    /// Because this function lives on the generic `impl`, callers have to name
    /// some `T` and `F` when invoking it; those parameters do not influence
    /// the returned source.
    pub fn counter(start: i64, end: i64) -> GeneratorSource<i64, impl FnMut() -> Option<i64>> {
        // The cursor is owned by the closure, so every counter is independent
        // of all others. An inclusive range also handles `start == 0`, negative
        // bounds and `end == i64::MAX` without sentinel values or overflow.
        let mut remaining = start..=end;
        GeneratorSource::new(move || remaining.next())
    }
}
48
49#[async_trait]
50impl<T, F> Source<T> for GeneratorSource<T, F>
51where
52 T: Send,
53 F: FnMut() -> Option<T> + Send + Sync,
54{
55 async fn init(&mut self) -> StreamResult<()> {
56 Ok(())
57 }
58
59 async fn next(&mut self) -> StreamResult<Option<Record<T>>> {
60 Ok((self.generator)().map(Record::new))
61 }
62
63 async fn close(&mut self) -> StreamResult<()> {
64 Ok(())
65 }
66}