1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
use crate::channel::{ChannelReceiver, ChannelSender, Message, NotifierChannelSender};
use crate::*;
use anyhow::anyhow;
use derive_more::Debug;
use derive_new::new;
use std::collections::VecDeque;
use std::option::Option;
use std::rc::Rc;
/// Channel-forwarding operator for streams.
///
/// The trait is implemented for `dyn Stream<T>`, so `send` can be called
/// directly on an `Rc<dyn Stream<T>>`.
pub(crate) trait ChannelOperators<T: Element + Send> {
    /// Builds the graph node that pushes this stream's values into `sender`.
    ///
    /// `trigger` is an optional additional node handed to the forwarding
    /// node together with the stream and the sender.
    ///
    /// The returned node must be kept (hence `#[must_use]`).
    #[must_use]
    fn send(
        self: &Rc<Self>,
        sender: ChannelSender<T>,
        trigger: Option<Rc<dyn Node>>,
    ) -> Rc<dyn Node>;
}
impl<T: Element + Send> ChannelOperators<T> for dyn Stream<T> {
    /// Wraps this stream, `sender` and the optional `trigger` in a
    /// `SenderNode` and returns it converted to a graph node.
    fn send(
        self: &Rc<Self>,
        sender: ChannelSender<T>,
        trigger: Option<Rc<dyn Node>>,
    ) -> Rc<dyn Node> {
        // The node keeps its own handle to the stream it reads from.
        let source = Rc::clone(self);
        let forwarder = SenderNode::new(source, sender, trigger);
        forwarder.into_node()
    }
}
/// Graph node that forwards the values of `source` into a channel.
///
/// Each cycle it either sends the source's current value (when the source
/// ticked) or, when it was cycled without the source ticking, asks the sender
/// to emit a checkpoint; the latter requires `trigger` to be set.
///
/// The derived `new` takes `source`, `sender` and `trigger`, in that order;
/// `source_index` starts out as `None`.
#[derive(new)]
pub(crate) struct SenderNode<T: Element + Send> {
/// Stream whose value is sent every time it ticks.
source: Rc<dyn Stream<T>>,
/// Sending end of the channel; also used to signal end-of-stream on stop.
sender: ChannelSender<T>,
/// Optional extra upstream. When present it is registered as an upstream
/// alongside `source`, and a cycle in which `source` did not tick sends a
/// checkpoint instead of a value.
trigger: Option<Rc<dyn Node>>,
/// Graph index of `source`, resolved once on the first cycle so the
/// tick-check avoids an `Rc` clone plus hash-map lookup every tick.
#[new(default)]
source_index: Option<usize>,
}
impl<T: Element + Send> MutableNode for SenderNode<T> {
fn upstreams(&self) -> UpStreams {
let mut upstreams = vec![self.source.clone().as_node()];
if let Some(trig) = &self.trigger {
upstreams.push(trig.clone());
}
UpStreams::new(upstreams, Vec::new())
}
fn cycle(&mut self, state: &mut GraphState) -> anyhow::Result<bool> {
//println!("SenderNode::cycle");
let source_index = *self.source_index.get_or_insert_with(|| {
state
.node_index(self.source.clone().as_node())
.expect("invariant: channel sender source wired at graph init")
});
if state.node_index_ticked(source_index) {
self.sender.send(state, self.source.peek_value())?;
Ok(true)
} else {
match &self.trigger {
Some(trig) => {
debug_assert!(state.ticked(trig.clone()));
self.sender.send_checkpoint(state)?;
}
None => {
anyhow::bail!("None trigger!");
}
}
Ok(false)
}
}
fn stop(&mut self, _state: &mut GraphState) -> anyhow::Result<()> {
self.sender.send_message(Message::EndOfStream)?;
Ok(())
}
}
/// Stream node fed by the receiving end of a channel.
///
/// Messages read from `receiver` are turned into a `Burst<T>` per cycle. In
/// real-time mode everything currently buffered is drained; in historical mode
/// values are queued and released once the engine time reaches their
/// timestamp.
///
/// The derived `new` takes `receiver`, `trigger` and `notifier_channel`, in
/// that order; the remaining fields start at their defaults.
#[derive(new, Debug)]
pub struct ChannelReceiverStream<T: Element + Send> {
/// Channel end the node reads messages from.
receiver: ChannelReceiver<T>,
/// Optional upstream node. When set it is this node's only upstream, and
/// in historical mode the receiver does not schedule its own wake-ups.
#[debug(skip)]
trigger: Option<Rc<dyn Node>>,
/// Used once during real-time setup to send out the graph's ready
/// notifier; `None` afterwards (or if never provided).
notifier_channel: Option<NotifierChannelSender>,
/// Values delivered by the most recent cycle that produced any.
#[new(default)]
value: Burst<T>,
/// Set once a `Message::EndOfStream` has been read; no further reads
/// are attempted afterwards.
#[new(default)]
finished: bool,
/// Historical mode only: timestamp of the last value or checkpoint read
/// (the earliest timestamp for a batch). Cleared when the look-ahead queue
/// runs empty.
#[new(default)]
message_time: Option<NanoTime>,
/// Historical mode only: values already read from the channel that are
/// not yet due at the current engine time.
#[new(default)]
queue: VecDeque<ValueAt<T>>,
}
// `finished` is only read by `ReceiverStream`, which is itself gated behind the
// zmq/aeron adapters; gate the accessor the same way to avoid a dead-code warning
// in the default build.
#[cfg(any(feature = "zmq", feature = "aeron", feature = "aeron-rs-beta"))]
impl<T: Element + Send> ChannelReceiverStream<T> {
/// Whether the producer has signalled end-of-stream (i.e. a
/// [`Message::EndOfStream`] has been received and drained).
///
/// The flag is set by `cycle` when it reads the end-of-stream message and
/// is never reset afterwards.
pub(crate) fn finished(&self) -> bool {
self.finished
}
}
// Node behaviour of the channel receiver: reads messages from the channel and
// emits them as a `Burst<T>` stored in `value`.
// NOTE(review): the `node` attribute presumably exposes `value` as the stream
// output — confirm against the macro definition.
#[node(output = value: Burst<T>)]
impl<T: Element + Send> MutableNode for ChannelReceiverStream<T> {
/// The optional trigger is the only upstream; without one the node has no
/// upstreams at all.
fn upstreams(&self) -> UpStreams {
let mut ups = Vec::new();
if let Some(trigger) = &self.trigger {
ups.push(trigger.clone());
}
UpStreams::new(ups, vec![])
}
/// Reads from the channel and publishes whatever is due at this cycle.
///
/// * Real-time mode: drains every message currently buffered, without
///   blocking, and emits all values found.
/// * Historical mode: reads ahead into `queue` (blocking while behind the
///   engine time), emits the queued values whose time is not after the
///   current engine time, and schedules the next wake-up.
///
/// Returns `Ok(true)` and overwrites `value` when at least one value was
/// delivered, `Ok(false)` otherwise.
///
/// # Errors
///
/// Fails when a message kind does not match the run mode, when a historical
/// timestamp lies before the current engine time, or when the channel
/// carries a `Message::Error`.
fn cycle(&mut self, state: &mut crate::GraphState) -> anyhow::Result<bool> {
let mut values: Burst<T> = Burst::new();
match state.run_mode() {
RunMode::RealTime => {
// Cycle triggered by notification from sender. Drain everything that
// is buffered right now; `try_recv` returning `None` ends the drain.
loop {
if self.finished {
break;
} else {
match self.receiver.try_recv() {
Some(message) => match message {
Message::RealtimeValue(value) => {
values.push(value);
}
// Historical values are accepted here too; their timestamp
// is dropped and only the value is emitted.
Message::HistoricalValue(value_at) => {
values.push(value_at.value);
}
Message::HistoricalBatch(_) => {
return Err(anyhow!(
"received HistoricalBatch but RunMode is RealTime"
));
}
Message::EndOfStream => self.finished = true,
// Checkpoints carry no value and are ignored in real-time mode.
Message::CheckPoint(_) => {}
Message::Error(err) => {
return Err(anyhow!(err));
}
},
None => break,
}
}
}
}
RunMode::HistoricalFrom(_) => {
// No notifications from the sender. While we are behind the
// current engine time we block for the next message; once we
// have caught up (a message stamped at the current time) we
// switch to non-blocking reads so that a burst of same-time
// ticks delivered as separate messages is drained together.
//
// We must never *block* for the next message once caught up:
// an untriggered receiver may be fed one value per engine step
// (e.g. the graph-map worker), and blocking would deadlock it.
loop {
if self.finished {
break;
}
let mut non_blocking = false;
if let Some(t) = self.message_time {
if t > state.time() {
// Read past the current time; nothing more is due now.
break;
} else if t == state.time() {
if self.trigger.is_some() {
// Triggered receivers are driven by the trigger
// node: deliver what we have and let the next
// trigger tick cycle us again.
break;
}
// Caught up to the current time. Drain any further
// same-time messages that are *already* buffered,
// but do not block for the next one.
non_blocking = true;
}
}
let message = if non_blocking {
match self.receiver.try_recv() {
Some(message) => message,
// Nothing more buffered at the current time.
None => break,
}
} else {
// Still behind (or no timestamp known yet): block for the
// next message.
self.receiver.recv()
};
match message {
Message::RealtimeValue(_) => {
return Err(anyhow!(
"received RealtimeValue but RunMode is Historical"
));
}
Message::HistoricalValue(value_at) => {
// A value stamped before the current engine time can no
// longer be delivered at its own time.
if value_at.time < state.time() {
return Err(anyhow!(
"received Historical message but with time less than graph time, {} < {}",
value_at.time,
state.time()
));
}
self.message_time = Some(value_at.time);
self.queue.push_back(value_at);
}
Message::HistoricalBatch(batch) => {
// An empty batch carries no information; read the next message.
if batch.is_empty() {
continue;
}
// Validate: all timestamps must be >= current graph time
// (batch is non-empty per the `is_empty()` check above).
let min_time = batch
.iter()
.map(|va| va.time)
.min()
.expect("batch is non-empty");
if min_time < state.time() {
return Err(anyhow!(
"received HistoricalBatch with timestamp less than graph time, {} < {}",
min_time,
state.time()
));
}
// Set message_time to earliest timestamp
self.message_time = Some(min_time);
// Unpack all values into queue, preserving the batch order.
for value_at in batch.iter() {
self.queue.push_back(value_at.clone());
}
}
Message::EndOfStream => self.finished = true,
// A checkpoint only moves the known message time forward; it
// queues no value.
Message::CheckPoint(check_point) => {
self.message_time = Some(check_point);
}
Message::Error(err) => {
return Err(anyhow!(err));
}
}
}
// Release every queued value from the front of the queue whose time is
// not after the current engine time.
while let Some(value_at) = self.queue.front() {
if value_at.time <= state.time() {
// front() returned Some, so pop_front is guaranteed.
let popped = self.queue.pop_front().expect("front() just returned Some");
values.push(popped.value);
} else {
break;
}
}
match self.queue.front() {
// Look-ahead remains: wake up again at the time of the next queued value.
Some(head) => state.add_callback(head.time),
None => {
// No buffered look-ahead. If the stream is still open and
// we are self-driven (no trigger), schedule one more wakeup
// so the next cycle blocks for the next message; a triggered
// or finished receiver is left to wind down. Clearing
// message_time makes that next cycle block.
if !self.finished && self.trigger.is_none() {
state.add_callback(state.time());
}
self.message_time = None;
}
}
}
}
// Only report a tick (and replace `value`) when something was delivered.
if !values.is_empty() {
self.value = values;
Ok(true)
} else {
Ok(false)
}
}
/// Prepares the node for the selected run mode.
///
/// * Real-time: if a notifier channel was supplied, it is taken and used
///   once to send out `state.ready_notifier()`; a send failure is returned
///   as an error.
/// * Historical: a receiver without a trigger schedules its first cycle
///   at the run's start time.
fn setup(&mut self, state: &mut GraphState) -> anyhow::Result<()> {
match state.run_mode() {
RunMode::RealTime => {
if let Some(chan) = self.notifier_channel.take() {
chan.send(state.ready_notifier())
.map_err(|e| anyhow::anyhow!(e))?;
}
}
RunMode::HistoricalFrom(time) => {
if self.trigger.is_none() {
state.add_callback(time);
}
}
}
Ok(())
}
/// Delegates shutdown to the receiver's own `teardown`.
fn teardown(&mut self, _: &mut GraphState) -> anyhow::Result<()> {
self.receiver.teardown();
Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::channel::{Message, channel_pair};
    use crate::queue::ValueAt;
    use std::cell::RefCell;

    /// Regression test for same-time bursts in historical mode.
    ///
    /// Scenario: the producer puts two `HistoricalValue` messages, both
    /// stamped t=100, onto the channel as separate messages (the shape an
    /// upstream burst has once fanned out), followed by `EndOfStream`.
    ///
    /// Old behaviour: `ChannelReceiverStream::cycle` stopped reading as soon
    /// as the first value reached the engine time and cleared `message_time`.
    /// The strictly monotonic engine clock then moved to t=101 before the
    /// second message was read, so its timestamp (100) was in the past and
    /// the run aborted with "received Historical message but with time less
    /// than graph time, 100 < 101".
    ///
    /// Expected behaviour: once caught up, the receiver drains (without
    /// blocking) every message already buffered at the current time, so both
    /// values surface together at t=100.
    #[test]
    fn same_time_burst_does_not_break_monotonic_engine_time() {
        let burst_time = NanoTime::new(100);
        let (sender, receiver) = channel_pair::<u64>(None);

        // Two ticks sharing one engine time, published as separate messages.
        for value in 1u64..=2 {
            let message = Message::HistoricalValue(ValueAt::new(value, burst_time));
            sender.send_message(message).unwrap();
        }
        sender.send_message(Message::EndOfStream).unwrap();
        // Close the sending side so `teardown()`'s wait for the sender to
        // close observes a closed channel.
        drop(sender);

        let stream_node = ChannelReceiverStream::new(receiver, None, None);
        let receiver_stream = Rc::new(RefCell::new(stream_node));
        let collected = receiver_stream.as_stream().collect();
        collected
            .run(RunMode::HistoricalFrom(NanoTime::ZERO), RunFor::Forever)
            .expect("same-time burst must not abort the historical run");

        // Flatten the emitted bursts into (value, delivery time) pairs.
        let mut delivered: Vec<(u64, NanoTime)> = Vec::new();
        for burst in collected.peek_value().iter() {
            for v in burst.value.iter() {
                delivered.push((*v, burst.time));
            }
        }

        // Both values must arrive, each at its real time t=100.
        let expected: Vec<(u64, NanoTime)> = vec![(1, burst_time), (2, burst_time)];
        assert_eq!(
            delivered, expected,
            "expected both same-time values delivered at t=100, got {delivered:?}"
        );
    }
}