1use std::{
2 collections::{BTreeMap, HashMap, VecDeque},
3 path::PathBuf,
4 pin::pin,
5 sync::Arc,
6 time::Duration,
7};
8
9use dora_message::{
10 DataflowId,
11 daemon_to_node::{DaemonCommunication, DaemonReply, DataMessage, NodeEvent},
12 id::DataId,
13 node_to_daemon::{DaemonRequest, Timestamped},
14};
15pub use event::{Event, StopCause};
16use futures::{
17 FutureExt, Stream, StreamExt,
18 future::{Either, select},
19};
20use futures_timer::Delay;
21use scheduler::{NON_INPUT_EVENT, Scheduler};
22
23use self::thread::{EventItem, EventStreamThreadHandle};
24use crate::{
25 DaemonCommunicationWrapper,
26 daemon_connection::{DaemonChannel, node_integration_testing::convert_output_to_json},
27 event_stream::data_conversion::{MappedInputData, RawData, SharedMemoryData},
28};
29use dora_core::{
30 config::{Input, NodeId},
31 uhlc,
32};
33use eyre::{Context, eyre};
34
35pub use scheduler::Scheduler as EventScheduler;
36
37mod data_conversion;
38mod event;
39pub mod merged;
40mod scheduler;
41mod thread;
42
/// Stream of events (inputs, stop signals, failures, …) delivered to a node
/// by the dora daemon.
pub struct EventStream {
    node_id: NodeId,
    // Async receiver fed by the background thread that talks to the daemon.
    receiver: flume::r#async::RecvStream<'static, EventItem>,
    // Kept only to tie the background event thread's lifetime to this stream.
    _thread_handle: EventStreamThreadHandle,
    // Separate daemon connection used to signal stream closure in `Drop`.
    close_channel: DaemonChannel,
    clock: Arc<uhlc::HLC>,
    // Buffers incoming events and applies per-input queue-size limits.
    scheduler: Scheduler,
    // When set, received events are recorded and written out on drop.
    write_events_to: Option<WriteEventsTo>,
    // Creation time of the stream; recorded events store offsets from it.
    start_timestamp: uhlc::Timestamp,
    // `false` for integration-test channels, which bypass the scheduler.
    use_scheduler: bool,
}
73
74impl EventStream {
75 #[tracing::instrument(level = "trace", skip(clock))]
76 pub(crate) fn init(
77 dataflow_id: DataflowId,
78 node_id: &NodeId,
79 daemon_communication: &DaemonCommunicationWrapper,
80 input_config: BTreeMap<DataId, Input>,
81 clock: Arc<uhlc::HLC>,
82 write_events_to: Option<PathBuf>,
83 ) -> eyre::Result<Self> {
84 let channel = match daemon_communication {
85 DaemonCommunicationWrapper::Standard(daemon_communication) => {
86 match daemon_communication {
87 DaemonCommunication::Tcp { socket_addr } => {
88 DaemonChannel::new_tcp(*socket_addr).wrap_err_with(|| {
89 format!("failed to connect event stream for node `{node_id}`")
90 })?
91 }
92 #[cfg(unix)]
93 DaemonCommunication::UnixDomain { socket_file } => {
94 DaemonChannel::new_unix_socket(socket_file).wrap_err_with(|| {
95 format!("failed to connect event stream for node `{node_id}`")
96 })?
97 }
98 DaemonCommunication::Interactive => {
99 DaemonChannel::Interactive(Default::default())
100 }
101 }
102 }
103
104 DaemonCommunicationWrapper::Testing { channel } => {
105 DaemonChannel::IntegrationTestChannel(channel.clone())
106 }
107 };
108
109 let close_channel = match daemon_communication {
110 DaemonCommunicationWrapper::Standard(daemon_communication) => {
111 match daemon_communication {
112 DaemonCommunication::Tcp { socket_addr } => {
113 DaemonChannel::new_tcp(*socket_addr).wrap_err_with(|| {
114 format!("failed to connect event close channel for node `{node_id}`")
115 })?
116 }
117 #[cfg(unix)]
118 DaemonCommunication::UnixDomain { socket_file } => {
119 DaemonChannel::new_unix_socket(socket_file).wrap_err_with(|| {
120 format!("failed to connect event close channel for node `{node_id}`")
121 })?
122 }
123 DaemonCommunication::Interactive => {
124 DaemonChannel::Interactive(Default::default())
125 }
126 }
127 }
128 DaemonCommunicationWrapper::Testing { channel } => {
129 DaemonChannel::IntegrationTestChannel(channel.clone())
130 }
131 };
132
133 let mut queue_size_limit: HashMap<DataId, (usize, VecDeque<EventItem>)> = input_config
134 .iter()
135 .map(|(input, config)| {
136 (
137 input.clone(),
138 (config.queue_size.unwrap_or(1), VecDeque::new()),
139 )
140 })
141 .collect();
142
143 queue_size_limit.insert(
144 DataId::from(NON_INPUT_EVENT.to_string()),
145 (1_000, VecDeque::new()),
146 );
147
148 let scheduler = Scheduler::new(queue_size_limit);
149
150 let write_events_to = match write_events_to {
151 Some(path) => {
152 if let Some(parent) = path.parent() {
153 std::fs::create_dir_all(parent).wrap_err_with(|| {
154 format!(
155 "failed to create parent directories for event output file `{}` for node `{}`",
156 path.display(),
157 node_id
158 )
159 })?;
160 }
161
162 let file = std::fs::File::create(&path).wrap_err_with(|| {
163 format!(
164 "failed to create event output file `{}` for node `{}`",
165 path.display(),
166 node_id
167 )
168 })?;
169
170 Some(WriteEventsTo {
171 node_id: node_id.clone(),
172 file,
173 events_buffer: Vec::new(),
174 })
175 }
176 None => None,
177 };
178
179 Self::init_on_channel(
180 dataflow_id,
181 node_id,
182 channel,
183 close_channel,
184 clock,
185 scheduler,
186 write_events_to,
187 )
188 }
189
    /// Registers both channels with the daemon, subscribes to node events,
    /// and spawns the background thread that forwards incoming events.
    ///
    /// `channel` carries the subscription and event delivery; `close_channel`
    /// is stored and used in `Drop` to signal stream closure.
    ///
    /// # Errors
    /// Fails if registration, the subscribe request, or spawning the event
    /// thread fails, or if the daemon answers with an unexpected reply.
    pub(crate) fn init_on_channel(
        dataflow_id: DataflowId,
        node_id: &NodeId,
        mut channel: DaemonChannel,
        mut close_channel: DaemonChannel,
        clock: Arc<uhlc::HLC>,
        scheduler: Scheduler,
        write_events_to: Option<WriteEventsTo>,
    ) -> eyre::Result<Self> {
        // Register this node on the event channel, then subscribe.
        channel.register(dataflow_id, node_id.clone(), clock.new_timestamp())?;
        let reply = channel
            .request(&Timestamped {
                inner: DaemonRequest::Subscribe,
                timestamp: clock.new_timestamp(),
            })
            .map_err(|e| eyre!(e))
            .wrap_err("failed to create subscription with dora-daemon")?;

        match reply {
            DaemonReply::Result(Ok(())) => {}
            DaemonReply::Result(Err(err)) => {
                eyre::bail!("subscribe failed: {err}")
            }
            other => eyre::bail!("unexpected subscribe reply: {other:?}"),
        }

        // The close channel only needs registration here; it is first used
        // when the stream is dropped.
        close_channel.register(dataflow_id, node_id.clone(), clock.new_timestamp())?;

        // Very large bound — effectively unbounded in practice; the real
        // per-input limits are enforced by the scheduler's queues.
        let (tx, rx) = flume::bounded(100_000_000);

        // Integration-test channels bypass the scheduler and deliver events
        // directly in arrival order — presumably to keep tests deterministic;
        // TODO confirm.
        let use_scheduler = match &channel {
            DaemonChannel::IntegrationTestChannel(_) => {
                false
            }
            _ => true,
        };

        // Background thread that reads events from the daemon and feeds `tx`.
        let thread_handle = thread::init(node_id.clone(), tx, channel, clock.clone())?;

        Ok(EventStream {
            node_id: node_id.clone(),
            receiver: rx.into_stream(),
            _thread_handle: thread_handle,
            close_channel,
            // Recorded events store offsets relative to this timestamp.
            start_timestamp: clock.new_timestamp(),
            clock,
            scheduler,
            write_events_to,
            use_scheduler,
        })
    }
243
244 pub fn recv(&mut self) -> Option<Event> {
261 futures::executor::block_on(self.recv_async())
262 }
263
264 pub fn recv_timeout(&mut self, dur: Duration) -> Option<Event> {
283 futures::executor::block_on(self.recv_async_timeout(dur))
284 }
285
286 pub async fn recv_async(&mut self) -> Option<Event> {
300 if !self.use_scheduler {
301 return self.receiver.next().await.map(Self::convert_event_item);
302 }
303 loop {
304 if self.scheduler.is_empty() {
305 if let Some(event) = self.receiver.next().await {
306 self.add_event(event);
307 } else {
308 break;
309 }
310 } else {
311 match self.receiver.next().now_or_never().flatten() {
312 Some(event) => self.add_event(event),
313 None => break, };
315 }
316 }
317 let event = self.scheduler.next();
318 event.map(Self::convert_event_item)
319 }
320
321 pub fn is_empty(&self) -> bool {
323 self.scheduler.is_empty() & self.receiver.is_empty()
324 }
325
326 fn add_event(&mut self, event: EventItem) {
327 self.record_event(&event).unwrap();
328 self.scheduler.add_event(event);
329 }
330
    /// Serializes `event` to JSON and appends it to the recording buffer.
    ///
    /// No-op unless event recording is enabled (`write_events_to` is set).
    /// Only node events are recorded; `Reload` events and internal items
    /// (fatal/timeout errors) are skipped. Each record stores its time as an
    /// offset (in seconds) from the stream's start timestamp.
    ///
    /// # Errors
    /// Fails only if an `Input` event cannot be converted to JSON.
    fn record_event(&mut self, event: &EventItem) -> eyre::Result<()> {
        if let Some(write_events_to) = &mut self.write_events_to {
            let event_json = match event {
                EventItem::NodeEvent { event, .. } => match event {
                    NodeEvent::Stop => {
                        let time_offset = self
                            .clock
                            .new_timestamp()
                            .get_diff_duration(&self.start_timestamp);
                        let event_json = serde_json::json!({
                            "type": "Stop",
                            "time_offset_secs": time_offset.as_secs_f64(),
                        });
                        Some(event_json)
                    }
                    // Reload events are deliberately not recorded.
                    NodeEvent::Reload { .. } => None,
                    NodeEvent::Input { id, metadata, data } => {
                        // Reuses the output-serialization helper; it already
                        // embeds the time offset relative to `start_timestamp`.
                        let mut event_json = convert_output_to_json(
                            id,
                            metadata,
                            data,
                            self.start_timestamp,
                            false,
                        )?;
                        event_json.insert("type".into(), "Input".into());
                        Some(event_json.into())
                    }
                    NodeEvent::InputClosed { id } => {
                        let time_offset = self
                            .clock
                            .new_timestamp()
                            .get_diff_duration(&self.start_timestamp);
                        let event_json = serde_json::json!({
                            "type": "InputClosed",
                            "id": id.to_string(),
                            "time_offset_secs": time_offset.as_secs_f64(),
                        });
                        Some(event_json)
                    }
                    NodeEvent::NodeFailed {
                        affected_input_ids,
                        error,
                        source_node_id,
                    } => {
                        let time_offset = self
                            .clock
                            .new_timestamp()
                            .get_diff_duration(&self.start_timestamp);
                        let event_json = serde_json::json!({
                            "type": "NodeFailed",
                            "affected_input_ids": affected_input_ids.iter().map(|id| id.to_string()).collect::<Vec<_>>(),
                            "error": error,
                            "source_node_id": source_node_id.to_string(),
                            "time_offset_secs": time_offset.as_secs_f64(),
                        });
                        Some(event_json)
                    }
                    NodeEvent::AllInputsClosed => {
                        let time_offset = self
                            .clock
                            .new_timestamp()
                            .get_diff_duration(&self.start_timestamp);
                        let event_json = serde_json::json!({
                            "type": "AllInputsClosed",
                            "time_offset_secs": time_offset.as_secs_f64(),
                        });
                        Some(event_json)
                    }
                },
                // Fatal/timeout errors are internal and not part of the recording.
                _ => None,
            };
            if let Some(event_json) = event_json {
                write_events_to.events_buffer.push(event_json);
            }
        }
        Ok(())
    }
408
409 pub fn try_recv(&mut self) -> Result<Event, TryRecvError> {
427 match self.recv_async().now_or_never() {
428 Some(Some(event)) => Ok(event),
429 Some(None) => Err(TryRecvError::Closed),
430 None => Err(TryRecvError::Empty),
431 }
432 }
433
434 pub fn drain(&mut self) -> Option<Vec<Event>> {
444 let mut events = Vec::new();
445 loop {
446 match self.try_recv() {
447 Ok(event) => events.push(event),
448 Err(TryRecvError::Empty) => break,
449 Err(TryRecvError::Closed) => {
450 if events.is_empty() {
451 return None;
452 } else {
453 break;
454 }
455 }
456 }
457 }
458 Some(events)
459 }
460
461 pub async fn recv_async_timeout(&mut self, dur: Duration) -> Option<Event> {
477 match select(Delay::new(dur), pin!(self.recv_async())).await {
478 Either::Left((_elapsed, _)) => Some(Self::convert_event_item(EventItem::TimeoutError(
479 eyre!("Receiver timed out"),
480 ))),
481 Either::Right((event, _)) => event,
482 }
483 }
484
    /// Maps an internal `EventItem` into the public [`Event`] type.
    ///
    /// Input payloads are converted to Arrow arrays here; a conversion
    /// failure is surfaced as [`Event::Error`] instead of being dropped.
    fn convert_event_item(item: EventItem) -> Event {
        match item {
            EventItem::NodeEvent { event, ack_channel } => match event {
                NodeEvent::Stop => Event::Stop(event::StopCause::Manual),
                NodeEvent::Reload { operator_id } => Event::Reload { operator_id },
                NodeEvent::InputClosed { id } => Event::InputClosed { id },
                NodeEvent::NodeFailed {
                    affected_input_ids,
                    error,
                    source_node_id,
                } => Event::NodeFailed {
                    affected_input_ids,
                    error,
                    source_node_id,
                },
                NodeEvent::Input { id, metadata, data } => {
                    // `ack_channel` is handed to the data conversion, which
                    // holds it for shared-memory payloads — presumably to let
                    // the daemon reclaim the memory once the array is dropped;
                    // TODO confirm against the daemon protocol.
                    let data = data_to_arrow_array(data, &metadata, ack_channel);
                    match data {
                        Ok(data) => Event::Input {
                            id,
                            metadata,
                            data: data.into(),
                        },
                        Err(err) => Event::Error(format!("{err:?}")),
                    }
                }
                // All upstream inputs gone: delivered as a stop event.
                NodeEvent::AllInputsClosed => Event::Stop(event::StopCause::AllInputsClosed),
            },

            EventItem::FatalError(err) => {
                Event::Error(format!("fatal event stream error: {err:?}"))
            }
            EventItem::TimeoutError(err) => {
                Event::Error(format!("Timeout event stream error: {err:?}"))
            }
        }
    }
522}
523
/// Error returned by [`EventStream::try_recv`] when no event can be returned.
#[derive(Debug)]
pub enum TryRecvError {
    /// No event is ready right now; the stream is still open.
    Empty,
    /// The stream has ended; no further events will arrive.
    Closed,
}
532
/// Converts a raw daemon data message into an Arrow array, using the type
/// information carried in `metadata`.
///
/// `None` payloads become an empty array. For shared-memory payloads,
/// `drop_channel` is stored alongside the mapping — presumably so that
/// dropping the array signals the daemon that the region may be reclaimed;
/// TODO confirm against the daemon protocol.
///
/// # Errors
/// Fails if the shared-memory region cannot be mapped or if the raw bytes
/// cannot be interpreted as the declared Arrow type.
pub fn data_to_arrow_array(
    data: Option<DataMessage>,
    metadata: &dora_message::metadata::Metadata,
    drop_channel: flume::Sender<()>,
) -> eyre::Result<Arc<dyn arrow::array::Array>> {
    let data = match data {
        None => Ok(None),
        Some(DataMessage::Vec(v)) => Ok(Some(RawData::Vec(v))),
        Some(DataMessage::SharedMemory {
            shared_memory_id,
            len,
            drop_token: _, }) => unsafe {
            // SAFETY: assumes the daemon keeps the region identified by
            // `shared_memory_id` valid for at least `len` bytes until the
            // mapping (holding `drop_channel`) is dropped — TODO confirm.
            MappedInputData::map(&shared_memory_id, len).map(|data| {
                Some(RawData::SharedMemory(SharedMemoryData {
                    data,
                    _drop: drop_channel,
                }))
            })
        },
    };

    data.and_then(|data| {
        // Missing payload is represented as an empty array of the right type.
        let raw_data = data.unwrap_or(RawData::Empty);
        raw_data
            .into_arrow_array(&metadata.type_info)
            .map(arrow::array::make_array)
    })
}
562
impl Stream for EventStream {
    type Item = Event;

    // Polls the underlying flume receiver directly.
    //
    // NOTE(review): unlike `recv_async`, this bypasses both the scheduler
    // and event recording (`record_event`) — confirm this difference is
    // intentional for `Stream` consumers.
    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        self.receiver
            .poll_next_unpin(cx)
            .map(|item| item.map(Self::convert_event_item))
    }
}
575
// On drop, the stream notifies the daemon that it stopped listening and
// flushes any recorded events to the configured output file.
impl Drop for EventStream {
    fn drop(&mut self) {
        // Tell the daemon over the dedicated close channel that this node no
        // longer receives events.
        let request = Timestamped {
            inner: DaemonRequest::EventStreamDropped,
            timestamp: self.clock.new_timestamp(),
        };
        let result = self
            .close_channel
            .request(&request)
            .map_err(|e| eyre!(e))
            .wrap_err("failed to signal event stream closure to dora-daemon")
            .and_then(|r| match r {
                DaemonReply::Result(Ok(())) => Ok(()),
                DaemonReply::Result(Err(err)) => Err(eyre!("EventStreamClosed failed: {err}")),
                other => Err(eyre!("unexpected EventStreamClosed reply: {other:?}")),
            });
        // Drop must not panic: failures are only logged.
        if let Err(err) = result {
            tracing::warn!("{err:?}")
        }

        // Write out recorded events, if recording was enabled.
        if let Some(write_events_to) = self.write_events_to.take() {
            if let Err(err) = write_events_to.write_out() {
                tracing::warn!(
                    "failed to write out events for node {}: {err:?}",
                    self.node_id
                );
            }
        }
    }
}
606
/// Accumulates recorded events in memory and writes them to a JSON file via
/// [`WriteEventsTo::write_out`].
pub(crate) struct WriteEventsTo {
    node_id: NodeId,
    // Destination file; created eagerly when recording is enabled.
    file: std::fs::File,
    // Events recorded so far, already serialized to JSON values.
    events_buffer: Vec<serde_json::Value>,
}
612
613impl WriteEventsTo {
614 fn write_out(self) -> eyre::Result<()> {
615 let Self {
616 node_id,
617 file,
618 events_buffer,
619 } = self;
620 let mut inputs_file = serde_json::Map::new();
621 inputs_file.insert("id".into(), node_id.to_string().into());
622 inputs_file.insert("events".into(), events_buffer.into());
623
624 serde_json::to_writer_pretty(file, &inputs_file)
625 .context("failed to write events to file")?;
626 Ok(())
627 }
628}