// dora_node_api/event_stream/mod.rs

1use std::{
2    collections::{BTreeMap, HashMap, VecDeque},
3    pin::pin,
4    sync::Arc,
5    time::Duration,
6};
7
8use dora_message::{
9    daemon_to_node::{DaemonCommunication, DaemonReply, DataMessage, NodeEvent},
10    id::DataId,
11    node_to_daemon::{DaemonRequest, Timestamped},
12    DataflowId,
13};
14pub use event::{Event, MappedInputData, RawData};
15use futures::{
16    future::{select, Either},
17    Stream, StreamExt,
18};
19use futures_timer::Delay;
20use scheduler::{Scheduler, NON_INPUT_EVENT};
21
22use self::{
23    event::SharedMemoryData,
24    thread::{EventItem, EventStreamThreadHandle},
25};
26use crate::daemon_connection::DaemonChannel;
27use dora_core::{
28    config::{Input, NodeId},
29    uhlc,
30};
31use eyre::{eyre, Context};
32
33mod event;
34pub mod merged;
35mod scheduler;
36mod thread;
37
38pub struct EventStream {
39    node_id: NodeId,
40    receiver: flume::r#async::RecvStream<'static, EventItem>,
41    _thread_handle: EventStreamThreadHandle,
42    close_channel: DaemonChannel,
43    clock: Arc<uhlc::HLC>,
44    scheduler: Scheduler,
45}
46
47impl EventStream {
48    #[tracing::instrument(level = "trace", skip(clock))]
49    pub(crate) fn init(
50        dataflow_id: DataflowId,
51        node_id: &NodeId,
52        daemon_communication: &DaemonCommunication,
53        input_config: BTreeMap<DataId, Input>,
54        clock: Arc<uhlc::HLC>,
55    ) -> eyre::Result<Self> {
56        let channel = match daemon_communication {
57            DaemonCommunication::Shmem {
58                daemon_events_region_id,
59                ..
60            } => unsafe { DaemonChannel::new_shmem(daemon_events_region_id) }.wrap_err_with(
61                || format!("failed to create shmem event stream for node `{node_id}`"),
62            )?,
63            DaemonCommunication::Tcp { socket_addr } => DaemonChannel::new_tcp(*socket_addr)
64                .wrap_err_with(|| format!("failed to connect event stream for node `{node_id}`"))?,
65            #[cfg(unix)]
66            DaemonCommunication::UnixDomain { socket_file } => {
67                DaemonChannel::new_unix_socket(socket_file).wrap_err_with(|| {
68                    format!("failed to connect event stream for node `{node_id}`")
69                })?
70            }
71        };
72
73        let close_channel = match daemon_communication {
74            DaemonCommunication::Shmem {
75                daemon_events_close_region_id,
76                ..
77            } => unsafe { DaemonChannel::new_shmem(daemon_events_close_region_id) }.wrap_err_with(
78                || format!("failed to create shmem event close channel for node `{node_id}`"),
79            )?,
80            DaemonCommunication::Tcp { socket_addr } => DaemonChannel::new_tcp(*socket_addr)
81                .wrap_err_with(|| {
82                    format!("failed to connect event close channel for node `{node_id}`")
83                })?,
84            #[cfg(unix)]
85            DaemonCommunication::UnixDomain { socket_file } => {
86                DaemonChannel::new_unix_socket(socket_file).wrap_err_with(|| {
87                    format!("failed to connect event close channel for node `{node_id}`")
88                })?
89            }
90        };
91
92        let mut queue_size_limit: HashMap<DataId, (usize, VecDeque<EventItem>)> = input_config
93            .iter()
94            .map(|(input, config)| {
95                (
96                    input.clone(),
97                    (config.queue_size.unwrap_or(1), VecDeque::new()),
98                )
99            })
100            .collect();
101
102        queue_size_limit.insert(
103            DataId::from(NON_INPUT_EVENT.to_string()),
104            (1_000, VecDeque::new()),
105        );
106
107        let scheduler = Scheduler::new(queue_size_limit);
108
109        Self::init_on_channel(
110            dataflow_id,
111            node_id,
112            channel,
113            close_channel,
114            clock,
115            scheduler,
116        )
117    }
118
119    pub(crate) fn init_on_channel(
120        dataflow_id: DataflowId,
121        node_id: &NodeId,
122        mut channel: DaemonChannel,
123        mut close_channel: DaemonChannel,
124        clock: Arc<uhlc::HLC>,
125        scheduler: Scheduler,
126    ) -> eyre::Result<Self> {
127        channel.register(dataflow_id, node_id.clone(), clock.new_timestamp())?;
128        let reply = channel
129            .request(&Timestamped {
130                inner: DaemonRequest::Subscribe,
131                timestamp: clock.new_timestamp(),
132            })
133            .map_err(|e| eyre!(e))
134            .wrap_err("failed to create subscription with dora-daemon")?;
135
136        match reply {
137            DaemonReply::Result(Ok(())) => {}
138            DaemonReply::Result(Err(err)) => {
139                eyre::bail!("subscribe failed: {err}")
140            }
141            other => eyre::bail!("unexpected subscribe reply: {other:?}"),
142        }
143
144        close_channel.register(dataflow_id, node_id.clone(), clock.new_timestamp())?;
145
146        let (tx, rx) = flume::bounded(100_000_000);
147
148        let thread_handle = thread::init(node_id.clone(), tx, channel, clock.clone())?;
149
150        Ok(EventStream {
151            node_id: node_id.clone(),
152            receiver: rx.into_stream(),
153            _thread_handle: thread_handle,
154            close_channel,
155            clock,
156            scheduler,
157        })
158    }
159
160    /// wait for the next event on the events stream.
161    pub fn recv(&mut self) -> Option<Event> {
162        futures::executor::block_on(self.recv_async())
163    }
164
165    /// wait for the next event on the events stream until timeout
166    pub fn recv_timeout(&mut self, dur: Duration) -> Option<Event> {
167        futures::executor::block_on(self.recv_async_timeout(dur))
168    }
169
170    pub async fn recv_async(&mut self) -> Option<Event> {
171        loop {
172            if self.scheduler.is_empty() {
173                if let Some(event) = self.receiver.next().await {
174                    self.scheduler.add_event(event);
175                } else {
176                    break;
177                }
178            } else {
179                match select(Delay::new(Duration::from_micros(300)), self.receiver.next()).await {
180                    Either::Left((_elapsed, _)) => break,
181                    Either::Right((Some(event), _)) => self.scheduler.add_event(event),
182                    Either::Right((None, _)) => break,
183                };
184            }
185        }
186        let event = self.scheduler.next();
187        event.map(Self::convert_event_item)
188    }
189
190    pub async fn recv_async_timeout(&mut self, dur: Duration) -> Option<Event> {
191        let next_event = match select(Delay::new(dur), pin!(self.recv_async())).await {
192            Either::Left((_elapsed, _)) => Some(Self::convert_event_item(EventItem::TimeoutError(
193                eyre!("Receiver timed out"),
194            ))),
195            Either::Right((event, _)) => event,
196        };
197        next_event
198    }
199
200    fn convert_event_item(item: EventItem) -> Event {
201        match item {
202            EventItem::NodeEvent { event, ack_channel } => match event {
203                NodeEvent::Stop => Event::Stop,
204                NodeEvent::Reload { operator_id } => Event::Reload { operator_id },
205                NodeEvent::InputClosed { id } => Event::InputClosed { id },
206                NodeEvent::Input { id, metadata, data } => {
207                    let data = match data {
208                        None => Ok(None),
209                        Some(DataMessage::Vec(v)) => Ok(Some(RawData::Vec(v))),
210                        Some(DataMessage::SharedMemory {
211                            shared_memory_id,
212                            len,
213                            drop_token: _, // handled in `event_stream_loop`
214                        }) => unsafe {
215                            MappedInputData::map(&shared_memory_id, len).map(|data| {
216                                Some(RawData::SharedMemory(SharedMemoryData {
217                                    data,
218                                    _drop: ack_channel,
219                                }))
220                            })
221                        },
222                    };
223                    let data = data.and_then(|data| {
224                        let raw_data = data.unwrap_or(RawData::Empty);
225                        raw_data
226                            .into_arrow_array(&metadata.type_info)
227                            .map(arrow::array::make_array)
228                    });
229                    match data {
230                        Ok(data) => Event::Input {
231                            id,
232                            metadata,
233                            data: data.into(),
234                        },
235                        Err(err) => Event::Error(format!("{err:?}")),
236                    }
237                }
238                NodeEvent::AllInputsClosed => {
239                    let err = eyre!(
240                        "received `AllInputsClosed` event, which should be handled by background task"
241                    );
242                    tracing::error!("{err:?}");
243                    Event::Error(err.wrap_err("internal error").to_string())
244                }
245            },
246
247            EventItem::FatalError(err) => {
248                Event::Error(format!("fatal event stream error: {err:?}"))
249            }
250            EventItem::TimeoutError(err) => {
251                Event::Error(format!("Timeout event stream error: {err:?}"))
252            }
253        }
254    }
255}
256
257impl Stream for EventStream {
258    type Item = Event;
259
260    fn poll_next(
261        mut self: std::pin::Pin<&mut Self>,
262        cx: &mut std::task::Context<'_>,
263    ) -> std::task::Poll<Option<Self::Item>> {
264        self.receiver
265            .poll_next_unpin(cx)
266            .map(|item| item.map(Self::convert_event_item))
267    }
268}
269
270impl Drop for EventStream {
271    #[tracing::instrument(skip(self), fields(%self.node_id))]
272    fn drop(&mut self) {
273        let request = Timestamped {
274            inner: DaemonRequest::EventStreamDropped,
275            timestamp: self.clock.new_timestamp(),
276        };
277        let result = self
278            .close_channel
279            .request(&request)
280            .map_err(|e| eyre!(e))
281            .wrap_err("failed to signal event stream closure to dora-daemon")
282            .and_then(|r| match r {
283                DaemonReply::Result(Ok(())) => Ok(()),
284                DaemonReply::Result(Err(err)) => Err(eyre!("EventStreamClosed failed: {err}")),
285                other => Err(eyre!("unexpected EventStreamClosed reply: {other:?}")),
286            });
287        if let Err(err) = result {
288            tracing::warn!("{err:?}")
289        }
290    }
291}