1use std::{
2 collections::{BTreeMap, HashMap, VecDeque},
3 path::PathBuf,
4 pin::pin,
5 sync::Arc,
6 time::Duration,
7};
8
9use dora_message::{
10 DataflowId,
11 daemon_to_node::{DaemonCommunication, DaemonReply, DataMessage, NodeEvent},
12 id::DataId,
13 node_to_daemon::{DaemonRequest, Timestamped},
14};
15pub use event::{Event, StopCause};
16use futures::{
17 FutureExt, Stream, StreamExt,
18 future::{Either, select},
19};
20use futures_timer::Delay;
21use scheduler::{NON_INPUT_EVENT, Scheduler};
22
23use self::thread::{EventItem, EventStreamThreadHandle};
24use crate::{
25 DaemonCommunicationWrapper,
26 daemon_connection::{DaemonChannel, node_integration_testing::convert_output_to_json},
27 event_stream::data_conversion::{MappedInputData, RawData, SharedMemoryData},
28};
29use dora_core::{
30 config::{Input, NodeId},
31 uhlc,
32};
33use eyre::{Context, eyre};
34
35pub use scheduler::Scheduler as EventScheduler;
36
37mod data_conversion;
38mod event;
39pub mod merged;
40mod scheduler;
41mod thread;
42
/// Stream of events that the dora daemon sends to a node.
///
/// Events can be received blocking ([`EventStream::recv`]), asynchronously
/// ([`EventStream::recv_async`]), without waiting ([`EventStream::try_recv`]), or by
/// polling the [`Stream`] implementation.
pub struct EventStream {
    /// Node this stream belongs to (used in log messages).
    node_id: NodeId,
    /// Receiving end of the channel whose sender is handed to `thread::init`.
    receiver: flume::r#async::RecvStream<'static, EventItem>,
    /// Handle returned by `thread::init`; only kept so it lives as long as the stream.
    /// NOTE(review): presumably dropping it shuts the event thread down — confirm.
    _thread_handle: EventStreamThreadHandle,
    /// Separate daemon channel, used on drop to send `EventStreamDropped`.
    close_channel: DaemonChannel,
    /// Clock used to timestamp daemon requests and to compute recorded time offsets.
    clock: Arc<uhlc::HLC>,
    /// Queues events between receipt and delivery when `use_scheduler` is set;
    /// `recv_async` returns whatever the scheduler yields next.
    scheduler: Scheduler,
    /// If set, received events are buffered here and written out as JSON on drop.
    write_events_to: Option<WriteEventsTo>,
    /// Timestamp taken when the stream is created; reference for recorded time offsets.
    start_timestamp: uhlc::Timestamp,
    /// `false` for integration-test channels: events are then delivered in arrival
    /// order, bypassing the scheduler and event recording.
    use_scheduler: bool,
}
73
74impl EventStream {
75 #[tracing::instrument(level = "trace", skip(clock))]
76 pub(crate) fn init(
77 dataflow_id: DataflowId,
78 node_id: &NodeId,
79 daemon_communication: &DaemonCommunicationWrapper,
80 input_config: BTreeMap<DataId, Input>,
81 clock: Arc<uhlc::HLC>,
82 write_events_to: Option<PathBuf>,
83 ) -> eyre::Result<Self> {
84 let channel = match daemon_communication {
85 DaemonCommunicationWrapper::Standard(daemon_communication) => {
86 match daemon_communication {
87 DaemonCommunication::Shmem {
88 daemon_events_region_id,
89 ..
90 } => unsafe { DaemonChannel::new_shmem(daemon_events_region_id) }
91 .wrap_err_with(|| {
92 format!("failed to create shmem event stream for node `{node_id}`")
93 })?,
94 DaemonCommunication::Tcp { socket_addr } => {
95 DaemonChannel::new_tcp(*socket_addr).wrap_err_with(|| {
96 format!("failed to connect event stream for node `{node_id}`")
97 })?
98 }
99 #[cfg(unix)]
100 DaemonCommunication::UnixDomain { socket_file } => {
101 DaemonChannel::new_unix_socket(socket_file).wrap_err_with(|| {
102 format!("failed to connect event stream for node `{node_id}`")
103 })?
104 }
105 DaemonCommunication::Interactive => {
106 DaemonChannel::Interactive(Default::default())
107 }
108 }
109 }
110
111 DaemonCommunicationWrapper::Testing { channel } => {
112 DaemonChannel::IntegrationTestChannel(channel.clone())
113 }
114 };
115
116 let close_channel = match daemon_communication {
117 DaemonCommunicationWrapper::Standard(daemon_communication) => {
118 match daemon_communication {
119 DaemonCommunication::Shmem {
120 daemon_events_close_region_id,
121 ..
122 } => unsafe { DaemonChannel::new_shmem(daemon_events_close_region_id) }
123 .wrap_err_with(|| {
124 format!(
125 "failed to create shmem event close channel for node `{node_id}`"
126 )
127 })?,
128 DaemonCommunication::Tcp { socket_addr } => {
129 DaemonChannel::new_tcp(*socket_addr).wrap_err_with(|| {
130 format!("failed to connect event close channel for node `{node_id}`")
131 })?
132 }
133 #[cfg(unix)]
134 DaemonCommunication::UnixDomain { socket_file } => {
135 DaemonChannel::new_unix_socket(socket_file).wrap_err_with(|| {
136 format!("failed to connect event close channel for node `{node_id}`")
137 })?
138 }
139 DaemonCommunication::Interactive => {
140 DaemonChannel::Interactive(Default::default())
141 }
142 }
143 }
144 DaemonCommunicationWrapper::Testing { channel } => {
145 DaemonChannel::IntegrationTestChannel(channel.clone())
146 }
147 };
148
149 let mut queue_size_limit: HashMap<DataId, (usize, VecDeque<EventItem>)> = input_config
150 .iter()
151 .map(|(input, config)| {
152 (
153 input.clone(),
154 (config.queue_size.unwrap_or(1), VecDeque::new()),
155 )
156 })
157 .collect();
158
159 queue_size_limit.insert(
160 DataId::from(NON_INPUT_EVENT.to_string()),
161 (1_000, VecDeque::new()),
162 );
163
164 let scheduler = Scheduler::new(queue_size_limit);
165
166 let write_events_to = match write_events_to {
167 Some(path) => {
168 if let Some(parent) = path.parent() {
169 std::fs::create_dir_all(parent).wrap_err_with(|| {
170 format!(
171 "failed to create parent directories for event output file `{}` for node `{}`",
172 path.display(),
173 node_id
174 )
175 })?;
176 }
177
178 let file = std::fs::File::create(&path).wrap_err_with(|| {
179 format!(
180 "failed to create event output file `{}` for node `{}`",
181 path.display(),
182 node_id
183 )
184 })?;
185
186 Some(WriteEventsTo {
187 node_id: node_id.clone(),
188 file,
189 events_buffer: Vec::new(),
190 })
191 }
192 None => None,
193 };
194
195 Self::init_on_channel(
196 dataflow_id,
197 node_id,
198 channel,
199 close_channel,
200 clock,
201 scheduler,
202 write_events_to,
203 )
204 }
205
206 pub(crate) fn init_on_channel(
207 dataflow_id: DataflowId,
208 node_id: &NodeId,
209 mut channel: DaemonChannel,
210 mut close_channel: DaemonChannel,
211 clock: Arc<uhlc::HLC>,
212 scheduler: Scheduler,
213 write_events_to: Option<WriteEventsTo>,
214 ) -> eyre::Result<Self> {
215 channel.register(dataflow_id, node_id.clone(), clock.new_timestamp())?;
216 let reply = channel
217 .request(&Timestamped {
218 inner: DaemonRequest::Subscribe,
219 timestamp: clock.new_timestamp(),
220 })
221 .map_err(|e| eyre!(e))
222 .wrap_err("failed to create subscription with dora-daemon")?;
223
224 match reply {
225 DaemonReply::Result(Ok(())) => {}
226 DaemonReply::Result(Err(err)) => {
227 eyre::bail!("subscribe failed: {err}")
228 }
229 other => eyre::bail!("unexpected subscribe reply: {other:?}"),
230 }
231
232 close_channel.register(dataflow_id, node_id.clone(), clock.new_timestamp())?;
233
234 let (tx, rx) = flume::bounded(100_000_000);
235
236 let use_scheduler = match &channel {
237 DaemonChannel::IntegrationTestChannel(_) => {
238 false
241 }
242 _ => true,
243 };
244
245 let thread_handle = thread::init(node_id.clone(), tx, channel, clock.clone())?;
246
247 Ok(EventStream {
248 node_id: node_id.clone(),
249 receiver: rx.into_stream(),
250 _thread_handle: thread_handle,
251 close_channel,
252 start_timestamp: clock.new_timestamp(),
253 clock,
254 scheduler,
255 write_events_to,
256 use_scheduler,
257 })
258 }
259
260 pub fn recv(&mut self) -> Option<Event> {
277 futures::executor::block_on(self.recv_async())
278 }
279
280 pub fn recv_timeout(&mut self, dur: Duration) -> Option<Event> {
299 futures::executor::block_on(self.recv_async_timeout(dur))
300 }
301
302 pub async fn recv_async(&mut self) -> Option<Event> {
316 if !self.use_scheduler {
317 return self.receiver.next().await.map(Self::convert_event_item);
318 }
319 loop {
320 if self.scheduler.is_empty() {
321 if let Some(event) = self.receiver.next().await {
322 self.add_event(event);
323 } else {
324 break;
325 }
326 } else {
327 match self.receiver.next().now_or_never().flatten() {
328 Some(event) => self.add_event(event),
329 None => break, };
331 }
332 }
333 let event = self.scheduler.next();
334 event.map(Self::convert_event_item)
335 }
336
337 pub fn is_empty(&self) -> bool {
339 self.scheduler.is_empty() & self.receiver.is_empty()
340 }
341
342 fn add_event(&mut self, event: EventItem) {
343 self.record_event(&event).unwrap();
344 self.scheduler.add_event(event);
345 }
346
347 fn record_event(&mut self, event: &EventItem) -> eyre::Result<()> {
348 if let Some(write_events_to) = &mut self.write_events_to {
349 let event_json = match event {
350 EventItem::NodeEvent { event, .. } => match event {
351 NodeEvent::Stop => {
352 let time_offset = self
353 .clock
354 .new_timestamp()
355 .get_diff_duration(&self.start_timestamp);
356 let event_json = serde_json::json!({
357 "type": "Stop",
358 "time_offset_secs": time_offset.as_secs_f64(),
359 });
360 Some(event_json)
361 }
362 NodeEvent::Reload { .. } => None,
363 NodeEvent::Input { id, metadata, data } => {
364 let mut event_json = convert_output_to_json(
365 id,
366 metadata,
367 data,
368 self.start_timestamp,
369 false,
370 )?;
371 event_json.insert("type".into(), "Input".into());
372 Some(event_json.into())
373 }
374 NodeEvent::InputClosed { id } => {
375 let time_offset = self
376 .clock
377 .new_timestamp()
378 .get_diff_duration(&self.start_timestamp);
379 let event_json = serde_json::json!({
380 "type": "InputClosed",
381 "id": id.to_string(),
382 "time_offset_secs": time_offset.as_secs_f64(),
383 });
384 Some(event_json)
385 }
386 NodeEvent::AllInputsClosed => {
387 let time_offset = self
388 .clock
389 .new_timestamp()
390 .get_diff_duration(&self.start_timestamp);
391 let event_json = serde_json::json!({
392 "type": "AllInputsClosed",
393 "time_offset_secs": time_offset.as_secs_f64(),
394 });
395 Some(event_json)
396 }
397 },
398 _ => None,
399 };
400 if let Some(event_json) = event_json {
401 write_events_to.events_buffer.push(event_json);
402 }
403 }
404 Ok(())
405 }
406
407 pub fn try_recv(&mut self) -> Result<Event, TryRecvError> {
425 match self.recv_async().now_or_never() {
426 Some(Some(event)) => Ok(event),
427 Some(None) => Err(TryRecvError::Closed),
428 None => Err(TryRecvError::Empty),
429 }
430 }
431
432 pub fn drain(&mut self) -> Option<Vec<Event>> {
442 let mut events = Vec::new();
443 loop {
444 match self.try_recv() {
445 Ok(event) => events.push(event),
446 Err(TryRecvError::Empty) => break,
447 Err(TryRecvError::Closed) => {
448 if events.is_empty() {
449 return None;
450 } else {
451 break;
452 }
453 }
454 }
455 }
456 Some(events)
457 }
458
459 pub async fn recv_async_timeout(&mut self, dur: Duration) -> Option<Event> {
475 match select(Delay::new(dur), pin!(self.recv_async())).await {
476 Either::Left((_elapsed, _)) => Some(Self::convert_event_item(EventItem::TimeoutError(
477 eyre!("Receiver timed out"),
478 ))),
479 Either::Right((event, _)) => event,
480 }
481 }
482
483 fn convert_event_item(item: EventItem) -> Event {
484 match item {
485 EventItem::NodeEvent { event, ack_channel } => match event {
486 NodeEvent::Stop => Event::Stop(event::StopCause::Manual),
487 NodeEvent::Reload { operator_id } => Event::Reload { operator_id },
488 NodeEvent::InputClosed { id } => Event::InputClosed { id },
489 NodeEvent::Input { id, metadata, data } => {
490 let data = data_to_arrow_array(data, &metadata, ack_channel);
491 match data {
492 Ok(data) => Event::Input {
493 id,
494 metadata,
495 data: data.into(),
496 },
497 Err(err) => Event::Error(format!("{err:?}")),
498 }
499 }
500 NodeEvent::AllInputsClosed => Event::Stop(event::StopCause::AllInputsClosed),
501 },
502
503 EventItem::FatalError(err) => {
504 Event::Error(format!("fatal event stream error: {err:?}"))
505 }
506 EventItem::TimeoutError(err) => {
507 Event::Error(format!("Timeout event stream error: {err:?}"))
508 }
509 }
510 }
511}
512
/// Error returned by [`EventStream::try_recv`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TryRecvError {
    /// No event is available right now; the stream is still open.
    Empty,
    /// The event stream is closed and no further events will arrive.
    Closed,
}
521
522pub fn data_to_arrow_array(
523 data: Option<DataMessage>,
524 metadata: &dora_message::metadata::Metadata,
525 drop_channel: flume::Sender<()>,
526) -> eyre::Result<Arc<dyn arrow::array::Array>> {
527 let data = match data {
528 None => Ok(None),
529 Some(DataMessage::Vec(v)) => Ok(Some(RawData::Vec(v))),
530 Some(DataMessage::SharedMemory {
531 shared_memory_id,
532 len,
533 drop_token: _, }) => unsafe {
535 MappedInputData::map(&shared_memory_id, len).map(|data| {
536 Some(RawData::SharedMemory(SharedMemoryData {
537 data,
538 _drop: drop_channel,
539 }))
540 })
541 },
542 };
543
544 data.and_then(|data| {
545 let raw_data = data.unwrap_or(RawData::Empty);
546 raw_data
547 .into_arrow_array(&metadata.type_info)
548 .map(arrow::array::make_array)
549 })
550}
551
552impl Stream for EventStream {
553 type Item = Event;
554
555 fn poll_next(
556 mut self: std::pin::Pin<&mut Self>,
557 cx: &mut std::task::Context<'_>,
558 ) -> std::task::Poll<Option<Self::Item>> {
559 self.receiver
560 .poll_next_unpin(cx)
561 .map(|item| item.map(Self::convert_event_item))
562 }
563}
564
565impl Drop for EventStream {
566 fn drop(&mut self) {
567 let request = Timestamped {
568 inner: DaemonRequest::EventStreamDropped,
569 timestamp: self.clock.new_timestamp(),
570 };
571 let result = self
572 .close_channel
573 .request(&request)
574 .map_err(|e| eyre!(e))
575 .wrap_err("failed to signal event stream closure to dora-daemon")
576 .and_then(|r| match r {
577 DaemonReply::Result(Ok(())) => Ok(()),
578 DaemonReply::Result(Err(err)) => Err(eyre!("EventStreamClosed failed: {err}")),
579 other => Err(eyre!("unexpected EventStreamClosed reply: {other:?}")),
580 });
581 if let Err(err) = result {
582 tracing::warn!("{err:?}")
583 }
584
585 if let Some(write_events_to) = self.write_events_to.take() {
586 if let Err(err) = write_events_to.write_out() {
587 tracing::warn!(
588 "failed to write out events for node {}: {err:?}",
589 self.node_id
590 );
591 }
592 }
593 }
594}
595
/// Recorded events of one node plus the file they are written to by `write_out`.
pub(crate) struct WriteEventsTo {
    /// Node whose events are recorded; written as the `id` field of the output.
    node_id: NodeId,
    /// Output file; `EventStream::init` creates it up front.
    file: std::fs::File,
    /// Events recorded so far, in the order they were received.
    events_buffer: Vec<serde_json::Value>,
}
601
602impl WriteEventsTo {
603 fn write_out(self) -> eyre::Result<()> {
604 let Self {
605 node_id,
606 file,
607 events_buffer,
608 } = self;
609 let mut inputs_file = serde_json::Map::new();
610 inputs_file.insert("id".into(), node_id.to_string().into());
611 inputs_file.insert("events".into(), events_buffer.into());
612
613 serde_json::to_writer_pretty(file, &inputs_file)
614 .context("failed to write events to file")?;
615 Ok(())
616 }
617}