// dora_daemon/lib.rs

1use aligned_vec::{AVec, ConstAlign};
2use coordinator::CoordinatorEvent;
3use crossbeam::queue::ArrayQueue;
4use dora_core::{
5    build::{self, BuildInfo, GitManager, PrevGitSource},
6    config::{DataId, Input, InputMapping, NodeId, NodeRunConfig, OperatorId},
7    descriptor::{
8        CoreNodeKind, DYNAMIC_SOURCE, Descriptor, DescriptorExt, ResolvedNode, RuntimeNode,
9        read_as_descriptor,
10    },
11    topics::{
12        DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT, LOCALHOST, open_zenoh_session,
13        zenoh_output_publish_topic,
14    },
15    uhlc::{self, HLC},
16};
17use dora_message::{
18    BuildId, DataflowId, SessionId,
19    common::{
20        DaemonId, DataMessage, DropToken, GitSource, LogLevel, NodeError, NodeErrorCause,
21        NodeExitStatus,
22    },
23    coordinator_to_cli::DataflowResult,
24    coordinator_to_daemon::{BuildDataflowNodes, DaemonCoordinatorEvent, SpawnDataflowNodes},
25    daemon_to_coordinator::{
26        CoordinatorRequest, DaemonCoordinatorReply, DaemonEvent, DataflowDaemonResult,
27    },
28    daemon_to_daemon::InterDaemonEvent,
29    daemon_to_node::{DaemonReply, NodeConfig, NodeDropEvent, NodeEvent},
30    descriptor::{NodeSource, RestartPolicy},
31    metadata::{self, ArrowTypeInfo},
32    node_to_daemon::{DynamicNodeEvent, Timestamped},
33};
34use dora_node_api::{Parameter, arrow::datatypes::DataType};
35use eyre::{Context, ContextCompat, Result, bail, eyre};
36use futures::{FutureExt, TryFutureExt, future, stream};
37use futures_concurrency::stream::Merge;
38use local_listener::DynamicNodeEventWrapper;
39use log::{DaemonLogger, DataflowLogger, Logger};
40use pending::PendingNodes;
41use process_wrap::tokio::TokioChildWrapper;
42use shared_memory_server::ShmemConf;
43use socket_stream_utils::socket_stream_send;
44use spawn::Spawner;
45use std::{
46    collections::{BTreeMap, BTreeSet, HashMap, VecDeque},
47    env::current_dir,
48    future::Future,
49    io,
50    net::SocketAddr,
51    path::{Path, PathBuf},
52    pin::pin,
53    sync::{
54        Arc,
55        atomic::{self, AtomicBool, AtomicU32},
56    },
57    time::{Duration, Instant},
58};
59use tokio::{
60    fs::File,
61    io::{AsyncReadExt, AsyncSeekExt},
62    net::TcpStream,
63    sync::{
64        broadcast,
65        mpsc::{self, UnboundedSender},
66        oneshot::{self, Sender},
67    },
68};
69use tokio_stream::{Stream, StreamExt, wrappers::ReceiverStream};
70use tracing::{error, warn};
71use uuid::{NoContext, Timestamp, Uuid};
72
73pub use flume;
74pub use log::LogDestination;
75
76mod coordinator;
77mod extract_err_from_stderr;
78mod local_listener;
79mod log;
80mod node_communication;
81mod pending;
82mod socket_stream_utils;
83mod spawn;
84
85#[cfg(feature = "telemetry")]
86use dora_tracing::telemetry::serialize_context;
87#[cfg(feature = "telemetry")]
88use tracing_opentelemetry::OpenTelemetrySpanExt;
89
90use crate::{extract_err_from_stderr::extract_err_from_stderr, pending::DataflowStatus};
91
92const STDERR_LOG_LINES_MAX: usize = 500;
93
/// The dora daemon: spawns and supervises the nodes of one or more dataflows
/// on the local machine, relaying events to/from the coordinator.
pub struct Daemon {
    // currently running dataflows, keyed by their ID
    running: HashMap<DataflowId, RunningDataflow>,
    // working directory of each dataflow (used e.g. for locating log files)
    working_dir: HashMap<DataflowId, PathBuf>,

    // sender half for feeding internally-generated events back into the main event loop
    events_tx: mpsc::Sender<Timestamped<Event>>,

    // connection to the coordinator; `None` when running standalone (`dora run`)
    coordinator_connection: Option<TcpStream>,
    // last time a heartbeat from the coordinator was observed
    last_coordinator_heartbeat: Instant,
    // identifier assigned to this daemon instance
    daemon_id: DaemonId,

    /// used for testing and examples
    exit_when_done: Option<BTreeSet<(Uuid, NodeId)>>,
    /// set on ctrl-c
    exit_when_all_finished: bool,
    /// used to record results of local nodes
    dataflow_node_results: BTreeMap<Uuid, BTreeMap<NodeId, Result<(), NodeError>>>,

    // hybrid logical clock used to timestamp and order all events
    clock: Arc<uhlc::HLC>,

    // zenoh session, presumably used for inter-daemon communication — see `open_zenoh_session`
    zenoh_session: zenoh::Session,
    // sender for events received from remote daemons, if set up
    remote_daemon_events_tx: Option<flume::Sender<eyre::Result<Timestamped<InterDaemonEvent>>>>,

    // daemon-scoped logger (forwards to the configured `LogDestination`)
    logger: DaemonLogger,

    // maps each session to its latest build; older builds are evicted on replacement
    sessions: BTreeMap<SessionId, BuildId>,
    // build info keyed by build ID
    builds: BTreeMap<BuildId, BuildInfo>,
    // manages git checkouts for node sources
    git_manager: GitManager,
    /// System instance for metrics collection (reused across calls)
    metrics_system: sysinfo::System,
}
124
/// Per-node results of all dataflows this daemon ran, keyed by dataflow UUID and node ID.
type DaemonRunResult = BTreeMap<Uuid, BTreeMap<NodeId, Result<(), NodeError>>>;
126
/// A pending build task for a single node.
struct NodeBuildTask<F> {
    // ID of the node being built
    node_id: NodeId,
    // whether the node is a dynamic node (i.e. connects to the daemon on its own)
    dynamic_node: bool,
    // the build task itself (generic payload; presumably a future — driven at the use site)
    task: F,
}
132
133impl Daemon {
    /// Runs a daemon connected to the coordinator at `coordinator_addr`.
    ///
    /// Sets up ctrl-c handling, registers with the coordinator to obtain the
    /// daemon ID and the incoming event stream, opens a second coordinator
    /// connection dedicated to log forwarding, and then drives the main event
    /// loop until shutdown.
    pub async fn run(
        coordinator_addr: SocketAddr,
        machine_id: Option<String>,
        local_listen_port: u16,
    ) -> eyre::Result<()> {
        let clock = Arc::new(HLC::default());

        let mut ctrlc_events = set_up_ctrlc_handler(clock.clone())?;
        let (remote_daemon_events_tx, remote_daemon_events_rx) = flume::bounded(10);
        let (daemon_id, incoming_events) = {
            let incoming_events = set_up_event_stream(
                coordinator_addr,
                &machine_id,
                &clock,
                remote_daemon_events_rx,
                local_listen_port,
            );

            // finish early if ctrl-c is pressed during event stream setup
            let ctrl_c = pin!(ctrlc_events.recv());
            match futures::future::select(ctrl_c, pin!(incoming_events)).await {
                future::Either::Left((_ctrl_c, _)) => {
                    tracing::info!("received ctrl-c signal -> stopping daemon");
                    return Ok(());
                }
                future::Either::Right((events, _)) => events?,
            }
        };

        let log_destination = {
            // additional connection for logging
            let stream = TcpStream::connect(coordinator_addr)
                .await
                .wrap_err("failed to connect log to dora-coordinator")?;
            stream
                .set_nodelay(true)
                .wrap_err("failed to set TCP_NODELAY")?;
            LogDestination::Coordinator {
                coordinator_connection: stream,
            }
        };

        // merge ctrl-c events into the coordinator event stream and run the main loop
        Self::run_general(
            (ReceiverStream::new(ctrlc_events), incoming_events).merge(),
            Some(coordinator_addr),
            daemon_id,
            None,
            clock.clone(),
            Some(remote_daemon_events_tx),
            Default::default(),
            log_destination,
        )
        .await
        .map(|_| ())
    }
189
190    pub async fn run_dataflow(
191        dataflow_path: &Path,
192        build_id: Option<BuildId>,
193        local_build: Option<BuildInfo>,
194        session_id: SessionId,
195        uv: bool,
196        log_destination: LogDestination,
197        write_events_to: Option<PathBuf>,
198    ) -> eyre::Result<DataflowResult> {
199        let working_dir = dataflow_path
200            .canonicalize()
201            .context("failed to canonicalize dataflow path")?
202            .parent()
203            .ok_or_else(|| eyre::eyre!("canonicalized dataflow path has no parent"))?
204            .to_owned();
205
206        let descriptor = read_as_descriptor(dataflow_path).await?;
207        if let Some(node) = descriptor.nodes.iter().find(|n| n.deploy.is_some()) {
208            eyre::bail!(
209                "node {} has a `deploy` section, which is not supported in `dora run`\n\n
210                Instead, you need to spawn a `dora coordinator` and one or more `dora daemon`
211                instances and then use `dora start`.",
212                node.id
213            )
214        }
215
216        descriptor.check(&working_dir)?;
217        let nodes = descriptor.resolve_aliases_and_set_defaults()?;
218
219        let (events_tx, events_rx) = flume::bounded(10);
220        if nodes
221            .iter()
222            .find(|(_n, resolved_nodes)| resolved_nodes.kind.dynamic())
223            .is_some()
224        {
225            // Spawn local listener for dynamic nodes
226            let _listen_port = local_listener::spawn_listener_loop(
227                (LOCALHOST, DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT).into(),
228                events_tx,
229            )
230            .await?;
231        }
232        let dynamic_node_events = events_rx.into_stream().map(|e| Timestamped {
233            inner: Event::DynamicNode(e.inner),
234            timestamp: e.timestamp,
235        });
236
237        let dataflow_id = Uuid::new_v7(Timestamp::now(NoContext));
238        let spawn_command = SpawnDataflowNodes {
239            build_id,
240            session_id,
241            dataflow_id,
242            local_working_dir: Some(working_dir),
243            spawn_nodes: nodes.keys().cloned().collect(),
244            nodes,
245            dataflow_descriptor: descriptor,
246            uv,
247            write_events_to,
248        };
249
250        let clock = Arc::new(HLC::default());
251
252        let ctrlc_events = ReceiverStream::new(set_up_ctrlc_handler(clock.clone())?);
253
254        let exit_when_done = spawn_command
255            .nodes
256            .values()
257            .filter(|n| !n.kind.dynamic())
258            .map(|n| (spawn_command.dataflow_id, n.id.clone()))
259            .collect();
260        let (reply_tx, reply_rx) = oneshot::channel();
261        let timestamp = clock.new_timestamp();
262        let coordinator_events = stream::once(async move {
263            Timestamped {
264                inner: Event::Coordinator(CoordinatorEvent {
265                    event: DaemonCoordinatorEvent::Spawn(spawn_command),
266                    reply_tx,
267                }),
268                timestamp,
269            }
270        });
271        let events = (coordinator_events, ctrlc_events, dynamic_node_events).merge();
272        let run_result = Self::run_general(
273            Box::pin(events),
274            None,
275            DaemonId::new(None),
276            Some(exit_when_done),
277            clock.clone(),
278            None,
279            if let Some(local_build) = local_build {
280                let Some(build_id) = build_id else {
281                    bail!("no build_id, but local_build set")
282                };
283                let mut builds = BTreeMap::new();
284                builds.insert(build_id, local_build);
285                builds
286            } else {
287                Default::default()
288            },
289            log_destination,
290        );
291
292        let spawn_result = reply_rx
293            .map_err(|err| eyre!("failed to receive spawn result: {err}"))
294            .and_then(|r| async {
295                match r {
296                    Some(DaemonCoordinatorReply::TriggerSpawnResult(result)) => {
297                        result.map_err(|err| eyre!(err))
298                    }
299                    _ => Err(eyre!("unexpected spawn reply")),
300                }
301            });
302
303        let (mut dataflow_results, ()) = future::try_join(run_result, spawn_result).await?;
304
305        Ok(DataflowResult {
306            uuid: dataflow_id,
307            timestamp: clock.new_timestamp(),
308            node_results: dataflow_results
309                .remove(&dataflow_id)
310                .context("no node results for dataflow_id")?,
311        })
312    }
313
314    #[allow(clippy::too_many_arguments)]
315    async fn run_general(
316        external_events: impl Stream<Item = Timestamped<Event>> + Unpin,
317        coordinator_addr: Option<SocketAddr>,
318        daemon_id: DaemonId,
319        exit_when_done: Option<BTreeSet<(Uuid, NodeId)>>,
320        clock: Arc<HLC>,
321        remote_daemon_events_tx: Option<flume::Sender<eyre::Result<Timestamped<InterDaemonEvent>>>>,
322        builds: BTreeMap<BuildId, BuildInfo>,
323        log_destination: LogDestination,
324    ) -> eyre::Result<DaemonRunResult> {
325        let coordinator_connection = match coordinator_addr {
326            Some(addr) => {
327                let stream = TcpStream::connect(addr)
328                    .await
329                    .wrap_err("failed to connect to dora-coordinator")?;
330                stream
331                    .set_nodelay(true)
332                    .wrap_err("failed to set TCP_NODELAY")?;
333                Some(stream)
334            }
335            None => None,
336        };
337
338        let zenoh_session = open_zenoh_session(coordinator_addr.map(|addr| addr.ip()))
339            .await
340            .wrap_err("failed to open zenoh session")?;
341        // Use a large channel capacity to prevent deadlock
342        let (dora_events_tx, dora_events_rx) = mpsc::channel(1000);
343        let daemon = Self {
344            logger: Logger {
345                destination: log_destination,
346                daemon_id: daemon_id.clone(),
347                clock: clock.clone(),
348            }
349            .for_daemon(daemon_id.clone()),
350            running: HashMap::new(),
351            working_dir: HashMap::new(),
352            events_tx: dora_events_tx,
353            coordinator_connection,
354            last_coordinator_heartbeat: Instant::now(),
355            daemon_id,
356            exit_when_done,
357            exit_when_all_finished: false,
358            dataflow_node_results: BTreeMap::new(),
359            clock,
360            zenoh_session,
361            remote_daemon_events_tx,
362            git_manager: Default::default(),
363            builds,
364            sessions: Default::default(),
365            metrics_system: sysinfo::System::new(),
366        };
367
368        let dora_events = ReceiverStream::new(dora_events_rx);
369        let watchdog_clock = daemon.clock.clone();
370        let watchdog_interval = tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(
371            Duration::from_secs(5),
372        ))
373        .map(|_| Timestamped {
374            inner: Event::HeartbeatInterval,
375            timestamp: watchdog_clock.new_timestamp(),
376        });
377
378        let metrics_clock = daemon.clock.clone();
379        let metrics_interval = tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(
380            Duration::from_secs(2), // Collect metrics every 2 seconds
381        ))
382        .map(|_| Timestamped {
383            inner: Event::MetricsInterval,
384            timestamp: metrics_clock.new_timestamp(),
385        });
386
387        let events = (
388            external_events,
389            dora_events,
390            watchdog_interval,
391            metrics_interval,
392        )
393            .merge();
394        daemon.run_inner(events).await
395    }
396
    /// Main daemon event loop: processes incoming events until an exit
    /// condition is reached, then notifies the coordinator (if connected) and
    /// returns the collected per-dataflow node results.
    #[tracing::instrument(skip(incoming_events, self), fields(?self.daemon_id))]
    async fn run_inner(
        mut self,
        incoming_events: impl Stream<Item = Timestamped<Event>> + Unpin,
    ) -> eyre::Result<DaemonRunResult> {
        let mut events = incoming_events;

        while let Some(event) = events.next().await {
            let Timestamped { inner, timestamp } = event;
            // keep the hybrid logical clock in sync with the event's timestamp
            if let Err(err) = self.clock.update_with_timestamp(&timestamp) {
                tracing::warn!("failed to update HLC with incoming event timestamp: {err}");
            }

            // used below for checking the duration of event handling
            let start = Instant::now();
            let event_kind = inner.kind();

            match inner {
                Event::Coordinator(CoordinatorEvent { event, reply_tx }) => {
                    let status = self.handle_coordinator_event(event, reply_tx).await?;

                    match status {
                        RunStatus::Continue => {}
                        RunStatus::Exit => break,
                    }
                }
                Event::Daemon(event) => {
                    self.handle_inter_daemon_event(event).await?;
                }
                Event::Node {
                    dataflow_id: dataflow,
                    node_id,
                    event,
                } => self.handle_node_event(event, dataflow, node_id).await?,
                Event::Dora(event) => self.handle_dora_event(event).await?,
                Event::DynamicNode(event) => self.handle_dynamic_node_event(event).await?,
                Event::HeartbeatInterval => {
                    // periodically send a heartbeat to the coordinator and
                    // bail out if the coordinator has been silent too long
                    if let Some(connection) = &mut self.coordinator_connection {
                        let msg = serde_json::to_vec(&Timestamped {
                            inner: CoordinatorRequest::Event {
                                daemon_id: self.daemon_id.clone(),
                                event: DaemonEvent::Heartbeat,
                            },
                            timestamp: self.clock.new_timestamp(),
                        })?;
                        socket_stream_send(connection, &msg)
                            .await
                            .wrap_err("failed to send watchdog message to dora-coordinator")?;

                        if self.last_coordinator_heartbeat.elapsed() > Duration::from_secs(20) {
                            bail!("lost connection to coordinator")
                        }
                    }
                }
                Event::MetricsInterval => {
                    self.collect_and_send_metrics().await?;
                }
                Event::CtrlC => {
                    // first ctrl-c: stop all dataflows gracefully and exit once
                    // everything has finished
                    tracing::info!("received ctrlc signal -> stopping all dataflows");
                    for dataflow in self.running.values_mut() {
                        let mut logger = self.logger.for_dataflow(dataflow.id);
                        dataflow
                            .stop_all(
                                &mut self.coordinator_connection,
                                &self.clock,
                                None,
                                false,
                                &mut logger,
                            )
                            .await?;
                    }
                    self.exit_when_all_finished = true;
                    if self.running.is_empty() {
                        break;
                    }
                }
                Event::SecondCtrlC => {
                    tracing::warn!("received second ctrlc signal -> exit immediately");
                    bail!("received second ctrl-c signal");
                }
                Event::DaemonError(err) => {
                    tracing::error!("Daemon error: {err:?}");
                }
                Event::SpawnNodeResult {
                    dataflow_id,
                    node_id,
                    dynamic_node,
                    result,
                } => match result {
                    Ok(running_node) => {
                        if let Some(dataflow) = self.running.get_mut(&dataflow_id) {
                            dataflow.running_nodes.insert(node_id, running_node);
                        } else {
                            tracing::error!(
                                "failed to handle SpawnNodeResult: no running dataflow with ID {dataflow_id}"
                            );
                        }
                    }
                    Err(error) => {
                        // record the spawn failure and treat the node as stopped
                        self.dataflow_node_results
                            .entry(dataflow_id)
                            .or_default()
                            .insert(node_id.clone(), Err(error));
                        self.handle_node_stop(dataflow_id, &node_id, dynamic_node)
                            .await?;
                    }
                },
                Event::BuildDataflowResult {
                    build_id,
                    session_id,
                    result,
                } => {
                    let (build_info, result) = match result {
                        Ok(build_info) => (Some(build_info), Ok(())),
                        Err(err) => (None, Err(err)),
                    };
                    if let Some(build_info) = build_info {
                        self.builds.insert(build_id, build_info);
                        // each session keeps only its latest build; drop the old one
                        if let Some(old_build_id) = self.sessions.insert(session_id, build_id) {
                            self.builds.remove(&old_build_id);
                        }
                    }
                    // forward the build result to the coordinator, if connected
                    if let Some(connection) = &mut self.coordinator_connection {
                        let msg = serde_json::to_vec(&Timestamped {
                            inner: CoordinatorRequest::Event {
                                daemon_id: self.daemon_id.clone(),
                                event: DaemonEvent::BuildResult {
                                    build_id,
                                    result: result.map_err(|err| format!("{err:?}")),
                                },
                            },
                            timestamp: self.clock.new_timestamp(),
                        })?;
                        socket_stream_send(connection, &msg).await.wrap_err(
                            "failed to send BuildDataflowResult message to dora-coordinator",
                        )?;
                    }
                }
                Event::SpawnDataflowResult {
                    dataflow_id,
                    result,
                } => {
                    // forward the spawn result to the coordinator, if connected
                    if let Some(connection) = &mut self.coordinator_connection {
                        let msg = serde_json::to_vec(&Timestamped {
                            inner: CoordinatorRequest::Event {
                                daemon_id: self.daemon_id.clone(),
                                event: DaemonEvent::SpawnResult {
                                    dataflow_id,
                                    result: result.map_err(|err| format!("{err:?}")),
                                },
                            },
                            timestamp: self.clock.new_timestamp(),
                        })?;
                        socket_stream_send(connection, &msg).await.wrap_err(
                            "failed to send SpawnDataflowResult message to dora-coordinator",
                        )?;
                    }
                }
                Event::NodeStopped {
                    dataflow_id,
                    node_id,
                } => {
                    // check the exit conditions set by `run_dataflow` / ctrl-c
                    if let Some(exit_when_done) = &mut self.exit_when_done {
                        exit_when_done.remove(&(dataflow_id, node_id));
                        if exit_when_done.is_empty() {
                            tracing::info!(
                                "exiting daemon because all required dataflows are finished"
                            );
                            break;
                        }
                    }
                    if self.exit_when_all_finished && self.running.is_empty() {
                        break;
                    }
                }
            }

            // warn if event handling took too long -> the main loop should never be blocked for too long
            let elapsed = start.elapsed();
            if elapsed > Duration::from_millis(100) {
                tracing::warn!(
                    "Daemon took {}ms for handling event: {event_kind}",
                    elapsed.as_millis()
                );
            }
        }

        // notify the coordinator that this daemon is exiting
        if let Some(mut connection) = self.coordinator_connection.take() {
            let msg = serde_json::to_vec(&Timestamped {
                inner: CoordinatorRequest::Event {
                    daemon_id: self.daemon_id.clone(),
                    event: DaemonEvent::Exit,
                },
                timestamp: self.clock.new_timestamp(),
            })?;
            socket_stream_send(&mut connection, &msg)
                .await
                .wrap_err("failed to send Exit message to dora-coordinator")?;
        }

        Ok(self.dataflow_node_results)
    }
599
600    async fn handle_coordinator_event(
601        &mut self,
602        event: DaemonCoordinatorEvent,
603        reply_tx: Sender<Option<DaemonCoordinatorReply>>,
604    ) -> eyre::Result<RunStatus> {
605        let status = match event {
606            DaemonCoordinatorEvent::Build(BuildDataflowNodes {
607                build_id,
608                session_id,
609                local_working_dir,
610                git_sources,
611                prev_git_sources,
612                dataflow_descriptor,
613                nodes_on_machine,
614                uv,
615            }) => {
616                match dataflow_descriptor.communication.remote {
617                    dora_core::config::RemoteCommunicationConfig::Tcp => {}
618                }
619
620                let base_working_dir = self.base_working_dir(local_working_dir, session_id)?;
621
622                let result = self
623                    .build_dataflow(
624                        build_id,
625                        session_id,
626                        base_working_dir,
627                        git_sources,
628                        prev_git_sources,
629                        dataflow_descriptor,
630                        nodes_on_machine,
631                        uv,
632                    )
633                    .await;
634                let (trigger_result, result_task) = match result {
635                    Ok(result_task) => (Ok(()), Some(result_task)),
636                    Err(err) => (Err(format!("{err:?}")), None),
637                };
638                let reply = DaemonCoordinatorReply::TriggerBuildResult(trigger_result);
639                let _ = reply_tx.send(Some(reply)).map_err(|_| {
640                    error!("could not send `TriggerBuildResult` reply from daemon to coordinator")
641                });
642
643                let result_tx = self.events_tx.clone();
644                let clock = self.clock.clone();
645                if let Some(result_task) = result_task {
646                    tokio::spawn(async move {
647                        let message = Timestamped {
648                            inner: Event::BuildDataflowResult {
649                                build_id,
650                                session_id,
651                                result: result_task.await,
652                            },
653                            timestamp: clock.new_timestamp(),
654                        };
655                        let _ = result_tx
656                            .send(message)
657                            .map_err(|_| {
658                                error!(
659                                    "could not send `BuildResult` reply from daemon to coordinator"
660                                )
661                            })
662                            .await;
663                    });
664                }
665
666                RunStatus::Continue
667            }
668            DaemonCoordinatorEvent::Spawn(SpawnDataflowNodes {
669                build_id,
670                session_id,
671                dataflow_id,
672                local_working_dir,
673                nodes,
674                dataflow_descriptor,
675                spawn_nodes,
676                uv,
677                write_events_to,
678            }) => {
679                match dataflow_descriptor.communication.remote {
680                    dora_core::config::RemoteCommunicationConfig::Tcp => {}
681                }
682
683                let base_working_dir = self.base_working_dir(local_working_dir, session_id)?;
684
685                let result = self
686                    .spawn_dataflow(
687                        build_id,
688                        dataflow_id,
689                        base_working_dir,
690                        nodes,
691                        dataflow_descriptor,
692                        spawn_nodes,
693                        uv,
694                        write_events_to,
695                    )
696                    .await;
697                let (trigger_result, result_task) = match result {
698                    Ok(result_task) => (Ok(()), Some(result_task)),
699                    Err(err) => (Err(format!("{err:?}")), None),
700                };
701                let reply = DaemonCoordinatorReply::TriggerSpawnResult(trigger_result);
702                let _ = reply_tx.send(Some(reply)).map_err(|_| {
703                    error!("could not send `TriggerSpawnResult` reply from daemon to coordinator")
704                });
705
706                let result_tx = self.events_tx.clone();
707                let clock = self.clock.clone();
708                if let Some(result_task) = result_task {
709                    tokio::spawn(async move {
710                        let message = Timestamped {
711                            inner: Event::SpawnDataflowResult {
712                                dataflow_id,
713                                result: result_task.await,
714                            },
715                            timestamp: clock.new_timestamp(),
716                        };
717                        let _ = result_tx
718                            .send(message)
719                            .map_err(|_| {
720                                error!(
721                                    "could not send `SpawnResult` reply from daemon to coordinator"
722                                )
723                            })
724                            .await;
725                    });
726                }
727
728                RunStatus::Continue
729            }
730            DaemonCoordinatorEvent::AllNodesReady {
731                dataflow_id,
732                exited_before_subscribe,
733            } => {
734                let mut logger = self.logger.for_dataflow(dataflow_id);
735                logger.log(LogLevel::Debug, None,
736                    Some("daemon".into()),
737                    format!("received AllNodesReady (exited_before_subscribe: {exited_before_subscribe:?})"
738                )).await;
739                match self.running.get_mut(&dataflow_id) {
740                    Some(dataflow) => {
741                        let ready = exited_before_subscribe.is_empty();
742                        dataflow
743                            .pending_nodes
744                            .handle_external_all_nodes_ready(
745                                exited_before_subscribe,
746                                &mut dataflow.cascading_error_causes,
747                            )
748                            .await?;
749                        if ready {
750                            logger.log(LogLevel::Info, None,
751                                Some("daemon".into()),
752                                "coordinator reported that all nodes are ready, starting dataflow",
753                            ).await;
754                            dataflow.start(&self.events_tx, &self.clock).await?;
755                        }
756                    }
757                    None => {
758                        tracing::warn!(
759                            "received AllNodesReady for unknown dataflow (ID `{dataflow_id}`)"
760                        );
761                    }
762                }
763                let _ = reply_tx.send(None).map_err(|_| {
764                    error!("could not send `AllNodesReady` reply from daemon to coordinator")
765                });
766                RunStatus::Continue
767            }
768            DaemonCoordinatorEvent::Logs {
769                dataflow_id,
770                node_id,
771                tail,
772            } => {
773                match self.working_dir.get(&dataflow_id) {
774                    Some(working_dir) => {
775                        let working_dir = working_dir.clone();
776                        tokio::spawn(async move {
777                            let logs = async {
778                                let mut file =
779                                    File::open(log::log_path(&working_dir, &dataflow_id, &node_id))
780                                        .await
781                                        .wrap_err(format!(
782                                            "Could not open log file: {:#?}",
783                                            log::log_path(&working_dir, &dataflow_id, &node_id)
784                                        ))?;
785
786                                let mut contents = match tail {
787                                    None | Some(0) => {
788                                        let mut contents = vec![];
789                                        file.read_to_end(&mut contents).await.map(|_| contents)
790                                    }
791                                    Some(tail) => read_last_n_lines(&mut file, tail).await,
792                                }
793                                .wrap_err("Could not read last n lines of log file")?;
794                                if !contents.ends_with(b"\n") {
795                                    // Append newline for better readability
796                                    contents.push(b'\n');
797                                }
798                                Result::<Vec<u8>, eyre::Report>::Ok(contents)
799                            }
800                            .await
801                            .map_err(|err| format!("{err:?}"));
802                            let _ = reply_tx
803                                .send(Some(DaemonCoordinatorReply::Logs(logs)))
804                                .map_err(|_| {
805                                    error!("could not send logs reply from daemon to coordinator")
806                                });
807                        });
808                    }
809                    None => {
810                        tracing::warn!("received Logs for unknown dataflow (ID `{dataflow_id}`)");
811                        let _ = reply_tx.send(None).map_err(|_| {
812                            error!(
813                                "could not send `AllNodesReady` reply from daemon to coordinator"
814                            )
815                        });
816                    }
817                }
818                RunStatus::Continue
819            }
820            DaemonCoordinatorEvent::ReloadDataflow {
821                dataflow_id,
822                node_id,
823                operator_id,
824            } => {
825                let result = self.send_reload(dataflow_id, node_id, operator_id).await;
826                let reply =
827                    DaemonCoordinatorReply::ReloadResult(result.map_err(|err| format!("{err:?}")));
828                let _ = reply_tx
829                    .send(Some(reply))
830                    .map_err(|_| error!("could not send reload reply from daemon to coordinator"));
831                RunStatus::Continue
832            }
833            DaemonCoordinatorEvent::StopDataflow {
834                dataflow_id,
835                grace_duration,
836                force,
837            } => {
838                let mut logger = self.logger.for_dataflow(dataflow_id);
839                let dataflow = self
840                    .running
841                    .get_mut(&dataflow_id)
842                    .wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"));
843                let (reply, future) = match dataflow {
844                    Ok(dataflow) => {
845                        let future = dataflow.stop_all(
846                            &mut self.coordinator_connection,
847                            &self.clock,
848                            grace_duration,
849                            force,
850                            &mut logger,
851                        );
852                        (Ok(()), Some(future))
853                    }
854                    Err(err) => (Err(err.to_string()), None),
855                };
856
857                let _ = reply_tx
858                    .send(Some(DaemonCoordinatorReply::StopResult(reply)))
859                    .map_err(|_| error!("could not send stop reply from daemon to coordinator"));
860
861                if let Some(future) = future {
862                    future.await?;
863                }
864
865                RunStatus::Continue
866            }
867            DaemonCoordinatorEvent::Destroy => {
868                tracing::info!("received destroy command -> exiting");
869                let (notify_tx, notify_rx) = oneshot::channel();
870                let reply = DaemonCoordinatorReply::DestroyResult {
871                    result: Ok(()),
872                    notify: Some(notify_tx),
873                };
874                let _ = reply_tx
875                    .send(Some(reply))
876                    .map_err(|_| error!("could not send destroy reply from daemon to coordinator"));
877                // wait until the reply is sent out
878                if notify_rx.await.is_err() {
879                    tracing::warn!("no confirmation received for DestroyReply");
880                }
881                RunStatus::Exit
882            }
883            DaemonCoordinatorEvent::Heartbeat => {
884                self.last_coordinator_heartbeat = Instant::now();
885                let _ = reply_tx.send(None);
886                RunStatus::Continue
887            }
888        };
889        Ok(status)
890    }
891
892    async fn collect_and_send_metrics(&mut self) -> eyre::Result<()> {
893        use dora_message::daemon_to_coordinator::NodeMetrics;
894        use sysinfo::{Pid, ProcessRefreshKind, ProcessesToUpdate};
895
896        if self.coordinator_connection.is_none() {
897            return Ok(());
898        }
899
900        // Reuse system instance for metrics collection
901        let system = &mut self.metrics_system;
902
903        // Metrics are collected every 2 seconds (metrics_interval)
904        const METRICS_INTERVAL_SECS: f64 = 2.0;
905
906        // Collect metrics for all running dataflows
907        for (dataflow_id, dataflow) in &self.running {
908            let mut metrics = BTreeMap::new();
909
910            // Collect all PIDs for this dataflow
911            let pids: Vec<Pid> = dataflow
912                .running_nodes
913                .values()
914                .filter_map(|node| {
915                    node.pid
916                        .as_ref()
917                        .map(|pid| Pid::from_u32(pid.load(atomic::Ordering::Acquire)))
918                })
919                .collect();
920
921            if !pids.is_empty() {
922                // Refresh process metrics (cpu, memory, disk)
923                let refresh_kind = ProcessRefreshKind::nothing()
924                    .with_cpu()
925                    .with_memory()
926                    .with_disk_usage();
927                system.refresh_processes_specifics(
928                    ProcessesToUpdate::Some(&pids),
929                    true,
930                    refresh_kind,
931                );
932
933                // Collect metrics for each node
934                for (node_id, running_node) in &dataflow.running_nodes {
935                    if let Some(pid) = running_node.pid.as_ref() {
936                        let pid = pid.load(atomic::Ordering::Acquire);
937                        let sys_pid = Pid::from_u32(pid);
938                        if let Some(process) = system.process(sys_pid) {
939                            let disk_usage = process.disk_usage();
940                            // Divide by metrics_interval to get per-second averages
941                            metrics.insert(
942                                node_id.clone(),
943                                NodeMetrics {
944                                    pid,
945                                    cpu_usage: process.cpu_usage(),
946                                    memory_bytes: process.memory(),
947                                    disk_read_bytes: Some(
948                                        (disk_usage.read_bytes as f64 / METRICS_INTERVAL_SECS)
949                                            as u64,
950                                    ),
951                                    disk_write_bytes: Some(
952                                        (disk_usage.written_bytes as f64 / METRICS_INTERVAL_SECS)
953                                            as u64,
954                                    ),
955                                },
956                            );
957                        }
958                    }
959                }
960            }
961
962            // Send metrics to coordinator if we have any
963            if !metrics.is_empty() {
964                if let Some(connection) = &mut self.coordinator_connection {
965                    let msg = serde_json::to_vec(&Timestamped {
966                        inner: CoordinatorRequest::Event {
967                            daemon_id: self.daemon_id.clone(),
968                            event: DaemonEvent::NodeMetrics {
969                                dataflow_id: *dataflow_id,
970                                metrics,
971                            },
972                        },
973                        timestamp: self.clock.new_timestamp(),
974                    })?;
975                    socket_stream_send(connection, &msg)
976                        .await
977                        .wrap_err("failed to send metrics to coordinator")?;
978                }
979            }
980        }
981
982        Ok(())
983    }
984
    /// Applies an event received from another daemon of the same dataflow.
    ///
    /// Errors while applying an event are logged at warn level through the
    /// dataflow/node logger instead of being returned, so a single bad
    /// remote message does not abort this daemon's event loop.
    async fn handle_inter_daemon_event(&mut self, event: InterDaemonEvent) -> eyre::Result<()> {
        match event {
            // A node on a remote daemon produced an output that is mapped to
            // at least one local input -> deliver it to the local receivers.
            InterDaemonEvent::Output {
                dataflow_id,
                node_id,
                output_id,
                metadata,
                data,
            } => {
                // Collect all fallible steps in one async block so a single
                // `if let Err` below can log any failure.
                let inner = async {
                    let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
                        format!("send out failed: no running dataflow with ID `{dataflow_id}`")
                    })?;
                    send_output_to_local_receivers(
                        node_id.clone(),
                        output_id.clone(),
                        dataflow,
                        &metadata,
                        data.map(DataMessage::Vec),
                        &self.clock,
                    )
                    .await?;
                    Result::<_, eyre::Report>::Ok(())
                };
                if let Err(err) = inner
                    .await
                    .wrap_err("failed to forward remote output to local receivers")
                {
                    let mut logger = self.logger.for_dataflow(dataflow_id).for_node(node_id);
                    logger
                        .log(LogLevel::Warn, Some("daemon".into()), format!("{err:?}"))
                        .await;
                }
                Ok(())
            }
            // A node on a remote daemon closed one of its outputs -> close
            // all local inputs that are fed by that output.
            InterDaemonEvent::OutputClosed {
                dataflow_id,
                node_id,
                output_id,
            } => {
                let output_id = OutputId(node_id.clone(), output_id);
                let mut logger = self
                    .logger
                    .for_dataflow(dataflow_id)
                    .for_node(node_id.clone());
                logger
                    .log(
                        LogLevel::Debug,
                        Some("daemon".into()),
                        format!("received OutputClosed event for output {output_id:?}"),
                    )
                    .await;

                let inner = async {
                    let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
                        format!("send out failed: no running dataflow with ID `{dataflow_id}`")
                    })?;

                    // clone the receiver set so `dataflow.mappings` is not
                    // borrowed while `close_input` mutates `dataflow`
                    if let Some(inputs) = dataflow.mappings.get(&output_id).cloned() {
                        for (receiver_id, input_id) in &inputs {
                            close_input(dataflow, receiver_id, input_id, &self.clock);
                        }
                    }
                    Result::<(), eyre::Report>::Ok(())
                };
                // NOTE(review): this message mentions the coordinator, but the
                // event arrives over the inter-daemon channel — consider rewording.
                if let Err(err) = inner
                    .await
                    .wrap_err("failed to handle InputsClosed event sent by coordinator")
                {
                    logger
                        .log(LogLevel::Warn, Some("daemon".into()), format!("{err:?}"))
                        .await;
                }
                Ok(())
            }
        }
    }
1062
1063    #[allow(clippy::too_many_arguments)]
1064    async fn build_dataflow(
1065        &mut self,
1066        build_id: BuildId,
1067        session_id: SessionId,
1068        base_working_dir: PathBuf,
1069        git_sources: BTreeMap<NodeId, GitSource>,
1070        prev_git_sources: BTreeMap<NodeId, GitSource>,
1071        dataflow_descriptor: Descriptor,
1072        local_nodes: BTreeSet<NodeId>,
1073        uv: bool,
1074    ) -> eyre::Result<impl Future<Output = eyre::Result<BuildInfo>> + use<>> {
1075        let builder = build::Builder {
1076            session_id,
1077            base_working_dir,
1078            uv,
1079        };
1080        self.git_manager.clear_planned_builds(session_id);
1081
1082        let nodes = dataflow_descriptor.resolve_aliases_and_set_defaults()?;
1083
1084        let mut tasks = Vec::new();
1085
1086        // build nodes
1087        for node in nodes.into_values().filter(|n| local_nodes.contains(&n.id)) {
1088            let dynamic_node = node.kind.dynamic();
1089
1090            let node_id = node.id.clone();
1091            let mut logger = self.logger.for_node_build(build_id, node_id.clone());
1092            logger.log(LogLevel::Debug, "building").await;
1093            let git_source = git_sources.get(&node_id).cloned();
1094            let prev_git_source = prev_git_sources.get(&node_id).cloned();
1095            let prev_git = prev_git_source.map(|prev_source| PrevGitSource {
1096                still_needed_for_this_build: git_sources.values().any(|s| s == &prev_source),
1097                git_source: prev_source,
1098            });
1099
1100            let logger_cloned = logger
1101                .try_clone_impl()
1102                .await
1103                .wrap_err("failed to clone logger")?;
1104
1105            let mut builder = builder.clone();
1106            if let Some(node_working_dir) =
1107                node.deploy.as_ref().and_then(|d| d.working_dir.as_deref())
1108            {
1109                builder.base_working_dir = builder.base_working_dir.join(node_working_dir);
1110            }
1111
1112            match builder
1113                .build_node(
1114                    node,
1115                    git_source,
1116                    prev_git,
1117                    logger_cloned,
1118                    &mut self.git_manager,
1119                )
1120                .await
1121                .wrap_err_with(|| format!("failed to build node `{node_id}`"))
1122            {
1123                Ok(result) => {
1124                    tasks.push(NodeBuildTask {
1125                        node_id,
1126                        task: result,
1127                        dynamic_node,
1128                    });
1129                }
1130                Err(err) => {
1131                    logger.log(LogLevel::Error, format!("{err:?}")).await;
1132                    return Err(err);
1133                }
1134            }
1135        }
1136
1137        let task = async move {
1138            let mut info = BuildInfo {
1139                node_working_dirs: Default::default(),
1140            };
1141            for task in tasks {
1142                let NodeBuildTask {
1143                    node_id,
1144                    dynamic_node: _,
1145                    task,
1146                } = task;
1147                let node = task
1148                    .await
1149                    .with_context(|| format!("failed to build node `{node_id}`"))?;
1150                info.node_working_dirs
1151                    .insert(node_id, node.node_working_dir);
1152            }
1153            Ok(info)
1154        };
1155
1156        Ok(task)
1157    }
1158
    /// Registers a new dataflow on this daemon and prepares all local nodes
    /// (`spawn_nodes`) for spawning.
    ///
    /// Sets up the dataflow's input/timer mapping tables, subscribes to
    /// remote node outputs that feed local inputs (via zenoh), and creates a
    /// spawn task per local node. The returned future performs the actual
    /// spawning (`spawn_prepared_nodes`) and must be driven by the caller.
    ///
    /// Errors if a dataflow with the same ID is already running, or if a
    /// node has a `git` source but no build information from a previous
    /// `dora build` is available.
    #[allow(clippy::too_many_arguments)]
    async fn spawn_dataflow(
        &mut self,
        build_id: Option<BuildId>,
        dataflow_id: DataflowId,
        base_working_dir: PathBuf,
        nodes: BTreeMap<NodeId, ResolvedNode>,
        dataflow_descriptor: Descriptor,
        spawn_nodes: BTreeSet<NodeId>,
        uv: bool,
        write_events_to: Option<PathBuf>,
    ) -> eyre::Result<impl Future<Output = eyre::Result<()>> + use<>> {
        let mut logger = self
            .logger
            .for_dataflow(dataflow_id)
            .try_clone()
            .await
            .context("failed to clone logger")?;
        let dataflow = RunningDataflow::new(
            dataflow_id,
            self.daemon_id.clone(),
            dataflow_descriptor.clone(),
        );
        // register the dataflow, refusing duplicate IDs
        let dataflow = match self.running.entry(dataflow_id) {
            std::collections::hash_map::Entry::Vacant(entry) => {
                self.working_dir
                    .insert(dataflow_id, base_working_dir.clone());
                entry.insert(dataflow)
            }
            std::collections::hash_map::Entry::Occupied(_) => {
                bail!("there is already a running dataflow with ID `{dataflow_id}`")
            }
        };

        // nodes whose spawn preparation failed; handled via `handle_node_stop` below
        let mut stopped = Vec::new();

        // build info from a previous `dora build` run (if any); required for
        // nodes with a git source
        let build_info = build_id.and_then(|build_id| self.builds.get(&build_id));
        let node_with_git_source = nodes.values().find(|n| n.has_git_source());
        if let Some(git_node) = node_with_git_source {
            if build_info.is_none() {
                eyre::bail!(
                    "node {} has git source, but no `dora build` was run yet\n\n\
                    nodes with a `git` field must be built using `dora build` before starting the \
                    dataflow",
                    git_node.id
                )
            }
        }
        let node_working_dirs = build_info
            .map(|info| info.node_working_dirs.clone())
            .unwrap_or_default();

        // calculate info about mappings
        for node in nodes.values() {
            let local = spawn_nodes.contains(&node.id);

            let inputs = node_inputs(node);
            for (input_id, input) in inputs {
                if local {
                    dataflow
                        .open_inputs
                        .entry(node.id.clone())
                        .or_default()
                        .insert(input_id.clone());
                    // user-mapped inputs go into the output->input routing
                    // table, timer inputs into the timer table
                    match input.mapping {
                        InputMapping::User(mapping) => {
                            dataflow
                                .mappings
                                .entry(OutputId(mapping.source, mapping.output))
                                .or_default()
                                .insert((node.id.clone(), input_id));
                        }
                        InputMapping::Timer { interval } => {
                            dataflow
                                .timers
                                .entry(interval)
                                .or_default()
                                .insert((node.id.clone(), input_id));
                        }
                    }
                } else if let InputMapping::User(mapping) = input.mapping {
                    // a node on another daemon consumes this output -> record
                    // it as an external mapping
                    dataflow
                        .open_external_mappings
                        .insert(OutputId(mapping.source, mapping.output));
                }
            }
        }

        // spawn configuration shared by all local nodes
        let spawner = Spawner {
            dataflow_id,
            daemon_tx: self.events_tx.clone(),
            dataflow_descriptor,
            clock: self.clock.clone(),
            uv,
        };

        let mut tasks = Vec::new();

        // spawn nodes and set up subscriptions
        for node in nodes.into_values() {
            let mut logger = logger.reborrow().for_node(node.id.clone());
            let local = spawn_nodes.contains(&node.id);
            if local {
                // dynamic and non-dynamic nodes are tracked separately; the
                // pending set gates the dataflow start (see `AllNodesReady`)
                let dynamic_node = node.kind.dynamic();
                if dynamic_node {
                    dataflow.dynamic_nodes.insert(node.id.clone());
                } else {
                    dataflow.pending_nodes.insert(node.id.clone());
                }

                let node_id = node.id.clone();
                // bounded queue holding the node's most recent stderr lines
                let node_stderr_most_recent = dataflow
                    .node_stderr_most_recent
                    .entry(node.id.clone())
                    .or_insert_with(|| Arc::new(ArrayQueue::new(STDERR_LOG_LINES_MAX)))
                    .clone();

                // working dir precedence: build output dir, then
                // `deploy.working_dir` (joined to the base dir), then the
                // base working dir itself
                let configured_node_working_dir = node_working_dirs.get(&node_id).cloned();
                if configured_node_working_dir.is_none() && node.has_git_source() {
                    eyre::bail!(
                        "node {} has git source, but no git clone directory was found for it\n\n\
                        try running `dora build` again",
                        node.id
                    )
                }
                // NOTE(review): the trailing `.clone()` is redundant and
                // `unwrap_or` clones eagerly — `unwrap_or_else` would avoid both
                let node_working_dir = configured_node_working_dir
                    .or_else(|| {
                        node.deploy
                            .as_ref()
                            .and_then(|d| d.working_dir.as_ref().map(|d| base_working_dir.join(d)))
                    })
                    .unwrap_or(base_working_dir.clone())
                    .clone();
                // optional input recording: one JSON file per node
                let node_write_events_to = write_events_to
                    .as_ref()
                    .map(|p| p.join(format!("inputs-{}.json", node.id)));
                match spawner
                    .clone()
                    .spawn_node(
                        node,
                        node_working_dir,
                        node_stderr_most_recent,
                        node_write_events_to,
                        &mut logger,
                    )
                    .await
                    .wrap_err_with(|| format!("failed to spawn node `{node_id}`"))
                {
                    Ok(result) => {
                        tasks.push(NodeBuildTask {
                            node_id,
                            task: result,
                            dynamic_node,
                        });
                    }
                    Err(err) => {
                        // record the preparation failure as this node's result
                        // and remember it for stop handling below
                        logger
                            .log(LogLevel::Error, Some("daemon".into()), format!("{err:?}"))
                            .await;
                        self.dataflow_node_results
                            .entry(dataflow_id)
                            .or_default()
                            .insert(
                                node_id.clone(),
                                Err(NodeError {
                                    timestamp: self.clock.new_timestamp(),
                                    cause: NodeErrorCause::FailedToSpawn(format!("{err:?}")),
                                    exit_status: NodeExitStatus::Unknown,
                                }),
                            );
                        stopped.push((node_id.clone(), dynamic_node));
                    }
                }
            } else {
                // wait until node is ready before starting
                dataflow.pending_nodes.set_external_nodes(true);

                // subscribe to all node outputs that are mapped to some local inputs
                for output_id in dataflow.mappings.keys().filter(|o| o.0 == node.id) {
                    let tx = self
                        .remote_daemon_events_tx
                        .clone()
                        .wrap_err("no remote_daemon_events_tx channel")?;
                    let mut finished_rx = dataflow.finished_tx.subscribe();
                    let subscribe_topic =
                        zenoh_output_publish_topic(dataflow.id, &output_id.0, &output_id.1);
                    tracing::debug!("declaring subscriber on {subscribe_topic}");
                    let subscriber = self
                        .zenoh_session
                        .declare_subscriber(subscribe_topic)
                        .await
                        .map_err(|e| eyre!(e))
                        .wrap_err_with(|| format!("failed to subscribe to {output_id:?}"))?;
                    // forward incoming zenoh samples to the daemon event loop
                    // until the dataflow reports finished
                    tokio::spawn(async move {
                        let mut finished = pin!(finished_rx.recv());
                        loop {
                            // race the `finished` signal against the next
                            // sample; `select` hands the still-pending
                            // `finished` future back via `Either::Right` so it
                            // can be reused on the next iteration
                            let finished_or_next =
                                futures::future::select(finished, subscriber.recv_async());
                            match finished_or_next.await {
                                future::Either::Left((finished, _)) => match finished {
                                    Err(broadcast::error::RecvError::Closed) => {
                                        tracing::debug!(
                                            "dataflow finished, breaking from zenoh subscribe task"
                                        );
                                        break;
                                    }
                                    other => {
                                        tracing::warn!(
                                            "unexpected return value of dataflow finished_rx channel: {other:?}"
                                        );
                                        break;
                                    }
                                },
                                future::Either::Right((sample, f)) => {
                                    finished = f;
                                    let event = sample.map_err(|e| eyre!(e)).and_then(|s| {
                                        Timestamped::deserialize_inter_daemon_event(
                                            &s.payload().to_bytes(),
                                        )
                                    });
                                    if tx.send_async(event).await.is_err() {
                                        // daemon finished
                                        break;
                                    }
                                }
                            }
                        }
                    });
                }
            }
        }
        // report nodes whose preparation failed as stopped
        for (node_id, dynamic) in stopped {
            self.handle_node_stop(dataflow_id, &node_id, dynamic)
                .await?;
        }

        // the caller drives this future to perform the actual spawning
        let spawn_result = Self::spawn_prepared_nodes(
            dataflow_id,
            logger,
            tasks,
            self.events_tx.clone(),
            self.clock.clone(),
        );

        Ok(spawn_result)
    }
1405
1406    async fn spawn_prepared_nodes(
1407        dataflow_id: Uuid,
1408        mut logger: DataflowLogger<'_>,
1409        tasks: Vec<NodeBuildTask<impl Future<Output = eyre::Result<spawn::PreparedNode>>>>,
1410        events_tx: mpsc::Sender<Timestamped<Event>>,
1411        clock: Arc<HLC>,
1412    ) -> eyre::Result<()> {
1413        let node_result = |node_id, dynamic_node, result| Timestamped {
1414            inner: Event::SpawnNodeResult {
1415                dataflow_id,
1416                node_id,
1417                dynamic_node,
1418                result,
1419            },
1420            timestamp: clock.new_timestamp(),
1421        };
1422        let mut failed_to_prepare = None;
1423        let mut prepared_nodes = Vec::new();
1424        for task in tasks {
1425            let NodeBuildTask {
1426                node_id,
1427                dynamic_node,
1428                task,
1429            } = task;
1430            match task.await {
1431                Ok(node) => prepared_nodes.push(node),
1432                Err(err) => {
1433                    if failed_to_prepare.is_none() {
1434                        failed_to_prepare = Some(node_id.clone());
1435                    }
1436                    let node_err: NodeError = NodeError {
1437                        timestamp: clock.new_timestamp(),
1438                        cause: NodeErrorCause::FailedToSpawn(format!(
1439                            "preparing for spawn failed: {err:?}"
1440                        )),
1441                        exit_status: NodeExitStatus::Unknown,
1442                    };
1443                    let send_result = events_tx
1444                        .send(node_result(node_id, dynamic_node, Err(node_err)))
1445                        .await;
1446                    if send_result.is_err() {
1447                        tracing::error!("failed to send SpawnNodeResult to main daemon task")
1448                    }
1449                }
1450            }
1451        }
1452
1453        // once all nodes are prepared, do the actual spawning
1454        if let Some(failed_node) = failed_to_prepare {
1455            // don't spawn any nodes when an error occurred before
1456            for node in prepared_nodes {
1457                let err = NodeError {
1458                    timestamp: clock.new_timestamp(),
1459                    cause: NodeErrorCause::Cascading {
1460                        caused_by_node: failed_node.clone(),
1461                    },
1462                    exit_status: NodeExitStatus::Unknown,
1463                };
1464                let send_result = events_tx
1465                    .send(node_result(
1466                        node.node_id().clone(),
1467                        node.dynamic(),
1468                        Err(err),
1469                    ))
1470                    .await;
1471                if send_result.is_err() {
1472                    tracing::error!("failed to send SpawnNodeResult to main daemon task")
1473                }
1474            }
1475            Err(eyre!("failed to prepare node {failed_node}"))
1476        } else {
1477            let mut spawn_result = Ok(());
1478
1479            logger
1480                .log(
1481                    LogLevel::Info,
1482                    None,
1483                    Some("dora daemon".into()),
1484                    "finished building nodes, spawning...",
1485                )
1486                .await;
1487
1488            // spawn the nodes
1489            for node in prepared_nodes {
1490                let node_id = node.node_id().clone();
1491                let dynamic_node = node.dynamic();
1492                let logger = logger
1493                    .reborrow()
1494                    .for_node(node_id.clone())
1495                    .try_clone()
1496                    .await
1497                    .context("failed to clone NodeLogger")?;
1498                let result = node.spawn(logger).await;
1499                let node_spawn_result = match result {
1500                    Ok(node) => Ok(node),
1501                    Err(err) => {
1502                        let node_err = NodeError {
1503                            timestamp: clock.new_timestamp(),
1504                            cause: NodeErrorCause::FailedToSpawn(format!("spawn failed: {err:?}")),
1505                            exit_status: NodeExitStatus::Unknown,
1506                        };
1507                        if spawn_result.is_ok() {
1508                            spawn_result = Err(err.wrap_err(format!("failed to spawn {node_id}")));
1509                        }
1510                        Err(node_err)
1511                    }
1512                };
1513                let send_result = events_tx
1514                    .send(node_result(node_id, dynamic_node, node_spawn_result))
1515                    .await;
1516                if send_result.is_err() {
1517                    tracing::error!("failed to send SpawnNodeResult to main daemon task")
1518                }
1519            }
1520            spawn_result
1521        }
1522    }
1523
1524    async fn handle_dynamic_node_event(
1525        &mut self,
1526        event: DynamicNodeEventWrapper,
1527    ) -> eyre::Result<()> {
1528        match event {
1529            DynamicNodeEventWrapper {
1530                event: DynamicNodeEvent::NodeConfig { node_id },
1531                reply_tx,
1532            } => {
1533                let number_node_id = self
1534                    .running
1535                    .iter()
1536                    .filter(|(_id, dataflow)| dataflow.running_nodes.contains_key(&node_id))
1537                    .count();
1538
1539                let node_config = match number_node_id {
1540                    2.. => Err(format!(
1541                        "multiple dataflows contain dynamic node id {node_id}. \
1542                        Please only have one running dataflow with the specified \
1543                        node id if you want to use dynamic node",
1544                    )),
1545                    1 => self
1546                        .running
1547                        .iter()
1548                        .filter(|(_id, dataflow)| dataflow.running_nodes.contains_key(&node_id))
1549                        .map(|(id, dataflow)| -> Result<NodeConfig> {
1550                            let node_config = dataflow
1551                                .running_nodes
1552                                .get(&node_id)
1553                                .with_context(|| {
1554                                    format!("no node with ID `{node_id}` within the given dataflow")
1555                                })?
1556                                .node_config
1557                                .clone();
1558                            if !node_config.dynamic {
1559                                bail!("node with ID `{node_id}` in {id} is not dynamic");
1560                            }
1561                            Ok(node_config)
1562                        })
1563                        .next()
1564                        .ok_or_else(|| eyre!("no node with ID `{node_id}`"))
1565                        .and_then(|r| r)
1566                        .map_err(|err| {
1567                            format!(
1568                                "failed to get dynamic node config within given dataflow: {err}"
1569                            )
1570                        }),
1571                    0 => Err(format!("no node with ID `{node_id}`")),
1572                };
1573
1574                let reply = DaemonReply::NodeConfig {
1575                    result: node_config,
1576                };
1577                let _ = reply_tx.send(Some(reply)).map_err(|_| {
1578                    error!("could not send node info reply from daemon to coordinator")
1579                });
1580                Ok(())
1581            }
1582        }
1583    }
1584
    /// Dispatches an event received from a local node of the given dataflow.
    ///
    /// Per-node problems (e.g. unknown dataflow ID) are reported back to the
    /// node through the event's reply channel where one exists; an `Err`
    /// return signals an internal daemon failure instead.
    async fn handle_node_event(
        &mut self,
        event: DaemonNodeEvent,
        dataflow_id: DataflowId,
        node_id: NodeId,
    ) -> eyre::Result<()> {
        // Returns `Some(true)` when the node's restart policy permits another
        // restart (and restarts were not disabled), `Some(false)` when it does
        // not, and `None` when the dataflow or node is not (or no longer) known.
        let might_restart = || {
            let dataflow = self.running.get(&dataflow_id)?;
            let node = dataflow.running_nodes.get(&node_id)?;
            Some(match node.restart_policy {
                RestartPolicy::Never => false,
                _ if node.restarts_disabled() => false,
                RestartPolicy::OnFailure | RestartPolicy::Always => true,
            })
        };
        match event {
            // The node registered its event channel and is ready to run.
            DaemonNodeEvent::Subscribe {
                event_sender,
                reply_sender,
            } => {
                let mut logger = self.logger.for_dataflow(dataflow_id);
                logger
                    .log(
                        LogLevel::Info,
                        Some(node_id.clone()),
                        Some("daemon".into()),
                        "node is ready",
                    )
                    .await;

                let dataflow = self.running.get_mut(&dataflow_id).ok_or_else(|| {
                    format!("subscribe failed: no running dataflow with ID `{dataflow_id}`")
                });

                match dataflow {
                    Err(err) => {
                        // Unknown dataflow: report the error back to the node.
                        let _ = reply_sender.send(DaemonReply::Result(Err(err)));
                    }
                    Ok(dataflow) => {
                        // Register the event channel (replaying already-closed
                        // inputs and a possibly already-sent stop event).
                        Self::subscribe(dataflow, node_id.clone(), event_sender, &self.clock).await;

                        let status = dataflow
                            .pending_nodes
                            .handle_node_subscription(
                                node_id.clone(),
                                reply_sender,
                                &mut self.coordinator_connection,
                                &self.clock,
                                &mut dataflow.cascading_error_causes,
                                &mut logger,
                            )
                            .await?;
                        match status {
                            // Start the dataflow exactly once, when the last
                            // pending node became ready.
                            DataflowStatus::AllNodesReady if !dataflow.dataflow_started => {
                                logger
                                    .log(
                                        LogLevel::Info,
                                        None,
                                        Some("daemon".into()),
                                        "all nodes are ready, starting dataflow",
                                    )
                                    .await;
                                dataflow.start(&self.events_tx, &self.clock).await?;
                                dataflow.dataflow_started = true;
                            }
                            _ => {}
                        }
                    }
                }
            }
            // The node registered a channel for drop-token notifications.
            DaemonNodeEvent::SubscribeDrop {
                event_sender,
                reply_sender,
            } => {
                let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
                    format!("failed to subscribe: no running dataflow with ID `{dataflow_id}`")
                });
                let result = match dataflow {
                    Ok(dataflow) => {
                        dataflow.drop_channels.insert(node_id, event_sender);
                        Ok(())
                    }
                    Err(err) => Err(err.to_string()),
                };
                let _ = reply_sender.send(DaemonReply::Result(result));
            }
            // The node closed some of its outputs.
            DaemonNodeEvent::CloseOutputs {
                outputs,
                reply_sender,
            } => {
                let reply = if might_restart().unwrap_or(false) {
                    // Keep the outputs open: a restarted instance of this node
                    // will reuse them.
                    self.logger
                        .for_dataflow(dataflow_id)
                        .for_node(node_id.clone())
                        .log(
                            LogLevel::Debug,
                            Some("daemon".into()),
                            "skipping CloseOutputs because node might restart",
                        )
                        .await;
                    Ok(())
                } else {
                    // notify downstream nodes
                    let inner = async {
                        self.send_output_closed_events(dataflow_id, node_id, outputs)
                            .await
                    };

                    inner.await.map_err(|err| format!("{err:?}"))
                };
                let _ = reply_sender.send(DaemonReply::Result(reply));
            }
            // The node finished sending outputs for good.
            DaemonNodeEvent::OutputsDone { reply_sender } => {
                let result = self
                    .handle_outputs_done(dataflow_id, &node_id, might_restart().unwrap_or(false))
                    .await;

                let _ = reply_sender.send(DaemonReply::Result(
                    result.map_err(|err| format!("{err:?}")),
                ));
            }
            // The node published a message on one of its outputs.
            DaemonNodeEvent::SendOut {
                output_id,
                metadata,
                data,
            } => self
                .send_out(dataflow_id, node_id, output_id, metadata, data)
                .await
                .context("failed to send out")?,
            // The node reports that it dropped its copies of shared data.
            DaemonNodeEvent::ReportDrop { tokens } => {
                let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
                    format!(
                        "failed to get handle drop tokens: \
                        no running dataflow with ID `{dataflow_id}`"
                    )
                });

                match dataflow {
                    Ok(dataflow) => {
                        for token in tokens {
                            match dataflow.pending_drop_tokens.get_mut(&token) {
                                Some(info) => {
                                    if info.pending_nodes.remove(&node_id) {
                                        // Last pending node may have dropped ->
                                        // check whether the token is done.
                                        dataflow.check_drop_token(token, &self.clock).await?;
                                    } else {
                                        tracing::warn!(
                                            "node `{node_id}` is not pending for drop token `{token:?}`"
                                        );
                                    }
                                }
                                None => tracing::warn!("unknown drop token `{token:?}`"),
                            }
                        }
                    }
                    Err(err) => tracing::warn!("{err:?}"),
                }
            }
            // The node dropped its event stream -> stop sending events to it.
            DaemonNodeEvent::EventStreamDropped { reply_sender } => {
                let inner = async {
                    let dataflow = self
                        .running
                        .get_mut(&dataflow_id)
                        .wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"))?;
                    dataflow.subscribe_channels.remove(&node_id);
                    Result::<_, eyre::Error>::Ok(())
                };

                let reply = inner.await.map_err(|err| format!("{err:?}"));
                let _ = reply_sender.send(DaemonReply::Result(reply));
            }
        }
        Ok(())
    }
1758
1759    async fn send_reload(
1760        &mut self,
1761        dataflow_id: Uuid,
1762        node_id: NodeId,
1763        operator_id: Option<OperatorId>,
1764    ) -> Result<(), eyre::ErrReport> {
1765        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
1766            format!("Reload failed: no running dataflow with ID `{dataflow_id}`")
1767        })?;
1768        if let Some(channel) = dataflow.subscribe_channels.get(&node_id) {
1769            match send_with_timestamp(channel, NodeEvent::Reload { operator_id }, &self.clock) {
1770                Ok(()) => {}
1771                Err(_) => {
1772                    dataflow.subscribe_channels.remove(&node_id);
1773                }
1774            }
1775        }
1776        Ok(())
1777    }
1778
1779    async fn send_out(
1780        &mut self,
1781        dataflow_id: Uuid,
1782        node_id: NodeId,
1783        output_id: DataId,
1784        metadata: dora_message::metadata::Metadata,
1785        data: Option<DataMessage>,
1786    ) -> Result<(), eyre::ErrReport> {
1787        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
1788            format!("send out failed: no running dataflow with ID `{dataflow_id}`")
1789        })?;
1790        let data_bytes = send_output_to_local_receivers(
1791            node_id.clone(),
1792            output_id.clone(),
1793            dataflow,
1794            &metadata,
1795            data,
1796            &self.clock,
1797        )
1798        .await?;
1799
1800        let output_id = OutputId(node_id, output_id);
1801        let remote_receivers = dataflow.open_external_mappings.contains(&output_id)
1802            || dataflow.publish_all_messages_to_zenoh;
1803        if remote_receivers {
1804            let event = InterDaemonEvent::Output {
1805                dataflow_id,
1806                node_id: output_id.0.clone(),
1807                output_id: output_id.1.clone(),
1808                metadata,
1809                data: data_bytes,
1810            };
1811            self.send_to_remote_receivers(dataflow_id, &output_id, event)
1812                .await?;
1813        }
1814
1815        Ok(())
1816    }
1817
1818    async fn send_to_remote_receivers(
1819        &mut self,
1820        dataflow_id: Uuid,
1821        output_id: &OutputId,
1822        event: InterDaemonEvent,
1823    ) -> Result<(), eyre::Error> {
1824        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
1825            format!("send out failed: no running dataflow with ID `{dataflow_id}`")
1826        })?;
1827
1828        // publish via zenoh
1829        let publisher = match dataflow.publishers.get(output_id) {
1830            Some(publisher) => publisher,
1831            None => {
1832                let publish_topic =
1833                    zenoh_output_publish_topic(dataflow.id, &output_id.0, &output_id.1);
1834                tracing::debug!("declaring publisher on {publish_topic}");
1835                let publisher = self
1836                    .zenoh_session
1837                    .declare_publisher(publish_topic)
1838                    .await
1839                    .map_err(|e| eyre!(e))
1840                    .context("failed to create zenoh publisher")?;
1841                dataflow.publishers.insert(output_id.clone(), publisher);
1842                dataflow.publishers.get(output_id).unwrap()
1843            }
1844        };
1845
1846        let serialized_event = Timestamped {
1847            inner: event,
1848            timestamp: self.clock.new_timestamp(),
1849        }
1850        .serialize();
1851        publisher
1852            .put(serialized_event)
1853            .await
1854            .map_err(|e| eyre!(e))
1855            .context("zenoh put failed")?;
1856        Ok(())
1857    }
1858
1859    async fn send_output_closed_events(
1860        &mut self,
1861        dataflow_id: DataflowId,
1862        node_id: NodeId,
1863        outputs: Vec<DataId>,
1864    ) -> eyre::Result<()> {
1865        let dataflow = self
1866            .running
1867            .get_mut(&dataflow_id)
1868            .wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"))?;
1869        let local_node_inputs: BTreeSet<_> = dataflow
1870            .mappings
1871            .iter()
1872            .filter(|(k, _)| k.0 == node_id && outputs.contains(&k.1))
1873            .flat_map(|(_, v)| v)
1874            .cloned()
1875            .collect();
1876        for (receiver_id, input_id) in &local_node_inputs {
1877            close_input(dataflow, receiver_id, input_id, &self.clock);
1878        }
1879
1880        let mut closed = Vec::new();
1881        for output_id in &dataflow.open_external_mappings {
1882            if output_id.0 == node_id && outputs.contains(&output_id.1) {
1883                closed.push(output_id.clone());
1884            }
1885        }
1886
1887        for output_id in closed {
1888            let event = InterDaemonEvent::OutputClosed {
1889                dataflow_id,
1890                node_id: output_id.0.clone(),
1891                output_id: output_id.1.clone(),
1892            };
1893            self.send_to_remote_receivers(dataflow_id, &output_id, event)
1894                .await?;
1895        }
1896
1897        Ok(())
1898    }
1899
1900    async fn subscribe(
1901        dataflow: &mut RunningDataflow,
1902        node_id: NodeId,
1903        event_sender: UnboundedSender<Timestamped<NodeEvent>>,
1904        clock: &HLC,
1905    ) {
1906        // some inputs might have been closed already -> report those events
1907        let closed_inputs = dataflow
1908            .mappings
1909            .values()
1910            .flatten()
1911            .filter(|(node, _)| node == &node_id)
1912            .map(|(_, input)| input)
1913            .filter(|input| {
1914                dataflow
1915                    .open_inputs
1916                    .get(&node_id)
1917                    .map(|open_inputs| !open_inputs.contains(*input))
1918                    .unwrap_or(true)
1919            });
1920        for input_id in closed_inputs {
1921            let _ = send_with_timestamp(
1922                &event_sender,
1923                NodeEvent::InputClosed {
1924                    id: input_id.clone(),
1925                },
1926                clock,
1927            );
1928        }
1929        if dataflow.open_inputs(&node_id).is_empty() {
1930            if let Some(node) = dataflow.running_nodes.get_mut(&node_id) {
1931                node.disable_restart();
1932            }
1933            if let Some(node) = dataflow.descriptor.nodes.iter().find(|n| n.id == node_id) {
1934                if node.inputs.is_empty() {
1935                    // do not send AllInputsClosed for source nodes
1936                } else {
1937                    let _ = send_with_timestamp(&event_sender, NodeEvent::AllInputsClosed, clock);
1938                }
1939            }
1940        }
1941
1942        // if a stop event was already sent for the dataflow, send it to
1943        // the newly connected node too
1944        if dataflow.stop_sent {
1945            if let Some(node) = dataflow.running_nodes.get_mut(&node_id) {
1946                node.disable_restart();
1947            }
1948            let _ = send_with_timestamp(&event_sender, NodeEvent::Stop, clock);
1949        }
1950
1951        dataflow.subscribe_channels.insert(node_id, event_sender);
1952    }
1953
1954    #[tracing::instrument(skip(self), level = "trace")]
1955    async fn handle_outputs_done(
1956        &mut self,
1957        dataflow_id: DataflowId,
1958        node_id: &NodeId,
1959        might_restart: bool,
1960    ) -> eyre::Result<()> {
1961        let dataflow = self
1962            .running
1963            .get_mut(&dataflow_id)
1964            .ok_or_else(|| eyre!("no running dataflow with ID `{dataflow_id}`"))?;
1965
1966        let outputs = dataflow
1967            .mappings
1968            .keys()
1969            .filter(|m| &m.0 == node_id)
1970            .map(|m| &m.1)
1971            .cloned()
1972            .collect();
1973
1974        if might_restart {
1975            self.logger
1976                .for_dataflow(dataflow_id)
1977                .for_node(node_id.clone())
1978                .log(
1979                    LogLevel::Debug,
1980                    Some("daemon".into()),
1981                    "keeping outputs open because node might restart",
1982                )
1983                .await;
1984        } else {
1985            self.send_output_closed_events(dataflow_id, node_id.clone(), outputs)
1986                .await?;
1987        }
1988
1989        let dataflow = self
1990            .running
1991            .get_mut(&dataflow_id)
1992            .ok_or_else(|| eyre!("no running dataflow with ID `{dataflow_id}`"))?;
1993        dataflow.drop_channels.remove(node_id);
1994        Ok(())
1995    }
1996
1997    async fn handle_node_stop(
1998        &mut self,
1999        dataflow_id: Uuid,
2000        node_id: &NodeId,
2001        dynamic_node: bool,
2002    ) -> eyre::Result<()> {
2003        let result = self
2004            .handle_node_stop_inner(dataflow_id, node_id, dynamic_node)
2005            .await;
2006        let _ = self
2007            .events_tx
2008            .send(Timestamped {
2009                inner: Event::NodeStopped {
2010                    dataflow_id,
2011                    node_id: node_id.clone(),
2012                },
2013                timestamp: self.clock.new_timestamp(),
2014            })
2015            .await;
2016        result
2017    }
2018
    /// Performs the bookkeeping for a stopped node: updates pending-node
    /// state, closes the node's outputs, removes it from the running set, and
    /// — when only dynamic nodes remain and nothing is pending — reports the
    /// finished dataflow to the coordinator and removes it.
    async fn handle_node_stop_inner(
        &mut self,
        dataflow_id: Uuid,
        node_id: &NodeId,
        dynamic_node: bool,
    ) -> eyre::Result<()> {
        let mut logger = self.logger.for_dataflow(dataflow_id);
        let dataflow = match self.running.get_mut(&dataflow_id) {
            Some(dataflow) => dataflow,
            None if dynamic_node => {
                // The dataflow might be done already as we don't wait for dynamic nodes. In this
                // case, we don't need to do anything to handle the node stop.
                tracing::debug!(
                    "dynamic node {dataflow_id}/{node_id} stopped after dataflow was done"
                );
                return Ok(());
            }
            None => eyre::bail!(
                "failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`"
            ),
        };

        // Update the pending-node bookkeeping (may report to the coordinator).
        dataflow
            .pending_nodes
            .handle_node_stop(
                node_id,
                &mut self.coordinator_connection,
                &self.clock,
                &mut dataflow.cascading_error_causes,
                &mut logger,
            )
            .await?;

        // node only reaches here if it will not be restarted
        let might_restart = false;

        // Close the node's outputs and drop channel.
        self.handle_outputs_done(dataflow_id, node_id, might_restart)
            .await?;

        // Re-fetch the dataflow: `handle_outputs_done` re-borrowed `self`.
        let mut logger = self.logger.for_dataflow(dataflow_id);
        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
            format!("failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`")
        })?;
        dataflow.running_nodes.remove(node_id);
        // The dataflow is considered finished on this daemon when no local
        // nodes are pending and only dynamic nodes are still running.
        if !dataflow.pending_nodes.local_nodes_pending()
            && dataflow
                .running_nodes
                .iter()
                .all(|(_id, n)| n.node_config.dynamic)
        {
            let result = DataflowDaemonResult {
                timestamp: self.clock.new_timestamp(),
                node_results: self
                    .dataflow_node_results
                    .get(&dataflow.id)
                    .context("failed to get dataflow node results")?
                    .clone(),
            };

            // This dataflow no longer uses any git clones.
            self.git_manager
                .clones_in_use
                .values_mut()
                .for_each(|dataflows| {
                    dataflows.remove(&dataflow_id);
                });

            logger
                .log(
                    LogLevel::Info,
                    None,
                    Some("daemon".into()),
                    format!("dataflow finished on machine `{}`", self.daemon_id),
                )
                .await;
            // Report the per-node results to the coordinator, if connected.
            if let Some(connection) = &mut self.coordinator_connection {
                let msg = serde_json::to_vec(&Timestamped {
                    inner: CoordinatorRequest::Event {
                        daemon_id: self.daemon_id.clone(),
                        event: DaemonEvent::AllNodesFinished {
                            dataflow_id,
                            result,
                        },
                    },
                    timestamp: self.clock.new_timestamp(),
                })?;
                socket_stream_send(connection, &msg)
                    .await
                    .wrap_err("failed to report dataflow finish to dora-coordinator")?;
            }
            self.running.remove(&dataflow_id);
        }

        Ok(())
    }
2113
2114    async fn handle_dora_event(&mut self, event: DoraEvent) -> eyre::Result<()> {
2115        match event {
2116            DoraEvent::Timer {
2117                dataflow_id,
2118                interval,
2119                metadata,
2120            } => {
2121                let Some(dataflow) = self.running.get_mut(&dataflow_id) else {
2122                    tracing::warn!("Timer event for unknown dataflow `{dataflow_id}`");
2123                    return Ok(());
2124                };
2125
2126                let Some(subscribers) = dataflow.timers.get(&interval) else {
2127                    return Ok(());
2128                };
2129
2130                let mut closed = Vec::new();
2131                for (receiver_id, input_id) in subscribers {
2132                    let Some(channel) = dataflow.subscribe_channels.get(receiver_id) else {
2133                        continue;
2134                    };
2135
2136                    let send_result = send_with_timestamp(
2137                        channel,
2138                        NodeEvent::Input {
2139                            id: input_id.clone(),
2140                            metadata: metadata.clone(),
2141                            data: None,
2142                        },
2143                        &self.clock,
2144                    );
2145                    match send_result {
2146                        Ok(()) => {}
2147                        Err(_) => {
2148                            closed.push(receiver_id);
2149                        }
2150                    }
2151                }
2152                for id in closed {
2153                    dataflow.subscribe_channels.remove(id);
2154                }
2155            }
2156            DoraEvent::Logs {
2157                dataflow_id,
2158                output_id,
2159                message,
2160                metadata,
2161            } => {
2162                let Some(dataflow) = self.running.get_mut(&dataflow_id) else {
2163                    tracing::warn!("Logs event for unknown dataflow `{dataflow_id}`");
2164                    return Ok(());
2165                };
2166
2167                let Some(subscribers) = dataflow.mappings.get(&output_id) else {
2168                    tracing::warn!(
2169                        "No subscribers found for {:?} in {:?}",
2170                        output_id,
2171                        dataflow.mappings
2172                    );
2173                    return Ok(());
2174                };
2175
2176                let mut closed = Vec::new();
2177                for (receiver_id, input_id) in subscribers {
2178                    let Some(channel) = dataflow.subscribe_channels.get(receiver_id) else {
2179                        tracing::warn!("No subscriber channel found for {:?}", output_id);
2180                        continue;
2181                    };
2182
2183                    let send_result = send_with_timestamp(
2184                        channel,
2185                        NodeEvent::Input {
2186                            id: input_id.clone(),
2187                            metadata: metadata.clone(),
2188                            data: Some(message.clone()),
2189                        },
2190                        &self.clock,
2191                    );
2192                    match send_result {
2193                        Ok(()) => {}
2194                        Err(_) => {
2195                            closed.push(receiver_id);
2196                        }
2197                    }
2198                }
2199                for id in closed {
2200                    dataflow.subscribe_channels.remove(id);
2201                }
2202            }
2203            DoraEvent::SpawnedNodeResult {
2204                dataflow_id,
2205                node_id,
2206                dynamic_node,
2207                exit_status,
2208                restart,
2209            } => {
2210                let mut logger = self
2211                    .logger
2212                    .for_dataflow(dataflow_id)
2213                    .for_node(node_id.clone());
2214                logger
2215                    .log(
2216                        LogLevel::Debug,
2217                        Some("daemon".into()),
2218                        format!("handling node stop with exit status {exit_status:?} (restart: {restart})"),
2219                    )
2220                    .await;
2221
2222                let node_result = match exit_status {
2223                    NodeExitStatus::Success => Ok(()),
2224                    exit_status => {
2225                        let dataflow = self.running.get(&dataflow_id);
2226                        let caused_by_node = dataflow
2227                            .and_then(|dataflow| {
2228                                dataflow.cascading_error_causes.error_caused_by(&node_id)
2229                            })
2230                            .cloned();
2231                        let grace_duration_kill = dataflow
2232                            .map(|d| d.grace_duration_kills.contains(&node_id))
2233                            .unwrap_or_default();
2234
2235                        let cause = match caused_by_node {
2236                            Some(caused_by_node) => {
2237                                logger
2238                                    .log(
2239                                        LogLevel::Info,
2240                                        Some("daemon".into()),
2241                                        format!("marking `{node_id}` as cascading error caused by `{caused_by_node}`")
2242                                    )
2243                                    .await;
2244
2245                                NodeErrorCause::Cascading { caused_by_node }
2246                            }
2247                            None if grace_duration_kill => NodeErrorCause::GraceDuration,
2248                            None => {
2249                                let cause = dataflow
2250                                    .and_then(|d| d.node_stderr_most_recent.get(&node_id))
2251                                    .map(|queue| {
2252                                        let mut lines = Vec::new();
2253                                        if queue.is_full() {
2254                                            lines.push("[...]".into());
2255                                        }
2256                                        while let Some(line) = queue.pop() {
2257                                            lines.push(line);
2258                                        }
2259                                        lines
2260                                    })
2261                                    .map(extract_err_from_stderr)
2262                                    .unwrap_or_default();
2263
2264                                NodeErrorCause::Other { stderr: cause }
2265                            }
2266                        };
2267                        Err(NodeError {
2268                            timestamp: self.clock.new_timestamp(),
2269                            cause,
2270                            exit_status,
2271                        })
2272                    }
2273                };
2274
2275                logger
2276                    .log(
2277                        if node_result.is_ok() {
2278                            LogLevel::Info
2279                        } else {
2280                            LogLevel::Error
2281                        },
2282                        Some("daemon".into()),
2283                        match &node_result {
2284                            Ok(()) => format!("{node_id} finished successfully"),
2285                            Err(err) => format!("{err}"),
2286                        },
2287                    )
2288                    .await;
2289
2290                if restart {
2291                    logger
2292                        .log(
2293                            LogLevel::Info,
2294                            Some("daemon".into()),
2295                            "node will be restarted",
2296                        )
2297                        .await;
2298                } else {
2299                    self.dataflow_node_results
2300                        .entry(dataflow_id)
2301                        .or_default()
2302                        .insert(node_id.clone(), node_result);
2303
2304                    self.handle_node_stop(dataflow_id, &node_id, dynamic_node)
2305                        .await?;
2306                }
2307            }
2308        }
2309        Ok(())
2310    }
2311
2312    fn base_working_dir(
2313        &self,
2314        local_working_dir: Option<PathBuf>,
2315        session_id: SessionId,
2316    ) -> eyre::Result<PathBuf> {
2317        match local_working_dir {
2318            Some(working_dir) => {
2319                // check that working directory exists
2320                if working_dir.exists() {
2321                    Ok(working_dir)
2322                } else {
2323                    bail!(
2324                        "working directory does not exist: {}",
2325                        working_dir.display(),
2326                    )
2327                }
2328            }
2329            None => {
2330                // use subfolder of daemon working dir
2331                let daemon_working_dir =
2332                    current_dir().context("failed to get daemon working dir")?;
2333                Ok(daemon_working_dir
2334                    .join("_work")
2335                    .join(session_id.uuid().to_string()))
2336            }
2337        }
2338    }
2339}
2340
/// Returns the last `tail` lines of `file` as raw bytes.
///
/// The file is scanned backwards in chunks (starting at 2 KiB, growing
/// adaptively based on the observed average line length), so only the tail
/// of the file is read instead of the whole file. Trailing ASCII whitespace
/// at the very end of the file is ignored so a final newline does not count
/// as an extra empty line.
async fn read_last_n_lines(file: &mut File, mut tail: usize) -> io::Result<Vec<u8>> {
    // Start at the end of the file and work towards the beginning.
    let mut pos = file.seek(io::SeekFrom::End(0)).await?;

    // Collected result; since we read back-to-front, each chunk is
    // prepended (extend + rotate) instead of appended.
    let mut output = VecDeque::<u8>::new();
    let mut extend_slice_to_start = |slice: &[u8]| {
        output.extend(slice);
        output.rotate_right(slice.len());
    };

    let mut buffer = vec![0; 2048];
    let mut estimated_line_length = 0;
    let mut at_end = true;
    'main: while tail > 0 && pos > 0 {
        // Read the chunk of up to `buffer.len()` bytes that ends at `pos`.
        let new_pos = pos.saturating_sub(buffer.len() as u64);
        file.seek(io::SeekFrom::Start(new_pos)).await?;
        let read_len = (pos - new_pos) as usize;
        pos = new_pos;

        file.read_exact(&mut buffer[..read_len]).await?;
        let read_buf = if at_end {
            at_end = false;
            // Strip trailing whitespace of the file's last chunk so that a
            // terminating newline is not counted as a line of its own.
            &buffer[..read_len].trim_ascii_end()
        } else {
            &buffer[..read_len]
        };

        // Scan for newlines from the back of the chunk.
        let mut iter = memchr::memrchr_iter(b'\n', read_buf);
        let mut lines = 1;
        loop {
            let Some(pos) = iter.next() else {
                // No more newlines in this chunk -> keep the whole chunk
                // (it may be the middle of a long line) and read further back.
                extend_slice_to_start(read_buf);
                break;
            };
            lines += 1;
            tail -= 1;
            if tail == 0 {
                // Found the newline before the first requested line; keep
                // everything after it and stop reading.
                extend_slice_to_start(&read_buf[(pos + 1)..]);
                break 'main;
            }
        }

        // Grow the read buffer based on the average line length seen so far,
        // so files with long lines don't need many small backwards reads.
        estimated_line_length = estimated_line_length.max((read_buf.len() + 1).div_ceil(lines));
        let estimated_buffer_length = estimated_line_length * tail;
        if estimated_buffer_length >= buffer.len() * 2 {
            buffer.resize(buffer.len() * 2, 0);
        }
    }

    Ok(output.into())
}
2391
2392async fn set_up_event_stream(
2393    coordinator_addr: SocketAddr,
2394    machine_id: &Option<String>,
2395    clock: &Arc<HLC>,
2396    remote_daemon_events_rx: flume::Receiver<eyre::Result<Timestamped<InterDaemonEvent>>>,
2397    // used for dynamic nodes
2398    local_listen_port: u16,
2399) -> eyre::Result<(DaemonId, impl Stream<Item = Timestamped<Event>> + Unpin)> {
2400    let clock_cloned = clock.clone();
2401    let remote_daemon_events = remote_daemon_events_rx.into_stream().map(move |e| match e {
2402        Ok(e) => Timestamped {
2403            inner: Event::Daemon(e.inner),
2404            timestamp: e.timestamp,
2405        },
2406        Err(err) => Timestamped {
2407            inner: Event::DaemonError(err),
2408            timestamp: clock_cloned.new_timestamp(),
2409        },
2410    });
2411    let (daemon_id, coordinator_events) =
2412        coordinator::register(coordinator_addr, machine_id.clone(), clock)
2413            .await
2414            .wrap_err("failed to connect to dora-coordinator")?;
2415    let coordinator_events = coordinator_events.map(
2416        |Timestamped {
2417             inner: event,
2418             timestamp,
2419         }| Timestamped {
2420            inner: Event::Coordinator(event),
2421            timestamp,
2422        },
2423    );
2424    let (events_tx, events_rx) = flume::bounded(10);
2425    let _listen_port =
2426        local_listener::spawn_listener_loop((LOCALHOST, local_listen_port).into(), events_tx)
2427            .await?;
2428    let dynamic_node_events = events_rx.into_stream().map(|e| Timestamped {
2429        inner: Event::DynamicNode(e.inner),
2430        timestamp: e.timestamp,
2431    });
2432    let incoming = (
2433        coordinator_events,
2434        remote_daemon_events,
2435        dynamic_node_events,
2436    )
2437        .merge();
2438    Ok((daemon_id, incoming))
2439}
2440
/// Forwards a node output to all local subscribers of that output.
///
/// Returns the raw payload bytes (copied out of shared memory if necessary)
/// so that the caller can additionally forward them to remote receivers, or
/// `None` for metadata-only outputs without a payload.
async fn send_output_to_local_receivers(
    node_id: NodeId,
    output_id: DataId,
    dataflow: &mut RunningDataflow,
    metadata: &metadata::Metadata,
    data: Option<DataMessage>,
    clock: &HLC,
) -> Result<Option<AVec<u8, ConstAlign<128>>>, eyre::ErrReport> {
    let timestamp = metadata.timestamp();
    let empty_set = BTreeSet::new();
    let output_id = OutputId(node_id, output_id);
    // All local `(receiver node, input)` pairs mapped to this output.
    let local_receivers = dataflow.mappings.get(&output_id).unwrap_or(&empty_set);
    let OutputId(node_id, _) = output_id;
    let mut closed = Vec::new();
    for (receiver_id, input_id) in local_receivers {
        if let Some(channel) = dataflow.subscribe_channels.get(receiver_id) {
            let item = NodeEvent::Input {
                id: input_id.clone(),
                metadata: metadata.clone(),
                data: data.clone(),
            };
            match channel.send(Timestamped {
                inner: item,
                timestamp,
            }) {
                Ok(()) => {
                    // Record that this receiver now holds a reference to the
                    // data, so the drop token is only reported back to the
                    // owning node once every receiver dropped it.
                    if let Some(token) = data.as_ref().and_then(|d| d.drop_token()) {
                        dataflow
                            .pending_drop_tokens
                            .entry(token)
                            .or_insert_with(|| DropTokenInformation {
                                owner: node_id.clone(),
                                pending_nodes: Default::default(),
                            })
                            .pending_nodes
                            .insert(receiver_id.clone());
                    }
                }
                Err(_) => {
                    // Send failed -> the receiver's channel is closed;
                    // remember it for cleanup below.
                    closed.push(receiver_id);
                }
            }
        }
    }
    // Remove subscribers whose channels are closed.
    for id in closed {
        dataflow.subscribe_channels.remove(id);
    }
    // Copy the payload out of shared memory (if any) so the caller can
    // forward it; plain `Vec` payloads are passed through unchanged.
    let (data_bytes, drop_token) = match data {
        None => (None, None),
        Some(DataMessage::SharedMemory {
            shared_memory_id,
            len,
            drop_token,
        }) => {
            let memory = ShmemConf::new()
                .os_id(shared_memory_id)
                .open()
                .wrap_err("failed to map shared memory output")?;
            // NOTE(review): the unsafe read assumes the sender keeps the
            // shared memory region valid and at least `len` bytes long
            // until the drop token is reported back -- confirm.
            let data = Some(AVec::from_slice(1, &unsafe { memory.as_slice() }[..len]));
            (data, Some(drop_token))
        }
        Some(DataMessage::Vec(v)) => (Some(v), None),
    };
    if let Some(token) = drop_token {
        // insert token into `pending_drop_tokens` even if there are no local subscribers
        dataflow
            .pending_drop_tokens
            .entry(token)
            .or_insert_with(|| DropTokenInformation {
                owner: node_id.clone(),
                pending_nodes: Default::default(),
            });
        // check if all local subscribers are finished with the token
        dataflow.check_drop_token(token, clock).await?;
    }
    Ok(data_bytes)
}
2518
2519fn node_inputs(node: &ResolvedNode) -> BTreeMap<DataId, Input> {
2520    match &node.kind {
2521        CoreNodeKind::Custom(n) => n.run_config.inputs.clone(),
2522        CoreNodeKind::Runtime(n) => runtime_node_inputs(n),
2523    }
2524}
2525
2526fn close_input(
2527    dataflow: &mut RunningDataflow,
2528    receiver_id: &NodeId,
2529    input_id: &DataId,
2530    clock: &HLC,
2531) {
2532    if let Some(open_inputs) = dataflow.open_inputs.get_mut(receiver_id) {
2533        if !open_inputs.remove(input_id) {
2534            return;
2535        }
2536    }
2537    if let Some(channel) = dataflow.subscribe_channels.get(receiver_id) {
2538        let _ = send_with_timestamp(
2539            channel,
2540            NodeEvent::InputClosed {
2541                id: input_id.clone(),
2542            },
2543            clock,
2544        );
2545
2546        if dataflow.open_inputs(receiver_id).is_empty() {
2547            if let Some(node) = dataflow.running_nodes.get_mut(receiver_id) {
2548                node.disable_restart();
2549            }
2550            let _ = send_with_timestamp(channel, NodeEvent::AllInputsClosed, clock);
2551        }
2552    }
2553}
2554
/// Handle to a node process spawned by this daemon.
#[derive(Debug)]
pub struct RunningNode {
    // Handle used to submit kill operations to the process; taken out
    // (set to `None`) when the dataflow is stopped.
    process: Option<ProcessHandle>,
    // Configuration passed to the node on spawn.
    node_config: NodeConfig,
    // NOTE(review): presumably the OS process ID of the spawned node,
    // shared with the spawner task -- set outside this section, confirm.
    pid: Option<Arc<AtomicU32>>,
    // Restart policy configured for this node.
    restart_policy: RestartPolicy,
    /// Don't restart the node even if the restart policy says so.
    ///
    /// This flag is set when all inputs of the node were closed and when a manual stop command
    /// was sent.
    disable_restart: Arc<AtomicBool>,
}
2567
impl RunningNode {
    /// Returns whether restarting this node was explicitly disabled.
    ///
    /// Uses `Acquire` ordering to pair with the `Release` store in
    /// [`Self::disable_restart`].
    pub fn restarts_disabled(&self) -> bool {
        self.disable_restart.load(atomic::Ordering::Acquire)
    }

    /// Permanently disables restarts for this node, overriding the
    /// configured restart policy (used on stop and when all inputs closed).
    pub fn disable_restart(&mut self) {
        self.disable_restart.store(true, atomic::Ordering::Release);
    }
}
2577
/// Operation that can be submitted to a running node process.
#[derive(Debug)]
enum ProcessOperation {
    /// Request a graceful shutdown (SIGTERM on Unix, CTRL_BREAK_EVENT on Windows).
    SoftKill,
    /// Forcefully kill the process.
    Kill,
}
2583
impl ProcessOperation {
    /// Applies this operation to the given child process.
    ///
    /// Failures are only logged as warnings (not returned), since the
    /// process may have already exited on its own.
    pub fn execute(&self, child: &mut dyn TokioChildWrapper) {
        match self {
            Self::SoftKill => {
                #[cfg(unix)]
                {
                    // Send SIGTERM (signal number 15) to request a graceful shutdown.
                    if let Err(err) = child.signal(15) {
                        warn!("failed to send SIGTERM to process {:?}: {err}", child.id());
                    }
                }

                // On Windows there is no SIGTERM; send a console ctrl event instead.
                #[cfg(windows)]
                unsafe {
                    let Some(pid) = child.id() else {
                        warn!("failed to get child process id");
                        return;
                    };
                    if let Err(err) = windows::Win32::System::Console::GenerateConsoleCtrlEvent(
                        windows::Win32::System::Console::CTRL_BREAK_EVENT,
                        pid,
                    ) {
                        warn!("failed to send CTRL_BREAK_EVENT to process {pid}: {err}");
                    }
                }

                // Other platforms: graceful shutdown is not supported.
                #[cfg(not(any(unix, windows)))]
                {
                    warn!("killing process is not implemented on this platform");
                }
            }
            Self::Kill => {
                // Forceful kill via the process-wrap API (non-blocking).
                if let Err(err) = child.start_kill() {
                    warn!("failed to kill child process: {err}");
                }
            }
        }
    }
}
2623
/// Channel-based handle for controlling a spawned node process.
#[derive(Debug)]
struct ProcessHandle {
    // Sender half; NOTE(review): presumably consumed by the task owning the
    // child process, which applies the operations -- not visible here.
    op_tx: flume::Sender<ProcessOperation>,
}
2628
2629impl ProcessHandle {
2630    pub fn new(op_tx: flume::Sender<ProcessOperation>) -> Self {
2631        Self { op_tx }
2632    }
2633
2634    /// Returns true if the process is not finished yet and the operation is
2635    /// delivered.
2636    pub fn submit(&self, operation: ProcessOperation) -> bool {
2637        self.op_tx.send(operation).is_ok()
2638    }
2639}
2640
2641impl Drop for ProcessHandle {
2642    fn drop(&mut self) {
2643        if self.submit(ProcessOperation::Kill) {
2644            warn!("process was killed on drop because it was still running");
2645        }
2646    }
2647}
2648
/// Daemon-local state of a single running dataflow.
pub struct RunningDataflow {
    /// Unique ID of this dataflow run.
    id: Uuid,

    /// Descriptor this dataflow was spawned from.
    descriptor: Descriptor,

    /// Local nodes that are not started yet
    pending_nodes: PendingNodes,

    /// Whether the dataflow was started already. NOTE(review): toggled
    /// outside this section -- confirm exact semantics.
    dataflow_started: bool,

    /// Per-node channels used to deliver `NodeEvent`s (inputs, stop,
    /// input-closed, ...) to subscribed nodes.
    subscribe_channels: HashMap<NodeId, UnboundedSender<Timestamped<NodeEvent>>>,
    /// Per-node channels for reporting dropped output tokens back to the owner.
    drop_channels: HashMap<NodeId, UnboundedSender<Timestamped<NodeDropEvent>>>,
    /// Maps each output to the set of local `(node, input)` pairs subscribed to it.
    mappings: HashMap<OutputId, BTreeSet<InputId>>,
    /// Timer inputs grouped by tick interval.
    timers: BTreeMap<Duration, BTreeSet<InputId>>,
    /// The still-open inputs of each node.
    open_inputs: BTreeMap<NodeId, BTreeSet<DataId>>,
    /// Handles to the node processes spawned by this daemon.
    running_nodes: BTreeMap<NodeId, RunningNode>,

    /// List of all dynamic node IDs.
    ///
    /// We want to treat dynamic nodes differently in some cases, so we need
    /// to know which nodes are dynamic.
    dynamic_nodes: BTreeSet<NodeId>,

    /// Outputs with external mappings that are still open. NOTE(review):
    /// not used within this section -- confirm.
    open_external_mappings: BTreeSet<OutputId>,

    /// Drop tokens for which some receivers still hold the associated data.
    pending_drop_tokens: HashMap<DropToken, DropTokenInformation>,

    /// Keep handles to all timer tasks of this dataflow to cancel them on drop.
    _timer_handles: BTreeMap<Duration, futures::future::RemoteHandle<()>>,
    /// Whether a stop was already issued for this dataflow (set by `stop_all`).
    stop_sent: bool,

    /// Used in `open_inputs`.
    ///
    /// TODO: replace this with a constant once `BTreeSet::new` is `const` on stable.
    empty_set: BTreeSet<DataId>,

    /// Contains the node that caused the error for nodes that experienced a cascading error.
    cascading_error_causes: CascadingErrorCauses,
    /// Nodes that received a (soft-)kill because they did not stop within
    /// the grace period (filled by the task spawned in `stop_all`).
    grace_duration_kills: Arc<crossbeam_skiplist::SkipSet<NodeId>>,

    /// Ring buffers holding the most recent stderr lines per node, used for
    /// error reporting when a node exits unsuccessfully.
    node_stderr_most_recent: BTreeMap<NodeId, Arc<ArrayQueue<String>>>,

    /// Zenoh publishers, keyed by output ID.
    publishers: BTreeMap<OutputId, zenoh::pubsub::Publisher<'static>>,

    /// Broadcast channel signalling dataflow completion. NOTE(review):
    /// subscribers are not visible in this section -- confirm.
    finished_tx: broadcast::Sender<()>,

    /// Debug option from the descriptor: publish every message to zenoh.
    publish_all_messages_to_zenoh: bool,
}
2697
2698impl RunningDataflow {
2699    fn new(
2700        dataflow_id: Uuid,
2701        daemon_id: DaemonId,
2702        dataflow_descriptor: Descriptor,
2703    ) -> RunningDataflow {
2704        let (finished_tx, _) = broadcast::channel(1);
2705        Self {
2706            id: dataflow_id,
2707            pending_nodes: PendingNodes::new(dataflow_id, daemon_id),
2708            dataflow_started: false,
2709            subscribe_channels: HashMap::new(),
2710            drop_channels: HashMap::new(),
2711            mappings: HashMap::new(),
2712            timers: BTreeMap::new(),
2713            open_inputs: BTreeMap::new(),
2714            running_nodes: BTreeMap::new(),
2715            dynamic_nodes: BTreeSet::new(),
2716            open_external_mappings: Default::default(),
2717            pending_drop_tokens: HashMap::new(),
2718            _timer_handles: BTreeMap::new(),
2719            stop_sent: false,
2720            empty_set: BTreeSet::new(),
2721            cascading_error_causes: Default::default(),
2722            grace_duration_kills: Default::default(),
2723            node_stderr_most_recent: BTreeMap::new(),
2724            publishers: Default::default(),
2725            finished_tx,
2726            publish_all_messages_to_zenoh: dataflow_descriptor.debug.publish_all_messages_to_zenoh,
2727            descriptor: dataflow_descriptor,
2728        }
2729    }
2730
2731    async fn start(
2732        &mut self,
2733        events_tx: &mpsc::Sender<Timestamped<Event>>,
2734        clock: &Arc<HLC>,
2735    ) -> eyre::Result<()> {
2736        for interval in self.timers.keys().copied() {
2737            if self._timer_handles.get(&interval).is_some() {
2738                continue;
2739            }
2740            let events_tx = events_tx.clone();
2741            let dataflow_id = self.id;
2742            let clock = clock.clone();
2743            let task = async move {
2744                let mut interval_stream = tokio::time::interval(interval);
2745                let hlc = HLC::default();
2746                loop {
2747                    interval_stream.tick().await;
2748
2749                    let span = tracing::span!(tracing::Level::TRACE, "tick");
2750                    let _ = span.enter();
2751
2752                    let mut parameters = BTreeMap::new();
2753                    parameters.insert(
2754                        "open_telemetry_context".to_string(),
2755                        #[cfg(feature = "telemetry")]
2756                        Parameter::String(serialize_context(&span.context())),
2757                        #[cfg(not(feature = "telemetry"))]
2758                        Parameter::String("".into()),
2759                    );
2760
2761                    let metadata = metadata::Metadata::from_parameters(
2762                        hlc.new_timestamp(),
2763                        empty_type_info(),
2764                        parameters,
2765                    );
2766
2767                    let event = Timestamped {
2768                        inner: DoraEvent::Timer {
2769                            dataflow_id,
2770                            interval,
2771                            metadata,
2772                        }
2773                        .into(),
2774                        timestamp: clock.new_timestamp(),
2775                    };
2776                    if events_tx.send(event).await.is_err() {
2777                        break;
2778                    }
2779                }
2780            };
2781            let (task, handle) = task.remote_handle();
2782            tokio::spawn(task);
2783            self._timer_handles.insert(interval, handle);
2784        }
2785
2786        Ok(())
2787    }
2788
2789    async fn stop_all(
2790        &mut self,
2791        coordinator_connection: &mut Option<TcpStream>,
2792        clock: &HLC,
2793        grace_duration: Option<Duration>,
2794        force: bool,
2795        logger: &mut DataflowLogger<'_>,
2796    ) -> eyre::Result<()> {
2797        self.pending_nodes
2798            .handle_dataflow_stop(
2799                coordinator_connection,
2800                clock,
2801                &mut self.cascading_error_causes,
2802                &self.dynamic_nodes,
2803                logger,
2804            )
2805            .await?;
2806
2807        for node in self.running_nodes.values_mut() {
2808            node.disable_restart();
2809        }
2810
2811        for (_node_id, channel) in self.subscribe_channels.drain() {
2812            let _ = send_with_timestamp(&channel, NodeEvent::Stop, clock);
2813        }
2814
2815        let running_processes: Vec<_> = self
2816            .running_nodes
2817            .iter_mut()
2818            .map(|(id, n)| (id.clone(), n.process.take()))
2819            .collect();
2820        if force {
2821            for (_, proc) in &running_processes {
2822                if let Some(proc) = proc {
2823                    proc.submit(crate::ProcessOperation::Kill);
2824                }
2825            }
2826        } else {
2827            let grace_duration_kills = self.grace_duration_kills.clone();
2828            tokio::spawn(async move {
2829                let duration = grace_duration.unwrap_or(Duration::from_millis(10000));
2830                tokio::time::sleep(duration).await;
2831
2832                for (node, proc) in &running_processes {
2833                    if let Some(proc) = proc {
2834                        if proc.submit(crate::ProcessOperation::SoftKill) {
2835                            grace_duration_kills.insert(node.clone());
2836                        }
2837                    }
2838                }
2839
2840                let kill_duration = duration / 2;
2841                tokio::time::sleep(kill_duration).await;
2842
2843                for (node, proc) in &running_processes {
2844                    if let Some(proc) = proc {
2845                        if proc.submit(crate::ProcessOperation::Kill) {
2846                            grace_duration_kills.insert(node.clone());
2847                            warn!(
2848                                "{node} was killed due to not stopping within the {:#?} grace period",
2849                                duration + kill_duration
2850                            );
2851                        }
2852                    }
2853                }
2854            });
2855        }
2856        self.stop_sent = true;
2857        Ok(())
2858    }
2859
2860    fn open_inputs(&self, node_id: &NodeId) -> &BTreeSet<DataId> {
2861        self.open_inputs.get(node_id).unwrap_or(&self.empty_set)
2862    }
2863
2864    async fn check_drop_token(&mut self, token: DropToken, clock: &HLC) -> eyre::Result<()> {
2865        match self.pending_drop_tokens.entry(token) {
2866            std::collections::hash_map::Entry::Occupied(entry) => {
2867                if entry.get().pending_nodes.is_empty() {
2868                    let (drop_token, info) = entry.remove_entry();
2869                    let result = match self.drop_channels.get_mut(&info.owner) {
2870                        Some(channel) => send_with_timestamp(
2871                            channel,
2872                            NodeDropEvent::OutputDropped { drop_token },
2873                            clock,
2874                        )
2875                        .wrap_err("send failed"),
2876                        None => Err(eyre!("no subscribe channel for node `{}`", &info.owner)),
2877                    };
2878                    if let Err(err) = result.wrap_err_with(|| {
2879                        format!(
2880                            "failed to report drop token `{drop_token:?}` to owner `{}`",
2881                            &info.owner
2882                        )
2883                    }) {
2884                        tracing::warn!("{err:?}");
2885                    }
2886                }
2887            }
2888            std::collections::hash_map::Entry::Vacant(_) => {
2889                tracing::warn!("check_drop_token called with already closed token")
2890            }
2891        }
2892
2893        Ok(())
2894    }
2895}
2896
2897fn empty_type_info() -> ArrowTypeInfo {
2898    ArrowTypeInfo {
2899        data_type: DataType::Null,
2900        len: 0,
2901        null_count: 0,
2902        validity: None,
2903        offset: 0,
2904        buffer_offsets: Vec::new(),
2905        child_data: Vec::new(),
2906    }
2907}
2908
/// Identifies an output: the producing node plus the output's data ID.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct OutputId(NodeId, DataId);
/// Identifies an input: the receiving node plus the input's data ID.
type InputId = (NodeId, DataId);
2912
/// Bookkeeping for a single [`DropToken`] while receivers still use the data.
struct DropTokenInformation {
    /// The node that created the associated drop token.
    owner: NodeId,
    /// Contains the set of pending nodes that still have access to the input
    /// associated with a drop token.
    pending_nodes: BTreeSet<NodeId>,
}
2920
/// Top-level events processed by the daemon's event loop.
#[derive(Debug)]
pub enum Event {
    /// A request or notification from a local node of a running dataflow.
    Node {
        dataflow_id: DataflowId,
        node_id: NodeId,
        event: DaemonNodeEvent,
    },
    /// An event received from the coordinator.
    Coordinator(CoordinatorEvent),
    /// An event forwarded by another daemon.
    Daemon(InterDaemonEvent),
    /// An internally generated event (timers, logs, node exits).
    Dora(DoraEvent),
    /// An event from a dynamic node connecting via the local listener.
    DynamicNode(DynamicNodeEventWrapper),
    /// Periodic heartbeat tick. NOTE(review): handled outside this section.
    HeartbeatInterval,
    /// Periodic metrics tick. NOTE(review): handled outside this section.
    MetricsInterval,
    /// First ctrl-c signal received (see `set_up_ctrlc_handler`).
    CtrlC,
    /// Second ctrl-c signal received.
    SecondCtrlC,
    /// Receiving remote daemon events failed.
    DaemonError(eyre::Report),
    /// Result of spawning a single node.
    SpawnNodeResult {
        dataflow_id: DataflowId,
        node_id: NodeId,
        dynamic_node: bool,
        result: Result<RunningNode, NodeError>,
    },
    /// Result of building a dataflow.
    BuildDataflowResult {
        build_id: BuildId,
        session_id: SessionId,
        result: eyre::Result<BuildInfo>,
    },
    /// Result of spawning a whole dataflow.
    SpawnDataflowResult {
        dataflow_id: Uuid,
        result: eyre::Result<()>,
    },
    /// A node of the given dataflow stopped.
    NodeStopped {
        dataflow_id: Uuid,
        node_id: NodeId,
    },
}
2957
2958impl From<DoraEvent> for Event {
2959    fn from(event: DoraEvent) -> Self {
2960        Event::Dora(event)
2961    }
2962}
2963
2964impl Event {
2965    pub fn kind(&self) -> &'static str {
2966        match self {
2967            Event::Node { .. } => "Node",
2968            Event::Coordinator(_) => "Coordinator",
2969            Event::Daemon(_) => "Daemon",
2970            Event::Dora(_) => "Dora",
2971            Event::DynamicNode(_) => "DynamicNode",
2972            Event::HeartbeatInterval => "HeartbeatInterval",
2973            Event::MetricsInterval => "MetricsInterval",
2974            Event::CtrlC => "CtrlC",
2975            Event::SecondCtrlC => "SecondCtrlC",
2976            Event::DaemonError(_) => "DaemonError",
2977            Event::SpawnNodeResult { .. } => "SpawnNodeResult",
2978            Event::BuildDataflowResult { .. } => "BuildDataflowResult",
2979            Event::SpawnDataflowResult { .. } => "SpawnDataflowResult",
2980            Event::NodeStopped { .. } => "NodeStopped",
2981        }
2982    }
2983}
2984
/// Requests and notifications sent from a local node to the daemon.
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
pub enum DaemonNodeEvent {
    /// The node is done producing outputs.
    OutputsDone {
        reply_sender: oneshot::Sender<DaemonReply>,
    },
    /// Subscribe to the node's input events.
    Subscribe {
        event_sender: UnboundedSender<Timestamped<NodeEvent>>,
        reply_sender: oneshot::Sender<DaemonReply>,
    },
    /// Subscribe to drop notifications for the node's outputs.
    SubscribeDrop {
        event_sender: UnboundedSender<Timestamped<NodeDropEvent>>,
        reply_sender: oneshot::Sender<DaemonReply>,
    },
    /// Close the given outputs of the node.
    CloseOutputs {
        outputs: Vec<dora_core::config::DataId>,
        reply_sender: oneshot::Sender<DaemonReply>,
    },
    /// Send a single output message to its receivers.
    SendOut {
        output_id: DataId,
        metadata: metadata::Metadata,
        data: Option<DataMessage>,
    },
    /// Report that the node dropped the data behind the given tokens.
    ReportDrop {
        tokens: Vec<DropToken>,
    },
    /// The node's event stream was dropped.
    EventStreamDropped {
        reply_sender: oneshot::Sender<DaemonReply>,
    },
}
3015
/// Internally generated daemon events.
#[derive(Debug)]
pub enum DoraEvent {
    /// A timer tick, emitted by the tasks spawned in `RunningDataflow::start`.
    Timer {
        dataflow_id: DataflowId,
        interval: Duration,
        metadata: metadata::Metadata,
    },
    /// A log message routed as a dataflow output. NOTE(review): produced
    /// outside this section -- confirm.
    Logs {
        dataflow_id: DataflowId,
        output_id: OutputId,
        message: DataMessage,
        metadata: metadata::Metadata,
    },
    /// A spawned node process exited with the given status.
    SpawnedNodeResult {
        dataflow_id: DataflowId,
        node_id: NodeId,
        dynamic_node: bool,
        exit_status: NodeExitStatus,
        /// Whether the node will be restarted
        restart: bool,
    },
}
3038
/// Tells the caller whether to keep running after handling something.
/// NOTE(review): call sites are outside this section -- confirm usage.
#[must_use]
enum RunStatus {
    /// Keep processing further events.
    Continue,
    /// Shut down.
    Exit,
}
3044
3045fn send_with_timestamp<T>(
3046    sender: &UnboundedSender<Timestamped<T>>,
3047    event: T,
3048    clock: &HLC,
3049) -> Result<(), mpsc::error::SendError<Timestamped<T>>> {
3050    sender.send(Timestamped {
3051        inner: event,
3052        timestamp: clock.new_timestamp(),
3053    })
3054}
3055
3056fn set_up_ctrlc_handler(
3057    clock: Arc<HLC>,
3058) -> eyre::Result<tokio::sync::mpsc::Receiver<Timestamped<Event>>> {
3059    let (ctrlc_tx, ctrlc_rx) = mpsc::channel(1);
3060
3061    let mut ctrlc_sent = 0;
3062    ctrlc::set_handler(move || {
3063        let event = match ctrlc_sent {
3064            0 => Event::CtrlC,
3065            1 => Event::SecondCtrlC,
3066            _ => {
3067                tracing::warn!("received 3rd ctrlc signal -> aborting immediately");
3068                std::process::abort();
3069            }
3070        };
3071        if ctrlc_tx
3072            .blocking_send(Timestamped {
3073                inner: event,
3074                timestamp: clock.new_timestamp(),
3075            })
3076            .is_err()
3077        {
3078            tracing::error!("failed to report ctrl-c event to dora-coordinator");
3079        }
3080
3081        ctrlc_sent += 1;
3082    })
3083    .wrap_err("failed to set ctrl-c handler")?;
3084
3085    Ok(ctrlc_rx)
3086}
3087
/// Tracks which node's failure caused other nodes to fail.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct CascadingErrorCauses {
    // Maps an affected node to the node that caused its failure.
    caused_by: BTreeMap<NodeId, NodeId>,
}
3092
3093impl CascadingErrorCauses {
3094    pub fn experienced_cascading_error(&self, node: &NodeId) -> bool {
3095        self.caused_by.contains_key(node)
3096    }
3097
3098    /// Return the ID of the node that caused a cascading error for the given node, if any.
3099    pub fn error_caused_by(&self, node: &NodeId) -> Option<&NodeId> {
3100        self.caused_by.get(node)
3101    }
3102
3103    pub fn report_cascading_error(&mut self, causing_node: NodeId, affected_node: NodeId) {
3104        self.caused_by.entry(affected_node).or_insert(causing_node);
3105    }
3106}
3107
3108fn runtime_node_inputs(n: &RuntimeNode) -> BTreeMap<DataId, Input> {
3109    n.operators
3110        .iter()
3111        .flat_map(|operator| {
3112            operator.config.inputs.iter().map(|(input_id, mapping)| {
3113                (
3114                    DataId::from(format!("{}/{input_id}", operator.id)),
3115                    mapping.clone(),
3116                )
3117            })
3118        })
3119        .collect()
3120}
3121
3122fn runtime_node_outputs(n: &RuntimeNode) -> BTreeSet<DataId> {
3123    n.operators
3124        .iter()
3125        .flat_map(|operator| {
3126            operator
3127                .config
3128                .outputs
3129                .iter()
3130                .map(|output_id| DataId::from(format!("{}/{output_id}", operator.id)))
3131        })
3132        .collect()
3133}
3134
/// Helpers shared by both node kinds (runtime and custom).
trait CoreNodeKindExt {
    /// Returns the node's combined input/output configuration.
    fn run_config(&self) -> NodeRunConfig;
    /// Returns whether this is a dynamic node (local node using the special
    /// dynamic source path).
    fn dynamic(&self) -> bool;
}
3139
3140impl CoreNodeKindExt for CoreNodeKind {
3141    fn run_config(&self) -> NodeRunConfig {
3142        match self {
3143            CoreNodeKind::Runtime(n) => NodeRunConfig {
3144                inputs: runtime_node_inputs(n),
3145                outputs: runtime_node_outputs(n),
3146            },
3147            CoreNodeKind::Custom(n) => n.run_config.clone(),
3148        }
3149    }
3150
3151    fn dynamic(&self) -> bool {
3152        match self {
3153            CoreNodeKind::Runtime(_n) => false,
3154            CoreNodeKind::Custom(n) => {
3155                matches!(&n.source, NodeSource::Local) && n.path == DYNAMIC_SOURCE
3156            }
3157        }
3158    }
3159}