1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
//! `delta_0` operator.
use crate::{
algebra::HasZero,
circuit::{
Circuit, OwnershipPreference, Scope, Stream,
operator_traits::{Data, ImportOperator, Operator},
},
};
use std::borrow::Cow;
impl<C, D> Stream<C, D>
where
    D: HasZero + Clone + 'static,
    C: Circuit,
{
    /// Makes this parent-circuit stream available inside `subcircuit`
    /// through a freshly created [`Delta0`] operator.
    ///
    /// The stream handed to the operator is the result of
    /// `self.try_sharded_version()`; after the import,
    /// `mark_sharded_if(self)` is invoked on the new stream (presumably so
    /// that sharding information carries over to the child circuit).
    ///
    /// See the [`Delta0`] operator documentation for the semantics of the
    /// returned stream.
    #[track_caller]
    pub fn delta0<CC>(&self, subcircuit: &CC) -> Stream<CC, D>
    where
        CC: Circuit<Parent = C>,
    {
        let source = self.try_sharded_version();
        let imported = subcircuit.import_stream(Delta0::new(), &source);
        imported.mark_sharded_if(self);
        imported
    }

    /// Same as [`Self::delta0`], except that the ownership preference on
    /// the input stream is explicitly set to `input_preference` (it is
    /// forwarded to `import_stream_with_preference`).
    #[track_caller]
    pub fn delta0_with_preference<CC>(
        &self,
        subcircuit: &CC,
        input_preference: OwnershipPreference,
    ) -> Stream<CC, D>
    where
        CC: Circuit<Parent = C>,
    {
        let source = self.try_sharded_version();
        let imported =
            subcircuit.import_stream_with_preference(Delta0::new(), &source, input_preference);
        imported.mark_sharded_if(self);
        imported
    }
}
/// `delta_0` DBSP operator.
///
/// `delta_0` is an
/// [import operator](`crate::circuit::operator_traits::ImportOperator`), i.e.,
/// an operator that makes the contents of a parent stream available
/// inside the child circuit. At the first nested clock cycle, it reads and
/// outputs a value from the parent stream. During subsequent nested clock
/// cycles, it outputs zero.
///
/// # Examples
///
/// Given a parent stream
///
/// ```text
/// [1, 2, 3, 4, ...]
/// ```
///
/// the `delta_0` operator produces the following nested stream (each row in
/// the matrix represents a nested clock epoch):
///
/// ```text
/// ┌           ┐
/// │1 0 0 0 ...│
/// │2 0 0 0 ...│
/// │3 0 0 0 ...│
/// │4 0 0 0 ...│
/// └           ┘
/// ```
pub struct Delta0<D> {
/// Value imported from the parent stream that has not been output yet.
/// Set by `import`/`import_owned`; `eval` takes it out, leaving `None`.
val: Option<D>,
/// Set to `true` when `eval` runs with no pending value (and therefore
/// outputs zero); reset to `false` whenever a new value is imported.
fixedpoint: bool,
}
impl<D> Delta0<D> {
pub fn new() -> Self {
Self {
val: None,
fixedpoint: false,
}
}
}
impl<D> Default for Delta0<D> {
fn default() -> Self {
Self::new()
}
}
impl<D> Operator for Delta0<D>
where
    D: Data,
{
    /// Static operator name, `"delta0"`.
    fn name(&self) -> Cow<'static, str> {
        Cow::Borrowed("delta0")
    }

    /// Reports whether the operator has reached a fixed point in `scope`.
    ///
    /// * Scope `0`: returns the internal flag, since the output becomes
    ///   stable (all zeros) after the first clock cycle.
    /// * Any other scope: always `true`, because `Delta0` does not maintain
    ///   any state across epochs.
    fn fixedpoint(&self, scope: Scope) -> bool {
        scope != 0 || self.fixedpoint
    }
}
impl<D> ImportOperator<D, D> for Delta0<D>
where
    D: HasZero + Clone + 'static,
{
    /// Stores a clone of `val` as the pending value and clears the
    /// fixed-point flag.
    fn import(&mut self, val: &D) {
        self.import_owned(val.clone());
    }

    /// Stores `val` as the pending value (no clone) and clears the
    /// fixed-point flag.
    fn import_owned(&mut self, val: D) {
        self.val = Some(val);
        self.fixedpoint = false;
    }

    /// Returns the pending value if one is stored, leaving none behind.
    /// Otherwise sets the fixed-point flag and returns `D::zero()`.
    async fn eval(&mut self) -> D {
        match self.val.take() {
            Some(pending) => pending,
            None => {
                self.fixedpoint = true;
                D::zero()
            }
        }
    }

    /// Ownership preference on the operator's input stream
    /// (see [`OwnershipPreference`]): always `PREFER_OWNED`.
    fn input_preference(&self) -> OwnershipPreference {
        OwnershipPreference::PREFER_OWNED
    }
}