dbsp/operator/
generator.rs1use crate::circuit::{
5 Runtime, Scope,
6 operator_traits::{Data, Operator, SourceOperator},
7};
8use std::{borrow::Cow, marker::PhantomData};
9
10pub struct Generator<T, F> {
13 generator: F,
14 _t: PhantomData<T>,
15}
16
17impl<T, F> Generator<T, F>
18where
19 T: Clone,
20{
21 pub fn new(g: F) -> Self {
23 Self {
24 generator: g,
25 _t: Default::default(),
26 }
27 }
28}
29
30impl<T, F> Operator for Generator<T, F>
31where
32 T: Data,
33 F: 'static,
34{
35 fn name(&self) -> Cow<'static, str> {
36 Cow::from("Generator")
37 }
38 fn fixedpoint(&self, _scope: Scope) -> bool {
39 false
40 }
41}
42
43impl<T, F> SourceOperator<T> for Generator<T, F>
44where
45 F: FnMut() -> T + 'static,
46 T: Data,
47{
48 async fn eval(&mut self) -> T {
49 (self.generator)()
50 }
51}
52
53pub struct TransactionGenerator<T, F> {
56 generator: F,
57 flush: bool,
58 _t: PhantomData<T>,
59}
60
61impl<T, F> TransactionGenerator<T, F>
62where
63 T: Clone,
64{
65 pub fn new(g: F) -> Self {
67 Self {
68 generator: g,
69 flush: false,
70 _t: Default::default(),
71 }
72 }
73}
74
75impl<T, F> Operator for TransactionGenerator<T, F>
76where
77 T: Data,
78 F: 'static,
79{
80 fn name(&self) -> Cow<'static, str> {
81 Cow::from("Generator")
82 }
83 fn flush(&mut self) {
84 self.flush = true;
85 }
86 fn fixedpoint(&self, _scope: Scope) -> bool {
87 false
88 }
89}
90
91impl<T, F> SourceOperator<T> for TransactionGenerator<T, F>
92where
93 F: FnMut(bool) -> T + 'static,
94 T: Data,
95{
96 async fn eval(&mut self) -> T {
97 let result = (self.generator)(self.flush);
98 self.flush = false;
99 result
100 }
101}
102
103pub struct GeneratorNested<T> {
108 reset: Box<dyn FnMut() -> Box<dyn FnMut() -> T>>,
109 generator: Option<Box<dyn FnMut() -> T>>,
110 flush: bool,
111 empty: T,
112}
113
114impl<T> GeneratorNested<T>
115where
116 T: Clone,
117{
118 pub fn new(reset: Box<dyn FnMut() -> Box<dyn FnMut() -> T>>, empty: T) -> Self {
120 Self {
121 reset,
122 generator: None,
123 flush: false,
124 empty,
125 }
126 }
127}
128
129impl<T> Operator for GeneratorNested<T>
130where
131 T: Data,
132{
133 fn name(&self) -> Cow<'static, str> {
134 Cow::from("GeneratorNested")
135 }
136 fn clock_start(&mut self, scope: Scope) {
137 if scope == 0 {
138 self.generator = Some((self.reset)());
139 }
140 }
141
142 fn is_input(&self) -> bool {
143 true
144 }
145
146 fn fixedpoint(&self, _scope: Scope) -> bool {
147 false
150 }
151
152 fn flush(&mut self) {
153 self.flush = true;
154 }
155}
156
157impl<T> SourceOperator<T> for GeneratorNested<T>
158where
159 T: Data,
160{
161 async fn eval(&mut self) -> T {
162 if self.flush {
163 self.flush = false;
164 (self.generator.as_mut().unwrap())()
165 } else {
166 self.empty.clone()
167 }
168 }
169}
170
171pub struct ConstantGenerator<T> {
175 generator: T,
176}
177
178impl<T> ConstantGenerator<T>
179where
180 T: Clone,
181{
182 pub fn new(g: T) -> Self {
184 Self { generator: g }
185 }
186}
187
188impl<T> Operator for ConstantGenerator<T>
189where
190 T: Data + 'static,
191{
192 fn name(&self) -> Cow<'static, str> {
193 Cow::from("ConstantGenerator")
194 }
195 fn fixedpoint(&self, _scope: Scope) -> bool {
196 true
198 }
199}
200
201impl<T> SourceOperator<T> for ConstantGenerator<T>
202where
203 T: Data + 'static + Clone + Default,
204{
205 async fn eval(&mut self) -> T {
206 if Runtime::worker_index() == 0 {
207 self.generator.clone()
208 } else {
209 T::default()
210 }
211 }
212}