dora_node_api/event_stream/mod.rs

1use std::{
2    collections::{BTreeMap, HashMap, VecDeque},
3    pin::pin,
4    sync::Arc,
5    time::Duration,
6};
7
8use dora_message::{
9    DataflowId,
10    daemon_to_node::{DaemonCommunication, DaemonReply, DataMessage, NodeEvent},
11    id::DataId,
12    node_to_daemon::{DaemonRequest, Timestamped},
13};
14pub use event::{Event, StopCause};
15use futures::{
16    Stream, StreamExt,
17    future::{Either, select},
18};
19use futures_timer::Delay;
20use scheduler::{NON_INPUT_EVENT, Scheduler};
21
22use self::thread::{EventItem, EventStreamThreadHandle};
23use crate::{
24    daemon_connection::DaemonChannel,
25    event_stream::data_conversion::{MappedInputData, RawData, SharedMemoryData},
26};
27use dora_core::{
28    config::{Input, NodeId},
29    uhlc,
30};
31use eyre::{Context, eyre};
32
33pub use scheduler::Scheduler as EventScheduler;
34
35mod data_conversion;
36mod event;
37pub mod merged;
38mod scheduler;
39mod thread;
40
/// Asynchronous iterator over the incoming [`Event`]s destined for this node.
///
/// This struct [implements](#impl-Stream-for-EventStream) the [`Stream`] trait,
/// so you can use methods of the [`StreamExt`] trait
/// on this struct. A common pattern is `while let Some(event) = event_stream.next().await`.
///
/// Nodes should iterate over this event stream and react to events that they are interested in.
/// Typically, the most important event type is [`Event::Input`].
/// You don't need to handle all events, it's fine to ignore events that are not relevant to your node.
///
/// The event stream will close itself after a [`Event::Stop`] was received.
/// A manual `break` on [`Event::Stop`] is typically not needed.
/// _(You probably do need to use a manual `break` on stop events when using the
/// [`StreamExt::merge`][`futures_concurrency::stream::StreamExt::merge`] implementation on
/// [`EventStream`] to combine the stream with an external one.)_
///
/// Once the event stream has finished, nodes should exit.
/// Note that Dora kills nodes that don't exit quickly after a [`Event::Stop`] of type
/// [`StopCause::Manual`] was received.
///
/// Dropping an `EventStream` sends `DaemonRequest::EventStreamDropped` to the daemon over a
/// dedicated close channel. Failures to do so are logged as warnings, not propagated.
pub struct EventStream {
    /// ID of the node this stream belongs to; recorded in the tracing span of `drop`.
    node_id: NodeId,
    /// Receiving half of the channel whose sending half is handed to `thread::init`.
    receiver: flume::r#async::RecvStream<'static, EventItem>,
    /// Keeps the event thread's handle alive for as long as the stream exists.
    // NOTE: Rust drops fields in declaration order after `Drop::drop` has run, so `receiver`
    // is released before `_thread_handle`. NOTE(review): the thread shutdown may rely on this
    // order — verify before reordering fields.
    _thread_handle: EventStreamThreadHandle,
    /// Separate daemon connection, used only to report `EventStreamDropped` on drop.
    close_channel: DaemonChannel,
    /// Clock used to timestamp requests sent to the daemon.
    clock: Arc<uhlc::HLC>,
    /// Buffers events for `recv`/`recv_async`, which may hand them out reordered.
    scheduler: Scheduler,
}
68
69impl EventStream {
70    #[tracing::instrument(level = "trace", skip(clock))]
71    pub(crate) fn init(
72        dataflow_id: DataflowId,
73        node_id: &NodeId,
74        daemon_communication: &DaemonCommunication,
75        input_config: BTreeMap<DataId, Input>,
76        clock: Arc<uhlc::HLC>,
77    ) -> eyre::Result<Self> {
78        let channel = match daemon_communication {
79            DaemonCommunication::Shmem {
80                daemon_events_region_id,
81                ..
82            } => unsafe { DaemonChannel::new_shmem(daemon_events_region_id) }.wrap_err_with(
83                || format!("failed to create shmem event stream for node `{node_id}`"),
84            )?,
85            DaemonCommunication::Tcp { socket_addr } => DaemonChannel::new_tcp(*socket_addr)
86                .wrap_err_with(|| format!("failed to connect event stream for node `{node_id}`"))?,
87            #[cfg(unix)]
88            DaemonCommunication::UnixDomain { socket_file } => {
89                DaemonChannel::new_unix_socket(socket_file).wrap_err_with(|| {
90                    format!("failed to connect event stream for node `{node_id}`")
91                })?
92            }
93        };
94
95        let close_channel = match daemon_communication {
96            DaemonCommunication::Shmem {
97                daemon_events_close_region_id,
98                ..
99            } => unsafe { DaemonChannel::new_shmem(daemon_events_close_region_id) }.wrap_err_with(
100                || format!("failed to create shmem event close channel for node `{node_id}`"),
101            )?,
102            DaemonCommunication::Tcp { socket_addr } => DaemonChannel::new_tcp(*socket_addr)
103                .wrap_err_with(|| {
104                    format!("failed to connect event close channel for node `{node_id}`")
105                })?,
106            #[cfg(unix)]
107            DaemonCommunication::UnixDomain { socket_file } => {
108                DaemonChannel::new_unix_socket(socket_file).wrap_err_with(|| {
109                    format!("failed to connect event close channel for node `{node_id}`")
110                })?
111            }
112        };
113
114        let mut queue_size_limit: HashMap<DataId, (usize, VecDeque<EventItem>)> = input_config
115            .iter()
116            .map(|(input, config)| {
117                (
118                    input.clone(),
119                    (config.queue_size.unwrap_or(1), VecDeque::new()),
120                )
121            })
122            .collect();
123
124        queue_size_limit.insert(
125            DataId::from(NON_INPUT_EVENT.to_string()),
126            (1_000, VecDeque::new()),
127        );
128
129        let scheduler = Scheduler::new(queue_size_limit);
130
131        Self::init_on_channel(
132            dataflow_id,
133            node_id,
134            channel,
135            close_channel,
136            clock,
137            scheduler,
138        )
139    }
140
141    pub(crate) fn init_on_channel(
142        dataflow_id: DataflowId,
143        node_id: &NodeId,
144        mut channel: DaemonChannel,
145        mut close_channel: DaemonChannel,
146        clock: Arc<uhlc::HLC>,
147        scheduler: Scheduler,
148    ) -> eyre::Result<Self> {
149        channel.register(dataflow_id, node_id.clone(), clock.new_timestamp())?;
150        let reply = channel
151            .request(&Timestamped {
152                inner: DaemonRequest::Subscribe,
153                timestamp: clock.new_timestamp(),
154            })
155            .map_err(|e| eyre!(e))
156            .wrap_err("failed to create subscription with dora-daemon")?;
157
158        match reply {
159            DaemonReply::Result(Ok(())) => {}
160            DaemonReply::Result(Err(err)) => {
161                eyre::bail!("subscribe failed: {err}")
162            }
163            other => eyre::bail!("unexpected subscribe reply: {other:?}"),
164        }
165
166        close_channel.register(dataflow_id, node_id.clone(), clock.new_timestamp())?;
167
168        let (tx, rx) = flume::bounded(100_000_000);
169
170        let thread_handle = thread::init(node_id.clone(), tx, channel, clock.clone())?;
171
172        Ok(EventStream {
173            node_id: node_id.clone(),
174            receiver: rx.into_stream(),
175            _thread_handle: thread_handle,
176            close_channel,
177            clock,
178            scheduler,
179        })
180    }
181
182    /// Synchronously waits for the next event.
183    ///
184    /// Blocks the thread until the next event arrives.
185    /// Returns [`None`] once the event stream is closed.
186    ///
187    /// For an asynchronous variant of this method see [`recv_async`][Self::recv_async].
188    ///
189    /// ## Event Reordering
190    ///
191    /// This method uses an [`EventScheduler`] internally to **reorder events**. This means that the
192    /// events might be returned in a different order than they occurred. For details, check the
193    /// documentation of the [`EventScheduler`] struct.
194    ///
195    /// If you want to receive the events in their original chronological order, use the
196    /// asynchronous [`StreamExt::next`] method instead ([`EventStream`] implements the
197    /// [`Stream`] trait).
198    pub fn recv(&mut self) -> Option<Event> {
199        futures::executor::block_on(self.recv_async())
200    }
201
202    /// Receives the next incoming [`Event`] synchronously with a timeout.
203    ///
204    /// Blocks the thread until the next event arrives or the timeout is reached.
205    /// Returns a [`Event::Error`] if no event was received within the given duration.
206    ///
207    /// Returns [`None`] once the event stream is closed.
208    ///
209    /// For an asynchronous variant of this method see [`recv_async_timeout`][Self::recv_async_timeout].
210    ///
211    /// ## Event Reordering
212    ///
213    /// This method uses an [`EventScheduler`] internally to **reorder events**. This means that the
214    /// events might be returned in a different order than they occurred. For details, check the
215    /// documentation of the [`EventScheduler`] struct.
216    ///
217    /// If you want to receive the events in their original chronological order, use the
218    /// asynchronous [`StreamExt::next`] method instead ([`EventStream`] implements the
219    /// [`Stream`] trait).
220    pub fn recv_timeout(&mut self, dur: Duration) -> Option<Event> {
221        futures::executor::block_on(self.recv_async_timeout(dur))
222    }
223
224    /// Receives the next incoming [`Event`] asynchronously, using an [`EventScheduler`] for fairness.
225    ///
226    /// Returns [`None`] once the event stream is closed.
227    ///
228    /// ## Event Reordering
229    ///
230    /// This method uses an [`EventScheduler`] internally to **reorder events**. This means that the
231    /// events might be returned in a different order than they occurred. For details, check the
232    /// documentation of the [`EventScheduler`] struct.
233    ///
234    /// If you want to receive the events in their original chronological order, use the
235    /// [`StreamExt::next`] method with a custom timeout future instead
236    /// ([`EventStream`] implements the [`Stream`] trait).
237    pub async fn recv_async(&mut self) -> Option<Event> {
238        loop {
239            if self.scheduler.is_empty() {
240                if let Some(event) = self.receiver.next().await {
241                    self.scheduler.add_event(event);
242                } else {
243                    break;
244                }
245            } else {
246                match select(Delay::new(Duration::from_micros(300)), self.receiver.next()).await {
247                    Either::Left((_elapsed, _)) => break,
248                    Either::Right((Some(event), _)) => self.scheduler.add_event(event),
249                    Either::Right((None, _)) => break,
250                };
251            }
252        }
253        let event = self.scheduler.next();
254        event.map(Self::convert_event_item)
255    }
256
257    /// Receives the next incoming [`Event`] asynchronously with a timeout.
258    ///
259    /// Returns a [`Event::Error`] if no event was received within the given duration.
260    ///
261    /// Returns [`None`] once the event stream is closed.
262    ///
263    /// ## Event Reordering
264    ///
265    /// This method uses an [`EventScheduler`] internally to **reorder events**. This means that the
266    /// events might be returned in a different order than they occurred. For details, check the
267    /// documentation of the [`EventScheduler`] struct.
268    ///
269    /// If you want to receive the events in their original chronological order, use the
270    /// [`StreamExt::next`] method with a custom timeout future instead
271    /// ([`EventStream`] implements the [`Stream`] trait).
272    pub async fn recv_async_timeout(&mut self, dur: Duration) -> Option<Event> {
273        match select(Delay::new(dur), pin!(self.recv_async())).await {
274            Either::Left((_elapsed, _)) => Some(Self::convert_event_item(EventItem::TimeoutError(
275                eyre!("Receiver timed out"),
276            ))),
277            Either::Right((event, _)) => event,
278        }
279    }
280
281    fn convert_event_item(item: EventItem) -> Event {
282        match item {
283            EventItem::NodeEvent { event, ack_channel } => match event {
284                NodeEvent::Stop => Event::Stop(event::StopCause::Manual),
285                NodeEvent::Reload { operator_id } => Event::Reload { operator_id },
286                NodeEvent::InputClosed { id } => Event::InputClosed { id },
287                NodeEvent::Input { id, metadata, data } => {
288                    let data = match data {
289                        None => Ok(None),
290                        Some(DataMessage::Vec(v)) => Ok(Some(RawData::Vec(v))),
291                        Some(DataMessage::SharedMemory {
292                            shared_memory_id,
293                            len,
294                            drop_token: _, // handled in `event_stream_loop`
295                        }) => unsafe {
296                            MappedInputData::map(&shared_memory_id, len).map(|data| {
297                                Some(RawData::SharedMemory(SharedMemoryData {
298                                    data,
299                                    _drop: ack_channel,
300                                }))
301                            })
302                        },
303                    };
304                    let data = data.and_then(|data| {
305                        let raw_data = data.unwrap_or(RawData::Empty);
306                        raw_data
307                            .into_arrow_array(&metadata.type_info)
308                            .map(arrow::array::make_array)
309                    });
310                    match data {
311                        Ok(data) => Event::Input {
312                            id,
313                            metadata,
314                            data: data.into(),
315                        },
316                        Err(err) => Event::Error(format!("{err:?}")),
317                    }
318                }
319                NodeEvent::AllInputsClosed => Event::Stop(event::StopCause::AllInputsClosed),
320            },
321
322            EventItem::FatalError(err) => {
323                Event::Error(format!("fatal event stream error: {err:?}"))
324            }
325            EventItem::TimeoutError(err) => {
326                Event::Error(format!("Timeout event stream error: {err:?}"))
327            }
328        }
329    }
330}
331
332impl Stream for EventStream {
333    type Item = Event;
334
335    fn poll_next(
336        mut self: std::pin::Pin<&mut Self>,
337        cx: &mut std::task::Context<'_>,
338    ) -> std::task::Poll<Option<Self::Item>> {
339        self.receiver
340            .poll_next_unpin(cx)
341            .map(|item| item.map(Self::convert_event_item))
342    }
343}
344
345impl Drop for EventStream {
346    #[tracing::instrument(skip(self), fields(%self.node_id))]
347    fn drop(&mut self) {
348        let request = Timestamped {
349            inner: DaemonRequest::EventStreamDropped,
350            timestamp: self.clock.new_timestamp(),
351        };
352        let result = self
353            .close_channel
354            .request(&request)
355            .map_err(|e| eyre!(e))
356            .wrap_err("failed to signal event stream closure to dora-daemon")
357            .and_then(|r| match r {
358                DaemonReply::Result(Ok(())) => Ok(()),
359                DaemonReply::Result(Err(err)) => Err(eyre!("EventStreamClosed failed: {err}")),
360                other => Err(eyre!("unexpected EventStreamClosed reply: {other:?}")),
361            });
362        if let Err(err) = result {
363            tracing::warn!("{err:?}")
364        }
365    }
366}