dora_node_api/event_stream/
mod.rs1use std::{
2 collections::{BTreeMap, HashMap, VecDeque},
3 pin::pin,
4 sync::Arc,
5 time::Duration,
6};
7
8use dora_message::{
9 daemon_to_node::{DaemonCommunication, DaemonReply, DataMessage, NodeEvent},
10 id::DataId,
11 node_to_daemon::{DaemonRequest, Timestamped},
12 DataflowId,
13};
14pub use event::{Event, MappedInputData, RawData};
15use futures::{
16 future::{select, Either},
17 Stream, StreamExt,
18};
19use futures_timer::Delay;
20use scheduler::{Scheduler, NON_INPUT_EVENT};
21
22use self::{
23 event::SharedMemoryData,
24 thread::{EventItem, EventStreamThreadHandle},
25};
26use crate::daemon_connection::DaemonChannel;
27use dora_core::{
28 config::{Input, NodeId},
29 uhlc,
30};
31use eyre::{eyre, Context};
32
33mod event;
34pub mod merged;
35mod scheduler;
36mod thread;
37
/// Stream of [`Event`]s delivered to a node by the dora daemon.
///
/// Events arrive from a background thread (started in `init_on_channel`) through `receiver`.
/// `recv_async` buffers them in `scheduler` before handing them out.
///
/// NOTE(review): Rust drops fields in declaration order after `Drop::drop` has run, so
/// `_thread_handle` is dropped before `close_channel`. Keep this in mind before reordering
/// fields.
pub struct EventStream {
    /// ID of the node that owns this stream (recorded in the `Drop` tracing span).
    node_id: NodeId,
    /// Receiving side of the channel that the background event thread sends into.
    receiver: flume::r#async::RecvStream<'static, EventItem>,
    /// Handle of the background event thread; held for the lifetime of the stream.
    _thread_handle: EventStreamThreadHandle,
    /// Separate daemon connection, used only to send `EventStreamDropped` on drop.
    close_channel: DaemonChannel,
    /// Hybrid logical clock used to timestamp requests sent to the daemon.
    clock: Arc<uhlc::HLC>,
    /// Buffers received events, configured with per-input queue-size limits in `init`.
    scheduler: Scheduler,
}
46
47impl EventStream {
48 #[tracing::instrument(level = "trace", skip(clock))]
49 pub(crate) fn init(
50 dataflow_id: DataflowId,
51 node_id: &NodeId,
52 daemon_communication: &DaemonCommunication,
53 input_config: BTreeMap<DataId, Input>,
54 clock: Arc<uhlc::HLC>,
55 ) -> eyre::Result<Self> {
56 let channel = match daemon_communication {
57 DaemonCommunication::Shmem {
58 daemon_events_region_id,
59 ..
60 } => unsafe { DaemonChannel::new_shmem(daemon_events_region_id) }.wrap_err_with(
61 || format!("failed to create shmem event stream for node `{node_id}`"),
62 )?,
63 DaemonCommunication::Tcp { socket_addr } => DaemonChannel::new_tcp(*socket_addr)
64 .wrap_err_with(|| format!("failed to connect event stream for node `{node_id}`"))?,
65 #[cfg(unix)]
66 DaemonCommunication::UnixDomain { socket_file } => {
67 DaemonChannel::new_unix_socket(socket_file).wrap_err_with(|| {
68 format!("failed to connect event stream for node `{node_id}`")
69 })?
70 }
71 };
72
73 let close_channel = match daemon_communication {
74 DaemonCommunication::Shmem {
75 daemon_events_close_region_id,
76 ..
77 } => unsafe { DaemonChannel::new_shmem(daemon_events_close_region_id) }.wrap_err_with(
78 || format!("failed to create shmem event close channel for node `{node_id}`"),
79 )?,
80 DaemonCommunication::Tcp { socket_addr } => DaemonChannel::new_tcp(*socket_addr)
81 .wrap_err_with(|| {
82 format!("failed to connect event close channel for node `{node_id}`")
83 })?,
84 #[cfg(unix)]
85 DaemonCommunication::UnixDomain { socket_file } => {
86 DaemonChannel::new_unix_socket(socket_file).wrap_err_with(|| {
87 format!("failed to connect event close channel for node `{node_id}`")
88 })?
89 }
90 };
91
92 let mut queue_size_limit: HashMap<DataId, (usize, VecDeque<EventItem>)> = input_config
93 .iter()
94 .map(|(input, config)| {
95 (
96 input.clone(),
97 (config.queue_size.unwrap_or(1), VecDeque::new()),
98 )
99 })
100 .collect();
101
102 queue_size_limit.insert(
103 DataId::from(NON_INPUT_EVENT.to_string()),
104 (1_000, VecDeque::new()),
105 );
106
107 let scheduler = Scheduler::new(queue_size_limit);
108
109 Self::init_on_channel(
110 dataflow_id,
111 node_id,
112 channel,
113 close_channel,
114 clock,
115 scheduler,
116 )
117 }
118
119 pub(crate) fn init_on_channel(
120 dataflow_id: DataflowId,
121 node_id: &NodeId,
122 mut channel: DaemonChannel,
123 mut close_channel: DaemonChannel,
124 clock: Arc<uhlc::HLC>,
125 scheduler: Scheduler,
126 ) -> eyre::Result<Self> {
127 channel.register(dataflow_id, node_id.clone(), clock.new_timestamp())?;
128 let reply = channel
129 .request(&Timestamped {
130 inner: DaemonRequest::Subscribe,
131 timestamp: clock.new_timestamp(),
132 })
133 .map_err(|e| eyre!(e))
134 .wrap_err("failed to create subscription with dora-daemon")?;
135
136 match reply {
137 DaemonReply::Result(Ok(())) => {}
138 DaemonReply::Result(Err(err)) => {
139 eyre::bail!("subscribe failed: {err}")
140 }
141 other => eyre::bail!("unexpected subscribe reply: {other:?}"),
142 }
143
144 close_channel.register(dataflow_id, node_id.clone(), clock.new_timestamp())?;
145
146 let (tx, rx) = flume::bounded(100_000_000);
147
148 let thread_handle = thread::init(node_id.clone(), tx, channel, clock.clone())?;
149
150 Ok(EventStream {
151 node_id: node_id.clone(),
152 receiver: rx.into_stream(),
153 _thread_handle: thread_handle,
154 close_channel,
155 clock,
156 scheduler,
157 })
158 }
159
160 pub fn recv(&mut self) -> Option<Event> {
162 futures::executor::block_on(self.recv_async())
163 }
164
165 pub fn recv_timeout(&mut self, dur: Duration) -> Option<Event> {
167 futures::executor::block_on(self.recv_async_timeout(dur))
168 }
169
170 pub async fn recv_async(&mut self) -> Option<Event> {
171 loop {
172 if self.scheduler.is_empty() {
173 if let Some(event) = self.receiver.next().await {
174 self.scheduler.add_event(event);
175 } else {
176 break;
177 }
178 } else {
179 match select(Delay::new(Duration::from_micros(300)), self.receiver.next()).await {
180 Either::Left((_elapsed, _)) => break,
181 Either::Right((Some(event), _)) => self.scheduler.add_event(event),
182 Either::Right((None, _)) => break,
183 };
184 }
185 }
186 let event = self.scheduler.next();
187 event.map(Self::convert_event_item)
188 }
189
190 pub async fn recv_async_timeout(&mut self, dur: Duration) -> Option<Event> {
191 let next_event = match select(Delay::new(dur), pin!(self.recv_async())).await {
192 Either::Left((_elapsed, _)) => Some(Self::convert_event_item(EventItem::TimeoutError(
193 eyre!("Receiver timed out"),
194 ))),
195 Either::Right((event, _)) => event,
196 };
197 next_event
198 }
199
200 fn convert_event_item(item: EventItem) -> Event {
201 match item {
202 EventItem::NodeEvent { event, ack_channel } => match event {
203 NodeEvent::Stop => Event::Stop,
204 NodeEvent::Reload { operator_id } => Event::Reload { operator_id },
205 NodeEvent::InputClosed { id } => Event::InputClosed { id },
206 NodeEvent::Input { id, metadata, data } => {
207 let data = match data {
208 None => Ok(None),
209 Some(DataMessage::Vec(v)) => Ok(Some(RawData::Vec(v))),
210 Some(DataMessage::SharedMemory {
211 shared_memory_id,
212 len,
213 drop_token: _, }) => unsafe {
215 MappedInputData::map(&shared_memory_id, len).map(|data| {
216 Some(RawData::SharedMemory(SharedMemoryData {
217 data,
218 _drop: ack_channel,
219 }))
220 })
221 },
222 };
223 let data = data.and_then(|data| {
224 let raw_data = data.unwrap_or(RawData::Empty);
225 raw_data
226 .into_arrow_array(&metadata.type_info)
227 .map(arrow::array::make_array)
228 });
229 match data {
230 Ok(data) => Event::Input {
231 id,
232 metadata,
233 data: data.into(),
234 },
235 Err(err) => Event::Error(format!("{err:?}")),
236 }
237 }
238 NodeEvent::AllInputsClosed => {
239 let err = eyre!(
240 "received `AllInputsClosed` event, which should be handled by background task"
241 );
242 tracing::error!("{err:?}");
243 Event::Error(err.wrap_err("internal error").to_string())
244 }
245 },
246
247 EventItem::FatalError(err) => {
248 Event::Error(format!("fatal event stream error: {err:?}"))
249 }
250 EventItem::TimeoutError(err) => {
251 Event::Error(format!("Timeout event stream error: {err:?}"))
252 }
253 }
254 }
255}
256
257impl Stream for EventStream {
258 type Item = Event;
259
260 fn poll_next(
261 mut self: std::pin::Pin<&mut Self>,
262 cx: &mut std::task::Context<'_>,
263 ) -> std::task::Poll<Option<Self::Item>> {
264 self.receiver
265 .poll_next_unpin(cx)
266 .map(|item| item.map(Self::convert_event_item))
267 }
268}
269
270impl Drop for EventStream {
271 #[tracing::instrument(skip(self), fields(%self.node_id))]
272 fn drop(&mut self) {
273 let request = Timestamped {
274 inner: DaemonRequest::EventStreamDropped,
275 timestamp: self.clock.new_timestamp(),
276 };
277 let result = self
278 .close_channel
279 .request(&request)
280 .map_err(|e| eyre!(e))
281 .wrap_err("failed to signal event stream closure to dora-daemon")
282 .and_then(|r| match r {
283 DaemonReply::Result(Ok(())) => Ok(()),
284 DaemonReply::Result(Err(err)) => Err(eyre!("EventStreamClosed failed: {err}")),
285 other => Err(eyre!("unexpected EventStreamClosed reply: {other:?}")),
286 });
287 if let Err(err) = result {
288 tracing::warn!("{err:?}")
289 }
290 }
291}