1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
use quote::quote_spanned;
use super::{
OpInstGenerics, OperatorCategory, OperatorConstraints, OperatorInstance, OperatorWriteOutput,
Persistence, RANGE_1, WriteContextArgs,
};
use crate::diagnostic::{Diagnostic, Level};
/// Stores each item as it passes through, and replays all item every tick.
///
/// ```dfir
/// // Normally `source_iter(...)` only emits once, but `persist::<'static>()` will replay the `"hello"`
/// // on every tick.
/// source_iter(["hello"])
/// -> persist::<'static>()
/// -> assert_eq(["hello"]);
/// ```
///
/// `persist()` can be used to introduce statefulness into stateless pipelines. In the example below, the
/// join only stores data for single tick. The `persist::<'static>()` operator introduces statefulness
/// across ticks. This can be useful for optimization transformations within the dfir
/// compiler. Equivalently, we could specify that the join has `static` persistence (`my_join = join::<'static>()`).
/// ```rustbook
/// let (input_send, input_recv) = dfir_rs::util::unbounded_channel::<(&str, &str)>();
/// let mut flow = dfir_rs::dfir_syntax! {
/// source_iter([("hello", "world")]) -> persist::<'static>() -> [0]my_join;
/// source_stream(input_recv) -> persist::<'static>() -> [1]my_join;
/// my_join = join::<'tick>() -> for_each(|(k, (v1, v2))| println!("({}, ({}, {}))", k, v1, v2));
/// };
/// input_send.send(("hello", "oakland")).unwrap();
/// flow.run_tick();
/// input_send.send(("hello", "san francisco")).unwrap();
/// flow.run_tick();
/// // (hello, (world, oakland))
/// // (hello, (world, oakland))
/// // (hello, (world, san francisco))
/// ```
pub const PERSIST: OperatorConstraints = OperatorConstraints {
name: "persist",
categories: &[OperatorCategory::Persistence],
hard_range_inn: RANGE_1,
soft_range_inn: RANGE_1,
hard_range_out: RANGE_1,
soft_range_out: RANGE_1,
num_args: 0,
persistence_args: RANGE_1,
type_args: &(0..=1),
is_external_input: false,
flo_type: None,
ports_inn: None,
ports_out: None,
input_delaytype_fn: |_| None,
write_fn: |wc @ &WriteContextArgs {
root,
op_span,
ident,
is_pull,
inputs,
outputs,
op_name,
work_fn_async,
op_inst:
OperatorInstance {
generics:
OpInstGenerics {
persistence_args,
type_args,
..
},
..
},
..
},
diagnostics| {
if [Persistence::Static] != persistence_args[..] {
diagnostics.push(Diagnostic::spanned(
op_span,
Level::Error,
format!("{} only supports `'static`.", op_name),
));
}
let generic_type = type_args
.first()
.map(quote::ToTokens::to_token_stream)
.unwrap_or(quote_spanned!(op_span=> _));
let persistdata_ident = wc.make_ident("persistdata");
let vec_ident = wc.make_ident("persistvec");
let write_prologue = quote_spanned! {op_span=>
let mut #persistdata_ident = ::std::vec::Vec::<#generic_type>::new();
};
let write_iterator = if is_pull {
let input = &inputs[0];
quote_spanned! {op_span=>
let #vec_ident = &mut #persistdata_ident;
let #ident = {
let fut = #root::dfir_pipes::pull::Pull::for_each(#input, |item| {
#vec_ident.push(item);
});
let () = #work_fn_async(fut).await;
let iter = #vec_ident.iter().cloned();
#root::dfir_pipes::pull::iter(iter)
};
}
} else {
let output = &outputs[0];
quote_spanned! {op_span=>
let #vec_ident = &mut #persistdata_ident;
let #ident = {
fn constrain_types<'ctx, Psh, Item>(vec: &'ctx mut Vec<Item>, output: Psh, is_new_tick: bool) -> impl 'ctx + #root::dfir_pipes::push::Push<Item, ()>
where
Psh: 'ctx + #root::dfir_pipes::push::Push<Item, ()>,
Item: ::std::clone::Clone,
{
#root::dfir_pipes::push::persist_state(vec, is_new_tick, output)
}
constrain_types(&mut *#vec_ident, #output, true)
};
}
};
Ok(OperatorWriteOutput {
write_prologue,
write_iterator,
..Default::default()
})
},
};