1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
use quote::quote_spanned;
use super::{
OperatorCategory, OperatorConstraints, OperatorWriteOutput, RANGE_0,
RANGE_1, WriteContextArgs,
};
/// > 1 input stream, 1 output stream
///
/// > Arguments: two closures. The first closure produces the initial value of the accumulator.
/// > The second closure is invoked for every incoming item with two arguments: an `&mut Accum`
/// > reference to the accumulator and the `Item` itself. It returns an `Option<Output>`:
/// > `Some(output)` emits `output` to the output stream, while `None` terminates the stream.
///
/// Similar to the `scan` method in Rust's standard library. A function is applied to each element
/// of the stream while an internal state (the accumulator) is maintained, and the values returned
/// by the function are emitted. Returning `None` from the function ends the stream early.
///
/// > Note: The closures have access to the [`context` object](surface_flows.mdx#the-context-object).
///
/// `scan` optionally accepts one generic lifetime persistence argument, either `'tick` or
/// `'static`, which controls how the accumulator persists. With `'tick`, the accumulator only
/// lives for the duration of a single tick. With `'static`, the accumulated value is carried over
/// from one tick to the next. If no persistence is given explicitly, `'tick` is used.
///
/// ```dfir
/// // Running sum example
/// source_iter([1, 2, 3, 4])
///     -> scan::<'tick>(|| 0, |acc: &mut i32, x: i32| {
///         *acc += x;
///         Some(*acc)
///     })
///     -> assert_eq([1, 3, 6, 10]);
///
/// // Early termination example
/// source_iter([1, 2, 3, 4])
///     -> scan::<'tick>(|| 1, |state: &mut i32, x: i32| {
///         *state = *state * x;
///         if *state > 6 {
///             None
///         } else {
///             Some(-*state)
///         }
///     })
///     -> assert_eq([-1, -2, -6]);
/// ```
pub const SCAN: OperatorConstraints = OperatorConstraints {
    name: "scan",
    categories: &[OperatorCategory::Fold],
    hard_range_inn: RANGE_1,
    soft_range_inn: RANGE_1,
    hard_range_out: RANGE_1,
    soft_range_out: RANGE_1,
    num_args: 2,
    persistence_args: &(0..=1),
    type_args: RANGE_0,
    is_external_input: false,
    flo_type: None,
    ports_inn: None,
    ports_out: None,
    input_delaytype_fn: |_| None,
    write_fn: |wc @ &WriteContextArgs {
                   root,
                   op_span,
                   ident,
                   is_pull,
                   inputs,
                   outputs,
                   arguments,
                   ..
               },
               diagnostics| {
        // The two user-supplied expressions: accumulator initializer, then per-item step function.
        let accum_init_expr = &arguments[0];
        let step_expr = &arguments[1];

        // `accum_slot_ident` names the generated `Option<Accum>` variable holding the state;
        // `None` in that slot means the scan has been terminated.
        let accum_slot_ident = wc.make_ident("singleton_output");
        let init_closure_ident = wc.make_ident("initializer_func");

        // Tokens that invoke the stored initializer closure to obtain a fresh accumulator.
        let fresh_accum = quote_spanned! {op_span=>
            (#init_closure_ident)()
        };

        let [persistence] = wc.persistence_args_disallow_mutable(diagnostics);

        let upstream = &inputs[0];
        let item_ident = wc.make_ident("iterator_item");
        let emitted_ident = wc.make_ident("result");
        let slot_ref_ident = wc.make_ident("scan_state");

        // Prologue: bind the initializer closure, then seed the accumulator slot with its result.
        let write_prologue = quote_spanned! {op_span=>
            #[allow(unused_mut, reason = "for if `Fn` instead of `FnMut`.")]
            let mut #init_closure_ident = #accum_init_expr;

            #[allow(clippy::redundant_closure_call)]
            let mut #accum_slot_ident = Some(#fresh_accum);
        };

        // Only tick persistence re-seeds the accumulator at the end of a tick; for every other
        // persistence nothing is emitted here, so the slot (including a terminated `None`) is
        // left as it is.
        let write_tick_end = if matches!(persistence, super::Persistence::Tick) {
            quote_spanned! {op_span=>
                #[allow(clippy::redundant_closure_call)]
                { #accum_slot_ident = Some(#fresh_accum); }
            }
        } else {
            Default::default()
        };

        // The `filter_map` closure shared by the pull and push code paths. For each item it:
        //   1. borrows the accumulator slot and bails out with `None` if the scan already ended,
        //   2. runs the user's step function through the `call_scan_fn` helper, which clears the
        //      slot as soon as the step function returns `None`,
        //   3. yields the step function's result.
        let per_item_closure = quote_spanned! {op_span=>
            |#item_ident| {
                let #slot_ref_ident = &mut #accum_slot_ident;
                if #slot_ref_ident.is_none() {
                    return None;
                }

                #[inline(always)]
                fn call_scan_fn<Accum, Item, Output>(
                    accum: &mut Option<Accum>,
                    item: Item,
                    func: impl Fn(&mut Accum, Item) -> Option<Output>,
                ) -> Option<Output> {
                    let result = (func)(accum.as_mut().unwrap(), item);
                    if result.is_none() {
                        *accum = None;
                    }
                    result
                }

                #[allow(clippy::redundant_closure_call)]
                let #emitted_ident = call_scan_fn(&mut *#slot_ref_ident, #item_ident, #step_expr);

                #emitted_ident
            }
        };

        let write_iterator = if is_pull {
            // Pull side: wrap the upstream pull in a `filter_map`.
            quote_spanned! {op_span=>
                let #ident = #root::dfir_pipes::pull::Pull::filter_map(#upstream, #per_item_closure);
            }
        } else {
            // Push side: build a `filter_map` push that forwards into the downstream output.
            let downstream = &outputs[0];
            quote_spanned! {op_span=>
                let #ident = #root::dfir_pipes::push::filter_map(#per_item_closure, #downstream);
            }
        };

        Ok(OperatorWriteOutput {
            write_prologue,
            write_iterator,
            write_tick_end,
            ..Default::default()
        })
    },
};