//! Event stream for the dora node API (`dora_node_api/event_stream/mod.rs`).
1use std::{
2    collections::{BTreeMap, HashMap, VecDeque},
3    path::PathBuf,
4    pin::pin,
5    sync::Arc,
6    time::Duration,
7};
8
9use dora_message::{
10    DataflowId,
11    daemon_to_node::{DaemonCommunication, DaemonReply, DataMessage, NodeEvent},
12    id::DataId,
13    node_to_daemon::{DaemonRequest, Timestamped},
14};
15pub use event::{Event, StopCause};
16use futures::{
17    FutureExt, Stream, StreamExt,
18    future::{Either, select},
19};
20use futures_timer::Delay;
21use scheduler::{NON_INPUT_EVENT, Scheduler};
22
23use self::thread::{EventItem, EventStreamThreadHandle};
24use crate::{
25    DaemonCommunicationWrapper,
26    daemon_connection::{DaemonChannel, node_integration_testing::convert_output_to_json},
27    event_stream::data_conversion::{MappedInputData, RawData, SharedMemoryData},
28};
29use dora_core::{
30    config::{Input, NodeId},
31    uhlc,
32};
33use eyre::{Context, eyre};
34
35pub use scheduler::Scheduler as EventScheduler;
36
37mod data_conversion;
38mod event;
39pub mod merged;
40mod scheduler;
41mod thread;
42
/// Asynchronous iterator over the incoming [`Event`]s destined for this node.
///
/// This struct [implements](#impl-Stream-for-EventStream) the [`Stream`] trait,
/// so you can use methods of the [`StreamExt`] trait
/// on this struct. A common pattern is `while let Some(event) = event_stream.next().await`.
///
/// Nodes should iterate over this event stream and react to events that they are interested in.
/// Typically, the most important event type is [`Event::Input`].
/// You don't need to handle all events, it's fine to ignore events that are not relevant to your node.
///
/// The event stream will close itself after a [`Event::Stop`] was received.
/// A manual `break` on [`Event::Stop`] is typically not needed.
/// _(You probably do need to use a manual `break` on stop events when using the
/// [`StreamExt::merge`][`futures_concurrency::stream::StreamExt::merge`] implementation on
/// [`EventStream`] to combine the stream with an external one.)_
///
/// Once the event stream finished, nodes should exit.
/// Note that Dora kills nodes that don't exit quickly after a [`Event::Stop`] of type
/// [`StopCause::Manual`] was received.
pub struct EventStream {
    /// ID of this node; used in error/log messages.
    node_id: NodeId,
    /// Receiving end of the channel filled by the background event-stream thread.
    receiver: flume::r#async::RecvStream<'static, EventItem>,
    /// Handle to the background thread that feeds `receiver`; kept alive for
    /// the lifetime of the stream.
    _thread_handle: EventStreamThreadHandle,
    /// Separate daemon channel used to signal stream closure on drop.
    close_channel: DaemonChannel,
    /// Hybrid logical clock used to timestamp daemon requests.
    clock: Arc<uhlc::HLC>,
    /// Buffers and reorders events for the `recv*` methods.
    scheduler: Scheduler,
    /// When set, received events are buffered as JSON and written out on drop.
    write_events_to: Option<WriteEventsTo>,
    /// Timestamp taken at stream creation; used to compute per-event time
    /// offsets in `record_event`.
    start_timestamp: uhlc::Timestamp,
    /// Whether `recv_async` routes events through the scheduler (disabled for
    /// integration tests to keep event ordering deterministic).
    use_scheduler: bool,
}
73
74impl EventStream {
75    #[tracing::instrument(level = "trace", skip(clock))]
76    pub(crate) fn init(
77        dataflow_id: DataflowId,
78        node_id: &NodeId,
79        daemon_communication: &DaemonCommunicationWrapper,
80        input_config: BTreeMap<DataId, Input>,
81        clock: Arc<uhlc::HLC>,
82        write_events_to: Option<PathBuf>,
83    ) -> eyre::Result<Self> {
84        let channel = match daemon_communication {
85            DaemonCommunicationWrapper::Standard(daemon_communication) => {
86                match daemon_communication {
87                    DaemonCommunication::Tcp { socket_addr } => {
88                        DaemonChannel::new_tcp(*socket_addr).wrap_err_with(|| {
89                            format!("failed to connect event stream for node `{node_id}`")
90                        })?
91                    }
92                    #[cfg(unix)]
93                    DaemonCommunication::UnixDomain { socket_file } => {
94                        DaemonChannel::new_unix_socket(socket_file).wrap_err_with(|| {
95                            format!("failed to connect event stream for node `{node_id}`")
96                        })?
97                    }
98                    DaemonCommunication::Interactive => {
99                        DaemonChannel::Interactive(Default::default())
100                    }
101                }
102            }
103
104            DaemonCommunicationWrapper::Testing { channel } => {
105                DaemonChannel::IntegrationTestChannel(channel.clone())
106            }
107        };
108
109        let close_channel = match daemon_communication {
110            DaemonCommunicationWrapper::Standard(daemon_communication) => {
111                match daemon_communication {
112                    DaemonCommunication::Tcp { socket_addr } => {
113                        DaemonChannel::new_tcp(*socket_addr).wrap_err_with(|| {
114                            format!("failed to connect event close channel for node `{node_id}`")
115                        })?
116                    }
117                    #[cfg(unix)]
118                    DaemonCommunication::UnixDomain { socket_file } => {
119                        DaemonChannel::new_unix_socket(socket_file).wrap_err_with(|| {
120                            format!("failed to connect event close channel for node `{node_id}`")
121                        })?
122                    }
123                    DaemonCommunication::Interactive => {
124                        DaemonChannel::Interactive(Default::default())
125                    }
126                }
127            }
128            DaemonCommunicationWrapper::Testing { channel } => {
129                DaemonChannel::IntegrationTestChannel(channel.clone())
130            }
131        };
132
133        let mut queue_size_limit: HashMap<DataId, (usize, VecDeque<EventItem>)> = input_config
134            .iter()
135            .map(|(input, config)| {
136                (
137                    input.clone(),
138                    (config.queue_size.unwrap_or(1), VecDeque::new()),
139                )
140            })
141            .collect();
142
143        queue_size_limit.insert(
144            DataId::from(NON_INPUT_EVENT.to_string()),
145            (1_000, VecDeque::new()),
146        );
147
148        let scheduler = Scheduler::new(queue_size_limit);
149
150        let write_events_to = match write_events_to {
151            Some(path) => {
152                if let Some(parent) = path.parent() {
153                    std::fs::create_dir_all(parent).wrap_err_with(|| {
154                        format!(
155                            "failed to create parent directories for event output file `{}` for node `{}`",
156                            path.display(),
157                            node_id
158                        )
159                    })?;
160                }
161
162                let file = std::fs::File::create(&path).wrap_err_with(|| {
163                    format!(
164                        "failed to create event output file `{}` for node `{}`",
165                        path.display(),
166                        node_id
167                    )
168                })?;
169
170                Some(WriteEventsTo {
171                    node_id: node_id.clone(),
172                    file,
173                    events_buffer: Vec::new(),
174                })
175            }
176            None => None,
177        };
178
179        Self::init_on_channel(
180            dataflow_id,
181            node_id,
182            channel,
183            close_channel,
184            clock,
185            scheduler,
186            write_events_to,
187        )
188    }
189
190    pub(crate) fn init_on_channel(
191        dataflow_id: DataflowId,
192        node_id: &NodeId,
193        mut channel: DaemonChannel,
194        mut close_channel: DaemonChannel,
195        clock: Arc<uhlc::HLC>,
196        scheduler: Scheduler,
197        write_events_to: Option<WriteEventsTo>,
198    ) -> eyre::Result<Self> {
199        channel.register(dataflow_id, node_id.clone(), clock.new_timestamp())?;
200        let reply = channel
201            .request(&Timestamped {
202                inner: DaemonRequest::Subscribe,
203                timestamp: clock.new_timestamp(),
204            })
205            .map_err(|e| eyre!(e))
206            .wrap_err("failed to create subscription with dora-daemon")?;
207
208        match reply {
209            DaemonReply::Result(Ok(())) => {}
210            DaemonReply::Result(Err(err)) => {
211                eyre::bail!("subscribe failed: {err}")
212            }
213            other => eyre::bail!("unexpected subscribe reply: {other:?}"),
214        }
215
216        close_channel.register(dataflow_id, node_id.clone(), clock.new_timestamp())?;
217
218        let (tx, rx) = flume::bounded(100_000_000);
219
220        let use_scheduler = match &channel {
221            DaemonChannel::IntegrationTestChannel(_) => {
222                // don't use the scheduler for integration tests because it leads to
223                // non-deterministic event ordering
224                false
225            }
226            _ => true,
227        };
228
229        let thread_handle = thread::init(node_id.clone(), tx, channel, clock.clone())?;
230
231        Ok(EventStream {
232            node_id: node_id.clone(),
233            receiver: rx.into_stream(),
234            _thread_handle: thread_handle,
235            close_channel,
236            start_timestamp: clock.new_timestamp(),
237            clock,
238            scheduler,
239            write_events_to,
240            use_scheduler,
241        })
242    }
243
244    /// Synchronously waits for the next event.
245    ///
246    /// Blocks the thread until the next event arrives.
247    /// Returns [`None`] once the event stream is closed.
248    ///
249    /// For an asynchronous variant of this method see [`recv_async`][Self::recv_async].
250    ///
251    /// ## Event Reordering
252    ///
253    /// This method uses an [`EventScheduler`] internally to **reorder events**. This means that the
254    /// events might be returned in a different order than they occurred. For details, check the
255    /// documentation of the [`EventScheduler`] struct.
256    ///
257    /// If you want to receive the events in their original chronological order, use the
258    /// asynchronous [`StreamExt::next`] method instead ([`EventStream`] implements the
259    /// [`Stream`] trait).
260    pub fn recv(&mut self) -> Option<Event> {
261        futures::executor::block_on(self.recv_async())
262    }
263
264    /// Receives the next incoming [`Event`] synchronously with a timeout.
265    ///
266    /// Blocks the thread until the next event arrives or the timeout is reached.
267    /// Returns a [`Event::Error`] if no event was received within the given duration.
268    ///
269    /// Returns [`None`] once the event stream is closed.
270    ///
271    /// For an asynchronous variant of this method see [`recv_async_timeout`][Self::recv_async_timeout].
272    ///
273    /// ## Event Reordering
274    ///
275    /// This method uses an [`EventScheduler`] internally to **reorder events**. This means that the
276    /// events might be returned in a different order than they occurred. For details, check the
277    /// documentation of the [`EventScheduler`] struct.
278    ///
279    /// If you want to receive the events in their original chronological order, use the
280    /// asynchronous [`StreamExt::next`] method instead ([`EventStream`] implements the
281    /// [`Stream`] trait).
282    pub fn recv_timeout(&mut self, dur: Duration) -> Option<Event> {
283        futures::executor::block_on(self.recv_async_timeout(dur))
284    }
285
286    /// Receives the next incoming [`Event`] asynchronously, using an [`EventScheduler`] for fairness.
287    ///
288    /// Returns [`None`] once the event stream is closed.
289    ///
290    /// ## Event Reordering
291    ///
292    /// This method uses an [`EventScheduler`] internally to **reorder events**. This means that the
293    /// events might be returned in a different order than they occurred. For details, check the
294    /// documentation of the [`EventScheduler`] struct.
295    ///
296    /// If you want to receive the events in their original chronological order, use the
297    /// [`StreamExt::next`] method with a custom timeout future instead
298    /// ([`EventStream`] implements the [`Stream`] trait).
299    pub async fn recv_async(&mut self) -> Option<Event> {
300        if !self.use_scheduler {
301            return self.receiver.next().await.map(Self::convert_event_item);
302        }
303        loop {
304            if self.scheduler.is_empty() {
305                if let Some(event) = self.receiver.next().await {
306                    self.add_event(event);
307                } else {
308                    break;
309                }
310            } else {
311                match self.receiver.next().now_or_never().flatten() {
312                    Some(event) => self.add_event(event),
313                    None => break, // no other ready events
314                };
315            }
316        }
317        let event = self.scheduler.next();
318        event.map(Self::convert_event_item)
319    }
320
321    /// Check if there are any buffered events in the scheduler or the receiver.
322    pub fn is_empty(&self) -> bool {
323        self.scheduler.is_empty() & self.receiver.is_empty()
324    }
325
326    fn add_event(&mut self, event: EventItem) {
327        self.record_event(&event).unwrap();
328        self.scheduler.add_event(event);
329    }
330
    /// Serializes `event` to JSON and appends it to the event-recording
    /// buffer; no-op when recording (`write_events_to`) is disabled.
    ///
    /// Recorded objects carry a `"type"` tag; non-input events also get a
    /// `"time_offset_secs"` field measured against `self.start_timestamp`.
    /// `Reload` events and non-`NodeEvent` items (fatal/timeout errors) are
    /// not recorded.
    ///
    /// Returns an error if an `Input` event cannot be converted to JSON.
    fn record_event(&mut self, event: &EventItem) -> eyre::Result<()> {
        if let Some(write_events_to) = &mut self.write_events_to {
            let event_json = match event {
                EventItem::NodeEvent { event, .. } => match event {
                    NodeEvent::Stop => {
                        // Elapsed time since stream creation.
                        let time_offset = self
                            .clock
                            .new_timestamp()
                            .get_diff_duration(&self.start_timestamp);
                        let event_json = serde_json::json!({
                            "type": "Stop",
                            "time_offset_secs": time_offset.as_secs_f64(),
                        });
                        Some(event_json)
                    }
                    // Reload events are intentionally not recorded.
                    NodeEvent::Reload { .. } => None,
                    NodeEvent::Input { id, metadata, data } => {
                        // `convert_output_to_json` derives its own timing from
                        // `start_timestamp`.
                        let mut event_json = convert_output_to_json(
                            id,
                            metadata,
                            data,
                            self.start_timestamp,
                            false,
                        )?;
                        event_json.insert("type".into(), "Input".into());
                        Some(event_json.into())
                    }
                    NodeEvent::InputClosed { id } => {
                        let time_offset = self
                            .clock
                            .new_timestamp()
                            .get_diff_duration(&self.start_timestamp);
                        let event_json = serde_json::json!({
                            "type": "InputClosed",
                            "id": id.to_string(),
                            "time_offset_secs": time_offset.as_secs_f64(),
                        });
                        Some(event_json)
                    }
                    NodeEvent::NodeFailed {
                        affected_input_ids,
                        error,
                        source_node_id,
                    } => {
                        let time_offset = self
                            .clock
                            .new_timestamp()
                            .get_diff_duration(&self.start_timestamp);
                        let event_json = serde_json::json!({
                            "type": "NodeFailed",
                            "affected_input_ids": affected_input_ids.iter().map(|id| id.to_string()).collect::<Vec<_>>(),
                            "error": error,
                            "source_node_id": source_node_id.to_string(),
                            "time_offset_secs": time_offset.as_secs_f64(),
                        });
                        Some(event_json)
                    }
                    NodeEvent::AllInputsClosed => {
                        let time_offset = self
                            .clock
                            .new_timestamp()
                            .get_diff_duration(&self.start_timestamp);
                        let event_json = serde_json::json!({
                            "type": "AllInputsClosed",
                            "time_offset_secs": time_offset.as_secs_f64(),
                        });
                        Some(event_json)
                    }
                },
                // Fatal/timeout stream errors are not part of the recorded trace.
                _ => None,
            };
            if let Some(event_json) = event_json {
                write_events_to.events_buffer.push(event_json);
            }
        }
        Ok(())
    }
408
409    /// Receives the next buffered [`Event`] (if any) without blocking, using an
410    /// [`EventScheduler`] for fairness.
411    ///
412    /// Returns [`TryRecvError::Empty`] if no event is available right now.
413    /// Returns [`TryRecvError::Closed`] once the event stream is closed.
414    ///
415    /// This method never blocks and is safe to use in asynchronous contexts.
416    ///
417    /// ## Event Reordering
418    ///
419    /// This method uses an [`EventScheduler`] internally to **reorder events**. This means that the
420    /// events might be returned in a different order than they occurred. For details, check the
421    /// documentation of the [`EventScheduler`] struct.
422    ///
423    /// If you want to receive the events in their original chronological order, use the
424    /// [`StreamExt::next`] method with a custom timeout future instead
425    /// ([`EventStream`] implements the [`Stream`] trait).
426    pub fn try_recv(&mut self) -> Result<Event, TryRecvError> {
427        match self.recv_async().now_or_never() {
428            Some(Some(event)) => Ok(event),
429            Some(None) => Err(TryRecvError::Closed),
430            None => Err(TryRecvError::Empty),
431        }
432    }
433
434    /// Receives all buffered [`Event`]s without blocking, using an [`EventScheduler`] for fairness.
435    ///
436    /// Return `Some(Vec::new())` if no events are ready.
437    /// Returns [`None`] once the event stream is closed and no events are buffered anymore.
438    ///
439    /// This method never blocks and is safe to use in asynchronous contexts.
440    ///
441    /// This method is equivalent to repeatedly calling [`try_recv`][Self::try_recv]. See its docs
442    /// for details on event reordering.
443    pub fn drain(&mut self) -> Option<Vec<Event>> {
444        let mut events = Vec::new();
445        loop {
446            match self.try_recv() {
447                Ok(event) => events.push(event),
448                Err(TryRecvError::Empty) => break,
449                Err(TryRecvError::Closed) => {
450                    if events.is_empty() {
451                        return None;
452                    } else {
453                        break;
454                    }
455                }
456            }
457        }
458        Some(events)
459    }
460
461    /// Receives the next incoming [`Event`] asynchronously with a timeout.
462    ///
463    /// Returns a [`Event::Error`] if no event was received within the given duration.
464    ///
465    /// Returns [`None`] once the event stream is closed.
466    ///
467    /// ## Event Reordering
468    ///
469    /// This method uses an [`EventScheduler`] internally to **reorder events**. This means that the
470    /// events might be returned in a different order than they occurred. For details, check the
471    /// documentation of the [`EventScheduler`] struct.
472    ///
473    /// If you want to receive the events in their original chronological order, use the
474    /// [`StreamExt::next`] method with a custom timeout future instead
475    /// ([`EventStream`] implements the [`Stream`] trait).
476    pub async fn recv_async_timeout(&mut self, dur: Duration) -> Option<Event> {
477        match select(Delay::new(dur), pin!(self.recv_async())).await {
478            Either::Left((_elapsed, _)) => Some(Self::convert_event_item(EventItem::TimeoutError(
479                eyre!("Receiver timed out"),
480            ))),
481            Either::Right((event, _)) => event,
482        }
483    }
484
485    fn convert_event_item(item: EventItem) -> Event {
486        match item {
487            EventItem::NodeEvent { event, ack_channel } => match event {
488                NodeEvent::Stop => Event::Stop(event::StopCause::Manual),
489                NodeEvent::Reload { operator_id } => Event::Reload { operator_id },
490                NodeEvent::InputClosed { id } => Event::InputClosed { id },
491                NodeEvent::NodeFailed {
492                    affected_input_ids,
493                    error,
494                    source_node_id,
495                } => Event::NodeFailed {
496                    affected_input_ids,
497                    error,
498                    source_node_id,
499                },
500                NodeEvent::Input { id, metadata, data } => {
501                    let data = data_to_arrow_array(data, &metadata, ack_channel);
502                    match data {
503                        Ok(data) => Event::Input {
504                            id,
505                            metadata,
506                            data: data.into(),
507                        },
508                        Err(err) => Event::Error(format!("{err:?}")),
509                    }
510                }
511                NodeEvent::AllInputsClosed => Event::Stop(event::StopCause::AllInputsClosed),
512            },
513
514            EventItem::FatalError(err) => {
515                Event::Error(format!("fatal event stream error: {err:?}"))
516            }
517            EventItem::TimeoutError(err) => {
518                Event::Error(format!("Timeout event stream error: {err:?}"))
519            }
520        }
521    }
522}
523
/// No event is available right now or the event stream has been closed.
///
/// Returned by [`EventStream::try_recv`].
// Fieldless public error enum: derive the standard comparison/copy traits so
// callers can match, compare, and pass it by value freely.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TryRecvError {
    /// No new event is available right now.
    Empty,
    /// The event stream has been closed.
    Closed,
}
532
/// Converts a raw [`DataMessage`] into an Arrow array, mapping shared memory
/// if necessary.
///
/// `drop_channel` is stored inside the shared-memory wrapper; dropping the
/// returned array drops the sender, which presumably signals the daemon that
/// the shared-memory region is no longer in use (TODO confirm against the
/// daemon side).
/// A `None` data message yields an empty array based on `metadata.type_info`.
pub fn data_to_arrow_array(
    data: Option<DataMessage>,
    metadata: &dora_message::metadata::Metadata,
    drop_channel: flume::Sender<()>,
) -> eyre::Result<Arc<dyn arrow::array::Array>> {
    let data = match data {
        None => Ok(None),
        Some(DataMessage::Vec(v)) => Ok(Some(RawData::Vec(v))),
        Some(DataMessage::SharedMemory {
            shared_memory_id,
            len,
            drop_token: _, // handled in `event_stream_loop`
        }) => unsafe {
            // SAFETY: maps the daemon-provided shared-memory region of `len`
            // bytes. Soundness relies on the daemon keeping the region alive
            // and unmodified while this mapping exists — assumed invariant of
            // the daemon protocol; verify against the daemon implementation.
            MappedInputData::map(&shared_memory_id, len).map(|data| {
                Some(RawData::SharedMemory(SharedMemoryData {
                    data,
                    _drop: drop_channel,
                }))
            })
        },
    };

    data.and_then(|data| {
        // No payload means an empty array of the declared type.
        let raw_data = data.unwrap_or(RawData::Empty);
        raw_data
            .into_arrow_array(&metadata.type_info)
            .map(arrow::array::make_array)
    })
}
562
// Chronological event access: polling the `Stream` impl reads straight from
// the background thread's channel and bypasses the `EventScheduler` used by
// the `recv*` methods, so events are yielded in arrival order. Note that
// events consumed through this path are not passed to `record_event`.
impl Stream for EventStream {
    type Item = Event;

    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        // Forward the poll to the flume receiver and convert raw items into
        // public `Event`s.
        self.receiver
            .poll_next_unpin(cx)
            .map(|item| item.map(Self::convert_event_item))
    }
}
575
impl Drop for EventStream {
    // Signals the daemon that this node's event stream is closed, then writes
    // out any recorded events. Failures are logged, not propagated: panicking
    // in `drop` could abort the process.
    fn drop(&mut self) {
        let request = Timestamped {
            inner: DaemonRequest::EventStreamDropped,
            timestamp: self.clock.new_timestamp(),
        };
        let result = self
            .close_channel
            .request(&request)
            .map_err(|e| eyre!(e))
            .wrap_err("failed to signal event stream closure to dora-daemon")
            .and_then(|r| match r {
                DaemonReply::Result(Ok(())) => Ok(()),
                DaemonReply::Result(Err(err)) => Err(eyre!("EventStreamClosed failed: {err}")),
                other => Err(eyre!("unexpected EventStreamClosed reply: {other:?}")),
            });
        if let Err(err) = result {
            tracing::warn!("{err:?}")
        }

        // If event recording was enabled, persist the buffered events now.
        if let Some(write_events_to) = self.write_events_to.take() {
            if let Err(err) = write_events_to.write_out() {
                tracing::warn!(
                    "failed to write out events for node {}: {err:?}",
                    self.node_id
                );
            }
        }
    }
}
606
/// Buffers received events as JSON and writes them out as a single document
/// when the stream is dropped (see [`EventStream`]'s `Drop` impl).
pub(crate) struct WriteEventsTo {
    /// Node ID written into the output JSON as the `"id"` field.
    node_id: NodeId,
    /// Destination file for the recorded events.
    file: std::fs::File,
    /// JSON representation of each recorded event, in arrival order.
    events_buffer: Vec<serde_json::Value>,
}
612
613impl WriteEventsTo {
614    fn write_out(self) -> eyre::Result<()> {
615        let Self {
616            node_id,
617            file,
618            events_buffer,
619        } = self;
620        let mut inputs_file = serde_json::Map::new();
621        inputs_file.insert("id".into(), node_id.to_string().into());
622        inputs_file.insert("events".into(), events_buffer.into());
623
624        serde_json::to_writer_pretty(file, &inputs_file)
625            .context("failed to write events to file")?;
626        Ok(())
627    }
628}