Skip to main content

dbsp/operator/
generator.rs

1//! Defines an operator that generates an infinite output stream from a single
2//! seed value.
3
4use crate::circuit::{
5    Runtime, Scope,
6    operator_traits::{Data, Operator, SourceOperator},
7};
8use std::{borrow::Cow, marker::PhantomData};
9
10/// A source operator that yields an infinite output stream
11/// from a generator function.
12pub struct Generator<T, F> {
13    generator: F,
14    _t: PhantomData<T>,
15}
16
17impl<T, F> Generator<T, F>
18where
19    T: Clone,
20{
21    /// Creates a generator
22    pub fn new(g: F) -> Self {
23        Self {
24            generator: g,
25            _t: Default::default(),
26        }
27    }
28}
29
30impl<T, F> Operator for Generator<T, F>
31where
32    T: Data,
33    F: 'static,
34{
35    fn name(&self) -> Cow<'static, str> {
36        Cow::from("Generator")
37    }
38    fn fixedpoint(&self, _scope: Scope) -> bool {
39        false
40    }
41}
42
43impl<T, F> SourceOperator<T> for Generator<T, F>
44where
45    F: FnMut() -> T + 'static,
46    T: Data,
47{
48    async fn eval(&mut self) -> T {
49        (self.generator)()
50    }
51}
52
53/// A version of Generator that passes a flag to the generator function when `flush` has been called, giving it a chance
54/// to produce one output per transaction.
55pub struct TransactionGenerator<T, F> {
56    generator: F,
57    flush: bool,
58    _t: PhantomData<T>,
59}
60
61impl<T, F> TransactionGenerator<T, F>
62where
63    T: Clone,
64{
65    /// Creates a generator
66    pub fn new(g: F) -> Self {
67        Self {
68            generator: g,
69            flush: false,
70            _t: Default::default(),
71        }
72    }
73}
74
75impl<T, F> Operator for TransactionGenerator<T, F>
76where
77    T: Data,
78    F: 'static,
79{
80    fn name(&self) -> Cow<'static, str> {
81        Cow::from("Generator")
82    }
83    fn flush(&mut self) {
84        self.flush = true;
85    }
86    fn fixedpoint(&self, _scope: Scope) -> bool {
87        false
88    }
89}
90
91impl<T, F> SourceOperator<T> for TransactionGenerator<T, F>
92where
93    F: FnMut(bool) -> T + 'static,
94    T: Data,
95{
96    async fn eval(&mut self) -> T {
97        let result = (self.generator)(self.flush);
98        self.flush = false;
99        result
100    }
101}
102
103/// Generator operator for nested circuits.
104///
105/// At each parent clock tick, invokes a user-provided reset closure, which
106/// returns a generator closure, that yields a nested stream.
107pub struct GeneratorNested<T> {
108    reset: Box<dyn FnMut() -> Box<dyn FnMut() -> T>>,
109    generator: Option<Box<dyn FnMut() -> T>>,
110    flush: bool,
111    empty: T,
112}
113
114impl<T> GeneratorNested<T>
115where
116    T: Clone,
117{
118    /// Creates a nested generator with specified `reset` closure.
119    pub fn new(reset: Box<dyn FnMut() -> Box<dyn FnMut() -> T>>, empty: T) -> Self {
120        Self {
121            reset,
122            generator: None,
123            flush: false,
124            empty,
125        }
126    }
127}
128
129impl<T> Operator for GeneratorNested<T>
130where
131    T: Data,
132{
133    fn name(&self) -> Cow<'static, str> {
134        Cow::from("GeneratorNested")
135    }
136    fn clock_start(&mut self, scope: Scope) {
137        if scope == 0 {
138            self.generator = Some((self.reset)());
139        }
140    }
141
142    fn is_input(&self) -> bool {
143        true
144    }
145
146    fn fixedpoint(&self, _scope: Scope) -> bool {
147        // TODO: do we want a version of `GeneratorNested` that
148        // can inform the circuit that it's reached a fixedpoint?
149        false
150    }
151
152    fn flush(&mut self) {
153        self.flush = true;
154    }
155}
156
157impl<T> SourceOperator<T> for GeneratorNested<T>
158where
159    T: Data,
160{
161    async fn eval(&mut self) -> T {
162        if self.flush {
163            self.flush = false;
164            (self.generator.as_mut().unwrap())()
165        } else {
166            self.empty.clone()
167        }
168    }
169}
170
171/// A source operator that yields an infinite output stream
172/// from a constant value.  Note: only worker 0 generates the data;
173/// the other workers generate empty streams.
174pub struct ConstantGenerator<T> {
175    generator: T,
176}
177
178impl<T> ConstantGenerator<T>
179where
180    T: Clone,
181{
182    /// Creates a constant generator
183    pub fn new(g: T) -> Self {
184        Self { generator: g }
185    }
186}
187
188impl<T> Operator for ConstantGenerator<T>
189where
190    T: Data + 'static,
191{
192    fn name(&self) -> Cow<'static, str> {
193        Cow::from("ConstantGenerator")
194    }
195    fn fixedpoint(&self, _scope: Scope) -> bool {
196        // Always a fixed-point
197        true
198    }
199}
200
201impl<T> SourceOperator<T> for ConstantGenerator<T>
202where
203    T: Data + 'static + Clone + Default,
204{
205    async fn eval(&mut self) -> T {
206        if Runtime::worker_index() == 0 {
207            self.generator.clone()
208        } else {
209            T::default()
210        }
211    }
212}