Skip to main content

dbsp/operator/
delta0.rs

//! `delta_0` operator.
2
3use crate::{
4    algebra::HasZero,
5    circuit::{
6        Circuit, OwnershipPreference, Scope, Stream,
7        operator_traits::{Data, ImportOperator, Operator},
8    },
9};
10use std::borrow::Cow;
11
12impl<C, D> Stream<C, D>
13where
14    D: HasZero + Clone + 'static,
15    C: Circuit,
16{
17    /// Import `self` from the parent circuit to `subcircuit` via the `Delta0`
18    /// operator.
19    ///
20    /// See [`Delta0`] operator documentation.
21    #[track_caller]
22    pub fn delta0<CC>(&self, subcircuit: &CC) -> Stream<CC, D>
23    where
24        CC: Circuit<Parent = C>,
25    {
26        let delta = subcircuit.import_stream(Delta0::new(), &self.try_sharded_version());
27        delta.mark_sharded_if(self);
28
29        delta
30    }
31
32    /// Like [`Self::delta0`], but overrides the ownership
33    /// preference on the input stream with `input_preference`.
34    #[track_caller]
35    pub fn delta0_with_preference<CC>(
36        &self,
37        subcircuit: &CC,
38        input_preference: OwnershipPreference,
39    ) -> Stream<CC, D>
40    where
41        CC: Circuit<Parent = C>,
42    {
43        let delta = subcircuit.import_stream_with_preference(
44            Delta0::new(),
45            &self.try_sharded_version(),
46            input_preference,
47        );
48        delta.mark_sharded_if(self);
49
50        delta
51    }
52}
53
/// `delta_0` DBSP operator.
///
/// `delta_0` is an
/// [import operator](`crate::circuit::operator_traits::ImportOperator`), i.e.,
/// an operator that makes the contents of a parent stream available
/// inside the child circuit.  At the first nested clock cycle, it reads and
/// outputs a value from the parent stream.  During subsequent nested clock
/// cycles, it outputs zero.
///
/// # Examples
///
/// Given a parent stream
///
/// ```text
/// [1, 2, 3, 4, ...]
/// ```
///
/// the `delta_0` operator produces the following nested stream (each row in
/// the matrix represents a nested clock epoch):
///
/// ```text
/// ┌           ┐
/// │1 0 0 0 ...│
/// │2 0 0 0 ...│
/// │3 0 0 0 ...│
/// │4 0 0 0 ...│
/// └           ┘
/// ```
pub struct Delta0<D> {
    /// Value imported from the parent stream that has not been emitted yet.
    /// It is set by `import`/`import_owned` and moved out by the next `eval`.
    val: Option<D>,
    /// Set once `eval` has run with no pending value (i.e. it emitted zero);
    /// cleared again whenever a new value is imported.
    fixedpoint: bool,
}
86
87impl<D> Delta0<D> {
88    pub fn new() -> Self {
89        Self {
90            val: None,
91            fixedpoint: false,
92        }
93    }
94}
95
96impl<D> Default for Delta0<D> {
97    fn default() -> Self {
98        Self::new()
99    }
100}
101
102impl<D> Operator for Delta0<D>
103where
104    D: Data,
105{
106    fn name(&self) -> Cow<'static, str> {
107        Cow::from("delta0")
108    }
109
110    fn fixedpoint(&self, scope: Scope) -> bool {
111        if scope == 0 {
112            // Output becomes stable (all zeros) after the first clock cycle.
113            self.fixedpoint
114        } else {
115            // Delta0 does not maintain any state across epochs.
116            true
117        }
118    }
119}
120
121impl<D> ImportOperator<D, D> for Delta0<D>
122where
123    D: HasZero + Clone + 'static,
124{
125    fn import(&mut self, val: &D) {
126        self.val = Some(val.clone());
127        self.fixedpoint = false;
128    }
129
130    fn import_owned(&mut self, val: D) {
131        self.val = Some(val);
132        self.fixedpoint = false;
133    }
134
135    async fn eval(&mut self) -> D {
136        if self.val.is_none() {
137            self.fixedpoint = true;
138        }
139        self.val.take().unwrap_or_else(D::zero)
140    }
141
142    /// Ownership preference on the operator's input stream
143    /// (see [`OwnershipPreference`]).
144    fn input_preference(&self) -> OwnershipPreference {
145        OwnershipPreference::PREFER_OWNED
146    }
147}