dora_node_api/event_stream/
mod.rs1use std::{
2 collections::{BTreeMap, HashMap, VecDeque},
3 pin::pin,
4 sync::Arc,
5 time::Duration,
6};
7
8use dora_message::{
9 DataflowId,
10 daemon_to_node::{DaemonCommunication, DaemonReply, DataMessage, NodeEvent},
11 id::DataId,
12 node_to_daemon::{DaemonRequest, Timestamped},
13};
14pub use event::{Event, StopCause};
15use futures::{
16 Stream, StreamExt,
17 future::{Either, select},
18};
19use futures_timer::Delay;
20use scheduler::{NON_INPUT_EVENT, Scheduler};
21
22use self::thread::{EventItem, EventStreamThreadHandle};
23use crate::{
24 daemon_connection::DaemonChannel,
25 event_stream::data_conversion::{MappedInputData, RawData, SharedMemoryData},
26};
27use dora_core::{
28 config::{Input, NodeId},
29 uhlc,
30};
31use eyre::{Context, eyre};
32
33pub use scheduler::Scheduler as EventScheduler;
34
35mod data_conversion;
36mod event;
37pub mod merged;
38mod scheduler;
39mod thread;
40
41pub struct EventStream {
61 node_id: NodeId,
62 receiver: flume::r#async::RecvStream<'static, EventItem>,
63 _thread_handle: EventStreamThreadHandle,
64 close_channel: DaemonChannel,
65 clock: Arc<uhlc::HLC>,
66 scheduler: Scheduler,
67}
68
69impl EventStream {
70 #[tracing::instrument(level = "trace", skip(clock))]
71 pub(crate) fn init(
72 dataflow_id: DataflowId,
73 node_id: &NodeId,
74 daemon_communication: &DaemonCommunication,
75 input_config: BTreeMap<DataId, Input>,
76 clock: Arc<uhlc::HLC>,
77 ) -> eyre::Result<Self> {
78 let channel = match daemon_communication {
79 DaemonCommunication::Shmem {
80 daemon_events_region_id,
81 ..
82 } => unsafe { DaemonChannel::new_shmem(daemon_events_region_id) }.wrap_err_with(
83 || format!("failed to create shmem event stream for node `{node_id}`"),
84 )?,
85 DaemonCommunication::Tcp { socket_addr } => DaemonChannel::new_tcp(*socket_addr)
86 .wrap_err_with(|| format!("failed to connect event stream for node `{node_id}`"))?,
87 #[cfg(unix)]
88 DaemonCommunication::UnixDomain { socket_file } => {
89 DaemonChannel::new_unix_socket(socket_file).wrap_err_with(|| {
90 format!("failed to connect event stream for node `{node_id}`")
91 })?
92 }
93 };
94
95 let close_channel = match daemon_communication {
96 DaemonCommunication::Shmem {
97 daemon_events_close_region_id,
98 ..
99 } => unsafe { DaemonChannel::new_shmem(daemon_events_close_region_id) }.wrap_err_with(
100 || format!("failed to create shmem event close channel for node `{node_id}`"),
101 )?,
102 DaemonCommunication::Tcp { socket_addr } => DaemonChannel::new_tcp(*socket_addr)
103 .wrap_err_with(|| {
104 format!("failed to connect event close channel for node `{node_id}`")
105 })?,
106 #[cfg(unix)]
107 DaemonCommunication::UnixDomain { socket_file } => {
108 DaemonChannel::new_unix_socket(socket_file).wrap_err_with(|| {
109 format!("failed to connect event close channel for node `{node_id}`")
110 })?
111 }
112 };
113
114 let mut queue_size_limit: HashMap<DataId, (usize, VecDeque<EventItem>)> = input_config
115 .iter()
116 .map(|(input, config)| {
117 (
118 input.clone(),
119 (config.queue_size.unwrap_or(1), VecDeque::new()),
120 )
121 })
122 .collect();
123
124 queue_size_limit.insert(
125 DataId::from(NON_INPUT_EVENT.to_string()),
126 (1_000, VecDeque::new()),
127 );
128
129 let scheduler = Scheduler::new(queue_size_limit);
130
131 Self::init_on_channel(
132 dataflow_id,
133 node_id,
134 channel,
135 close_channel,
136 clock,
137 scheduler,
138 )
139 }
140
141 pub(crate) fn init_on_channel(
142 dataflow_id: DataflowId,
143 node_id: &NodeId,
144 mut channel: DaemonChannel,
145 mut close_channel: DaemonChannel,
146 clock: Arc<uhlc::HLC>,
147 scheduler: Scheduler,
148 ) -> eyre::Result<Self> {
149 channel.register(dataflow_id, node_id.clone(), clock.new_timestamp())?;
150 let reply = channel
151 .request(&Timestamped {
152 inner: DaemonRequest::Subscribe,
153 timestamp: clock.new_timestamp(),
154 })
155 .map_err(|e| eyre!(e))
156 .wrap_err("failed to create subscription with dora-daemon")?;
157
158 match reply {
159 DaemonReply::Result(Ok(())) => {}
160 DaemonReply::Result(Err(err)) => {
161 eyre::bail!("subscribe failed: {err}")
162 }
163 other => eyre::bail!("unexpected subscribe reply: {other:?}"),
164 }
165
166 close_channel.register(dataflow_id, node_id.clone(), clock.new_timestamp())?;
167
168 let (tx, rx) = flume::bounded(100_000_000);
169
170 let thread_handle = thread::init(node_id.clone(), tx, channel, clock.clone())?;
171
172 Ok(EventStream {
173 node_id: node_id.clone(),
174 receiver: rx.into_stream(),
175 _thread_handle: thread_handle,
176 close_channel,
177 clock,
178 scheduler,
179 })
180 }
181
182 pub fn recv(&mut self) -> Option<Event> {
199 futures::executor::block_on(self.recv_async())
200 }
201
202 pub fn recv_timeout(&mut self, dur: Duration) -> Option<Event> {
221 futures::executor::block_on(self.recv_async_timeout(dur))
222 }
223
224 pub async fn recv_async(&mut self) -> Option<Event> {
238 loop {
239 if self.scheduler.is_empty() {
240 if let Some(event) = self.receiver.next().await {
241 self.scheduler.add_event(event);
242 } else {
243 break;
244 }
245 } else {
246 match select(Delay::new(Duration::from_micros(300)), self.receiver.next()).await {
247 Either::Left((_elapsed, _)) => break,
248 Either::Right((Some(event), _)) => self.scheduler.add_event(event),
249 Either::Right((None, _)) => break,
250 };
251 }
252 }
253 let event = self.scheduler.next();
254 event.map(Self::convert_event_item)
255 }
256
257 pub async fn recv_async_timeout(&mut self, dur: Duration) -> Option<Event> {
273 match select(Delay::new(dur), pin!(self.recv_async())).await {
274 Either::Left((_elapsed, _)) => Some(Self::convert_event_item(EventItem::TimeoutError(
275 eyre!("Receiver timed out"),
276 ))),
277 Either::Right((event, _)) => event,
278 }
279 }
280
281 fn convert_event_item(item: EventItem) -> Event {
282 match item {
283 EventItem::NodeEvent { event, ack_channel } => match event {
284 NodeEvent::Stop => Event::Stop(event::StopCause::Manual),
285 NodeEvent::Reload { operator_id } => Event::Reload { operator_id },
286 NodeEvent::InputClosed { id } => Event::InputClosed { id },
287 NodeEvent::Input { id, metadata, data } => {
288 let data = match data {
289 None => Ok(None),
290 Some(DataMessage::Vec(v)) => Ok(Some(RawData::Vec(v))),
291 Some(DataMessage::SharedMemory {
292 shared_memory_id,
293 len,
294 drop_token: _, }) => unsafe {
296 MappedInputData::map(&shared_memory_id, len).map(|data| {
297 Some(RawData::SharedMemory(SharedMemoryData {
298 data,
299 _drop: ack_channel,
300 }))
301 })
302 },
303 };
304 let data = data.and_then(|data| {
305 let raw_data = data.unwrap_or(RawData::Empty);
306 raw_data
307 .into_arrow_array(&metadata.type_info)
308 .map(arrow::array::make_array)
309 });
310 match data {
311 Ok(data) => Event::Input {
312 id,
313 metadata,
314 data: data.into(),
315 },
316 Err(err) => Event::Error(format!("{err:?}")),
317 }
318 }
319 NodeEvent::AllInputsClosed => Event::Stop(event::StopCause::AllInputsClosed),
320 },
321
322 EventItem::FatalError(err) => {
323 Event::Error(format!("fatal event stream error: {err:?}"))
324 }
325 EventItem::TimeoutError(err) => {
326 Event::Error(format!("Timeout event stream error: {err:?}"))
327 }
328 }
329 }
330}
331
332impl Stream for EventStream {
333 type Item = Event;
334
335 fn poll_next(
336 mut self: std::pin::Pin<&mut Self>,
337 cx: &mut std::task::Context<'_>,
338 ) -> std::task::Poll<Option<Self::Item>> {
339 self.receiver
340 .poll_next_unpin(cx)
341 .map(|item| item.map(Self::convert_event_item))
342 }
343}
344
345impl Drop for EventStream {
346 #[tracing::instrument(skip(self), fields(%self.node_id))]
347 fn drop(&mut self) {
348 let request = Timestamped {
349 inner: DaemonRequest::EventStreamDropped,
350 timestamp: self.clock.new_timestamp(),
351 };
352 let result = self
353 .close_channel
354 .request(&request)
355 .map_err(|e| eyre!(e))
356 .wrap_err("failed to signal event stream closure to dora-daemon")
357 .and_then(|r| match r {
358 DaemonReply::Result(Ok(())) => Ok(()),
359 DaemonReply::Result(Err(err)) => Err(eyre!("EventStreamClosed failed: {err}")),
360 other => Err(eyre!("unexpected EventStreamClosed reply: {other:?}")),
361 });
362 if let Err(err) = result {
363 tracing::warn!("{err:?}")
364 }
365 }
366}