// dora_daemon/lib.rs

1use aligned_vec::{AVec, ConstAlign};
2use coordinator::CoordinatorEvent;
3use crossbeam::queue::ArrayQueue;
4use dora_core::{
5    config::{DataId, Input, InputMapping, NodeId, NodeRunConfig, OperatorId},
6    descriptor::{
7        read_as_descriptor, CoreNodeKind, Descriptor, DescriptorExt, ResolvedNode, RuntimeNode,
8        DYNAMIC_SOURCE,
9    },
10    topics::LOCALHOST,
11    uhlc::{self, HLC},
12};
13use dora_message::{
14    common::{
15        DaemonId, DataMessage, DropToken, LogLevel, NodeError, NodeErrorCause, NodeExitStatus,
16    },
17    coordinator_to_cli::DataflowResult,
18    coordinator_to_daemon::{DaemonCoordinatorEvent, SpawnDataflowNodes},
19    daemon_to_coordinator::{
20        CoordinatorRequest, DaemonCoordinatorReply, DaemonEvent, DataflowDaemonResult,
21    },
22    daemon_to_daemon::InterDaemonEvent,
23    daemon_to_node::{DaemonReply, NodeConfig, NodeDropEvent, NodeEvent},
24    metadata::{self, ArrowTypeInfo},
25    node_to_daemon::{DynamicNodeEvent, Timestamped},
26    DataflowId,
27};
28use dora_node_api::{arrow::datatypes::DataType, Parameter};
29use eyre::{bail, eyre, Context, ContextCompat, Result};
30use futures::{future, stream, FutureExt, TryFutureExt};
31use futures_concurrency::stream::Merge;
32use local_listener::DynamicNodeEventWrapper;
33use log::{DaemonLogger, DataflowLogger, Logger};
34use pending::PendingNodes;
35use shared_memory_server::ShmemConf;
36use socket_stream_utils::socket_stream_send;
37use std::{
38    collections::{BTreeMap, BTreeSet, HashMap},
39    net::SocketAddr,
40    path::{Path, PathBuf},
41    pin::pin,
42    sync::Arc,
43    time::{Duration, Instant},
44};
45use sysinfo::Pid;
46use tokio::{
47    fs::File,
48    io::AsyncReadExt,
49    net::TcpStream,
50    sync::{
51        broadcast,
52        mpsc::{self, UnboundedSender},
53        oneshot::{self, Sender},
54    },
55};
56use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
57use tracing::{error, warn};
58use uuid::{NoContext, Timestamp, Uuid};
59
60mod coordinator;
61mod local_listener;
62mod log;
63mod node_communication;
64mod pending;
65mod socket_stream_utils;
66mod spawn;
67
68#[cfg(feature = "telemetry")]
69use dora_tracing::telemetry::serialize_context;
70#[cfg(feature = "telemetry")]
71use tracing_opentelemetry::OpenTelemetrySpanExt;
72
73use crate::pending::DataflowStatus;
74
/// Number of stderr lines retained per node — presumably the trailing lines
/// included in node error reports; usage sites are not visible in this chunk.
const STDERR_LOG_LINES: usize = 10;
76
/// State of a single dora daemon process.
///
/// Tracks the dataflows running locally, the (optional) connection to the
/// coordinator, and the zenoh session used for inter-daemon communication.
pub struct Daemon {
    /// All dataflows currently running on this daemon, keyed by dataflow ID.
    running: HashMap<DataflowId, RunningDataflow>,
    /// Working directory of each dataflow (used e.g. to locate log files).
    working_dir: HashMap<DataflowId, PathBuf>,

    /// Sender half for daemon-internal events; the receiver is merged into
    /// the event stream consumed by `run_inner`.
    events_tx: mpsc::Sender<Timestamped<Event>>,

    /// Control connection to the coordinator; `None` when running standalone.
    coordinator_connection: Option<TcpStream>,
    /// Time of the last heartbeat received from the coordinator; checked on
    /// every `HeartbeatInterval` tick.
    last_coordinator_heartbeat: Instant,
    /// Identity of this daemon, assigned during event stream setup.
    daemon_id: DaemonId,

    /// used for testing and examples
    exit_when_done: Option<BTreeSet<(Uuid, NodeId)>>,
    /// set on ctrl-c
    exit_when_all_finished: bool,
    /// used to record results of local nodes
    dataflow_node_results: BTreeMap<Uuid, BTreeMap<NodeId, Result<(), NodeError>>>,

    /// Hybrid logical clock used to timestamp all outgoing events.
    clock: Arc<uhlc::HLC>,

    /// Zenoh session used for communication with other daemons.
    zenoh_session: zenoh::Session,
    /// Forwards incoming remote daemon events into the event stream, if set.
    remote_daemon_events_tx: Option<flume::Sender<eyre::Result<Timestamped<InterDaemonEvent>>>>,

    logger: DaemonLogger,
}
101
/// Node run results per dataflow: dataflow UUID -> node ID -> run outcome.
type DaemonRunResult = BTreeMap<Uuid, BTreeMap<NodeId, Result<(), NodeError>>>;
103
104impl Daemon {
    /// Runs a daemon connected to the coordinator at `coordinator_addr`.
    ///
    /// Sets up the ctrl-c handler and the incoming event stream, then hands
    /// control to `run_general`. Blocks until the daemon exits.
    pub async fn run(
        coordinator_addr: SocketAddr,
        machine_id: Option<String>,
        local_listen_port: u16,
    ) -> eyre::Result<()> {
        let clock = Arc::new(HLC::default());

        let mut ctrlc_events = set_up_ctrlc_handler(clock.clone())?;
        let (remote_daemon_events_tx, remote_daemon_events_rx) = flume::bounded(10);
        let (daemon_id, incoming_events) = {
            let incoming_events = set_up_event_stream(
                coordinator_addr,
                &machine_id,
                &clock,
                remote_daemon_events_rx,
                local_listen_port,
            );

            // finish early if ctrl-c is pressed during event stream setup
            let ctrl_c = pin!(ctrlc_events.recv());
            match futures::future::select(ctrl_c, pin!(incoming_events)).await {
                future::Either::Left((_ctrl_c, _)) => {
                    tracing::info!("received ctrl-c signal -> stopping daemon");
                    return Ok(());
                }
                future::Either::Right((events, _)) => events?,
            }
        };
        Self::run_general(
            (ReceiverStream::new(ctrlc_events), incoming_events).merge(),
            Some(coordinator_addr),
            daemon_id,
            None,
            clock,
            Some(remote_daemon_events_tx),
        )
        .await
        .map(|_| ())
    }
144
145    pub async fn run_dataflow(dataflow_path: &Path, uv: bool) -> eyre::Result<DataflowResult> {
146        let working_dir = dataflow_path
147            .canonicalize()
148            .context("failed to canonicalize dataflow path")?
149            .parent()
150            .ok_or_else(|| eyre::eyre!("canonicalized dataflow path has no parent"))?
151            .to_owned();
152
153        let descriptor = read_as_descriptor(dataflow_path).await?;
154        descriptor.check(&working_dir)?;
155        let nodes = descriptor.resolve_aliases_and_set_defaults()?;
156
157        let dataflow_id = Uuid::new_v7(Timestamp::now(NoContext));
158        let spawn_command = SpawnDataflowNodes {
159            dataflow_id,
160            working_dir,
161            spawn_nodes: nodes.keys().cloned().collect(),
162            nodes,
163            dataflow_descriptor: descriptor,
164            uv,
165        };
166
167        let clock = Arc::new(HLC::default());
168
169        let ctrlc_events = ReceiverStream::new(set_up_ctrlc_handler(clock.clone())?);
170
171        let exit_when_done = spawn_command
172            .nodes
173            .values()
174            .map(|n| (spawn_command.dataflow_id, n.id.clone()))
175            .collect();
176        let (reply_tx, reply_rx) = oneshot::channel();
177        let timestamp = clock.new_timestamp();
178        let coordinator_events = stream::once(async move {
179            Timestamped {
180                inner: Event::Coordinator(CoordinatorEvent {
181                    event: DaemonCoordinatorEvent::Spawn(spawn_command),
182                    reply_tx,
183                }),
184                timestamp,
185            }
186        });
187        let events = (coordinator_events, ctrlc_events).merge();
188        let run_result = Self::run_general(
189            Box::pin(events),
190            None,
191            DaemonId::new(None),
192            Some(exit_when_done),
193            clock.clone(),
194            None,
195        );
196
197        let spawn_result = reply_rx
198            .map_err(|err| eyre!("failed to receive spawn result: {err}"))
199            .and_then(|r| async {
200                match r {
201                    Some(DaemonCoordinatorReply::SpawnResult(result)) => {
202                        result.map_err(|err| eyre!(err))
203                    }
204                    _ => Err(eyre!("unexpected spawn reply")),
205                }
206            });
207
208        let (mut dataflow_results, ()) = future::try_join(run_result, spawn_result).await?;
209
210        Ok(DataflowResult {
211            uuid: dataflow_id,
212            timestamp: clock.new_timestamp(),
213            node_results: dataflow_results
214                .remove(&dataflow_id)
215                .context("no node results for dataflow_id")?,
216        })
217    }
218
    /// Shared setup path used by both `run` (with coordinator) and
    /// `run_dataflow` (standalone).
    ///
    /// Establishes the coordinator connections (if an address is given),
    /// opens a zenoh session for inter-daemon communication, constructs the
    /// `Daemon` state, and hands control to `run_inner`.
    async fn run_general(
        external_events: impl Stream<Item = Timestamped<Event>> + Unpin,
        coordinator_addr: Option<SocketAddr>,
        daemon_id: DaemonId,
        exit_when_done: Option<BTreeSet<(Uuid, NodeId)>>,
        clock: Arc<HLC>,
        remote_daemon_events_tx: Option<flume::Sender<eyre::Result<Timestamped<InterDaemonEvent>>>>,
    ) -> eyre::Result<DaemonRunResult> {
        // Main control connection for coordinator requests and replies.
        let coordinator_connection = match coordinator_addr {
            Some(addr) => {
                let stream = TcpStream::connect(addr)
                    .await
                    .wrap_err("failed to connect to dora-coordinator")?;
                stream
                    .set_nodelay(true)
                    .wrap_err("failed to set TCP_NODELAY")?;
                Some(stream)
            }
            None => None,
        };

        // additional connection for logging
        let logger_coordinator_connection = match coordinator_addr {
            Some(addr) => {
                let stream = TcpStream::connect(addr)
                    .await
                    .wrap_err("failed to connect log to dora-coordinator")?;
                stream
                    .set_nodelay(true)
                    .wrap_err("failed to set TCP_NODELAY")?;
                Some(stream)
            }
            None => None,
        };

        // Open the zenoh session. An explicit config file (via the env var)
        // wins; otherwise a coordinator-aware default config is tried, with
        // two progressively simpler fallback configs if opening fails.
        let zenoh_session = match std::env::var(zenoh::Config::DEFAULT_CONFIG_PATH_ENV) {
            Ok(path) => {
                let zenoh_config = zenoh::Config::from_file(&path)
                    .map_err(|e| eyre!(e))
                    .wrap_err_with(|| format!("failed to read zenoh config from {path}"))?;
                zenoh::open(zenoh_config)
                    .await
                    .map_err(|e| eyre!(e))
                    .context("failed to open zenoh session")?
            }
            Err(std::env::VarError::NotPresent) => {
                let mut zenoh_config = zenoh::Config::default();

                if let Some(addr) = coordinator_addr {
                    // Linkstate routing makes it possible to connect two daemons on different networks through a public daemon.
                    // TODO: There is currently a CI/CD Error in windows linkstate.
                    if cfg!(not(target_os = "windows")) {
                        zenoh_config
                            .insert_json5("routing/peer", r#"{ mode: "linkstate" }"#)
                            .unwrap();
                    }

                    zenoh_config
                        .insert_json5(
                            "connect/endpoints",
                            &format!(
                                r#"{{ router: ["tcp/[::]:7447"], peer: ["tcp/{}:5456"] }}"#,
                                addr.ip()
                            ),
                        )
                        .unwrap();
                    zenoh_config
                        .insert_json5(
                            "listen/endpoints",
                            r#"{ router: ["tcp/[::]:7447"], peer: ["tcp/[::]:5456"] }"#,
                        )
                        .unwrap();
                    if cfg!(target_os = "macos") {
                        warn!("disabling multicast on macos systems. Enable it with the ZENOH_CONFIG env variable or file");
                        zenoh_config
                            .insert_json5("scouting/multicast", r#"{ enabled: false }"#)
                            .unwrap();
                    }
                };
                if let Ok(zenoh_session) = zenoh::open(zenoh_config).await {
                    zenoh_session
                } else {
                    // First fallback: same settings but without the fixed
                    // listen endpoints.
                    warn!(
                        "failed to open zenoh session, retrying with default config + coordinator"
                    );
                    let mut zenoh_config = zenoh::Config::default();
                    // Linkstate routing makes it possible to connect two daemons on different networks through a public daemon.
                    // TODO: There is currently a CI/CD Error in windows linkstate.
                    if cfg!(not(target_os = "windows")) {
                        zenoh_config
                            .insert_json5("routing/peer", r#"{ mode: "linkstate" }"#)
                            .unwrap();
                    }

                    if let Some(addr) = coordinator_addr {
                        zenoh_config
                            .insert_json5(
                                "connect/endpoints",
                                &format!(
                                    r#"{{ router: ["tcp/[::]:7447"], peer: ["tcp/{}:5456"] }}"#,
                                    addr.ip()
                                ),
                            )
                            .unwrap();
                        if cfg!(target_os = "macos") {
                            warn!("disabling multicast on macos systems. Enable it with the ZENOH_CONFIG env variable or file");
                            zenoh_config
                                .insert_json5("scouting/multicast", r#"{ enabled: false }"#)
                                .unwrap();
                        }
                    }
                    if let Ok(zenoh_session) = zenoh::open(zenoh_config).await {
                        zenoh_session
                    } else {
                        // Last resort: a plain default config.
                        warn!("failed to open zenoh session, retrying with default config");
                        let zenoh_config = zenoh::Config::default();
                        zenoh::open(zenoh_config)
                            .await
                            .map_err(|e| eyre!(e))
                            .context("failed to open zenoh session")?
                    }
                }
            }
            Err(std::env::VarError::NotUnicode(_)) => eyre::bail!(
                "{} env variable is not valid unicode",
                zenoh::Config::DEFAULT_CONFIG_PATH_ENV
            ),
        };
        let (dora_events_tx, dora_events_rx) = mpsc::channel(5);
        let daemon = Self {
            logger: Logger {
                coordinator_connection: logger_coordinator_connection,
                daemon_id: daemon_id.clone(),
                clock: clock.clone(),
            }
            .for_daemon(daemon_id.clone()),
            running: HashMap::new(),
            working_dir: HashMap::new(),
            events_tx: dora_events_tx,
            coordinator_connection,
            last_coordinator_heartbeat: Instant::now(),
            daemon_id,
            exit_when_done,
            exit_when_all_finished: false,
            dataflow_node_results: BTreeMap::new(),
            clock,
            zenoh_session,
            remote_daemon_events_tx,
        };

        let dora_events = ReceiverStream::new(dora_events_rx);
        let watchdog_clock = daemon.clock.clone();
        // Periodic tick that drives coordinator heartbeats and the
        // lost-connection check inside `run_inner`.
        let watchdog_interval = tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(
            Duration::from_secs(5),
        ))
        .map(|_| Timestamped {
            inner: Event::HeartbeatInterval,
            timestamp: watchdog_clock.new_timestamp(),
        });
        let events = (external_events, dora_events, watchdog_interval).merge();
        daemon.run_inner(events).await
    }
381
    /// Main event loop of the daemon.
    ///
    /// Consumes the merged event stream (coordinator, inter-daemon, node,
    /// dora-internal, heartbeat ticks, ctrl-c) until an exit condition is
    /// reached, then notifies the coordinator (if connected) and returns the
    /// collected per-dataflow node results.
    #[tracing::instrument(skip(incoming_events, self), fields(?self.daemon_id))]
    async fn run_inner(
        mut self,
        incoming_events: impl Stream<Item = Timestamped<Event>> + Unpin,
    ) -> eyre::Result<DaemonRunResult> {
        let mut events = incoming_events;

        while let Some(event) = events.next().await {
            let Timestamped { inner, timestamp } = event;
            // Keep our hybrid logical clock in sync with event timestamps.
            if let Err(err) = self.clock.update_with_timestamp(&timestamp) {
                tracing::warn!("failed to update HLC with incoming event timestamp: {err}");
            }

            match inner {
                Event::Coordinator(CoordinatorEvent { event, reply_tx }) => {
                    let status = self.handle_coordinator_event(event, reply_tx).await?;

                    match status {
                        RunStatus::Continue => {}
                        RunStatus::Exit => break,
                    }
                }
                Event::Daemon(event) => {
                    self.handle_inter_daemon_event(event).await?;
                }
                Event::Node {
                    dataflow_id: dataflow,
                    node_id,
                    event,
                } => self.handle_node_event(event, dataflow, node_id).await?,
                Event::Dora(event) => match self.handle_dora_event(event).await? {
                    RunStatus::Continue => {}
                    RunStatus::Exit => break,
                },
                Event::DynamicNode(event) => self.handle_dynamic_node_event(event).await?,
                Event::HeartbeatInterval => {
                    // Send our heartbeat and verify that we received one from
                    // the coordinator recently.
                    if let Some(connection) = &mut self.coordinator_connection {
                        let msg = serde_json::to_vec(&Timestamped {
                            inner: CoordinatorRequest::Event {
                                daemon_id: self.daemon_id.clone(),
                                event: DaemonEvent::Heartbeat,
                            },
                            timestamp: self.clock.new_timestamp(),
                        })?;
                        socket_stream_send(connection, &msg)
                            .await
                            .wrap_err("failed to send watchdog message to dora-coordinator")?;

                        if self.last_coordinator_heartbeat.elapsed() > Duration::from_secs(20) {
                            bail!("lost connection to coordinator")
                        }
                    }
                }
                Event::CtrlC => {
                    tracing::info!("received ctrlc signal -> stopping all dataflows");
                    for dataflow in self.running.values_mut() {
                        let mut logger = self.logger.for_dataflow(dataflow.id);
                        dataflow
                            .stop_all(
                                &mut self.coordinator_connection,
                                &self.clock,
                                None,
                                &mut logger,
                            )
                            .await?;
                    }
                    // Exit once all (now stopping) dataflows have finished.
                    self.exit_when_all_finished = true;
                    if self.running.is_empty() {
                        break;
                    }
                }
                Event::SecondCtrlC => {
                    tracing::warn!("received second ctrlc signal -> exit immediately");
                    bail!("received second ctrl-c signal");
                }
                Event::DaemonError(err) => {
                    tracing::error!("Daemon error: {err:?}");
                }
            }
        }

        // Tell the coordinator that this daemon is exiting.
        if let Some(mut connection) = self.coordinator_connection.take() {
            let msg = serde_json::to_vec(&Timestamped {
                inner: CoordinatorRequest::Event {
                    daemon_id: self.daemon_id.clone(),
                    event: DaemonEvent::Exit,
                },
                timestamp: self.clock.new_timestamp(),
            })?;
            socket_stream_send(&mut connection, &msg)
                .await
                .wrap_err("failed to send Exit message to dora-coordinator")?;
        }

        Ok(self.dataflow_node_results)
    }
478
479    async fn handle_coordinator_event(
480        &mut self,
481        event: DaemonCoordinatorEvent,
482        reply_tx: Sender<Option<DaemonCoordinatorReply>>,
483    ) -> eyre::Result<RunStatus> {
484        let status = match event {
485            DaemonCoordinatorEvent::Spawn(SpawnDataflowNodes {
486                dataflow_id,
487                working_dir,
488                nodes,
489                dataflow_descriptor,
490                spawn_nodes,
491                uv,
492            }) => {
493                match dataflow_descriptor.communication.remote {
494                    dora_core::config::RemoteCommunicationConfig::Tcp => {}
495                }
496
497                // Use the working directory if it exists, otherwise use the working directory where the daemon is spawned
498                let working_dir = if working_dir.exists() {
499                    working_dir
500                } else {
501                    std::env::current_dir().wrap_err("failed to get current working dir")?
502                };
503
504                let result = self
505                    .spawn_dataflow(
506                        dataflow_id,
507                        working_dir,
508                        nodes,
509                        dataflow_descriptor,
510                        spawn_nodes,
511                        uv,
512                    )
513                    .await;
514                if let Err(err) = &result {
515                    tracing::error!("{err:?}");
516                }
517                let reply =
518                    DaemonCoordinatorReply::SpawnResult(result.map_err(|err| format!("{err:?}")));
519                let _ = reply_tx.send(Some(reply)).map_err(|_| {
520                    error!("could not send `SpawnResult` reply from daemon to coordinator")
521                });
522                RunStatus::Continue
523            }
524            DaemonCoordinatorEvent::AllNodesReady {
525                dataflow_id,
526                exited_before_subscribe,
527            } => {
528                let mut logger = self.logger.for_dataflow(dataflow_id);
529                logger.log(LogLevel::Debug, None,
530                    Some("daemon".into()),
531                    format!("received AllNodesReady (exited_before_subscribe: {exited_before_subscribe:?})"
532                )).await;
533                match self.running.get_mut(&dataflow_id) {
534                    Some(dataflow) => {
535                        let ready = exited_before_subscribe.is_empty();
536                        dataflow
537                            .pending_nodes
538                            .handle_external_all_nodes_ready(
539                                exited_before_subscribe,
540                                &mut dataflow.cascading_error_causes,
541                            )
542                            .await?;
543                        if ready {
544                            logger.log(LogLevel::Info, None,
545                                Some("daemon".into()),
546                                "coordinator reported that all nodes are ready, starting dataflow",
547                            ).await;
548                            dataflow.start(&self.events_tx, &self.clock).await?;
549                        }
550                    }
551                    None => {
552                        tracing::warn!(
553                            "received AllNodesReady for unknown dataflow (ID `{dataflow_id}`)"
554                        );
555                    }
556                }
557                let _ = reply_tx.send(None).map_err(|_| {
558                    error!("could not send `AllNodesReady` reply from daemon to coordinator")
559                });
560                RunStatus::Continue
561            }
562            DaemonCoordinatorEvent::Logs {
563                dataflow_id,
564                node_id,
565            } => {
566                match self.working_dir.get(&dataflow_id) {
567                    Some(working_dir) => {
568                        let working_dir = working_dir.clone();
569                        tokio::spawn(async move {
570                            let logs = async {
571                                let mut file =
572                                    File::open(log::log_path(&working_dir, &dataflow_id, &node_id))
573                                        .await
574                                        .wrap_err(format!(
575                                            "Could not open log file: {:#?}",
576                                            log::log_path(&working_dir, &dataflow_id, &node_id)
577                                        ))?;
578
579                                let mut contents = vec![];
580                                file.read_to_end(&mut contents)
581                                    .await
582                                    .wrap_err("Could not read content of log file")?;
583                                Result::<Vec<u8>, eyre::Report>::Ok(contents)
584                            }
585                            .await
586                            .map_err(|err| format!("{err:?}"));
587                            let _ = reply_tx
588                                .send(Some(DaemonCoordinatorReply::Logs(logs)))
589                                .map_err(|_| {
590                                    error!("could not send logs reply from daemon to coordinator")
591                                });
592                        });
593                    }
594                    None => {
595                        tracing::warn!("received Logs for unknown dataflow (ID `{dataflow_id}`)");
596                        let _ = reply_tx.send(None).map_err(|_| {
597                            error!(
598                                "could not send `AllNodesReady` reply from daemon to coordinator"
599                            )
600                        });
601                    }
602                }
603                RunStatus::Continue
604            }
605            DaemonCoordinatorEvent::ReloadDataflow {
606                dataflow_id,
607                node_id,
608                operator_id,
609            } => {
610                let result = self.send_reload(dataflow_id, node_id, operator_id).await;
611                let reply =
612                    DaemonCoordinatorReply::ReloadResult(result.map_err(|err| format!("{err:?}")));
613                let _ = reply_tx
614                    .send(Some(reply))
615                    .map_err(|_| error!("could not send reload reply from daemon to coordinator"));
616                RunStatus::Continue
617            }
618            DaemonCoordinatorEvent::StopDataflow {
619                dataflow_id,
620                grace_duration,
621            } => {
622                let mut logger = self.logger.for_dataflow(dataflow_id);
623                let dataflow = self
624                    .running
625                    .get_mut(&dataflow_id)
626                    .wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"));
627                let (reply, future) = match dataflow {
628                    Ok(dataflow) => {
629                        let future = dataflow.stop_all(
630                            &mut self.coordinator_connection,
631                            &self.clock,
632                            grace_duration,
633                            &mut logger,
634                        );
635                        (Ok(()), Some(future))
636                    }
637                    Err(err) => (Err(err.to_string()), None),
638                };
639
640                let _ = reply_tx
641                    .send(Some(DaemonCoordinatorReply::StopResult(reply)))
642                    .map_err(|_| error!("could not send stop reply from daemon to coordinator"));
643
644                if let Some(future) = future {
645                    future.await?;
646                }
647
648                RunStatus::Continue
649            }
650            DaemonCoordinatorEvent::Destroy => {
651                tracing::info!("received destroy command -> exiting");
652                let (notify_tx, notify_rx) = oneshot::channel();
653                let reply = DaemonCoordinatorReply::DestroyResult {
654                    result: Ok(()),
655                    notify: Some(notify_tx),
656                };
657                let _ = reply_tx
658                    .send(Some(reply))
659                    .map_err(|_| error!("could not send destroy reply from daemon to coordinator"));
660                // wait until the reply is sent out
661                if notify_rx.await.is_err() {
662                    tracing::warn!("no confirmation received for DestroyReply");
663                }
664                RunStatus::Exit
665            }
666            DaemonCoordinatorEvent::Heartbeat => {
667                self.last_coordinator_heartbeat = Instant::now();
668                let _ = reply_tx.send(None);
669                RunStatus::Continue
670            }
671        };
672        Ok(status)
673    }
674
675    async fn handle_inter_daemon_event(&mut self, event: InterDaemonEvent) -> eyre::Result<()> {
676        match event {
677            InterDaemonEvent::Output {
678                dataflow_id,
679                node_id,
680                output_id,
681                metadata,
682                data,
683            } => {
684                let inner = async {
685                    let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
686                        format!("send out failed: no running dataflow with ID `{dataflow_id}`")
687                    })?;
688                    send_output_to_local_receivers(
689                        node_id.clone(),
690                        output_id.clone(),
691                        dataflow,
692                        &metadata,
693                        data.map(DataMessage::Vec),
694                        &self.clock,
695                    )
696                    .await?;
697                    Result::<_, eyre::Report>::Ok(())
698                };
699                if let Err(err) = inner
700                    .await
701                    .wrap_err("failed to forward remote output to local receivers")
702                {
703                    let mut logger = self.logger.for_dataflow(dataflow_id).for_node(node_id);
704                    logger
705                        .log(LogLevel::Warn, Some("daemon".into()), format!("{err:?}"))
706                        .await;
707                }
708                Ok(())
709            }
710            InterDaemonEvent::OutputClosed {
711                dataflow_id,
712                node_id,
713                output_id,
714            } => {
715                let output_id = OutputId(node_id.clone(), output_id);
716                let mut logger = self
717                    .logger
718                    .for_dataflow(dataflow_id)
719                    .for_node(node_id.clone());
720                logger
721                    .log(
722                        LogLevel::Debug,
723                        Some("daemon".into()),
724                        format!("received OutputClosed event for output {output_id:?}"),
725                    )
726                    .await;
727
728                let inner = async {
729                    let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
730                        format!("send out failed: no running dataflow with ID `{dataflow_id}`")
731                    })?;
732
733                    if let Some(inputs) = dataflow.mappings.get(&output_id).cloned() {
734                        for (receiver_id, input_id) in &inputs {
735                            close_input(dataflow, receiver_id, input_id, &self.clock);
736                        }
737                    }
738                    Result::<(), eyre::Report>::Ok(())
739                };
740                if let Err(err) = inner
741                    .await
742                    .wrap_err("failed to handle InputsClosed event sent by coordinator")
743                {
744                    logger
745                        .log(LogLevel::Warn, Some("daemon".into()), format!("{err:?}"))
746                        .await;
747                }
748                Ok(())
749            }
750        }
751    }
752
    /// Registers a new dataflow and spawns all nodes that run on this daemon.
    ///
    /// `spawn_nodes` lists the node IDs this daemon is responsible for; all
    /// other entries in `nodes` run on remote daemons, so for those this
    /// daemon only records external mappings and subscribes (via zenoh) to
    /// the remote outputs that feed local inputs.
    ///
    /// Returns an error if a dataflow with the same ID is already running or
    /// if setting up a zenoh subscriber fails. A failure to spawn an
    /// individual node does NOT fail the whole call: it is recorded in
    /// `dataflow_node_results` and the node is handled as stopped afterwards.
    async fn spawn_dataflow(
        &mut self,
        dataflow_id: uuid::Uuid,
        working_dir: PathBuf,
        nodes: BTreeMap<NodeId, ResolvedNode>,
        dataflow_descriptor: Descriptor,
        spawn_nodes: BTreeSet<NodeId>,
        uv: bool,
    ) -> eyre::Result<()> {
        let mut logger = self.logger.for_dataflow(dataflow_id);
        let dataflow =
            RunningDataflow::new(dataflow_id, self.daemon_id.clone(), &dataflow_descriptor);
        // Insert the dataflow only if the ID is not taken yet; the returned
        // `&mut` reference is used for all bookkeeping below.
        let dataflow = match self.running.entry(dataflow_id) {
            std::collections::hash_map::Entry::Vacant(entry) => {
                self.working_dir.insert(dataflow_id, working_dir.clone());
                entry.insert(dataflow)
            }
            std::collections::hash_map::Entry::Occupied(_) => {
                bail!("there is already a running dataflow with ID `{dataflow_id}`")
            }
        };

        // Nodes whose spawn failed; their stop handling runs after the loop.
        let mut stopped = Vec::new();

        // calculate info about mappings
        for node in nodes.values() {
            let local = spawn_nodes.contains(&node.id);

            let inputs = node_inputs(node);
            for (input_id, input) in inputs {
                if local {
                    // Track every input of a local node as open until the
                    // feeding output is closed.
                    dataflow
                        .open_inputs
                        .entry(node.id.clone())
                        .or_default()
                        .insert(input_id.clone());
                    match input.mapping {
                        InputMapping::User(mapping) => {
                            // output -> set of (receiver node, input) pairs
                            dataflow
                                .mappings
                                .entry(OutputId(mapping.source, mapping.output))
                                .or_default()
                                .insert((node.id.clone(), input_id));
                        }
                        InputMapping::Timer { interval } => {
                            dataflow
                                .timers
                                .entry(interval)
                                .or_default()
                                .insert((node.id.clone(), input_id));
                        }
                    }
                } else if let InputMapping::User(mapping) = input.mapping {
                    // Remote node input: remember that this output has at
                    // least one external (cross-daemon) receiver.
                    dataflow
                        .open_external_mappings
                        .insert(OutputId(mapping.source, mapping.output));
                }
            }
        }

        // spawn nodes and set up subscriptions
        for node in nodes.into_values() {
            let mut logger = logger.reborrow().for_node(node.id.clone());
            let local = spawn_nodes.contains(&node.id);
            if local {
                // Dynamic nodes attach later on their own; only regular
                // nodes are awaited before the dataflow can start.
                if node.kind.dynamic() {
                    dataflow.dynamic_nodes.insert(node.id.clone());
                } else {
                    dataflow.pending_nodes.insert(node.id.clone());
                }

                let node_id = node.id.clone();
                // Ring buffer keeping the most recent stderr lines for
                // error reporting.
                let node_stderr_most_recent = dataflow
                    .node_stderr_most_recent
                    .entry(node.id.clone())
                    .or_insert_with(|| Arc::new(ArrayQueue::new(STDERR_LOG_LINES)))
                    .clone();
                logger
                    .log(LogLevel::Info, Some("daemon".into()), "spawning")
                    .await;
                match spawn::spawn_node(
                    dataflow_id,
                    &working_dir,
                    node,
                    self.events_tx.clone(),
                    dataflow_descriptor.clone(),
                    self.clock.clone(),
                    node_stderr_most_recent,
                    uv,
                    &mut logger,
                )
                .await
                .wrap_err_with(|| format!("failed to spawn node `{node_id}`"))
                {
                    Ok(running_node) => {
                        dataflow.running_nodes.insert(node_id, running_node);
                    }
                    Err(err) => {
                        // Record the spawn failure as this node's result and
                        // defer its stop handling until after the loop.
                        logger
                            .log(LogLevel::Error, Some("daemon".into()), format!("{err:?}"))
                            .await;
                        self.dataflow_node_results
                            .entry(dataflow_id)
                            .or_default()
                            .insert(
                                node_id.clone(),
                                Err(NodeError {
                                    timestamp: self.clock.new_timestamp(),
                                    cause: NodeErrorCause::Other {
                                        stderr: format!("spawn failed: {err:?}"),
                                    },
                                    exit_status: NodeExitStatus::Unknown,
                                }),
                            );
                        stopped.push(node_id.clone());
                    }
                }
            } else {
                // wait until node is ready before starting
                dataflow.pending_nodes.set_external_nodes(true);

                // subscribe to all node outputs that are mapped to some local inputs
                for output_id in dataflow.mappings.keys().filter(|o| o.0 == node.id) {
                    let tx = self
                        .remote_daemon_events_tx
                        .clone()
                        .wrap_err("no remote_daemon_events_tx channel")?;
                    let mut finished_rx = dataflow.finished_tx.subscribe();
                    let subscribe_topic = dataflow.output_publish_topic(output_id);
                    tracing::debug!("declaring subscriber on {subscribe_topic}");
                    let subscriber = self
                        .zenoh_session
                        .declare_subscriber(subscribe_topic)
                        .await
                        .map_err(|e| eyre!(e))
                        .wrap_err_with(|| format!("failed to subscribe to {output_id:?}"))?;
                    // Background task: forward zenoh samples as inter-daemon
                    // events until the dataflow finishes or the daemon-side
                    // receiver is dropped.
                    tokio::spawn(async move {
                        let mut finished = pin!(finished_rx.recv());
                        loop {
                            // Race the "dataflow finished" signal against the
                            // next zenoh sample.
                            let finished_or_next =
                                futures::future::select(finished, subscriber.recv_async());
                            match finished_or_next.await {
                                future::Either::Left((finished, _)) => {
                                    match finished {
                                        Err(broadcast::error::RecvError::Closed) => {
                                            tracing::debug!("dataflow finished, breaking from zenoh subscribe task");
                                            break;
                                        }
                                        other => {
                                            tracing::warn!("unexpected return value of dataflow finished_rx channel: {other:?}");
                                            break;
                                        }
                                    }
                                }
                                future::Either::Right((sample, f)) => {
                                    // Keep the (pinned) finished future for the
                                    // next select round.
                                    finished = f;
                                    let event = sample.map_err(|e| eyre!(e)).and_then(|s| {
                                        Timestamped::deserialize_inter_daemon_event(
                                            &s.payload().to_bytes(),
                                        )
                                    });
                                    if tx.send_async(event).await.is_err() {
                                        // daemon finished
                                        break;
                                    }
                                }
                            }
                        }
                    });
                }
            }
        }
        // Run stop handling for nodes whose spawn failed (after the mutable
        // borrow of the dataflow entry above has ended).
        for node_id in stopped {
            self.handle_node_stop(dataflow_id, &node_id).await?;
        }

        Ok(())
    }
931
932    async fn handle_dynamic_node_event(
933        &mut self,
934        event: DynamicNodeEventWrapper,
935    ) -> eyre::Result<()> {
936        match event {
937            DynamicNodeEventWrapper {
938                event: DynamicNodeEvent::NodeConfig { node_id },
939                reply_tx,
940            } => {
941                let number_node_id = self
942                    .running
943                    .iter()
944                    .filter(|(_id, dataflow)| dataflow.running_nodes.contains_key(&node_id))
945                    .count();
946
947                let node_config = match number_node_id {
948                    2.. => Err(format!(
949                        "multiple dataflows contains dynamic node id {node_id}. \
950                        Please only have one running dataflow with the specified \
951                        node id if you want to use dynamic node",
952                    )),
953                    1 => self
954                        .running
955                        .iter()
956                        .filter(|(_id, dataflow)| dataflow.running_nodes.contains_key(&node_id))
957                        .map(|(id, dataflow)| -> Result<NodeConfig> {
958                            let node_config = dataflow
959                                .running_nodes
960                                .get(&node_id)
961                                .context("no node with ID `{node_id}` within the given dataflow")?
962                                .node_config
963                                .clone();
964                            if !node_config.dynamic {
965                                bail!("node with ID `{node_id}` in {id} is not dynamic");
966                            }
967                            Ok(node_config)
968                        })
969                        .next()
970                        .ok_or_else(|| eyre!("no node with ID `{node_id}`"))
971                        .and_then(|r| r)
972                        .map_err(|err| {
973                            format!(
974                                "failed to get dynamic node config within given dataflow: {err}"
975                            )
976                        }),
977                    0 => Err("no node with ID `{node_id}`".to_string()),
978                };
979
980                let reply = DaemonReply::NodeConfig {
981                    result: node_config,
982                };
983                let _ = reply_tx.send(Some(reply)).map_err(|_| {
984                    error!("could not send node info reply from daemon to coordinator")
985                });
986                Ok(())
987            }
988        }
989    }
990
    /// Processes a control event sent by a local node over the node API.
    ///
    /// Depending on the event variant this registers subscription channels,
    /// closes outputs, forwards outgoing data, or records drop-token
    /// reports. Where the event carries a reply channel, lookup failures are
    /// reported back to the node through that channel (as `Err(String)`)
    /// instead of failing this method; `Err` is returned only for internal
    /// daemon errors.
    async fn handle_node_event(
        &mut self,
        event: DaemonNodeEvent,
        dataflow_id: DataflowId,
        node_id: NodeId,
    ) -> eyre::Result<()> {
        match event {
            // The node is ready and wants to receive its input events.
            DaemonNodeEvent::Subscribe {
                event_sender,
                reply_sender,
            } => {
                let mut logger = self.logger.for_dataflow(dataflow_id);
                logger
                    .log(
                        LogLevel::Info,
                        Some(node_id.clone()),
                        Some("daemon".into()),
                        "node is ready",
                    )
                    .await;

                let dataflow = self.running.get_mut(&dataflow_id).ok_or_else(|| {
                    format!("subscribe failed: no running dataflow with ID `{dataflow_id}`")
                });

                match dataflow {
                    Err(err) => {
                        // Unknown dataflow: report the error to the node.
                        let _ = reply_sender.send(DaemonReply::Result(Err(err)));
                    }
                    Ok(dataflow) => {
                        // Register the event channel; this also replays
                        // already-closed inputs and a pending stop event.
                        Self::subscribe(dataflow, node_id.clone(), event_sender, &self.clock).await;

                        let status = dataflow
                            .pending_nodes
                            .handle_node_subscription(
                                node_id.clone(),
                                reply_sender,
                                &mut self.coordinator_connection,
                                &self.clock,
                                &mut dataflow.cascading_error_causes,
                                &mut logger,
                            )
                            .await?;
                        match status {
                            // Last pending node subscribed -> start the dataflow.
                            DataflowStatus::AllNodesReady => {
                                logger
                                    .log(
                                        LogLevel::Info,
                                        None,
                                        Some("daemon".into()),
                                        "all nodes are ready, starting dataflow",
                                    )
                                    .await;
                                dataflow.start(&self.events_tx, &self.clock).await?;
                            }
                            DataflowStatus::Pending => {}
                        }
                    }
                }
            }
            // The node wants to be notified about drop events.
            DaemonNodeEvent::SubscribeDrop {
                event_sender,
                reply_sender,
            } => {
                let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
                    format!("failed to subscribe: no running dataflow with ID `{dataflow_id}`")
                });
                let result = match dataflow {
                    Ok(dataflow) => {
                        dataflow.drop_channels.insert(node_id, event_sender);
                        Ok(())
                    }
                    Err(err) => Err(err.to_string()),
                };
                let _ = reply_sender.send(DaemonReply::Result(result));
            }
            // The node explicitly closed a subset of its outputs.
            DaemonNodeEvent::CloseOutputs {
                outputs,
                reply_sender,
            } => {
                // notify downstream nodes
                let inner = async {
                    self.send_output_closed_events(dataflow_id, node_id, outputs)
                        .await
                };

                let reply = inner.await.map_err(|err| format!("{err:?}"));
                let _ = reply_sender.send(DaemonReply::Result(reply));
            }
            // The node will send no further outputs at all.
            DaemonNodeEvent::OutputsDone { reply_sender } => {
                let result = self.handle_outputs_done(dataflow_id, &node_id).await;

                let _ = reply_sender.send(DaemonReply::Result(
                    result.map_err(|err| format!("{err:?}")),
                ));
            }
            // The node produced an output message; forward it to local and
            // (if mapped) remote receivers.
            DaemonNodeEvent::SendOut {
                output_id,
                metadata,
                data,
            } => self
                .send_out(dataflow_id, node_id, output_id, metadata, data)
                .await
                .context("failed to send out")?,
            // The node reports that it dropped the data behind these tokens.
            DaemonNodeEvent::ReportDrop { tokens } => {
                let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
                    format!(
                        "failed to get handle drop tokens: \
                        no running dataflow with ID `{dataflow_id}`"
                    )
                });

                match dataflow {
                    Ok(dataflow) => {
                        for token in tokens {
                            match dataflow.pending_drop_tokens.get_mut(&token) {
                                Some(info) => {
                                    if info.pending_nodes.remove(&node_id) {
                                        // May complete the token if this was
                                        // the last pending node.
                                        dataflow.check_drop_token(token, &self.clock).await?;
                                    } else {
                                        tracing::warn!(
                                            "node `{node_id}` is not pending for drop token `{token:?}`"
                                        );
                                    }
                                }
                                None => tracing::warn!("unknown drop token `{token:?}`"),
                            }
                        }
                    }
                    Err(err) => tracing::warn!("{err:?}"),
                }
            }
            // The node dropped its event stream; stop sending it events.
            DaemonNodeEvent::EventStreamDropped { reply_sender } => {
                let inner = async {
                    let dataflow = self
                        .running
                        .get_mut(&dataflow_id)
                        .wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"))?;
                    dataflow.subscribe_channels.remove(&node_id);
                    Result::<_, eyre::Error>::Ok(())
                };

                let reply = inner.await.map_err(|err| format!("{err:?}"));
                let _ = reply_sender.send(DaemonReply::Result(reply));
            }
        }
        Ok(())
    }
1139
1140    async fn send_reload(
1141        &mut self,
1142        dataflow_id: Uuid,
1143        node_id: NodeId,
1144        operator_id: Option<OperatorId>,
1145    ) -> Result<(), eyre::ErrReport> {
1146        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
1147            format!("Reload failed: no running dataflow with ID `{dataflow_id}`")
1148        })?;
1149        if let Some(channel) = dataflow.subscribe_channels.get(&node_id) {
1150            match send_with_timestamp(channel, NodeEvent::Reload { operator_id }, &self.clock) {
1151                Ok(()) => {}
1152                Err(_) => {
1153                    dataflow.subscribe_channels.remove(&node_id);
1154                }
1155            }
1156        }
1157        Ok(())
1158    }
1159
1160    async fn send_out(
1161        &mut self,
1162        dataflow_id: Uuid,
1163        node_id: NodeId,
1164        output_id: DataId,
1165        metadata: dora_message::metadata::Metadata,
1166        data: Option<DataMessage>,
1167    ) -> Result<(), eyre::ErrReport> {
1168        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
1169            format!("send out failed: no running dataflow with ID `{dataflow_id}`")
1170        })?;
1171        let data_bytes = send_output_to_local_receivers(
1172            node_id.clone(),
1173            output_id.clone(),
1174            dataflow,
1175            &metadata,
1176            data,
1177            &self.clock,
1178        )
1179        .await?;
1180
1181        let output_id = OutputId(node_id, output_id);
1182        let remote_receivers = dataflow.open_external_mappings.contains(&output_id)
1183            || dataflow.publish_all_messages_to_zenoh;
1184        if remote_receivers {
1185            let event = InterDaemonEvent::Output {
1186                dataflow_id,
1187                node_id: output_id.0.clone(),
1188                output_id: output_id.1.clone(),
1189                metadata,
1190                data: data_bytes,
1191            };
1192            self.send_to_remote_receivers(dataflow_id, &output_id, event)
1193                .await?;
1194        }
1195
1196        Ok(())
1197    }
1198
1199    async fn send_to_remote_receivers(
1200        &mut self,
1201        dataflow_id: Uuid,
1202        output_id: &OutputId,
1203        event: InterDaemonEvent,
1204    ) -> Result<(), eyre::Error> {
1205        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
1206            format!("send out failed: no running dataflow with ID `{dataflow_id}`")
1207        })?;
1208
1209        // publish via zenoh
1210        let publisher = match dataflow.publishers.get(output_id) {
1211            Some(publisher) => publisher,
1212            None => {
1213                let publish_topic = dataflow.output_publish_topic(output_id);
1214                tracing::debug!("declaring publisher on {publish_topic}");
1215                let publisher = self
1216                    .zenoh_session
1217                    .declare_publisher(publish_topic)
1218                    .await
1219                    .map_err(|e| eyre!(e))
1220                    .context("failed to create zenoh publisher")?;
1221                dataflow.publishers.insert(output_id.clone(), publisher);
1222                dataflow.publishers.get(output_id).unwrap()
1223            }
1224        };
1225
1226        let serialized_event = Timestamped {
1227            inner: event,
1228            timestamp: self.clock.new_timestamp(),
1229        }
1230        .serialize();
1231        publisher
1232            .put(serialized_event)
1233            .await
1234            .map_err(|e| eyre!(e))
1235            .context("zenoh put failed")?;
1236        Ok(())
1237    }
1238
1239    async fn send_output_closed_events(
1240        &mut self,
1241        dataflow_id: DataflowId,
1242        node_id: NodeId,
1243        outputs: Vec<DataId>,
1244    ) -> eyre::Result<()> {
1245        let dataflow = self
1246            .running
1247            .get_mut(&dataflow_id)
1248            .wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"))?;
1249        let local_node_inputs: BTreeSet<_> = dataflow
1250            .mappings
1251            .iter()
1252            .filter(|(k, _)| k.0 == node_id && outputs.contains(&k.1))
1253            .flat_map(|(_, v)| v)
1254            .cloned()
1255            .collect();
1256        for (receiver_id, input_id) in &local_node_inputs {
1257            close_input(dataflow, receiver_id, input_id, &self.clock);
1258        }
1259
1260        let mut closed = Vec::new();
1261        for output_id in &dataflow.open_external_mappings {
1262            if output_id.0 == node_id && outputs.contains(&output_id.1) {
1263                closed.push(output_id.clone());
1264            }
1265        }
1266
1267        for output_id in closed {
1268            let event = InterDaemonEvent::OutputClosed {
1269                dataflow_id,
1270                node_id: output_id.0.clone(),
1271                output_id: output_id.1.clone(),
1272            };
1273            self.send_to_remote_receivers(dataflow_id, &output_id, event)
1274                .await?;
1275        }
1276
1277        Ok(())
1278    }
1279
1280    async fn subscribe(
1281        dataflow: &mut RunningDataflow,
1282        node_id: NodeId,
1283        event_sender: UnboundedSender<Timestamped<NodeEvent>>,
1284        clock: &HLC,
1285    ) {
1286        // some inputs might have been closed already -> report those events
1287        let closed_inputs = dataflow
1288            .mappings
1289            .values()
1290            .flatten()
1291            .filter(|(node, _)| node == &node_id)
1292            .map(|(_, input)| input)
1293            .filter(|input| {
1294                dataflow
1295                    .open_inputs
1296                    .get(&node_id)
1297                    .map(|open_inputs| !open_inputs.contains(*input))
1298                    .unwrap_or(true)
1299            });
1300        for input_id in closed_inputs {
1301            let _ = send_with_timestamp(
1302                &event_sender,
1303                NodeEvent::InputClosed {
1304                    id: input_id.clone(),
1305                },
1306                clock,
1307            );
1308        }
1309        if dataflow.open_inputs(&node_id).is_empty() {
1310            let _ = send_with_timestamp(&event_sender, NodeEvent::AllInputsClosed, clock);
1311        }
1312
1313        // if a stop event was already sent for the dataflow, send it to
1314        // the newly connected node too
1315        if dataflow.stop_sent {
1316            let _ = send_with_timestamp(&event_sender, NodeEvent::Stop, clock);
1317        }
1318
1319        dataflow.subscribe_channels.insert(node_id, event_sender);
1320    }
1321
    /// Handles the "node finished sending outputs" transition.
    ///
    /// Closes every output this node maps to some input (locally and on
    /// remote daemons) and removes the node's drop channel. The dataflow is
    /// looked up twice because `send_output_closed_events` needs exclusive
    /// access to `self` in between.
    #[tracing::instrument(skip(self), level = "trace")]
    async fn handle_outputs_done(
        &mut self,
        dataflow_id: DataflowId,
        node_id: &NodeId,
    ) -> eyre::Result<()> {
        let dataflow = self
            .running
            .get_mut(&dataflow_id)
            .ok_or_else(|| eyre!("no running dataflow with ID `{dataflow_id}`"))?;

        // All output IDs of `node_id` that feed at least one mapped input.
        let outputs = dataflow
            .mappings
            .keys()
            .filter(|m| &m.0 == node_id)
            .map(|m| &m.1)
            .cloned()
            .collect();
        self.send_output_closed_events(dataflow_id, node_id.clone(), outputs)
            .await?;

        // Re-borrow the dataflow: the call above required `&mut self`.
        let dataflow = self
            .running
            .get_mut(&dataflow_id)
            .ok_or_else(|| eyre!("no running dataflow with ID `{dataflow_id}`"))?;
        dataflow.drop_channels.remove(node_id);
        Ok(())
    }
1350
    /// Handles a stopped node: finalizes its outputs, removes it from the
    /// running set, and — when no non-dynamic nodes remain — reports the
    /// dataflow as finished to the coordinator and drops it.
    async fn handle_node_stop(&mut self, dataflow_id: Uuid, node_id: &NodeId) -> eyre::Result<()> {
        let mut logger = self.logger.for_dataflow(dataflow_id);
        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
            format!("failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`")
        })?;

        // Let the pending-node tracker react first (e.g. a node stopping
        // before the dataflow started).
        dataflow
            .pending_nodes
            .handle_node_stop(
                node_id,
                &mut self.coordinator_connection,
                &self.clock,
                &mut dataflow.cascading_error_causes,
                &mut logger,
            )
            .await?;

        // Close all of this node's outputs (locally and remotely).
        self.handle_outputs_done(dataflow_id, node_id).await?;

        // Re-borrow after the `&mut self` call above.
        let mut logger = self.logger.for_dataflow(dataflow_id);
        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
            format!("failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`")
        })?;
        if let Some(mut pid) = dataflow.running_nodes.remove(node_id).and_then(|n| n.pid) {
            pid.mark_as_stopped()
        }
        // Only dynamic nodes left (or none at all, since `all` on an empty
        // iterator is true) -> the dataflow counts as finished here.
        if dataflow
            .running_nodes
            .iter()
            .all(|(_id, n)| n.node_config.dynamic)
        {
            let result = DataflowDaemonResult {
                timestamp: self.clock.new_timestamp(),
                node_results: self
                    .dataflow_node_results
                    .get(&dataflow.id)
                    .context("failed to get dataflow node results")?
                    .clone(),
            };

            logger
                .log(
                    LogLevel::Info,
                    None,
                    Some("daemon".into()),
                    format!("dataflow finished on machine `{}`", self.daemon_id),
                )
                .await;
            // Report the per-node results to the coordinator, if connected.
            if let Some(connection) = &mut self.coordinator_connection {
                let msg = serde_json::to_vec(&Timestamped {
                    inner: CoordinatorRequest::Event {
                        daemon_id: self.daemon_id.clone(),
                        event: DaemonEvent::AllNodesFinished {
                            dataflow_id,
                            result,
                        },
                    },
                    timestamp: self.clock.new_timestamp(),
                })?;
                socket_stream_send(connection, &msg)
                    .await
                    .wrap_err("failed to report dataflow finish to dora-coordinator")?;
            }
            self.running.remove(&dataflow_id);
        }

        Ok(())
    }
1419
1420    async fn handle_dora_event(&mut self, event: DoraEvent) -> eyre::Result<RunStatus> {
1421        match event {
1422            DoraEvent::Timer {
1423                dataflow_id,
1424                interval,
1425                metadata,
1426            } => {
1427                let Some(dataflow) = self.running.get_mut(&dataflow_id) else {
1428                    tracing::warn!("Timer event for unknown dataflow `{dataflow_id}`");
1429                    return Ok(RunStatus::Continue);
1430                };
1431
1432                let Some(subscribers) = dataflow.timers.get(&interval) else {
1433                    return Ok(RunStatus::Continue);
1434                };
1435
1436                let mut closed = Vec::new();
1437                for (receiver_id, input_id) in subscribers {
1438                    let Some(channel) = dataflow.subscribe_channels.get(receiver_id) else {
1439                        continue;
1440                    };
1441
1442                    let send_result = send_with_timestamp(
1443                        channel,
1444                        NodeEvent::Input {
1445                            id: input_id.clone(),
1446                            metadata: metadata.clone(),
1447                            data: None,
1448                        },
1449                        &self.clock,
1450                    );
1451                    match send_result {
1452                        Ok(()) => {}
1453                        Err(_) => {
1454                            closed.push(receiver_id);
1455                        }
1456                    }
1457                }
1458                for id in closed {
1459                    dataflow.subscribe_channels.remove(id);
1460                }
1461            }
1462            DoraEvent::Logs {
1463                dataflow_id,
1464                output_id,
1465                message,
1466                metadata,
1467            } => {
1468                let Some(dataflow) = self.running.get_mut(&dataflow_id) else {
1469                    tracing::warn!("Logs event for unknown dataflow `{dataflow_id}`");
1470                    return Ok(RunStatus::Continue);
1471                };
1472
1473                let Some(subscribers) = dataflow.mappings.get(&output_id) else {
1474                    tracing::warn!(
1475                        "No subscribers found for {:?} in {:?}",
1476                        output_id,
1477                        dataflow.mappings
1478                    );
1479                    return Ok(RunStatus::Continue);
1480                };
1481
1482                let mut closed = Vec::new();
1483                for (receiver_id, input_id) in subscribers {
1484                    let Some(channel) = dataflow.subscribe_channels.get(receiver_id) else {
1485                        tracing::warn!("No subscriber channel found for {:?}", output_id);
1486                        continue;
1487                    };
1488
1489                    let send_result = send_with_timestamp(
1490                        channel,
1491                        NodeEvent::Input {
1492                            id: input_id.clone(),
1493                            metadata: metadata.clone(),
1494                            data: Some(message.clone()),
1495                        },
1496                        &self.clock,
1497                    );
1498                    match send_result {
1499                        Ok(()) => {}
1500                        Err(_) => {
1501                            closed.push(receiver_id);
1502                        }
1503                    }
1504                }
1505                for id in closed {
1506                    dataflow.subscribe_channels.remove(id);
1507                }
1508            }
1509            DoraEvent::SpawnedNodeResult {
1510                dataflow_id,
1511                node_id,
1512                exit_status,
1513            } => {
1514                let mut logger = self
1515                    .logger
1516                    .for_dataflow(dataflow_id)
1517                    .for_node(node_id.clone());
1518                logger
1519                    .log(
1520                        LogLevel::Debug,
1521                        Some("daemon".into()),
1522                        format!("handling node stop with exit status {exit_status:?}"),
1523                    )
1524                    .await;
1525
1526                let node_result = match exit_status {
1527                    NodeExitStatus::Success => Ok(()),
1528                    exit_status => {
1529                        let dataflow = self.running.get(&dataflow_id);
1530                        let caused_by_node = dataflow
1531                            .and_then(|dataflow| {
1532                                dataflow.cascading_error_causes.error_caused_by(&node_id)
1533                            })
1534                            .cloned();
1535                        let grace_duration_kill = dataflow
1536                            .map(|d| d.grace_duration_kills.contains(&node_id))
1537                            .unwrap_or_default();
1538
1539                        let cause = match caused_by_node {
1540                            Some(caused_by_node) => {
1541                                logger
1542                                    .log(
1543                                        LogLevel::Info,
1544                                        Some("daemon".into()),
1545                                        format!("marking `{node_id}` as cascading error caused by `{caused_by_node}`")
1546                                    )
1547                                    .await;
1548
1549                                NodeErrorCause::Cascading { caused_by_node }
1550                            }
1551                            None if grace_duration_kill => NodeErrorCause::GraceDuration,
1552                            None => {
1553                                let cause = dataflow
1554                                    .and_then(|d| d.node_stderr_most_recent.get(&node_id))
1555                                    .map(|queue| {
1556                                        let mut s = if queue.is_full() {
1557                                            "[...]".into()
1558                                        } else {
1559                                            String::new()
1560                                        };
1561                                        while let Some(line) = queue.pop() {
1562                                            s += &line;
1563                                        }
1564                                        s
1565                                    })
1566                                    .unwrap_or_default();
1567
1568                                NodeErrorCause::Other { stderr: cause }
1569                            }
1570                        };
1571                        Err(NodeError {
1572                            timestamp: self.clock.new_timestamp(),
1573                            cause,
1574                            exit_status,
1575                        })
1576                    }
1577                };
1578
1579                logger
1580                    .log(
1581                        if node_result.is_ok() {
1582                            LogLevel::Info
1583                        } else {
1584                            LogLevel::Error
1585                        },
1586                        Some("daemon".into()),
1587                        match &node_result {
1588                            Ok(()) => format!("{node_id} finished successfully"),
1589                            Err(err) => format!("{err}"),
1590                        },
1591                    )
1592                    .await;
1593
1594                self.dataflow_node_results
1595                    .entry(dataflow_id)
1596                    .or_default()
1597                    .insert(node_id.clone(), node_result);
1598
1599                self.handle_node_stop(dataflow_id, &node_id).await?;
1600
1601                if let Some(exit_when_done) = &mut self.exit_when_done {
1602                    exit_when_done.remove(&(dataflow_id, node_id));
1603                    if exit_when_done.is_empty() {
1604                        tracing::info!(
1605                            "exiting daemon because all required dataflows are finished"
1606                        );
1607                        return Ok(RunStatus::Exit);
1608                    }
1609                }
1610                if self.exit_when_all_finished && self.running.is_empty() {
1611                    return Ok(RunStatus::Exit);
1612                }
1613            }
1614        }
1615        Ok(RunStatus::Continue)
1616    }
1617}
1618
1619async fn set_up_event_stream(
1620    coordinator_addr: SocketAddr,
1621    machine_id: &Option<String>,
1622    clock: &Arc<HLC>,
1623    remote_daemon_events_rx: flume::Receiver<eyre::Result<Timestamped<InterDaemonEvent>>>,
1624    // used for dynamic nodes
1625    local_listen_port: u16,
1626) -> eyre::Result<(DaemonId, impl Stream<Item = Timestamped<Event>> + Unpin)> {
1627    let clock_cloned = clock.clone();
1628    let remote_daemon_events = remote_daemon_events_rx.into_stream().map(move |e| match e {
1629        Ok(e) => Timestamped {
1630            inner: Event::Daemon(e.inner),
1631            timestamp: e.timestamp,
1632        },
1633        Err(err) => Timestamped {
1634            inner: Event::DaemonError(err),
1635            timestamp: clock_cloned.new_timestamp(),
1636        },
1637    });
1638    let (daemon_id, coordinator_events) =
1639        coordinator::register(coordinator_addr, machine_id.clone(), clock)
1640            .await
1641            .wrap_err("failed to connect to dora-coordinator")?;
1642    let coordinator_events = coordinator_events.map(
1643        |Timestamped {
1644             inner: event,
1645             timestamp,
1646         }| Timestamped {
1647            inner: Event::Coordinator(event),
1648            timestamp,
1649        },
1650    );
1651    let (events_tx, events_rx) = flume::bounded(10);
1652    let _listen_port =
1653        local_listener::spawn_listener_loop((LOCALHOST, local_listen_port).into(), events_tx)
1654            .await?;
1655    let dynamic_node_events = events_rx.into_stream().map(|e| Timestamped {
1656        inner: Event::DynamicNode(e.inner),
1657        timestamp: e.timestamp,
1658    });
1659    let incoming = (
1660        coordinator_events,
1661        remote_daemon_events,
1662        dynamic_node_events,
1663    )
1664        .merge();
1665    Ok((daemon_id, incoming))
1666}
1667
/// Delivers a node output to all locally subscribed receivers and returns
/// the raw data bytes for potential further forwarding.
///
/// For every local input mapped to `(node_id, output_id)`, a
/// `NodeEvent::Input` is sent over the receiver's subscribe channel.
/// Receivers whose channel is closed are unsubscribed. If the data lives in
/// shared memory, it is copied out (so it can outlive the sender's buffer)
/// and the associated drop token is tracked until all local receivers have
/// finished with it.
async fn send_output_to_local_receivers(
    node_id: NodeId,
    output_id: DataId,
    dataflow: &mut RunningDataflow,
    metadata: &metadata::Metadata,
    data: Option<DataMessage>,
    clock: &HLC,
) -> Result<Option<AVec<u8, ConstAlign<128>>>, eyre::ErrReport> {
    let timestamp = metadata.timestamp();
    let empty_set = BTreeSet::new();
    let output_id = OutputId(node_id, output_id);
    // No mapping entry means there are no local receivers for this output.
    let local_receivers = dataflow.mappings.get(&output_id).unwrap_or(&empty_set);
    let OutputId(node_id, _) = output_id;
    let mut closed = Vec::new();
    for (receiver_id, input_id) in local_receivers {
        if let Some(channel) = dataflow.subscribe_channels.get(receiver_id) {
            let item = NodeEvent::Input {
                id: input_id.clone(),
                metadata: metadata.clone(),
                data: data.clone(),
            };
            match channel.send(Timestamped {
                inner: item,
                timestamp,
            }) {
                Ok(()) => {
                    // The receiver now holds a reference to the shared buffer:
                    // record it under the drop token so the owner is notified
                    // only after all pending receivers reported the drop.
                    if let Some(token) = data.as_ref().and_then(|d| d.drop_token()) {
                        dataflow
                            .pending_drop_tokens
                            .entry(token)
                            .or_insert_with(|| DropTokenInformation {
                                owner: node_id.clone(),
                                pending_nodes: Default::default(),
                            })
                            .pending_nodes
                            .insert(receiver_id.clone());
                    }
                }
                Err(_) => {
                    // Send failure means the receiver's channel is gone;
                    // remember it so the subscription is removed below.
                    closed.push(receiver_id);
                }
            }
        }
    }
    // Drop subscriptions of receivers with closed channels.
    for id in closed {
        dataflow.subscribe_channels.remove(id);
    }
    let (data_bytes, drop_token) = match data {
        None => (None, None),
        Some(DataMessage::SharedMemory {
            shared_memory_id,
            len,
            drop_token,
        }) => {
            // Copy the payload out of shared memory so the caller can keep
            // using it after the sender's buffer is dropped.
            let memory = ShmemConf::new()
                .os_id(shared_memory_id)
                .open()
                .wrap_err("failed to map shared memory output")?;
            let data = Some(AVec::from_slice(1, &unsafe { memory.as_slice() }[..len]));
            (data, Some(drop_token))
        }
        Some(DataMessage::Vec(v)) => (Some(v), None),
    };
    if let Some(token) = drop_token {
        // insert token into `pending_drop_tokens` even if there are no local subscribers
        dataflow
            .pending_drop_tokens
            .entry(token)
            .or_insert_with(|| DropTokenInformation {
                owner: node_id.clone(),
                pending_nodes: Default::default(),
            });
        // check if all local subscribers are finished with the token
        dataflow.check_drop_token(token, clock).await?;
    }
    Ok(data_bytes)
}
1745
1746fn node_inputs(node: &ResolvedNode) -> BTreeMap<DataId, Input> {
1747    match &node.kind {
1748        CoreNodeKind::Custom(n) => n.run_config.inputs.clone(),
1749        CoreNodeKind::Runtime(n) => runtime_node_inputs(n),
1750    }
1751}
1752
1753fn close_input(
1754    dataflow: &mut RunningDataflow,
1755    receiver_id: &NodeId,
1756    input_id: &DataId,
1757    clock: &HLC,
1758) {
1759    if let Some(open_inputs) = dataflow.open_inputs.get_mut(receiver_id) {
1760        if !open_inputs.remove(input_id) {
1761            return;
1762        }
1763    }
1764    if let Some(channel) = dataflow.subscribe_channels.get(receiver_id) {
1765        let _ = send_with_timestamp(
1766            channel,
1767            NodeEvent::InputClosed {
1768                id: input_id.clone(),
1769            },
1770            clock,
1771        );
1772
1773        if dataflow.open_inputs(receiver_id).is_empty() {
1774            let _ = send_with_timestamp(channel, NodeEvent::AllInputsClosed, clock);
1775        }
1776    }
1777}
1778
/// A node process spawned (or tracked) by this daemon for a running dataflow.
#[derive(Debug)]
struct RunningNode {
    /// Process handle used to force-kill the node; `None` once the process
    /// handle was taken (see `RunningDataflow::stop_all`) or never tracked.
    pid: Option<ProcessId>,
    /// The configuration the node was started with.
    node_config: NodeConfig,
}
1784
/// Wrapper around an OS process ID that force-kills the process on drop.
///
/// The inner value is `None` once the process is known to have stopped.
#[derive(Debug)]
struct ProcessId(Option<u32>);
1787
1788impl ProcessId {
1789    pub fn new(process_id: u32) -> Self {
1790        Self(Some(process_id))
1791    }
1792
1793    pub fn mark_as_stopped(&mut self) {
1794        self.0 = None;
1795    }
1796
1797    pub fn kill(&mut self) -> bool {
1798        if let Some(pid) = self.0 {
1799            let mut system = sysinfo::System::new();
1800            system.refresh_processes();
1801
1802            if let Some(process) = system.process(Pid::from(pid as usize)) {
1803                process.kill();
1804                self.mark_as_stopped();
1805                return true;
1806            }
1807        }
1808
1809        false
1810    }
1811}
1812
1813impl Drop for ProcessId {
1814    fn drop(&mut self) {
1815        // kill the process if it's still running
1816        if let Some(pid) = self.0 {
1817            if self.kill() {
1818                warn!("process {pid} was killed on drop because it was still running")
1819            }
1820        }
1821    }
1822}
1823
/// All daemon-side state for a single running dataflow instance.
pub struct RunningDataflow {
    /// Unique ID of this dataflow instance.
    id: Uuid,
    /// Local nodes that are not started yet
    pending_nodes: PendingNodes,

    /// Per-node channels for delivering node events (inputs, stop, …).
    subscribe_channels: HashMap<NodeId, UnboundedSender<Timestamped<NodeEvent>>>,
    /// Per-node channels for reporting released drop tokens to output owners.
    drop_channels: HashMap<NodeId, UnboundedSender<Timestamped<NodeDropEvent>>>,
    /// Maps each output to the set of local inputs subscribed to it.
    mappings: HashMap<OutputId, BTreeSet<InputId>>,
    /// Maps each timer interval to the inputs subscribed to its ticks.
    timers: BTreeMap<Duration, BTreeSet<InputId>>,
    /// The inputs of each node that have not been closed yet.
    open_inputs: BTreeMap<NodeId, BTreeSet<DataId>>,
    /// Process handles and configs of the nodes spawned by this daemon.
    running_nodes: BTreeMap<NodeId, RunningNode>,

    /// List of all dynamic node IDs.
    ///
    /// We want to treat dynamic nodes differently in some cases, so we need
    /// to know which nodes are dynamic.
    dynamic_nodes: BTreeSet<NodeId>,

    /// Outputs with still-open external mappings (presumably subscribers on
    /// other daemons — TODO confirm against the external-event handling).
    open_external_mappings: BTreeSet<OutputId>,

    /// Drop tokens that are still held by at least one pending receiver.
    pending_drop_tokens: HashMap<DropToken, DropTokenInformation>,

    /// Keep handles to all timer tasks of this dataflow to cancel them on drop.
    _timer_handles: Vec<futures::future::RemoteHandle<()>>,
    /// Whether stop events were already sent to the nodes (set in `stop_all`).
    stop_sent: bool,

    /// Used in `open_inputs`.
    ///
    /// TODO: replace this with a constant once `BTreeSet::new` is `const` on stable.
    empty_set: BTreeSet<DataId>,

    /// Contains the node that caused the error for nodes that experienced a cascading error.
    cascading_error_causes: CascadingErrorCauses,
    /// Nodes that were force-killed for not stopping within the grace period.
    grace_duration_kills: Arc<crossbeam_skiplist::SkipSet<NodeId>>,

    /// Bounded queue of each node's most recent stderr lines; drained to
    /// build the error cause when a node exits unsuccessfully.
    node_stderr_most_recent: BTreeMap<NodeId, Arc<ArrayQueue<String>>>,

    /// Zenoh publishers keyed by output ID.
    publishers: BTreeMap<OutputId, zenoh::pubsub::Publisher<'static>>,

    /// Broadcast channel for notifying when this dataflow has finished.
    finished_tx: broadcast::Sender<()>,

    /// Debug option: mirror every output message to zenoh.
    publish_all_messages_to_zenoh: bool,
}
1867
1868impl RunningDataflow {
1869    fn new(
1870        dataflow_id: Uuid,
1871        daemon_id: DaemonId,
1872        dataflow_descriptor: &Descriptor,
1873    ) -> RunningDataflow {
1874        let (finished_tx, _) = broadcast::channel(1);
1875        Self {
1876            id: dataflow_id,
1877            pending_nodes: PendingNodes::new(dataflow_id, daemon_id),
1878            subscribe_channels: HashMap::new(),
1879            drop_channels: HashMap::new(),
1880            mappings: HashMap::new(),
1881            timers: BTreeMap::new(),
1882            open_inputs: BTreeMap::new(),
1883            running_nodes: BTreeMap::new(),
1884            dynamic_nodes: BTreeSet::new(),
1885            open_external_mappings: Default::default(),
1886            pending_drop_tokens: HashMap::new(),
1887            _timer_handles: Vec::new(),
1888            stop_sent: false,
1889            empty_set: BTreeSet::new(),
1890            cascading_error_causes: Default::default(),
1891            grace_duration_kills: Default::default(),
1892            node_stderr_most_recent: BTreeMap::new(),
1893            publishers: Default::default(),
1894            finished_tx,
1895            publish_all_messages_to_zenoh: dataflow_descriptor.debug.publish_all_messages_to_zenoh,
1896        }
1897    }
1898
1899    async fn start(
1900        &mut self,
1901        events_tx: &mpsc::Sender<Timestamped<Event>>,
1902        clock: &Arc<HLC>,
1903    ) -> eyre::Result<()> {
1904        for interval in self.timers.keys().copied() {
1905            let events_tx = events_tx.clone();
1906            let dataflow_id = self.id;
1907            let clock = clock.clone();
1908            let task = async move {
1909                let mut interval_stream = tokio::time::interval(interval);
1910                let hlc = HLC::default();
1911                loop {
1912                    interval_stream.tick().await;
1913
1914                    let span = tracing::span!(tracing::Level::TRACE, "tick");
1915                    let _ = span.enter();
1916
1917                    let mut parameters = BTreeMap::new();
1918                    parameters.insert(
1919                        "open_telemetry_context".to_string(),
1920                        #[cfg(feature = "telemetry")]
1921                        Parameter::String(serialize_context(&span.context())),
1922                        #[cfg(not(feature = "telemetry"))]
1923                        Parameter::String("".into()),
1924                    );
1925
1926                    let metadata = metadata::Metadata::from_parameters(
1927                        hlc.new_timestamp(),
1928                        empty_type_info(),
1929                        parameters,
1930                    );
1931
1932                    let event = Timestamped {
1933                        inner: DoraEvent::Timer {
1934                            dataflow_id,
1935                            interval,
1936                            metadata,
1937                        }
1938                        .into(),
1939                        timestamp: clock.new_timestamp(),
1940                    };
1941                    if events_tx.send(event).await.is_err() {
1942                        break;
1943                    }
1944                }
1945            };
1946            let (task, handle) = task.remote_handle();
1947            tokio::spawn(task);
1948            self._timer_handles.push(handle);
1949        }
1950
1951        Ok(())
1952    }
1953
1954    async fn stop_all(
1955        &mut self,
1956        coordinator_connection: &mut Option<TcpStream>,
1957        clock: &HLC,
1958        grace_duration: Option<Duration>,
1959        logger: &mut DataflowLogger<'_>,
1960    ) -> eyre::Result<()> {
1961        self.pending_nodes
1962            .handle_dataflow_stop(
1963                coordinator_connection,
1964                clock,
1965                &mut self.cascading_error_causes,
1966                &self.dynamic_nodes,
1967                logger,
1968            )
1969            .await?;
1970
1971        for (_node_id, channel) in self.subscribe_channels.drain() {
1972            let _ = send_with_timestamp(&channel, NodeEvent::Stop, clock);
1973        }
1974
1975        let running_processes: Vec<_> = self
1976            .running_nodes
1977            .iter_mut()
1978            .map(|(id, n)| (id.clone(), n.pid.take()))
1979            .collect();
1980        let grace_duration_kills = self.grace_duration_kills.clone();
1981        tokio::spawn(async move {
1982            let duration = grace_duration.unwrap_or(Duration::from_millis(15000));
1983            tokio::time::sleep(duration).await;
1984
1985            for (node, pid) in running_processes {
1986                if let Some(mut pid) = pid {
1987                    if pid.kill() {
1988                        grace_duration_kills.insert(node.clone());
1989                        warn!(
1990                            "{node} was killed due to not stopping within the {:#?} grace period",
1991                            duration
1992                        )
1993                    }
1994                }
1995            }
1996        });
1997        self.stop_sent = true;
1998        Ok(())
1999    }
2000
2001    fn open_inputs(&self, node_id: &NodeId) -> &BTreeSet<DataId> {
2002        self.open_inputs.get(node_id).unwrap_or(&self.empty_set)
2003    }
2004
2005    async fn check_drop_token(&mut self, token: DropToken, clock: &HLC) -> eyre::Result<()> {
2006        match self.pending_drop_tokens.entry(token) {
2007            std::collections::hash_map::Entry::Occupied(entry) => {
2008                if entry.get().pending_nodes.is_empty() {
2009                    let (drop_token, info) = entry.remove_entry();
2010                    let result = match self.drop_channels.get_mut(&info.owner) {
2011                        Some(channel) => send_with_timestamp(
2012                            channel,
2013                            NodeDropEvent::OutputDropped { drop_token },
2014                            clock,
2015                        )
2016                        .wrap_err("send failed"),
2017                        None => Err(eyre!("no subscribe channel for node `{}`", &info.owner)),
2018                    };
2019                    if let Err(err) = result.wrap_err_with(|| {
2020                        format!(
2021                            "failed to report drop token `{drop_token:?}` to owner `{}`",
2022                            &info.owner
2023                        )
2024                    }) {
2025                        tracing::warn!("{err:?}");
2026                    }
2027                }
2028            }
2029            std::collections::hash_map::Entry::Vacant(_) => {
2030                tracing::warn!("check_drop_token called with already closed token")
2031            }
2032        }
2033
2034        Ok(())
2035    }
2036
2037    fn output_publish_topic(&self, output_id: &OutputId) -> String {
2038        let network_id = "default";
2039        let dataflow_id = self.id;
2040        let OutputId(node_id, output_id) = output_id;
2041        format!("dora/{network_id}/{dataflow_id}/output/{node_id}/{output_id}")
2042    }
2043}
2044
2045fn empty_type_info() -> ArrowTypeInfo {
2046    ArrowTypeInfo {
2047        data_type: DataType::Null,
2048        len: 0,
2049        null_count: 0,
2050        validity: None,
2051        offset: 0,
2052        buffer_offsets: Vec::new(),
2053        child_data: Vec::new(),
2054    }
2055}
2056
/// Identifies a node output as the pair (producing node, output data ID).
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct OutputId(NodeId, DataId);
/// Identifies a node input as the pair (receiving node, input data ID).
type InputId = (NodeId, DataId);
2060
/// Bookkeeping for a single shared-memory drop token.
struct DropTokenInformation {
    /// The node that created the associated drop token.
    owner: NodeId,
    /// Contains the set of pending nodes that still have access to the input
    /// associated with a drop token.
    pending_nodes: BTreeSet<NodeId>,
}
2068
/// Top-level event type consumed by the daemon's main loop.
#[derive(Debug)]
pub enum Event {
    /// A request or notification from a locally running node.
    Node {
        dataflow_id: DataflowId,
        node_id: NodeId,
        event: DaemonNodeEvent,
    },
    /// An event received from the coordinator.
    Coordinator(CoordinatorEvent),
    /// An event received from another daemon.
    Daemon(InterDaemonEvent),
    /// An internally generated event (timers, logs, node exit results).
    Dora(DoraEvent),
    /// An event from a dynamically connecting node on the local listener.
    DynamicNode(DynamicNodeEventWrapper),
    /// Periodic heartbeat trigger (handler is outside this view —
    /// presumably reports liveness to the coordinator; TODO confirm).
    HeartbeatInterval,
    /// The first ctrl-c signal was received.
    CtrlC,
    /// The second ctrl-c signal was received (a third aborts the process).
    SecondCtrlC,
    /// A failure occurred in the inter-daemon event stream.
    DaemonError(eyre::Report),
}
2085
2086impl From<DoraEvent> for Event {
2087    fn from(event: DoraEvent) -> Self {
2088        Event::Dora(event)
2089    }
2090}
2091
/// Requests and notifications sent from a local node to the daemon.
#[derive(Debug)]
pub enum DaemonNodeEvent {
    /// The node reports that it will send no further outputs.
    OutputsDone {
        reply_sender: oneshot::Sender<DaemonReply>,
    },
    /// The node subscribes to its input events.
    Subscribe {
        event_sender: UnboundedSender<Timestamped<NodeEvent>>,
        reply_sender: oneshot::Sender<DaemonReply>,
    },
    /// The node subscribes to drop-token notifications for its outputs.
    SubscribeDrop {
        event_sender: UnboundedSender<Timestamped<NodeDropEvent>>,
        reply_sender: oneshot::Sender<DaemonReply>,
    },
    /// The node closes a subset of its outputs.
    CloseOutputs {
        outputs: Vec<dora_core::config::DataId>,
        reply_sender: oneshot::Sender<DaemonReply>,
    },
    /// The node publishes data on one of its outputs.
    SendOut {
        output_id: DataId,
        metadata: metadata::Metadata,
        data: Option<DataMessage>,
    },
    /// The node reports that it is done with the shared-memory buffers
    /// identified by these drop tokens.
    ReportDrop {
        tokens: Vec<DropToken>,
    },
    /// The node's event stream was dropped on the node side.
    EventStreamDropped {
        reply_sender: oneshot::Sender<DaemonReply>,
    },
}
2121
/// Events generated internally by the daemon itself.
#[derive(Debug)]
pub enum DoraEvent {
    /// A timer tick for all inputs subscribed to `interval`.
    Timer {
        dataflow_id: DataflowId,
        interval: Duration,
        metadata: metadata::Metadata,
    },
    /// A log message forwarded to the subscribers of `output_id`.
    Logs {
        dataflow_id: DataflowId,
        output_id: OutputId,
        message: DataMessage,
        metadata: metadata::Metadata,
    },
    /// A node process spawned by this daemon exited with the given status.
    SpawnedNodeResult {
        dataflow_id: DataflowId,
        node_id: NodeId,
        exit_status: NodeExitStatus,
    },
}
2141
/// Whether the daemon's event loop should keep running or shut down.
#[must_use]
enum RunStatus {
    /// Keep processing events.
    Continue,
    /// Exit the daemon's event loop.
    Exit,
}
2147
2148fn send_with_timestamp<T>(
2149    sender: &UnboundedSender<Timestamped<T>>,
2150    event: T,
2151    clock: &HLC,
2152) -> Result<(), mpsc::error::SendError<Timestamped<T>>> {
2153    sender.send(Timestamped {
2154        inner: event,
2155        timestamp: clock.new_timestamp(),
2156    })
2157}
2158
2159fn set_up_ctrlc_handler(
2160    clock: Arc<HLC>,
2161) -> eyre::Result<tokio::sync::mpsc::Receiver<Timestamped<Event>>> {
2162    let (ctrlc_tx, ctrlc_rx) = mpsc::channel(1);
2163
2164    let mut ctrlc_sent = 0;
2165    ctrlc::set_handler(move || {
2166        let event = match ctrlc_sent {
2167            0 => Event::CtrlC,
2168            1 => Event::SecondCtrlC,
2169            _ => {
2170                tracing::warn!("received 3rd ctrlc signal -> aborting immediately");
2171                std::process::abort();
2172            }
2173        };
2174        if ctrlc_tx
2175            .blocking_send(Timestamped {
2176                inner: event,
2177                timestamp: clock.new_timestamp(),
2178            })
2179            .is_err()
2180        {
2181            tracing::error!("failed to report ctrl-c event to dora-coordinator");
2182        }
2183
2184        ctrlc_sent += 1;
2185    })
2186    .wrap_err("failed to set ctrl-c handler")?;
2187
2188    Ok(ctrlc_rx)
2189}
2190
/// Records, per affected node, which other node originally caused its error.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct CascadingErrorCauses {
    /// Maps an affected node to the node that caused its cascading error.
    caused_by: BTreeMap<NodeId, NodeId>,
}
2195
2196impl CascadingErrorCauses {
2197    pub fn experienced_cascading_error(&self, node: &NodeId) -> bool {
2198        self.caused_by.contains_key(node)
2199    }
2200
2201    /// Return the ID of the node that caused a cascading error for the given node, if any.
2202    pub fn error_caused_by(&self, node: &NodeId) -> Option<&NodeId> {
2203        self.caused_by.get(node)
2204    }
2205
2206    pub fn report_cascading_error(&mut self, causing_node: NodeId, affected_node: NodeId) {
2207        self.caused_by.entry(affected_node).or_insert(causing_node);
2208    }
2209}
2210
2211fn runtime_node_inputs(n: &RuntimeNode) -> BTreeMap<DataId, Input> {
2212    n.operators
2213        .iter()
2214        .flat_map(|operator| {
2215            operator.config.inputs.iter().map(|(input_id, mapping)| {
2216                (
2217                    DataId::from(format!("{}/{input_id}", operator.id)),
2218                    mapping.clone(),
2219                )
2220            })
2221        })
2222        .collect()
2223}
2224
2225fn runtime_node_outputs(n: &RuntimeNode) -> BTreeSet<DataId> {
2226    n.operators
2227        .iter()
2228        .flat_map(|operator| {
2229            operator
2230                .config
2231                .outputs
2232                .iter()
2233                .map(|output_id| DataId::from(format!("{}/{output_id}", operator.id)))
2234        })
2235        .collect()
2236}
2237
/// Daemon-side helpers implemented for both custom and runtime node kinds.
trait CoreNodeKindExt {
    /// Returns the node's input/output run configuration.
    fn run_config(&self) -> NodeRunConfig;
    /// Returns `true` if the node connects dynamically, i.e. its source is
    /// the special `DYNAMIC_SOURCE` marker.
    fn dynamic(&self) -> bool;
}
2242
2243impl CoreNodeKindExt for CoreNodeKind {
2244    fn run_config(&self) -> NodeRunConfig {
2245        match self {
2246            CoreNodeKind::Runtime(n) => NodeRunConfig {
2247                inputs: runtime_node_inputs(n),
2248                outputs: runtime_node_outputs(n),
2249            },
2250            CoreNodeKind::Custom(n) => n.run_config.clone(),
2251        }
2252    }
2253
2254    fn dynamic(&self) -> bool {
2255        match self {
2256            CoreNodeKind::Runtime(_n) => false,
2257            CoreNodeKind::Custom(n) => n.source == DYNAMIC_SOURCE,
2258        }
2259    }
2260}