Skip to main content

dora_daemon/
lib.rs

1use aligned_vec::{AVec, ConstAlign};
2use crossbeam::queue::ArrayQueue;
3use dora_core::{
4    build::{self, BuildInfo, PrevGitSource, TracingBuildLogger},
5    config::{DataId, Input, InputMapping, NodeId, NodeRunConfig, OperatorId},
6    descriptor::{
7        CoreNodeKind, DYNAMIC_SOURCE, Descriptor, DescriptorExt, ResolvedNode, RuntimeNode,
8        read_as_descriptor,
9    },
10    topics::{
11        DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT, LOCALHOST, open_zenoh_session,
12        zenoh_output_publish_topic,
13    },
14    uhlc::HLC,
15};
16use dora_message::{
17    BuildId, DataflowId, SessionId,
18    common::{
19        DaemonId, DataMessage, DropToken, GitSource, LogLevel, NodeError, NodeErrorCause,
20        NodeExitStatus,
21    },
22    coordinator_to_cli::DataflowResult,
23    coordinator_to_daemon::SpawnDataflowNodes,
24    daemon_to_coordinator::DataflowDaemonResult,
25    daemon_to_daemon::InterDaemonEvent,
26    daemon_to_node::{DaemonReply, NodeConfig, NodeDropEvent, NodeEvent},
27    descriptor::{NodeSource, RestartPolicy},
28    metadata::{self, ArrowTypeInfo},
29    node_to_daemon::{DynamicNodeEvent, Timestamped},
30    tarpc,
31};
32use dora_node_api::{Parameter, arrow::datatypes::DataType};
33use eyre::{Context, ContextCompat, Result, bail, eyre};
34use futures::{FutureExt, TryFutureExt, future, stream};
35use futures_concurrency::stream::Merge;
36use local_listener::DynamicNodeEventWrapper;
37use log::{DaemonLogger, DataflowLogger, Logger};
38use pending::PendingNodes;
39use process_wrap::tokio::TokioChildWrapper;
40use shared_memory_extended::ShmemConf;
41use spawn::Spawner;
42use std::{
43    collections::{BTreeMap, BTreeSet, HashMap, VecDeque},
44    env::current_dir,
45    future::Future,
46    io,
47    net::SocketAddr,
48    path::{Path, PathBuf},
49    pin::pin,
50    sync::{
51        Arc,
52        atomic::{self, AtomicBool, AtomicU32},
53    },
54    time::{Duration, Instant},
55};
56use tokio::{
57    fs::File,
58    io::{AsyncReadExt, AsyncSeekExt},
59    net::TcpStream,
60    sync::{
61        broadcast,
62        mpsc::{self, UnboundedSender},
63        oneshot::{self, Sender},
64    },
65};
66use tokio_stream::{Stream, StreamExt, wrappers::ReceiverStream};
67use tracing::{error, warn};
68use uuid::{NoContext, Timestamp, Uuid};
69
70pub use flume;
71pub use log::LogDestination;
72
73mod coordinator;
74mod extract_err_from_stderr;
75mod local_listener;
76mod log;
77mod node_communication;
78mod pending;
79mod socket_stream_utils;
80mod spawn;
81pub(crate) mod state;
82
83#[cfg(feature = "telemetry")]
84use dora_tracing::telemetry::serialize_context;
85#[cfg(feature = "telemetry")]
86use tracing_opentelemetry::OpenTelemetrySpanExt;
87
88use crate::{extract_err_from_stderr::extract_err_from_stderr, pending::DataflowStatus};
89
// Upper bound on a number of stderr log lines.
// NOTE(review): presumably caps how many stderr lines are kept per node for error
// reporting; the use site is outside this view, so confirm before relying on this.
const STDERR_LOG_LINES_MAX: usize = 500;

// A statically allocated, always-empty set of data IDs.
// NOTE(review): presumably handed out by reference where a lookup finds no entry;
// the use site is outside this view, so confirm.
static EMPTY_SET: BTreeSet<DataId> = BTreeSet::new();
93
/// A dora daemon: the shared daemon state plus the bookkeeping that decides when the
/// main event loop terminates.
pub struct Daemon {
    /// Daemon state behind an `Arc`, so it can be cloned into spawned background tasks.
    pub(crate) state: Arc<state::DaemonState>,

    /// used for testing and examples
    ///
    /// When `Some`, holds the `(dataflow, node)` pairs that still have to stop; the event
    /// loop exits once the set becomes empty. `None` disables this exit condition.
    exit_when_done: Option<BTreeSet<(Uuid, NodeId)>>,
    /// Set once a manual stop was triggered (ctrl-c or the stop-after timeout); the event
    /// loop then exits as soon as no dataflow is running anymore.
    exit_when_all_finished: bool,

    /// Logger scoped to this daemon.
    logger: DaemonLogger,

    /// System instance for metrics collection (reused across calls)
    metrics_system: sysinfo::System,
}
107
/// Outcome of a daemon run: for every dataflow ID, the per-node result (`Ok(())` or the
/// `NodeError` the node finished with).
type DaemonRunResult = BTreeMap<Uuid, BTreeMap<NodeId, Result<(), NodeError>>>;
109
/// Pairs a node with a task of generic type `F`.
/// NOTE(review): presumably `F` is the future that builds/spawns the node; the use site
/// is outside this view, so confirm.
struct NodeBuildTask<F> {
    /// ID of the node the task belongs to.
    node_id: NodeId,
    /// Whether the node is a dynamic node.
    dynamic_node: bool,
    /// The task itself.
    task: F,
}
115
116impl Daemon {
117    pub async fn run(
118        coordinator_addr: SocketAddr,
119        machine_id: Option<String>,
120        local_listen_port: u16,
121    ) -> eyre::Result<()> {
122        let clock = Arc::new(HLC::default());
123
124        let mut ctrlc_events = set_up_ctrlc_handler(clock.clone())?;
125        let (remote_daemon_events_tx, remote_daemon_events_rx) = flume::bounded(10);
126
127        // Use a large channel capacity to prevent deadlock
128        let (dora_events_tx, dora_events_rx) = mpsc::channel(1000);
129
130        let zenoh_session = open_zenoh_session(Some(coordinator_addr.ip()))
131            .await
132            .wrap_err("failed to open zenoh session")?;
133
134        // Build shared state early so the RPC server can use it.
135        // `daemon_id` and `coordinator_client` are set via OnceLock after registration.
136        let daemon_state = Arc::new(state::DaemonState::new(
137            clock.clone(),
138            dora_events_tx,
139            Some(zenoh_session),
140            Some(remote_daemon_events_tx),
141        ));
142
143        let ((daemon_id, coordinator_client), incoming_events) = {
144            let incoming_events = set_up_event_stream(
145                coordinator_addr,
146                &machine_id,
147                &clock,
148                daemon_state.clone(),
149                remote_daemon_events_rx,
150                local_listen_port,
151            );
152
153            // finish early if ctrl-c is pressed during event stream setup
154            let ctrl_c = pin!(ctrlc_events.recv());
155            match futures::future::select(ctrl_c, pin!(incoming_events)).await {
156                future::Either::Left((_ctrl_c, _)) => {
157                    tracing::info!("received ctrl-c signal -> stopping daemon");
158                    return Ok(());
159                }
160                future::Either::Right((events, _)) => events?,
161            }
162        };
163
164        daemon_state.set_daemon_id(daemon_id.clone());
165        daemon_state.set_coordinator_client(coordinator_client);
166
167        let log_destination = {
168            // additional connection for logging
169            let stream = TcpStream::connect(coordinator_addr)
170                .await
171                .wrap_err("failed to connect log to dora-coordinator")?;
172            stream
173                .set_nodelay(true)
174                .wrap_err("failed to set TCP_NODELAY")?;
175            LogDestination::Coordinator {
176                coordinator_connection: stream,
177            }
178        };
179
180        Self::run_general(
181            (ReceiverStream::new(ctrlc_events), incoming_events).merge(),
182            daemon_id,
183            None,
184            daemon_state,
185            dora_events_rx,
186            log_destination,
187        )
188        .await
189        .map(|_| ())
190    }
191
192    pub async fn run_dataflow(
193        dataflow_path: &Path,
194        build_id: Option<BuildId>,
195        local_build: Option<BuildInfo>,
196        session_id: SessionId,
197        uv: bool,
198        log_destination: LogDestination,
199        write_events_to: Option<PathBuf>,
200        stop_after: Option<Duration>,
201    ) -> eyre::Result<DataflowResult> {
202        let working_dir = dataflow_path
203            .canonicalize()
204            .context("failed to canonicalize dataflow path")?
205            .parent()
206            .ok_or_else(|| eyre::eyre!("canonicalized dataflow path has no parent"))?
207            .to_owned();
208
209        let descriptor = read_as_descriptor(dataflow_path).await?;
210        if let Some(node) = descriptor.nodes.iter().find(|n| n.deploy.is_some()) {
211            eyre::bail!(
212                "node {} has a `deploy` section, which is not supported in `dora run`\n\n
213                Instead, you need to spawn a `dora coordinator` and one or more `dora daemon`
214                instances and then use `dora start`.",
215                node.id
216            )
217        }
218
219        descriptor.check(&working_dir)?;
220        let nodes = descriptor.resolve_aliases_and_set_defaults()?;
221
222        let (events_tx, events_rx) = flume::bounded(10);
223        if nodes
224            .iter()
225            .find(|(_n, resolved_nodes)| resolved_nodes.kind.dynamic())
226            .is_some()
227        {
228            // Spawn local listener for dynamic nodes
229            let _listen_port = local_listener::spawn_listener_loop(
230                (LOCALHOST, DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT).into(),
231                events_tx,
232            )
233            .await?;
234        }
235        let dynamic_node_events = events_rx.into_stream().map(|e| Timestamped {
236            inner: Event::DynamicNode(e.inner),
237            timestamp: e.timestamp,
238        });
239
240        let dataflow_id = Uuid::new_v7(Timestamp::now(NoContext));
241        let spawn_command = SpawnDataflowNodes {
242            build_id,
243            session_id,
244            dataflow_id,
245            local_working_dir: Some(working_dir),
246            spawn_nodes: nodes.keys().cloned().collect(),
247            nodes,
248            dataflow_descriptor: descriptor,
249            uv,
250            write_events_to,
251        };
252
253        let clock = Arc::new(HLC::default());
254
255        let ctrlc_events = ReceiverStream::new(set_up_ctrlc_handler(clock.clone())?);
256
257        // Set up optional timeout for --stop-after
258        let timeout_events = if let Some(duration) = stop_after {
259            let clock = clock.clone();
260            let (tx, rx) = tokio::sync::mpsc::channel(1);
261            tokio::spawn(async move {
262                tokio::time::sleep(duration).await;
263                tracing::info!("stop-after timeout reached ({duration:?}) -> stopping dataflow");
264                let _ = tx
265                    .send(Timestamped {
266                        inner: Event::StopAfter(duration),
267                        timestamp: clock.new_timestamp(),
268                    })
269                    .await;
270            });
271            ReceiverStream::new(rx)
272        } else {
273            // Create an empty stream that never emits events
274            ReceiverStream::new(tokio::sync::mpsc::channel(1).1)
275        };
276
277        let all_nodes_dynamic = spawn_command.nodes.values().all(|n| n.kind.dynamic());
278        let exit_when_done: BTreeSet<(Uuid, NodeId)> = spawn_command
279            .nodes
280            .values()
281            .filter(|n| !n.kind.dynamic())
282            .map(|n| (spawn_command.dataflow_id, n.id.clone()))
283            .collect();
284
285        // In standalone mode (`dora run`), inject the spawn as a SpawnRequest event
286        // instead of going through the coordinator.
287        let (reply_tx, reply_rx) = oneshot::channel();
288        let spawn_event_clock = clock.clone();
289        let spawn_event = stream::once(async move {
290            Timestamped {
291                inner: Event::SpawnRequest {
292                    build_id: spawn_command.build_id,
293                    dataflow_id: spawn_command.dataflow_id,
294                    local_working_dir: spawn_command.local_working_dir,
295                    nodes: spawn_command.nodes,
296                    dataflow_descriptor: spawn_command.dataflow_descriptor,
297                    spawn_nodes: spawn_command.spawn_nodes,
298                    uv: spawn_command.uv,
299                    write_events_to: spawn_command.write_events_to,
300                    reply_tx,
301                },
302                timestamp: spawn_event_clock.new_timestamp(),
303            }
304        });
305        let events = (
306            spawn_event,
307            ctrlc_events,
308            timeout_events,
309            dynamic_node_events,
310        )
311            .merge();
312
313        let builds_map: BTreeMap<BuildId, BuildInfo> = if let Some(local_build) = local_build {
314            let Some(build_id) = build_id else {
315                bail!("no build_id, but local_build set")
316            };
317            let mut builds = BTreeMap::new();
318            builds.insert(build_id, local_build);
319            builds
320        } else {
321            Default::default()
322        };
323
324        let zenoh_session = open_zenoh_session(None)
325            .await
326            .wrap_err("failed to open zenoh session")?;
327        let daemon_id = DaemonId::new(None);
328        let (dora_events_tx, dora_events_rx) = mpsc::channel(1000);
329        let daemon_state = Arc::new(state::DaemonState::new_standalone(
330            clock.clone(),
331            daemon_id.clone(),
332            dora_events_tx,
333            zenoh_session,
334            builds_map,
335        ));
336
337        let run_result = Self::run_general(
338            Box::pin(events),
339            daemon_id,
340            Some(exit_when_done),
341            daemon_state,
342            dora_events_rx,
343            log_destination,
344        );
345
346        let spawn_result = reply_rx
347            .map_err(|err| eyre!("failed to receive spawn result: {err}"))
348            .and_then(|r| async { r.map_err(|err| eyre!(err)) });
349
350        let (mut dataflow_results, ()) = future::try_join(run_result, spawn_result).await?;
351
352        let node_results = match dataflow_results.remove(&dataflow_id) {
353            Some(results) => results,
354            None if all_nodes_dynamic => {
355                // All nodes are dynamic - they don't send SpawnedNodeResult events,
356                // so there are no node results to report. This is expected and means success.
357                BTreeMap::new()
358            }
359            None => {
360                return Err(eyre::eyre!("no node results for dataflow_id {dataflow_id}"));
361            }
362        };
363
364        Ok(DataflowResult {
365            uuid: dataflow_id,
366            timestamp: clock.new_timestamp(),
367            node_results,
368        })
369    }
370
371    /// Shared daemon run logic used by both `run` (with coordinator) and
372    /// `run_dataflow` (standalone).
373    #[allow(clippy::too_many_arguments)]
374    async fn run_general(
375        external_events: impl Stream<Item = Timestamped<Event>> + Unpin,
376        daemon_id: DaemonId,
377        exit_when_done: Option<BTreeSet<(Uuid, NodeId)>>,
378        daemon_state: Arc<state::DaemonState>,
379        dora_events_rx: mpsc::Receiver<Timestamped<Event>>,
380        log_destination: LogDestination,
381    ) -> eyre::Result<DaemonRunResult> {
382        let clock = daemon_state.clock.clone();
383        let daemon = Self {
384            logger: Logger {
385                destination: log_destination,
386                daemon_id: daemon_id.clone(),
387                clock: clock.clone(),
388            }
389            .for_daemon(daemon_id.clone()),
390            state: daemon_state,
391            exit_when_done,
392            exit_when_all_finished: false,
393            metrics_system: sysinfo::System::new(),
394        };
395
396        let dora_events = ReceiverStream::new(dora_events_rx);
397        let watchdog_clock = daemon.state.clock.clone();
398        let watchdog_interval = tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(
399            Duration::from_secs(5),
400        ))
401        .map(move |_| Timestamped {
402            inner: Event::HeartbeatInterval,
403            timestamp: watchdog_clock.new_timestamp(),
404        });
405
406        let metrics_clock = daemon.state.clock.clone();
407        let metrics_interval = tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(
408            Duration::from_secs(2), // Collect metrics every 2 seconds
409        ))
410        .map(move |_| Timestamped {
411            inner: Event::MetricsInterval,
412            timestamp: metrics_clock.new_timestamp(),
413        });
414
415        let events = (
416            external_events,
417            dora_events,
418            watchdog_interval,
419            metrics_interval,
420        )
421            .merge();
422        daemon.run_inner(events).await
423    }
424
425    #[tracing::instrument(skip(incoming_events, self), fields(daemon_id = ?self.state.daemon_id()))]
426    async fn run_inner(
427        mut self,
428        incoming_events: impl Stream<Item = Timestamped<Event>> + Unpin,
429    ) -> eyre::Result<DaemonRunResult> {
430        let mut events = incoming_events;
431
432        while let Some(event) = events.next().await {
433            let Timestamped { inner, timestamp } = event;
434            if let Err(err) = self.state.clock.update_with_timestamp(&timestamp) {
435                tracing::warn!("failed to update HLC with incoming event timestamp: {err}");
436            }
437
438            // used below for checking the duration of event handling
439            let start = Instant::now();
440            let event_kind = inner.kind();
441
442            match inner {
443                Event::Daemon(event) => {
444                    self.handle_inter_daemon_event(event).await?;
445                }
446                Event::Node {
447                    dataflow_id: dataflow,
448                    node_id,
449                    event,
450                } => self.handle_node_event(event, dataflow, node_id).await?,
451                Event::Dora(event) => self.handle_dora_event(event).await?,
452                Event::DynamicNode(event) => self.handle_dynamic_node_event(event).await?,
453                Event::HeartbeatInterval => {
454                    if let Some(client) = self.state.coordinator_client() {
455                        // Fire-and-forget: notify the coordinator we're alive.
456                        // Don't block the event loop waiting for the RPC response.
457                        let client = client.clone();
458                        tokio::spawn(async move {
459                            let _ = client.heartbeat(tarpc::context::current()).await;
460                        });
461
462                        let last_hb = self.state.last_coordinator_heartbeat.lock().await;
463                        if last_hb.elapsed() > Duration::from_secs(20) {
464                            bail!("lost connection to coordinator")
465                        }
466                    }
467                }
468                Event::Destroy => {
469                    tracing::info!("received destroy command -> exiting");
470                    break;
471                }
472                Event::SpawnRequest {
473                    build_id,
474                    dataflow_id,
475                    local_working_dir,
476                    nodes,
477                    dataflow_descriptor,
478                    spawn_nodes,
479                    uv,
480                    write_events_to,
481                    reply_tx,
482                } => {
483                    let result = self
484                        .handle_spawn_request(
485                            build_id,
486                            dataflow_id,
487                            local_working_dir,
488                            nodes,
489                            dataflow_descriptor,
490                            spawn_nodes,
491                            uv,
492                            write_events_to,
493                        )
494                        .await;
495                    let _ = reply_tx.send(result);
496                }
497                Event::StopDataflowRequest {
498                    dataflow_id,
499                    grace_duration,
500                    force,
501                    reply_tx,
502                } => {
503                    let result = async {
504                        // First, handle pending nodes (may do RPC to coordinator)
505                        // while holding the DashMap guard briefly for extraction.
506                        {
507                            let mut logger = self.logger.for_dataflow(dataflow_id);
508                            let mut dataflow =
509                                self.state.running.get_mut(&dataflow_id).ok_or_else(|| {
510                                    format!("no running dataflow with ID `{dataflow_id}`")
511                                })?;
512                            let df = &mut *dataflow;
513                            df.pending_nodes
514                                .handle_dataflow_stop(
515                                    &self.state.coordinator_client().cloned(),
516                                    &self.state.clock,
517                                    &mut df.cascading_error_causes,
518                                    &df.dynamic_nodes,
519                                    &mut logger,
520                                )
521                                .await
522                                .map_err(|err| format!("{err:?}"))?;
523                        }
524                        // DashMap guard is dropped — safe to re-acquire.
525                        let finish_when = {
526                            let mut dataflow =
527                                self.state.running.get_mut(&dataflow_id).ok_or_else(|| {
528                                    format!("no running dataflow with ID `{dataflow_id}`")
529                                })?;
530                            dataflow
531                                .stop_all_after_pending(&self.state, grace_duration, force)
532                                .map_err(|err| format!("{err:?}"))?
533                        };
534                        // DashMap guard is dropped — safe to call finish_dataflow.
535                        if matches!(finish_when, FinishDataflowWhen::Now) {
536                            self.state
537                                .finish_dataflow(dataflow_id)
538                                .await
539                                .map_err(|err| format!("{err:?}"))?;
540                        }
541                        Ok(())
542                    }
543                    .await;
544                    let _ = reply_tx.send(result);
545                }
546                Event::MetricsInterval => {
547                    self.collect_and_send_metrics().await?;
548                }
549                Event::CtrlC => {
550                    tracing::info!("received ctrlc signal -> stopping all dataflows");
551                    self.trigger_manual_stop().await?;
552                    if self.state.running.is_empty() {
553                        break;
554                    }
555                }
556                Event::SecondCtrlC => {
557                    tracing::warn!("received second ctrlc signal -> exit immediately");
558                    bail!("received second ctrl-c signal");
559                }
560                Event::StopAfter(duration) => {
561                    tracing::info!("stopping after {duration:?} as requested");
562                    self.trigger_manual_stop().await?;
563                    if self.state.running.is_empty() {
564                        break;
565                    }
566                }
567                Event::DaemonError(err) => {
568                    bail!(err.wrap_err("fatal error from background task"));
569                }
570                Event::SpawnNodeResult {
571                    dataflow_id,
572                    node_id,
573                    dynamic_node,
574                    result,
575                } => match result {
576                    Ok(running_node) => {
577                        if let Some(mut dataflow) = self.state.running.get_mut(&dataflow_id) {
578                            if let Some(ref abort_handle) = running_node.listener_abort_handle {
579                                dataflow
580                                    ._listener_tasks
581                                    .push(ListenerTask(abort_handle.clone()));
582                            }
583                            dataflow.running_nodes.insert(node_id, running_node);
584                        } else {
585                            tracing::error!(
586                                "failed to handle SpawnNodeResult: no running dataflow with ID {dataflow_id}"
587                            );
588                        }
589                    }
590                    Err(error) => {
591                        self.state
592                            .dataflow_node_results
593                            .entry(dataflow_id)
594                            .or_default()
595                            .insert(node_id.clone(), Err(error));
596                        self.handle_node_stop(dataflow_id, &node_id, dynamic_node)
597                            .await?;
598                    }
599                },
600                Event::SpawnDataflowResult {
601                    dataflow_id,
602                    result,
603                } => {
604                    if let Some(client) = self.state.coordinator_client() {
605                        let client = client.clone();
606                        let result = result.map_err(|err| format!("{err:?}"));
607                        tokio::spawn(async move {
608                            if let Err(err) = client
609                                .spawn_result(tarpc::context::current(), dataflow_id, result)
610                                .await
611                            {
612                                tracing::error!(
613                                    ?err,
614                                    "failed to send spawn_result notification to coordinator"
615                                );
616                            }
617                        });
618                    }
619                }
620                Event::NodeStopped {
621                    dataflow_id,
622                    node_id,
623                } => {
624                    if let Some(exit_when_done) = &mut self.exit_when_done {
625                        exit_when_done.remove(&(dataflow_id, node_id));
626                        if exit_when_done.is_empty() {
627                            tracing::info!(
628                                "exiting daemon because all required dataflows are finished"
629                            );
630                            break;
631                        }
632                    }
633                    // Exit when all dataflows are finished after Ctrl-C
634                    if self.exit_when_all_finished && self.state.running.is_empty() {
635                        tracing::info!("exiting daemon because all dataflows are finished");
636                        break;
637                    }
638                }
639            }
640
641            // warn if event handling took too long -> the main loop should never be blocked for too long
642            let elapsed = start.elapsed();
643            if elapsed > Duration::from_millis(100) {
644                tracing::warn!(
645                    "Daemon took {}ms for handling event: {event_kind}",
646                    elapsed.as_millis()
647                );
648            }
649        }
650
651        if let Some(client) = self.state.coordinator_client() {
652            let ctx = tarpc::context::current();
653            if let Err(err) = client.daemon_exit(ctx).await {
654                tracing::error!(
655                    ?err,
656                    "failed to send daemon_exit notification to coordinator"
657                );
658            }
659        }
660
661        Ok(self
662            .state
663            .dataflow_node_results
664            .iter()
665            .map(|entry| (*entry.key(), entry.value().clone()))
666            .collect())
667    }
668
669    async fn trigger_manual_stop(&mut self) -> eyre::Result<()> {
670        // Collect dataflow IDs first, then process one at a time to avoid
671        // holding DashMap guards across `.await` points.
672        let dataflow_ids: Vec<DataflowId> = self
673            .state
674            .running
675            .iter()
676            .map(|entry| *entry.key())
677            .collect();
678
679        let mut dataflows_to_finish = Vec::new();
680        for dataflow_id in dataflow_ids {
681            if let Some(mut dataflow) = self.state.running.get_mut(&dataflow_id) {
682                let mut logger = self.logger.for_dataflow(dataflow_id);
683                let finish_when = dataflow
684                    .stop_all(&self.state, None, false, &mut logger)
685                    .await?;
686                if matches!(finish_when, FinishDataflowWhen::Now) {
687                    dataflows_to_finish.push(dataflow_id);
688                }
689            }
690        }
691
692        // DashMap guards are dropped — safe to call finish_dataflow.
693        for dataflow_id in dataflows_to_finish {
694            self.state.finish_dataflow(dataflow_id).await?;
695        }
696
697        self.exit_when_all_finished = true;
698        Ok(())
699    }
700
701    #[allow(clippy::too_many_arguments)]
702    async fn handle_spawn_request(
703        &mut self,
704        build_id: Option<BuildId>,
705        dataflow_id: DataflowId,
706        local_working_dir: Option<PathBuf>,
707        nodes: BTreeMap<NodeId, ResolvedNode>,
708        dataflow_descriptor: Descriptor,
709        spawn_nodes: BTreeSet<NodeId>,
710        uv: bool,
711        write_events_to: Option<PathBuf>,
712    ) -> Result<(), String> {
713        match dataflow_descriptor.communication.remote {
714            dora_core::config::RemoteCommunicationConfig::Tcp => {}
715        }
716
717        // Resolve base working dir — for spawn we use the daemon's working dir
718        let base_working_dir = match local_working_dir {
719            Some(working_dir) => {
720                if working_dir.exists() {
721                    Ok(working_dir)
722                } else {
723                    Err(format!(
724                        "working directory does not exist: {}",
725                        working_dir.display(),
726                    ))
727                }
728            }
729            None => {
730                let daemon_working_dir =
731                    current_dir().map_err(|e| format!("failed to get daemon working dir: {e}"))?;
732                Ok(daemon_working_dir)
733            }
734        }?;
735
736        let result = self
737            .spawn_dataflow(
738                build_id,
739                dataflow_id,
740                base_working_dir,
741                nodes,
742                dataflow_descriptor,
743                spawn_nodes,
744                uv,
745                write_events_to,
746            )
747            .await;
748        let (trigger_result, result_task) = match result {
749            Ok(result_task) => (Ok(()), Some(result_task)),
750            Err(err) => (Err(format!("{err:?}")), None),
751        };
752
753        // Spawn background task to report spawn completion to coordinator
754        if let Some(result_task) = result_task {
755            let state = self.state.clone();
756            tokio::spawn(async move {
757                let result = result_task.await;
758                let spawn_result = result
759                    .as_ref()
760                    .map(|_| ())
761                    .map_err(|err| format!("{err:?}"));
762
763                if let Some(client) = state.coordinator_client() {
764                    let ctx = tarpc::context::current();
765                    if let Err(err) = client.spawn_result(ctx, dataflow_id, spawn_result).await {
766                        tracing::error!(
767                            ?err,
768                            "failed to send spawn_result notification to coordinator"
769                        );
770                    }
771                }
772            });
773        }
774
775        trigger_result
776    }
777
778    async fn collect_and_send_metrics(&mut self) -> eyre::Result<()> {
779        use dora_message::daemon_to_coordinator::NodeMetrics;
780        use sysinfo::{Pid, ProcessRefreshKind, ProcessesToUpdate};
781
782        if self.state.coordinator_client().is_none() {
783            return Ok(());
784        }
785
786        // Reuse system instance for metrics collection
787        let system = &mut self.metrics_system;
788
789        // Metrics are collected every 2 seconds (metrics_interval)
790        const METRICS_INTERVAL_SECS: f64 = 2.0;
791
792        // Collect metrics for all running dataflows
793        for entry in self.state.running.iter() {
794            let (dataflow_id, dataflow) = (entry.key(), entry.value());
795            let mut metrics = BTreeMap::new();
796
797            // Collect all PIDs for this dataflow
798            let pids: Vec<Pid> = dataflow
799                .running_nodes
800                .values()
801                .filter_map(|node| {
802                    node.pid
803                        .as_ref()
804                        .map(|pid| Pid::from_u32(pid.load(atomic::Ordering::Acquire)))
805                })
806                .collect();
807
808            if !pids.is_empty() {
809                // Refresh process metrics (cpu, memory, disk)
810                let refresh_kind = ProcessRefreshKind::nothing()
811                    .with_cpu()
812                    .with_memory()
813                    .with_disk_usage();
814                system.refresh_processes_specifics(
815                    ProcessesToUpdate::Some(&pids),
816                    true,
817                    refresh_kind,
818                );
819
820                // Collect metrics for each node
821                for (node_id, running_node) in &dataflow.running_nodes {
822                    if let Some(pid) = running_node.pid.as_ref() {
823                        let pid = pid.load(atomic::Ordering::Acquire);
824                        let sys_pid = Pid::from_u32(pid);
825                        if let Some(process) = system.process(sys_pid) {
826                            let disk_usage = process.disk_usage();
827                            // Divide by metrics_interval to get per-second averages
828                            metrics.insert(
829                                node_id.clone(),
830                                NodeMetrics {
831                                    pid,
832                                    cpu_usage: process.cpu_usage(),
833                                    memory_bytes: process.memory(),
834                                    disk_read_bytes: Some(
835                                        (disk_usage.read_bytes as f64 / METRICS_INTERVAL_SECS)
836                                            as u64,
837                                    ),
838                                    disk_write_bytes: Some(
839                                        (disk_usage.written_bytes as f64 / METRICS_INTERVAL_SECS)
840                                            as u64,
841                                    ),
842                                },
843                            );
844                        }
845                    }
846                }
847            }
848
849            // Send metrics to coordinator if we have any (fire-and-forget).
850            if !metrics.is_empty() {
851                if let Some(client) = self.state.coordinator_client() {
852                    let client = client.clone();
853                    let dataflow_id = *dataflow_id;
854                    tokio::spawn(async move {
855                        let _ = client
856                            .node_metrics(tarpc::context::current(), dataflow_id, metrics)
857                            .await;
858                    });
859                }
860            }
861        }
862
863        Ok(())
864    }
865
866    async fn handle_inter_daemon_event(&mut self, event: InterDaemonEvent) -> eyre::Result<()> {
867        match event {
868            InterDaemonEvent::Output {
869                dataflow_id,
870                node_id,
871                output_id,
872                metadata,
873                data,
874            } => {
875                let inner = async {
876                    let mut dataflow =
877                        self.state.running.get_mut(&dataflow_id).wrap_err_with(|| {
878                            format!("send out failed: no running dataflow with ID `{dataflow_id}`")
879                        })?;
880                    send_output_to_local_receivers(
881                        node_id.clone(),
882                        output_id.clone(),
883                        &mut *dataflow,
884                        &metadata,
885                        data.map(DataMessage::Vec),
886                        &self.state.clock,
887                    )
888                    .await?;
889                    Result::<_, eyre::Report>::Ok(())
890                };
891                if let Err(err) = inner
892                    .await
893                    .wrap_err("failed to forward remote output to local receivers")
894                {
895                    let mut logger = self.logger.for_dataflow(dataflow_id).for_node(node_id);
896                    logger
897                        .log(LogLevel::Warn, Some("daemon".into()), format!("{err:?}"))
898                        .await;
899                }
900                Ok(())
901            }
902            InterDaemonEvent::OutputClosed {
903                dataflow_id,
904                node_id,
905                output_id,
906            } => {
907                let output_id = OutputId(node_id.clone(), output_id);
908                let mut logger = self
909                    .logger
910                    .for_dataflow(dataflow_id)
911                    .for_node(node_id.clone());
912                logger
913                    .log(
914                        LogLevel::Debug,
915                        Some("daemon".into()),
916                        format!("received OutputClosed event for output {output_id:?}"),
917                    )
918                    .await;
919
920                let result = match self.state.running.get_mut(&dataflow_id) {
921                    Some(mut dataflow) => {
922                        if let Some(inputs) = dataflow.mappings.get(&output_id).cloned() {
923                            for (receiver_id, input_id) in &inputs {
924                                close_input(
925                                    &mut *dataflow,
926                                    receiver_id,
927                                    input_id,
928                                    &self.state.clock,
929                                );
930                            }
931                        }
932                        Ok(())
933                    }
934                    None => Err(eyre::eyre!(
935                        "send out failed: no running dataflow with ID `{dataflow_id}`"
936                    )),
937                };
938                if let Err(err) =
939                    result.wrap_err("failed to handle InputsClosed event sent by coordinator")
940                {
941                    logger
942                        .log(LogLevel::Warn, Some("daemon".into()), format!("{err:?}"))
943                        .await;
944                }
945                Ok(())
946            }
947            InterDaemonEvent::NodeFailed {
948                dataflow_id,
949                source_node_id,
950                error,
951            } => {
952                tracing::debug!(
953                    "received NodeFailed event from {source_node_id} for dataflow {dataflow_id}"
954                );
955
956                let result = match self.state.running.get_mut(&dataflow_id) {
957                    Some(mut dataflow) => {
958                        // Deduplicate: the sending daemon publishes one NodeFailed per output topic,
959                        // so we may receive the same event multiple times. Only notify local receivers
960                        // once per source node.
961                        if dataflow.handled_node_failed.insert(source_node_id.clone()) {
962                            let (_outputs, _remote_receivers) =
963                                Self::find_and_notify_local_receivers(
964                                    &mut dataflow,
965                                    &source_node_id,
966                                    &error,
967                                    &self.state.clock,
968                                );
969                        }
970                        Ok(())
971                    }
972                    None => Err(eyre::eyre!(
973                        "failed to handle NodeFailed: no running dataflow with ID `{dataflow_id}`"
974                    )),
975                };
976                if let Err(err) =
977                    result.wrap_err("failed to handle NodeFailed event sent by remote daemon")
978                {
979                    tracing::warn!("Failed to handle NodeFailed event: {err:?}");
980                }
981                Ok(())
982            }
983        }
984    }
985
986    #[allow(clippy::too_many_arguments)]
987    async fn build_dataflow(
988        &mut self,
989        build_id: BuildId,
990        session_id: SessionId,
991        base_working_dir: PathBuf,
992        git_sources: BTreeMap<NodeId, GitSource>,
993        prev_git_sources: BTreeMap<NodeId, GitSource>,
994        dataflow_descriptor: Descriptor,
995        local_nodes: BTreeSet<NodeId>,
996        uv: bool,
997    ) -> eyre::Result<impl Future<Output = eyre::Result<BuildInfo>> + use<>> {
998        let builder = build::Builder {
999            session_id,
1000            base_working_dir,
1001            uv,
1002        };
1003        {
1004            let mut git_manager = self.state.git_manager.lock().await;
1005            git_manager.clear_planned_builds(session_id);
1006        }
1007
1008        let nodes = dataflow_descriptor.resolve_aliases_and_set_defaults()?;
1009
1010        let mut tasks = Vec::new();
1011
1012        // build nodes
1013        for node in nodes.into_values().filter(|n| local_nodes.contains(&n.id)) {
1014            let dynamic_node = node.kind.dynamic();
1015
1016            let node_id = node.id.clone();
1017            let mut logger = self.logger.for_node_build(build_id, node_id.clone());
1018            logger.log(LogLevel::Debug, "building").await;
1019            let git_source = git_sources.get(&node_id).cloned();
1020            let prev_git_source = prev_git_sources.get(&node_id).cloned();
1021            let prev_git = prev_git_source.map(|prev_source| PrevGitSource {
1022                still_needed_for_this_build: git_sources.values().any(|s| s == &prev_source),
1023                git_source: prev_source,
1024            });
1025
1026            let logger_cloned = logger
1027                .try_clone_impl()
1028                .await
1029                .wrap_err("failed to clone logger")?;
1030
1031            let mut builder = builder.clone();
1032            if let Some(node_working_dir) =
1033                node.deploy.as_ref().and_then(|d| d.working_dir.as_deref())
1034            {
1035                builder.base_working_dir = builder.base_working_dir.join(node_working_dir);
1036            }
1037
1038            let mut git_manager = self.state.git_manager.lock().await;
1039            match builder
1040                .build_node(node, git_source, prev_git, logger_cloned, &mut git_manager)
1041                .await
1042                .wrap_err_with(|| format!("failed to build node `{node_id}`"))
1043            {
1044                Ok(result) => {
1045                    tasks.push(NodeBuildTask {
1046                        node_id,
1047                        task: result,
1048                        dynamic_node,
1049                    });
1050                }
1051                Err(err) => {
1052                    logger.log(LogLevel::Error, format!("{err:?}")).await;
1053                    return Err(err);
1054                }
1055            }
1056        }
1057
1058        let task = async move {
1059            let mut info = BuildInfo {
1060                node_working_dirs: Default::default(),
1061            };
1062            for task in tasks {
1063                let NodeBuildTask {
1064                    node_id,
1065                    dynamic_node: _,
1066                    task,
1067                } = task;
1068                let node = task
1069                    .await
1070                    .with_context(|| format!("failed to build node `{node_id}`"))?;
1071                info.node_working_dirs
1072                    .insert(node_id, node.node_working_dir);
1073            }
1074            Ok(info)
1075        };
1076
1077        Ok(task)
1078    }
1079
1080    #[allow(clippy::too_many_arguments)]
1081    async fn spawn_dataflow(
1082        &mut self,
1083        build_id: Option<BuildId>,
1084        dataflow_id: DataflowId,
1085        base_working_dir: PathBuf,
1086        nodes: BTreeMap<NodeId, ResolvedNode>,
1087        dataflow_descriptor: Descriptor,
1088        spawn_nodes: BTreeSet<NodeId>,
1089        uv: bool,
1090        write_events_to: Option<PathBuf>,
1091    ) -> eyre::Result<impl Future<Output = eyre::Result<()>> + use<>> {
1092        let mut logger = self
1093            .logger
1094            .for_dataflow(dataflow_id)
1095            .try_clone()
1096            .await
1097            .context("failed to clone logger")?;
1098        let dataflow = RunningDataflow::new(
1099            dataflow_id,
1100            self.state.daemon_id().clone(),
1101            dataflow_descriptor.clone(),
1102            self.state.events_tx.clone(),
1103            self.state.clock.clone(),
1104        );
1105        let mut dataflow = match self.state.running.entry(dataflow_id) {
1106            dashmap::Entry::Vacant(entry) => {
1107                self.state
1108                    .working_dir
1109                    .insert(dataflow_id, base_working_dir.clone());
1110                entry.insert(dataflow)
1111            }
1112            dashmap::Entry::Occupied(_) => {
1113                bail!("there is already a running dataflow with ID `{dataflow_id}`")
1114            }
1115        };
1116
1117        let mut stopped = Vec::new();
1118
1119        let build_info = build_id.and_then(|build_id| self.state.builds.get(&build_id));
1120        let node_with_git_source = nodes.values().find(|n| n.has_git_source());
1121        if let Some(git_node) = node_with_git_source {
1122            if build_info.is_none() {
1123                eyre::bail!(
1124                    "node {} has git source, but no `dora build` was run yet\n\n\
1125                    nodes with a `git` field must be built using `dora build` before starting the \
1126                    dataflow",
1127                    git_node.id
1128                )
1129            }
1130        }
1131        let node_working_dirs = build_info
1132            .map(|info| info.node_working_dirs.clone())
1133            .unwrap_or_default();
1134
1135        // calculate info about mappings
1136        for node in nodes.values() {
1137            let local = spawn_nodes.contains(&node.id);
1138
1139            let inputs = node_inputs(node);
1140            for (input_id, input) in inputs {
1141                if local {
1142                    dataflow
1143                        .open_inputs
1144                        .entry(node.id.clone())
1145                        .or_default()
1146                        .insert(input_id.clone());
1147                    match input.mapping {
1148                        InputMapping::User(mapping) => {
1149                            dataflow
1150                                .mappings
1151                                .entry(OutputId(mapping.source, mapping.output))
1152                                .or_default()
1153                                .insert((node.id.clone(), input_id));
1154                        }
1155                        InputMapping::Timer { interval } => {
1156                            dataflow
1157                                .timers
1158                                .entry(interval)
1159                                .or_default()
1160                                .insert((node.id.clone(), input_id));
1161                        }
1162                    }
1163                } else if let InputMapping::User(mapping) = input.mapping {
1164                    dataflow
1165                        .open_external_mappings
1166                        .insert(OutputId(mapping.source, mapping.output));
1167                }
1168            }
1169        }
1170
1171        let spawner = Spawner {
1172            dataflow_id,
1173            daemon_tx: self.state.events_tx.clone(),
1174            dataflow_descriptor,
1175            clock: self.state.clock.clone(),
1176            uv,
1177        };
1178
1179        let mut tasks = Vec::new();
1180
1181        // spawn nodes and set up subscriptions
1182        for node in nodes.into_values() {
1183            let mut logger = logger.reborrow().for_node(node.id.clone());
1184            let local = spawn_nodes.contains(&node.id);
1185            if local {
1186                let dynamic_node = node.kind.dynamic();
1187                if dynamic_node {
1188                    dataflow.dynamic_nodes.insert(node.id.clone());
1189                } else {
1190                    dataflow.pending_nodes.insert(node.id.clone());
1191                }
1192
1193                let node_id = node.id.clone();
1194                let node_stderr_most_recent = dataflow
1195                    .node_stderr_most_recent
1196                    .entry(node.id.clone())
1197                    .or_insert_with(|| Arc::new(ArrayQueue::new(STDERR_LOG_LINES_MAX)))
1198                    .clone();
1199
1200                let configured_node_working_dir = node_working_dirs.get(&node_id).cloned();
1201                if configured_node_working_dir.is_none() && node.has_git_source() {
1202                    eyre::bail!(
1203                        "node {} has git source, but no git clone directory was found for it\n\n\
1204                        try running `dora build` again",
1205                        node.id
1206                    )
1207                }
1208                let node_working_dir = configured_node_working_dir
1209                    .or_else(|| {
1210                        node.deploy
1211                            .as_ref()
1212                            .and_then(|d| d.working_dir.as_ref().map(|d| base_working_dir.join(d)))
1213                    })
1214                    .unwrap_or(base_working_dir.clone())
1215                    .clone();
1216                let node_write_events_to = write_events_to
1217                    .as_ref()
1218                    .map(|p| p.join(format!("inputs-{}.json", node.id)));
1219                match spawner
1220                    .clone()
1221                    .spawn_node(
1222                        node,
1223                        node_working_dir,
1224                        node_stderr_most_recent,
1225                        node_write_events_to,
1226                        &mut logger,
1227                    )
1228                    .await
1229                    .wrap_err_with(|| format!("failed to spawn node `{node_id}`"))
1230                {
1231                    Ok(result) => {
1232                        tasks.push(NodeBuildTask {
1233                            node_id,
1234                            task: result,
1235                            dynamic_node,
1236                        });
1237                    }
1238                    Err(err) => {
1239                        logger
1240                            .log(LogLevel::Error, Some("daemon".into()), format!("{err:?}"))
1241                            .await;
1242                        self.state
1243                            .dataflow_node_results
1244                            .entry(dataflow_id)
1245                            .or_default()
1246                            .insert(
1247                                node_id.clone(),
1248                                Err(NodeError {
1249                                    timestamp: self.state.clock.new_timestamp(),
1250                                    cause: NodeErrorCause::FailedToSpawn(format!("{err:?}")),
1251                                    exit_status: NodeExitStatus::Unknown,
1252                                }),
1253                            );
1254                        stopped.push((node_id.clone(), dynamic_node));
1255                    }
1256                }
1257            } else {
1258                // wait until node is ready before starting
1259                dataflow.pending_nodes.set_external_nodes(true);
1260
1261                // subscribe to all node outputs that are mapped to some local inputs
1262                for output_id in dataflow.mappings.keys().filter(|o| o.0 == node.id) {
1263                    let tx = self
1264                        .state
1265                        .remote_daemon_events_tx
1266                        .clone()
1267                        .wrap_err("no remote_daemon_events_tx channel")?;
1268                    let mut finished_rx = dataflow.finished_tx.subscribe();
1269                    let subscribe_topic =
1270                        zenoh_output_publish_topic(dataflow.id, &output_id.0, &output_id.1);
1271                    tracing::debug!("declaring subscriber on {subscribe_topic}");
1272                    let zenoh = self
1273                        .state
1274                        .zenoh_session
1275                        .as_ref()
1276                        .wrap_err("no zenoh session")?;
1277                    let subscriber = zenoh
1278                        .declare_subscriber(subscribe_topic)
1279                        .await
1280                        .map_err(|e| eyre!(e))
1281                        .wrap_err_with(|| format!("failed to subscribe to {output_id:?}"))?;
1282                    tokio::spawn(async move {
1283                        let mut finished = pin!(finished_rx.recv());
1284                        loop {
1285                            let finished_or_next =
1286                                futures::future::select(finished, subscriber.recv_async());
1287                            match finished_or_next.await {
1288                                future::Either::Left((finished, _)) => match finished {
1289                                    Err(broadcast::error::RecvError::Closed) => {
1290                                        tracing::debug!(
1291                                            "dataflow finished, breaking from zenoh subscribe task"
1292                                        );
1293                                        break;
1294                                    }
1295                                    other => {
1296                                        tracing::warn!(
1297                                            "unexpected return value of dataflow finished_rx channel: {other:?}"
1298                                        );
1299                                        break;
1300                                    }
1301                                },
1302                                future::Either::Right((sample, f)) => {
1303                                    finished = f;
1304                                    let event = sample.map_err(|e| eyre!(e)).and_then(|s| {
1305                                        Timestamped::deserialize_inter_daemon_event(
1306                                            &s.payload().to_bytes(),
1307                                        )
1308                                    });
1309                                    if tx.send_async(event).await.is_err() {
1310                                        // daemon finished
1311                                        break;
1312                                    }
1313                                }
1314                            }
1315                        }
1316                    });
1317                }
1318            }
1319        }
1320        drop(dataflow);
1321        for (node_id, dynamic) in stopped {
1322            self.handle_node_stop(dataflow_id, &node_id, dynamic)
1323                .await?;
1324        }
1325
1326        let spawn_result = Self::spawn_prepared_nodes(
1327            dataflow_id,
1328            logger,
1329            tasks,
1330            self.state.events_tx.clone(),
1331            self.state.clock.clone(),
1332        );
1333
1334        Ok(spawn_result)
1335    }
1336
1337    async fn spawn_prepared_nodes(
1338        dataflow_id: Uuid,
1339        mut logger: DataflowLogger<'_>,
1340        tasks: Vec<NodeBuildTask<impl Future<Output = eyre::Result<spawn::PreparedNode>>>>,
1341        events_tx: mpsc::Sender<Timestamped<Event>>,
1342        clock: Arc<HLC>,
1343    ) -> eyre::Result<()> {
1344        let node_result = |node_id, dynamic_node, result| Timestamped {
1345            inner: Event::SpawnNodeResult {
1346                dataflow_id,
1347                node_id,
1348                dynamic_node,
1349                result,
1350            },
1351            timestamp: clock.new_timestamp(),
1352        };
1353        let mut failed_to_prepare = None;
1354        let mut prepared_nodes = Vec::new();
1355        for task in tasks {
1356            let NodeBuildTask {
1357                node_id,
1358                dynamic_node,
1359                task,
1360            } = task;
1361            match task.await {
1362                Ok(node) => prepared_nodes.push(node),
1363                Err(err) => {
1364                    if failed_to_prepare.is_none() {
1365                        failed_to_prepare = Some(node_id.clone());
1366                    }
1367                    let node_err: NodeError = NodeError {
1368                        timestamp: clock.new_timestamp(),
1369                        cause: NodeErrorCause::FailedToSpawn(format!(
1370                            "preparing for spawn failed: {err:?}"
1371                        )),
1372                        exit_status: NodeExitStatus::Unknown,
1373                    };
1374                    let send_result = events_tx
1375                        .send(node_result(node_id, dynamic_node, Err(node_err)))
1376                        .await;
1377                    if send_result.is_err() {
1378                        tracing::error!("failed to send SpawnNodeResult to main daemon task")
1379                    }
1380                }
1381            }
1382        }
1383
1384        // once all nodes are prepared, do the actual spawning
1385        if let Some(failed_node) = failed_to_prepare {
1386            // don't spawn any nodes when an error occurred before
1387            for node in prepared_nodes {
1388                let err = NodeError {
1389                    timestamp: clock.new_timestamp(),
1390                    cause: NodeErrorCause::Cascading {
1391                        caused_by_node: failed_node.clone(),
1392                    },
1393                    exit_status: NodeExitStatus::Unknown,
1394                };
1395                let send_result = events_tx
1396                    .send(node_result(
1397                        node.node_id().clone(),
1398                        node.dynamic(),
1399                        Err(err),
1400                    ))
1401                    .await;
1402                if send_result.is_err() {
1403                    tracing::error!("failed to send SpawnNodeResult to main daemon task")
1404                }
1405            }
1406            Err(eyre!("failed to prepare node {failed_node}"))
1407        } else {
1408            let mut spawn_result = Ok(());
1409
1410            logger
1411                .log(
1412                    LogLevel::Info,
1413                    None,
1414                    Some("dora daemon".into()),
1415                    "finished building nodes, spawning...",
1416                )
1417                .await;
1418
1419            // spawn the nodes
1420            for node in prepared_nodes {
1421                let node_id = node.node_id().clone();
1422                let dynamic_node = node.dynamic();
1423                let logger = logger
1424                    .reborrow()
1425                    .for_node(node_id.clone())
1426                    .try_clone()
1427                    .await
1428                    .context("failed to clone NodeLogger")?;
1429                let result = node.spawn(logger).await;
1430                let node_spawn_result = match result {
1431                    Ok(node) => Ok(node),
1432                    Err(err) => {
1433                        let node_err = NodeError {
1434                            timestamp: clock.new_timestamp(),
1435                            cause: NodeErrorCause::FailedToSpawn(format!("spawn failed: {err:?}")),
1436                            exit_status: NodeExitStatus::Unknown,
1437                        };
1438                        if spawn_result.is_ok() {
1439                            spawn_result = Err(err.wrap_err(format!("failed to spawn {node_id}")));
1440                        }
1441                        Err(node_err)
1442                    }
1443                };
1444                let send_result = events_tx
1445                    .send(node_result(node_id, dynamic_node, node_spawn_result))
1446                    .await;
1447                if send_result.is_err() {
1448                    tracing::error!("failed to send SpawnNodeResult to main daemon task")
1449                }
1450            }
1451            spawn_result
1452        }
1453    }
1454
1455    async fn handle_dynamic_node_event(
1456        &mut self,
1457        event: DynamicNodeEventWrapper,
1458    ) -> eyre::Result<()> {
1459        match event {
1460            DynamicNodeEventWrapper {
1461                event: DynamicNodeEvent::NodeConfig { node_id },
1462                reply_tx,
1463            } => {
1464                let number_node_id = self
1465                    .state
1466                    .running
1467                    .iter()
1468                    .filter(|entry| entry.value().running_nodes.contains_key(&node_id))
1469                    .count();
1470
1471                let node_config = match number_node_id {
1472                    2.. => Err(format!(
1473                        "multiple dataflows contain dynamic node id {node_id}. \
1474                        Please only have one running dataflow with the specified \
1475                        node id if you want to use dynamic node",
1476                    )),
1477                    1 => self
1478                        .state
1479                        .running
1480                        .iter()
1481                        .filter(|entry| entry.value().running_nodes.contains_key(&node_id))
1482                        .map(|entry| -> Result<NodeConfig> {
1483                            let (id, dataflow) = (entry.key(), entry.value());
1484                            let node_config = dataflow
1485                                .running_nodes
1486                                .get(&node_id)
1487                                .with_context(|| {
1488                                    format!("no node with ID `{node_id}` within the given dataflow")
1489                                })?
1490                                .node_config
1491                                .clone();
1492                            if !node_config.dynamic {
1493                                bail!("node with ID `{node_id}` in {id} is not dynamic");
1494                            }
1495                            Ok(node_config)
1496                        })
1497                        .next()
1498                        .ok_or_else(|| eyre!("no node with ID `{node_id}`"))
1499                        .and_then(|r| r)
1500                        .map_err(|err| {
1501                            format!(
1502                                "failed to get dynamic node config within given dataflow: {err}"
1503                            )
1504                        }),
1505                    0 => Err(format!("no node with ID `{node_id}`")),
1506                };
1507
1508                let reply = DaemonReply::NodeConfig {
1509                    result: node_config,
1510                };
1511                let _ = reply_tx.send(Some(reply)).map_err(|_| {
1512                    error!("could not send node info reply from daemon to coordinator")
1513                });
1514                Ok(())
1515            }
1516        }
1517    }
1518
1519    async fn handle_node_event(
1520        &mut self,
1521        event: DaemonNodeEvent,
1522        dataflow_id: DataflowId,
1523        node_id: NodeId,
1524    ) -> eyre::Result<()> {
1525        let might_restart = || {
1526            let dataflow = self.state.running.get(&dataflow_id)?;
1527            let node = dataflow.running_nodes.get(&node_id)?;
1528            Some(match node.restart_policy {
1529                RestartPolicy::Never => false,
1530                _ if node.restarts_disabled() => false,
1531                RestartPolicy::OnFailure | RestartPolicy::Always => true,
1532            })
1533        };
1534        match event {
1535            DaemonNodeEvent::Subscribe {
1536                event_sender,
1537                reply_sender,
1538            } => {
1539                let mut logger = self.logger.for_dataflow(dataflow_id);
1540                logger
1541                    .log(
1542                        LogLevel::Info,
1543                        Some(node_id.clone()),
1544                        Some("daemon".into()),
1545                        "node is ready",
1546                    )
1547                    .await;
1548
1549                let dataflow = self.state.running.get_mut(&dataflow_id).ok_or_else(|| {
1550                    format!("subscribe failed: no running dataflow with ID `{dataflow_id}`")
1551                });
1552
1553                match dataflow {
1554                    Err(err) => {
1555                        let _ = reply_sender.send(DaemonReply::Result(Err(err)));
1556                    }
1557                    Ok(mut dataflow) => {
1558                        Self::subscribe(
1559                            &mut *dataflow,
1560                            node_id.clone(),
1561                            event_sender,
1562                            &self.state.clock,
1563                        )
1564                        .await;
1565
1566                        let df = &mut *dataflow;
1567                        let status = df
1568                            .pending_nodes
1569                            .handle_node_subscription(
1570                                node_id.clone(),
1571                                reply_sender,
1572                                &self.state.coordinator_client().cloned(),
1573                                &self.state.clock,
1574                                &mut df.cascading_error_causes,
1575                                &mut logger,
1576                            )
1577                            .await?;
1578                        match status {
1579                            DataflowStatus::AllNodesReady if !dataflow.dataflow_started => {
1580                                logger
1581                                    .log(
1582                                        LogLevel::Info,
1583                                        None,
1584                                        Some("daemon".into()),
1585                                        "all nodes are ready, starting dataflow",
1586                                    )
1587                                    .await;
1588                                dataflow
1589                                    .start(&self.state.events_tx, &self.state.clock)
1590                                    .await?;
1591                                dataflow.dataflow_started = true;
1592                            }
1593                            _ => {}
1594                        }
1595                    }
1596                }
1597            }
1598            DaemonNodeEvent::SubscribeDrop {
1599                event_sender,
1600                reply_sender,
1601            } => {
1602                let dataflow = self.state.running.get_mut(&dataflow_id).wrap_err_with(|| {
1603                    format!("failed to subscribe: no running dataflow with ID `{dataflow_id}`")
1604                });
1605                let result = match dataflow {
1606                    Ok(mut dataflow) => {
1607                        dataflow.drop_channels.insert(node_id, event_sender);
1608                        Ok(())
1609                    }
1610                    Err(err) => Err(err.to_string()),
1611                };
1612                let _ = reply_sender.send(DaemonReply::Result(result));
1613            }
1614            DaemonNodeEvent::CloseOutputs {
1615                outputs,
1616                reply_sender,
1617            } => {
1618                let reply = if might_restart().unwrap_or(false) {
1619                    self.logger
1620                        .for_dataflow(dataflow_id)
1621                        .for_node(node_id.clone())
1622                        .log(
1623                            LogLevel::Debug,
1624                            Some("daemon".into()),
1625                            "skipping CloseOutputs because node might restart",
1626                        )
1627                        .await;
1628                    Ok(())
1629                } else {
1630                    // notify downstream nodes
1631                    let inner = async {
1632                        self.send_output_closed_events(dataflow_id, node_id, outputs)
1633                            .await
1634                    };
1635
1636                    inner.await.map_err(|err| format!("{err:?}"))
1637                };
1638                let _ = reply_sender.send(DaemonReply::Result(reply));
1639            }
1640            DaemonNodeEvent::OutputsDone { reply_sender } => {
1641                let result = self
1642                    .handle_outputs_done(dataflow_id, &node_id, might_restart().unwrap_or(false))
1643                    .await;
1644
1645                let _ = reply_sender.send(DaemonReply::Result(
1646                    result.map_err(|err| format!("{err:?}")),
1647                ));
1648            }
1649            DaemonNodeEvent::SendOut {
1650                output_id,
1651                metadata,
1652                data,
1653            } => self
1654                .send_out(dataflow_id, node_id, output_id, metadata, data)
1655                .await
1656                .context("failed to send out")?,
1657            DaemonNodeEvent::ReportDrop { tokens } => {
1658                let dataflow = self.state.running.get_mut(&dataflow_id).wrap_err_with(|| {
1659                    format!(
1660                        "failed to get handle drop tokens: \
1661                        no running dataflow with ID `{dataflow_id}`"
1662                    )
1663                });
1664
1665                match dataflow {
1666                    Ok(mut dataflow) => {
1667                        for token in tokens {
1668                            match dataflow.pending_drop_tokens.get_mut(&token) {
1669                                Some(info) => {
1670                                    if info.pending_nodes.remove(&node_id) {
1671                                        dataflow.check_drop_token(token, &self.state.clock).await?;
1672                                    } else {
1673                                        tracing::warn!(
1674                                            "node `{node_id}` is not pending for drop token `{token:?}`"
1675                                        );
1676                                    }
1677                                }
1678                                None => tracing::warn!("unknown drop token `{token:?}`"),
1679                            }
1680                        }
1681                    }
1682                    Err(err) => tracing::warn!("{err:?}"),
1683                }
1684            }
1685            DaemonNodeEvent::EventStreamDropped { reply_sender } => {
1686                let reply = match self.state.running.get_mut(&dataflow_id) {
1687                    Some(mut dataflow) => {
1688                        dataflow.subscribe_channels.remove(&node_id);
1689                        Ok(())
1690                    }
1691                    None => Err(format!("no running dataflow with ID `{dataflow_id}`")),
1692                };
1693                let _ = reply_sender.send(DaemonReply::Result(reply));
1694            }
1695        }
1696        Ok(())
1697    }
1698
1699    async fn send_reload(
1700        &mut self,
1701        dataflow_id: Uuid,
1702        node_id: NodeId,
1703        operator_id: Option<OperatorId>,
1704    ) -> Result<(), eyre::ErrReport> {
1705        let mut dataflow = self.state.running.get_mut(&dataflow_id).wrap_err_with(|| {
1706            format!("Reload failed: no running dataflow with ID `{dataflow_id}`")
1707        })?;
1708        if let Some(channel) = dataflow.subscribe_channels.get(&node_id) {
1709            match send_with_timestamp(
1710                channel,
1711                NodeEvent::Reload { operator_id },
1712                &self.state.clock,
1713            ) {
1714                Ok(()) => {}
1715                Err(_) => {
1716                    dataflow.subscribe_channels.remove(&node_id);
1717                }
1718            }
1719        }
1720        Ok(())
1721    }
1722
1723    async fn send_out(
1724        &mut self,
1725        dataflow_id: Uuid,
1726        node_id: NodeId,
1727        output_id: DataId,
1728        metadata: dora_message::metadata::Metadata,
1729        data: Option<DataMessage>,
1730    ) -> Result<(), eyre::ErrReport> {
1731        let mut dataflow = self.state.running.get_mut(&dataflow_id).wrap_err_with(|| {
1732            format!("send out failed: no running dataflow with ID `{dataflow_id}`")
1733        })?;
1734        let data_bytes = send_output_to_local_receivers(
1735            node_id.clone(),
1736            output_id.clone(),
1737            &mut *dataflow,
1738            &metadata,
1739            data,
1740            &self.state.clock,
1741        )
1742        .await?;
1743
1744        let output_id = OutputId(node_id, output_id);
1745        let remote_receivers = dataflow.open_external_mappings.contains(&output_id)
1746            || dataflow.publish_all_messages_to_zenoh;
1747        drop(dataflow);
1748        if remote_receivers {
1749            let event = InterDaemonEvent::Output {
1750                dataflow_id,
1751                node_id: output_id.0.clone(),
1752                output_id: output_id.1.clone(),
1753                metadata,
1754                data: data_bytes,
1755            };
1756            self.send_to_remote_receivers(dataflow_id, &output_id, event)
1757                .await?;
1758        }
1759
1760        Ok(())
1761    }
1762
1763    /// Find all receivers affected by a node failure and send NodeFailed events to local ones.
1764    ///
1765    /// Returns the list of outputs and a set of remote receiver node IDs (nodes not on this daemon).
1766    fn find_and_notify_local_receivers(
1767        dataflow: &mut RunningDataflow,
1768        source_node_id: &NodeId,
1769        error_message: &str,
1770        clock: &HLC,
1771    ) -> (Vec<DataId>, BTreeSet<NodeId>) {
1772        // Get all outputs of the failed node
1773        let outputs: Vec<DataId> = dataflow
1774            .mappings
1775            .keys()
1776            .filter(|m| &m.0 == source_node_id)
1777            .map(|m| m.1.clone())
1778            .collect();
1779
1780        // Group affected inputs by receiver node
1781        let mut affected_by_receiver: BTreeMap<NodeId, Vec<DataId>> = BTreeMap::new();
1782
1783        for output_id in &outputs {
1784            let output_key = OutputId(source_node_id.clone(), output_id.clone());
1785            if let Some(receivers) = dataflow.mappings.get(&output_key) {
1786                for (receiver_node_id, input_id) in receivers {
1787                    affected_by_receiver
1788                        .entry(receiver_node_id.clone())
1789                        .or_default()
1790                        .push(input_id.clone());
1791                }
1792            }
1793        }
1794
1795        // Identify remote receivers (nodes not on this daemon)
1796        let remote_receivers: BTreeSet<NodeId> = affected_by_receiver
1797            .keys()
1798            .filter(|receiver_node_id| !dataflow.subscribe_channels.contains_key(receiver_node_id))
1799            .cloned()
1800            .collect();
1801
1802        // Send NodeFailed event to each local receiver
1803        for (receiver_node_id, affected_input_ids) in affected_by_receiver {
1804            if let Some(channel) = dataflow.subscribe_channels.get(&receiver_node_id) {
1805                // Local receiver - send directly
1806                let event = NodeEvent::NodeFailed {
1807                    affected_input_ids,
1808                    error: error_message.to_string(),
1809                    source_node_id: source_node_id.clone(),
1810                };
1811                let _ = send_with_timestamp(channel, event, clock);
1812            }
1813        }
1814
1815        (outputs, remote_receivers)
1816    }
1817
1818    /// Send error events to downstream nodes when a node fails.
1819    ///
1820    /// This is called automatically when a node exits with a non-zero exit code.
1821    /// It sends `NodeFailed` events to all downstream nodes that consume outputs from the failed node.
1822    /// For nodes on the same daemon, events are sent directly. For nodes on other daemons,
1823    /// events are routed through zenoh by sending to all outputs (to ensure all subscribing daemons receive it).
1824    async fn send_node_failed_events(
1825        &mut self,
1826        dataflow_id: Uuid,
1827        source_node_id: NodeId,
1828        error_message: String,
1829    ) -> Result<(), eyre::ErrReport> {
1830        let mut dataflow = self.state.running.get_mut(&dataflow_id).wrap_err_with(|| {
1831            format!("propagate error failed: no running dataflow with ID `{dataflow_id}`")
1832        })?;
1833
1834        tracing::warn!(
1835            "Node {}/{} failed: {}. Propagating error to downstream nodes.",
1836            dataflow_id,
1837            source_node_id,
1838            error_message
1839        );
1840
1841        let (outputs, remote_receivers) = Self::find_and_notify_local_receivers(
1842            &mut dataflow,
1843            &source_node_id,
1844            &error_message,
1845            &self.state.clock,
1846        );
1847
1848        // Drop the DashMap lock before doing async I/O via send_to_remote_receivers.
1849        drop(dataflow);
1850
1851        // If there are remote receivers, send InterDaemonEvent to all outputs
1852        // This ensures all daemons that subscribe to any of the node's outputs will receive the error
1853        if !remote_receivers.is_empty() {
1854            // If node has no outputs, there are no remote receivers to notify
1855            if outputs.is_empty() {
1856                return Ok(());
1857            }
1858
1859            // Send to all outputs to ensure all subscribing daemons receive the error
1860            // (daemons subscribe to specific output topics, so we need to send to each one)
1861            for output_id in &outputs {
1862                let output_key = OutputId(source_node_id.clone(), output_id.clone());
1863                let event = InterDaemonEvent::NodeFailed {
1864                    dataflow_id,
1865                    source_node_id: source_node_id.clone(),
1866                    error: error_message.clone(),
1867                };
1868                if let Err(err) = self
1869                    .send_to_remote_receivers(dataflow_id, &output_key, event)
1870                    .await
1871                {
1872                    tracing::warn!(
1873                        "Failed to send error event to remote receivers via output {}: {err:?}",
1874                        output_id
1875                    );
1876                }
1877            }
1878        }
1879
1880        Ok(())
1881    }
1882
1883    async fn send_to_remote_receivers(
1884        &mut self,
1885        dataflow_id: Uuid,
1886        output_id: &OutputId,
1887        event: InterDaemonEvent,
1888    ) -> Result<(), eyre::Error> {
1889        // Ensure the zenoh publisher exists. Creating a publisher involves
1890        // network I/O, so we do it outside the DashMap lock.
1891        {
1892            let has_publisher = self
1893                .state
1894                .running
1895                .get(&dataflow_id)
1896                .map(|d| d.publishers.contains_key(output_id))
1897                .unwrap_or(false);
1898            if !has_publisher {
1899                let publish_topic = {
1900                    let dataflow = self.state.running.get(&dataflow_id).wrap_err_with(|| {
1901                        format!("send out failed: no running dataflow with ID `{dataflow_id}`")
1902                    })?;
1903                    zenoh_output_publish_topic(dataflow.id, &output_id.0, &output_id.1)
1904                };
1905                // DashMap lock dropped — safe to do async I/O.
1906                tracing::debug!("declaring publisher on {publish_topic}");
1907                let zenoh = self
1908                    .state
1909                    .zenoh_session
1910                    .as_ref()
1911                    .wrap_err("no zenoh session")?;
1912                let publisher = zenoh
1913                    .declare_publisher(publish_topic)
1914                    .await
1915                    .map_err(|e| eyre!(e))
1916                    .context("failed to create zenoh publisher")?;
1917                // Re-acquire lock briefly to insert the publisher.
1918                let mut dataflow =
1919                    self.state.running.get_mut(&dataflow_id).wrap_err_with(|| {
1920                        format!("send out failed: no running dataflow with ID `{dataflow_id}`")
1921                    })?;
1922                dataflow.publishers.insert(output_id.clone(), publisher);
1923            }
1924        }
1925
1926        // Now publish via zenoh. The DashMap lock is held during `put()`,
1927        // which is typically fast (no network round-trip).
1928        let dataflow = self.state.running.get(&dataflow_id).wrap_err_with(|| {
1929            format!("send out failed: no running dataflow with ID `{dataflow_id}`")
1930        })?;
1931        let publisher = dataflow
1932            .publishers
1933            .get(output_id)
1934            .wrap_err("publisher disappeared")?;
1935        let serialized_event = Timestamped {
1936            inner: event,
1937            timestamp: self.state.clock.new_timestamp(),
1938        }
1939        .serialize();
1940        publisher
1941            .put(serialized_event)
1942            .await
1943            .map_err(|e| eyre!(e))
1944            .context("zenoh put failed")?;
1945        Ok(())
1946    }
1947
1948    async fn send_output_closed_events(
1949        &mut self,
1950        dataflow_id: DataflowId,
1951        node_id: NodeId,
1952        outputs: Vec<DataId>,
1953    ) -> eyre::Result<()> {
1954        let (local_node_inputs, closed) = {
1955            let mut dataflow = self
1956                .state
1957                .running
1958                .get_mut(&dataflow_id)
1959                .wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"))?;
1960            let local_node_inputs: BTreeSet<_> = dataflow
1961                .mappings
1962                .iter()
1963                .filter(|(k, _)| k.0 == node_id && outputs.contains(&k.1))
1964                .flat_map(|(_, v)| v)
1965                .cloned()
1966                .collect();
1967            for (receiver_id, input_id) in &local_node_inputs {
1968                close_input(&mut *dataflow, receiver_id, input_id, &self.state.clock);
1969            }
1970
1971            let mut closed = Vec::new();
1972            for output_id in &dataflow.open_external_mappings {
1973                if output_id.0 == node_id && outputs.contains(&output_id.1) {
1974                    closed.push(output_id.clone());
1975                }
1976            }
1977            (local_node_inputs, closed)
1978        };
1979
1980        for output_id in closed {
1981            let event = InterDaemonEvent::OutputClosed {
1982                dataflow_id,
1983                node_id: output_id.0.clone(),
1984                output_id: output_id.1.clone(),
1985            };
1986            self.send_to_remote_receivers(dataflow_id, &output_id, event)
1987                .await?;
1988        }
1989
1990        Ok(())
1991    }
1992
1993    async fn subscribe(
1994        dataflow: &mut RunningDataflow,
1995        node_id: NodeId,
1996        event_sender: UnboundedSender<Timestamped<NodeEvent>>,
1997        clock: &HLC,
1998    ) {
1999        // some inputs might have been closed already -> report those events
2000        let closed_inputs = dataflow
2001            .mappings
2002            .values()
2003            .flatten()
2004            .filter(|(node, _)| node == &node_id)
2005            .map(|(_, input)| input)
2006            .filter(|input| {
2007                dataflow
2008                    .open_inputs
2009                    .get(&node_id)
2010                    .map(|open_inputs| !open_inputs.contains(*input))
2011                    .unwrap_or(true)
2012            });
2013        for input_id in closed_inputs {
2014            let _ = send_with_timestamp(
2015                &event_sender,
2016                NodeEvent::InputClosed {
2017                    id: input_id.clone(),
2018                },
2019                clock,
2020            );
2021        }
2022        if dataflow.open_inputs(&node_id).is_empty() {
2023            if let Some(node) = dataflow.running_nodes.get_mut(&node_id) {
2024                node.disable_restart();
2025            }
2026            if let Some(node) = dataflow.descriptor.nodes.iter().find(|n| n.id == node_id) {
2027                if node.inputs.is_empty() {
2028                    // do not send AllInputsClosed for source nodes
2029                } else {
2030                    let _ = send_with_timestamp(&event_sender, NodeEvent::AllInputsClosed, clock);
2031                }
2032            }
2033        }
2034
2035        // if a stop event was already sent for the dataflow, send it to
2036        // the newly connected node too
2037        if dataflow.stop_sent {
2038            if let Some(node) = dataflow.running_nodes.get_mut(&node_id) {
2039                node.disable_restart();
2040            }
2041            let _ = send_with_timestamp(&event_sender, NodeEvent::Stop, clock);
2042        }
2043
2044        dataflow.subscribe_channels.insert(node_id, event_sender);
2045    }
2046
2047    #[tracing::instrument(skip(self), level = "trace")]
2048    async fn handle_outputs_done(
2049        &mut self,
2050        dataflow_id: DataflowId,
2051        node_id: &NodeId,
2052        might_restart: bool,
2053    ) -> eyre::Result<()> {
2054        let outputs = {
2055            let dataflow = self
2056                .state
2057                .running
2058                .get(&dataflow_id)
2059                .ok_or_else(|| eyre!("no running dataflow with ID `{dataflow_id}`"))?;
2060            dataflow
2061                .mappings
2062                .keys()
2063                .filter(|m| &m.0 == node_id)
2064                .map(|m| &m.1)
2065                .cloned()
2066                .collect()
2067        };
2068
2069        if might_restart {
2070            self.logger
2071                .for_dataflow(dataflow_id)
2072                .for_node(node_id.clone())
2073                .log(
2074                    LogLevel::Debug,
2075                    Some("daemon".into()),
2076                    "keeping outputs open because node might restart",
2077                )
2078                .await;
2079        } else {
2080            self.send_output_closed_events(dataflow_id, node_id.clone(), outputs)
2081                .await?;
2082        }
2083
2084        if let Some(mut dataflow) = self.state.running.get_mut(&dataflow_id) {
2085            dataflow.drop_channels.remove(node_id);
2086        }
2087        Ok(())
2088    }
2089
2090    async fn handle_node_stop(
2091        &mut self,
2092        dataflow_id: Uuid,
2093        node_id: &NodeId,
2094        dynamic_node: bool,
2095    ) -> eyre::Result<()> {
2096        let result = self
2097            .handle_node_stop_inner(dataflow_id, node_id, dynamic_node)
2098            .await;
2099
2100        // Always send NodeStopped event, even if handle_node_stop_inner failed
2101        // This is critical for daemon shutdown - the event must be sent
2102        if let Err(err) = self
2103            .state
2104            .events_tx
2105            .send(Timestamped {
2106                inner: Event::NodeStopped {
2107                    dataflow_id,
2108                    node_id: node_id.clone(),
2109                },
2110                timestamp: self.state.clock.new_timestamp(),
2111            })
2112            .await
2113        {
2114            tracing::error!(
2115                "Failed to send NodeStopped event for {}/{}: {err:?}. Daemon may not exit properly.",
2116                dataflow_id,
2117                node_id
2118            );
2119        }
2120
2121        result
2122    }
2123
2124    async fn handle_node_stop_inner(
2125        &mut self,
2126        dataflow_id: Uuid,
2127        node_id: &NodeId,
2128        dynamic_node: bool,
2129    ) -> eyre::Result<()> {
2130        let mut logger = self.logger.for_dataflow(dataflow_id);
2131        let exited_before_subscribe = {
2132            let mut dataflow = match self.state.running.get_mut(&dataflow_id) {
2133                Some(dataflow) => dataflow,
2134                None if dynamic_node => {
2135                    // The dataflow might be done already as we don't wait for dynamic nodes. In this
2136                    // case, we don't need to do anything to handle the node stop.
2137                    tracing::debug!(
2138                        "dynamic node {dataflow_id}/{node_id} stopped after dataflow was done"
2139                    );
2140                    return Ok(());
2141                }
2142                None => eyre::bail!(
2143                    "failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`"
2144                ),
2145            };
2146
2147            let df = &mut *dataflow;
2148            df.pending_nodes
2149                .handle_node_stop_sync(node_id, &mut df.cascading_error_causes)
2150        };
2151        // DashMap guard is dropped — safe to do async I/O.
2152        if exited_before_subscribe {
2153            logger
2154                .log(
2155                    LogLevel::Warn,
2156                    Some(node_id.clone()),
2157                    Some("daemon::pending".into()),
2158                    "node exited before initializing dora connection",
2159                )
2160                .await;
2161        }
2162        // DashMap guard is dropped — safe to do async I/O (RPC to coordinator).
2163        // Check if all local nodes are ready and report to coordinator if needed.
2164        // We must not hold the DashMap guard during the RPC call.
2165        let needs_report = self
2166            .state
2167            .running
2168            .get_mut(&dataflow_id)
2169            .map(|mut dataflow| {
2170                let df = &mut *dataflow;
2171                df.pending_nodes
2172                    .check_and_answer_subscribers(&mut df.cascading_error_causes)
2173            })
2174            .unwrap_or(false);
2175        if needs_report {
2176            // DashMap guard is dropped — safe to do RPC.
2177            if let Some(client) = self.state.coordinator_client() {
2178                if let Some(dataflow) = self.state.running.get(&dataflow_id) {
2179                    let exited = dataflow.pending_nodes.exited_before_subscribe().to_vec();
2180                    drop(dataflow);
2181                    // DashMap guard is dropped — safe to do async I/O.
2182                    logger
2183                        .log(
2184                            LogLevel::Info,
2185                            None,
2186                            Some("daemon".into()),
2187                            format!(
2188                                "all local nodes are ready (exit before subscribe: {exited:?}), \
2189                                 waiting for remote nodes",
2190                            ),
2191                        )
2192                        .await;
2193                    let client = client.clone();
2194                    let events_tx = self.state.events_tx.clone();
2195                    let clock = self.state.clock.clone();
2196                    tokio::spawn(async move {
2197                        if let Err(err) = client
2198                            .all_nodes_ready(tarpc::context::current(), dataflow_id, exited)
2199                            .await
2200                        {
2201                            let _ = events_tx
2202                                .send(Timestamped {
2203                                    inner: Event::DaemonError(
2204                                        eyre::eyre!(err)
2205                                            .wrap_err("failed to send AllNodesReady RPC"),
2206                                    ),
2207                                    timestamp: clock.new_timestamp(),
2208                                })
2209                                .await;
2210                        }
2211                    });
2212                }
2213            }
2214        }
2215
2216        // node only reaches here if it will not be restarted
2217        let might_restart = false;
2218
2219        self.handle_outputs_done(dataflow_id, node_id, might_restart)
2220            .await?;
2221
2222        let should_finish = {
2223            let mut dataflow = self.state.running.get_mut(&dataflow_id).wrap_err_with(|| {
2224                format!(
2225                    "failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`"
2226                )
2227            })?;
2228            dataflow.running_nodes.remove(node_id);
2229            // Check if all remaining nodes are dynamic (won't send SpawnedNodeResult)
2230            !dataflow.pending_nodes.local_nodes_pending()
2231                && dataflow
2232                    .running_nodes
2233                    .iter()
2234                    .all(|(_id, n)| n.node_config.dynamic)
2235        };
2236
2237        if should_finish {
2238            self.finish_dataflow(dataflow_id).await?;
2239        }
2240
2241        Ok(())
2242    }
2243
2244    /// Mark a dataflow as finished and perform cleanup.
2245    /// This should be called when:
2246    /// - `stop_all()` returns `FinishDataflowWhen::Now`, or
2247    /// - All non-dynamic nodes have sent `SpawnedNodeResult` events
2248    async fn finish_dataflow(&mut self, dataflow_id: Uuid) -> eyre::Result<()> {
2249        let mut logger = self.logger.for_dataflow(dataflow_id);
2250
2251        // Dynamic nodes don't send SpawnedNodeResult events, so there may be no entry
2252        // in dataflow_node_results. An empty map means all dynamic nodes handled stop successfully.
2253        let result = DataflowDaemonResult {
2254            timestamp: self.state.clock.new_timestamp(),
2255            node_results: self
2256                .state
2257                .dataflow_node_results
2258                .get(&dataflow_id)
2259                .map(|entry| entry.value().clone())
2260                .unwrap_or_default(),
2261        };
2262
2263        {
2264            let mut git_manager = self.state.git_manager.lock().await;
2265            git_manager
2266                .clones_in_use
2267                .values_mut()
2268                .for_each(|dataflows| {
2269                    dataflows.remove(&dataflow_id);
2270                });
2271        }
2272
2273        logger
2274            .log(
2275                LogLevel::Info,
2276                None,
2277                Some("daemon".into()),
2278                format!("dataflow finished on machine `{}`", self.state.daemon_id()),
2279            )
2280            .await;
2281
2282        if let Some(client) = self.state.coordinator_client() {
2283            let client = client.clone();
2284            tokio::spawn(async move {
2285                if let Err(err) = client
2286                    .all_nodes_finished(tarpc::context::current(), dataflow_id, result)
2287                    .await
2288                {
2289                    tracing::error!(
2290                        ?err,
2291                        "failed to send all_nodes_finished notification to coordinator"
2292                    );
2293                }
2294            });
2295        }
2296        self.state.running.remove(&dataflow_id);
2297
2298        Ok(())
2299    }
2300
/// Handles a daemon-internal `DoraEvent`.
///
/// - `Timer`: sends a data-less `NodeEvent::Input` to every subscriber of the
///   timer interval.
/// - `Logs`: forwards `message` as a `NodeEvent::Input` to every receiver mapped
///   to `output_id`.
/// - `SpawnedNodeResult`: turns the exit status of a spawned node into a node
///   result, logs it, notifies downstream nodes of genuine failures, and then
///   either logs the upcoming restart or records the result and runs the
///   node-stop handling.
///
/// `Timer` and `Logs` events for a dataflow that is not in `state.running` are
/// ignored after a warning. The only error propagated to the caller comes from
/// `handle_node_stop`.
async fn handle_dora_event(&mut self, event: DoraEvent) -> eyre::Result<()> {
    match event {
        DoraEvent::Timer {
            dataflow_id,
            interval,
            metadata,
        } => {
            let Some(mut dataflow) = self.state.running.get_mut(&dataflow_id) else {
                tracing::warn!("Timer event for unknown dataflow `{dataflow_id}`");
                return Ok(());
            };

            // No subscribers for this interval: nothing to deliver.
            let Some(subscribers) = dataflow.timers.get(&interval).cloned() else {
                return Ok(());
            };

            // Receivers whose channel rejected the event; removed after the loop.
            let mut closed = Vec::new();
            for (receiver_id, input_id) in &subscribers {
                let Some(channel) = dataflow.subscribe_channels.get(receiver_id) else {
                    continue;
                };

                // Timer ticks carry metadata only, no payload.
                let send_result = send_with_timestamp(
                    channel,
                    NodeEvent::Input {
                        id: input_id.clone(),
                        metadata: metadata.clone(),
                        data: None,
                    },
                    &self.state.clock,
                );
                match send_result {
                    Ok(()) => {}
                    Err(_) => {
                        closed.push(receiver_id.clone());
                    }
                }
            }
            for id in &closed {
                dataflow.subscribe_channels.remove(id);
            }
        }
        DoraEvent::Logs {
            dataflow_id,
            output_id,
            message,
            metadata,
        } => {
            let Some(mut dataflow) = self.state.running.get_mut(&dataflow_id) else {
                tracing::warn!("Logs event for unknown dataflow `{dataflow_id}`");
                return Ok(());
            };

            let Some(subscribers) = dataflow.mappings.get(&output_id).cloned() else {
                tracing::warn!(
                    "No subscribers found for {:?} in {:?}",
                    output_id,
                    dataflow.mappings
                );
                return Ok(());
            };

            // Receivers whose channel rejected the event; removed after the loop.
            let mut closed = Vec::new();
            for (receiver_id, input_id) in &subscribers {
                // NOTE(review): the lookup is keyed by `receiver_id`, but the warning
                // prints `output_id` — confirm this is the intended identifier.
                let Some(channel) = dataflow.subscribe_channels.get(receiver_id) else {
                    tracing::warn!("No subscriber channel found for {:?}", output_id);
                    continue;
                };

                let send_result = send_with_timestamp(
                    channel,
                    NodeEvent::Input {
                        id: input_id.clone(),
                        metadata: metadata.clone(),
                        data: Some(message.clone()),
                    },
                    &self.state.clock,
                );
                match send_result {
                    Ok(()) => {}
                    Err(_) => {
                        closed.push(receiver_id.clone());
                    }
                }
            }
            for id in &closed {
                dataflow.subscribe_channels.remove(id);
            }
        }
        DoraEvent::SpawnedNodeResult {
            dataflow_id,
            node_id,
            dynamic_node,
            exit_status,
            restart,
        } => {
            let mut logger = self
                .logger
                .for_dataflow(dataflow_id)
                .for_node(node_id.clone());
            logger
                .log(
                    LogLevel::Debug,
                    Some("daemon".into()),
                    format!("handling node stop with exit status {exit_status:?} (restart: {restart})"),
                )
                .await;

            // Any exit status other than `Success` becomes a `NodeError` with a
            // cause derived from the dataflow's bookkeeping.
            let node_result = match exit_status {
                NodeExitStatus::Success => Ok(()),
                exit_status => {
                    // Extract all needed info from the dataflow Ref in one shot
                    let (caused_by_node, grace_duration_kill, stderr_lines) =
                        if let Some(dataflow) = self.state.running.get(&dataflow_id) {
                            let caused_by = dataflow
                                .cascading_error_causes
                                .error_caused_by(&node_id)
                                .cloned();

                            let grace_kill = dataflow.grace_duration_kills.contains(&node_id);
                            let stderr =
                                dataflow.node_stderr_most_recent.get(&node_id).map(|queue| {
                                    let mut lines = Vec::new();
                                    // A full queue gets a "[...]" marker in front;
                                    // presumably older lines were dropped — confirm
                                    // against the code that fills the queue.
                                    if queue.is_full() {
                                        lines.push("[...]".into());
                                    }
                                    // Popping empties the queue while collecting.
                                    while let Some(line) = queue.pop() {
                                        lines.push(line);
                                    }
                                    lines
                                });
                            (caused_by, grace_kill, stderr)
                        } else {
                            // Dataflow is no longer tracked: no extra information.
                            (None, false, None)
                        };

                    // Precedence: cascading cause, then grace-duration kill, then
                    // whatever can be extracted from the captured stderr lines.
                    let cause = match caused_by_node {
                        Some(caused_by_node) => {
                            logger
                                .log(
                                    LogLevel::Info,
                                    Some("daemon".into()),
                                    format!("marking `{node_id}` as cascading error caused by `{caused_by_node}`")
                                )
                                .await;

                            NodeErrorCause::Cascading { caused_by_node }
                        }
                        None if grace_duration_kill => NodeErrorCause::GraceDuration,
                        None => {
                            let cause = stderr_lines
                                .map(extract_err_from_stderr)
                                .unwrap_or_default();

                            NodeErrorCause::Other { stderr: cause }
                        }
                    };
                    Err(NodeError {
                        timestamp: self.state.clock.new_timestamp(),
                        cause,
                        exit_status,
                    })
                }
            };

            // Successful exits are logged at info level, failures at error level.
            logger
                .log(
                    if node_result.is_ok() {
                        LogLevel::Info
                    } else {
                        LogLevel::Error
                    },
                    Some("daemon".into()),
                    match &node_result {
                        Ok(()) => format!("{node_id} finished successfully"),
                        Err(err) => format!("{err}"),
                    },
                )
                .await;

            drop(logger);

            // Propagate error to downstream nodes only for genuine user-code failures,
            // even if the node will be restarted (downstream should know about failures).
            // Cascading errors (caused by a previously failed node), grace-duration
            // kills (dora's own timeout), and spawn failures are excluded:
            // - Cascading: downstream nodes already received a NodeFailed for the root cause.
            // - GraceDuration: this is dora operational behaviour, not a user-code error.
            // - FailedToSpawn: the node never ran, so its outputs were never open.
            if let Err(node_error) = &node_result {
                if matches!(node_error.cause, NodeErrorCause::Other { .. }) {
                    let error_message = format!("{node_error}");
                    // Best effort: a propagation failure is logged, not returned.
                    if let Err(err) = self
                        .send_node_failed_events(dataflow_id, node_id.clone(), error_message)
                        .await
                    {
                        tracing::warn!(
                            "Failed to propagate error for node {}/{}: {err:?}",
                            dataflow_id,
                            node_id
                        );
                    }
                }
            }

            if restart {
                // The node result is neither recorded nor is the node-stop
                // handling run while the node is going to be restarted.
                self.logger
                    .for_dataflow(dataflow_id)
                    .for_node(node_id.clone())
                    .log(
                        LogLevel::Info,
                        Some("daemon".into()),
                        "node will be restarted",
                    )
                    .await;
            } else {
                // Final exit: store the result, then run the node-stop handling.
                self.state
                    .dataflow_node_results
                    .entry(dataflow_id)
                    .or_default()
                    .insert(node_id.clone(), node_result);

                self.handle_node_stop(dataflow_id, &node_id, dynamic_node)
                    .await?;
            }
        }
    }
    Ok(())
}
2530
/// Resolves the base working directory for a session.
///
/// Instance-method convenience wrapper: forwards both arguments unchanged to
/// `Self::base_working_dir_static` and returns its result.
fn base_working_dir(
    &self,
    local_working_dir: Option<PathBuf>,
    session_id: SessionId,
) -> eyre::Result<PathBuf> {
    Self::base_working_dir_static(local_working_dir, session_id)
}
2538
2539    /// Static version of `base_working_dir`, usable without a `Daemon` instance.
2540    pub(crate) fn base_working_dir_static(
2541        local_working_dir: Option<PathBuf>,
2542        session_id: SessionId,
2543    ) -> eyre::Result<PathBuf> {
2544        match local_working_dir {
2545            Some(working_dir) => {
2546                // check that working directory exists
2547                if working_dir.exists() {
2548                    Ok(working_dir)
2549                } else {
2550                    bail!(
2551                        "working directory does not exist: {}",
2552                        working_dir.display(),
2553                    )
2554                }
2555            }
2556            None => {
2557                // use subfolder of daemon working dir
2558                let daemon_working_dir =
2559                    current_dir().context("failed to get daemon working dir")?;
2560                Ok(daemon_working_dir
2561                    .join("_work")
2562                    .join(session_id.uuid().to_string()))
2563            }
2564        }
2565    }
2566
2567    /// Static version of `build_dataflow`, usable from the RPC server without
2568    /// a `Daemon` instance. Uses `DaemonState` directly.
2569    #[allow(clippy::too_many_arguments)]
2570    pub(crate) async fn build_dataflow_static(
2571        state: &state::DaemonState,
2572        build_id: BuildId,
2573        session_id: SessionId,
2574        base_working_dir: PathBuf,
2575        git_sources: BTreeMap<NodeId, GitSource>,
2576        prev_git_sources: BTreeMap<NodeId, GitSource>,
2577        dataflow_descriptor: Descriptor,
2578        local_nodes: BTreeSet<NodeId>,
2579        uv: bool,
2580    ) -> eyre::Result<impl Future<Output = eyre::Result<BuildInfo>> + use<>> {
2581        let builder = build::Builder {
2582            session_id,
2583            base_working_dir,
2584            uv,
2585        };
2586        {
2587            let mut git_manager = state.git_manager.lock().await;
2588            git_manager.clear_planned_builds(session_id);
2589        }
2590
2591        let nodes = dataflow_descriptor.resolve_aliases_and_set_defaults()?;
2592
2593        let mut tasks = Vec::new();
2594
2595        // build nodes
2596        for node in nodes.into_values().filter(|n| local_nodes.contains(&n.id)) {
2597            let node_id = node.id.clone();
2598            let git_source = git_sources.get(&node_id).cloned();
2599            let prev_git_source = prev_git_sources.get(&node_id).cloned();
2600            let prev_git = prev_git_source.map(|prev_source| PrevGitSource {
2601                still_needed_for_this_build: git_sources.values().any(|s| s == &prev_source),
2602                git_source: prev_source,
2603            });
2604
2605            let mut builder = builder.clone();
2606            if let Some(node_working_dir) =
2607                node.deploy.as_ref().and_then(|d| d.working_dir.as_deref())
2608            {
2609                builder.base_working_dir = builder.base_working_dir.join(node_working_dir);
2610            }
2611
2612            // Use a tracing-only logger stub for the RPC path
2613            let logger = build::TracingBuildLogger::new(build_id, node_id.clone());
2614
2615            let mut git_manager = state.git_manager.lock().await;
2616            match builder
2617                .build_node(node, git_source, prev_git, logger, &mut git_manager)
2618                .await
2619                .wrap_err_with(|| format!("failed to build node `{node_id}`"))
2620            {
2621                Ok(result) => {
2622                    tasks.push(NodeBuildTask {
2623                        node_id,
2624                        task: result,
2625                        dynamic_node: false,
2626                    });
2627                }
2628                Err(err) => {
2629                    tracing::error!("build failed for node {node_id}: {err:?}");
2630                    return Err(err);
2631                }
2632            }
2633        }
2634
2635        let task = async move {
2636            let mut info = BuildInfo {
2637                node_working_dirs: Default::default(),
2638            };
2639            for task in tasks {
2640                let NodeBuildTask {
2641                    node_id,
2642                    dynamic_node: _,
2643                    task,
2644                } = task;
2645                let node = task
2646                    .await
2647                    .with_context(|| format!("failed to build node `{node_id}`"))?;
2648                info.node_working_dirs
2649                    .insert(node_id, node.node_working_dir);
2650            }
2651            Ok(info)
2652        };
2653
2654        Ok(task)
2655    }
2656}
2657
2658async fn read_last_n_lines(file: &mut File, mut tail: usize) -> io::Result<Vec<u8>> {
2659    let mut pos = file.seek(io::SeekFrom::End(0)).await?;
2660
2661    let mut output = VecDeque::<u8>::new();
2662    let mut extend_slice_to_start = |slice: &[u8]| {
2663        output.extend(slice);
2664        output.rotate_right(slice.len());
2665    };
2666
2667    let mut buffer = vec![0; 2048];
2668    let mut estimated_line_length = 0;
2669    let mut at_end = true;
2670    'main: while tail > 0 && pos > 0 {
2671        let new_pos = pos.saturating_sub(buffer.len() as u64);
2672        file.seek(io::SeekFrom::Start(new_pos)).await?;
2673        let read_len = (pos - new_pos) as usize;
2674        pos = new_pos;
2675
2676        file.read_exact(&mut buffer[..read_len]).await?;
2677        let read_buf = if at_end {
2678            at_end = false;
2679            &buffer[..read_len].trim_ascii_end()
2680        } else {
2681            &buffer[..read_len]
2682        };
2683
2684        let mut iter = memchr::memrchr_iter(b'\n', read_buf);
2685        let mut lines = 1;
2686        loop {
2687            let Some(pos) = iter.next() else {
2688                extend_slice_to_start(read_buf);
2689                break;
2690            };
2691            lines += 1;
2692            tail -= 1;
2693            if tail == 0 {
2694                extend_slice_to_start(&read_buf[(pos + 1)..]);
2695                break 'main;
2696            }
2697        }
2698
2699        estimated_line_length = estimated_line_length.max((read_buf.len() + 1).div_ceil(lines));
2700        let estimated_buffer_length = estimated_line_length * tail;
2701        if estimated_buffer_length >= buffer.len() * 2 {
2702            buffer.resize(buffer.len() * 2, 0);
2703        }
2704    }
2705
2706    Ok(output.into())
2707}
2708
2709/// Set up the event stream for the daemon.
2710///
2711/// Returns `((daemon_id, coordinator_client), event_stream)`.
2712async fn set_up_event_stream(
2713    coordinator_addr: SocketAddr,
2714    machine_id: &Option<String>,
2715    clock: &Arc<HLC>,
2716    state: Arc<state::DaemonState>,
2717    remote_daemon_events_rx: flume::Receiver<eyre::Result<Timestamped<InterDaemonEvent>>>,
2718    // used for dynamic nodes
2719    local_listen_port: u16,
2720) -> eyre::Result<(
2721    (
2722        DaemonId,
2723        dora_message::daemon_to_coordinator::CoordinatorNotifyClient,
2724    ),
2725    impl Stream<Item = Timestamped<Event>> + Unpin,
2726)> {
2727    let clock_cloned = clock.clone();
2728    let remote_daemon_events = remote_daemon_events_rx.into_stream().map(move |e| match e {
2729        Ok(e) => Timestamped {
2730            inner: Event::Daemon(e.inner),
2731            timestamp: e.timestamp,
2732        },
2733        Err(err) => Timestamped {
2734            inner: Event::DaemonError(err),
2735            timestamp: clock_cloned.new_timestamp(),
2736        },
2737    });
2738
2739    let register_result = coordinator::register(coordinator_addr, machine_id.clone(), clock, state)
2740        .await
2741        .wrap_err("failed to connect to dora-coordinator")?;
2742    // Monitor the RPC server task so panics are logged instead of silently lost.
2743    let rpc_server_handle = register_result.rpc_server_handle;
2744    let daemon_id = register_result.daemon_id;
2745    let coordinator_client = register_result.coordinator_client;
2746    tokio::spawn(async move {
2747        if let Err(err) = rpc_server_handle.await {
2748            tracing::error!("coordinator RPC server task panicked: {err}");
2749        }
2750    });
2751
2752    let (events_tx, events_rx) = flume::bounded(10);
2753    let _listen_port =
2754        local_listener::spawn_listener_loop((LOCALHOST, local_listen_port).into(), events_tx)
2755            .await?;
2756    let dynamic_node_events = events_rx.into_stream().map(|e| Timestamped {
2757        inner: Event::DynamicNode(e.inner),
2758        timestamp: e.timestamp,
2759    });
2760    let incoming = (remote_daemon_events, dynamic_node_events).merge();
2761    Ok(((daemon_id, coordinator_client), incoming))
2762}
2763
2764async fn send_output_to_local_receivers(
2765    node_id: NodeId,
2766    output_id: DataId,
2767    dataflow: &mut RunningDataflow,
2768    metadata: &metadata::Metadata,
2769    data: Option<DataMessage>,
2770    clock: &HLC,
2771) -> Result<Option<AVec<u8, ConstAlign<128>>>, eyre::ErrReport> {
2772    let timestamp = metadata.timestamp();
2773    let empty_set = BTreeSet::new();
2774    let output_id = OutputId(node_id, output_id);
2775    let local_receivers = dataflow.mappings.get(&output_id).unwrap_or(&empty_set);
2776    let OutputId(node_id, _) = output_id;
2777    let mut closed = Vec::new();
2778    for (receiver_id, input_id) in local_receivers {
2779        if let Some(channel) = dataflow.subscribe_channels.get(receiver_id) {
2780            let item = NodeEvent::Input {
2781                id: input_id.clone(),
2782                metadata: metadata.clone(),
2783                data: data.clone(),
2784            };
2785            match channel.send(Timestamped {
2786                inner: item,
2787                timestamp,
2788            }) {
2789                Ok(()) => {
2790                    if let Some(token) = data.as_ref().and_then(|d| d.drop_token()) {
2791                        dataflow
2792                            .pending_drop_tokens
2793                            .entry(token)
2794                            .or_insert_with(|| DropTokenInformation {
2795                                owner: node_id.clone(),
2796                                pending_nodes: Default::default(),
2797                            })
2798                            .pending_nodes
2799                            .insert(receiver_id.clone());
2800                    }
2801                }
2802                Err(_) => {
2803                    closed.push(receiver_id);
2804                }
2805            }
2806        }
2807    }
2808    for id in closed {
2809        dataflow.subscribe_channels.remove(id);
2810    }
2811    let (data_bytes, drop_token) = match data {
2812        None => (None, None),
2813        Some(DataMessage::SharedMemory {
2814            shared_memory_id,
2815            len,
2816            drop_token,
2817        }) => {
2818            let memory = ShmemConf::new()
2819                .os_id(shared_memory_id)
2820                .open()
2821                .wrap_err("failed to map shared memory output")?;
2822            let data = Some(AVec::from_slice(1, &unsafe { memory.as_slice() }[..len]));
2823            (data, Some(drop_token))
2824        }
2825        Some(DataMessage::Vec(v)) => (Some(v), None),
2826    };
2827    if let Some(token) = drop_token {
2828        // insert token into `pending_drop_tokens` even if there are no local subscribers
2829        dataflow
2830            .pending_drop_tokens
2831            .entry(token)
2832            .or_insert_with(|| DropTokenInformation {
2833                owner: node_id.clone(),
2834                pending_nodes: Default::default(),
2835            });
2836        // check if all local subscribers are finished with the token
2837        dataflow.check_drop_token(token, clock).await?;
2838    }
2839    Ok(data_bytes)
2840}
2841
2842fn node_inputs(node: &ResolvedNode) -> BTreeMap<DataId, Input> {
2843    match &node.kind {
2844        CoreNodeKind::Custom(n) => n.run_config.inputs.clone(),
2845        CoreNodeKind::Runtime(n) => runtime_node_inputs(n),
2846    }
2847}
2848
2849fn close_input(
2850    dataflow: &mut RunningDataflow,
2851    receiver_id: &NodeId,
2852    input_id: &DataId,
2853    clock: &HLC,
2854) {
2855    if let Some(open_inputs) = dataflow.open_inputs.get_mut(receiver_id) {
2856        if !open_inputs.remove(input_id) {
2857            return;
2858        }
2859    }
2860    if let Some(channel) = dataflow.subscribe_channels.get(receiver_id) {
2861        let _ = send_with_timestamp(
2862            channel,
2863            NodeEvent::InputClosed {
2864                id: input_id.clone(),
2865            },
2866            clock,
2867        );
2868
2869        if dataflow.open_inputs(receiver_id).is_empty() {
2870            if let Some(node) = dataflow.running_nodes.get_mut(receiver_id) {
2871                node.disable_restart();
2872            }
2873            let _ = send_with_timestamp(channel, NodeEvent::AllInputsClosed, clock);
2874        }
2875    }
2876}
2877
/// Daemon-side bookkeeping for a single node of a running dataflow.
#[derive(Debug)]
pub struct RunningNode {
    /// Handle for submitting operations (soft kill / kill) to the node's process.
    ///
    /// Dropping the handle submits a kill operation.
    // NOTE(review): presumably `None` for nodes without a daemon-managed
    // process (e.g. dynamic nodes) — confirm.
    process: Option<ProcessHandle>,
    /// Configuration of the node; its `dynamic` flag tells dynamic nodes apart.
    node_config: NodeConfig,
    /// Shared process id of the node, if available.
    // NOTE(review): who updates this value is not visible here — confirm.
    pid: Option<Arc<AtomicU32>>,
    /// Restart policy configured for this node.
    restart_policy: RestartPolicy,
    /// Don't restart the node even if the restart policy says so.
    ///
    /// This flag is set when all inputs of the node were closed and when a manual stop command
    /// was sent.
    disable_restart: Arc<AtomicBool>,
    /// Abort handle for this node's listener task, carried here until the dataflow
    /// collects it into `RunningDataflow::_listener_tasks`.
    listener_abort_handle: Option<tokio::task::AbortHandle>,
}
2893
2894impl RunningNode {
2895    pub fn restarts_disabled(&self) -> bool {
2896        self.disable_restart.load(atomic::Ordering::Acquire)
2897    }
2898
2899    pub fn disable_restart(&mut self) {
2900        self.disable_restart.store(true, atomic::Ordering::Release);
2901    }
2902}
2903
/// Operation that can be submitted to a node's child process.
#[derive(Debug)]
enum ProcessOperation {
    /// Ask the process to terminate: SIGTERM on Unix, CTRL_BREAK_EVENT on Windows.
    SoftKill,
    /// Kill the process via the child wrapper's `start_kill`.
    Kill,
}
2909
2910impl ProcessOperation {
2911    pub fn execute(&self, child: &mut dyn TokioChildWrapper) {
2912        match self {
2913            Self::SoftKill => {
2914                #[cfg(unix)]
2915                {
2916                    // Send SIGTERM
2917                    if let Err(err) = child.signal(15) {
2918                        warn!("failed to send SIGTERM to process {:?}: {err}", child.id());
2919                    }
2920                }
2921
2922                #[cfg(windows)]
2923                unsafe {
2924                    let Some(pid) = child.id() else {
2925                        warn!("failed to get child process id");
2926                        return;
2927                    };
2928                    if let Err(err) = windows::Win32::System::Console::GenerateConsoleCtrlEvent(
2929                        windows::Win32::System::Console::CTRL_BREAK_EVENT,
2930                        pid,
2931                    ) {
2932                        warn!("failed to send CTRL_BREAK_EVENT to process {pid}: {err}");
2933                    }
2934                }
2935
2936                #[cfg(not(any(unix, windows)))]
2937                {
2938                    warn!("killing process is not implemented on this platform");
2939                }
2940            }
2941            Self::Kill => {
2942                if let Err(err) = child.start_kill() {
2943                    warn!("failed to kill child process: {err}");
2944                }
2945            }
2946        }
2947    }
2948}
2949
/// Handle for submitting `ProcessOperation`s to a node's process.
///
/// Dropping the handle submits `ProcessOperation::Kill`.
#[derive(Debug)]
struct ProcessHandle {
    /// Sending side of the operation channel.
    // NOTE(review): the receiving side presumably lives in the task that owns
    // the child process — confirm.
    op_tx: flume::Sender<ProcessOperation>,
}
2954
2955impl ProcessHandle {
2956    pub fn new(op_tx: flume::Sender<ProcessOperation>) -> Self {
2957        Self { op_tx }
2958    }
2959
2960    /// Returns true if the process is not finished yet and the operation is
2961    /// delivered.
2962    pub fn submit(&self, operation: ProcessOperation) -> bool {
2963        self.op_tx.send(operation).is_ok()
2964    }
2965}
2966
2967impl Drop for ProcessHandle {
2968    fn drop(&mut self) {
2969        if self.submit(ProcessOperation::Kill) {
2970            warn!("process was killed on drop because it was still running");
2971        }
2972    }
2973}
2974
/// Aborts the associated tokio task when dropped.
///
/// Wraps the task's `AbortHandle`, so the task is tied to the lifetime of
/// this value.
struct ListenerTask(tokio::task::AbortHandle);
2977
2978impl Drop for ListenerTask {
2979    fn drop(&mut self) {
2980        self.0.abort();
2981    }
2982}
2983
/// Daemon-side state of one running dataflow.
pub struct RunningDataflow {
    /// ID of this dataflow.
    id: Uuid,

    /// Descriptor of this dataflow.
    descriptor: Descriptor,

    /// Local nodes that are not started yet
    pending_nodes: PendingNodes,

    /// Starts out as `false`.
    // NOTE(review): presumably set once the dataflow was started — confirm.
    dataflow_started: bool,

    /// Per-node channel for delivering `NodeEvent`s to that node.
    ///
    /// An entry is removed when sending on its channel fails.
    subscribe_channels: HashMap<NodeId, UnboundedSender<Timestamped<NodeEvent>>>,
    /// Per-node channel for delivering `NodeDropEvent`s to that node.
    drop_channels: HashMap<NodeId, UnboundedSender<Timestamped<NodeDropEvent>>>,
    /// Maps each output to the `(receiver node, input)` pairs it is delivered to.
    mappings: HashMap<OutputId, BTreeSet<InputId>>,
    /// Maps each timer interval to the `(receiver node, input)` pairs that get its ticks.
    timers: BTreeMap<Duration, BTreeSet<InputId>>,
    /// Inputs per node that are not closed yet.
    open_inputs: BTreeMap<NodeId, BTreeSet<DataId>>,
    /// Nodes of this dataflow that are tracked as running on this daemon.
    running_nodes: BTreeMap<NodeId, RunningNode>,

    /// List of all dynamic node IDs.
    ///
    /// We want to treat dynamic nodes differently in some cases, so we need
    /// to know which nodes are dynamic.
    dynamic_nodes: BTreeSet<NodeId>,

    // NOTE(review): presumably outputs that are consumed outside of this
    // daemon and are still open — confirm.
    open_external_mappings: BTreeSet<OutputId>,

    /// Drop tokens in flight, each with its owning node and the receivers it
    /// was delivered to.
    pending_drop_tokens: HashMap<DropToken, DropTokenInformation>,

    /// Keep handles to all timer tasks of this dataflow to cancel them on drop.
    _timer_handles: BTreeMap<Duration, futures::future::RemoteHandle<()>>,
    /// Keep abort handles for all node listener tasks so they are cancelled when
    /// the dataflow finishes and this struct is dropped.
    _listener_tasks: Vec<ListenerTask>,
    /// Starts out as `false`.
    // NOTE(review): presumably set once a stop was sent to the nodes — confirm.
    stop_sent: bool,

    // NOTE(review): a dangling doc line "Used in `open_inputs`." stood here
    // without a field to describe; it was demoted to a plain comment so it no
    // longer leaks into the docs of the next field.

    /// Contains the node that caused the error for nodes that experienced a cascading error.
    cascading_error_causes: CascadingErrorCauses,
    /// Nodes listed here get `NodeErrorCause::GraceDuration` as the error cause
    /// when they exit unsuccessfully (unless a cascading cause is known).
    grace_duration_kills: Arc<crossbeam_skiplist::SkipSet<NodeId>>,

    /// Tracks nodes whose NodeFailed event has already been delivered to local receivers.
    ///
    /// Because zenoh is topic-based, a remote daemon may publish one NodeFailed per output
    /// of the failed node. Without deduplication the receiving daemon would call
    /// find_and_notify_local_receivers once per message, delivering duplicate events.
    handled_node_failed: BTreeSet<NodeId>,

    /// Bounded queue of stderr lines per node.
    ///
    /// Drained when a node exits unsuccessfully, to derive the error cause.
    node_stderr_most_recent: BTreeMap<NodeId, Arc<ArrayQueue<String>>>,

    /// Zenoh publisher per output.
    publishers: BTreeMap<OutputId, zenoh::pubsub::Publisher<'static>>,

    // NOTE(review): presumably signals subscribers once the dataflow has
    // finished — confirm.
    finished_tx: broadcast::Sender<()>,

    // NOTE(review): presumably forces publishing every output to zenoh,
    // regardless of receivers — confirm.
    publish_all_messages_to_zenoh: bool,
}
3039
/// Indicates whether a dataflow should be finished immediately after stop_all()
/// or whether to wait for SpawnedNodeResult events from running nodes.
///
/// The type is a plain two-state flag, so it derives the usual value-type
/// traits: `Debug` for logging, `Clone`/`Copy` for cheap passing, and
/// `PartialEq`/`Eq` so callers can compare it directly.
#[must_use]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FinishDataflowWhen {
    /// Finish the dataflow immediately (all nodes are dynamic or no nodes running)
    Now,
    /// Wait for SpawnedNodeResult events from non-dynamic nodes
    WaitForNodes,
}
3049
3050impl RunningDataflow {
3051    fn new(
3052        dataflow_id: Uuid,
3053        daemon_id: DaemonId,
3054        dataflow_descriptor: Descriptor,
3055        events_tx: tokio::sync::mpsc::Sender<Timestamped<Event>>,
3056        clock: Arc<HLC>,
3057    ) -> RunningDataflow {
3058        let (finished_tx, _) = broadcast::channel(1);
3059        Self {
3060            id: dataflow_id,
3061            pending_nodes: PendingNodes::new(dataflow_id, daemon_id, events_tx, clock),
3062            dataflow_started: false,
3063            subscribe_channels: HashMap::new(),
3064            drop_channels: HashMap::new(),
3065            mappings: HashMap::new(),
3066            timers: BTreeMap::new(),
3067            open_inputs: BTreeMap::new(),
3068            running_nodes: BTreeMap::new(),
3069            dynamic_nodes: BTreeSet::new(),
3070            open_external_mappings: Default::default(),
3071            pending_drop_tokens: HashMap::new(),
3072            _timer_handles: BTreeMap::new(),
3073            _listener_tasks: Vec::new(),
3074            stop_sent: false,
3075            cascading_error_causes: Default::default(),
3076            grace_duration_kills: Default::default(),
3077            handled_node_failed: BTreeSet::new(),
3078            node_stderr_most_recent: BTreeMap::new(),
3079            publishers: Default::default(),
3080            finished_tx,
3081            publish_all_messages_to_zenoh: dataflow_descriptor.debug.publish_all_messages_to_zenoh,
3082            descriptor: dataflow_descriptor,
3083        }
3084    }
3085
3086    async fn start(
3087        &mut self,
3088        events_tx: &mpsc::Sender<Timestamped<Event>>,
3089        clock: &Arc<HLC>,
3090    ) -> eyre::Result<()> {
3091        for interval in self.timers.keys().copied() {
3092            if self._timer_handles.get(&interval).is_some() {
3093                continue;
3094            }
3095            let events_tx = events_tx.clone();
3096            let dataflow_id = self.id;
3097            let clock = clock.clone();
3098            let task = async move {
3099                let mut interval_stream = tokio::time::interval(interval);
3100                let hlc = HLC::default();
3101                loop {
3102                    interval_stream.tick().await;
3103
3104                    let span = tracing::span!(tracing::Level::TRACE, "tick");
3105                    let _ = span.enter();
3106
3107                    let mut parameters = BTreeMap::new();
3108                    parameters.insert(
3109                        "open_telemetry_context".to_string(),
3110                        #[cfg(feature = "telemetry")]
3111                        Parameter::String(serialize_context(&span.context())),
3112                        #[cfg(not(feature = "telemetry"))]
3113                        Parameter::String("".into()),
3114                    );
3115
3116                    let metadata = metadata::Metadata::from_parameters(
3117                        hlc.new_timestamp(),
3118                        empty_type_info(),
3119                        parameters,
3120                    );
3121
3122                    let event = Timestamped {
3123                        inner: DoraEvent::Timer {
3124                            dataflow_id,
3125                            interval,
3126                            metadata,
3127                        }
3128                        .into(),
3129                        timestamp: clock.new_timestamp(),
3130                    };
3131                    if events_tx.send(event).await.is_err() {
3132                        break;
3133                    }
3134                }
3135            };
3136            let (task, handle) = task.remote_handle();
3137            tokio::spawn(task);
3138            self._timer_handles.insert(interval, handle);
3139        }
3140
3141        Ok(())
3142    }
3143
3144    async fn stop_all(
3145        &mut self,
3146        state: &state::DaemonState,
3147        grace_duration: Option<Duration>,
3148        force: bool,
3149        logger: &mut DataflowLogger<'_>,
3150    ) -> eyre::Result<FinishDataflowWhen> {
3151        self.pending_nodes
3152            .handle_dataflow_stop(
3153                &state.coordinator_client().cloned(),
3154                &state.clock,
3155                &mut self.cascading_error_causes,
3156                &self.dynamic_nodes,
3157                logger,
3158            )
3159            .await?;
3160
3161        for node in self.running_nodes.values_mut() {
3162            node.disable_restart();
3163        }
3164
3165        for (_node_id, channel) in self.subscribe_channels.drain() {
3166            let _ = send_with_timestamp(&channel, NodeEvent::Stop, &state.clock);
3167        }
3168
3169        let running_processes: Vec<_> = self
3170            .running_nodes
3171            .iter_mut()
3172            .map(|(id, n)| (id.clone(), n.process.take()))
3173            .collect();
3174        if force {
3175            for (_, proc) in &running_processes {
3176                if let Some(proc) = proc {
3177                    proc.submit(crate::ProcessOperation::Kill);
3178                }
3179            }
3180        } else {
3181            let grace_duration_kills = self.grace_duration_kills.clone();
3182            tokio::spawn(async move {
3183                let duration = grace_duration.unwrap_or(Duration::from_millis(10000));
3184                tokio::time::sleep(duration).await;
3185
3186                for (node, proc) in &running_processes {
3187                    if let Some(proc) = proc {
3188                        grace_duration_kills.insert(node.clone());
3189                        proc.submit(crate::ProcessOperation::SoftKill);
3190                    }
3191                }
3192
3193                let kill_duration = duration / 2;
3194                tokio::time::sleep(kill_duration).await;
3195
3196                for (node, proc) in &running_processes {
3197                    if let Some(proc) = proc {
3198                        grace_duration_kills.insert(node.clone());
3199                        proc.submit(crate::ProcessOperation::Kill);
3200                        warn!(
3201                            "{node} was killed due to not stopping within the {:#?} grace period",
3202                            duration + kill_duration
3203                        );
3204                    }
3205                }
3206            });
3207        }
3208        self.stop_sent = true;
3209
3210        // Determine if we should finish immediately or wait for nodes
3211        Ok(self.should_finish_immediately())
3212    }
3213
3214    /// Synchronous part of `stop_all` — sends stop signals to nodes and
3215    /// schedules grace-period kills. Does NOT do any RPC or async I/O, so
3216    /// it is safe to call while holding a DashMap guard.
3217    fn stop_all_after_pending(
3218        &mut self,
3219        state: &state::DaemonState,
3220        grace_duration: Option<Duration>,
3221        force: bool,
3222    ) -> eyre::Result<FinishDataflowWhen> {
3223        for node in self.running_nodes.values_mut() {
3224            node.disable_restart();
3225        }
3226
3227        for (_node_id, channel) in self.subscribe_channels.drain() {
3228            let _ = send_with_timestamp(&channel, NodeEvent::Stop, &state.clock);
3229        }
3230
3231        let running_processes: Vec<_> = self
3232            .running_nodes
3233            .iter_mut()
3234            .map(|(id, n)| (id.clone(), n.process.take()))
3235            .collect();
3236        if force {
3237            for (_, proc) in &running_processes {
3238                if let Some(proc) = proc {
3239                    proc.submit(crate::ProcessOperation::Kill);
3240                }
3241            }
3242        } else {
3243            let grace_duration_kills = self.grace_duration_kills.clone();
3244            tokio::spawn(async move {
3245                let duration = grace_duration.unwrap_or(Duration::from_millis(10000));
3246                tokio::time::sleep(duration).await;
3247
3248                for (node, proc) in &running_processes {
3249                    if let Some(proc) = proc {
3250                        grace_duration_kills.insert(node.clone());
3251                        proc.submit(crate::ProcessOperation::SoftKill);
3252                    }
3253                }
3254
3255                let kill_duration = duration / 2;
3256                tokio::time::sleep(kill_duration).await;
3257
3258                for (node, proc) in &running_processes {
3259                    if let Some(proc) = proc {
3260                        grace_duration_kills.insert(node.clone());
3261                        proc.submit(crate::ProcessOperation::Kill);
3262                        warn!(
3263                            "{node} was killed due to not stopping within the {:#?} grace period",
3264                            duration + kill_duration
3265                        );
3266                    }
3267                }
3268            });
3269        }
3270        self.stop_sent = true;
3271
3272        Ok(self.should_finish_immediately())
3273    }
3274
3275    /// Check if dataflow should finish immediately after stop_all().
3276    /// Returns `Now` if all running nodes are dynamic (they won't send SpawnedNodeResult).
3277    /// Returns `WaitForNodes` if there are non-dynamic nodes to wait for.
3278    fn should_finish_immediately(&self) -> FinishDataflowWhen {
3279        // Only finish immediately if:
3280        // 1. No pending nodes
3281        // 2. All running nodes are dynamic (they won't send SpawnedNodeResult)
3282        // 3. Stop was sent (stop_all() was called)
3283        if !self.pending_nodes.local_nodes_pending()
3284            && self
3285                .running_nodes
3286                .iter()
3287                .all(|(_id, n)| n.node_config.dynamic)
3288            && self.stop_sent
3289        {
3290            FinishDataflowWhen::Now
3291        } else {
3292            FinishDataflowWhen::WaitForNodes
3293        }
3294    }
3295
3296    fn open_inputs(&self, node_id: &NodeId) -> &BTreeSet<DataId> {
3297        self.open_inputs.get(node_id).unwrap_or(&EMPTY_SET)
3298    }
3299
3300    async fn check_drop_token(&mut self, token: DropToken, clock: &HLC) -> eyre::Result<()> {
3301        match self.pending_drop_tokens.entry(token) {
3302            std::collections::hash_map::Entry::Occupied(entry) => {
3303                if entry.get().pending_nodes.is_empty() {
3304                    let (drop_token, info) = entry.remove_entry();
3305                    let result = match self.drop_channels.get_mut(&info.owner) {
3306                        Some(channel) => send_with_timestamp(
3307                            channel,
3308                            NodeDropEvent::OutputDropped { drop_token },
3309                            clock,
3310                        )
3311                        .wrap_err("send failed"),
3312                        None => Err(eyre!("no subscribe channel for node `{}`", &info.owner)),
3313                    };
3314                    if let Err(err) = result.wrap_err_with(|| {
3315                        format!(
3316                            "failed to report drop token `{drop_token:?}` to owner `{}`",
3317                            &info.owner
3318                        )
3319                    }) {
3320                        tracing::warn!("{err:?}");
3321                    }
3322                }
3323            }
3324            std::collections::hash_map::Entry::Vacant(_) => {
3325                tracing::warn!("check_drop_token called with already closed token")
3326            }
3327        }
3328
3329        Ok(())
3330    }
3331}
3332
3333fn empty_type_info() -> ArrowTypeInfo {
3334    ArrowTypeInfo {
3335        data_type: DataType::Null,
3336        len: 0,
3337        null_count: 0,
3338        validity: None,
3339        offset: 0,
3340        buffer_offsets: Vec::new(),
3341        child_data: Vec::new(),
3342    }
3343}
3344
/// Identifier of an output, made up of a node ID and a data ID.
///
/// NOTE(review): presumably the node that produces the output and the name
/// of the output on that node — confirm against the construction sites.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct OutputId(NodeId, DataId);
/// Identifier of an input as a `(node ID, data ID)` pair.
type InputId = (NodeId, DataId);
3348
/// Bookkeeping for a single drop token; stored as the value type of
/// `RunningDataflow::pending_drop_tokens`.
struct DropTokenInformation {
    /// The node that created the associated drop token.
    owner: NodeId,
    /// Contains the set of pending nodes that still have access to the input
    /// associated with a drop token.
    pending_nodes: BTreeSet<NodeId>,
}
3356
/// Events that are sent, wrapped in `Timestamped`, over the daemon's event
/// channels (for example by the timer tasks and the ctrl-c handler in this
/// file).
#[derive(Debug)]
pub enum Event {
    /// A `DaemonNodeEvent` attributed to node `node_id` of dataflow
    /// `dataflow_id`.
    Node {
        dataflow_id: DataflowId,
        node_id: NodeId,
        event: DaemonNodeEvent,
    },
    /// Wraps an `InterDaemonEvent`.
    Daemon(InterDaemonEvent),
    /// Wraps a `DoraEvent`; `DoraEvent` values convert into this variant via
    /// `From`.
    Dora(DoraEvent),
    /// Wraps a `DynamicNodeEventWrapper`.
    DynamicNode(DynamicNodeEventWrapper),
    // NOTE(review): the producers of the two interval variants are not
    // visible in this part of the file; presumably periodic ticks.
    HeartbeatInterval,
    MetricsInterval,
    /// Sent by the ctrl-c handler for the first ctrl-c signal.
    CtrlC,
    // NOTE(review): presumably a request to stop after the given duration —
    // confirm against the producer.
    StopAfter(Duration),
    /// Sent by the ctrl-c handler for the second ctrl-c signal.
    SecondCtrlC,
    /// Carries an error report.
    DaemonError(eyre::Report),
    /// Coordinator requested a destroy — shut down the daemon.
    Destroy,
    /// Coordinator requested spawning nodes (routed from RPC server).
    SpawnRequest {
        build_id: Option<BuildId>,
        dataflow_id: DataflowId,
        local_working_dir: Option<PathBuf>,
        nodes: BTreeMap<NodeId, ResolvedNode>,
        dataflow_descriptor: Descriptor,
        spawn_nodes: BTreeSet<NodeId>,
        uv: bool,
        write_events_to: Option<PathBuf>,
        /// One-shot channel for the outcome of the request.
        reply_tx: oneshot::Sender<Result<(), String>>,
    },
    /// Coordinator requested stopping a dataflow (routed from RPC server).
    StopDataflowRequest {
        dataflow_id: DataflowId,
        grace_duration: Option<Duration>,
        force: bool,
        /// One-shot channel for the outcome of the request.
        reply_tx: oneshot::Sender<Result<(), String>>,
    },
    /// Outcome of spawning a single node: the `RunningNode` on success, a
    /// `NodeError` otherwise.
    SpawnNodeResult {
        dataflow_id: DataflowId,
        node_id: NodeId,
        dynamic_node: bool,
        result: Result<RunningNode, NodeError>,
    },
    /// Outcome of spawning the dataflow identified by `dataflow_id`.
    SpawnDataflowResult {
        dataflow_id: Uuid,
        result: eyre::Result<()>,
    },
    /// Names a node of a dataflow that stopped.
    NodeStopped {
        dataflow_id: Uuid,
        node_id: NodeId,
    },
}
3409
3410impl From<DoraEvent> for Event {
3411    fn from(event: DoraEvent) -> Self {
3412        Event::Dora(event)
3413    }
3414}
3415
3416impl Event {
3417    pub fn kind(&self) -> &'static str {
3418        match self {
3419            Event::Node { .. } => "Node",
3420            Event::Daemon(_) => "Daemon",
3421            Event::Dora(_) => "Dora",
3422            Event::DynamicNode(_) => "DynamicNode",
3423            Event::HeartbeatInterval => "HeartbeatInterval",
3424            Event::MetricsInterval => "MetricsInterval",
3425            Event::CtrlC => "CtrlC",
3426            Event::StopAfter(_) => "StopAfter",
3427            Event::SecondCtrlC => "SecondCtrlC",
3428            Event::DaemonError(_) => "DaemonError",
3429            Event::Destroy => "Destroy",
3430            Event::SpawnRequest { .. } => "SpawnRequest",
3431            Event::StopDataflowRequest { .. } => "StopDataflowRequest",
3432            Event::SpawnNodeResult { .. } => "SpawnNodeResult",
3433            Event::SpawnDataflowResult { .. } => "SpawnDataflowResult",
3434            Event::NodeStopped { .. } => "NodeStopped",
3435        }
3436    }
3437}
3438
/// Payload of `Event::Node`: an event that belongs to one specific node.
///
/// Most variants carry a one-shot `reply_sender` for a `DaemonReply`.
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
pub enum DaemonNodeEvent {
    // NOTE(review): presumably signals that the node finished all of its
    // outputs — confirm against the handler.
    OutputsDone {
        reply_sender: oneshot::Sender<DaemonReply>,
    },
    /// Carries the sender through which `NodeEvent`s can be delivered.
    Subscribe {
        event_sender: UnboundedSender<Timestamped<NodeEvent>>,
        reply_sender: oneshot::Sender<DaemonReply>,
    },
    /// Carries the sender through which `NodeDropEvent`s can be delivered.
    SubscribeDrop {
        event_sender: UnboundedSender<Timestamped<NodeDropEvent>>,
        reply_sender: oneshot::Sender<DaemonReply>,
    },
    /// Names the outputs to close.
    CloseOutputs {
        outputs: Vec<dora_core::config::DataId>,
        reply_sender: oneshot::Sender<DaemonReply>,
    },
    /// An output message: the output's ID, its metadata, and optional data.
    /// Carries no reply channel.
    SendOut {
        output_id: DataId,
        metadata: metadata::Metadata,
        data: Option<DataMessage>,
    },
    /// A list of drop tokens being reported. Carries no reply channel.
    ReportDrop {
        tokens: Vec<DropToken>,
    },
    // NOTE(review): presumably sent when the node's event stream was
    // dropped — confirm against the producer.
    EventStreamDropped {
        reply_sender: oneshot::Sender<DaemonReply>,
    },
}
3469
/// Payload of `Event::Dora`.
#[derive(Debug)]
pub enum DoraEvent {
    /// A timer tick for `interval` of dataflow `dataflow_id`; produced by the
    /// timer tasks spawned in `RunningDataflow::start`.
    Timer {
        dataflow_id: DataflowId,
        interval: Duration,
        metadata: metadata::Metadata,
    },
    // NOTE(review): presumably a log message published as a dataflow
    // output — confirm against the producer.
    Logs {
        dataflow_id: DataflowId,
        output_id: OutputId,
        message: DataMessage,
        metadata: metadata::Metadata,
    },
    /// Exit status of a spawned node.
    SpawnedNodeResult {
        dataflow_id: DataflowId,
        node_id: NodeId,
        dynamic_node: bool,
        exit_status: NodeExitStatus,
        /// Whether the node will be restarted
        restart: bool,
    },
}
3492
/// Two-state flag that must not be ignored (`#[must_use]`).
///
/// NOTE(review): presumably tells the daemon's event loop whether to keep
/// running or to exit — the users are outside this part of the file.
#[must_use]
enum RunStatus {
    Continue,
    Exit,
}
3498
3499fn send_with_timestamp<T>(
3500    sender: &UnboundedSender<Timestamped<T>>,
3501    event: T,
3502    clock: &HLC,
3503) -> Result<(), mpsc::error::SendError<Timestamped<T>>> {
3504    sender.send(Timestamped {
3505        inner: event,
3506        timestamp: clock.new_timestamp(),
3507    })
3508}
3509
3510fn set_up_ctrlc_handler(
3511    clock: Arc<HLC>,
3512) -> eyre::Result<tokio::sync::mpsc::Receiver<Timestamped<Event>>> {
3513    let (ctrlc_tx, ctrlc_rx) = mpsc::channel(1);
3514
3515    let mut ctrlc_sent = 0;
3516    ctrlc::set_handler(move || {
3517        let event = match ctrlc_sent {
3518            0 => Event::CtrlC,
3519            1 => Event::SecondCtrlC,
3520            _ => {
3521                tracing::warn!("received 3rd ctrlc signal -> aborting immediately");
3522                std::process::abort();
3523            }
3524        };
3525        if ctrlc_tx
3526            .blocking_send(Timestamped {
3527                inner: event,
3528                timestamp: clock.new_timestamp(),
3529            })
3530            .is_err()
3531        {
3532            tracing::error!("failed to report ctrl-c event to dora-coordinator");
3533        }
3534
3535        ctrlc_sent += 1;
3536    })
3537    .wrap_err("failed to set ctrl-c handler")?;
3538
3539    Ok(ctrlc_rx)
3540}
3541
3542#[derive(Debug, Default, Clone, PartialEq, Eq)]
3543pub struct CascadingErrorCauses {
3544    caused_by: BTreeMap<NodeId, NodeId>,
3545}
3546
3547impl CascadingErrorCauses {
3548    pub fn experienced_cascading_error(&self, node: &NodeId) -> bool {
3549        self.caused_by.contains_key(node)
3550    }
3551
3552    /// Return the ID of the node that caused a cascading error for the given node, if any.
3553    pub fn error_caused_by(&self, node: &NodeId) -> Option<&NodeId> {
3554        self.caused_by.get(node)
3555    }
3556
3557    pub fn report_cascading_error(&mut self, causing_node: NodeId, affected_node: NodeId) {
3558        self.caused_by.entry(affected_node).or_insert(causing_node);
3559    }
3560}
3561
3562fn runtime_node_inputs(n: &RuntimeNode) -> BTreeMap<DataId, Input> {
3563    n.operators
3564        .iter()
3565        .flat_map(|operator| {
3566            operator.config.inputs.iter().map(|(input_id, mapping)| {
3567                (
3568                    DataId::from(format!("{}/{input_id}", operator.id)),
3569                    mapping.clone(),
3570                )
3571            })
3572        })
3573        .collect()
3574}
3575
3576fn runtime_node_outputs(n: &RuntimeNode) -> BTreeSet<DataId> {
3577    n.operators
3578        .iter()
3579        .flat_map(|operator| {
3580            operator
3581                .config
3582                .outputs
3583                .iter()
3584                .map(|output_id| DataId::from(format!("{}/{output_id}", operator.id)))
3585        })
3586        .collect()
3587}
3588
/// Extension methods for `CoreNodeKind` used inside this file.
trait CoreNodeKindExt {
    /// Returns the node's run configuration (its inputs and outputs).
    fn run_config(&self) -> NodeRunConfig;
    /// Returns whether the node is a dynamic node.
    fn dynamic(&self) -> bool;
}
3593
3594impl CoreNodeKindExt for CoreNodeKind {
3595    fn run_config(&self) -> NodeRunConfig {
3596        match self {
3597            CoreNodeKind::Runtime(n) => NodeRunConfig {
3598                inputs: runtime_node_inputs(n),
3599                outputs: runtime_node_outputs(n),
3600            },
3601            CoreNodeKind::Custom(n) => n.run_config.clone(),
3602        }
3603    }
3604
3605    fn dynamic(&self) -> bool {
3606        match self {
3607            CoreNodeKind::Runtime(_n) => false,
3608            CoreNodeKind::Custom(n) => {
3609                matches!(&n.source, NodeSource::Local) && n.path == DYNAMIC_SOURCE
3610            }
3611        }
3612    }
3613}