1use aligned_vec::{AVec, ConstAlign};
2use coordinator::CoordinatorEvent;
3use crossbeam::queue::ArrayQueue;
4use dora_core::{
5 config::{DataId, Input, InputMapping, NodeId, NodeRunConfig, OperatorId},
6 descriptor::{
7 read_as_descriptor, CoreNodeKind, Descriptor, DescriptorExt, ResolvedNode, RuntimeNode,
8 DYNAMIC_SOURCE,
9 },
10 topics::LOCALHOST,
11 uhlc::{self, HLC},
12};
13use dora_message::{
14 common::{
15 DaemonId, DataMessage, DropToken, LogLevel, NodeError, NodeErrorCause, NodeExitStatus,
16 },
17 coordinator_to_cli::DataflowResult,
18 coordinator_to_daemon::{DaemonCoordinatorEvent, SpawnDataflowNodes},
19 daemon_to_coordinator::{
20 CoordinatorRequest, DaemonCoordinatorReply, DaemonEvent, DataflowDaemonResult,
21 },
22 daemon_to_daemon::InterDaemonEvent,
23 daemon_to_node::{DaemonReply, NodeConfig, NodeDropEvent, NodeEvent},
24 metadata::{self, ArrowTypeInfo},
25 node_to_daemon::{DynamicNodeEvent, Timestamped},
26 DataflowId,
27};
28use dora_node_api::{arrow::datatypes::DataType, Parameter};
29use eyre::{bail, eyre, Context, ContextCompat, Result};
30use futures::{future, stream, FutureExt, TryFutureExt};
31use futures_concurrency::stream::Merge;
32use local_listener::DynamicNodeEventWrapper;
33use log::{DaemonLogger, DataflowLogger, Logger};
34use pending::PendingNodes;
35use shared_memory_server::ShmemConf;
36use socket_stream_utils::socket_stream_send;
37use std::{
38 collections::{BTreeMap, BTreeSet, HashMap},
39 net::SocketAddr,
40 path::{Path, PathBuf},
41 pin::pin,
42 sync::Arc,
43 time::{Duration, Instant},
44};
45use sysinfo::Pid;
46use tokio::{
47 fs::File,
48 io::AsyncReadExt,
49 net::TcpStream,
50 sync::{
51 broadcast,
52 mpsc::{self, UnboundedSender},
53 oneshot::{self, Sender},
54 },
55};
56use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
57use tracing::{error, warn};
58use uuid::{NoContext, Timestamp, Uuid};
59
60mod coordinator;
61mod local_listener;
62mod log;
63mod node_communication;
64mod pending;
65mod socket_stream_utils;
66mod spawn;
67
68#[cfg(feature = "telemetry")]
69use dora_tracing::telemetry::serialize_context;
70#[cfg(feature = "telemetry")]
71use tracing_opentelemetry::OpenTelemetrySpanExt;
72
73use crate::pending::DataflowStatus;
74
/// Number of most-recent stderr lines kept per node (ring buffer) for error reporting.
const STDERR_LOG_LINES: usize = 10;
76
/// Daemon state: the set of running dataflows plus connections to the
/// coordinator, to other daemons (via zenoh), and to local nodes.
pub struct Daemon {
    /// Currently running dataflows, keyed by their UUID.
    running: HashMap<DataflowId, RunningDataflow>,
    /// Working directory of each running dataflow (used e.g. for log-file lookup).
    working_dir: HashMap<DataflowId, PathBuf>,

    /// Sender for queueing internal events back into the daemon's own event loop.
    events_tx: mpsc::Sender<Timestamped<Event>>,

    /// TCP connection to the coordinator; `None` when running standalone.
    coordinator_connection: Option<TcpStream>,
    /// Time of the last heartbeat received from the coordinator.
    last_coordinator_heartbeat: Instant,
    daemon_id: DaemonId,

    /// When set, the daemon exits once all listed `(dataflow, node)` pairs finished.
    exit_when_done: Option<BTreeSet<(Uuid, NodeId)>>,
    /// When true (set on ctrl-c), exit as soon as no dataflow is running anymore.
    exit_when_all_finished: bool,
    /// Per-dataflow result of each node, collected as nodes stop.
    dataflow_node_results: BTreeMap<Uuid, BTreeMap<NodeId, Result<(), NodeError>>>,

    /// Hybrid logical clock used to timestamp all outgoing events/messages.
    clock: Arc<uhlc::HLC>,

    /// Zenoh session used for inter-daemon communication.
    zenoh_session: zenoh::Session,
    /// Channel for forwarding events received from remote daemons, if enabled.
    remote_daemon_events_tx: Option<flume::Sender<eyre::Result<Timestamped<InterDaemonEvent>>>>,

    logger: DaemonLogger,
}
101
/// Per-dataflow node results, as returned by the daemon's main event loop.
type DaemonRunResult = BTreeMap<Uuid, BTreeMap<NodeId, Result<(), NodeError>>>;
103
104impl Daemon {
    /// Connects to the coordinator at `coordinator_addr` and runs the daemon
    /// until it exits (or until ctrl-c is received during startup).
    pub async fn run(
        coordinator_addr: SocketAddr,
        machine_id: Option<String>,
        local_listen_port: u16,
    ) -> eyre::Result<()> {
        let clock = Arc::new(HLC::default());

        let mut ctrlc_events = set_up_ctrlc_handler(clock.clone())?;
        // Bounded channel for events forwarded from remote daemons.
        let (remote_daemon_events_tx, remote_daemon_events_rx) = flume::bounded(10);
        let (daemon_id, incoming_events) = {
            let incoming_events = set_up_event_stream(
                coordinator_addr,
                &machine_id,
                &clock,
                remote_daemon_events_rx,
                local_listen_port,
            );

            // Race event-stream setup against ctrl-c so the daemon can be
            // interrupted while it is still connecting.
            let ctrl_c = pin!(ctrlc_events.recv());
            match futures::future::select(ctrl_c, pin!(incoming_events)).await {
                future::Either::Left((_ctrl_c, _)) => {
                    tracing::info!("received ctrl-c signal -> stopping daemon");
                    return Ok(());
                }
                future::Either::Right((events, _)) => events?,
            }
        };
        // Remaining ctrl-c signals are handled inside the main event loop.
        Self::run_general(
            (ReceiverStream::new(ctrlc_events), incoming_events).merge(),
            Some(coordinator_addr),
            daemon_id,
            None,
            clock,
            Some(remote_daemon_events_tx),
        )
        .await
        .map(|_| ())
    }
144
    /// Runs a single dataflow to completion without a coordinator
    /// (standalone mode) and returns its per-node results.
    ///
    /// Reads and validates the descriptor at `dataflow_path`, spawns all of
    /// its nodes locally, and exits once every node finished. `uv` is
    /// forwarded to the spawn command (presumably selects the `uv` launcher
    /// for Python nodes — confirm in `spawn`).
    pub async fn run_dataflow(dataflow_path: &Path, uv: bool) -> eyre::Result<DataflowResult> {
        // Nodes are spawned relative to the directory containing the dataflow file.
        let working_dir = dataflow_path
            .canonicalize()
            .context("failed to canonicalize dataflow path")?
            .parent()
            .ok_or_else(|| eyre::eyre!("canonicalized dataflow path has no parent"))?
            .to_owned();

        let descriptor = read_as_descriptor(dataflow_path).await?;
        descriptor.check(&working_dir)?;
        let nodes = descriptor.resolve_aliases_and_set_defaults()?;

        let dataflow_id = Uuid::new_v7(Timestamp::now(NoContext));
        let spawn_command = SpawnDataflowNodes {
            dataflow_id,
            working_dir,
            spawn_nodes: nodes.keys().cloned().collect(),
            nodes,
            dataflow_descriptor: descriptor,
            uv,
        };

        let clock = Arc::new(HLC::default());

        let ctrlc_events = ReceiverStream::new(set_up_ctrlc_handler(clock.clone())?);

        // Exit once every node of this single dataflow has finished.
        let exit_when_done = spawn_command
            .nodes
            .values()
            .map(|n| (spawn_command.dataflow_id, n.id.clone()))
            .collect();
        let (reply_tx, reply_rx) = oneshot::channel();
        let timestamp = clock.new_timestamp();
        // Inject a synthetic coordinator `Spawn` event to start the dataflow.
        let coordinator_events = stream::once(async move {
            Timestamped {
                inner: Event::Coordinator(CoordinatorEvent {
                    event: DaemonCoordinatorEvent::Spawn(spawn_command),
                    reply_tx,
                }),
                timestamp,
            }
        });
        let events = (coordinator_events, ctrlc_events).merge();
        let run_result = Self::run_general(
            Box::pin(events),
            None,
            DaemonId::new(None),
            Some(exit_when_done),
            clock.clone(),
            None,
        );

        // Map the oneshot spawn reply into a Result we can join on.
        let spawn_result = reply_rx
            .map_err(|err| eyre!("failed to receive spawn result: {err}"))
            .and_then(|r| async {
                match r {
                    Some(DaemonCoordinatorReply::SpawnResult(result)) => {
                        result.map_err(|err| eyre!(err))
                    }
                    _ => Err(eyre!("unexpected spawn reply")),
                }
            });

        // Run the daemon loop and await the spawn reply concurrently; either
        // failure aborts the run.
        let (mut dataflow_results, ()) = future::try_join(run_result, spawn_result).await?;

        Ok(DataflowResult {
            uuid: dataflow_id,
            timestamp: clock.new_timestamp(),
            node_results: dataflow_results
                .remove(&dataflow_id)
                .context("no node results for dataflow_id")?,
        })
    }
218
    /// Shared setup used by both coordinator-connected (`run`) and
    /// standalone (`run_dataflow`) modes.
    ///
    /// Opens the coordinator connections (if an address is given) and the
    /// zenoh session, constructs the `Daemon`, and enters the main event
    /// loop with a merged stream of external, internal, and heartbeat events.
    async fn run_general(
        external_events: impl Stream<Item = Timestamped<Event>> + Unpin,
        coordinator_addr: Option<SocketAddr>,
        daemon_id: DaemonId,
        exit_when_done: Option<BTreeSet<(Uuid, NodeId)>>,
        clock: Arc<HLC>,
        remote_daemon_events_tx: Option<flume::Sender<eyre::Result<Timestamped<InterDaemonEvent>>>>,
    ) -> eyre::Result<DaemonRunResult> {
        let coordinator_connection = match coordinator_addr {
            Some(addr) => {
                let stream = TcpStream::connect(addr)
                    .await
                    .wrap_err("failed to connect to dora-coordinator")?;
                stream
                    .set_nodelay(true)
                    .wrap_err("failed to set TCP_NODELAY")?;
                Some(stream)
            }
            None => None,
        };

        // Separate connection for log messages so logging does not interleave
        // with the main coordinator request/reply traffic.
        let logger_coordinator_connection = match coordinator_addr {
            Some(addr) => {
                let stream = TcpStream::connect(addr)
                    .await
                    .wrap_err("failed to connect log to dora-coordinator")?;
                stream
                    .set_nodelay(true)
                    .wrap_err("failed to set TCP_NODELAY")?;
                Some(stream)
            }
            None => None,
        };

        // The zenoh config may be overridden via zenoh's standard config-path
        // environment variable; otherwise use the default config.
        let zenoh_config = match std::env::var(zenoh::Config::DEFAULT_CONFIG_PATH_ENV) {
            Ok(path) => zenoh::Config::from_file(&path)
                .map_err(|e| eyre!(e))
                .wrap_err_with(|| format!("failed to read zenoh config from {path}"))?,
            Err(std::env::VarError::NotPresent) => zenoh::Config::default(),
            Err(std::env::VarError::NotUnicode(_)) => eyre::bail!(
                "{} env variable is not valid unicode",
                zenoh::Config::DEFAULT_CONFIG_PATH_ENV
            ),
        };
        let zenoh_session = zenoh::open(zenoh_config)
            .await
            .map_err(|e| eyre!(e))
            .context("failed to open zenoh session")?;

        let (dora_events_tx, dora_events_rx) = mpsc::channel(5);
        let daemon = Self {
            logger: Logger {
                coordinator_connection: logger_coordinator_connection,
                daemon_id: daemon_id.clone(),
                clock: clock.clone(),
            }
            .for_daemon(daemon_id.clone()),
            running: HashMap::new(),
            working_dir: HashMap::new(),
            events_tx: dora_events_tx,
            coordinator_connection,
            last_coordinator_heartbeat: Instant::now(),
            daemon_id,
            exit_when_done,
            exit_when_all_finished: false,
            dataflow_node_results: BTreeMap::new(),
            clock,
            zenoh_session,
            remote_daemon_events_tx,
        };

        let dora_events = ReceiverStream::new(dora_events_rx);
        let watchdog_clock = daemon.clock.clone();
        // Emit a `HeartbeatInterval` event every 5 seconds; the event loop
        // uses it both to send heartbeats and to detect a lost coordinator.
        let watchdog_interval = tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(
            Duration::from_secs(5),
        ))
        .map(|_| Timestamped {
            inner: Event::HeartbeatInterval,
            timestamp: watchdog_clock.new_timestamp(),
        });
        let events = (external_events, dora_events, watchdog_interval).merge();
        daemon.run_inner(events).await
    }
303
304 #[tracing::instrument(skip(incoming_events, self), fields(?self.daemon_id))]
305 async fn run_inner(
306 mut self,
307 incoming_events: impl Stream<Item = Timestamped<Event>> + Unpin,
308 ) -> eyre::Result<DaemonRunResult> {
309 let mut events = incoming_events;
310
311 while let Some(event) = events.next().await {
312 let Timestamped { inner, timestamp } = event;
313 if let Err(err) = self.clock.update_with_timestamp(×tamp) {
314 tracing::warn!("failed to update HLC with incoming event timestamp: {err}");
315 }
316
317 match inner {
318 Event::Coordinator(CoordinatorEvent { event, reply_tx }) => {
319 let status = self.handle_coordinator_event(event, reply_tx).await?;
320
321 match status {
322 RunStatus::Continue => {}
323 RunStatus::Exit => break,
324 }
325 }
326 Event::Daemon(event) => {
327 self.handle_inter_daemon_event(event).await?;
328 }
329 Event::Node {
330 dataflow_id: dataflow,
331 node_id,
332 event,
333 } => self.handle_node_event(event, dataflow, node_id).await?,
334 Event::Dora(event) => match self.handle_dora_event(event).await? {
335 RunStatus::Continue => {}
336 RunStatus::Exit => break,
337 },
338 Event::DynamicNode(event) => self.handle_dynamic_node_event(event).await?,
339 Event::HeartbeatInterval => {
340 if let Some(connection) = &mut self.coordinator_connection {
341 let msg = serde_json::to_vec(&Timestamped {
342 inner: CoordinatorRequest::Event {
343 daemon_id: self.daemon_id.clone(),
344 event: DaemonEvent::Heartbeat,
345 },
346 timestamp: self.clock.new_timestamp(),
347 })?;
348 socket_stream_send(connection, &msg)
349 .await
350 .wrap_err("failed to send watchdog message to dora-coordinator")?;
351
352 if self.last_coordinator_heartbeat.elapsed() > Duration::from_secs(20) {
353 bail!("lost connection to coordinator")
354 }
355 }
356 }
357 Event::CtrlC => {
358 tracing::info!("received ctrlc signal -> stopping all dataflows");
359 for dataflow in self.running.values_mut() {
360 let mut logger = self.logger.for_dataflow(dataflow.id);
361 dataflow
362 .stop_all(
363 &mut self.coordinator_connection,
364 &self.clock,
365 None,
366 &mut logger,
367 )
368 .await?;
369 }
370 self.exit_when_all_finished = true;
371 if self.running.is_empty() {
372 break;
373 }
374 }
375 Event::SecondCtrlC => {
376 tracing::warn!("received second ctrlc signal -> exit immediately");
377 bail!("received second ctrl-c signal");
378 }
379 Event::DaemonError(err) => {
380 tracing::error!("Daemon error: {err:?}");
381 }
382 }
383 }
384
385 if let Some(mut connection) = self.coordinator_connection.take() {
386 let msg = serde_json::to_vec(&Timestamped {
387 inner: CoordinatorRequest::Event {
388 daemon_id: self.daemon_id.clone(),
389 event: DaemonEvent::Exit,
390 },
391 timestamp: self.clock.new_timestamp(),
392 })?;
393 socket_stream_send(&mut connection, &msg)
394 .await
395 .wrap_err("failed to send Exit message to dora-coordinator")?;
396 }
397
398 Ok(self.dataflow_node_results)
399 }
400
401 async fn handle_coordinator_event(
402 &mut self,
403 event: DaemonCoordinatorEvent,
404 reply_tx: Sender<Option<DaemonCoordinatorReply>>,
405 ) -> eyre::Result<RunStatus> {
406 let status = match event {
407 DaemonCoordinatorEvent::Spawn(SpawnDataflowNodes {
408 dataflow_id,
409 working_dir,
410 nodes,
411 dataflow_descriptor,
412 spawn_nodes,
413 uv,
414 }) => {
415 match dataflow_descriptor.communication.remote {
416 dora_core::config::RemoteCommunicationConfig::Tcp => {}
417 }
418
419 let working_dir = if working_dir.exists() {
421 working_dir
422 } else {
423 std::env::current_dir().wrap_err("failed to get current working dir")?
424 };
425
426 let result = self
427 .spawn_dataflow(
428 dataflow_id,
429 working_dir,
430 nodes,
431 dataflow_descriptor,
432 spawn_nodes,
433 uv,
434 )
435 .await;
436 if let Err(err) = &result {
437 tracing::error!("{err:?}");
438 }
439 let reply =
440 DaemonCoordinatorReply::SpawnResult(result.map_err(|err| format!("{err:?}")));
441 let _ = reply_tx.send(Some(reply)).map_err(|_| {
442 error!("could not send `SpawnResult` reply from daemon to coordinator")
443 });
444 RunStatus::Continue
445 }
446 DaemonCoordinatorEvent::AllNodesReady {
447 dataflow_id,
448 exited_before_subscribe,
449 } => {
450 let mut logger = self.logger.for_dataflow(dataflow_id);
451 logger.log(LogLevel::Debug, None,
452 Some("daemon".into()),
453 format!("received AllNodesReady (exited_before_subscribe: {exited_before_subscribe:?})"
454 )).await;
455 match self.running.get_mut(&dataflow_id) {
456 Some(dataflow) => {
457 let ready = exited_before_subscribe.is_empty();
458 dataflow
459 .pending_nodes
460 .handle_external_all_nodes_ready(
461 exited_before_subscribe,
462 &mut dataflow.cascading_error_causes,
463 )
464 .await?;
465 if ready {
466 logger.log(LogLevel::Info, None,
467 Some("daemon".into()),
468 "coordinator reported that all nodes are ready, starting dataflow",
469 ).await;
470 dataflow.start(&self.events_tx, &self.clock).await?;
471 }
472 }
473 None => {
474 tracing::warn!(
475 "received AllNodesReady for unknown dataflow (ID `{dataflow_id}`)"
476 );
477 }
478 }
479 let _ = reply_tx.send(None).map_err(|_| {
480 error!("could not send `AllNodesReady` reply from daemon to coordinator")
481 });
482 RunStatus::Continue
483 }
484 DaemonCoordinatorEvent::Logs {
485 dataflow_id,
486 node_id,
487 } => {
488 match self.working_dir.get(&dataflow_id) {
489 Some(working_dir) => {
490 let working_dir = working_dir.clone();
491 tokio::spawn(async move {
492 let logs = async {
493 let mut file =
494 File::open(log::log_path(&working_dir, &dataflow_id, &node_id))
495 .await
496 .wrap_err(format!(
497 "Could not open log file: {:#?}",
498 log::log_path(&working_dir, &dataflow_id, &node_id)
499 ))?;
500
501 let mut contents = vec![];
502 file.read_to_end(&mut contents)
503 .await
504 .wrap_err("Could not read content of log file")?;
505 Result::<Vec<u8>, eyre::Report>::Ok(contents)
506 }
507 .await
508 .map_err(|err| format!("{err:?}"));
509 let _ = reply_tx
510 .send(Some(DaemonCoordinatorReply::Logs(logs)))
511 .map_err(|_| {
512 error!("could not send logs reply from daemon to coordinator")
513 });
514 });
515 }
516 None => {
517 tracing::warn!("received Logs for unknown dataflow (ID `{dataflow_id}`)");
518 let _ = reply_tx.send(None).map_err(|_| {
519 error!(
520 "could not send `AllNodesReady` reply from daemon to coordinator"
521 )
522 });
523 }
524 }
525 RunStatus::Continue
526 }
527 DaemonCoordinatorEvent::ReloadDataflow {
528 dataflow_id,
529 node_id,
530 operator_id,
531 } => {
532 let result = self.send_reload(dataflow_id, node_id, operator_id).await;
533 let reply =
534 DaemonCoordinatorReply::ReloadResult(result.map_err(|err| format!("{err:?}")));
535 let _ = reply_tx
536 .send(Some(reply))
537 .map_err(|_| error!("could not send reload reply from daemon to coordinator"));
538 RunStatus::Continue
539 }
540 DaemonCoordinatorEvent::StopDataflow {
541 dataflow_id,
542 grace_duration,
543 } => {
544 let mut logger = self.logger.for_dataflow(dataflow_id);
545 let dataflow = self
546 .running
547 .get_mut(&dataflow_id)
548 .wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"));
549 let (reply, future) = match dataflow {
550 Ok(dataflow) => {
551 let future = dataflow.stop_all(
552 &mut self.coordinator_connection,
553 &self.clock,
554 grace_duration,
555 &mut logger,
556 );
557 (Ok(()), Some(future))
558 }
559 Err(err) => (Err(err.to_string()), None),
560 };
561
562 let _ = reply_tx
563 .send(Some(DaemonCoordinatorReply::StopResult(reply)))
564 .map_err(|_| error!("could not send stop reply from daemon to coordinator"));
565
566 if let Some(future) = future {
567 future.await?;
568 }
569
570 RunStatus::Continue
571 }
572 DaemonCoordinatorEvent::Destroy => {
573 tracing::info!("received destroy command -> exiting");
574 let (notify_tx, notify_rx) = oneshot::channel();
575 let reply = DaemonCoordinatorReply::DestroyResult {
576 result: Ok(()),
577 notify: Some(notify_tx),
578 };
579 let _ = reply_tx
580 .send(Some(reply))
581 .map_err(|_| error!("could not send destroy reply from daemon to coordinator"));
582 if notify_rx.await.is_err() {
584 tracing::warn!("no confirmation received for DestroyReply");
585 }
586 RunStatus::Exit
587 }
588 DaemonCoordinatorEvent::Heartbeat => {
589 self.last_coordinator_heartbeat = Instant::now();
590 let _ = reply_tx.send(None);
591 RunStatus::Continue
592 }
593 };
594 Ok(status)
595 }
596
    /// Handles an event received from another daemon (via zenoh): either a
    /// node output to forward to local receivers, or a notification that a
    /// remote output was closed.
    ///
    /// Failures are logged per dataflow/node instead of aborting the daemon.
    async fn handle_inter_daemon_event(&mut self, event: InterDaemonEvent) -> eyre::Result<()> {
        match event {
            InterDaemonEvent::Output {
                dataflow_id,
                node_id,
                output_id,
                metadata,
                data,
            } => {
                let inner = async {
                    let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
                        format!("send out failed: no running dataflow with ID `{dataflow_id}`")
                    })?;
                    send_output_to_local_receivers(
                        node_id.clone(),
                        output_id.clone(),
                        dataflow,
                        &metadata,
                        data.map(DataMessage::Vec),
                        &self.clock,
                    )
                    .await?;
                    Result::<_, eyre::Report>::Ok(())
                };
                // Forwarding failures are non-fatal: log them as warnings.
                if let Err(err) = inner
                    .await
                    .wrap_err("failed to forward remote output to local receivers")
                {
                    let mut logger = self.logger.for_dataflow(dataflow_id).for_node(node_id);
                    logger
                        .log(LogLevel::Warn, Some("daemon".into()), format!("{err:?}"))
                        .await;
                }
                Ok(())
            }
            InterDaemonEvent::OutputClosed {
                dataflow_id,
                node_id,
                output_id,
            } => {
                let output_id = OutputId(node_id.clone(), output_id);
                let mut logger = self
                    .logger
                    .for_dataflow(dataflow_id)
                    .for_node(node_id.clone());
                logger
                    .log(
                        LogLevel::Debug,
                        Some("daemon".into()),
                        format!("received OutputClosed event for output {output_id:?}"),
                    )
                    .await;

                let inner = async {
                    let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
                        format!("send out failed: no running dataflow with ID `{dataflow_id}`")
                    })?;

                    // Close all local inputs that were fed by the remote output.
                    if let Some(inputs) = dataflow.mappings.get(&output_id).cloned() {
                        for (receiver_id, input_id) in &inputs {
                            close_input(dataflow, receiver_id, input_id, &self.clock);
                        }
                    }
                    Result::<(), eyre::Report>::Ok(())
                };
                if let Err(err) = inner
                    .await
                    .wrap_err("failed to handle InputsClosed event sent by coordinator")
                {
                    logger
                        .log(LogLevel::Warn, Some("daemon".into()), format!("{err:?}"))
                        .await;
                }
                Ok(())
            }
        }
    }
674
    /// Registers a new dataflow and spawns its local nodes.
    ///
    /// Runs two passes over the resolved nodes: the first records all input
    /// mappings (local mappings, timers, and outputs that feed nodes on other
    /// daemons); the second spawns local nodes and, for non-local nodes,
    /// declares zenoh subscribers for the outputs they produce. Spawn
    /// failures are recorded as node errors instead of aborting the whole
    /// dataflow.
    async fn spawn_dataflow(
        &mut self,
        dataflow_id: uuid::Uuid,
        working_dir: PathBuf,
        nodes: BTreeMap<NodeId, ResolvedNode>,
        dataflow_descriptor: Descriptor,
        spawn_nodes: BTreeSet<NodeId>,
        uv: bool,
    ) -> eyre::Result<()> {
        let mut logger = self.logger.for_dataflow(dataflow_id);
        let dataflow =
            RunningDataflow::new(dataflow_id, self.daemon_id.clone(), &dataflow_descriptor);
        // Reject duplicate dataflow IDs; otherwise register working dir + state.
        let dataflow = match self.running.entry(dataflow_id) {
            std::collections::hash_map::Entry::Vacant(entry) => {
                self.working_dir.insert(dataflow_id, working_dir.clone());
                entry.insert(dataflow)
            }
            std::collections::hash_map::Entry::Occupied(_) => {
                bail!("there is already a running dataflow with ID `{dataflow_id}`")
            }
        };

        // Nodes whose spawn failed; their stop is handled at the end.
        let mut stopped = Vec::new();

        // First pass: register the input mappings of every node.
        for node in nodes.values() {
            let local = spawn_nodes.contains(&node.id);

            let inputs = node_inputs(node);
            for (input_id, input) in inputs {
                if local {
                    dataflow
                        .open_inputs
                        .entry(node.id.clone())
                        .or_default()
                        .insert(input_id.clone());
                    match input.mapping {
                        InputMapping::User(mapping) => {
                            dataflow
                                .mappings
                                .entry(OutputId(mapping.source, mapping.output))
                                .or_default()
                                .insert((node.id.clone(), input_id));
                        }
                        InputMapping::Timer { interval } => {
                            dataflow
                                .timers
                                .entry(interval)
                                .or_default()
                                .insert((node.id.clone(), input_id));
                        }
                    }
                } else if let InputMapping::User(mapping) = input.mapping {
                    // Input of a node running on another daemon: record the
                    // output feeding it so `send_out` publishes it to zenoh.
                    dataflow
                        .open_external_mappings
                        .insert(OutputId(mapping.source, mapping.output));
                }
            }
        }

        // Second pass: spawn local nodes; subscribe to outputs of remote nodes.
        for node in nodes.into_values() {
            let mut logger = logger.reborrow().for_node(node.id.clone());
            let local = spawn_nodes.contains(&node.id);
            if local {
                // Dynamic nodes connect later; others are tracked as pending
                // until they subscribe.
                if node.kind.dynamic() {
                    dataflow.dynamic_nodes.insert(node.id.clone());
                } else {
                    dataflow.pending_nodes.insert(node.id.clone());
                }

                let node_id = node.id.clone();
                // Ring buffer of recent stderr lines, used for error reports.
                let node_stderr_most_recent = dataflow
                    .node_stderr_most_recent
                    .entry(node.id.clone())
                    .or_insert_with(|| Arc::new(ArrayQueue::new(STDERR_LOG_LINES)))
                    .clone();
                logger
                    .log(LogLevel::Info, Some("daemon".into()), "spawning")
                    .await;
                match spawn::spawn_node(
                    dataflow_id,
                    &working_dir,
                    node,
                    self.events_tx.clone(),
                    dataflow_descriptor.clone(),
                    self.clock.clone(),
                    node_stderr_most_recent,
                    uv,
                    &mut logger,
                )
                .await
                .wrap_err_with(|| format!("failed to spawn node `{node_id}`"))
                {
                    Ok(running_node) => {
                        dataflow.running_nodes.insert(node_id, running_node);
                    }
                    Err(err) => {
                        // Record the spawn failure as this node's result and
                        // mark it stopped so cleanup runs below.
                        logger
                            .log(LogLevel::Error, Some("daemon".into()), format!("{err:?}"))
                            .await;
                        self.dataflow_node_results
                            .entry(dataflow_id)
                            .or_default()
                            .insert(
                                node_id.clone(),
                                Err(NodeError {
                                    timestamp: self.clock.new_timestamp(),
                                    cause: NodeErrorCause::Other {
                                        stderr: format!("spawn failed: {err:?}"),
                                    },
                                    exit_status: NodeExitStatus::Unknown,
                                }),
                            );
                        stopped.push(node_id.clone());
                    }
                }
            } else {
                dataflow.pending_nodes.set_external_nodes(true);

                // Subscribe via zenoh to every output of this remote node that
                // feeds a local input.
                for output_id in dataflow.mappings.keys().filter(|o| o.0 == node.id) {
                    let tx = self
                        .remote_daemon_events_tx
                        .clone()
                        .wrap_err("no remote_daemon_events_tx channel")?;
                    let mut finished_rx = dataflow.finished_tx.subscribe();
                    let subscribe_topic = dataflow.output_publish_topic(output_id);
                    tracing::debug!("declaring subscriber on {subscribe_topic}");
                    let subscriber = self
                        .zenoh_session
                        .declare_subscriber(subscribe_topic)
                        .await
                        .map_err(|e| eyre!(e))
                        .wrap_err_with(|| format!("failed to subscribe to {output_id:?}"))?;
                    // Forward incoming samples until the dataflow finishes
                    // (signaled by the `finished_tx` channel closing).
                    tokio::spawn(async move {
                        let mut finished = pin!(finished_rx.recv());
                        loop {
                            let finished_or_next =
                                futures::future::select(finished, subscriber.recv_async());
                            match finished_or_next.await {
                                future::Either::Left((finished, _)) => {
                                    match finished {
                                        Err(broadcast::error::RecvError::Closed) => {
                                            tracing::debug!("dataflow finished, breaking from zenoh subscribe task");
                                            break;
                                        }
                                        other => {
                                            tracing::warn!("unexpected return value of dataflow finished_rx channel: {other:?}");
                                            break;
                                        }
                                    }
                                }
                                future::Either::Right((sample, f)) => {
                                    finished = f;
                                    let event = sample.map_err(|e| eyre!(e)).and_then(|s| {
                                        Timestamped::deserialize_inter_daemon_event(
                                            &s.payload().to_bytes(),
                                        )
                                    });
                                    if tx.send_async(event).await.is_err() {
                                        break;
                                    }
                                }
                            }
                        }
                    });
                }
            }
        }
        for node_id in stopped {
            self.handle_node_stop(dataflow_id, &node_id).await?;
        }

        Ok(())
    }
853
854 async fn handle_dynamic_node_event(
855 &mut self,
856 event: DynamicNodeEventWrapper,
857 ) -> eyre::Result<()> {
858 match event {
859 DynamicNodeEventWrapper {
860 event: DynamicNodeEvent::NodeConfig { node_id },
861 reply_tx,
862 } => {
863 let number_node_id = self
864 .running
865 .iter()
866 .filter(|(_id, dataflow)| dataflow.running_nodes.contains_key(&node_id))
867 .count();
868
869 let node_config = match number_node_id {
870 2.. => Err(format!(
871 "multiple dataflows contains dynamic node id {node_id}. \
872 Please only have one running dataflow with the specified \
873 node id if you want to use dynamic node",
874 )),
875 1 => self
876 .running
877 .iter()
878 .filter(|(_id, dataflow)| dataflow.running_nodes.contains_key(&node_id))
879 .map(|(id, dataflow)| -> Result<NodeConfig> {
880 let node_config = dataflow
881 .running_nodes
882 .get(&node_id)
883 .context("no node with ID `{node_id}` within the given dataflow")?
884 .node_config
885 .clone();
886 if !node_config.dynamic {
887 bail!("node with ID `{node_id}` in {id} is not dynamic");
888 }
889 Ok(node_config)
890 })
891 .next()
892 .ok_or_else(|| eyre!("no node with ID `{node_id}`"))
893 .and_then(|r| r)
894 .map_err(|err| {
895 format!(
896 "failed to get dynamic node config within given dataflow: {err}"
897 )
898 }),
899 0 => Err("no node with ID `{node_id}`".to_string()),
900 };
901
902 let reply = DaemonReply::NodeConfig {
903 result: node_config,
904 };
905 let _ = reply_tx.send(Some(reply)).map_err(|_| {
906 error!("could not send node info reply from daemon to coordinator")
907 });
908 Ok(())
909 }
910 }
911 }
912
    /// Handles an event sent by a locally running node: subscription setup,
    /// output closing, data sends, drop-token reports, and stream teardown.
    ///
    /// Each request variant carries its own reply channel; failures are sent
    /// back to the node as stringified errors rather than aborting the daemon.
    async fn handle_node_event(
        &mut self,
        event: DaemonNodeEvent,
        dataflow_id: DataflowId,
        node_id: NodeId,
    ) -> eyre::Result<()> {
        match event {
            // Node registers its event channel; may trigger dataflow start if
            // it was the last pending node.
            DaemonNodeEvent::Subscribe {
                event_sender,
                reply_sender,
            } => {
                let mut logger = self.logger.for_dataflow(dataflow_id);
                logger
                    .log(
                        LogLevel::Info,
                        Some(node_id.clone()),
                        Some("daemon".into()),
                        "node is ready",
                    )
                    .await;

                let dataflow = self.running.get_mut(&dataflow_id).ok_or_else(|| {
                    format!("subscribe failed: no running dataflow with ID `{dataflow_id}`")
                });

                match dataflow {
                    Err(err) => {
                        let _ = reply_sender.send(DaemonReply::Result(Err(err)));
                    }
                    Ok(dataflow) => {
                        Self::subscribe(dataflow, node_id.clone(), event_sender, &self.clock).await;

                        let status = dataflow
                            .pending_nodes
                            .handle_node_subscription(
                                node_id.clone(),
                                reply_sender,
                                &mut self.coordinator_connection,
                                &self.clock,
                                &mut dataflow.cascading_error_causes,
                                &mut logger,
                            )
                            .await?;
                        match status {
                            DataflowStatus::AllNodesReady => {
                                logger
                                    .log(
                                        LogLevel::Info,
                                        None,
                                        Some("daemon".into()),
                                        "all nodes are ready, starting dataflow",
                                    )
                                    .await;
                                dataflow.start(&self.events_tx, &self.clock).await?;
                            }
                            DataflowStatus::Pending => {}
                        }
                    }
                }
            }
            // Node registers a channel for drop events.
            DaemonNodeEvent::SubscribeDrop {
                event_sender,
                reply_sender,
            } => {
                let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
                    format!("failed to subscribe: no running dataflow with ID `{dataflow_id}`")
                });
                let result = match dataflow {
                    Ok(dataflow) => {
                        dataflow.drop_channels.insert(node_id, event_sender);
                        Ok(())
                    }
                    Err(err) => Err(err.to_string()),
                };
                let _ = reply_sender.send(DaemonReply::Result(result));
            }
            // Node explicitly closes a subset of its outputs.
            DaemonNodeEvent::CloseOutputs {
                outputs,
                reply_sender,
            } => {
                let inner = async {
                    self.send_output_closed_events(dataflow_id, node_id, outputs)
                        .await
                };

                let reply = inner.await.map_err(|err| format!("{err:?}"));
                let _ = reply_sender.send(DaemonReply::Result(reply));
            }
            // Node signals it will send no more outputs at all.
            DaemonNodeEvent::OutputsDone { reply_sender } => {
                let result = self.handle_outputs_done(dataflow_id, &node_id).await;

                let _ = reply_sender.send(DaemonReply::Result(
                    result.map_err(|err| format!("{err:?}")),
                ));
            }
            // Node sends output data to its receivers.
            DaemonNodeEvent::SendOut {
                output_id,
                metadata,
                data,
            } => self
                .send_out(dataflow_id, node_id, output_id, metadata, data)
                .await
                .context("failed to send out")?,
            // Node reports that it dropped shared-memory data tokens.
            DaemonNodeEvent::ReportDrop { tokens } => {
                let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
                    format!(
                        "failed to get handle drop tokens: \
                        no running dataflow with ID `{dataflow_id}`"
                    )
                });

                match dataflow {
                    Ok(dataflow) => {
                        for token in tokens {
                            match dataflow.pending_drop_tokens.get_mut(&token) {
                                Some(info) => {
                                    if info.pending_nodes.remove(&node_id) {
                                        dataflow.check_drop_token(token, &self.clock).await?;
                                    } else {
                                        tracing::warn!(
                                            "node `{node_id}` is not pending for drop token `{token:?}`"
                                        );
                                    }
                                }
                                None => tracing::warn!("unknown drop token `{token:?}`"),
                            }
                        }
                    }
                    Err(err) => tracing::warn!("{err:?}"),
                }
            }
            // Node's event stream was dropped -> remove its subscribe channel.
            DaemonNodeEvent::EventStreamDropped { reply_sender } => {
                let inner = async {
                    let dataflow = self
                        .running
                        .get_mut(&dataflow_id)
                        .wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"))?;
                    dataflow.subscribe_channels.remove(&node_id);
                    Result::<_, eyre::Error>::Ok(())
                };

                let reply = inner.await.map_err(|err| format!("{err:?}"));
                let _ = reply_sender.send(DaemonReply::Result(reply));
            }
        }
        Ok(())
    }
1061
1062 async fn send_reload(
1063 &mut self,
1064 dataflow_id: Uuid,
1065 node_id: NodeId,
1066 operator_id: Option<OperatorId>,
1067 ) -> Result<(), eyre::ErrReport> {
1068 let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
1069 format!("Reload failed: no running dataflow with ID `{dataflow_id}`")
1070 })?;
1071 if let Some(channel) = dataflow.subscribe_channels.get(&node_id) {
1072 match send_with_timestamp(channel, NodeEvent::Reload { operator_id }, &self.clock) {
1073 Ok(()) => {}
1074 Err(_) => {
1075 dataflow.subscribe_channels.remove(&node_id);
1076 }
1077 }
1078 }
1079 Ok(())
1080 }
1081
    /// Delivers a node output to all local receivers and, when the output has
    /// remote receivers (or zenoh publishing of all messages is enabled),
    /// forwards it to remote daemons via zenoh.
    async fn send_out(
        &mut self,
        dataflow_id: Uuid,
        node_id: NodeId,
        output_id: DataId,
        metadata: dora_message::metadata::Metadata,
        data: Option<DataMessage>,
    ) -> Result<(), eyre::ErrReport> {
        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
            format!("send out failed: no running dataflow with ID `{dataflow_id}`")
        })?;
        // Local delivery also yields the serialized bytes for remote sending.
        let data_bytes = send_output_to_local_receivers(
            node_id.clone(),
            output_id.clone(),
            dataflow,
            &metadata,
            data,
            &self.clock,
        )
        .await?;

        let output_id = OutputId(node_id, output_id);
        let remote_receivers = dataflow.open_external_mappings.contains(&output_id)
            || dataflow.publish_all_messages_to_zenoh;
        if remote_receivers {
            let event = InterDaemonEvent::Output {
                dataflow_id,
                node_id: output_id.0.clone(),
                output_id: output_id.1.clone(),
                metadata,
                data: data_bytes,
            };
            self.send_to_remote_receivers(dataflow_id, &output_id, event)
                .await?;
        }

        Ok(())
    }
1120
    /// Publishes an inter-daemon event for the given output via zenoh.
    ///
    /// Zenoh publishers are declared lazily on first use and cached in the
    /// dataflow's `publishers` map.
    async fn send_to_remote_receivers(
        &mut self,
        dataflow_id: Uuid,
        output_id: &OutputId,
        event: InterDaemonEvent,
    ) -> Result<(), eyre::Error> {
        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
            format!("send out failed: no running dataflow with ID `{dataflow_id}`")
        })?;

        let publisher = match dataflow.publishers.get(output_id) {
            Some(publisher) => publisher,
            None => {
                // First send on this output: declare and cache a publisher.
                let publish_topic = dataflow.output_publish_topic(output_id);
                tracing::debug!("declaring publisher on {publish_topic}");
                let publisher = self
                    .zenoh_session
                    .declare_publisher(publish_topic)
                    .await
                    .map_err(|e| eyre!(e))
                    .context("failed to create zenoh publisher")?;
                dataflow.publishers.insert(output_id.clone(), publisher);
                dataflow.publishers.get(output_id).unwrap()
            }
        };

        // Stamp the event with the current HLC time before publishing.
        let serialized_event = Timestamped {
            inner: event,
            timestamp: self.clock.new_timestamp(),
        }
        .serialize();
        publisher
            .put(serialized_event)
            .await
            .map_err(|e| eyre!(e))
            .context("zenoh put failed")?;
        Ok(())
    }
1160
1161 async fn send_output_closed_events(
1162 &mut self,
1163 dataflow_id: DataflowId,
1164 node_id: NodeId,
1165 outputs: Vec<DataId>,
1166 ) -> eyre::Result<()> {
1167 let dataflow = self
1168 .running
1169 .get_mut(&dataflow_id)
1170 .wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"))?;
1171 let local_node_inputs: BTreeSet<_> = dataflow
1172 .mappings
1173 .iter()
1174 .filter(|(k, _)| k.0 == node_id && outputs.contains(&k.1))
1175 .flat_map(|(_, v)| v)
1176 .cloned()
1177 .collect();
1178 for (receiver_id, input_id) in &local_node_inputs {
1179 close_input(dataflow, receiver_id, input_id, &self.clock);
1180 }
1181
1182 let mut closed = Vec::new();
1183 for output_id in &dataflow.open_external_mappings {
1184 if output_id.0 == node_id && outputs.contains(&output_id.1) {
1185 closed.push(output_id.clone());
1186 }
1187 }
1188
1189 for output_id in closed {
1190 let event = InterDaemonEvent::OutputClosed {
1191 dataflow_id,
1192 node_id: output_id.0.clone(),
1193 output_id: output_id.1.clone(),
1194 };
1195 self.send_to_remote_receivers(dataflow_id, &output_id, event)
1196 .await?;
1197 }
1198
1199 Ok(())
1200 }
1201
1202 async fn subscribe(
1203 dataflow: &mut RunningDataflow,
1204 node_id: NodeId,
1205 event_sender: UnboundedSender<Timestamped<NodeEvent>>,
1206 clock: &HLC,
1207 ) {
1208 let closed_inputs = dataflow
1210 .mappings
1211 .values()
1212 .flatten()
1213 .filter(|(node, _)| node == &node_id)
1214 .map(|(_, input)| input)
1215 .filter(|input| {
1216 dataflow
1217 .open_inputs
1218 .get(&node_id)
1219 .map(|open_inputs| !open_inputs.contains(*input))
1220 .unwrap_or(true)
1221 });
1222 for input_id in closed_inputs {
1223 let _ = send_with_timestamp(
1224 &event_sender,
1225 NodeEvent::InputClosed {
1226 id: input_id.clone(),
1227 },
1228 clock,
1229 );
1230 }
1231 if dataflow.open_inputs(&node_id).is_empty() {
1232 let _ = send_with_timestamp(&event_sender, NodeEvent::AllInputsClosed, clock);
1233 }
1234
1235 if dataflow.stop_sent {
1238 let _ = send_with_timestamp(&event_sender, NodeEvent::Stop, clock);
1239 }
1240
1241 dataflow.subscribe_channels.insert(node_id, event_sender);
1242 }
1243
1244 #[tracing::instrument(skip(self), level = "trace")]
1245 async fn handle_outputs_done(
1246 &mut self,
1247 dataflow_id: DataflowId,
1248 node_id: &NodeId,
1249 ) -> eyre::Result<()> {
1250 let dataflow = self
1251 .running
1252 .get_mut(&dataflow_id)
1253 .ok_or_else(|| eyre!("no running dataflow with ID `{dataflow_id}`"))?;
1254
1255 let outputs = dataflow
1256 .mappings
1257 .keys()
1258 .filter(|m| &m.0 == node_id)
1259 .map(|m| &m.1)
1260 .cloned()
1261 .collect();
1262 self.send_output_closed_events(dataflow_id, node_id.clone(), outputs)
1263 .await?;
1264
1265 let dataflow = self
1266 .running
1267 .get_mut(&dataflow_id)
1268 .ok_or_else(|| eyre!("no running dataflow with ID `{dataflow_id}`"))?;
1269 dataflow.drop_channels.remove(node_id);
1270 Ok(())
1271 }
1272
    /// Handles the shutdown of a single node: updates pending-node tracking,
    /// closes the node's outputs, and — once only dynamic nodes remain — reports
    /// the whole dataflow as finished to the coordinator and removes it.
    async fn handle_node_stop(&mut self, dataflow_id: Uuid, node_id: &NodeId) -> eyre::Result<()> {
        let mut logger = self.logger.for_dataflow(dataflow_id);
        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
            format!("failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`")
        })?;

        // Let the pending-nodes tracker react to the stop; this may record
        // cascading error causes and talk to the coordinator.
        dataflow
            .pending_nodes
            .handle_node_stop(
                node_id,
                &mut self.coordinator_connection,
                &self.clock,
                &mut dataflow.cascading_error_causes,
                &mut logger,
            )
            .await?;

        // A stopped node cannot produce any more outputs.
        self.handle_outputs_done(dataflow_id, node_id).await?;

        // Re-borrow logger and dataflow: `handle_outputs_done` required
        // exclusive access to `self`.
        let mut logger = self.logger.for_dataflow(dataflow_id);
        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
            format!("failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`")
        })?;
        // The process exited on its own -> don't kill it when the handle drops.
        if let Some(mut pid) = dataflow.running_nodes.remove(node_id).and_then(|n| n.pid) {
            pid.mark_as_stopped()
        }
        // When only dynamic nodes are left, the dataflow counts as finished on
        // this daemon (dynamic nodes attach/detach externally).
        if dataflow
            .running_nodes
            .iter()
            .all(|(_id, n)| n.node_config.dynamic)
        {
            let result = DataflowDaemonResult {
                timestamp: self.clock.new_timestamp(),
                node_results: self
                    .dataflow_node_results
                    .get(&dataflow.id)
                    .context("failed to get dataflow node results")?
                    .clone(),
            };

            logger
                .log(
                    LogLevel::Info,
                    None,
                    Some("daemon".into()),
                    format!("dataflow finished on machine `{}`", self.daemon_id),
                )
                .await;
            // Report the per-node results to the coordinator, if connected.
            if let Some(connection) = &mut self.coordinator_connection {
                let msg = serde_json::to_vec(&Timestamped {
                    inner: CoordinatorRequest::Event {
                        daemon_id: self.daemon_id.clone(),
                        event: DaemonEvent::AllNodesFinished {
                            dataflow_id,
                            result,
                        },
                    },
                    timestamp: self.clock.new_timestamp(),
                })?;
                socket_stream_send(connection, &msg)
                    .await
                    .wrap_err("failed to report dataflow finish to dora-coordinator")?;
            }
            self.running.remove(&dataflow_id);
        }

        Ok(())
    }
1341
    /// Dispatches internally generated daemon events: timer ticks, forwarded
    /// log messages, and node exit results.
    ///
    /// Returns `RunStatus::Exit` when the daemon should shut down because all
    /// dataflows it is waiting for have finished.
    async fn handle_dora_event(&mut self, event: DoraEvent) -> eyre::Result<RunStatus> {
        match event {
            DoraEvent::Timer {
                dataflow_id,
                interval,
                metadata,
            } => {
                let Some(dataflow) = self.running.get_mut(&dataflow_id) else {
                    tracing::warn!("Timer event for unknown dataflow `{dataflow_id}`");
                    return Ok(RunStatus::Continue);
                };

                let Some(subscribers) = dataflow.timers.get(&interval) else {
                    return Ok(RunStatus::Continue);
                };

                // Deliver a data-less input event to every timer subscriber;
                // channels whose receiver is gone are removed afterwards.
                let mut closed = Vec::new();
                for (receiver_id, input_id) in subscribers {
                    let Some(channel) = dataflow.subscribe_channels.get(receiver_id) else {
                        continue;
                    };

                    let send_result = send_with_timestamp(
                        channel,
                        NodeEvent::Input {
                            id: input_id.clone(),
                            metadata: metadata.clone(),
                            data: None,
                        },
                        &self.clock,
                    );
                    match send_result {
                        Ok(()) => {}
                        Err(_) => {
                            closed.push(receiver_id);
                        }
                    }
                }
                for id in closed {
                    dataflow.subscribe_channels.remove(id);
                }
            }
            DoraEvent::Logs {
                dataflow_id,
                output_id,
                message,
                metadata,
            } => {
                let Some(dataflow) = self.running.get_mut(&dataflow_id) else {
                    tracing::warn!("Logs event for unknown dataflow `{dataflow_id}`");
                    return Ok(RunStatus::Continue);
                };

                let Some(subscribers) = dataflow.mappings.get(&output_id) else {
                    tracing::warn!(
                        "No subscribers found for {:?} in {:?}",
                        output_id,
                        dataflow.mappings
                    );
                    return Ok(RunStatus::Continue);
                };

                // Forward the log message as a regular input to each mapped
                // receiver; prune channels that turn out to be closed.
                let mut closed = Vec::new();
                for (receiver_id, input_id) in subscribers {
                    let Some(channel) = dataflow.subscribe_channels.get(receiver_id) else {
                        tracing::warn!("No subscriber channel found for {:?}", output_id);
                        continue;
                    };

                    let send_result = send_with_timestamp(
                        channel,
                        NodeEvent::Input {
                            id: input_id.clone(),
                            metadata: metadata.clone(),
                            data: Some(message.clone()),
                        },
                        &self.clock,
                    );
                    match send_result {
                        Ok(()) => {}
                        Err(_) => {
                            closed.push(receiver_id);
                        }
                    }
                }
                for id in closed {
                    dataflow.subscribe_channels.remove(id);
                }
            }
            DoraEvent::SpawnedNodeResult {
                dataflow_id,
                node_id,
                exit_status,
            } => {
                let mut logger = self
                    .logger
                    .for_dataflow(dataflow_id)
                    .for_node(node_id.clone());
                logger
                    .log(
                        LogLevel::Debug,
                        Some("daemon".into()),
                        format!("handling node stop with exit status {exit_status:?}"),
                    )
                    .await;

                // Turn the raw exit status into a node result, attributing a
                // failure to a cascading cause, a grace-period kill, or the
                // node's own error (with its recent stderr attached).
                let node_result = match exit_status {
                    NodeExitStatus::Success => Ok(()),
                    exit_status => {
                        let dataflow = self.running.get(&dataflow_id);
                        let caused_by_node = dataflow
                            .and_then(|dataflow| {
                                dataflow.cascading_error_causes.error_caused_by(&node_id)
                            })
                            .cloned();
                        let grace_duration_kill = dataflow
                            .map(|d| d.grace_duration_kills.contains(&node_id))
                            .unwrap_or_default();

                        let cause = match caused_by_node {
                            Some(caused_by_node) => {
                                logger
                                    .log(
                                        LogLevel::Info,
                                        Some("daemon".into()),
                                        format!("marking `{node_id}` as cascading error caused by `{caused_by_node}`")
                                    )
                                    .await;

                                NodeErrorCause::Cascading { caused_by_node }
                            }
                            None if grace_duration_kill => NodeErrorCause::GraceDuration,
                            None => {
                                // Drain the most recent stderr lines; a full
                                // queue indicates older output was dropped.
                                let cause = dataflow
                                    .and_then(|d| d.node_stderr_most_recent.get(&node_id))
                                    .map(|queue| {
                                        let mut s = if queue.is_full() {
                                            "[...]".into()
                                        } else {
                                            String::new()
                                        };
                                        while let Some(line) = queue.pop() {
                                            s += &line;
                                        }
                                        s
                                    })
                                    .unwrap_or_default();

                                NodeErrorCause::Other { stderr: cause }
                            }
                        };
                        Err(NodeError {
                            timestamp: self.clock.new_timestamp(),
                            cause,
                            exit_status,
                        })
                    }
                };

                logger
                    .log(
                        if node_result.is_ok() {
                            LogLevel::Info
                        } else {
                            LogLevel::Error
                        },
                        Some("daemon".into()),
                        match &node_result {
                            Ok(()) => format!("{node_id} finished successfully"),
                            Err(err) => format!("{err}"),
                        },
                    )
                    .await;

                self.dataflow_node_results
                    .entry(dataflow_id)
                    .or_default()
                    .insert(node_id.clone(), node_result);

                self.handle_node_stop(dataflow_id, &node_id).await?;

                // Exit the daemon if this was the last node it was waiting for.
                if let Some(exit_when_done) = &mut self.exit_when_done {
                    exit_when_done.remove(&(dataflow_id, node_id));
                    if exit_when_done.is_empty() {
                        tracing::info!(
                            "exiting daemon because all required dataflows are finished"
                        );
                        return Ok(RunStatus::Exit);
                    }
                }
                if self.exit_when_all_finished && self.running.is_empty() {
                    return Ok(RunStatus::Exit);
                }
            }
        }
        Ok(RunStatus::Continue)
    }
1539}
1540
/// Registers this daemon at the coordinator and merges all incoming event
/// sources (coordinator, remote daemons, dynamic nodes) into one stream.
///
/// Returns the daemon ID assigned by the coordinator together with the merged
/// event stream.
async fn set_up_event_stream(
    coordinator_addr: SocketAddr,
    machine_id: &Option<String>,
    clock: &Arc<HLC>,
    remote_daemon_events_rx: flume::Receiver<eyre::Result<Timestamped<InterDaemonEvent>>>,
    local_listen_port: u16,
) -> eyre::Result<(DaemonId, impl Stream<Item = Timestamped<Event>> + Unpin)> {
    let clock_cloned = clock.clone();
    // Wrap remote daemon events; receive errors are surfaced as
    // `Event::DaemonError` with a locally generated timestamp.
    let remote_daemon_events = remote_daemon_events_rx.into_stream().map(move |e| match e {
        Ok(e) => Timestamped {
            inner: Event::Daemon(e.inner),
            timestamp: e.timestamp,
        },
        Err(err) => Timestamped {
            inner: Event::DaemonError(err),
            timestamp: clock_cloned.new_timestamp(),
        },
    });
    let (daemon_id, coordinator_events) =
        coordinator::register(coordinator_addr, machine_id.clone(), clock)
            .await
            .wrap_err("failed to connect to dora-coordinator")?;
    let coordinator_events = coordinator_events.map(
        |Timestamped {
             inner: event,
             timestamp,
         }| Timestamped {
            inner: Event::Coordinator(event),
            timestamp,
        },
    );
    // Local TCP listener through which dynamic nodes connect to the daemon.
    let (events_tx, events_rx) = flume::bounded(10);
    let _listen_port =
        local_listener::spawn_listener_loop((LOCALHOST, local_listen_port).into(), events_tx)
            .await?;
    let dynamic_node_events = events_rx.into_stream().map(|e| Timestamped {
        inner: Event::DynamicNode(e.inner),
        timestamp: e.timestamp,
    });
    let incoming = (
        coordinator_events,
        remote_daemon_events,
        dynamic_node_events,
    )
        .merge();
    Ok((daemon_id, incoming))
}
1589
/// Delivers an output message to all local subscribers of `output_id` and
/// returns the raw payload bytes for optional forwarding to remote daemons.
///
/// For shared-memory messages, drop tokens are registered so the sending node
/// is notified once every receiver has dropped the data.
async fn send_output_to_local_receivers(
    node_id: NodeId,
    output_id: DataId,
    dataflow: &mut RunningDataflow,
    metadata: &metadata::Metadata,
    data: Option<DataMessage>,
    clock: &HLC,
) -> Result<Option<AVec<u8, ConstAlign<128>>>, eyre::ErrReport> {
    let timestamp = metadata.timestamp();
    let empty_set = BTreeSet::new();
    let output_id = OutputId(node_id, output_id);
    let local_receivers = dataflow.mappings.get(&output_id).unwrap_or(&empty_set);
    let OutputId(node_id, _) = output_id;
    let mut closed = Vec::new();
    for (receiver_id, input_id) in local_receivers {
        if let Some(channel) = dataflow.subscribe_channels.get(receiver_id) {
            let item = NodeEvent::Input {
                id: input_id.clone(),
                metadata: metadata.clone(),
                data: data.clone(),
            };
            match channel.send(Timestamped {
                inner: item,
                timestamp,
            }) {
                Ok(()) => {
                    // Shared-memory data is only freed once all receivers
                    // acknowledged the drop token -> track this receiver.
                    if let Some(token) = data.as_ref().and_then(|d| d.drop_token()) {
                        dataflow
                            .pending_drop_tokens
                            .entry(token)
                            .or_insert_with(|| DropTokenInformation {
                                owner: node_id.clone(),
                                pending_nodes: Default::default(),
                            })
                            .pending_nodes
                            .insert(receiver_id.clone());
                    }
                }
                Err(_) => {
                    closed.push(receiver_id);
                }
            }
        }
    }
    // Drop subscription channels whose receiver end is gone.
    for id in closed {
        dataflow.subscribe_channels.remove(id);
    }
    // Extract the raw bytes (copying out of shared memory if necessary) so
    // that they can be forwarded to remote daemons.
    let (data_bytes, drop_token) = match data {
        None => (None, None),
        Some(DataMessage::SharedMemory {
            shared_memory_id,
            len,
            drop_token,
        }) => {
            let memory = ShmemConf::new()
                .os_id(shared_memory_id)
                .open()
                .wrap_err("failed to map shared memory output")?;
            // SAFETY assumption: the sending node keeps the shared memory
            // region alive and of size >= `len` until the drop token is
            // reported back — NOTE(review): confirm against the node API.
            let data = Some(AVec::from_slice(1, &unsafe { memory.as_slice() }[..len]));
            (data, Some(drop_token))
        }
        Some(DataMessage::Vec(v)) => (Some(v), None),
    };
    if let Some(token) = drop_token {
        // Ensure an entry exists even when there were no local receivers, then
        // report the token back right away if no receiver is pending.
        dataflow
            .pending_drop_tokens
            .entry(token)
            .or_insert_with(|| DropTokenInformation {
                owner: node_id.clone(),
                pending_nodes: Default::default(),
            });
        dataflow.check_drop_token(token, clock).await?;
    }
    Ok(data_bytes)
}
1667
1668fn node_inputs(node: &ResolvedNode) -> BTreeMap<DataId, Input> {
1669 match &node.kind {
1670 CoreNodeKind::Custom(n) => n.run_config.inputs.clone(),
1671 CoreNodeKind::Runtime(n) => runtime_node_inputs(n),
1672 }
1673}
1674
1675fn close_input(
1676 dataflow: &mut RunningDataflow,
1677 receiver_id: &NodeId,
1678 input_id: &DataId,
1679 clock: &HLC,
1680) {
1681 if let Some(open_inputs) = dataflow.open_inputs.get_mut(receiver_id) {
1682 if !open_inputs.remove(input_id) {
1683 return;
1684 }
1685 }
1686 if let Some(channel) = dataflow.subscribe_channels.get(receiver_id) {
1687 let _ = send_with_timestamp(
1688 channel,
1689 NodeEvent::InputClosed {
1690 id: input_id.clone(),
1691 },
1692 clock,
1693 );
1694
1695 if dataflow.open_inputs(receiver_id).is_empty() {
1696 let _ = send_with_timestamp(channel, NodeEvent::AllInputsClosed, clock);
1697 }
1698 }
1699}
1700
/// Daemon-side bookkeeping for a spawned node.
#[derive(Debug)]
struct RunningNode {
    // Handle to the node's OS process, if one is tracked for it.
    pid: Option<ProcessId>,
    // Configuration the node was spawned with.
    node_config: NodeConfig,
}
1706
/// Handle to an OS process ID that can be invalidated (set to `None`) once
/// the process is known to have stopped.
#[derive(Debug)]
struct ProcessId(Option<u32>);
1709
1710impl ProcessId {
1711 pub fn new(process_id: u32) -> Self {
1712 Self(Some(process_id))
1713 }
1714
1715 pub fn mark_as_stopped(&mut self) {
1716 self.0 = None;
1717 }
1718
1719 pub fn kill(&mut self) -> bool {
1720 if let Some(pid) = self.0 {
1721 let mut system = sysinfo::System::new();
1722 system.refresh_processes();
1723
1724 if let Some(process) = system.process(Pid::from(pid as usize)) {
1725 process.kill();
1726 self.mark_as_stopped();
1727 return true;
1728 }
1729 }
1730
1731 false
1732 }
1733}
1734
1735impl Drop for ProcessId {
1736 fn drop(&mut self) {
1737 if let Some(pid) = self.0 {
1739 if self.kill() {
1740 warn!("process {pid} was killed on drop because it was still running")
1741 }
1742 }
1743 }
1744}
1745
/// All daemon-local state of a single running dataflow.
pub struct RunningDataflow {
    id: Uuid,
    // Nodes that were spawned but have not reported as ready yet.
    pending_nodes: PendingNodes,

    // Event channels of subscribed local nodes, keyed by node ID.
    subscribe_channels: HashMap<NodeId, UnboundedSender<Timestamped<NodeEvent>>>,
    // Channels used to report drop-token events back to output owners.
    drop_channels: HashMap<NodeId, UnboundedSender<Timestamped<NodeDropEvent>>>,
    // Maps each output to its set of local `(node, input)` receivers.
    mappings: HashMap<OutputId, BTreeSet<InputId>>,
    // Timer subscriptions, keyed by tick interval.
    timers: BTreeMap<Duration, BTreeSet<InputId>>,
    // The inputs of each local node that are still open.
    open_inputs: BTreeMap<NodeId, BTreeSet<DataId>>,
    // Nodes spawned by this daemon that have not stopped yet.
    running_nodes: BTreeMap<NodeId, RunningNode>,

    dynamic_nodes: BTreeSet<NodeId>,

    // Outputs that have receivers on other daemons.
    open_external_mappings: BTreeSet<OutputId>,

    // Drop tokens still awaiting acknowledgment from receiving nodes.
    pending_drop_tokens: HashMap<DropToken, DropTokenInformation>,

    // Keeps the timer tasks alive; the tasks end when these handles drop.
    _timer_handles: Vec<futures::future::RemoteHandle<()>>,
    // Whether a stop event was already sent to the nodes.
    stop_sent: bool,

    // Shared empty set returned by `open_inputs` for unknown nodes.
    empty_set: BTreeSet<DataId>,

    // Records which node failures were caused by other nodes' errors.
    cascading_error_causes: CascadingErrorCauses,
    // Nodes that had to be killed for exceeding the stop grace period.
    grace_duration_kills: Arc<crossbeam_skiplist::SkipSet<NodeId>>,

    // Most recent stderr lines per node, used for error reporting.
    node_stderr_most_recent: BTreeMap<NodeId, Arc<ArrayQueue<String>>>,

    // Lazily created zenoh publishers, one per externally mapped output.
    publishers: BTreeMap<OutputId, zenoh::pubsub::Publisher<'static>>,

    finished_tx: broadcast::Sender<()>,

    // Debug option: mirror every output to zenoh, not just external ones.
    publish_all_messages_to_zenoh: bool,
}
1789
impl RunningDataflow {
    /// Creates the initial (empty) state for a freshly spawned dataflow.
    fn new(
        dataflow_id: Uuid,
        daemon_id: DaemonId,
        dataflow_descriptor: &Descriptor,
    ) -> RunningDataflow {
        let (finished_tx, _) = broadcast::channel(1);
        Self {
            id: dataflow_id,
            pending_nodes: PendingNodes::new(dataflow_id, daemon_id),
            subscribe_channels: HashMap::new(),
            drop_channels: HashMap::new(),
            mappings: HashMap::new(),
            timers: BTreeMap::new(),
            open_inputs: BTreeMap::new(),
            running_nodes: BTreeMap::new(),
            dynamic_nodes: BTreeSet::new(),
            open_external_mappings: Default::default(),
            pending_drop_tokens: HashMap::new(),
            _timer_handles: Vec::new(),
            stop_sent: false,
            empty_set: BTreeSet::new(),
            cascading_error_causes: Default::default(),
            grace_duration_kills: Default::default(),
            node_stderr_most_recent: BTreeMap::new(),
            publishers: Default::default(),
            finished_tx,
            publish_all_messages_to_zenoh: dataflow_descriptor.debug.publish_all_messages_to_zenoh,
        }
    }

    /// Spawns one background task per configured timer interval that emits
    /// `DoraEvent::Timer` events until the daemon's event channel closes.
    async fn start(
        &mut self,
        events_tx: &mpsc::Sender<Timestamped<Event>>,
        clock: &Arc<HLC>,
    ) -> eyre::Result<()> {
        for interval in self.timers.keys().copied() {
            let events_tx = events_tx.clone();
            let dataflow_id = self.id;
            let clock = clock.clone();
            let task = async move {
                let mut interval_stream = tokio::time::interval(interval);
                let hlc = HLC::default();
                loop {
                    interval_stream.tick().await;

                    let span = tracing::span!(tracing::Level::TRACE, "tick");
                    let _ = span.enter();

                    // Propagate the tracing context to receivers (empty when
                    // the telemetry feature is disabled).
                    let mut parameters = BTreeMap::new();
                    parameters.insert(
                        "open_telemetry_context".to_string(),
                        #[cfg(feature = "telemetry")]
                        Parameter::String(serialize_context(&span.context())),
                        #[cfg(not(feature = "telemetry"))]
                        Parameter::String("".into()),
                    );

                    let metadata = metadata::Metadata::from_parameters(
                        hlc.new_timestamp(),
                        empty_type_info(),
                        parameters,
                    );

                    let event = Timestamped {
                        inner: DoraEvent::Timer {
                            dataflow_id,
                            interval,
                            metadata,
                        }
                        .into(),
                        timestamp: clock.new_timestamp(),
                    };
                    // The daemon shut down -> stop this timer task.
                    if events_tx.send(event).await.is_err() {
                        break;
                    }
                }
            };
            // Keep the remote handle so the task is cancelled when the
            // dataflow is dropped.
            let (task, handle) = task.remote_handle();
            tokio::spawn(task);
            self._timer_handles.push(handle);
        }

        Ok(())
    }

    /// Sends a stop event to all subscribed nodes and schedules a forced kill
    /// of still-running processes after the grace period.
    async fn stop_all(
        &mut self,
        coordinator_connection: &mut Option<TcpStream>,
        clock: &HLC,
        grace_duration: Option<Duration>,
        logger: &mut DataflowLogger<'_>,
    ) -> eyre::Result<()> {
        self.pending_nodes
            .handle_dataflow_stop(
                coordinator_connection,
                clock,
                &mut self.cascading_error_causes,
                &self.dynamic_nodes,
                logger,
            )
            .await?;

        for (_node_id, channel) in self.subscribe_channels.drain() {
            let _ = send_with_timestamp(&channel, NodeEvent::Stop, clock);
        }

        // Take ownership of the process handles so the kill task below can
        // act on them independently of `self`.
        let running_processes: Vec<_> = self
            .running_nodes
            .iter_mut()
            .map(|(id, n)| (id.clone(), n.pid.take()))
            .collect();
        let grace_duration_kills = self.grace_duration_kills.clone();
        tokio::spawn(async move {
            // Default grace period: 15 seconds.
            let duration = grace_duration.unwrap_or(Duration::from_millis(15000));
            tokio::time::sleep(duration).await;

            for (node, pid) in running_processes {
                if let Some(mut pid) = pid {
                    if pid.kill() {
                        grace_duration_kills.insert(node.clone());
                        warn!(
                            "{node} was killed due to not stopping within the {:#?} grace period",
                            duration
                        )
                    }
                }
            }
        });
        self.stop_sent = true;
        Ok(())
    }

    /// Returns the set of still-open inputs of `node_id` (empty for unknown
    /// nodes).
    fn open_inputs(&self, node_id: &NodeId) -> &BTreeSet<DataId> {
        self.open_inputs.get(node_id).unwrap_or(&self.empty_set)
    }

    /// Reports the drop token back to its owning node once no receiver is
    /// pending anymore; failures are logged, not propagated.
    async fn check_drop_token(&mut self, token: DropToken, clock: &HLC) -> eyre::Result<()> {
        match self.pending_drop_tokens.entry(token) {
            std::collections::hash_map::Entry::Occupied(entry) => {
                if entry.get().pending_nodes.is_empty() {
                    let (drop_token, info) = entry.remove_entry();
                    let result = match self.drop_channels.get_mut(&info.owner) {
                        Some(channel) => send_with_timestamp(
                            channel,
                            NodeDropEvent::OutputDropped { drop_token },
                            clock,
                        )
                        .wrap_err("send failed"),
                        None => Err(eyre!("no subscribe channel for node `{}`", &info.owner)),
                    };
                    if let Err(err) = result.wrap_err_with(|| {
                        format!(
                            "failed to report drop token `{drop_token:?}` to owner `{}`",
                            &info.owner
                        )
                    }) {
                        tracing::warn!("{err:?}");
                    }
                }
            }
            std::collections::hash_map::Entry::Vacant(_) => {
                tracing::warn!("check_drop_token called with already closed token")
            }
        }

        Ok(())
    }

    /// Builds the zenoh topic on which this dataflow publishes the given
    /// output.
    fn output_publish_topic(&self, output_id: &OutputId) -> String {
        let network_id = "default";
        let dataflow_id = self.id;
        let OutputId(node_id, output_id) = output_id;
        format!("dora/{network_id}/{dataflow_id}/output/{node_id}/{output_id}")
    }
}
1966
1967fn empty_type_info() -> ArrowTypeInfo {
1968 ArrowTypeInfo {
1969 data_type: DataType::Null,
1970 len: 0,
1971 null_count: 0,
1972 validity: None,
1973 offset: 0,
1974 buffer_offsets: Vec::new(),
1975 child_data: Vec::new(),
1976 }
1977}
1978
/// Identifier of a node output: `(node id, output data id)`.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct OutputId(NodeId, DataId);
/// Identifier of a node input: `(node id, input data id)`.
type InputId = (NodeId, DataId);
1982
/// Tracking state for one shared-memory drop token.
struct DropTokenInformation {
    // The node that created the corresponding output and owns the memory.
    owner: NodeId,
    // Receivers that have not reported the token back yet.
    pending_nodes: BTreeSet<NodeId>,
}
1990
/// Top-level events processed by the daemon's main loop.
#[derive(Debug)]
pub enum Event {
    /// A request coming from a node running on this daemon.
    Node {
        dataflow_id: DataflowId,
        node_id: NodeId,
        event: DaemonNodeEvent,
    },
    /// A message from the dora coordinator.
    Coordinator(CoordinatorEvent),
    /// A message from another daemon (e.g. remote output data).
    Daemon(InterDaemonEvent),
    /// An internally generated event (timers, logs, node exits).
    Dora(DoraEvent),
    /// An event from a dynamically attached node.
    DynamicNode(DynamicNodeEventWrapper),
    HeartbeatInterval,
    /// First ctrl-c signal (graceful stop).
    CtrlC,
    /// Second ctrl-c signal (more forceful stop).
    SecondCtrlC,
    /// Receiving from the remote-daemon channel failed.
    DaemonError(eyre::Report),
}
2007
2008impl From<DoraEvent> for Event {
2009 fn from(event: DoraEvent) -> Self {
2010 Event::Dora(event)
2011 }
2012}
2013
/// Requests sent from a local node to the daemon.
#[derive(Debug)]
pub enum DaemonNodeEvent {
    /// The node finished producing outputs.
    OutputsDone {
        reply_sender: oneshot::Sender<DaemonReply>,
    },
    /// The node wants to receive `NodeEvent`s over the given channel.
    Subscribe {
        event_sender: UnboundedSender<Timestamped<NodeEvent>>,
        reply_sender: oneshot::Sender<DaemonReply>,
    },
    /// The node wants to receive drop-token events over the given channel.
    SubscribeDrop {
        event_sender: UnboundedSender<Timestamped<NodeDropEvent>>,
        reply_sender: oneshot::Sender<DaemonReply>,
    },
    /// The node closed a subset of its outputs.
    CloseOutputs {
        outputs: Vec<dora_core::config::DataId>,
        reply_sender: oneshot::Sender<DaemonReply>,
    },
    /// The node produced an output message.
    SendOut {
        output_id: DataId,
        metadata: metadata::Metadata,
        data: Option<DataMessage>,
    },
    /// The node reports that it dropped the data behind these tokens.
    ReportDrop {
        tokens: Vec<DropToken>,
    },
    /// The node's event stream receiver was dropped.
    EventStreamDropped {
        reply_sender: oneshot::Sender<DaemonReply>,
    },
}
2043
/// Events generated inside the daemon itself.
#[derive(Debug)]
pub enum DoraEvent {
    /// A timer tick for subscribers of the given interval.
    Timer {
        dataflow_id: DataflowId,
        interval: Duration,
        metadata: metadata::Metadata,
    },
    /// A log message to be forwarded to the subscribers of `output_id`.
    Logs {
        dataflow_id: DataflowId,
        output_id: OutputId,
        message: DataMessage,
        metadata: metadata::Metadata,
    },
    /// A spawned node process exited with the given status.
    SpawnedNodeResult {
        dataflow_id: DataflowId,
        node_id: NodeId,
        exit_status: NodeExitStatus,
    },
}
2063
/// Tells the main loop whether to keep processing events or shut down.
#[must_use]
enum RunStatus {
    Continue,
    Exit,
}
2069
2070fn send_with_timestamp<T>(
2071 sender: &UnboundedSender<Timestamped<T>>,
2072 event: T,
2073 clock: &HLC,
2074) -> Result<(), mpsc::error::SendError<Timestamped<T>>> {
2075 sender.send(Timestamped {
2076 inner: event,
2077 timestamp: clock.new_timestamp(),
2078 })
2079}
2080
2081fn set_up_ctrlc_handler(
2082 clock: Arc<HLC>,
2083) -> eyre::Result<tokio::sync::mpsc::Receiver<Timestamped<Event>>> {
2084 let (ctrlc_tx, ctrlc_rx) = mpsc::channel(1);
2085
2086 let mut ctrlc_sent = 0;
2087 ctrlc::set_handler(move || {
2088 let event = match ctrlc_sent {
2089 0 => Event::CtrlC,
2090 1 => Event::SecondCtrlC,
2091 _ => {
2092 tracing::warn!("received 3rd ctrlc signal -> aborting immediately");
2093 std::process::abort();
2094 }
2095 };
2096 if ctrlc_tx
2097 .blocking_send(Timestamped {
2098 inner: event,
2099 timestamp: clock.new_timestamp(),
2100 })
2101 .is_err()
2102 {
2103 tracing::error!("failed to report ctrl-c event to dora-coordinator");
2104 }
2105
2106 ctrlc_sent += 1;
2107 })
2108 .wrap_err("failed to set ctrl-c handler")?;
2109
2110 Ok(ctrlc_rx)
2111}
2112
/// Records which node failures were caused by the failure of another node.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct CascadingErrorCauses {
    // Maps an affected node to the node whose error caused its failure.
    caused_by: BTreeMap<NodeId, NodeId>,
}
2117
2118impl CascadingErrorCauses {
2119 pub fn experienced_cascading_error(&self, node: &NodeId) -> bool {
2120 self.caused_by.contains_key(node)
2121 }
2122
2123 pub fn error_caused_by(&self, node: &NodeId) -> Option<&NodeId> {
2125 self.caused_by.get(node)
2126 }
2127
2128 pub fn report_cascading_error(&mut self, causing_node: NodeId, affected_node: NodeId) {
2129 self.caused_by.entry(affected_node).or_insert(causing_node);
2130 }
2131}
2132
2133fn runtime_node_inputs(n: &RuntimeNode) -> BTreeMap<DataId, Input> {
2134 n.operators
2135 .iter()
2136 .flat_map(|operator| {
2137 operator.config.inputs.iter().map(|(input_id, mapping)| {
2138 (
2139 DataId::from(format!("{}/{input_id}", operator.id)),
2140 mapping.clone(),
2141 )
2142 })
2143 })
2144 .collect()
2145}
2146
2147fn runtime_node_outputs(n: &RuntimeNode) -> BTreeSet<DataId> {
2148 n.operators
2149 .iter()
2150 .flat_map(|operator| {
2151 operator
2152 .config
2153 .outputs
2154 .iter()
2155 .map(|output_id| DataId::from(format!("{}/{output_id}", operator.id)))
2156 })
2157 .collect()
2158}
2159
/// Convenience accessors shared by custom and runtime node kinds.
trait CoreNodeKindExt {
    // The effective run configuration (inputs and outputs) of the node.
    fn run_config(&self) -> NodeRunConfig;
    // Whether the node is a dynamic node (attaches to the daemon at runtime).
    fn dynamic(&self) -> bool;
}
2164
2165impl CoreNodeKindExt for CoreNodeKind {
2166 fn run_config(&self) -> NodeRunConfig {
2167 match self {
2168 CoreNodeKind::Runtime(n) => NodeRunConfig {
2169 inputs: runtime_node_inputs(n),
2170 outputs: runtime_node_outputs(n),
2171 },
2172 CoreNodeKind::Custom(n) => n.run_config.clone(),
2173 }
2174 }
2175
2176 fn dynamic(&self) -> bool {
2177 match self {
2178 CoreNodeKind::Runtime(_n) => false,
2179 CoreNodeKind::Custom(n) => n.source == DYNAMIC_SOURCE,
2180 }
2181 }
2182}