dora_daemon/
lib.rs

1use aligned_vec::{AVec, ConstAlign};
2use coordinator::CoordinatorEvent;
3use crossbeam::queue::ArrayQueue;
4use dora_core::{
5    config::{DataId, Input, InputMapping, NodeId, NodeRunConfig, OperatorId},
6    descriptor::{
7        read_as_descriptor, CoreNodeKind, Descriptor, DescriptorExt, ResolvedNode, RuntimeNode,
8        DYNAMIC_SOURCE,
9    },
10    topics::LOCALHOST,
11    uhlc::{self, HLC},
12};
13use dora_message::{
14    common::{
15        DaemonId, DataMessage, DropToken, LogLevel, NodeError, NodeErrorCause, NodeExitStatus,
16    },
17    coordinator_to_cli::DataflowResult,
18    coordinator_to_daemon::{DaemonCoordinatorEvent, SpawnDataflowNodes},
19    daemon_to_coordinator::{
20        CoordinatorRequest, DaemonCoordinatorReply, DaemonEvent, DataflowDaemonResult,
21    },
22    daemon_to_daemon::InterDaemonEvent,
23    daemon_to_node::{DaemonReply, NodeConfig, NodeDropEvent, NodeEvent},
24    metadata::{self, ArrowTypeInfo},
25    node_to_daemon::{DynamicNodeEvent, Timestamped},
26    DataflowId,
27};
28use dora_node_api::{arrow::datatypes::DataType, Parameter};
29use eyre::{bail, eyre, Context, ContextCompat, Result};
30use futures::{future, stream, FutureExt, TryFutureExt};
31use futures_concurrency::stream::Merge;
32use local_listener::DynamicNodeEventWrapper;
33use log::{DaemonLogger, DataflowLogger, Logger};
34use pending::PendingNodes;
35use shared_memory_server::ShmemConf;
36use socket_stream_utils::socket_stream_send;
37use std::{
38    collections::{BTreeMap, BTreeSet, HashMap},
39    net::SocketAddr,
40    path::{Path, PathBuf},
41    pin::pin,
42    sync::Arc,
43    time::{Duration, Instant},
44};
45use sysinfo::Pid;
46use tokio::{
47    fs::File,
48    io::AsyncReadExt,
49    net::TcpStream,
50    sync::{
51        broadcast,
52        mpsc::{self, UnboundedSender},
53        oneshot::{self, Sender},
54    },
55};
56use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
57use tracing::{error, warn};
58use uuid::{NoContext, Timestamp, Uuid};
59
60mod coordinator;
61mod local_listener;
62mod log;
63mod node_communication;
64mod pending;
65mod socket_stream_utils;
66mod spawn;
67
68#[cfg(feature = "telemetry")]
69use dora_tracing::telemetry::serialize_context;
70#[cfg(feature = "telemetry")]
71use tracing_opentelemetry::OpenTelemetrySpanExt;
72
73use crate::pending::DataflowStatus;
74
75const STDERR_LOG_LINES: usize = 10;
76
/// Core state of a running dora daemon instance.
///
/// Owns all per-dataflow runtime state, the (optional) TCP connection to the
/// coordinator, and the zenoh session used for inter-daemon communication.
pub struct Daemon {
    /// All dataflows currently running on this daemon, keyed by dataflow ID.
    running: HashMap<DataflowId, RunningDataflow>,
    /// Working directory of each running dataflow (used e.g. to locate log files).
    working_dir: HashMap<DataflowId, PathBuf>,

    /// Sender half for internal daemon events (cloned into spawned tasks).
    events_tx: mpsc::Sender<Timestamped<Event>>,

    /// TCP connection to the coordinator; `None` when running standalone.
    coordinator_connection: Option<TcpStream>,
    /// When the last coordinator heartbeat arrived; used to detect a lost
    /// coordinator connection.
    last_coordinator_heartbeat: Instant,
    /// Identifier of this daemon, assigned during registration.
    daemon_id: DaemonId,

    /// used for testing and examples
    exit_when_done: Option<BTreeSet<(Uuid, NodeId)>>,
    /// set on ctrl-c
    exit_when_all_finished: bool,
    /// used to record results of local nodes
    dataflow_node_results: BTreeMap<Uuid, BTreeMap<NodeId, Result<(), NodeError>>>,

    /// Hybrid logical clock used to timestamp all daemon events.
    clock: Arc<uhlc::HLC>,

    /// Zenoh session used for communication with remote daemons.
    zenoh_session: zenoh::Session,
    /// Forwards events received from remote daemons into the main event
    /// stream; `None` when no remote daemon events are expected.
    remote_daemon_events_tx: Option<flume::Sender<eyre::Result<Timestamped<InterDaemonEvent>>>>,

    /// Logger that reports daemon-level log messages (to the coordinator
    /// when connected).
    logger: DaemonLogger,
}
101
102type DaemonRunResult = BTreeMap<Uuid, BTreeMap<NodeId, Result<(), NodeError>>>;
103
104impl Daemon {
    /// Runs the daemon connected to a coordinator at `coordinator_addr`.
    ///
    /// Sets up the ctrl-c handler and the incoming event stream (coordinator,
    /// remote-daemon, and dynamic-node events), then enters the shared event
    /// loop via [`Self::run_general`]. Returns once the daemon exits.
    ///
    /// # Errors
    /// Fails if the event stream cannot be set up or the event loop errors.
    pub async fn run(
        coordinator_addr: SocketAddr,
        machine_id: Option<String>,
        local_listen_port: u16,
    ) -> eyre::Result<()> {
        let clock = Arc::new(HLC::default());

        let mut ctrlc_events = set_up_ctrlc_handler(clock.clone())?;
        let (remote_daemon_events_tx, remote_daemon_events_rx) = flume::bounded(10);
        let (daemon_id, incoming_events) = {
            let incoming_events = set_up_event_stream(
                coordinator_addr,
                &machine_id,
                &clock,
                remote_daemon_events_rx,
                local_listen_port,
            );

            // finish early if ctrl-c is pressed during event stream setup
            let ctrl_c = pin!(ctrlc_events.recv());
            match futures::future::select(ctrl_c, pin!(incoming_events)).await {
                future::Either::Left((_ctrl_c, _)) => {
                    tracing::info!("received ctrl-c signal -> stopping daemon");
                    return Ok(());
                }
                future::Either::Right((events, _)) => events?,
            }
        };
        Self::run_general(
            // remaining ctrl-c presses are handled inside the event loop
            (ReceiverStream::new(ctrlc_events), incoming_events).merge(),
            Some(coordinator_addr),
            daemon_id,
            None,
            clock,
            Some(remote_daemon_events_tx),
        )
        .await
        .map(|_| ())
    }
144
145    pub async fn run_dataflow(dataflow_path: &Path, uv: bool) -> eyre::Result<DataflowResult> {
146        let working_dir = dataflow_path
147            .canonicalize()
148            .context("failed to canonicalize dataflow path")?
149            .parent()
150            .ok_or_else(|| eyre::eyre!("canonicalized dataflow path has no parent"))?
151            .to_owned();
152
153        let descriptor = read_as_descriptor(dataflow_path).await?;
154        descriptor.check(&working_dir)?;
155        let nodes = descriptor.resolve_aliases_and_set_defaults()?;
156
157        let dataflow_id = Uuid::new_v7(Timestamp::now(NoContext));
158        let spawn_command = SpawnDataflowNodes {
159            dataflow_id,
160            working_dir,
161            spawn_nodes: nodes.keys().cloned().collect(),
162            nodes,
163            dataflow_descriptor: descriptor,
164            uv,
165        };
166
167        let clock = Arc::new(HLC::default());
168
169        let ctrlc_events = ReceiverStream::new(set_up_ctrlc_handler(clock.clone())?);
170
171        let exit_when_done = spawn_command
172            .nodes
173            .values()
174            .map(|n| (spawn_command.dataflow_id, n.id.clone()))
175            .collect();
176        let (reply_tx, reply_rx) = oneshot::channel();
177        let timestamp = clock.new_timestamp();
178        let coordinator_events = stream::once(async move {
179            Timestamped {
180                inner: Event::Coordinator(CoordinatorEvent {
181                    event: DaemonCoordinatorEvent::Spawn(spawn_command),
182                    reply_tx,
183                }),
184                timestamp,
185            }
186        });
187        let events = (coordinator_events, ctrlc_events).merge();
188        let run_result = Self::run_general(
189            Box::pin(events),
190            None,
191            DaemonId::new(None),
192            Some(exit_when_done),
193            clock.clone(),
194            None,
195        );
196
197        let spawn_result = reply_rx
198            .map_err(|err| eyre!("failed to receive spawn result: {err}"))
199            .and_then(|r| async {
200                match r {
201                    Some(DaemonCoordinatorReply::SpawnResult(result)) => {
202                        result.map_err(|err| eyre!(err))
203                    }
204                    _ => Err(eyre!("unexpected spawn reply")),
205                }
206            });
207
208        let (mut dataflow_results, ()) = future::try_join(run_result, spawn_result).await?;
209
210        Ok(DataflowResult {
211            uuid: dataflow_id,
212            timestamp: clock.new_timestamp(),
213            node_results: dataflow_results
214                .remove(&dataflow_id)
215                .context("no node results for dataflow_id")?,
216        })
217    }
218
    /// Shared setup path used by both [`Self::run`] (coordinator mode) and
    /// [`Self::run_dataflow`] (standalone mode).
    ///
    /// Establishes the coordinator TCP connections (main + a separate one
    /// for logging, when an address is given), opens the zenoh session used
    /// for inter-daemon communication, constructs the `Daemon` value, and
    /// runs the inner event loop until exit. Returns the collected per-node
    /// results of all dataflows that ran on this daemon.
    async fn run_general(
        external_events: impl Stream<Item = Timestamped<Event>> + Unpin,
        coordinator_addr: Option<SocketAddr>,
        daemon_id: DaemonId,
        exit_when_done: Option<BTreeSet<(Uuid, NodeId)>>,
        clock: Arc<HLC>,
        remote_daemon_events_tx: Option<flume::Sender<eyre::Result<Timestamped<InterDaemonEvent>>>>,
    ) -> eyre::Result<DaemonRunResult> {
        let coordinator_connection = match coordinator_addr {
            Some(addr) => {
                let stream = TcpStream::connect(addr)
                    .await
                    .wrap_err("failed to connect to dora-coordinator")?;
                // disable Nagle's algorithm: daemon<->coordinator messages are
                // small and latency-sensitive
                stream
                    .set_nodelay(true)
                    .wrap_err("failed to set TCP_NODELAY")?;
                Some(stream)
            }
            None => None,
        };

        // additional connection for logging
        let logger_coordinator_connection = match coordinator_addr {
            Some(addr) => {
                let stream = TcpStream::connect(addr)
                    .await
                    .wrap_err("failed to connect log to dora-coordinator")?;
                stream
                    .set_nodelay(true)
                    .wrap_err("failed to set TCP_NODELAY")?;
                Some(stream)
            }
            None => None,
        };

        // Zenoh session setup: an explicit config file (pointed to by the
        // ZENOH_CONFIG env var) takes precedence and is a hard error if
        // invalid. Otherwise fall back through progressively simpler
        // configurations: coordinator-aware defaults -> linkstate defaults
        // -> plain defaults.
        let zenoh_session = match std::env::var(zenoh::Config::DEFAULT_CONFIG_PATH_ENV) {
            Ok(path) => {
                let zenoh_config = zenoh::Config::from_file(&path)
                    .map_err(|e| eyre!(e))
                    .wrap_err_with(|| format!("failed to read zenoh config from {path}"))?;
                zenoh::open(zenoh_config)
                    .await
                    .map_err(|e| eyre!(e))
                    .context("failed to open zenoh session")?
            }
            Err(std::env::VarError::NotPresent) => {
                let mut zenoh_config = zenoh::Config::default();

                if let Some(addr) = coordinator_addr {
                    // Linkstate make it possible to connect two daemons on different network through a public daemon
                    zenoh_config
                        .insert_json5("routing/peer", r#"{ mode: "linkstate" }"#)
                        .unwrap();

                    // connect to the coordinator machine's zenoh peer port
                    zenoh_config
                        .insert_json5(
                            "connect/endpoints",
                            &format!(
                                r#"{{ router: ["tcp/[::]:7447"], peer: ["tcp/{}:5456"] }}"#,
                                addr.ip()
                            ),
                        )
                        .unwrap();
                    zenoh_config
                        .insert_json5(
                            "listen/endpoints",
                            r#"{ router: ["tcp/[::]:7447"], peer: ["tcp/[::]:5456"] }"#,
                        )
                        .unwrap();
                    if cfg!(not(target_os = "linux")) {
                        warn!("disabling multicast on non-linux systems. Enable it with the ZENOH_CONFIG env variable or file");
                        zenoh_config
                            .insert_json5("scouting/multicast", r#"{ enabled: false }"#)
                            .unwrap();
                    }
                };
                if let Ok(zenoh_session) = zenoh::open(zenoh_config).await {
                    zenoh_session
                } else {
                    // first fallback: retry without fixed listen endpoints
                    // (they may already be in use on this machine)
                    warn!(
                        "failed to open zenoh session, retrying with default config + coordinator"
                    );
                    let mut zenoh_config = zenoh::Config::default();
                    zenoh_config
                        .insert_json5("routing/peer", r#"{ mode: "linkstate" }"#)
                        .unwrap();

                    if let Some(addr) = coordinator_addr {
                        zenoh_config
                            .insert_json5(
                                "connect/endpoints",
                                &format!(
                                    r#"{{ router: ["tcp/[::]:7447"], peer: ["tcp/{}:5456"] }}"#,
                                    addr.ip()
                                ),
                            )
                            .unwrap();
                        if cfg!(not(target_os = "linux")) {
                            warn!("disabling multicast on non-linux systems. Enable it with the ZENOH_CONFIG env variable or file");
                            zenoh_config
                                .insert_json5("scouting/multicast", r#"{ enabled: false }"#)
                                .unwrap();
                        }
                    }
                    if let Ok(zenoh_session) = zenoh::open(zenoh_config).await {
                        zenoh_session
                    } else {
                        // last resort: plain default config; a failure here is fatal
                        warn!("failed to open zenoh session, retrying with default config");
                        let zenoh_config = zenoh::Config::default();
                        zenoh::open(zenoh_config)
                            .await
                            .map_err(|e| eyre!(e))
                            .context("failed to open zenoh session")?
                    }
                }
            }
            Err(std::env::VarError::NotUnicode(_)) => eyre::bail!(
                "{} env variable is not valid unicode",
                zenoh::Config::DEFAULT_CONFIG_PATH_ENV
            ),
        };
        let (dora_events_tx, dora_events_rx) = mpsc::channel(5);
        let daemon = Self {
            logger: Logger {
                coordinator_connection: logger_coordinator_connection,
                daemon_id: daemon_id.clone(),
                clock: clock.clone(),
            }
            .for_daemon(daemon_id.clone()),
            running: HashMap::new(),
            working_dir: HashMap::new(),
            events_tx: dora_events_tx,
            coordinator_connection,
            last_coordinator_heartbeat: Instant::now(),
            daemon_id,
            exit_when_done,
            exit_when_all_finished: false,
            dataflow_node_results: BTreeMap::new(),
            clock,
            zenoh_session,
            remote_daemon_events_tx,
        };

        let dora_events = ReceiverStream::new(dora_events_rx);
        // periodic timer that drives coordinator heartbeats (and detects a
        // lost coordinator connection in the event loop)
        let watchdog_clock = daemon.clock.clone();
        let watchdog_interval = tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(
            Duration::from_secs(5),
        ))
        .map(|_| Timestamped {
            inner: Event::HeartbeatInterval,
            timestamp: watchdog_clock.new_timestamp(),
        });
        let events = (external_events, dora_events, watchdog_interval).merge();
        daemon.run_inner(events).await
    }
374
    /// Main daemon event loop: consumes the merged event stream until an
    /// exit condition is reached, then notifies the coordinator (if
    /// connected) and returns the collected node results.
    ///
    /// Exit conditions: a coordinator `Destroy` event, a `Dora` event that
    /// requests exit, or a first ctrl-c once no dataflows remain running.
    /// A second ctrl-c or a lost coordinator connection aborts with an error.
    #[tracing::instrument(skip(incoming_events, self), fields(?self.daemon_id))]
    async fn run_inner(
        mut self,
        incoming_events: impl Stream<Item = Timestamped<Event>> + Unpin,
    ) -> eyre::Result<DaemonRunResult> {
        let mut events = incoming_events;

        while let Some(event) = events.next().await {
            let Timestamped { inner, timestamp } = event;
            // keep the hybrid logical clock in sync with event timestamps
            if let Err(err) = self.clock.update_with_timestamp(&timestamp) {
                tracing::warn!("failed to update HLC with incoming event timestamp: {err}");
            }

            match inner {
                Event::Coordinator(CoordinatorEvent { event, reply_tx }) => {
                    let status = self.handle_coordinator_event(event, reply_tx).await?;

                    match status {
                        RunStatus::Continue => {}
                        RunStatus::Exit => break,
                    }
                }
                Event::Daemon(event) => {
                    self.handle_inter_daemon_event(event).await?;
                }
                Event::Node {
                    dataflow_id: dataflow,
                    node_id,
                    event,
                } => self.handle_node_event(event, dataflow, node_id).await?,
                Event::Dora(event) => match self.handle_dora_event(event).await? {
                    RunStatus::Continue => {}
                    RunStatus::Exit => break,
                },
                Event::DynamicNode(event) => self.handle_dynamic_node_event(event).await?,
                Event::HeartbeatInterval => {
                    // send a heartbeat to the coordinator and check that we
                    // recently received one from it in return
                    if let Some(connection) = &mut self.coordinator_connection {
                        let msg = serde_json::to_vec(&Timestamped {
                            inner: CoordinatorRequest::Event {
                                daemon_id: self.daemon_id.clone(),
                                event: DaemonEvent::Heartbeat,
                            },
                            timestamp: self.clock.new_timestamp(),
                        })?;
                        socket_stream_send(connection, &msg)
                            .await
                            .wrap_err("failed to send watchdog message to dora-coordinator")?;

                        if self.last_coordinator_heartbeat.elapsed() > Duration::from_secs(20) {
                            bail!("lost connection to coordinator")
                        }
                    }
                }
                Event::CtrlC => {
                    // graceful shutdown: stop all dataflows, then exit once
                    // they have finished
                    tracing::info!("received ctrlc signal -> stopping all dataflows");
                    for dataflow in self.running.values_mut() {
                        let mut logger = self.logger.for_dataflow(dataflow.id);
                        dataflow
                            .stop_all(
                                &mut self.coordinator_connection,
                                &self.clock,
                                None,
                                &mut logger,
                            )
                            .await?;
                    }
                    self.exit_when_all_finished = true;
                    if self.running.is_empty() {
                        break;
                    }
                }
                Event::SecondCtrlC => {
                    tracing::warn!("received second ctrlc signal -> exit immediately");
                    bail!("received second ctrl-c signal");
                }
                Event::DaemonError(err) => {
                    tracing::error!("Daemon error: {err:?}");
                }
            }
        }

        // tell the coordinator that this daemon is exiting
        if let Some(mut connection) = self.coordinator_connection.take() {
            let msg = serde_json::to_vec(&Timestamped {
                inner: CoordinatorRequest::Event {
                    daemon_id: self.daemon_id.clone(),
                    event: DaemonEvent::Exit,
                },
                timestamp: self.clock.new_timestamp(),
            })?;
            socket_stream_send(&mut connection, &msg)
                .await
                .wrap_err("failed to send Exit message to dora-coordinator")?;
        }

        Ok(self.dataflow_node_results)
    }
471
472    async fn handle_coordinator_event(
473        &mut self,
474        event: DaemonCoordinatorEvent,
475        reply_tx: Sender<Option<DaemonCoordinatorReply>>,
476    ) -> eyre::Result<RunStatus> {
477        let status = match event {
478            DaemonCoordinatorEvent::Spawn(SpawnDataflowNodes {
479                dataflow_id,
480                working_dir,
481                nodes,
482                dataflow_descriptor,
483                spawn_nodes,
484                uv,
485            }) => {
486                match dataflow_descriptor.communication.remote {
487                    dora_core::config::RemoteCommunicationConfig::Tcp => {}
488                }
489
490                // Use the working directory if it exists, otherwise use the working directory where the daemon is spawned
491                let working_dir = if working_dir.exists() {
492                    working_dir
493                } else {
494                    std::env::current_dir().wrap_err("failed to get current working dir")?
495                };
496
497                let result = self
498                    .spawn_dataflow(
499                        dataflow_id,
500                        working_dir,
501                        nodes,
502                        dataflow_descriptor,
503                        spawn_nodes,
504                        uv,
505                    )
506                    .await;
507                if let Err(err) = &result {
508                    tracing::error!("{err:?}");
509                }
510                let reply =
511                    DaemonCoordinatorReply::SpawnResult(result.map_err(|err| format!("{err:?}")));
512                let _ = reply_tx.send(Some(reply)).map_err(|_| {
513                    error!("could not send `SpawnResult` reply from daemon to coordinator")
514                });
515                RunStatus::Continue
516            }
517            DaemonCoordinatorEvent::AllNodesReady {
518                dataflow_id,
519                exited_before_subscribe,
520            } => {
521                let mut logger = self.logger.for_dataflow(dataflow_id);
522                logger.log(LogLevel::Debug, None,
523                    Some("daemon".into()),
524                    format!("received AllNodesReady (exited_before_subscribe: {exited_before_subscribe:?})"
525                )).await;
526                match self.running.get_mut(&dataflow_id) {
527                    Some(dataflow) => {
528                        let ready = exited_before_subscribe.is_empty();
529                        dataflow
530                            .pending_nodes
531                            .handle_external_all_nodes_ready(
532                                exited_before_subscribe,
533                                &mut dataflow.cascading_error_causes,
534                            )
535                            .await?;
536                        if ready {
537                            logger.log(LogLevel::Info, None,
538                                Some("daemon".into()),
539                                "coordinator reported that all nodes are ready, starting dataflow",
540                            ).await;
541                            dataflow.start(&self.events_tx, &self.clock).await?;
542                        }
543                    }
544                    None => {
545                        tracing::warn!(
546                            "received AllNodesReady for unknown dataflow (ID `{dataflow_id}`)"
547                        );
548                    }
549                }
550                let _ = reply_tx.send(None).map_err(|_| {
551                    error!("could not send `AllNodesReady` reply from daemon to coordinator")
552                });
553                RunStatus::Continue
554            }
555            DaemonCoordinatorEvent::Logs {
556                dataflow_id,
557                node_id,
558            } => {
559                match self.working_dir.get(&dataflow_id) {
560                    Some(working_dir) => {
561                        let working_dir = working_dir.clone();
562                        tokio::spawn(async move {
563                            let logs = async {
564                                let mut file =
565                                    File::open(log::log_path(&working_dir, &dataflow_id, &node_id))
566                                        .await
567                                        .wrap_err(format!(
568                                            "Could not open log file: {:#?}",
569                                            log::log_path(&working_dir, &dataflow_id, &node_id)
570                                        ))?;
571
572                                let mut contents = vec![];
573                                file.read_to_end(&mut contents)
574                                    .await
575                                    .wrap_err("Could not read content of log file")?;
576                                Result::<Vec<u8>, eyre::Report>::Ok(contents)
577                            }
578                            .await
579                            .map_err(|err| format!("{err:?}"));
580                            let _ = reply_tx
581                                .send(Some(DaemonCoordinatorReply::Logs(logs)))
582                                .map_err(|_| {
583                                    error!("could not send logs reply from daemon to coordinator")
584                                });
585                        });
586                    }
587                    None => {
588                        tracing::warn!("received Logs for unknown dataflow (ID `{dataflow_id}`)");
589                        let _ = reply_tx.send(None).map_err(|_| {
590                            error!(
591                                "could not send `AllNodesReady` reply from daemon to coordinator"
592                            )
593                        });
594                    }
595                }
596                RunStatus::Continue
597            }
598            DaemonCoordinatorEvent::ReloadDataflow {
599                dataflow_id,
600                node_id,
601                operator_id,
602            } => {
603                let result = self.send_reload(dataflow_id, node_id, operator_id).await;
604                let reply =
605                    DaemonCoordinatorReply::ReloadResult(result.map_err(|err| format!("{err:?}")));
606                let _ = reply_tx
607                    .send(Some(reply))
608                    .map_err(|_| error!("could not send reload reply from daemon to coordinator"));
609                RunStatus::Continue
610            }
611            DaemonCoordinatorEvent::StopDataflow {
612                dataflow_id,
613                grace_duration,
614            } => {
615                let mut logger = self.logger.for_dataflow(dataflow_id);
616                let dataflow = self
617                    .running
618                    .get_mut(&dataflow_id)
619                    .wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"));
620                let (reply, future) = match dataflow {
621                    Ok(dataflow) => {
622                        let future = dataflow.stop_all(
623                            &mut self.coordinator_connection,
624                            &self.clock,
625                            grace_duration,
626                            &mut logger,
627                        );
628                        (Ok(()), Some(future))
629                    }
630                    Err(err) => (Err(err.to_string()), None),
631                };
632
633                let _ = reply_tx
634                    .send(Some(DaemonCoordinatorReply::StopResult(reply)))
635                    .map_err(|_| error!("could not send stop reply from daemon to coordinator"));
636
637                if let Some(future) = future {
638                    future.await?;
639                }
640
641                RunStatus::Continue
642            }
643            DaemonCoordinatorEvent::Destroy => {
644                tracing::info!("received destroy command -> exiting");
645                let (notify_tx, notify_rx) = oneshot::channel();
646                let reply = DaemonCoordinatorReply::DestroyResult {
647                    result: Ok(()),
648                    notify: Some(notify_tx),
649                };
650                let _ = reply_tx
651                    .send(Some(reply))
652                    .map_err(|_| error!("could not send destroy reply from daemon to coordinator"));
653                // wait until the reply is sent out
654                if notify_rx.await.is_err() {
655                    tracing::warn!("no confirmation received for DestroyReply");
656                }
657                RunStatus::Exit
658            }
659            DaemonCoordinatorEvent::Heartbeat => {
660                self.last_coordinator_heartbeat = Instant::now();
661                let _ = reply_tx.send(None);
662                RunStatus::Continue
663            }
664        };
665        Ok(status)
666    }
667
668    async fn handle_inter_daemon_event(&mut self, event: InterDaemonEvent) -> eyre::Result<()> {
669        match event {
670            InterDaemonEvent::Output {
671                dataflow_id,
672                node_id,
673                output_id,
674                metadata,
675                data,
676            } => {
677                let inner = async {
678                    let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
679                        format!("send out failed: no running dataflow with ID `{dataflow_id}`")
680                    })?;
681                    send_output_to_local_receivers(
682                        node_id.clone(),
683                        output_id.clone(),
684                        dataflow,
685                        &metadata,
686                        data.map(DataMessage::Vec),
687                        &self.clock,
688                    )
689                    .await?;
690                    Result::<_, eyre::Report>::Ok(())
691                };
692                if let Err(err) = inner
693                    .await
694                    .wrap_err("failed to forward remote output to local receivers")
695                {
696                    let mut logger = self.logger.for_dataflow(dataflow_id).for_node(node_id);
697                    logger
698                        .log(LogLevel::Warn, Some("daemon".into()), format!("{err:?}"))
699                        .await;
700                }
701                Ok(())
702            }
703            InterDaemonEvent::OutputClosed {
704                dataflow_id,
705                node_id,
706                output_id,
707            } => {
708                let output_id = OutputId(node_id.clone(), output_id);
709                let mut logger = self
710                    .logger
711                    .for_dataflow(dataflow_id)
712                    .for_node(node_id.clone());
713                logger
714                    .log(
715                        LogLevel::Debug,
716                        Some("daemon".into()),
717                        format!("received OutputClosed event for output {output_id:?}"),
718                    )
719                    .await;
720
721                let inner = async {
722                    let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
723                        format!("send out failed: no running dataflow with ID `{dataflow_id}`")
724                    })?;
725
726                    if let Some(inputs) = dataflow.mappings.get(&output_id).cloned() {
727                        for (receiver_id, input_id) in &inputs {
728                            close_input(dataflow, receiver_id, input_id, &self.clock);
729                        }
730                    }
731                    Result::<(), eyre::Report>::Ok(())
732                };
733                if let Err(err) = inner
734                    .await
735                    .wrap_err("failed to handle InputsClosed event sent by coordinator")
736                {
737                    logger
738                        .log(LogLevel::Warn, Some("daemon".into()), format!("{err:?}"))
739                        .await;
740                }
741                Ok(())
742            }
743        }
744    }
745
    /// Registers `dataflow_id` as a running dataflow and spawns all nodes in
    /// `spawn_nodes` on this daemon.
    ///
    /// Nodes listed in `nodes` but not in `spawn_nodes` run on other daemons;
    /// for those, this function only records input mappings and declares zenoh
    /// subscriptions for the remote outputs that local nodes consume.
    ///
    /// Errors if a dataflow with the same ID is already running, if the
    /// remote-event channel is missing, or if a zenoh subscription cannot be
    /// declared.
    async fn spawn_dataflow(
        &mut self,
        dataflow_id: uuid::Uuid,
        working_dir: PathBuf,
        nodes: BTreeMap<NodeId, ResolvedNode>,
        dataflow_descriptor: Descriptor,
        spawn_nodes: BTreeSet<NodeId>,
        uv: bool,
    ) -> eyre::Result<()> {
        let mut logger = self.logger.for_dataflow(dataflow_id);
        let dataflow =
            RunningDataflow::new(dataflow_id, self.daemon_id.clone(), &dataflow_descriptor);
        // reject duplicate dataflow IDs; on success remember the working dir
        let dataflow = match self.running.entry(dataflow_id) {
            std::collections::hash_map::Entry::Vacant(entry) => {
                self.working_dir.insert(dataflow_id, working_dir.clone());
                entry.insert(dataflow)
            }
            std::collections::hash_map::Entry::Occupied(_) => {
                bail!("there is already a running dataflow with ID `{dataflow_id}`")
            }
        };

        // nodes whose spawn failed; regular stop handling runs for them below
        let mut stopped = Vec::new();

        // calculate info about mappings
        for node in nodes.values() {
            let local = spawn_nodes.contains(&node.id);

            let inputs = node_inputs(node);
            for (input_id, input) in inputs {
                if local {
                    // track the open input and route its source (another
                    // node's output or a timer) to it
                    dataflow
                        .open_inputs
                        .entry(node.id.clone())
                        .or_default()
                        .insert(input_id.clone());
                    match input.mapping {
                        InputMapping::User(mapping) => {
                            dataflow
                                .mappings
                                .entry(OutputId(mapping.source, mapping.output))
                                .or_default()
                                .insert((node.id.clone(), input_id));
                        }
                        InputMapping::Timer { interval } => {
                            dataflow
                                .timers
                                .entry(interval)
                                .or_default()
                                .insert((node.id.clone(), input_id));
                        }
                    }
                } else if let InputMapping::User(mapping) = input.mapping {
                    // the input belongs to a remote node -> the referenced
                    // output must be published externally
                    dataflow
                        .open_external_mappings
                        .insert(OutputId(mapping.source, mapping.output));
                }
            }
        }

        // spawn nodes and set up subscriptions
        for node in nodes.into_values() {
            let mut logger = logger.reborrow().for_node(node.id.clone());
            let local = spawn_nodes.contains(&node.id);
            if local {
                // dynamic nodes attach later on their own; all other local
                // nodes must report ready before the dataflow starts
                if node.kind.dynamic() {
                    dataflow.dynamic_nodes.insert(node.id.clone());
                } else {
                    dataflow.pending_nodes.insert(node.id.clone());
                }

                let node_id = node.id.clone();
                // bounded ring buffer of the most recent stderr lines, used
                // for error reporting when the node exits
                let node_stderr_most_recent = dataflow
                    .node_stderr_most_recent
                    .entry(node.id.clone())
                    .or_insert_with(|| Arc::new(ArrayQueue::new(STDERR_LOG_LINES)))
                    .clone();
                logger
                    .log(LogLevel::Info, Some("daemon".into()), "spawning")
                    .await;
                match spawn::spawn_node(
                    dataflow_id,
                    &working_dir,
                    node,
                    self.events_tx.clone(),
                    dataflow_descriptor.clone(),
                    self.clock.clone(),
                    node_stderr_most_recent,
                    uv,
                    &mut logger,
                )
                .await
                .wrap_err_with(|| format!("failed to spawn node `{node_id}`"))
                {
                    Ok(running_node) => {
                        dataflow.running_nodes.insert(node_id, running_node);
                    }
                    Err(err) => {
                        // record the spawn failure as the node's result and
                        // treat the node as stopped
                        logger
                            .log(LogLevel::Error, Some("daemon".into()), format!("{err:?}"))
                            .await;
                        self.dataflow_node_results
                            .entry(dataflow_id)
                            .or_default()
                            .insert(
                                node_id.clone(),
                                Err(NodeError {
                                    timestamp: self.clock.new_timestamp(),
                                    cause: NodeErrorCause::Other {
                                        stderr: format!("spawn failed: {err:?}"),
                                    },
                                    exit_status: NodeExitStatus::Unknown,
                                }),
                            );
                        stopped.push(node_id.clone());
                    }
                }
            } else {
                // wait until node is ready before starting
                dataflow.pending_nodes.set_external_nodes(true);

                // subscribe to all node outputs that are mapped to some local inputs
                for output_id in dataflow.mappings.keys().filter(|o| o.0 == node.id) {
                    let tx = self
                        .remote_daemon_events_tx
                        .clone()
                        .wrap_err("no remote_daemon_events_tx channel")?;
                    let mut finished_rx = dataflow.finished_tx.subscribe();
                    let subscribe_topic = dataflow.output_publish_topic(output_id);
                    tracing::debug!("declaring subscriber on {subscribe_topic}");
                    let subscriber = self
                        .zenoh_session
                        .declare_subscriber(subscribe_topic)
                        .await
                        .map_err(|e| eyre!(e))
                        .wrap_err_with(|| format!("failed to subscribe to {output_id:?}"))?;
                    // background task: forward remote samples as inter-daemon
                    // events until the dataflow finishes or the receiving side
                    // of `tx` is dropped
                    tokio::spawn(async move {
                        let mut finished = pin!(finished_rx.recv());
                        loop {
                            let finished_or_next =
                                futures::future::select(finished, subscriber.recv_async());
                            match finished_or_next.await {
                                future::Either::Left((finished, _)) => {
                                    match finished {
                                        Err(broadcast::error::RecvError::Closed) => {
                                            tracing::debug!("dataflow finished, breaking from zenoh subscribe task");
                                            break;
                                        }
                                        other => {
                                            tracing::warn!("unexpected return value of dataflow finished_rx channel: {other:?}");
                                            break;
                                        }
                                    }
                                }
                                future::Either::Right((sample, f)) => {
                                    finished = f;
                                    let event = sample.map_err(|e| eyre!(e)).and_then(|s| {
                                        Timestamped::deserialize_inter_daemon_event(
                                            &s.payload().to_bytes(),
                                        )
                                    });
                                    if tx.send_async(event).await.is_err() {
                                        // daemon finished
                                        break;
                                    }
                                }
                            }
                        }
                    });
                }
            }
        }
        // run regular node-stop handling for every node whose spawn failed
        for node_id in stopped {
            self.handle_node_stop(dataflow_id, &node_id).await?;
        }

        Ok(())
    }
924
925    async fn handle_dynamic_node_event(
926        &mut self,
927        event: DynamicNodeEventWrapper,
928    ) -> eyre::Result<()> {
929        match event {
930            DynamicNodeEventWrapper {
931                event: DynamicNodeEvent::NodeConfig { node_id },
932                reply_tx,
933            } => {
934                let number_node_id = self
935                    .running
936                    .iter()
937                    .filter(|(_id, dataflow)| dataflow.running_nodes.contains_key(&node_id))
938                    .count();
939
940                let node_config = match number_node_id {
941                    2.. => Err(format!(
942                        "multiple dataflows contains dynamic node id {node_id}. \
943                        Please only have one running dataflow with the specified \
944                        node id if you want to use dynamic node",
945                    )),
946                    1 => self
947                        .running
948                        .iter()
949                        .filter(|(_id, dataflow)| dataflow.running_nodes.contains_key(&node_id))
950                        .map(|(id, dataflow)| -> Result<NodeConfig> {
951                            let node_config = dataflow
952                                .running_nodes
953                                .get(&node_id)
954                                .context("no node with ID `{node_id}` within the given dataflow")?
955                                .node_config
956                                .clone();
957                            if !node_config.dynamic {
958                                bail!("node with ID `{node_id}` in {id} is not dynamic");
959                            }
960                            Ok(node_config)
961                        })
962                        .next()
963                        .ok_or_else(|| eyre!("no node with ID `{node_id}`"))
964                        .and_then(|r| r)
965                        .map_err(|err| {
966                            format!(
967                                "failed to get dynamic node config within given dataflow: {err}"
968                            )
969                        }),
970                    0 => Err("no node with ID `{node_id}`".to_string()),
971                };
972
973                let reply = DaemonReply::NodeConfig {
974                    result: node_config,
975                };
976                let _ = reply_tx.send(Some(reply)).map_err(|_| {
977                    error!("could not send node info reply from daemon to coordinator")
978                });
979                Ok(())
980            }
981        }
982    }
983
    /// Handles an event sent by a locally running node, dispatching on the
    /// event kind.
    ///
    /// Replies are sent through each event's own reply channel; a failed
    /// reply send is ignored, since the node may already have exited.
    async fn handle_node_event(
        &mut self,
        event: DaemonNodeEvent,
        dataflow_id: DataflowId,
        node_id: NodeId,
    ) -> eyre::Result<()> {
        match event {
            // node connected its event stream -> it is ready to run
            DaemonNodeEvent::Subscribe {
                event_sender,
                reply_sender,
            } => {
                let mut logger = self.logger.for_dataflow(dataflow_id);
                logger
                    .log(
                        LogLevel::Info,
                        Some(node_id.clone()),
                        Some("daemon".into()),
                        "node is ready",
                    )
                    .await;

                let dataflow = self.running.get_mut(&dataflow_id).ok_or_else(|| {
                    format!("subscribe failed: no running dataflow with ID `{dataflow_id}`")
                });

                match dataflow {
                    Err(err) => {
                        let _ = reply_sender.send(DaemonReply::Result(Err(err)));
                    }
                    Ok(dataflow) => {
                        // register the event channel and replay events the
                        // node missed (closed inputs, stop)
                        Self::subscribe(dataflow, node_id.clone(), event_sender, &self.clock).await;

                        // mark the node as ready; once every pending node is
                        // ready the dataflow is started below
                        let status = dataflow
                            .pending_nodes
                            .handle_node_subscription(
                                node_id.clone(),
                                reply_sender,
                                &mut self.coordinator_connection,
                                &self.clock,
                                &mut dataflow.cascading_error_causes,
                                &mut logger,
                            )
                            .await?;
                        match status {
                            DataflowStatus::AllNodesReady => {
                                logger
                                    .log(
                                        LogLevel::Info,
                                        None,
                                        Some("daemon".into()),
                                        "all nodes are ready, starting dataflow",
                                    )
                                    .await;
                                dataflow.start(&self.events_tx, &self.clock).await?;
                            }
                            DataflowStatus::Pending => {}
                        }
                    }
                }
            }
            // node wants drop notifications for data it sent out
            DaemonNodeEvent::SubscribeDrop {
                event_sender,
                reply_sender,
            } => {
                let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
                    format!("failed to subscribe: no running dataflow with ID `{dataflow_id}`")
                });
                let result = match dataflow {
                    Ok(dataflow) => {
                        dataflow.drop_channels.insert(node_id, event_sender);
                        Ok(())
                    }
                    Err(err) => Err(err.to_string()),
                };
                let _ = reply_sender.send(DaemonReply::Result(result));
            }
            DaemonNodeEvent::CloseOutputs {
                outputs,
                reply_sender,
            } => {
                // notify downstream nodes
                let inner = async {
                    self.send_output_closed_events(dataflow_id, node_id, outputs)
                        .await
                };

                let reply = inner.await.map_err(|err| format!("{err:?}"));
                let _ = reply_sender.send(DaemonReply::Result(reply));
            }
            // node finished sending -> treat all of its outputs as closed
            DaemonNodeEvent::OutputsDone { reply_sender } => {
                let result = self.handle_outputs_done(dataflow_id, &node_id).await;

                let _ = reply_sender.send(DaemonReply::Result(
                    result.map_err(|err| format!("{err:?}")),
                ));
            }
            DaemonNodeEvent::SendOut {
                output_id,
                metadata,
                data,
            } => self
                .send_out(dataflow_id, node_id, output_id, metadata, data)
                .await
                .context("failed to send out")?,
            // node reports that it released its handle on shared data
            DaemonNodeEvent::ReportDrop { tokens } => {
                let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
                    format!(
                        "failed to get handle drop tokens: \
                        no running dataflow with ID `{dataflow_id}`"
                    )
                });

                match dataflow {
                    Ok(dataflow) => {
                        for token in tokens {
                            match dataflow.pending_drop_tokens.get_mut(&token) {
                                Some(info) => {
                                    if info.pending_nodes.remove(&node_id) {
                                        // maybe the last pending node -> the
                                        // token might be fully released now
                                        dataflow.check_drop_token(token, &self.clock).await?;
                                    } else {
                                        tracing::warn!(
                                            "node `{node_id}` is not pending for drop token `{token:?}`"
                                        );
                                    }
                                }
                                None => tracing::warn!("unknown drop token `{token:?}`"),
                            }
                        }
                    }
                    Err(err) => tracing::warn!("{err:?}"),
                }
            }
            // node dropped its event stream -> stop sending events to it
            DaemonNodeEvent::EventStreamDropped { reply_sender } => {
                let inner = async {
                    let dataflow = self
                        .running
                        .get_mut(&dataflow_id)
                        .wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"))?;
                    dataflow.subscribe_channels.remove(&node_id);
                    Result::<_, eyre::Error>::Ok(())
                };

                let reply = inner.await.map_err(|err| format!("{err:?}"));
                let _ = reply_sender.send(DaemonReply::Result(reply));
            }
        }
        Ok(())
    }
1132
1133    async fn send_reload(
1134        &mut self,
1135        dataflow_id: Uuid,
1136        node_id: NodeId,
1137        operator_id: Option<OperatorId>,
1138    ) -> Result<(), eyre::ErrReport> {
1139        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
1140            format!("Reload failed: no running dataflow with ID `{dataflow_id}`")
1141        })?;
1142        if let Some(channel) = dataflow.subscribe_channels.get(&node_id) {
1143            match send_with_timestamp(channel, NodeEvent::Reload { operator_id }, &self.clock) {
1144                Ok(()) => {}
1145                Err(_) => {
1146                    dataflow.subscribe_channels.remove(&node_id);
1147                }
1148            }
1149        }
1150        Ok(())
1151    }
1152
1153    async fn send_out(
1154        &mut self,
1155        dataflow_id: Uuid,
1156        node_id: NodeId,
1157        output_id: DataId,
1158        metadata: dora_message::metadata::Metadata,
1159        data: Option<DataMessage>,
1160    ) -> Result<(), eyre::ErrReport> {
1161        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
1162            format!("send out failed: no running dataflow with ID `{dataflow_id}`")
1163        })?;
1164        let data_bytes = send_output_to_local_receivers(
1165            node_id.clone(),
1166            output_id.clone(),
1167            dataflow,
1168            &metadata,
1169            data,
1170            &self.clock,
1171        )
1172        .await?;
1173
1174        let output_id = OutputId(node_id, output_id);
1175        let remote_receivers = dataflow.open_external_mappings.contains(&output_id)
1176            || dataflow.publish_all_messages_to_zenoh;
1177        if remote_receivers {
1178            let event = InterDaemonEvent::Output {
1179                dataflow_id,
1180                node_id: output_id.0.clone(),
1181                output_id: output_id.1.clone(),
1182                metadata,
1183                data: data_bytes,
1184            };
1185            self.send_to_remote_receivers(dataflow_id, &output_id, event)
1186                .await?;
1187        }
1188
1189        Ok(())
1190    }
1191
1192    async fn send_to_remote_receivers(
1193        &mut self,
1194        dataflow_id: Uuid,
1195        output_id: &OutputId,
1196        event: InterDaemonEvent,
1197    ) -> Result<(), eyre::Error> {
1198        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
1199            format!("send out failed: no running dataflow with ID `{dataflow_id}`")
1200        })?;
1201
1202        // publish via zenoh
1203        let publisher = match dataflow.publishers.get(output_id) {
1204            Some(publisher) => publisher,
1205            None => {
1206                let publish_topic = dataflow.output_publish_topic(output_id);
1207                tracing::debug!("declaring publisher on {publish_topic}");
1208                let publisher = self
1209                    .zenoh_session
1210                    .declare_publisher(publish_topic)
1211                    .await
1212                    .map_err(|e| eyre!(e))
1213                    .context("failed to create zenoh publisher")?;
1214                dataflow.publishers.insert(output_id.clone(), publisher);
1215                dataflow.publishers.get(output_id).unwrap()
1216            }
1217        };
1218
1219        let serialized_event = Timestamped {
1220            inner: event,
1221            timestamp: self.clock.new_timestamp(),
1222        }
1223        .serialize();
1224        publisher
1225            .put(serialized_event)
1226            .await
1227            .map_err(|e| eyre!(e))
1228            .context("zenoh put failed")?;
1229        Ok(())
1230    }
1231
1232    async fn send_output_closed_events(
1233        &mut self,
1234        dataflow_id: DataflowId,
1235        node_id: NodeId,
1236        outputs: Vec<DataId>,
1237    ) -> eyre::Result<()> {
1238        let dataflow = self
1239            .running
1240            .get_mut(&dataflow_id)
1241            .wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"))?;
1242        let local_node_inputs: BTreeSet<_> = dataflow
1243            .mappings
1244            .iter()
1245            .filter(|(k, _)| k.0 == node_id && outputs.contains(&k.1))
1246            .flat_map(|(_, v)| v)
1247            .cloned()
1248            .collect();
1249        for (receiver_id, input_id) in &local_node_inputs {
1250            close_input(dataflow, receiver_id, input_id, &self.clock);
1251        }
1252
1253        let mut closed = Vec::new();
1254        for output_id in &dataflow.open_external_mappings {
1255            if output_id.0 == node_id && outputs.contains(&output_id.1) {
1256                closed.push(output_id.clone());
1257            }
1258        }
1259
1260        for output_id in closed {
1261            let event = InterDaemonEvent::OutputClosed {
1262                dataflow_id,
1263                node_id: output_id.0.clone(),
1264                output_id: output_id.1.clone(),
1265            };
1266            self.send_to_remote_receivers(dataflow_id, &output_id, event)
1267                .await?;
1268        }
1269
1270        Ok(())
1271    }
1272
1273    async fn subscribe(
1274        dataflow: &mut RunningDataflow,
1275        node_id: NodeId,
1276        event_sender: UnboundedSender<Timestamped<NodeEvent>>,
1277        clock: &HLC,
1278    ) {
1279        // some inputs might have been closed already -> report those events
1280        let closed_inputs = dataflow
1281            .mappings
1282            .values()
1283            .flatten()
1284            .filter(|(node, _)| node == &node_id)
1285            .map(|(_, input)| input)
1286            .filter(|input| {
1287                dataflow
1288                    .open_inputs
1289                    .get(&node_id)
1290                    .map(|open_inputs| !open_inputs.contains(*input))
1291                    .unwrap_or(true)
1292            });
1293        for input_id in closed_inputs {
1294            let _ = send_with_timestamp(
1295                &event_sender,
1296                NodeEvent::InputClosed {
1297                    id: input_id.clone(),
1298                },
1299                clock,
1300            );
1301        }
1302        if dataflow.open_inputs(&node_id).is_empty() {
1303            let _ = send_with_timestamp(&event_sender, NodeEvent::AllInputsClosed, clock);
1304        }
1305
1306        // if a stop event was already sent for the dataflow, send it to
1307        // the newly connected node too
1308        if dataflow.stop_sent {
1309            let _ = send_with_timestamp(&event_sender, NodeEvent::Stop, clock);
1310        }
1311
1312        dataflow.subscribe_channels.insert(node_id, event_sender);
1313    }
1314
1315    #[tracing::instrument(skip(self), level = "trace")]
1316    async fn handle_outputs_done(
1317        &mut self,
1318        dataflow_id: DataflowId,
1319        node_id: &NodeId,
1320    ) -> eyre::Result<()> {
1321        let dataflow = self
1322            .running
1323            .get_mut(&dataflow_id)
1324            .ok_or_else(|| eyre!("no running dataflow with ID `{dataflow_id}`"))?;
1325
1326        let outputs = dataflow
1327            .mappings
1328            .keys()
1329            .filter(|m| &m.0 == node_id)
1330            .map(|m| &m.1)
1331            .cloned()
1332            .collect();
1333        self.send_output_closed_events(dataflow_id, node_id.clone(), outputs)
1334            .await?;
1335
1336        let dataflow = self
1337            .running
1338            .get_mut(&dataflow_id)
1339            .ok_or_else(|| eyre!("no running dataflow with ID `{dataflow_id}`"))?;
1340        dataflow.drop_channels.remove(node_id);
1341        Ok(())
1342    }
1343
    /// Handles the stop of a single node: updates pending-node bookkeeping,
    /// closes the node's outputs, and — once only dynamic nodes remain in the
    /// dataflow — reports the dataflow as finished to the coordinator and
    /// removes it from the running set.
    async fn handle_node_stop(&mut self, dataflow_id: Uuid, node_id: &NodeId) -> eyre::Result<()> {
        let mut logger = self.logger.for_dataflow(dataflow_id);
        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
            format!("failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`")
        })?;

        dataflow
            .pending_nodes
            .handle_node_stop(
                node_id,
                &mut self.coordinator_connection,
                &self.clock,
                &mut dataflow.cascading_error_causes,
                &mut logger,
            )
            .await?;

        // treat all outputs of the stopped node as closed
        self.handle_outputs_done(dataflow_id, node_id).await?;

        // re-borrow logger and dataflow: the call above required exclusive
        // access to `self`
        let mut logger = self.logger.for_dataflow(dataflow_id);
        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
            format!("failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`")
        })?;
        if let Some(mut pid) = dataflow.running_nodes.remove(node_id).and_then(|n| n.pid) {
            pid.mark_as_stopped()
        }
        // when only dynamic nodes are still running (or no nodes at all), the
        // dataflow counts as finished on this daemon
        if dataflow
            .running_nodes
            .iter()
            .all(|(_id, n)| n.node_config.dynamic)
        {
            let result = DataflowDaemonResult {
                timestamp: self.clock.new_timestamp(),
                node_results: self
                    .dataflow_node_results
                    .get(&dataflow.id)
                    .context("failed to get dataflow node results")?
                    .clone(),
            };

            logger
                .log(
                    LogLevel::Info,
                    None,
                    Some("daemon".into()),
                    format!("dataflow finished on machine `{}`", self.daemon_id),
                )
                .await;
            // report the collected node results to the coordinator, if connected
            if let Some(connection) = &mut self.coordinator_connection {
                let msg = serde_json::to_vec(&Timestamped {
                    inner: CoordinatorRequest::Event {
                        daemon_id: self.daemon_id.clone(),
                        event: DaemonEvent::AllNodesFinished {
                            dataflow_id,
                            result,
                        },
                    },
                    timestamp: self.clock.new_timestamp(),
                })?;
                socket_stream_send(connection, &msg)
                    .await
                    .wrap_err("failed to report dataflow finish to dora-coordinator")?;
            }
            self.running.remove(&dataflow_id);
        }

        Ok(())
    }
1412
1413    async fn handle_dora_event(&mut self, event: DoraEvent) -> eyre::Result<RunStatus> {
1414        match event {
1415            DoraEvent::Timer {
1416                dataflow_id,
1417                interval,
1418                metadata,
1419            } => {
1420                let Some(dataflow) = self.running.get_mut(&dataflow_id) else {
1421                    tracing::warn!("Timer event for unknown dataflow `{dataflow_id}`");
1422                    return Ok(RunStatus::Continue);
1423                };
1424
1425                let Some(subscribers) = dataflow.timers.get(&interval) else {
1426                    return Ok(RunStatus::Continue);
1427                };
1428
1429                let mut closed = Vec::new();
1430                for (receiver_id, input_id) in subscribers {
1431                    let Some(channel) = dataflow.subscribe_channels.get(receiver_id) else {
1432                        continue;
1433                    };
1434
1435                    let send_result = send_with_timestamp(
1436                        channel,
1437                        NodeEvent::Input {
1438                            id: input_id.clone(),
1439                            metadata: metadata.clone(),
1440                            data: None,
1441                        },
1442                        &self.clock,
1443                    );
1444                    match send_result {
1445                        Ok(()) => {}
1446                        Err(_) => {
1447                            closed.push(receiver_id);
1448                        }
1449                    }
1450                }
1451                for id in closed {
1452                    dataflow.subscribe_channels.remove(id);
1453                }
1454            }
1455            DoraEvent::Logs {
1456                dataflow_id,
1457                output_id,
1458                message,
1459                metadata,
1460            } => {
1461                let Some(dataflow) = self.running.get_mut(&dataflow_id) else {
1462                    tracing::warn!("Logs event for unknown dataflow `{dataflow_id}`");
1463                    return Ok(RunStatus::Continue);
1464                };
1465
1466                let Some(subscribers) = dataflow.mappings.get(&output_id) else {
1467                    tracing::warn!(
1468                        "No subscribers found for {:?} in {:?}",
1469                        output_id,
1470                        dataflow.mappings
1471                    );
1472                    return Ok(RunStatus::Continue);
1473                };
1474
1475                let mut closed = Vec::new();
1476                for (receiver_id, input_id) in subscribers {
1477                    let Some(channel) = dataflow.subscribe_channels.get(receiver_id) else {
1478                        tracing::warn!("No subscriber channel found for {:?}", output_id);
1479                        continue;
1480                    };
1481
1482                    let send_result = send_with_timestamp(
1483                        channel,
1484                        NodeEvent::Input {
1485                            id: input_id.clone(),
1486                            metadata: metadata.clone(),
1487                            data: Some(message.clone()),
1488                        },
1489                        &self.clock,
1490                    );
1491                    match send_result {
1492                        Ok(()) => {}
1493                        Err(_) => {
1494                            closed.push(receiver_id);
1495                        }
1496                    }
1497                }
1498                for id in closed {
1499                    dataflow.subscribe_channels.remove(id);
1500                }
1501            }
1502            DoraEvent::SpawnedNodeResult {
1503                dataflow_id,
1504                node_id,
1505                exit_status,
1506            } => {
1507                let mut logger = self
1508                    .logger
1509                    .for_dataflow(dataflow_id)
1510                    .for_node(node_id.clone());
1511                logger
1512                    .log(
1513                        LogLevel::Debug,
1514                        Some("daemon".into()),
1515                        format!("handling node stop with exit status {exit_status:?}"),
1516                    )
1517                    .await;
1518
1519                let node_result = match exit_status {
1520                    NodeExitStatus::Success => Ok(()),
1521                    exit_status => {
1522                        let dataflow = self.running.get(&dataflow_id);
1523                        let caused_by_node = dataflow
1524                            .and_then(|dataflow| {
1525                                dataflow.cascading_error_causes.error_caused_by(&node_id)
1526                            })
1527                            .cloned();
1528                        let grace_duration_kill = dataflow
1529                            .map(|d| d.grace_duration_kills.contains(&node_id))
1530                            .unwrap_or_default();
1531
1532                        let cause = match caused_by_node {
1533                            Some(caused_by_node) => {
1534                                logger
1535                                    .log(
1536                                        LogLevel::Info,
1537                                        Some("daemon".into()),
1538                                        format!("marking `{node_id}` as cascading error caused by `{caused_by_node}`")
1539                                    )
1540                                    .await;
1541
1542                                NodeErrorCause::Cascading { caused_by_node }
1543                            }
1544                            None if grace_duration_kill => NodeErrorCause::GraceDuration,
1545                            None => {
1546                                let cause = dataflow
1547                                    .and_then(|d| d.node_stderr_most_recent.get(&node_id))
1548                                    .map(|queue| {
1549                                        let mut s = if queue.is_full() {
1550                                            "[...]".into()
1551                                        } else {
1552                                            String::new()
1553                                        };
1554                                        while let Some(line) = queue.pop() {
1555                                            s += &line;
1556                                        }
1557                                        s
1558                                    })
1559                                    .unwrap_or_default();
1560
1561                                NodeErrorCause::Other { stderr: cause }
1562                            }
1563                        };
1564                        Err(NodeError {
1565                            timestamp: self.clock.new_timestamp(),
1566                            cause,
1567                            exit_status,
1568                        })
1569                    }
1570                };
1571
1572                logger
1573                    .log(
1574                        if node_result.is_ok() {
1575                            LogLevel::Info
1576                        } else {
1577                            LogLevel::Error
1578                        },
1579                        Some("daemon".into()),
1580                        match &node_result {
1581                            Ok(()) => format!("{node_id} finished successfully"),
1582                            Err(err) => format!("{err}"),
1583                        },
1584                    )
1585                    .await;
1586
1587                self.dataflow_node_results
1588                    .entry(dataflow_id)
1589                    .or_default()
1590                    .insert(node_id.clone(), node_result);
1591
1592                self.handle_node_stop(dataflow_id, &node_id).await?;
1593
1594                if let Some(exit_when_done) = &mut self.exit_when_done {
1595                    exit_when_done.remove(&(dataflow_id, node_id));
1596                    if exit_when_done.is_empty() {
1597                        tracing::info!(
1598                            "exiting daemon because all required dataflows are finished"
1599                        );
1600                        return Ok(RunStatus::Exit);
1601                    }
1602                }
1603                if self.exit_when_all_finished && self.running.is_empty() {
1604                    return Ok(RunStatus::Exit);
1605                }
1606            }
1607        }
1608        Ok(RunStatus::Continue)
1609    }
1610}
1611
/// Registers this daemon with the coordinator and merges all incoming event
/// sources into a single stream.
///
/// The resulting stream interleaves three sources:
/// - events pushed by the coordinator (available after registration),
/// - events from other daemons (`remote_daemon_events_rx`),
/// - dynamic-node events accepted by a local TCP listener bound to
///   `LOCALHOST:local_listen_port`.
///
/// Returns the `DaemonId` assigned during coordinator registration together
/// with the merged event stream.
async fn set_up_event_stream(
    coordinator_addr: SocketAddr,
    machine_id: &Option<String>,
    clock: &Arc<HLC>,
    remote_daemon_events_rx: flume::Receiver<eyre::Result<Timestamped<InterDaemonEvent>>>,
    // used for dynamic nodes
    local_listen_port: u16,
) -> eyre::Result<(DaemonId, impl Stream<Item = Timestamped<Event>> + Unpin)> {
    let clock_cloned = clock.clone();
    // Receive errors carry no timestamp of their own, so they are stamped
    // with a fresh one and surfaced as `DaemonError` events.
    let remote_daemon_events = remote_daemon_events_rx.into_stream().map(move |e| match e {
        Ok(e) => Timestamped {
            inner: Event::Daemon(e.inner),
            timestamp: e.timestamp,
        },
        Err(err) => Timestamped {
            inner: Event::DaemonError(err),
            timestamp: clock_cloned.new_timestamp(),
        },
    });
    let (daemon_id, coordinator_events) =
        coordinator::register(coordinator_addr, machine_id.clone(), clock)
            .await
            .wrap_err("failed to connect to dora-coordinator")?;
    let coordinator_events = coordinator_events.map(
        |Timestamped {
             inner: event,
             timestamp,
         }| Timestamped {
            inner: Event::Coordinator(event),
            timestamp,
        },
    );
    let (events_tx, events_rx) = flume::bounded(10);
    // NOTE(review): the port the listener actually bound to is deliberately
    // unused here — presumably dynamic nodes learn it elsewhere; confirm.
    let _listen_port =
        local_listener::spawn_listener_loop((LOCALHOST, local_listen_port).into(), events_tx)
            .await?;
    let dynamic_node_events = events_rx.into_stream().map(|e| Timestamped {
        inner: Event::DynamicNode(e.inner),
        timestamp: e.timestamp,
    });
    let incoming = (
        coordinator_events,
        remote_daemon_events,
        dynamic_node_events,
    )
        .merge();
    Ok((daemon_id, incoming))
}
1660
/// Delivers an output message to all local subscribers of `output_id` and
/// returns the raw bytes (if any) so the caller can forward them to
/// non-local receivers.
///
/// Subscribers whose channel is closed are removed from the dataflow. For
/// shared-memory messages the data is copied out of the shared-memory
/// region, and the drop token is registered so the owning node can be
/// notified once all local receivers have finished with the data.
async fn send_output_to_local_receivers(
    node_id: NodeId,
    output_id: DataId,
    dataflow: &mut RunningDataflow,
    metadata: &metadata::Metadata,
    data: Option<DataMessage>,
    clock: &HLC,
) -> Result<Option<AVec<u8, ConstAlign<128>>>, eyre::ErrReport> {
    let timestamp = metadata.timestamp();
    let empty_set = BTreeSet::new();
    let output_id = OutputId(node_id, output_id);
    // A missing mapping entry simply means there are no local subscribers.
    let local_receivers = dataflow.mappings.get(&output_id).unwrap_or(&empty_set);
    let OutputId(node_id, _) = output_id;
    let mut closed = Vec::new();
    for (receiver_id, input_id) in local_receivers {
        if let Some(channel) = dataflow.subscribe_channels.get(receiver_id) {
            let item = NodeEvent::Input {
                id: input_id.clone(),
                metadata: metadata.clone(),
                data: data.clone(),
            };
            match channel.send(Timestamped {
                inner: item,
                timestamp,
            }) {
                Ok(()) => {
                    // Record that this receiver still holds a reference to the
                    // shared data; the owner is only notified once every
                    // pending receiver reported the token back.
                    if let Some(token) = data.as_ref().and_then(|d| d.drop_token()) {
                        dataflow
                            .pending_drop_tokens
                            .entry(token)
                            .or_insert_with(|| DropTokenInformation {
                                owner: node_id.clone(),
                                pending_nodes: Default::default(),
                            })
                            .pending_nodes
                            .insert(receiver_id.clone());
                    }
                }
                Err(_) => {
                    // A failed send means the receiver is gone; its channel is
                    // removed below.
                    closed.push(receiver_id);
                }
            }
        }
    }
    for id in closed {
        dataflow.subscribe_channels.remove(id);
    }
    // Extract the raw bytes for forwarding to remote receivers.
    let (data_bytes, drop_token) = match data {
        None => (None, None),
        Some(DataMessage::SharedMemory {
            shared_memory_id,
            len,
            drop_token,
        }) => {
            let memory = ShmemConf::new()
                .os_id(shared_memory_id)
                .open()
                .wrap_err("failed to map shared memory output")?;
            // NOTE(review): the copy passes alignment `1` although the return
            // type advertises a 128-byte alignment — presumably
            // `AVec::from_slice` treats the argument as a minimum; confirm.
            let data = Some(AVec::from_slice(1, &unsafe { memory.as_slice() }[..len]));
            (data, Some(drop_token))
        }
        Some(DataMessage::Vec(v)) => (Some(v), None),
    };
    if let Some(token) = drop_token {
        // insert token into `pending_drop_tokens` even if there are no local subscribers
        dataflow
            .pending_drop_tokens
            .entry(token)
            .or_insert_with(|| DropTokenInformation {
                owner: node_id.clone(),
                pending_nodes: Default::default(),
            });
        // check if all local subscribers are finished with the token
        dataflow.check_drop_token(token, clock).await?;
    }
    Ok(data_bytes)
}
1738
1739fn node_inputs(node: &ResolvedNode) -> BTreeMap<DataId, Input> {
1740    match &node.kind {
1741        CoreNodeKind::Custom(n) => n.run_config.inputs.clone(),
1742        CoreNodeKind::Runtime(n) => runtime_node_inputs(n),
1743    }
1744}
1745
1746fn close_input(
1747    dataflow: &mut RunningDataflow,
1748    receiver_id: &NodeId,
1749    input_id: &DataId,
1750    clock: &HLC,
1751) {
1752    if let Some(open_inputs) = dataflow.open_inputs.get_mut(receiver_id) {
1753        if !open_inputs.remove(input_id) {
1754            return;
1755        }
1756    }
1757    if let Some(channel) = dataflow.subscribe_channels.get(receiver_id) {
1758        let _ = send_with_timestamp(
1759            channel,
1760            NodeEvent::InputClosed {
1761                id: input_id.clone(),
1762            },
1763            clock,
1764        );
1765
1766        if dataflow.open_inputs(receiver_id).is_empty() {
1767            let _ = send_with_timestamp(channel, NodeEvent::AllInputsClosed, clock);
1768        }
1769    }
1770}
1771
/// A node process that was spawned as part of a running dataflow.
#[derive(Debug)]
struct RunningNode {
    /// OS process handle; `None` once the process has stopped or when no
    /// process is tracked for this node.
    pid: Option<ProcessId>,
    /// The configuration the node was spawned with.
    node_config: NodeConfig,
}
1777
/// Handle to an OS process that is killed on drop if still running.
///
/// The inner value is `None` once the process is known to be stopped.
#[derive(Debug)]
struct ProcessId(Option<u32>);
1780
1781impl ProcessId {
1782    pub fn new(process_id: u32) -> Self {
1783        Self(Some(process_id))
1784    }
1785
1786    pub fn mark_as_stopped(&mut self) {
1787        self.0 = None;
1788    }
1789
1790    pub fn kill(&mut self) -> bool {
1791        if let Some(pid) = self.0 {
1792            let mut system = sysinfo::System::new();
1793            system.refresh_processes();
1794
1795            if let Some(process) = system.process(Pid::from(pid as usize)) {
1796                process.kill();
1797                self.mark_as_stopped();
1798                return true;
1799            }
1800        }
1801
1802        false
1803    }
1804}
1805
1806impl Drop for ProcessId {
1807    fn drop(&mut self) {
1808        // kill the process if it's still running
1809        if let Some(pid) = self.0 {
1810            if self.kill() {
1811                warn!("process {pid} was killed on drop because it was still running")
1812            }
1813        }
1814    }
1815}
1816
/// All daemon-side state for a single running dataflow instance.
pub struct RunningDataflow {
    /// Unique ID of this dataflow instance.
    id: Uuid,
    /// Local nodes that are not started yet
    pending_nodes: PendingNodes,

    /// Per-node channels for delivering `NodeEvent`s (inputs, stop, …).
    subscribe_channels: HashMap<NodeId, UnboundedSender<Timestamped<NodeEvent>>>,
    /// Per-node channels for notifying nodes that an output was dropped.
    drop_channels: HashMap<NodeId, UnboundedSender<Timestamped<NodeDropEvent>>>,
    /// Maps each output to the set of local inputs subscribed to it.
    mappings: HashMap<OutputId, BTreeSet<InputId>>,
    /// Maps each timer interval to the inputs subscribed to that timer.
    timers: BTreeMap<Duration, BTreeSet<InputId>>,
    /// The inputs of each node that have not been closed yet.
    open_inputs: BTreeMap<NodeId, BTreeSet<DataId>>,
    /// Node processes spawned for this dataflow.
    running_nodes: BTreeMap<NodeId, RunningNode>,

    /// List of all dynamic node IDs.
    ///
    /// We want to treat dynamic nodes differently in some cases, so we need
    /// to know which nodes are dynamic.
    dynamic_nodes: BTreeSet<NodeId>,

    /// Open outputs with external mappings (presumably subscribers on other
    /// daemons — confirm against the spawning code).
    open_external_mappings: BTreeSet<OutputId>,

    /// Drop tokens that still await confirmation from local receivers.
    pending_drop_tokens: HashMap<DropToken, DropTokenInformation>,

    /// Keep handles to all timer tasks of this dataflow to cancel them on drop.
    _timer_handles: Vec<futures::future::RemoteHandle<()>>,
    /// Whether a stop event was already sent to the subscribed nodes.
    stop_sent: bool,

    /// Used in `open_inputs`.
    ///
    /// TODO: replace this with a constant once `BTreeSet::new` is `const` on stable.
    empty_set: BTreeSet<DataId>,

    /// Contains the node that caused the error for nodes that experienced a cascading error.
    cascading_error_causes: CascadingErrorCauses,
    /// Nodes that had to be force-killed because they did not stop within the
    /// grace period.
    grace_duration_kills: Arc<crossbeam_skiplist::SkipSet<NodeId>>,

    /// Bounded queues holding each node's most recent stderr lines (used to
    /// build error reports when a node fails).
    node_stderr_most_recent: BTreeMap<NodeId, Arc<ArrayQueue<String>>>,

    /// Zenoh publishers, keyed by the output they publish.
    publishers: BTreeMap<OutputId, zenoh::pubsub::Publisher<'static>>,

    /// Broadcasts a signal when the dataflow finishes.
    finished_tx: broadcast::Sender<()>,

    /// Debug option from the dataflow descriptor: publish every output to
    /// zenoh, not only externally subscribed ones.
    publish_all_messages_to_zenoh: bool,
}
1860
1861impl RunningDataflow {
1862    fn new(
1863        dataflow_id: Uuid,
1864        daemon_id: DaemonId,
1865        dataflow_descriptor: &Descriptor,
1866    ) -> RunningDataflow {
1867        let (finished_tx, _) = broadcast::channel(1);
1868        Self {
1869            id: dataflow_id,
1870            pending_nodes: PendingNodes::new(dataflow_id, daemon_id),
1871            subscribe_channels: HashMap::new(),
1872            drop_channels: HashMap::new(),
1873            mappings: HashMap::new(),
1874            timers: BTreeMap::new(),
1875            open_inputs: BTreeMap::new(),
1876            running_nodes: BTreeMap::new(),
1877            dynamic_nodes: BTreeSet::new(),
1878            open_external_mappings: Default::default(),
1879            pending_drop_tokens: HashMap::new(),
1880            _timer_handles: Vec::new(),
1881            stop_sent: false,
1882            empty_set: BTreeSet::new(),
1883            cascading_error_causes: Default::default(),
1884            grace_duration_kills: Default::default(),
1885            node_stderr_most_recent: BTreeMap::new(),
1886            publishers: Default::default(),
1887            finished_tx,
1888            publish_all_messages_to_zenoh: dataflow_descriptor.debug.publish_all_messages_to_zenoh,
1889        }
1890    }
1891
1892    async fn start(
1893        &mut self,
1894        events_tx: &mpsc::Sender<Timestamped<Event>>,
1895        clock: &Arc<HLC>,
1896    ) -> eyre::Result<()> {
1897        for interval in self.timers.keys().copied() {
1898            let events_tx = events_tx.clone();
1899            let dataflow_id = self.id;
1900            let clock = clock.clone();
1901            let task = async move {
1902                let mut interval_stream = tokio::time::interval(interval);
1903                let hlc = HLC::default();
1904                loop {
1905                    interval_stream.tick().await;
1906
1907                    let span = tracing::span!(tracing::Level::TRACE, "tick");
1908                    let _ = span.enter();
1909
1910                    let mut parameters = BTreeMap::new();
1911                    parameters.insert(
1912                        "open_telemetry_context".to_string(),
1913                        #[cfg(feature = "telemetry")]
1914                        Parameter::String(serialize_context(&span.context())),
1915                        #[cfg(not(feature = "telemetry"))]
1916                        Parameter::String("".into()),
1917                    );
1918
1919                    let metadata = metadata::Metadata::from_parameters(
1920                        hlc.new_timestamp(),
1921                        empty_type_info(),
1922                        parameters,
1923                    );
1924
1925                    let event = Timestamped {
1926                        inner: DoraEvent::Timer {
1927                            dataflow_id,
1928                            interval,
1929                            metadata,
1930                        }
1931                        .into(),
1932                        timestamp: clock.new_timestamp(),
1933                    };
1934                    if events_tx.send(event).await.is_err() {
1935                        break;
1936                    }
1937                }
1938            };
1939            let (task, handle) = task.remote_handle();
1940            tokio::spawn(task);
1941            self._timer_handles.push(handle);
1942        }
1943
1944        Ok(())
1945    }
1946
1947    async fn stop_all(
1948        &mut self,
1949        coordinator_connection: &mut Option<TcpStream>,
1950        clock: &HLC,
1951        grace_duration: Option<Duration>,
1952        logger: &mut DataflowLogger<'_>,
1953    ) -> eyre::Result<()> {
1954        self.pending_nodes
1955            .handle_dataflow_stop(
1956                coordinator_connection,
1957                clock,
1958                &mut self.cascading_error_causes,
1959                &self.dynamic_nodes,
1960                logger,
1961            )
1962            .await?;
1963
1964        for (_node_id, channel) in self.subscribe_channels.drain() {
1965            let _ = send_with_timestamp(&channel, NodeEvent::Stop, clock);
1966        }
1967
1968        let running_processes: Vec<_> = self
1969            .running_nodes
1970            .iter_mut()
1971            .map(|(id, n)| (id.clone(), n.pid.take()))
1972            .collect();
1973        let grace_duration_kills = self.grace_duration_kills.clone();
1974        tokio::spawn(async move {
1975            let duration = grace_duration.unwrap_or(Duration::from_millis(15000));
1976            tokio::time::sleep(duration).await;
1977
1978            for (node, pid) in running_processes {
1979                if let Some(mut pid) = pid {
1980                    if pid.kill() {
1981                        grace_duration_kills.insert(node.clone());
1982                        warn!(
1983                            "{node} was killed due to not stopping within the {:#?} grace period",
1984                            duration
1985                        )
1986                    }
1987                }
1988            }
1989        });
1990        self.stop_sent = true;
1991        Ok(())
1992    }
1993
1994    fn open_inputs(&self, node_id: &NodeId) -> &BTreeSet<DataId> {
1995        self.open_inputs.get(node_id).unwrap_or(&self.empty_set)
1996    }
1997
1998    async fn check_drop_token(&mut self, token: DropToken, clock: &HLC) -> eyre::Result<()> {
1999        match self.pending_drop_tokens.entry(token) {
2000            std::collections::hash_map::Entry::Occupied(entry) => {
2001                if entry.get().pending_nodes.is_empty() {
2002                    let (drop_token, info) = entry.remove_entry();
2003                    let result = match self.drop_channels.get_mut(&info.owner) {
2004                        Some(channel) => send_with_timestamp(
2005                            channel,
2006                            NodeDropEvent::OutputDropped { drop_token },
2007                            clock,
2008                        )
2009                        .wrap_err("send failed"),
2010                        None => Err(eyre!("no subscribe channel for node `{}`", &info.owner)),
2011                    };
2012                    if let Err(err) = result.wrap_err_with(|| {
2013                        format!(
2014                            "failed to report drop token `{drop_token:?}` to owner `{}`",
2015                            &info.owner
2016                        )
2017                    }) {
2018                        tracing::warn!("{err:?}");
2019                    }
2020                }
2021            }
2022            std::collections::hash_map::Entry::Vacant(_) => {
2023                tracing::warn!("check_drop_token called with already closed token")
2024            }
2025        }
2026
2027        Ok(())
2028    }
2029
2030    fn output_publish_topic(&self, output_id: &OutputId) -> String {
2031        let network_id = "default";
2032        let dataflow_id = self.id;
2033        let OutputId(node_id, output_id) = output_id;
2034        format!("dora/{network_id}/{dataflow_id}/output/{node_id}/{output_id}")
2035    }
2036}
2037
2038fn empty_type_info() -> ArrowTypeInfo {
2039    ArrowTypeInfo {
2040        data_type: DataType::Null,
2041        len: 0,
2042        null_count: 0,
2043        validity: None,
2044        offset: 0,
2045        buffer_offsets: Vec::new(),
2046        child_data: Vec::new(),
2047    }
2048}
2049
/// Identifies a data output: the producing node plus the output's data ID.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct OutputId(NodeId, DataId);
/// Identifies a data input: the receiving node plus the input's data ID.
type InputId = (NodeId, DataId);
2053
/// Tracks which local subscribers still reference a shared output, so the
/// owning node can be told when the data may be dropped.
struct DropTokenInformation {
    /// The node that created the associated drop token.
    owner: NodeId,
    /// Contains the set of pending nodes that still have access to the input
    /// associated with a drop token.
    pending_nodes: BTreeSet<NodeId>,
}
2061
/// Top-level event type handled by the daemon's main loop.
#[derive(Debug)]
pub enum Event {
    /// A request or notification from a local node.
    Node {
        dataflow_id: DataflowId,
        node_id: NodeId,
        event: DaemonNodeEvent,
    },
    /// An event pushed by the coordinator.
    Coordinator(CoordinatorEvent),
    /// An event received from another daemon.
    Daemon(InterDaemonEvent),
    /// An event generated by the daemon itself (timers, logs, node exits).
    Dora(DoraEvent),
    /// An event from a dynamic node connecting via the local listener.
    DynamicNode(DynamicNodeEventWrapper),
    /// Periodic heartbeat tick.
    HeartbeatInterval,
    /// First ctrl-c signal received.
    CtrlC,
    /// Second ctrl-c signal received.
    SecondCtrlC,
    /// Failure while receiving an inter-daemon event.
    DaemonError(eyre::Report),
}
2078
2079impl From<DoraEvent> for Event {
2080    fn from(event: DoraEvent) -> Self {
2081        Event::Dora(event)
2082    }
2083}
2084
/// Requests and notifications sent by a local node to the daemon.
#[derive(Debug)]
pub enum DaemonNodeEvent {
    /// The node announces that it will send no more outputs.
    OutputsDone {
        reply_sender: oneshot::Sender<DaemonReply>,
    },
    /// The node subscribes to its input events.
    Subscribe {
        event_sender: UnboundedSender<Timestamped<NodeEvent>>,
        reply_sender: oneshot::Sender<DaemonReply>,
    },
    /// The node subscribes to drop notifications for its outputs.
    SubscribeDrop {
        event_sender: UnboundedSender<Timestamped<NodeDropEvent>>,
        reply_sender: oneshot::Sender<DaemonReply>,
    },
    /// The node closes the listed outputs.
    CloseOutputs {
        outputs: Vec<dora_core::config::DataId>,
        reply_sender: oneshot::Sender<DaemonReply>,
    },
    /// The node sends data on one of its outputs.
    SendOut {
        output_id: DataId,
        metadata: metadata::Metadata,
        data: Option<DataMessage>,
    },
    /// The node reports that it no longer uses the data behind the given
    /// drop tokens.
    ReportDrop {
        tokens: Vec<DropToken>,
    },
    /// The node's event stream was dropped.
    EventStreamDropped {
        reply_sender: oneshot::Sender<DaemonReply>,
    },
}
2114
/// Events generated by the daemon itself.
#[derive(Debug)]
pub enum DoraEvent {
    /// A timer tick for inputs subscribed to `interval`.
    Timer {
        dataflow_id: DataflowId,
        interval: Duration,
        metadata: metadata::Metadata,
    },
    /// A log message to deliver to subscribed inputs.
    Logs {
        dataflow_id: DataflowId,
        output_id: OutputId,
        message: DataMessage,
        metadata: metadata::Metadata,
    },
    /// A spawned node process exited with the given status.
    SpawnedNodeResult {
        dataflow_id: DataflowId,
        node_id: NodeId,
        exit_status: NodeExitStatus,
    },
}
2134
/// Tells the daemon's main loop whether to keep running after handling an
/// event.
#[must_use]
enum RunStatus {
    /// Keep processing events.
    Continue,
    /// Shut the daemon down.
    Exit,
}
2140
2141fn send_with_timestamp<T>(
2142    sender: &UnboundedSender<Timestamped<T>>,
2143    event: T,
2144    clock: &HLC,
2145) -> Result<(), mpsc::error::SendError<Timestamped<T>>> {
2146    sender.send(Timestamped {
2147        inner: event,
2148        timestamp: clock.new_timestamp(),
2149    })
2150}
2151
2152fn set_up_ctrlc_handler(
2153    clock: Arc<HLC>,
2154) -> eyre::Result<tokio::sync::mpsc::Receiver<Timestamped<Event>>> {
2155    let (ctrlc_tx, ctrlc_rx) = mpsc::channel(1);
2156
2157    let mut ctrlc_sent = 0;
2158    ctrlc::set_handler(move || {
2159        let event = match ctrlc_sent {
2160            0 => Event::CtrlC,
2161            1 => Event::SecondCtrlC,
2162            _ => {
2163                tracing::warn!("received 3rd ctrlc signal -> aborting immediately");
2164                std::process::abort();
2165            }
2166        };
2167        if ctrlc_tx
2168            .blocking_send(Timestamped {
2169                inner: event,
2170                timestamp: clock.new_timestamp(),
2171            })
2172            .is_err()
2173        {
2174            tracing::error!("failed to report ctrl-c event to dora-coordinator");
2175        }
2176
2177        ctrlc_sent += 1;
2178    })
2179    .wrap_err("failed to set ctrl-c handler")?;
2180
2181    Ok(ctrlc_rx)
2182}
2183
/// Records, for each node that failed due to a cascading error, which node
/// originally caused the failure.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct CascadingErrorCauses {
    /// Maps an affected node to the node that caused its failure.
    caused_by: BTreeMap<NodeId, NodeId>,
}
2188
2189impl CascadingErrorCauses {
2190    pub fn experienced_cascading_error(&self, node: &NodeId) -> bool {
2191        self.caused_by.contains_key(node)
2192    }
2193
2194    /// Return the ID of the node that caused a cascading error for the given node, if any.
2195    pub fn error_caused_by(&self, node: &NodeId) -> Option<&NodeId> {
2196        self.caused_by.get(node)
2197    }
2198
2199    pub fn report_cascading_error(&mut self, causing_node: NodeId, affected_node: NodeId) {
2200        self.caused_by.entry(affected_node).or_insert(causing_node);
2201    }
2202}
2203
2204fn runtime_node_inputs(n: &RuntimeNode) -> BTreeMap<DataId, Input> {
2205    n.operators
2206        .iter()
2207        .flat_map(|operator| {
2208            operator.config.inputs.iter().map(|(input_id, mapping)| {
2209                (
2210                    DataId::from(format!("{}/{input_id}", operator.id)),
2211                    mapping.clone(),
2212                )
2213            })
2214        })
2215        .collect()
2216}
2217
2218fn runtime_node_outputs(n: &RuntimeNode) -> BTreeSet<DataId> {
2219    n.operators
2220        .iter()
2221        .flat_map(|operator| {
2222            operator
2223                .config
2224                .outputs
2225                .iter()
2226                .map(|output_id| DataId::from(format!("{}/{output_id}", operator.id)))
2227        })
2228        .collect()
2229}
2230
/// Convenience accessors shared by runtime and custom node kinds.
trait CoreNodeKindExt {
    /// Returns the node's input/output configuration.
    fn run_config(&self) -> NodeRunConfig;
    /// Returns whether the node uses the dynamic source (`DYNAMIC_SOURCE`).
    fn dynamic(&self) -> bool;
}
2235
2236impl CoreNodeKindExt for CoreNodeKind {
2237    fn run_config(&self) -> NodeRunConfig {
2238        match self {
2239            CoreNodeKind::Runtime(n) => NodeRunConfig {
2240                inputs: runtime_node_inputs(n),
2241                outputs: runtime_node_outputs(n),
2242            },
2243            CoreNodeKind::Custom(n) => n.run_config.clone(),
2244        }
2245    }
2246
2247    fn dynamic(&self) -> bool {
2248        match self {
2249            CoreNodeKind::Runtime(_n) => false,
2250            CoreNodeKind::Custom(n) => n.source == DYNAMIC_SOURCE,
2251        }
2252    }
2253}