dora_node_api/event_stream/
mod.rs

1use std::{
2    collections::{BTreeMap, HashMap, VecDeque},
3    path::PathBuf,
4    pin::pin,
5    sync::Arc,
6    time::Duration,
7};
8
9use dora_message::{
10    DataflowId,
11    daemon_to_node::{DaemonCommunication, DaemonReply, DataMessage, NodeEvent},
12    id::DataId,
13    node_to_daemon::{DaemonRequest, Timestamped},
14};
15pub use event::{Event, StopCause};
16use futures::{
17    FutureExt, Stream, StreamExt,
18    future::{Either, select},
19};
20use futures_timer::Delay;
21use scheduler::{NON_INPUT_EVENT, Scheduler};
22
23use self::thread::{EventItem, EventStreamThreadHandle};
24use crate::{
25    DaemonCommunicationWrapper,
26    daemon_connection::{DaemonChannel, node_integration_testing::convert_output_to_json},
27    event_stream::data_conversion::{MappedInputData, RawData, SharedMemoryData},
28};
29use dora_core::{
30    config::{Input, NodeId},
31    uhlc,
32};
33use eyre::{Context, eyre};
34
35pub use scheduler::Scheduler as EventScheduler;
36
37mod data_conversion;
38mod event;
39pub mod merged;
40mod scheduler;
41mod thread;
42
/// Asynchronous iterator over the incoming [`Event`]s destined for this node.
///
/// This struct [implements](#impl-Stream-for-EventStream) the [`Stream`] trait,
/// so you can use methods of the [`StreamExt`] trait
/// on this struct. A common pattern is `while let Some(event) = event_stream.next().await`.
///
/// Nodes should iterate over this event stream and react to events that they are interested in.
/// Typically, the most important event type is [`Event::Input`].
/// You don't need to handle all events, it's fine to ignore events that are not relevant to your node.
///
/// The event stream will close itself after an [`Event::Stop`] was received.
/// A manual `break` on [`Event::Stop`] is typically not needed.
/// _(You probably do need to use a manual `break` on stop events when using the
/// [`StreamExt::merge`][`futures_concurrency::stream::StreamExt::merge`] implementation on
/// [`EventStream`] to combine the stream with an external one.)_
///
/// Once the event stream finished, nodes should exit.
/// Note that Dora kills nodes that don't exit quickly after an [`Event::Stop`] of type
/// [`StopCause::Manual`] was received.
pub struct EventStream {
    // NOTE: fields are dropped in declaration order; do not reorder them casually.
    /// Id of the node this stream belongs to (used in the warning logged on drop).
    node_id: NodeId,
    /// Receiving end of the channel whose sender is handed to `thread::init`.
    receiver: flume::r#async::RecvStream<'static, EventItem>,
    /// Handle returned by `thread::init`; stored but never read in this file.
    _thread_handle: EventStreamThreadHandle,
    /// Separate daemon channel used to send `EventStreamDropped` when the stream is dropped.
    close_channel: DaemonChannel,
    /// Hybrid logical clock used to timestamp daemon requests and recorded events.
    clock: Arc<uhlc::HLC>,
    /// Buffers events for the `recv*`/`try_recv`/`drain` methods (may reorder them).
    scheduler: Scheduler,
    /// If set, events passing through the scheduler are recorded and written out on drop.
    write_events_to: Option<WriteEventsTo>,
    /// Timestamp taken when the stream was created; reference point for `time_offset_secs`.
    start_timestamp: uhlc::Timestamp,
    /// `false` when connected through an integration test channel, `true` otherwise.
    use_scheduler: bool,
}
73
74impl EventStream {
75    #[tracing::instrument(level = "trace", skip(clock))]
76    pub(crate) fn init(
77        dataflow_id: DataflowId,
78        node_id: &NodeId,
79        daemon_communication: &DaemonCommunicationWrapper,
80        input_config: BTreeMap<DataId, Input>,
81        clock: Arc<uhlc::HLC>,
82        write_events_to: Option<PathBuf>,
83    ) -> eyre::Result<Self> {
84        let channel = match daemon_communication {
85            DaemonCommunicationWrapper::Standard(daemon_communication) => {
86                match daemon_communication {
87                    DaemonCommunication::Shmem {
88                        daemon_events_region_id,
89                        ..
90                    } => unsafe { DaemonChannel::new_shmem(daemon_events_region_id) }
91                        .wrap_err_with(|| {
92                            format!("failed to create shmem event stream for node `{node_id}`")
93                        })?,
94                    DaemonCommunication::Tcp { socket_addr } => {
95                        DaemonChannel::new_tcp(*socket_addr).wrap_err_with(|| {
96                            format!("failed to connect event stream for node `{node_id}`")
97                        })?
98                    }
99                    #[cfg(unix)]
100                    DaemonCommunication::UnixDomain { socket_file } => {
101                        DaemonChannel::new_unix_socket(socket_file).wrap_err_with(|| {
102                            format!("failed to connect event stream for node `{node_id}`")
103                        })?
104                    }
105                    DaemonCommunication::Interactive => {
106                        DaemonChannel::Interactive(Default::default())
107                    }
108                }
109            }
110
111            DaemonCommunicationWrapper::Testing { channel } => {
112                DaemonChannel::IntegrationTestChannel(channel.clone())
113            }
114        };
115
116        let close_channel = match daemon_communication {
117            DaemonCommunicationWrapper::Standard(daemon_communication) => {
118                match daemon_communication {
119                    DaemonCommunication::Shmem {
120                        daemon_events_close_region_id,
121                        ..
122                    } => unsafe { DaemonChannel::new_shmem(daemon_events_close_region_id) }
123                        .wrap_err_with(|| {
124                            format!(
125                                "failed to create shmem event close channel for node `{node_id}`"
126                            )
127                        })?,
128                    DaemonCommunication::Tcp { socket_addr } => {
129                        DaemonChannel::new_tcp(*socket_addr).wrap_err_with(|| {
130                            format!("failed to connect event close channel for node `{node_id}`")
131                        })?
132                    }
133                    #[cfg(unix)]
134                    DaemonCommunication::UnixDomain { socket_file } => {
135                        DaemonChannel::new_unix_socket(socket_file).wrap_err_with(|| {
136                            format!("failed to connect event close channel for node `{node_id}`")
137                        })?
138                    }
139                    DaemonCommunication::Interactive => {
140                        DaemonChannel::Interactive(Default::default())
141                    }
142                }
143            }
144            DaemonCommunicationWrapper::Testing { channel } => {
145                DaemonChannel::IntegrationTestChannel(channel.clone())
146            }
147        };
148
149        let mut queue_size_limit: HashMap<DataId, (usize, VecDeque<EventItem>)> = input_config
150            .iter()
151            .map(|(input, config)| {
152                (
153                    input.clone(),
154                    (config.queue_size.unwrap_or(1), VecDeque::new()),
155                )
156            })
157            .collect();
158
159        queue_size_limit.insert(
160            DataId::from(NON_INPUT_EVENT.to_string()),
161            (1_000, VecDeque::new()),
162        );
163
164        let scheduler = Scheduler::new(queue_size_limit);
165
166        let write_events_to = match write_events_to {
167            Some(path) => {
168                if let Some(parent) = path.parent() {
169                    std::fs::create_dir_all(parent).wrap_err_with(|| {
170                        format!(
171                            "failed to create parent directories for event output file `{}` for node `{}`",
172                            path.display(),
173                            node_id
174                        )
175                    })?;
176                }
177
178                let file = std::fs::File::create(&path).wrap_err_with(|| {
179                    format!(
180                        "failed to create event output file `{}` for node `{}`",
181                        path.display(),
182                        node_id
183                    )
184                })?;
185
186                Some(WriteEventsTo {
187                    node_id: node_id.clone(),
188                    file,
189                    events_buffer: Vec::new(),
190                })
191            }
192            None => None,
193        };
194
195        Self::init_on_channel(
196            dataflow_id,
197            node_id,
198            channel,
199            close_channel,
200            clock,
201            scheduler,
202            write_events_to,
203        )
204    }
205
206    pub(crate) fn init_on_channel(
207        dataflow_id: DataflowId,
208        node_id: &NodeId,
209        mut channel: DaemonChannel,
210        mut close_channel: DaemonChannel,
211        clock: Arc<uhlc::HLC>,
212        scheduler: Scheduler,
213        write_events_to: Option<WriteEventsTo>,
214    ) -> eyre::Result<Self> {
215        channel.register(dataflow_id, node_id.clone(), clock.new_timestamp())?;
216        let reply = channel
217            .request(&Timestamped {
218                inner: DaemonRequest::Subscribe,
219                timestamp: clock.new_timestamp(),
220            })
221            .map_err(|e| eyre!(e))
222            .wrap_err("failed to create subscription with dora-daemon")?;
223
224        match reply {
225            DaemonReply::Result(Ok(())) => {}
226            DaemonReply::Result(Err(err)) => {
227                eyre::bail!("subscribe failed: {err}")
228            }
229            other => eyre::bail!("unexpected subscribe reply: {other:?}"),
230        }
231
232        close_channel.register(dataflow_id, node_id.clone(), clock.new_timestamp())?;
233
234        let (tx, rx) = flume::bounded(100_000_000);
235
236        let use_scheduler = match &channel {
237            DaemonChannel::IntegrationTestChannel(_) => {
238                // don't use the scheduler for integration tests because it leads to
239                // non-deterministic event ordering
240                false
241            }
242            _ => true,
243        };
244
245        let thread_handle = thread::init(node_id.clone(), tx, channel, clock.clone())?;
246
247        Ok(EventStream {
248            node_id: node_id.clone(),
249            receiver: rx.into_stream(),
250            _thread_handle: thread_handle,
251            close_channel,
252            start_timestamp: clock.new_timestamp(),
253            clock,
254            scheduler,
255            write_events_to,
256            use_scheduler,
257        })
258    }
259
260    /// Synchronously waits for the next event.
261    ///
262    /// Blocks the thread until the next event arrives.
263    /// Returns [`None`] once the event stream is closed.
264    ///
265    /// For an asynchronous variant of this method see [`recv_async`][Self::recv_async].
266    ///
267    /// ## Event Reordering
268    ///
269    /// This method uses an [`EventScheduler`] internally to **reorder events**. This means that the
270    /// events might be returned in a different order than they occurred. For details, check the
271    /// documentation of the [`EventScheduler`] struct.
272    ///
273    /// If you want to receive the events in their original chronological order, use the
274    /// asynchronous [`StreamExt::next`] method instead ([`EventStream`] implements the
275    /// [`Stream`] trait).
276    pub fn recv(&mut self) -> Option<Event> {
277        futures::executor::block_on(self.recv_async())
278    }
279
280    /// Receives the next incoming [`Event`] synchronously with a timeout.
281    ///
282    /// Blocks the thread until the next event arrives or the timeout is reached.
283    /// Returns a [`Event::Error`] if no event was received within the given duration.
284    ///
285    /// Returns [`None`] once the event stream is closed.
286    ///
287    /// For an asynchronous variant of this method see [`recv_async_timeout`][Self::recv_async_timeout].
288    ///
289    /// ## Event Reordering
290    ///
291    /// This method uses an [`EventScheduler`] internally to **reorder events**. This means that the
292    /// events might be returned in a different order than they occurred. For details, check the
293    /// documentation of the [`EventScheduler`] struct.
294    ///
295    /// If you want to receive the events in their original chronological order, use the
296    /// asynchronous [`StreamExt::next`] method instead ([`EventStream`] implements the
297    /// [`Stream`] trait).
298    pub fn recv_timeout(&mut self, dur: Duration) -> Option<Event> {
299        futures::executor::block_on(self.recv_async_timeout(dur))
300    }
301
302    /// Receives the next incoming [`Event`] asynchronously, using an [`EventScheduler`] for fairness.
303    ///
304    /// Returns [`None`] once the event stream is closed.
305    ///
306    /// ## Event Reordering
307    ///
308    /// This method uses an [`EventScheduler`] internally to **reorder events**. This means that the
309    /// events might be returned in a different order than they occurred. For details, check the
310    /// documentation of the [`EventScheduler`] struct.
311    ///
312    /// If you want to receive the events in their original chronological order, use the
313    /// [`StreamExt::next`] method with a custom timeout future instead
314    /// ([`EventStream`] implements the [`Stream`] trait).
315    pub async fn recv_async(&mut self) -> Option<Event> {
316        if !self.use_scheduler {
317            return self.receiver.next().await.map(Self::convert_event_item);
318        }
319        loop {
320            if self.scheduler.is_empty() {
321                if let Some(event) = self.receiver.next().await {
322                    self.add_event(event);
323                } else {
324                    break;
325                }
326            } else {
327                match self.receiver.next().now_or_never().flatten() {
328                    Some(event) => self.add_event(event),
329                    None => break, // no other ready events
330                };
331            }
332        }
333        let event = self.scheduler.next();
334        event.map(Self::convert_event_item)
335    }
336
337    /// Check if there are any buffered events in the scheduler or the receiver.
338    pub fn is_empty(&self) -> bool {
339        self.scheduler.is_empty() & self.receiver.is_empty()
340    }
341
342    fn add_event(&mut self, event: EventItem) {
343        self.record_event(&event).unwrap();
344        self.scheduler.add_event(event);
345    }
346
347    fn record_event(&mut self, event: &EventItem) -> eyre::Result<()> {
348        if let Some(write_events_to) = &mut self.write_events_to {
349            let event_json = match event {
350                EventItem::NodeEvent { event, .. } => match event {
351                    NodeEvent::Stop => {
352                        let time_offset = self
353                            .clock
354                            .new_timestamp()
355                            .get_diff_duration(&self.start_timestamp);
356                        let event_json = serde_json::json!({
357                            "type": "Stop",
358                            "time_offset_secs": time_offset.as_secs_f64(),
359                        });
360                        Some(event_json)
361                    }
362                    NodeEvent::Reload { .. } => None,
363                    NodeEvent::Input { id, metadata, data } => {
364                        let mut event_json = convert_output_to_json(
365                            id,
366                            metadata,
367                            data,
368                            self.start_timestamp,
369                            false,
370                        )?;
371                        event_json.insert("type".into(), "Input".into());
372                        Some(event_json.into())
373                    }
374                    NodeEvent::InputClosed { id } => {
375                        let time_offset = self
376                            .clock
377                            .new_timestamp()
378                            .get_diff_duration(&self.start_timestamp);
379                        let event_json = serde_json::json!({
380                            "type": "InputClosed",
381                            "id": id.to_string(),
382                            "time_offset_secs": time_offset.as_secs_f64(),
383                        });
384                        Some(event_json)
385                    }
386                    NodeEvent::AllInputsClosed => {
387                        let time_offset = self
388                            .clock
389                            .new_timestamp()
390                            .get_diff_duration(&self.start_timestamp);
391                        let event_json = serde_json::json!({
392                            "type": "AllInputsClosed",
393                            "time_offset_secs": time_offset.as_secs_f64(),
394                        });
395                        Some(event_json)
396                    }
397                },
398                _ => None,
399            };
400            if let Some(event_json) = event_json {
401                write_events_to.events_buffer.push(event_json);
402            }
403        }
404        Ok(())
405    }
406
407    /// Receives the next buffered [`Event`] (if any) without blocking, using an
408    /// [`EventScheduler`] for fairness.
409    ///
410    /// Returns [`TryRecvError::Empty`] if no event is available right now.
411    /// Returns [`TryRecvError::Closed`] once the event stream is closed.
412    ///
413    /// This method never blocks and is safe to use in asynchronous contexts.
414    ///
415    /// ## Event Reordering
416    ///
417    /// This method uses an [`EventScheduler`] internally to **reorder events**. This means that the
418    /// events might be returned in a different order than they occurred. For details, check the
419    /// documentation of the [`EventScheduler`] struct.
420    ///
421    /// If you want to receive the events in their original chronological order, use the
422    /// [`StreamExt::next`] method with a custom timeout future instead
423    /// ([`EventStream`] implements the [`Stream`] trait).
424    pub fn try_recv(&mut self) -> Result<Event, TryRecvError> {
425        match self.recv_async().now_or_never() {
426            Some(Some(event)) => Ok(event),
427            Some(None) => Err(TryRecvError::Closed),
428            None => Err(TryRecvError::Empty),
429        }
430    }
431
432    /// Receives all buffered [`Event`]s without blocking, using an [`EventScheduler`] for fairness.
433    ///
434    /// Return `Some(Vec::new())` if no events are ready.
435    /// Returns [`None`] once the event stream is closed and no events are buffered anymore.
436    ///
437    /// This method never blocks and is safe to use in asynchronous contexts.
438    ///
439    /// This method is equivalent to repeatedly calling [`try_recv`][Self::try_recv]. See its docs
440    /// for details on event reordering.
441    pub fn drain(&mut self) -> Option<Vec<Event>> {
442        let mut events = Vec::new();
443        loop {
444            match self.try_recv() {
445                Ok(event) => events.push(event),
446                Err(TryRecvError::Empty) => break,
447                Err(TryRecvError::Closed) => {
448                    if events.is_empty() {
449                        return None;
450                    } else {
451                        break;
452                    }
453                }
454            }
455        }
456        Some(events)
457    }
458
459    /// Receives the next incoming [`Event`] asynchronously with a timeout.
460    ///
461    /// Returns a [`Event::Error`] if no event was received within the given duration.
462    ///
463    /// Returns [`None`] once the event stream is closed.
464    ///
465    /// ## Event Reordering
466    ///
467    /// This method uses an [`EventScheduler`] internally to **reorder events**. This means that the
468    /// events might be returned in a different order than they occurred. For details, check the
469    /// documentation of the [`EventScheduler`] struct.
470    ///
471    /// If you want to receive the events in their original chronological order, use the
472    /// [`StreamExt::next`] method with a custom timeout future instead
473    /// ([`EventStream`] implements the [`Stream`] trait).
474    pub async fn recv_async_timeout(&mut self, dur: Duration) -> Option<Event> {
475        match select(Delay::new(dur), pin!(self.recv_async())).await {
476            Either::Left((_elapsed, _)) => Some(Self::convert_event_item(EventItem::TimeoutError(
477                eyre!("Receiver timed out"),
478            ))),
479            Either::Right((event, _)) => event,
480        }
481    }
482
483    fn convert_event_item(item: EventItem) -> Event {
484        match item {
485            EventItem::NodeEvent { event, ack_channel } => match event {
486                NodeEvent::Stop => Event::Stop(event::StopCause::Manual),
487                NodeEvent::Reload { operator_id } => Event::Reload { operator_id },
488                NodeEvent::InputClosed { id } => Event::InputClosed { id },
489                NodeEvent::Input { id, metadata, data } => {
490                    let data = data_to_arrow_array(data, &metadata, ack_channel);
491                    match data {
492                        Ok(data) => Event::Input {
493                            id,
494                            metadata,
495                            data: data.into(),
496                        },
497                        Err(err) => Event::Error(format!("{err:?}")),
498                    }
499                }
500                NodeEvent::AllInputsClosed => Event::Stop(event::StopCause::AllInputsClosed),
501            },
502
503            EventItem::FatalError(err) => {
504                Event::Error(format!("fatal event stream error: {err:?}"))
505            }
506            EventItem::TimeoutError(err) => {
507                Event::Error(format!("Timeout event stream error: {err:?}"))
508            }
509        }
510    }
511}
512
/// Error returned when no event could be received without blocking.
///
/// Either no event is available right now or the event stream has been closed.
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`], so it can be
/// propagated with `?` into generic error types.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TryRecvError {
    /// No new event is available right now.
    Empty,
    /// The event stream has been closed.
    Closed,
}

impl std::fmt::Display for TryRecvError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::Empty => "no event is available right now",
            Self::Closed => "the event stream has been closed",
        };
        f.write_str(message)
    }
}

impl std::error::Error for TryRecvError {}
521
522pub fn data_to_arrow_array(
523    data: Option<DataMessage>,
524    metadata: &dora_message::metadata::Metadata,
525    drop_channel: flume::Sender<()>,
526) -> eyre::Result<Arc<dyn arrow::array::Array>> {
527    let data = match data {
528        None => Ok(None),
529        Some(DataMessage::Vec(v)) => Ok(Some(RawData::Vec(v))),
530        Some(DataMessage::SharedMemory {
531            shared_memory_id,
532            len,
533            drop_token: _, // handled in `event_stream_loop`
534        }) => unsafe {
535            MappedInputData::map(&shared_memory_id, len).map(|data| {
536                Some(RawData::SharedMemory(SharedMemoryData {
537                    data,
538                    _drop: drop_channel,
539                }))
540            })
541        },
542    };
543
544    data.and_then(|data| {
545        let raw_data = data.unwrap_or(RawData::Empty);
546        raw_data
547            .into_arrow_array(&metadata.type_info)
548            .map(arrow::array::make_array)
549    })
550}
551
552impl Stream for EventStream {
553    type Item = Event;
554
555    fn poll_next(
556        mut self: std::pin::Pin<&mut Self>,
557        cx: &mut std::task::Context<'_>,
558    ) -> std::task::Poll<Option<Self::Item>> {
559        self.receiver
560            .poll_next_unpin(cx)
561            .map(|item| item.map(Self::convert_event_item))
562    }
563}
564
565impl Drop for EventStream {
566    fn drop(&mut self) {
567        let request = Timestamped {
568            inner: DaemonRequest::EventStreamDropped,
569            timestamp: self.clock.new_timestamp(),
570        };
571        let result = self
572            .close_channel
573            .request(&request)
574            .map_err(|e| eyre!(e))
575            .wrap_err("failed to signal event stream closure to dora-daemon")
576            .and_then(|r| match r {
577                DaemonReply::Result(Ok(())) => Ok(()),
578                DaemonReply::Result(Err(err)) => Err(eyre!("EventStreamClosed failed: {err}")),
579                other => Err(eyre!("unexpected EventStreamClosed reply: {other:?}")),
580            });
581        if let Err(err) = result {
582            tracing::warn!("{err:?}")
583        }
584
585        if let Some(write_events_to) = self.write_events_to.take() {
586            if let Err(err) = write_events_to.write_out() {
587                tracing::warn!(
588                    "failed to write out events for node {}: {err:?}",
589                    self.node_id
590                );
591            }
592        }
593    }
594}
595
/// State for recording received events to a JSON file.
///
/// Events are collected in memory and only written to `file` by `write_out`.
pub(crate) struct WriteEventsTo {
    /// Id of the node whose events are recorded; written as `"id"` in the output.
    node_id: NodeId,
    /// Output file that the JSON document is written to.
    file: std::fs::File,
    /// JSON representations of the events recorded so far, in recording order.
    events_buffer: Vec<serde_json::Value>,
}
601
602impl WriteEventsTo {
603    fn write_out(self) -> eyre::Result<()> {
604        let Self {
605            node_id,
606            file,
607            events_buffer,
608        } = self;
609        let mut inputs_file = serde_json::Map::new();
610        inputs_file.insert("id".into(), node_id.to_string().into());
611        inputs_file.insert("events".into(), events_buffer.into());
612
613        serde_json::to_writer_pretty(file, &inputs_file)
614            .context("failed to write events to file")?;
615        Ok(())
616    }
617}