// dora_daemon/lib.rs

1use aligned_vec::{AVec, ConstAlign};
2use coordinator::CoordinatorEvent;
3use crossbeam::queue::ArrayQueue;
4use dora_core::{
5    build::{self, BuildInfo, GitManager, PrevGitSource},
6    config::{DataId, Input, InputMapping, NodeId, NodeRunConfig, OperatorId},
7    descriptor::{
8        CoreNodeKind, DYNAMIC_SOURCE, Descriptor, DescriptorExt, ResolvedNode, RuntimeNode,
9        read_as_descriptor,
10    },
11    topics::{
12        DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT, LOCALHOST, open_zenoh_session,
13        zenoh_output_publish_topic,
14    },
15    uhlc::{self, HLC},
16};
17use dora_message::{
18    BuildId, DataflowId, SessionId,
19    common::{
20        DaemonId, DataMessage, DropToken, GitSource, LogLevel, NodeError, NodeErrorCause,
21        NodeExitStatus,
22    },
23    coordinator_to_cli::DataflowResult,
24    coordinator_to_daemon::{BuildDataflowNodes, DaemonCoordinatorEvent, SpawnDataflowNodes},
25    daemon_to_coordinator::{
26        CoordinatorRequest, DaemonCoordinatorReply, DaemonEvent, DataflowDaemonResult,
27    },
28    daemon_to_daemon::InterDaemonEvent,
29    daemon_to_node::{DaemonReply, NodeConfig, NodeDropEvent, NodeEvent},
30    descriptor::{NodeSource, RestartPolicy},
31    metadata::{self, ArrowTypeInfo},
32    node_to_daemon::{DynamicNodeEvent, Timestamped},
33};
34use dora_node_api::{Parameter, arrow::datatypes::DataType};
35use eyre::{Context, ContextCompat, Result, bail, eyre};
36use futures::{FutureExt, TryFutureExt, future, stream};
37use futures_concurrency::stream::Merge;
38use local_listener::DynamicNodeEventWrapper;
39use log::{DaemonLogger, DataflowLogger, Logger};
40use pending::PendingNodes;
41use process_wrap::tokio::TokioChildWrapper;
42use shared_memory_server::ShmemConf;
43use socket_stream_utils::socket_stream_send;
44use spawn::Spawner;
45use std::{
46    collections::{BTreeMap, BTreeSet, HashMap, VecDeque},
47    env::current_dir,
48    future::Future,
49    io,
50    net::SocketAddr,
51    path::{Path, PathBuf},
52    pin::pin,
53    sync::{
54        Arc,
55        atomic::{self, AtomicBool, AtomicU32},
56    },
57    time::{Duration, Instant},
58};
59use tokio::{
60    fs::File,
61    io::{AsyncReadExt, AsyncSeekExt},
62    net::TcpStream,
63    sync::{
64        broadcast,
65        mpsc::{self, UnboundedSender},
66        oneshot::{self, Sender},
67    },
68};
69use tokio_stream::{Stream, StreamExt, wrappers::ReceiverStream};
70use tracing::{error, warn};
71use uuid::{NoContext, Timestamp, Uuid};
72
73pub use flume;
74pub use log::LogDestination;
75
76mod coordinator;
77mod local_listener;
78mod log;
79mod node_communication;
80mod pending;
81mod socket_stream_utils;
82mod spawn;
83
84#[cfg(feature = "telemetry")]
85use dora_tracing::telemetry::serialize_context;
86#[cfg(feature = "telemetry")]
87use tracing_opentelemetry::OpenTelemetrySpanExt;
88
89use crate::pending::DataflowStatus;
90
91const STDERR_LOG_LINES: usize = 10;
92
/// The dora daemon: manages the dataflows running on one machine and relays
/// events between local nodes, remote daemons, and the coordinator.
pub struct Daemon {
    /// Dataflows currently running on this daemon, by dataflow ID.
    running: HashMap<DataflowId, RunningDataflow>,
    /// Working directory of each dataflow (used e.g. to locate node log files).
    working_dir: HashMap<DataflowId, PathBuf>,

    /// Sender for feeding internal events back into the daemon's main event loop.
    events_tx: mpsc::Sender<Timestamped<Event>>,

    /// TCP connection to the coordinator; `None` when running standalone (`dora run`).
    coordinator_connection: Option<TcpStream>,
    /// Last time a coordinator heartbeat was observed; used to detect a lost
    /// coordinator connection (see the `HeartbeatInterval` handling).
    last_coordinator_heartbeat: Instant,
    /// Identifier of this daemon instance.
    daemon_id: DaemonId,

    /// used for testing and examples
    exit_when_done: Option<BTreeSet<(Uuid, NodeId)>>,
    /// set on ctrl-c
    exit_when_all_finished: bool,
    /// used to record results of local nodes
    dataflow_node_results: BTreeMap<Uuid, BTreeMap<NodeId, Result<(), NodeError>>>,

    /// Hybrid logical clock used to timestamp all outgoing events.
    clock: Arc<uhlc::HLC>,

    /// Zenoh session used for communication with other daemons.
    zenoh_session: zenoh::Session,
    /// Forwards events received from remote daemons into the event loop;
    /// `None` when running standalone.
    remote_daemon_events_tx: Option<flume::Sender<eyre::Result<Timestamped<InterDaemonEvent>>>>,

    /// Logger scoped to this daemon instance.
    logger: DaemonLogger,

    /// Latest build associated with each session; a replaced build is removed
    /// from `builds` (see `BuildDataflowResult` handling).
    sessions: BTreeMap<SessionId, BuildId>,
    /// Completed build information, by build ID.
    builds: BTreeMap<BuildId, BuildInfo>,
    /// Manages git sources used when building nodes.
    git_manager: GitManager,
    /// System instance for metrics collection (reused across calls)
    metrics_system: sysinfo::System,
}
123
/// Per-dataflow, per-node results collected while the daemon event loop runs.
type DaemonRunResult = BTreeMap<Uuid, BTreeMap<NodeId, Result<(), NodeError>>>;
125
/// A pending build task for a single node, together with the metadata needed
/// to report its result.
struct NodeBuildTask<F> {
    /// ID of the node being built.
    node_id: NodeId,
    /// Whether this node is a dynamic node (i.e. attaches to the daemon at
    /// runtime instead of being spawned by it — TODO confirm against spawn code).
    dynamic_node: bool,
    /// The task that performs the actual build work.
    task: F,
}
131
132impl Daemon {
    /// Runs a daemon connected to the coordinator at `coordinator_addr`.
    ///
    /// Sets up ctrl-c handling and the incoming event stream, opens a second
    /// coordinator connection dedicated to logging, and then drives the main
    /// event loop until it exits.
    ///
    /// # Errors
    ///
    /// Fails if the event stream setup, either coordinator connection, or the
    /// event loop itself returns an error.
    pub async fn run(
        coordinator_addr: SocketAddr,
        machine_id: Option<String>,
        local_listen_port: u16,
    ) -> eyre::Result<()> {
        let clock = Arc::new(HLC::default());

        let mut ctrlc_events = set_up_ctrlc_handler(clock.clone())?;
        let (remote_daemon_events_tx, remote_daemon_events_rx) = flume::bounded(10);
        let (daemon_id, incoming_events) = {
            let incoming_events = set_up_event_stream(
                coordinator_addr,
                &machine_id,
                &clock,
                remote_daemon_events_rx,
                local_listen_port,
            );

            // finish early if ctrl-c is pressed during event stream setup
            let ctrl_c = pin!(ctrlc_events.recv());
            match futures::future::select(ctrl_c, pin!(incoming_events)).await {
                future::Either::Left((_ctrl_c, _)) => {
                    tracing::info!("received ctrl-c signal -> stopping daemon");
                    return Ok(());
                }
                future::Either::Right((events, _)) => events?,
            }
        };

        let log_destination = {
            // additional connection for logging
            let stream = TcpStream::connect(coordinator_addr)
                .await
                .wrap_err("failed to connect log to dora-coordinator")?;
            stream
                .set_nodelay(true)
                .wrap_err("failed to set TCP_NODELAY")?;
            LogDestination::Coordinator {
                coordinator_connection: stream,
            }
        };

        // Merge ctrl-c events into the main event stream and run the loop;
        // the per-dataflow results are discarded in this entry point.
        Self::run_general(
            (ReceiverStream::new(ctrlc_events), incoming_events).merge(),
            Some(coordinator_addr),
            daemon_id,
            None,
            clock.clone(),
            Some(remote_daemon_events_tx),
            Default::default(),
            log_destination,
        )
        .await
        .map(|_| ())
    }
188
189    pub async fn run_dataflow(
190        dataflow_path: &Path,
191        build_id: Option<BuildId>,
192        local_build: Option<BuildInfo>,
193        session_id: SessionId,
194        uv: bool,
195        log_destination: LogDestination,
196        write_events_to: Option<PathBuf>,
197    ) -> eyre::Result<DataflowResult> {
198        let working_dir = dataflow_path
199            .canonicalize()
200            .context("failed to canonicalize dataflow path")?
201            .parent()
202            .ok_or_else(|| eyre::eyre!("canonicalized dataflow path has no parent"))?
203            .to_owned();
204
205        let descriptor = read_as_descriptor(dataflow_path).await?;
206        if let Some(node) = descriptor.nodes.iter().find(|n| n.deploy.is_some()) {
207            eyre::bail!(
208                "node {} has a `deploy` section, which is not supported in `dora run`\n\n
209                Instead, you need to spawn a `dora coordinator` and one or more `dora daemon`
210                instances and then use `dora start`.",
211                node.id
212            )
213        }
214
215        descriptor.check(&working_dir)?;
216        let nodes = descriptor.resolve_aliases_and_set_defaults()?;
217
218        let (events_tx, events_rx) = flume::bounded(10);
219        if nodes
220            .iter()
221            .find(|(_n, resolved_nodes)| resolved_nodes.kind.dynamic())
222            .is_some()
223        {
224            // Spawn local listener for dynamic nodes
225            let _listen_port = local_listener::spawn_listener_loop(
226                (LOCALHOST, DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT).into(),
227                events_tx,
228            )
229            .await?;
230        }
231        let dynamic_node_events = events_rx.into_stream().map(|e| Timestamped {
232            inner: Event::DynamicNode(e.inner),
233            timestamp: e.timestamp,
234        });
235
236        let dataflow_id = Uuid::new_v7(Timestamp::now(NoContext));
237        let spawn_command = SpawnDataflowNodes {
238            build_id,
239            session_id,
240            dataflow_id,
241            local_working_dir: Some(working_dir),
242            spawn_nodes: nodes.keys().cloned().collect(),
243            nodes,
244            dataflow_descriptor: descriptor,
245            uv,
246            write_events_to,
247        };
248
249        let clock = Arc::new(HLC::default());
250
251        let ctrlc_events = ReceiverStream::new(set_up_ctrlc_handler(clock.clone())?);
252
253        let exit_when_done = spawn_command
254            .nodes
255            .values()
256            .map(|n| (spawn_command.dataflow_id, n.id.clone()))
257            .collect();
258        let (reply_tx, reply_rx) = oneshot::channel();
259        let timestamp = clock.new_timestamp();
260        let coordinator_events = stream::once(async move {
261            Timestamped {
262                inner: Event::Coordinator(CoordinatorEvent {
263                    event: DaemonCoordinatorEvent::Spawn(spawn_command),
264                    reply_tx,
265                }),
266                timestamp,
267            }
268        });
269        let events = (coordinator_events, ctrlc_events, dynamic_node_events).merge();
270        let run_result = Self::run_general(
271            Box::pin(events),
272            None,
273            DaemonId::new(None),
274            Some(exit_when_done),
275            clock.clone(),
276            None,
277            if let Some(local_build) = local_build {
278                let Some(build_id) = build_id else {
279                    bail!("no build_id, but local_build set")
280                };
281                let mut builds = BTreeMap::new();
282                builds.insert(build_id, local_build);
283                builds
284            } else {
285                Default::default()
286            },
287            log_destination,
288        );
289
290        let spawn_result = reply_rx
291            .map_err(|err| eyre!("failed to receive spawn result: {err}"))
292            .and_then(|r| async {
293                match r {
294                    Some(DaemonCoordinatorReply::TriggerSpawnResult(result)) => {
295                        result.map_err(|err| eyre!(err))
296                    }
297                    _ => Err(eyre!("unexpected spawn reply")),
298                }
299            });
300
301        let (mut dataflow_results, ()) = future::try_join(run_result, spawn_result).await?;
302
303        Ok(DataflowResult {
304            uuid: dataflow_id,
305            timestamp: clock.new_timestamp(),
306            node_results: dataflow_results
307                .remove(&dataflow_id)
308                .context("no node results for dataflow_id")?,
309        })
310    }
311
312    #[allow(clippy::too_many_arguments)]
313    async fn run_general(
314        external_events: impl Stream<Item = Timestamped<Event>> + Unpin,
315        coordinator_addr: Option<SocketAddr>,
316        daemon_id: DaemonId,
317        exit_when_done: Option<BTreeSet<(Uuid, NodeId)>>,
318        clock: Arc<HLC>,
319        remote_daemon_events_tx: Option<flume::Sender<eyre::Result<Timestamped<InterDaemonEvent>>>>,
320        builds: BTreeMap<BuildId, BuildInfo>,
321        log_destination: LogDestination,
322    ) -> eyre::Result<DaemonRunResult> {
323        let coordinator_connection = match coordinator_addr {
324            Some(addr) => {
325                let stream = TcpStream::connect(addr)
326                    .await
327                    .wrap_err("failed to connect to dora-coordinator")?;
328                stream
329                    .set_nodelay(true)
330                    .wrap_err("failed to set TCP_NODELAY")?;
331                Some(stream)
332            }
333            None => None,
334        };
335
336        let zenoh_session = open_zenoh_session(coordinator_addr.map(|addr| addr.ip()))
337            .await
338            .wrap_err("failed to open zenoh session")?;
339        let (dora_events_tx, dora_events_rx) = mpsc::channel(5);
340        let daemon = Self {
341            logger: Logger {
342                destination: log_destination,
343                daemon_id: daemon_id.clone(),
344                clock: clock.clone(),
345            }
346            .for_daemon(daemon_id.clone()),
347            running: HashMap::new(),
348            working_dir: HashMap::new(),
349            events_tx: dora_events_tx,
350            coordinator_connection,
351            last_coordinator_heartbeat: Instant::now(),
352            daemon_id,
353            exit_when_done,
354            exit_when_all_finished: false,
355            dataflow_node_results: BTreeMap::new(),
356            clock,
357            zenoh_session,
358            remote_daemon_events_tx,
359            git_manager: Default::default(),
360            builds,
361            sessions: Default::default(),
362            metrics_system: sysinfo::System::new(),
363        };
364
365        let dora_events = ReceiverStream::new(dora_events_rx);
366        let watchdog_clock = daemon.clock.clone();
367        let watchdog_interval = tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(
368            Duration::from_secs(5),
369        ))
370        .map(|_| Timestamped {
371            inner: Event::HeartbeatInterval,
372            timestamp: watchdog_clock.new_timestamp(),
373        });
374
375        let metrics_clock = daemon.clock.clone();
376        let metrics_interval = tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(
377            Duration::from_secs(2), // Collect metrics every 2 seconds
378        ))
379        .map(|_| Timestamped {
380            inner: Event::MetricsInterval,
381            timestamp: metrics_clock.new_timestamp(),
382        });
383
384        let events = (
385            external_events,
386            dora_events,
387            watchdog_interval,
388            metrics_interval,
389        )
390            .merge();
391        daemon.run_inner(events).await
392    }
393
    /// Main event loop of the daemon: processes incoming events until an exit
    /// condition is reached, then notifies the coordinator (if connected) and
    /// returns the per-dataflow node results collected along the way.
    #[tracing::instrument(skip(incoming_events, self), fields(?self.daemon_id))]
    async fn run_inner(
        mut self,
        incoming_events: impl Stream<Item = Timestamped<Event>> + Unpin,
    ) -> eyre::Result<DaemonRunResult> {
        let mut events = incoming_events;

        while let Some(event) = events.next().await {
            let Timestamped { inner, timestamp } = event;
            // Keep the hybrid logical clock in sync with incoming timestamps.
            if let Err(err) = self.clock.update_with_timestamp(&timestamp) {
                tracing::warn!("failed to update HLC with incoming event timestamp: {err}");
            }

            // used below for checking the duration of event handling
            let start = Instant::now();
            let event_kind = inner.kind();

            match inner {
                // Command from the coordinator; the handler decides whether the
                // loop should keep running.
                Event::Coordinator(CoordinatorEvent { event, reply_tx }) => {
                    let status = self.handle_coordinator_event(event, reply_tx).await?;

                    match status {
                        RunStatus::Continue => {}
                        RunStatus::Exit => break,
                    }
                }
                // Event from another daemon.
                Event::Daemon(event) => {
                    self.handle_inter_daemon_event(event).await?;
                }
                // Event from a local node of a running dataflow.
                Event::Node {
                    dataflow_id: dataflow,
                    node_id,
                    event,
                } => self.handle_node_event(event, dataflow, node_id).await?,
                Event::Dora(event) => self.handle_dora_event(event).await?,
                Event::DynamicNode(event) => self.handle_dynamic_node_event(event).await?,
                // Periodic tick: send a heartbeat to the coordinator and check
                // whether the coordinator is still responsive.
                Event::HeartbeatInterval => {
                    if let Some(connection) = &mut self.coordinator_connection {
                        let msg = serde_json::to_vec(&Timestamped {
                            inner: CoordinatorRequest::Event {
                                daemon_id: self.daemon_id.clone(),
                                event: DaemonEvent::Heartbeat,
                            },
                            timestamp: self.clock.new_timestamp(),
                        })?;
                        socket_stream_send(connection, &msg)
                            .await
                            .wrap_err("failed to send watchdog message to dora-coordinator")?;

                        // Abort if the coordinator has been silent for too long.
                        if self.last_coordinator_heartbeat.elapsed() > Duration::from_secs(20) {
                            bail!("lost connection to coordinator")
                        }
                    }
                }
                // Periodic tick: collect and report system metrics.
                Event::MetricsInterval => {
                    self.collect_and_send_metrics().await?;
                }
                // First ctrl-c: stop all dataflows gracefully, then exit once
                // they have finished (immediately if none are running).
                Event::CtrlC => {
                    tracing::info!("received ctrlc signal -> stopping all dataflows");
                    for dataflow in self.running.values_mut() {
                        let mut logger = self.logger.for_dataflow(dataflow.id);
                        dataflow
                            .stop_all(
                                &mut self.coordinator_connection,
                                &self.clock,
                                None,
                                false,
                                &mut logger,
                            )
                            .await?;
                    }
                    self.exit_when_all_finished = true;
                    if self.running.is_empty() {
                        break;
                    }
                }
                // Second ctrl-c: abort without waiting for a graceful stop.
                Event::SecondCtrlC => {
                    tracing::warn!("received second ctrlc signal -> exit immediately");
                    bail!("received second ctrl-c signal");
                }
                Event::DaemonError(err) => {
                    tracing::error!("Daemon error: {err:?}");
                }
                // Result of an asynchronous node spawn: register the node on
                // success, or record the failure and treat the node as stopped.
                Event::SpawnNodeResult {
                    dataflow_id,
                    node_id,
                    dynamic_node,
                    result,
                } => match result {
                    Ok(running_node) => {
                        if let Some(dataflow) = self.running.get_mut(&dataflow_id) {
                            dataflow.running_nodes.insert(node_id, running_node);
                        } else {
                            tracing::error!(
                                "failed to handle SpawnNodeResult: no running dataflow with ID {dataflow_id}"
                            );
                        }
                    }
                    Err(error) => {
                        self.dataflow_node_results
                            .entry(dataflow_id)
                            .or_default()
                            .insert(node_id.clone(), Err(error));
                        self.handle_node_stop(dataflow_id, &node_id, dynamic_node)
                            .await?;
                    }
                },
                // Result of an asynchronous dataflow build: store the build
                // info and report the outcome to the coordinator.
                Event::BuildDataflowResult {
                    build_id,
                    session_id,
                    result,
                } => {
                    let (build_info, result) = match result {
                        Ok(build_info) => (Some(build_info), Ok(())),
                        Err(err) => (None, Err(err)),
                    };
                    if let Some(build_info) = build_info {
                        self.builds.insert(build_id, build_info);
                        // Each session keeps only its latest build; drop the
                        // build it replaces.
                        if let Some(old_build_id) = self.sessions.insert(session_id, build_id) {
                            self.builds.remove(&old_build_id);
                        }
                    }
                    if let Some(connection) = &mut self.coordinator_connection {
                        let msg = serde_json::to_vec(&Timestamped {
                            inner: CoordinatorRequest::Event {
                                daemon_id: self.daemon_id.clone(),
                                event: DaemonEvent::BuildResult {
                                    build_id,
                                    result: result.map_err(|err| format!("{err:?}")),
                                },
                            },
                            timestamp: self.clock.new_timestamp(),
                        })?;
                        socket_stream_send(connection, &msg).await.wrap_err(
                            "failed to send BuildDataflowResult message to dora-coordinator",
                        )?;
                    }
                }
                // Result of an asynchronous dataflow spawn: report it to the
                // coordinator (if connected).
                Event::SpawnDataflowResult {
                    dataflow_id,
                    result,
                } => {
                    if let Some(connection) = &mut self.coordinator_connection {
                        let msg = serde_json::to_vec(&Timestamped {
                            inner: CoordinatorRequest::Event {
                                daemon_id: self.daemon_id.clone(),
                                event: DaemonEvent::SpawnResult {
                                    dataflow_id,
                                    result: result.map_err(|err| format!("{err:?}")),
                                },
                            },
                            timestamp: self.clock.new_timestamp(),
                        })?;
                        socket_stream_send(connection, &msg).await.wrap_err(
                            "failed to send SpawnDataflowResult message to dora-coordinator",
                        )?;
                    }
                }
                // A node finished; check the daemon's exit conditions.
                Event::NodeStopped {
                    dataflow_id,
                    node_id,
                } => {
                    if let Some(exit_when_done) = &mut self.exit_when_done {
                        exit_when_done.remove(&(dataflow_id, node_id));
                        if exit_when_done.is_empty() {
                            tracing::info!(
                                "exiting daemon because all required dataflows are finished"
                            );
                            break;
                        }
                    }
                    if self.exit_when_all_finished && self.running.is_empty() {
                        break;
                    }
                }
            }

            // warn if event handling took too long -> the main loop should never be blocked for too long
            let elapsed = start.elapsed();
            if elapsed > Duration::from_millis(100) {
                tracing::warn!(
                    "Daemon took {}ms for handling event: {event_kind}",
                    elapsed.as_millis()
                );
            }
        }

        // Tell the coordinator that this daemon is exiting.
        if let Some(mut connection) = self.coordinator_connection.take() {
            let msg = serde_json::to_vec(&Timestamped {
                inner: CoordinatorRequest::Event {
                    daemon_id: self.daemon_id.clone(),
                    event: DaemonEvent::Exit,
                },
                timestamp: self.clock.new_timestamp(),
            })?;
            socket_stream_send(&mut connection, &msg)
                .await
                .wrap_err("failed to send Exit message to dora-coordinator")?;
        }

        Ok(self.dataflow_node_results)
    }
596
597    async fn handle_coordinator_event(
598        &mut self,
599        event: DaemonCoordinatorEvent,
600        reply_tx: Sender<Option<DaemonCoordinatorReply>>,
601    ) -> eyre::Result<RunStatus> {
602        let status = match event {
603            DaemonCoordinatorEvent::Build(BuildDataflowNodes {
604                build_id,
605                session_id,
606                local_working_dir,
607                git_sources,
608                prev_git_sources,
609                dataflow_descriptor,
610                nodes_on_machine,
611                uv,
612            }) => {
613                match dataflow_descriptor.communication.remote {
614                    dora_core::config::RemoteCommunicationConfig::Tcp => {}
615                }
616
617                let base_working_dir = self.base_working_dir(local_working_dir, session_id)?;
618
619                let result = self
620                    .build_dataflow(
621                        build_id,
622                        session_id,
623                        base_working_dir,
624                        git_sources,
625                        prev_git_sources,
626                        dataflow_descriptor,
627                        nodes_on_machine,
628                        uv,
629                    )
630                    .await;
631                let (trigger_result, result_task) = match result {
632                    Ok(result_task) => (Ok(()), Some(result_task)),
633                    Err(err) => (Err(format!("{err:?}")), None),
634                };
635                let reply = DaemonCoordinatorReply::TriggerBuildResult(trigger_result);
636                let _ = reply_tx.send(Some(reply)).map_err(|_| {
637                    error!("could not send `TriggerBuildResult` reply from daemon to coordinator")
638                });
639
640                let result_tx = self.events_tx.clone();
641                let clock = self.clock.clone();
642                if let Some(result_task) = result_task {
643                    tokio::spawn(async move {
644                        let message = Timestamped {
645                            inner: Event::BuildDataflowResult {
646                                build_id,
647                                session_id,
648                                result: result_task.await,
649                            },
650                            timestamp: clock.new_timestamp(),
651                        };
652                        let _ = result_tx
653                            .send(message)
654                            .map_err(|_| {
655                                error!(
656                                    "could not send `BuildResult` reply from daemon to coordinator"
657                                )
658                            })
659                            .await;
660                    });
661                }
662
663                RunStatus::Continue
664            }
665            DaemonCoordinatorEvent::Spawn(SpawnDataflowNodes {
666                build_id,
667                session_id,
668                dataflow_id,
669                local_working_dir,
670                nodes,
671                dataflow_descriptor,
672                spawn_nodes,
673                uv,
674                write_events_to,
675            }) => {
676                match dataflow_descriptor.communication.remote {
677                    dora_core::config::RemoteCommunicationConfig::Tcp => {}
678                }
679
680                let base_working_dir = self.base_working_dir(local_working_dir, session_id)?;
681
682                let result = self
683                    .spawn_dataflow(
684                        build_id,
685                        dataflow_id,
686                        base_working_dir,
687                        nodes,
688                        dataflow_descriptor,
689                        spawn_nodes,
690                        uv,
691                        write_events_to,
692                    )
693                    .await;
694                let (trigger_result, result_task) = match result {
695                    Ok(result_task) => (Ok(()), Some(result_task)),
696                    Err(err) => (Err(format!("{err:?}")), None),
697                };
698                let reply = DaemonCoordinatorReply::TriggerSpawnResult(trigger_result);
699                let _ = reply_tx.send(Some(reply)).map_err(|_| {
700                    error!("could not send `TriggerSpawnResult` reply from daemon to coordinator")
701                });
702
703                let result_tx = self.events_tx.clone();
704                let clock = self.clock.clone();
705                if let Some(result_task) = result_task {
706                    tokio::spawn(async move {
707                        let message = Timestamped {
708                            inner: Event::SpawnDataflowResult {
709                                dataflow_id,
710                                result: result_task.await,
711                            },
712                            timestamp: clock.new_timestamp(),
713                        };
714                        let _ = result_tx
715                            .send(message)
716                            .map_err(|_| {
717                                error!(
718                                    "could not send `SpawnResult` reply from daemon to coordinator"
719                                )
720                            })
721                            .await;
722                    });
723                }
724
725                RunStatus::Continue
726            }
727            DaemonCoordinatorEvent::AllNodesReady {
728                dataflow_id,
729                exited_before_subscribe,
730            } => {
731                let mut logger = self.logger.for_dataflow(dataflow_id);
732                logger.log(LogLevel::Debug, None,
733                    Some("daemon".into()),
734                    format!("received AllNodesReady (exited_before_subscribe: {exited_before_subscribe:?})"
735                )).await;
736                match self.running.get_mut(&dataflow_id) {
737                    Some(dataflow) => {
738                        let ready = exited_before_subscribe.is_empty();
739                        dataflow
740                            .pending_nodes
741                            .handle_external_all_nodes_ready(
742                                exited_before_subscribe,
743                                &mut dataflow.cascading_error_causes,
744                            )
745                            .await?;
746                        if ready {
747                            logger.log(LogLevel::Info, None,
748                                Some("daemon".into()),
749                                "coordinator reported that all nodes are ready, starting dataflow",
750                            ).await;
751                            dataflow.start(&self.events_tx, &self.clock).await?;
752                        }
753                    }
754                    None => {
755                        tracing::warn!(
756                            "received AllNodesReady for unknown dataflow (ID `{dataflow_id}`)"
757                        );
758                    }
759                }
760                let _ = reply_tx.send(None).map_err(|_| {
761                    error!("could not send `AllNodesReady` reply from daemon to coordinator")
762                });
763                RunStatus::Continue
764            }
765            DaemonCoordinatorEvent::Logs {
766                dataflow_id,
767                node_id,
768                tail,
769            } => {
770                match self.working_dir.get(&dataflow_id) {
771                    Some(working_dir) => {
772                        let working_dir = working_dir.clone();
773                        tokio::spawn(async move {
774                            let logs = async {
775                                let mut file =
776                                    File::open(log::log_path(&working_dir, &dataflow_id, &node_id))
777                                        .await
778                                        .wrap_err(format!(
779                                            "Could not open log file: {:#?}",
780                                            log::log_path(&working_dir, &dataflow_id, &node_id)
781                                        ))?;
782
783                                let mut contents = match tail {
784                                    None | Some(0) => {
785                                        let mut contents = vec![];
786                                        file.read_to_end(&mut contents).await.map(|_| contents)
787                                    }
788                                    Some(tail) => read_last_n_lines(&mut file, tail).await,
789                                }
790                                .wrap_err("Could not read last n lines of log file")?;
791                                if !contents.ends_with(b"\n") {
792                                    // Append newline for better readability
793                                    contents.push(b'\n');
794                                }
795                                Result::<Vec<u8>, eyre::Report>::Ok(contents)
796                            }
797                            .await
798                            .map_err(|err| format!("{err:?}"));
799                            let _ = reply_tx
800                                .send(Some(DaemonCoordinatorReply::Logs(logs)))
801                                .map_err(|_| {
802                                    error!("could not send logs reply from daemon to coordinator")
803                                });
804                        });
805                    }
806                    None => {
807                        tracing::warn!("received Logs for unknown dataflow (ID `{dataflow_id}`)");
808                        let _ = reply_tx.send(None).map_err(|_| {
809                            error!(
810                                "could not send `AllNodesReady` reply from daemon to coordinator"
811                            )
812                        });
813                    }
814                }
815                RunStatus::Continue
816            }
817            DaemonCoordinatorEvent::ReloadDataflow {
818                dataflow_id,
819                node_id,
820                operator_id,
821            } => {
822                let result = self.send_reload(dataflow_id, node_id, operator_id).await;
823                let reply =
824                    DaemonCoordinatorReply::ReloadResult(result.map_err(|err| format!("{err:?}")));
825                let _ = reply_tx
826                    .send(Some(reply))
827                    .map_err(|_| error!("could not send reload reply from daemon to coordinator"));
828                RunStatus::Continue
829            }
830            DaemonCoordinatorEvent::StopDataflow {
831                dataflow_id,
832                grace_duration,
833                force,
834            } => {
835                let mut logger = self.logger.for_dataflow(dataflow_id);
836                let dataflow = self
837                    .running
838                    .get_mut(&dataflow_id)
839                    .wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"));
840                let (reply, future) = match dataflow {
841                    Ok(dataflow) => {
842                        let future = dataflow.stop_all(
843                            &mut self.coordinator_connection,
844                            &self.clock,
845                            grace_duration,
846                            force,
847                            &mut logger,
848                        );
849                        (Ok(()), Some(future))
850                    }
851                    Err(err) => (Err(err.to_string()), None),
852                };
853
854                let _ = reply_tx
855                    .send(Some(DaemonCoordinatorReply::StopResult(reply)))
856                    .map_err(|_| error!("could not send stop reply from daemon to coordinator"));
857
858                if let Some(future) = future {
859                    future.await?;
860                }
861
862                RunStatus::Continue
863            }
864            DaemonCoordinatorEvent::Destroy => {
865                tracing::info!("received destroy command -> exiting");
866                let (notify_tx, notify_rx) = oneshot::channel();
867                let reply = DaemonCoordinatorReply::DestroyResult {
868                    result: Ok(()),
869                    notify: Some(notify_tx),
870                };
871                let _ = reply_tx
872                    .send(Some(reply))
873                    .map_err(|_| error!("could not send destroy reply from daemon to coordinator"));
874                // wait until the reply is sent out
875                if notify_rx.await.is_err() {
876                    tracing::warn!("no confirmation received for DestroyReply");
877                }
878                RunStatus::Exit
879            }
880            DaemonCoordinatorEvent::Heartbeat => {
881                self.last_coordinator_heartbeat = Instant::now();
882                let _ = reply_tx.send(None);
883                RunStatus::Continue
884            }
885        };
886        Ok(status)
887    }
888
889    async fn collect_and_send_metrics(&mut self) -> eyre::Result<()> {
890        use dora_message::daemon_to_coordinator::NodeMetrics;
891        use sysinfo::{Pid, ProcessRefreshKind, ProcessesToUpdate};
892
893        if self.coordinator_connection.is_none() {
894            return Ok(());
895        }
896
897        // Reuse system instance for metrics collection
898        let system = &mut self.metrics_system;
899
900        // Metrics are collected every 2 seconds (metrics_interval)
901        const METRICS_INTERVAL_SECS: f64 = 2.0;
902
903        // Collect metrics for all running dataflows
904        for (dataflow_id, dataflow) in &self.running {
905            let mut metrics = BTreeMap::new();
906
907            // Collect all PIDs for this dataflow
908            let pids: Vec<Pid> = dataflow
909                .running_nodes
910                .values()
911                .filter_map(|node| {
912                    node.pid
913                        .as_ref()
914                        .map(|pid| Pid::from_u32(pid.load(atomic::Ordering::Acquire)))
915                })
916                .collect();
917
918            if !pids.is_empty() {
919                // Refresh process metrics (cpu, memory, disk)
920                let refresh_kind = ProcessRefreshKind::nothing()
921                    .with_cpu()
922                    .with_memory()
923                    .with_disk_usage();
924                system.refresh_processes_specifics(
925                    ProcessesToUpdate::Some(&pids),
926                    true,
927                    refresh_kind,
928                );
929
930                // Collect metrics for each node
931                for (node_id, running_node) in &dataflow.running_nodes {
932                    if let Some(pid) = running_node.pid.as_ref() {
933                        let pid = pid.load(atomic::Ordering::Acquire);
934                        let sys_pid = Pid::from_u32(pid);
935                        if let Some(process) = system.process(sys_pid) {
936                            let disk_usage = process.disk_usage();
937                            // Divide by metrics_interval to get per-second averages
938                            metrics.insert(
939                                node_id.clone(),
940                                NodeMetrics {
941                                    pid,
942                                    cpu_usage: process.cpu_usage(),
943                                    memory_bytes: process.memory(),
944                                    disk_read_bytes: Some(
945                                        (disk_usage.read_bytes as f64 / METRICS_INTERVAL_SECS)
946                                            as u64,
947                                    ),
948                                    disk_write_bytes: Some(
949                                        (disk_usage.written_bytes as f64 / METRICS_INTERVAL_SECS)
950                                            as u64,
951                                    ),
952                                },
953                            );
954                        }
955                    }
956                }
957            }
958
959            // Send metrics to coordinator if we have any
960            if !metrics.is_empty() {
961                if let Some(connection) = &mut self.coordinator_connection {
962                    let msg = serde_json::to_vec(&Timestamped {
963                        inner: CoordinatorRequest::Event {
964                            daemon_id: self.daemon_id.clone(),
965                            event: DaemonEvent::NodeMetrics {
966                                dataflow_id: *dataflow_id,
967                                metrics,
968                            },
969                        },
970                        timestamp: self.clock.new_timestamp(),
971                    })?;
972                    socket_stream_send(connection, &msg)
973                        .await
974                        .wrap_err("failed to send metrics to coordinator")?;
975                }
976            }
977        }
978
979        Ok(())
980    }
981
982    async fn handle_inter_daemon_event(&mut self, event: InterDaemonEvent) -> eyre::Result<()> {
983        match event {
984            InterDaemonEvent::Output {
985                dataflow_id,
986                node_id,
987                output_id,
988                metadata,
989                data,
990            } => {
991                let inner = async {
992                    let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
993                        format!("send out failed: no running dataflow with ID `{dataflow_id}`")
994                    })?;
995                    send_output_to_local_receivers(
996                        node_id.clone(),
997                        output_id.clone(),
998                        dataflow,
999                        &metadata,
1000                        data.map(DataMessage::Vec),
1001                        &self.clock,
1002                    )
1003                    .await?;
1004                    Result::<_, eyre::Report>::Ok(())
1005                };
1006                if let Err(err) = inner
1007                    .await
1008                    .wrap_err("failed to forward remote output to local receivers")
1009                {
1010                    let mut logger = self.logger.for_dataflow(dataflow_id).for_node(node_id);
1011                    logger
1012                        .log(LogLevel::Warn, Some("daemon".into()), format!("{err:?}"))
1013                        .await;
1014                }
1015                Ok(())
1016            }
1017            InterDaemonEvent::OutputClosed {
1018                dataflow_id,
1019                node_id,
1020                output_id,
1021            } => {
1022                let output_id = OutputId(node_id.clone(), output_id);
1023                let mut logger = self
1024                    .logger
1025                    .for_dataflow(dataflow_id)
1026                    .for_node(node_id.clone());
1027                logger
1028                    .log(
1029                        LogLevel::Debug,
1030                        Some("daemon".into()),
1031                        format!("received OutputClosed event for output {output_id:?}"),
1032                    )
1033                    .await;
1034
1035                let inner = async {
1036                    let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
1037                        format!("send out failed: no running dataflow with ID `{dataflow_id}`")
1038                    })?;
1039
1040                    if let Some(inputs) = dataflow.mappings.get(&output_id).cloned() {
1041                        for (receiver_id, input_id) in &inputs {
1042                            close_input(dataflow, receiver_id, input_id, &self.clock);
1043                        }
1044                    }
1045                    Result::<(), eyre::Report>::Ok(())
1046                };
1047                if let Err(err) = inner
1048                    .await
1049                    .wrap_err("failed to handle InputsClosed event sent by coordinator")
1050                {
1051                    logger
1052                        .log(LogLevel::Warn, Some("daemon".into()), format!("{err:?}"))
1053                        .await;
1054                }
1055                Ok(())
1056            }
1057        }
1058    }
1059
1060    #[allow(clippy::too_many_arguments)]
1061    async fn build_dataflow(
1062        &mut self,
1063        build_id: BuildId,
1064        session_id: SessionId,
1065        base_working_dir: PathBuf,
1066        git_sources: BTreeMap<NodeId, GitSource>,
1067        prev_git_sources: BTreeMap<NodeId, GitSource>,
1068        dataflow_descriptor: Descriptor,
1069        local_nodes: BTreeSet<NodeId>,
1070        uv: bool,
1071    ) -> eyre::Result<impl Future<Output = eyre::Result<BuildInfo>> + use<>> {
1072        let builder = build::Builder {
1073            session_id,
1074            base_working_dir,
1075            uv,
1076        };
1077        self.git_manager.clear_planned_builds(session_id);
1078
1079        let nodes = dataflow_descriptor.resolve_aliases_and_set_defaults()?;
1080
1081        let mut tasks = Vec::new();
1082
1083        // build nodes
1084        for node in nodes.into_values().filter(|n| local_nodes.contains(&n.id)) {
1085            let dynamic_node = node.kind.dynamic();
1086
1087            let node_id = node.id.clone();
1088            let mut logger = self.logger.for_node_build(build_id, node_id.clone());
1089            logger.log(LogLevel::Debug, "building").await;
1090            let git_source = git_sources.get(&node_id).cloned();
1091            let prev_git_source = prev_git_sources.get(&node_id).cloned();
1092            let prev_git = prev_git_source.map(|prev_source| PrevGitSource {
1093                still_needed_for_this_build: git_sources.values().any(|s| s == &prev_source),
1094                git_source: prev_source,
1095            });
1096
1097            let logger_cloned = logger
1098                .try_clone_impl()
1099                .await
1100                .wrap_err("failed to clone logger")?;
1101
1102            let mut builder = builder.clone();
1103            if let Some(node_working_dir) =
1104                node.deploy.as_ref().and_then(|d| d.working_dir.as_deref())
1105            {
1106                builder.base_working_dir = builder.base_working_dir.join(node_working_dir);
1107            }
1108
1109            match builder
1110                .build_node(
1111                    node,
1112                    git_source,
1113                    prev_git,
1114                    logger_cloned,
1115                    &mut self.git_manager,
1116                )
1117                .await
1118                .wrap_err_with(|| format!("failed to build node `{node_id}`"))
1119            {
1120                Ok(result) => {
1121                    tasks.push(NodeBuildTask {
1122                        node_id,
1123                        task: result,
1124                        dynamic_node,
1125                    });
1126                }
1127                Err(err) => {
1128                    logger.log(LogLevel::Error, format!("{err:?}")).await;
1129                    return Err(err);
1130                }
1131            }
1132        }
1133
1134        let task = async move {
1135            let mut info = BuildInfo {
1136                node_working_dirs: Default::default(),
1137            };
1138            for task in tasks {
1139                let NodeBuildTask {
1140                    node_id,
1141                    dynamic_node: _,
1142                    task,
1143                } = task;
1144                let node = task
1145                    .await
1146                    .with_context(|| format!("failed to build node `{node_id}`"))?;
1147                info.node_working_dirs
1148                    .insert(node_id, node.node_working_dir);
1149            }
1150            Ok(info)
1151        };
1152
1153        Ok(task)
1154    }
1155
1156    #[allow(clippy::too_many_arguments)]
1157    async fn spawn_dataflow(
1158        &mut self,
1159        build_id: Option<BuildId>,
1160        dataflow_id: DataflowId,
1161        base_working_dir: PathBuf,
1162        nodes: BTreeMap<NodeId, ResolvedNode>,
1163        dataflow_descriptor: Descriptor,
1164        spawn_nodes: BTreeSet<NodeId>,
1165        uv: bool,
1166        write_events_to: Option<PathBuf>,
1167    ) -> eyre::Result<impl Future<Output = eyre::Result<()>> + use<>> {
1168        let mut logger = self
1169            .logger
1170            .for_dataflow(dataflow_id)
1171            .try_clone()
1172            .await
1173            .context("failed to clone logger")?;
1174        let dataflow =
1175            RunningDataflow::new(dataflow_id, self.daemon_id.clone(), &dataflow_descriptor);
1176        let dataflow = match self.running.entry(dataflow_id) {
1177            std::collections::hash_map::Entry::Vacant(entry) => {
1178                self.working_dir
1179                    .insert(dataflow_id, base_working_dir.clone());
1180                entry.insert(dataflow)
1181            }
1182            std::collections::hash_map::Entry::Occupied(_) => {
1183                bail!("there is already a running dataflow with ID `{dataflow_id}`")
1184            }
1185        };
1186
1187        let mut stopped = Vec::new();
1188
1189        let build_info = build_id.and_then(|build_id| self.builds.get(&build_id));
1190        let node_with_git_source = nodes.values().find(|n| n.has_git_source());
1191        if let Some(git_node) = node_with_git_source {
1192            if build_info.is_none() {
1193                eyre::bail!(
1194                    "node {} has git source, but no `dora build` was run yet\n\n\
1195                    nodes with a `git` field must be built using `dora build` before starting the \
1196                    dataflow",
1197                    git_node.id
1198                )
1199            }
1200        }
1201        let node_working_dirs = build_info
1202            .map(|info| info.node_working_dirs.clone())
1203            .unwrap_or_default();
1204
1205        // calculate info about mappings
1206        for node in nodes.values() {
1207            let local = spawn_nodes.contains(&node.id);
1208
1209            let inputs = node_inputs(node);
1210            for (input_id, input) in inputs {
1211                if local {
1212                    dataflow
1213                        .open_inputs
1214                        .entry(node.id.clone())
1215                        .or_default()
1216                        .insert(input_id.clone());
1217                    match input.mapping {
1218                        InputMapping::User(mapping) => {
1219                            dataflow
1220                                .mappings
1221                                .entry(OutputId(mapping.source, mapping.output))
1222                                .or_default()
1223                                .insert((node.id.clone(), input_id));
1224                        }
1225                        InputMapping::Timer { interval } => {
1226                            dataflow
1227                                .timers
1228                                .entry(interval)
1229                                .or_default()
1230                                .insert((node.id.clone(), input_id));
1231                        }
1232                    }
1233                } else if let InputMapping::User(mapping) = input.mapping {
1234                    dataflow
1235                        .open_external_mappings
1236                        .insert(OutputId(mapping.source, mapping.output));
1237                }
1238            }
1239        }
1240
1241        let spawner = Spawner {
1242            dataflow_id,
1243            daemon_tx: self.events_tx.clone(),
1244            dataflow_descriptor,
1245            clock: self.clock.clone(),
1246            uv,
1247        };
1248
1249        let mut tasks = Vec::new();
1250
1251        // spawn nodes and set up subscriptions
1252        for node in nodes.into_values() {
1253            let mut logger = logger.reborrow().for_node(node.id.clone());
1254            let local = spawn_nodes.contains(&node.id);
1255            if local {
1256                let dynamic_node = node.kind.dynamic();
1257                if dynamic_node {
1258                    dataflow.dynamic_nodes.insert(node.id.clone());
1259                } else {
1260                    dataflow.pending_nodes.insert(node.id.clone());
1261                }
1262
1263                let node_id = node.id.clone();
1264                let node_stderr_most_recent = dataflow
1265                    .node_stderr_most_recent
1266                    .entry(node.id.clone())
1267                    .or_insert_with(|| Arc::new(ArrayQueue::new(STDERR_LOG_LINES)))
1268                    .clone();
1269
1270                let configured_node_working_dir = node_working_dirs.get(&node_id).cloned();
1271                if configured_node_working_dir.is_none() && node.has_git_source() {
1272                    eyre::bail!(
1273                        "node {} has git source, but no git clone directory was found for it\n\n\
1274                        try running `dora build` again",
1275                        node.id
1276                    )
1277                }
1278                let node_working_dir = configured_node_working_dir
1279                    .or_else(|| {
1280                        node.deploy
1281                            .as_ref()
1282                            .and_then(|d| d.working_dir.as_ref().map(|d| base_working_dir.join(d)))
1283                    })
1284                    .unwrap_or(base_working_dir.clone())
1285                    .clone();
1286                let node_write_events_to = write_events_to
1287                    .as_ref()
1288                    .map(|p| p.join(format!("inputs-{}.json", node.id)));
1289                match spawner
1290                    .clone()
1291                    .spawn_node(
1292                        node,
1293                        node_working_dir,
1294                        node_stderr_most_recent,
1295                        node_write_events_to,
1296                        &mut logger,
1297                    )
1298                    .await
1299                    .wrap_err_with(|| format!("failed to spawn node `{node_id}`"))
1300                {
1301                    Ok(result) => {
1302                        tasks.push(NodeBuildTask {
1303                            node_id,
1304                            task: result,
1305                            dynamic_node,
1306                        });
1307                    }
1308                    Err(err) => {
1309                        logger
1310                            .log(LogLevel::Error, Some("daemon".into()), format!("{err:?}"))
1311                            .await;
1312                        self.dataflow_node_results
1313                            .entry(dataflow_id)
1314                            .or_default()
1315                            .insert(
1316                                node_id.clone(),
1317                                Err(NodeError {
1318                                    timestamp: self.clock.new_timestamp(),
1319                                    cause: NodeErrorCause::FailedToSpawn(format!("{err:?}")),
1320                                    exit_status: NodeExitStatus::Unknown,
1321                                }),
1322                            );
1323                        stopped.push((node_id.clone(), dynamic_node));
1324                    }
1325                }
1326            } else {
1327                // wait until node is ready before starting
1328                dataflow.pending_nodes.set_external_nodes(true);
1329
1330                // subscribe to all node outputs that are mapped to some local inputs
1331                for output_id in dataflow.mappings.keys().filter(|o| o.0 == node.id) {
1332                    let tx = self
1333                        .remote_daemon_events_tx
1334                        .clone()
1335                        .wrap_err("no remote_daemon_events_tx channel")?;
1336                    let mut finished_rx = dataflow.finished_tx.subscribe();
1337                    let subscribe_topic =
1338                        zenoh_output_publish_topic(dataflow.id, &output_id.0, &output_id.1);
1339                    tracing::debug!("declaring subscriber on {subscribe_topic}");
1340                    let subscriber = self
1341                        .zenoh_session
1342                        .declare_subscriber(subscribe_topic)
1343                        .await
1344                        .map_err(|e| eyre!(e))
1345                        .wrap_err_with(|| format!("failed to subscribe to {output_id:?}"))?;
1346                    tokio::spawn(async move {
1347                        let mut finished = pin!(finished_rx.recv());
1348                        loop {
1349                            let finished_or_next =
1350                                futures::future::select(finished, subscriber.recv_async());
1351                            match finished_or_next.await {
1352                                future::Either::Left((finished, _)) => match finished {
1353                                    Err(broadcast::error::RecvError::Closed) => {
1354                                        tracing::debug!(
1355                                            "dataflow finished, breaking from zenoh subscribe task"
1356                                        );
1357                                        break;
1358                                    }
1359                                    other => {
1360                                        tracing::warn!(
1361                                            "unexpected return value of dataflow finished_rx channel: {other:?}"
1362                                        );
1363                                        break;
1364                                    }
1365                                },
1366                                future::Either::Right((sample, f)) => {
1367                                    finished = f;
1368                                    let event = sample.map_err(|e| eyre!(e)).and_then(|s| {
1369                                        Timestamped::deserialize_inter_daemon_event(
1370                                            &s.payload().to_bytes(),
1371                                        )
1372                                    });
1373                                    if tx.send_async(event).await.is_err() {
1374                                        // daemon finished
1375                                        break;
1376                                    }
1377                                }
1378                            }
1379                        }
1380                    });
1381                }
1382            }
1383        }
1384        for (node_id, dynamic) in stopped {
1385            self.handle_node_stop(dataflow_id, &node_id, dynamic)
1386                .await?;
1387        }
1388
1389        let spawn_result = Self::spawn_prepared_nodes(
1390            dataflow_id,
1391            logger,
1392            tasks,
1393            self.events_tx.clone(),
1394            self.clock.clone(),
1395        );
1396
1397        Ok(spawn_result)
1398    }
1399
1400    async fn spawn_prepared_nodes(
1401        dataflow_id: Uuid,
1402        mut logger: DataflowLogger<'_>,
1403        tasks: Vec<NodeBuildTask<impl Future<Output = eyre::Result<spawn::PreparedNode>>>>,
1404        events_tx: mpsc::Sender<Timestamped<Event>>,
1405        clock: Arc<HLC>,
1406    ) -> eyre::Result<()> {
1407        let node_result = |node_id, dynamic_node, result| Timestamped {
1408            inner: Event::SpawnNodeResult {
1409                dataflow_id,
1410                node_id,
1411                dynamic_node,
1412                result,
1413            },
1414            timestamp: clock.new_timestamp(),
1415        };
1416        let mut failed_to_prepare = None;
1417        let mut prepared_nodes = Vec::new();
1418        for task in tasks {
1419            let NodeBuildTask {
1420                node_id,
1421                dynamic_node,
1422                task,
1423            } = task;
1424            match task.await {
1425                Ok(node) => prepared_nodes.push(node),
1426                Err(err) => {
1427                    if failed_to_prepare.is_none() {
1428                        failed_to_prepare = Some(node_id.clone());
1429                    }
1430                    let node_err: NodeError = NodeError {
1431                        timestamp: clock.new_timestamp(),
1432                        cause: NodeErrorCause::FailedToSpawn(format!(
1433                            "preparing for spawn failed: {err:?}"
1434                        )),
1435                        exit_status: NodeExitStatus::Unknown,
1436                    };
1437                    let send_result = events_tx
1438                        .send(node_result(node_id, dynamic_node, Err(node_err)))
1439                        .await;
1440                    if send_result.is_err() {
1441                        tracing::error!("failed to send SpawnNodeResult to main daemon task")
1442                    }
1443                }
1444            }
1445        }
1446
1447        // once all nodes are prepared, do the actual spawning
1448        if let Some(failed_node) = failed_to_prepare {
1449            // don't spawn any nodes when an error occurred before
1450            for node in prepared_nodes {
1451                let err = NodeError {
1452                    timestamp: clock.new_timestamp(),
1453                    cause: NodeErrorCause::Cascading {
1454                        caused_by_node: failed_node.clone(),
1455                    },
1456                    exit_status: NodeExitStatus::Unknown,
1457                };
1458                let send_result = events_tx
1459                    .send(node_result(
1460                        node.node_id().clone(),
1461                        node.dynamic(),
1462                        Err(err),
1463                    ))
1464                    .await;
1465                if send_result.is_err() {
1466                    tracing::error!("failed to send SpawnNodeResult to main daemon task")
1467                }
1468            }
1469            Err(eyre!("failed to prepare node {failed_node}"))
1470        } else {
1471            let mut spawn_result = Ok(());
1472
1473            logger
1474                .log(
1475                    LogLevel::Info,
1476                    None,
1477                    Some("dora daemon".into()),
1478                    "finished building nodes, spawning...",
1479                )
1480                .await;
1481
1482            // spawn the nodes
1483            for node in prepared_nodes {
1484                let node_id = node.node_id().clone();
1485                let dynamic_node = node.dynamic();
1486                let logger = logger
1487                    .reborrow()
1488                    .for_node(node_id.clone())
1489                    .try_clone()
1490                    .await
1491                    .context("failed to clone NodeLogger")?;
1492                let result = node.spawn(logger).await;
1493                let node_spawn_result = match result {
1494                    Ok(node) => Ok(node),
1495                    Err(err) => {
1496                        let node_err = NodeError {
1497                            timestamp: clock.new_timestamp(),
1498                            cause: NodeErrorCause::FailedToSpawn(format!("spawn failed: {err:?}")),
1499                            exit_status: NodeExitStatus::Unknown,
1500                        };
1501                        if spawn_result.is_ok() {
1502                            spawn_result = Err(err.wrap_err(format!("failed to spawn {node_id}")));
1503                        }
1504                        Err(node_err)
1505                    }
1506                };
1507                let send_result = events_tx
1508                    .send(node_result(node_id, dynamic_node, node_spawn_result))
1509                    .await;
1510                if send_result.is_err() {
1511                    tracing::error!("failed to send SpawnNodeResult to main daemon task")
1512                }
1513            }
1514            spawn_result
1515        }
1516    }
1517
1518    async fn handle_dynamic_node_event(
1519        &mut self,
1520        event: DynamicNodeEventWrapper,
1521    ) -> eyre::Result<()> {
1522        match event {
1523            DynamicNodeEventWrapper {
1524                event: DynamicNodeEvent::NodeConfig { node_id },
1525                reply_tx,
1526            } => {
1527                let number_node_id = self
1528                    .running
1529                    .iter()
1530                    .filter(|(_id, dataflow)| dataflow.running_nodes.contains_key(&node_id))
1531                    .count();
1532
1533                let node_config = match number_node_id {
1534                    2.. => Err(format!(
1535                        "multiple dataflows contain dynamic node id {node_id}. \
1536                        Please only have one running dataflow with the specified \
1537                        node id if you want to use dynamic node",
1538                    )),
1539                    1 => self
1540                        .running
1541                        .iter()
1542                        .filter(|(_id, dataflow)| dataflow.running_nodes.contains_key(&node_id))
1543                        .map(|(id, dataflow)| -> Result<NodeConfig> {
1544                            let node_config = dataflow
1545                                .running_nodes
1546                                .get(&node_id)
1547                                .with_context(|| {
1548                                    format!("no node with ID `{node_id}` within the given dataflow")
1549                                })?
1550                                .node_config
1551                                .clone();
1552                            if !node_config.dynamic {
1553                                bail!("node with ID `{node_id}` in {id} is not dynamic");
1554                            }
1555                            Ok(node_config)
1556                        })
1557                        .next()
1558                        .ok_or_else(|| eyre!("no node with ID `{node_id}`"))
1559                        .and_then(|r| r)
1560                        .map_err(|err| {
1561                            format!(
1562                                "failed to get dynamic node config within given dataflow: {err}"
1563                            )
1564                        }),
1565                    0 => Err(format!("no node with ID `{node_id}`")),
1566                };
1567
1568                let reply = DaemonReply::NodeConfig {
1569                    result: node_config,
1570                };
1571                let _ = reply_tx.send(Some(reply)).map_err(|_| {
1572                    error!("could not send node info reply from daemon to coordinator")
1573                });
1574                Ok(())
1575            }
1576        }
1577    }
1578
    /// Handles a control-plane event reported by a node of a running dataflow.
    ///
    /// Events cover the node lifecycle (subscribing to event/drop streams,
    /// reporting that all outputs are done, dropping the event stream) and the
    /// data plane (sending outputs, closing outputs, reporting drop tokens).
    /// Most arms answer the node through the `reply_sender` channel instead of
    /// propagating errors, so a missing dataflow is reported to the node, not
    /// to the caller.
    async fn handle_node_event(
        &mut self,
        event: DaemonNodeEvent,
        dataflow_id: DataflowId,
        node_id: NodeId,
    ) -> eyre::Result<()> {
        // Whether this node may still be restarted per its restart policy.
        // `None` if the dataflow or node is not (or no longer) known.
        let might_restart = || {
            let dataflow = self.running.get(&dataflow_id)?;
            let node = dataflow.running_nodes.get(&node_id)?;
            Some(match node.restart_policy {
                RestartPolicy::Never => false,
                // restarts can be disabled explicitly (e.g. after a stop was
                // sent or all inputs closed), overriding the policy
                _ if node.restarts_disabled() => false,
                RestartPolicy::OnFailure | RestartPolicy::Always => true,
            })
        };
        match event {
            DaemonNodeEvent::Subscribe {
                event_sender,
                reply_sender,
            } => {
                let mut logger = self.logger.for_dataflow(dataflow_id);
                logger
                    .log(
                        LogLevel::Info,
                        Some(node_id.clone()),
                        Some("daemon".into()),
                        "node is ready",
                    )
                    .await;

                let dataflow = self.running.get_mut(&dataflow_id).ok_or_else(|| {
                    format!("subscribe failed: no running dataflow with ID `{dataflow_id}`")
                });

                match dataflow {
                    Err(err) => {
                        // reply with the error string; the node decides how to react
                        let _ = reply_sender.send(DaemonReply::Result(Err(err)));
                    }
                    Ok(dataflow) => {
                        // register the event channel and replay missed events
                        Self::subscribe(dataflow, node_id.clone(), event_sender, &self.clock).await;

                        let status = dataflow
                            .pending_nodes
                            .handle_node_subscription(
                                node_id.clone(),
                                reply_sender,
                                &mut self.coordinator_connection,
                                &self.clock,
                                &mut dataflow.cascading_error_causes,
                                &mut logger,
                            )
                            .await?;
                        match status {
                            // start the dataflow exactly once, when the last
                            // pending node has subscribed
                            DataflowStatus::AllNodesReady if !dataflow.dataflow_started => {
                                logger
                                    .log(
                                        LogLevel::Info,
                                        None,
                                        Some("daemon".into()),
                                        "all nodes are ready, starting dataflow",
                                    )
                                    .await;
                                dataflow.start(&self.events_tx, &self.clock).await?;
                                dataflow.dataflow_started = true;
                            }
                            _ => {}
                        }
                    }
                }
            }
            DaemonNodeEvent::SubscribeDrop {
                event_sender,
                reply_sender,
            } => {
                // register the channel on which the node wants to receive
                // drop-token notifications
                let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
                    format!("failed to subscribe: no running dataflow with ID `{dataflow_id}`")
                });
                let result = match dataflow {
                    Ok(dataflow) => {
                        dataflow.drop_channels.insert(node_id, event_sender);
                        Ok(())
                    }
                    Err(err) => Err(err.to_string()),
                };
                let _ = reply_sender.send(DaemonReply::Result(result));
            }
            DaemonNodeEvent::CloseOutputs {
                outputs,
                reply_sender,
            } => {
                let reply = if might_restart().unwrap_or(false) {
                    // a restarted node reuses its outputs -> keep them open
                    self.logger
                        .for_dataflow(dataflow_id)
                        .for_node(node_id.clone())
                        .log(
                            LogLevel::Debug,
                            Some("daemon".into()),
                            "skipping CloseOutputs because node might restart",
                        )
                        .await;
                    Ok(())
                } else {
                    // notify downstream nodes
                    let inner = async {
                        self.send_output_closed_events(dataflow_id, node_id, outputs)
                            .await
                    };

                    inner.await.map_err(|err| format!("{err:?}"))
                };
                let _ = reply_sender.send(DaemonReply::Result(reply));
            }
            DaemonNodeEvent::OutputsDone { reply_sender } => {
                // the node finished sending; close its outputs unless it may restart
                let result = self
                    .handle_outputs_done(dataflow_id, &node_id, might_restart().unwrap_or(false))
                    .await;

                let _ = reply_sender.send(DaemonReply::Result(
                    result.map_err(|err| format!("{err:?}")),
                ));
            }
            DaemonNodeEvent::SendOut {
                output_id,
                metadata,
                data,
            } => self
                .send_out(dataflow_id, node_id, output_id, metadata, data)
                .await
                .context("failed to send out")?,
            DaemonNodeEvent::ReportDrop { tokens } => {
                let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
                    format!(
                        "failed to get handle drop tokens: \
                        no running dataflow with ID `{dataflow_id}`"
                    )
                });

                match dataflow {
                    Ok(dataflow) => {
                        // mark this node as done with each reported token; once no
                        // node is pending for a token, the data can be dropped
                        for token in tokens {
                            match dataflow.pending_drop_tokens.get_mut(&token) {
                                Some(info) => {
                                    if info.pending_nodes.remove(&node_id) {
                                        dataflow.check_drop_token(token, &self.clock).await?;
                                    } else {
                                        tracing::warn!(
                                            "node `{node_id}` is not pending for drop token `{token:?}`"
                                        );
                                    }
                                }
                                None => tracing::warn!("unknown drop token `{token:?}`"),
                            }
                        }
                    }
                    Err(err) => tracing::warn!("{err:?}"),
                }
            }
            DaemonNodeEvent::EventStreamDropped { reply_sender } => {
                // the node stopped listening -> forget its subscribe channel
                let inner = async {
                    let dataflow = self
                        .running
                        .get_mut(&dataflow_id)
                        .wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"))?;
                    dataflow.subscribe_channels.remove(&node_id);
                    Result::<_, eyre::Error>::Ok(())
                };

                let reply = inner.await.map_err(|err| format!("{err:?}"));
                let _ = reply_sender.send(DaemonReply::Result(reply));
            }
        }
        Ok(())
    }
1752
1753    async fn send_reload(
1754        &mut self,
1755        dataflow_id: Uuid,
1756        node_id: NodeId,
1757        operator_id: Option<OperatorId>,
1758    ) -> Result<(), eyre::ErrReport> {
1759        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
1760            format!("Reload failed: no running dataflow with ID `{dataflow_id}`")
1761        })?;
1762        if let Some(channel) = dataflow.subscribe_channels.get(&node_id) {
1763            match send_with_timestamp(channel, NodeEvent::Reload { operator_id }, &self.clock) {
1764                Ok(()) => {}
1765                Err(_) => {
1766                    dataflow.subscribe_channels.remove(&node_id);
1767                }
1768            }
1769        }
1770        Ok(())
1771    }
1772
1773    async fn send_out(
1774        &mut self,
1775        dataflow_id: Uuid,
1776        node_id: NodeId,
1777        output_id: DataId,
1778        metadata: dora_message::metadata::Metadata,
1779        data: Option<DataMessage>,
1780    ) -> Result<(), eyre::ErrReport> {
1781        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
1782            format!("send out failed: no running dataflow with ID `{dataflow_id}`")
1783        })?;
1784        let data_bytes = send_output_to_local_receivers(
1785            node_id.clone(),
1786            output_id.clone(),
1787            dataflow,
1788            &metadata,
1789            data,
1790            &self.clock,
1791        )
1792        .await?;
1793
1794        let output_id = OutputId(node_id, output_id);
1795        let remote_receivers = dataflow.open_external_mappings.contains(&output_id)
1796            || dataflow.publish_all_messages_to_zenoh;
1797        if remote_receivers {
1798            let event = InterDaemonEvent::Output {
1799                dataflow_id,
1800                node_id: output_id.0.clone(),
1801                output_id: output_id.1.clone(),
1802                metadata,
1803                data: data_bytes,
1804            };
1805            self.send_to_remote_receivers(dataflow_id, &output_id, event)
1806                .await?;
1807        }
1808
1809        Ok(())
1810    }
1811
1812    async fn send_to_remote_receivers(
1813        &mut self,
1814        dataflow_id: Uuid,
1815        output_id: &OutputId,
1816        event: InterDaemonEvent,
1817    ) -> Result<(), eyre::Error> {
1818        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
1819            format!("send out failed: no running dataflow with ID `{dataflow_id}`")
1820        })?;
1821
1822        // publish via zenoh
1823        let publisher = match dataflow.publishers.get(output_id) {
1824            Some(publisher) => publisher,
1825            None => {
1826                let publish_topic =
1827                    zenoh_output_publish_topic(dataflow.id, &output_id.0, &output_id.1);
1828                tracing::debug!("declaring publisher on {publish_topic}");
1829                let publisher = self
1830                    .zenoh_session
1831                    .declare_publisher(publish_topic)
1832                    .await
1833                    .map_err(|e| eyre!(e))
1834                    .context("failed to create zenoh publisher")?;
1835                dataflow.publishers.insert(output_id.clone(), publisher);
1836                dataflow.publishers.get(output_id).unwrap()
1837            }
1838        };
1839
1840        let serialized_event = Timestamped {
1841            inner: event,
1842            timestamp: self.clock.new_timestamp(),
1843        }
1844        .serialize();
1845        publisher
1846            .put(serialized_event)
1847            .await
1848            .map_err(|e| eyre!(e))
1849            .context("zenoh put failed")?;
1850        Ok(())
1851    }
1852
1853    async fn send_output_closed_events(
1854        &mut self,
1855        dataflow_id: DataflowId,
1856        node_id: NodeId,
1857        outputs: Vec<DataId>,
1858    ) -> eyre::Result<()> {
1859        let dataflow = self
1860            .running
1861            .get_mut(&dataflow_id)
1862            .wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"))?;
1863        let local_node_inputs: BTreeSet<_> = dataflow
1864            .mappings
1865            .iter()
1866            .filter(|(k, _)| k.0 == node_id && outputs.contains(&k.1))
1867            .flat_map(|(_, v)| v)
1868            .cloned()
1869            .collect();
1870        for (receiver_id, input_id) in &local_node_inputs {
1871            close_input(dataflow, receiver_id, input_id, &self.clock);
1872        }
1873
1874        let mut closed = Vec::new();
1875        for output_id in &dataflow.open_external_mappings {
1876            if output_id.0 == node_id && outputs.contains(&output_id.1) {
1877                closed.push(output_id.clone());
1878            }
1879        }
1880
1881        for output_id in closed {
1882            let event = InterDaemonEvent::OutputClosed {
1883                dataflow_id,
1884                node_id: output_id.0.clone(),
1885                output_id: output_id.1.clone(),
1886            };
1887            self.send_to_remote_receivers(dataflow_id, &output_id, event)
1888                .await?;
1889        }
1890
1891        Ok(())
1892    }
1893
1894    async fn subscribe(
1895        dataflow: &mut RunningDataflow,
1896        node_id: NodeId,
1897        event_sender: UnboundedSender<Timestamped<NodeEvent>>,
1898        clock: &HLC,
1899    ) {
1900        // some inputs might have been closed already -> report those events
1901        let closed_inputs = dataflow
1902            .mappings
1903            .values()
1904            .flatten()
1905            .filter(|(node, _)| node == &node_id)
1906            .map(|(_, input)| input)
1907            .filter(|input| {
1908                dataflow
1909                    .open_inputs
1910                    .get(&node_id)
1911                    .map(|open_inputs| !open_inputs.contains(*input))
1912                    .unwrap_or(true)
1913            });
1914        for input_id in closed_inputs {
1915            let _ = send_with_timestamp(
1916                &event_sender,
1917                NodeEvent::InputClosed {
1918                    id: input_id.clone(),
1919                },
1920                clock,
1921            );
1922        }
1923        if dataflow.open_inputs(&node_id).is_empty() {
1924            if let Some(node) = dataflow.running_nodes.get_mut(&node_id) {
1925                node.disable_restart();
1926            }
1927            let _ = send_with_timestamp(&event_sender, NodeEvent::AllInputsClosed, clock);
1928        }
1929
1930        // if a stop event was already sent for the dataflow, send it to
1931        // the newly connected node too
1932        if dataflow.stop_sent {
1933            if let Some(node) = dataflow.running_nodes.get_mut(&node_id) {
1934                node.disable_restart();
1935            }
1936            let _ = send_with_timestamp(&event_sender, NodeEvent::Stop, clock);
1937        }
1938
1939        dataflow.subscribe_channels.insert(node_id, event_sender);
1940    }
1941
1942    #[tracing::instrument(skip(self), level = "trace")]
1943    async fn handle_outputs_done(
1944        &mut self,
1945        dataflow_id: DataflowId,
1946        node_id: &NodeId,
1947        might_restart: bool,
1948    ) -> eyre::Result<()> {
1949        let dataflow = self
1950            .running
1951            .get_mut(&dataflow_id)
1952            .ok_or_else(|| eyre!("no running dataflow with ID `{dataflow_id}`"))?;
1953
1954        let outputs = dataflow
1955            .mappings
1956            .keys()
1957            .filter(|m| &m.0 == node_id)
1958            .map(|m| &m.1)
1959            .cloned()
1960            .collect();
1961
1962        if might_restart {
1963            self.logger
1964                .for_dataflow(dataflow_id)
1965                .for_node(node_id.clone())
1966                .log(
1967                    LogLevel::Debug,
1968                    Some("daemon".into()),
1969                    "keeping outputs open because node might restart",
1970                )
1971                .await;
1972        } else {
1973            self.send_output_closed_events(dataflow_id, node_id.clone(), outputs)
1974                .await?;
1975        }
1976
1977        let dataflow = self
1978            .running
1979            .get_mut(&dataflow_id)
1980            .ok_or_else(|| eyre!("no running dataflow with ID `{dataflow_id}`"))?;
1981        dataflow.drop_channels.remove(node_id);
1982        Ok(())
1983    }
1984
1985    async fn handle_node_stop(
1986        &mut self,
1987        dataflow_id: Uuid,
1988        node_id: &NodeId,
1989        dynamic_node: bool,
1990    ) -> eyre::Result<()> {
1991        let result = self
1992            .handle_node_stop_inner(dataflow_id, node_id, dynamic_node)
1993            .await;
1994        let _ = self
1995            .events_tx
1996            .send(Timestamped {
1997                inner: Event::NodeStopped {
1998                    dataflow_id,
1999                    node_id: node_id.clone(),
2000                },
2001                timestamp: self.clock.new_timestamp(),
2002            })
2003            .await;
2004        result
2005    }
2006
    /// Performs the actual stop handling for a node: updates pending-node
    /// bookkeeping, closes the node's outputs, removes it from the running
    /// set, and — when this was the last relevant node — reports the
    /// dataflow as finished to the coordinator and drops its state.
    async fn handle_node_stop_inner(
        &mut self,
        dataflow_id: Uuid,
        node_id: &NodeId,
        dynamic_node: bool,
    ) -> eyre::Result<()> {
        let mut logger = self.logger.for_dataflow(dataflow_id);
        let dataflow = match self.running.get_mut(&dataflow_id) {
            Some(dataflow) => dataflow,
            None if dynamic_node => {
                // The dataflow might be done already as we don't wait for dynamic nodes. In this
                // case, we don't need to do anything to handle the node stop.
                tracing::debug!(
                    "dynamic node {dataflow_id}/{node_id} stopped after dataflow was done"
                );
                return Ok(());
            }
            None => eyre::bail!(
                "failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`"
            ),
        };

        // update pending-node bookkeeping (may report errors/cascades to the
        // coordinator)
        dataflow
            .pending_nodes
            .handle_node_stop(
                node_id,
                &mut self.coordinator_connection,
                &self.clock,
                &mut dataflow.cascading_error_causes,
                &mut logger,
            )
            .await?;

        // node only reaches here if it will not be restarted
        let might_restart = false;

        self.handle_outputs_done(dataflow_id, node_id, might_restart)
            .await?;

        // re-borrow after `handle_outputs_done` required `&mut self`
        let mut logger = self.logger.for_dataflow(dataflow_id);
        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
            format!("failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`")
        })?;
        dataflow.running_nodes.remove(node_id);
        // The dataflow counts as finished on this daemon once no local nodes
        // are pending and only dynamic nodes (which we don't wait for) remain.
        if !dataflow.pending_nodes.local_nodes_pending()
            && dataflow
                .running_nodes
                .iter()
                .all(|(_id, n)| n.node_config.dynamic)
        {
            let result = DataflowDaemonResult {
                timestamp: self.clock.new_timestamp(),
                node_results: self
                    .dataflow_node_results
                    .get(&dataflow.id)
                    .context("failed to get dataflow node results")?
                    .clone(),
            };

            // this dataflow no longer pins any git clones
            self.git_manager
                .clones_in_use
                .values_mut()
                .for_each(|dataflows| {
                    dataflows.remove(&dataflow_id);
                });

            logger
                .log(
                    LogLevel::Info,
                    None,
                    Some("daemon".into()),
                    format!("dataflow finished on machine `{}`", self.daemon_id),
                )
                .await;
            // report the per-node results to the coordinator, if connected
            if let Some(connection) = &mut self.coordinator_connection {
                let msg = serde_json::to_vec(&Timestamped {
                    inner: CoordinatorRequest::Event {
                        daemon_id: self.daemon_id.clone(),
                        event: DaemonEvent::AllNodesFinished {
                            dataflow_id,
                            result,
                        },
                    },
                    timestamp: self.clock.new_timestamp(),
                })?;
                socket_stream_send(connection, &msg)
                    .await
                    .wrap_err("failed to report dataflow finish to dora-coordinator")?;
            }
            // drop all remaining state of the dataflow
            self.running.remove(&dataflow_id);
        }

        Ok(())
    }
2101
2102    async fn handle_dora_event(&mut self, event: DoraEvent) -> eyre::Result<()> {
2103        match event {
2104            DoraEvent::Timer {
2105                dataflow_id,
2106                interval,
2107                metadata,
2108            } => {
2109                let Some(dataflow) = self.running.get_mut(&dataflow_id) else {
2110                    tracing::warn!("Timer event for unknown dataflow `{dataflow_id}`");
2111                    return Ok(());
2112                };
2113
2114                let Some(subscribers) = dataflow.timers.get(&interval) else {
2115                    return Ok(());
2116                };
2117
2118                let mut closed = Vec::new();
2119                for (receiver_id, input_id) in subscribers {
2120                    let Some(channel) = dataflow.subscribe_channels.get(receiver_id) else {
2121                        continue;
2122                    };
2123
2124                    let send_result = send_with_timestamp(
2125                        channel,
2126                        NodeEvent::Input {
2127                            id: input_id.clone(),
2128                            metadata: metadata.clone(),
2129                            data: None,
2130                        },
2131                        &self.clock,
2132                    );
2133                    match send_result {
2134                        Ok(()) => {}
2135                        Err(_) => {
2136                            closed.push(receiver_id);
2137                        }
2138                    }
2139                }
2140                for id in closed {
2141                    dataflow.subscribe_channels.remove(id);
2142                }
2143            }
2144            DoraEvent::Logs {
2145                dataflow_id,
2146                output_id,
2147                message,
2148                metadata,
2149            } => {
2150                let Some(dataflow) = self.running.get_mut(&dataflow_id) else {
2151                    tracing::warn!("Logs event for unknown dataflow `{dataflow_id}`");
2152                    return Ok(());
2153                };
2154
2155                let Some(subscribers) = dataflow.mappings.get(&output_id) else {
2156                    tracing::warn!(
2157                        "No subscribers found for {:?} in {:?}",
2158                        output_id,
2159                        dataflow.mappings
2160                    );
2161                    return Ok(());
2162                };
2163
2164                let mut closed = Vec::new();
2165                for (receiver_id, input_id) in subscribers {
2166                    let Some(channel) = dataflow.subscribe_channels.get(receiver_id) else {
2167                        tracing::warn!("No subscriber channel found for {:?}", output_id);
2168                        continue;
2169                    };
2170
2171                    let send_result = send_with_timestamp(
2172                        channel,
2173                        NodeEvent::Input {
2174                            id: input_id.clone(),
2175                            metadata: metadata.clone(),
2176                            data: Some(message.clone()),
2177                        },
2178                        &self.clock,
2179                    );
2180                    match send_result {
2181                        Ok(()) => {}
2182                        Err(_) => {
2183                            closed.push(receiver_id);
2184                        }
2185                    }
2186                }
2187                for id in closed {
2188                    dataflow.subscribe_channels.remove(id);
2189                }
2190            }
2191            DoraEvent::SpawnedNodeResult {
2192                dataflow_id,
2193                node_id,
2194                dynamic_node,
2195                exit_status,
2196                restart,
2197            } => {
2198                let mut logger = self
2199                    .logger
2200                    .for_dataflow(dataflow_id)
2201                    .for_node(node_id.clone());
2202                logger
2203                    .log(
2204                        LogLevel::Debug,
2205                        Some("daemon".into()),
2206                        format!("handling node stop with exit status {exit_status:?} (restart: {restart})"),
2207                    )
2208                    .await;
2209
2210                let node_result = match exit_status {
2211                    NodeExitStatus::Success => Ok(()),
2212                    exit_status => {
2213                        let dataflow = self.running.get(&dataflow_id);
2214                        let caused_by_node = dataflow
2215                            .and_then(|dataflow| {
2216                                dataflow.cascading_error_causes.error_caused_by(&node_id)
2217                            })
2218                            .cloned();
2219                        let grace_duration_kill = dataflow
2220                            .map(|d| d.grace_duration_kills.contains(&node_id))
2221                            .unwrap_or_default();
2222
2223                        let cause = match caused_by_node {
2224                            Some(caused_by_node) => {
2225                                logger
2226                                    .log(
2227                                        LogLevel::Info,
2228                                        Some("daemon".into()),
2229                                        format!("marking `{node_id}` as cascading error caused by `{caused_by_node}`")
2230                                    )
2231                                    .await;
2232
2233                                NodeErrorCause::Cascading { caused_by_node }
2234                            }
2235                            None if grace_duration_kill => NodeErrorCause::GraceDuration,
2236                            None => {
2237                                let cause = dataflow
2238                                    .and_then(|d| d.node_stderr_most_recent.get(&node_id))
2239                                    .map(|queue| {
2240                                        let mut s = if queue.is_full() {
2241                                            "[...]".into()
2242                                        } else {
2243                                            String::new()
2244                                        };
2245                                        while let Some(line) = queue.pop() {
2246                                            s += &line;
2247                                        }
2248                                        s
2249                                    })
2250                                    .unwrap_or_default();
2251
2252                                NodeErrorCause::Other { stderr: cause }
2253                            }
2254                        };
2255                        Err(NodeError {
2256                            timestamp: self.clock.new_timestamp(),
2257                            cause,
2258                            exit_status,
2259                        })
2260                    }
2261                };
2262
2263                logger
2264                    .log(
2265                        if node_result.is_ok() {
2266                            LogLevel::Info
2267                        } else {
2268                            LogLevel::Error
2269                        },
2270                        Some("daemon".into()),
2271                        match &node_result {
2272                            Ok(()) => format!("{node_id} finished successfully"),
2273                            Err(err) => format!("{err}"),
2274                        },
2275                    )
2276                    .await;
2277
2278                if restart {
2279                    logger
2280                        .log(
2281                            LogLevel::Info,
2282                            Some("daemon".into()),
2283                            "node will be restarted",
2284                        )
2285                        .await;
2286                } else {
2287                    self.dataflow_node_results
2288                        .entry(dataflow_id)
2289                        .or_default()
2290                        .insert(node_id.clone(), node_result);
2291
2292                    self.handle_node_stop(dataflow_id, &node_id, dynamic_node)
2293                        .await?;
2294                }
2295            }
2296        }
2297        Ok(())
2298    }
2299
2300    fn base_working_dir(
2301        &self,
2302        local_working_dir: Option<PathBuf>,
2303        session_id: SessionId,
2304    ) -> eyre::Result<PathBuf> {
2305        match local_working_dir {
2306            Some(working_dir) => {
2307                // check that working directory exists
2308                if working_dir.exists() {
2309                    Ok(working_dir)
2310                } else {
2311                    bail!(
2312                        "working directory does not exist: {}",
2313                        working_dir.display(),
2314                    )
2315                }
2316            }
2317            None => {
2318                // use subfolder of daemon working dir
2319                let daemon_working_dir =
2320                    current_dir().context("failed to get daemon working dir")?;
2321                Ok(daemon_working_dir
2322                    .join("_work")
2323                    .join(session_id.uuid().to_string()))
2324            }
2325        }
2326    }
2327}
2328
/// Reads the last `tail` lines of `file`, returning them as raw bytes.
///
/// The file is scanned backwards in chunks starting from the end, so only the
/// required portion of the file is read. ASCII whitespace at the very end of
/// the file (e.g. a trailing newline) is ignored when counting lines.
async fn read_last_n_lines(file: &mut File, mut tail: usize) -> io::Result<Vec<u8>> {
    // `pos` is the offset up to which we still have to read; everything at
    // offsets >= `pos` has already been processed.
    let mut pos = file.seek(io::SeekFrom::End(0)).await?;

    let mut output = VecDeque::<u8>::new();
    // Prepends `slice` to `output` — we read the file back to front, so each
    // new chunk belongs before everything collected so far.
    let mut extend_slice_to_start = |slice: &[u8]| {
        output.extend(slice);
        output.rotate_right(slice.len());
    };

    let mut buffer = vec![0; 2048];
    let mut estimated_line_length = 0;
    let mut at_end = true;
    'main: while tail > 0 && pos > 0 {
        // Read the chunk of at most `buffer.len()` bytes that ends at `pos`.
        let new_pos = pos.saturating_sub(buffer.len() as u64);
        file.seek(io::SeekFrom::Start(new_pos)).await?;
        let read_len = (pos - new_pos) as usize;
        pos = new_pos;

        file.read_exact(&mut buffer[..read_len]).await?;
        let read_buf = if at_end {
            // First (i.e. last-in-file) chunk: drop trailing whitespace so a
            // final newline does not count as an extra empty line.
            at_end = false;
            &buffer[..read_len].trim_ascii_end()
        } else {
            &buffer[..read_len]
        };

        // Walk the newlines of this chunk from back to front.
        let mut iter = memchr::memrchr_iter(b'\n', read_buf);
        let mut lines = 1;
        loop {
            let Some(pos) = iter.next() else {
                // No further newline in this chunk -> the whole chunk belongs
                // to the requested tail; continue with the previous chunk.
                extend_slice_to_start(read_buf);
                break;
            };
            lines += 1;
            tail -= 1;
            if tail == 0 {
                // Found the start of the requested tail: keep everything after
                // this newline and stop scanning.
                extend_slice_to_start(&read_buf[(pos + 1)..]);
                break 'main;
            }
        }

        // Grow the read buffer when the remaining lines likely won't fit,
        // based on the largest average line length observed so far.
        estimated_line_length = estimated_line_length.max((read_buf.len() + 1).div_ceil(lines));
        let estimated_buffer_length = estimated_line_length * tail;
        if estimated_buffer_length >= buffer.len() * 2 {
            buffer.resize(buffer.len() * 2, 0);
        }
    }

    Ok(output.into())
}
2379
2380async fn set_up_event_stream(
2381    coordinator_addr: SocketAddr,
2382    machine_id: &Option<String>,
2383    clock: &Arc<HLC>,
2384    remote_daemon_events_rx: flume::Receiver<eyre::Result<Timestamped<InterDaemonEvent>>>,
2385    // used for dynamic nodes
2386    local_listen_port: u16,
2387) -> eyre::Result<(DaemonId, impl Stream<Item = Timestamped<Event>> + Unpin)> {
2388    let clock_cloned = clock.clone();
2389    let remote_daemon_events = remote_daemon_events_rx.into_stream().map(move |e| match e {
2390        Ok(e) => Timestamped {
2391            inner: Event::Daemon(e.inner),
2392            timestamp: e.timestamp,
2393        },
2394        Err(err) => Timestamped {
2395            inner: Event::DaemonError(err),
2396            timestamp: clock_cloned.new_timestamp(),
2397        },
2398    });
2399    let (daemon_id, coordinator_events) =
2400        coordinator::register(coordinator_addr, machine_id.clone(), clock)
2401            .await
2402            .wrap_err("failed to connect to dora-coordinator")?;
2403    let coordinator_events = coordinator_events.map(
2404        |Timestamped {
2405             inner: event,
2406             timestamp,
2407         }| Timestamped {
2408            inner: Event::Coordinator(event),
2409            timestamp,
2410        },
2411    );
2412    let (events_tx, events_rx) = flume::bounded(10);
2413    let _listen_port =
2414        local_listener::spawn_listener_loop((LOCALHOST, local_listen_port).into(), events_tx)
2415            .await?;
2416    let dynamic_node_events = events_rx.into_stream().map(|e| Timestamped {
2417        inner: Event::DynamicNode(e.inner),
2418        timestamp: e.timestamp,
2419    });
2420    let incoming = (
2421        coordinator_events,
2422        remote_daemon_events,
2423        dynamic_node_events,
2424    )
2425        .merge();
2426    Ok((daemon_id, incoming))
2427}
2428
/// Delivers an output message to all local subscribers of the given output.
///
/// Returns the raw data bytes (copied out of shared memory if necessary) so
/// that the caller can forward them to remote receivers as well. Subscriber
/// channels that turn out to be closed are pruned as a side effect.
async fn send_output_to_local_receivers(
    node_id: NodeId,
    output_id: DataId,
    dataflow: &mut RunningDataflow,
    metadata: &metadata::Metadata,
    data: Option<DataMessage>,
    clock: &HLC,
) -> Result<Option<AVec<u8, ConstAlign<128>>>, eyre::ErrReport> {
    let timestamp = metadata.timestamp();
    let empty_set = BTreeSet::new();
    let output_id = OutputId(node_id, output_id);
    // Local inputs that are mapped to this output (empty if none).
    let local_receivers = dataflow.mappings.get(&output_id).unwrap_or(&empty_set);
    // Move the node ID back out of the `OutputId` wrapper.
    let OutputId(node_id, _) = output_id;
    let mut closed = Vec::new();
    for (receiver_id, input_id) in local_receivers {
        if let Some(channel) = dataflow.subscribe_channels.get(receiver_id) {
            let item = NodeEvent::Input {
                id: input_id.clone(),
                metadata: metadata.clone(),
                data: data.clone(),
            };
            match channel.send(Timestamped {
                inner: item,
                timestamp,
            }) {
                Ok(()) => {
                    // Record that this receiver now holds a reference to the
                    // data until it reports the drop token back.
                    if let Some(token) = data.as_ref().and_then(|d| d.drop_token()) {
                        dataflow
                            .pending_drop_tokens
                            .entry(token)
                            .or_insert_with(|| DropTokenInformation {
                                owner: node_id.clone(),
                                pending_nodes: Default::default(),
                            })
                            .pending_nodes
                            .insert(receiver_id.clone());
                    }
                }
                Err(_) => {
                    // Receiver is gone; remember its channel for removal below.
                    closed.push(receiver_id);
                }
            }
        }
    }
    for id in closed {
        dataflow.subscribe_channels.remove(id);
    }
    // Copy the payload out of shared memory (if any) so it can also be
    // forwarded to non-local receivers.
    let (data_bytes, drop_token) = match data {
        None => (None, None),
        Some(DataMessage::SharedMemory {
            shared_memory_id,
            len,
            drop_token,
        }) => {
            let memory = ShmemConf::new()
                .os_id(shared_memory_id)
                .open()
                .wrap_err("failed to map shared memory output")?;
            // NOTE(review): reading the mapped region is unsafe; this assumes
            // the sender keeps the region alive and >= `len` bytes long until
            // the drop token is reported — TODO confirm.
            let data = Some(AVec::from_slice(1, &unsafe { memory.as_slice() }[..len]));
            (data, Some(drop_token))
        }
        Some(DataMessage::Vec(v)) => (Some(v), None),
    };
    if let Some(token) = drop_token {
        // insert token into `pending_drop_tokens` even if there are no local subscribers
        dataflow
            .pending_drop_tokens
            .entry(token)
            .or_insert_with(|| DropTokenInformation {
                owner: node_id.clone(),
                pending_nodes: Default::default(),
            });
        // check if all local subscribers are finished with the token
        dataflow.check_drop_token(token, clock).await?;
    }
    Ok(data_bytes)
}
2506
2507fn node_inputs(node: &ResolvedNode) -> BTreeMap<DataId, Input> {
2508    match &node.kind {
2509        CoreNodeKind::Custom(n) => n.run_config.inputs.clone(),
2510        CoreNodeKind::Runtime(n) => runtime_node_inputs(n),
2511    }
2512}
2513
2514fn close_input(
2515    dataflow: &mut RunningDataflow,
2516    receiver_id: &NodeId,
2517    input_id: &DataId,
2518    clock: &HLC,
2519) {
2520    if let Some(open_inputs) = dataflow.open_inputs.get_mut(receiver_id) {
2521        if !open_inputs.remove(input_id) {
2522            return;
2523        }
2524    }
2525    if let Some(channel) = dataflow.subscribe_channels.get(receiver_id) {
2526        let _ = send_with_timestamp(
2527            channel,
2528            NodeEvent::InputClosed {
2529                id: input_id.clone(),
2530            },
2531            clock,
2532        );
2533
2534        if dataflow.open_inputs(receiver_id).is_empty() {
2535            if let Some(node) = dataflow.running_nodes.get_mut(receiver_id) {
2536                node.disable_restart();
2537            }
2538            let _ = send_with_timestamp(channel, NodeEvent::AllInputsClosed, clock);
2539        }
2540    }
2541}
2542
/// A node process spawned (or registered) by this daemon for a dataflow.
#[derive(Debug)]
pub struct RunningNode {
    /// Handle for submitting kill/soft-kill operations to the node process;
    /// `None` when there is no local process handle — presumably for dynamic
    /// nodes, TODO confirm.
    process: Option<ProcessHandle>,
    /// The configuration that the node was spawned with.
    node_config: NodeConfig,
    /// OS process ID of the node, once known.
    pid: Option<Arc<AtomicU32>>,
    /// The restart behavior configured for this node.
    restart_policy: RestartPolicy,
    /// Don't restart the node even if the restart policy says so.
    ///
    /// This flag is set when all inputs of the node were closed and when a manual stop command
    /// was sent.
    disable_restart: Arc<AtomicBool>,
}
2555
impl RunningNode {
    /// Returns whether restarting was disabled for this node, independently of
    /// its configured restart policy.
    pub fn restarts_disabled(&self) -> bool {
        self.disable_restart.load(atomic::Ordering::Acquire)
    }

    /// Permanently disables restarting of this node, overriding the restart
    /// policy.
    pub fn disable_restart(&mut self) {
        self.disable_restart.store(true, atomic::Ordering::Release);
    }
}
2565
/// Operations that can be submitted to a spawned node process.
#[derive(Debug)]
enum ProcessOperation {
    /// Request a graceful shutdown (SIGTERM on Unix, CTRL_BREAK_EVENT on
    /// Windows).
    SoftKill,
    /// Forcefully kill the process.
    Kill,
}
2571
impl ProcessOperation {
    /// Applies this operation to the given child process.
    ///
    /// Failures are only logged as warnings, since there is nothing the caller
    /// could do about them.
    pub fn execute(&self, child: &mut dyn TokioChildWrapper) {
        match self {
            Self::SoftKill => {
                #[cfg(unix)]
                {
                    // Send SIGTERM
                    if let Err(err) = child.signal(15) {
                        warn!("failed to send SIGTERM to process {:?}: {err}", child.id());
                    }
                }

                #[cfg(windows)]
                unsafe {
                    let Some(pid) = child.id() else {
                        warn!("failed to get child process id");
                        return;
                    };
                    // CTRL_BREAK_EVENT is the closest Windows equivalent to a
                    // termination signal that can target another process.
                    if let Err(err) = windows::Win32::System::Console::GenerateConsoleCtrlEvent(
                        windows::Win32::System::Console::CTRL_BREAK_EVENT,
                        pid,
                    ) {
                        warn!("failed to send CTRL_BREAK_EVENT to process {pid}: {err}");
                    }
                }

                #[cfg(not(any(unix, windows)))]
                {
                    warn!("killing process is not implemented on this platform");
                }
            }
            Self::Kill => {
                // Hard kill; `start_kill` does not wait for the process to exit.
                if let Err(err) = child.start_kill() {
                    warn!("failed to kill child process: {err}");
                }
            }
        }
    }
}
2611
/// Handle for submitting [`ProcessOperation`]s to a spawned node process.
#[derive(Debug)]
struct ProcessHandle {
    /// Sender half of the operation channel consumed by the process task.
    op_tx: flume::Sender<ProcessOperation>,
}
2616
impl ProcessHandle {
    /// Creates a handle around the operation channel of a spawned process.
    pub fn new(op_tx: flume::Sender<ProcessOperation>) -> Self {
        Self { op_tx }
    }

    /// Returns true if the process is not finished yet and the operation is
    /// delivered.
    pub fn submit(&self, operation: ProcessOperation) -> bool {
        self.op_tx.send(operation).is_ok()
    }
}
2628
impl Drop for ProcessHandle {
    /// Kills the process if it is still running when the handle is dropped, so
    /// that no orphaned node processes are left behind.
    fn drop(&mut self) {
        if self.submit(ProcessOperation::Kill) {
            warn!("process was killed on drop because it was still running");
        }
    }
}
2636
/// Daemon-local state of a dataflow that is currently running.
pub struct RunningDataflow {
    /// Unique ID of this dataflow instance.
    id: Uuid,
    /// Local nodes that are not started yet
    pending_nodes: PendingNodes,

    /// Whether the dataflow has been started yet.
    dataflow_started: bool,

    /// Per-node channels for delivering node events (inputs, stop, …).
    subscribe_channels: HashMap<NodeId, UnboundedSender<Timestamped<NodeEvent>>>,
    /// Per-node channels for reporting dropped output tokens back to owners.
    drop_channels: HashMap<NodeId, UnboundedSender<Timestamped<NodeDropEvent>>>,
    /// Maps each output to the set of local inputs subscribed to it.
    mappings: HashMap<OutputId, BTreeSet<InputId>>,
    /// Timer inputs, grouped by tick interval.
    timers: BTreeMap<Duration, BTreeSet<InputId>>,
    /// The inputs that are still open, per node.
    open_inputs: BTreeMap<NodeId, BTreeSet<DataId>>,
    /// The nodes of this dataflow managed by this daemon.
    running_nodes: BTreeMap<NodeId, RunningNode>,

    /// List of all dynamic node IDs.
    ///
    /// We want to treat dynamic nodes differently in some cases, so we need
    /// to know which nodes are dynamic.
    dynamic_nodes: BTreeSet<NodeId>,

    /// Outputs with still-open external (non-local) mappings — TODO confirm.
    open_external_mappings: BTreeSet<OutputId>,

    /// Drop tokens whose data is still referenced by some local receivers.
    pending_drop_tokens: HashMap<DropToken, DropTokenInformation>,

    /// Keep handles to all timer tasks of this dataflow to cancel them on drop.
    _timer_handles: BTreeMap<Duration, futures::future::RemoteHandle<()>>,
    /// Whether a stop was already initiated for this dataflow.
    stop_sent: bool,

    /// Used in `open_inputs`.
    ///
    /// TODO: replace this with a constant once `BTreeSet::new` is `const` on stable.
    empty_set: BTreeSet<DataId>,

    /// Contains the node that caused the error for nodes that experienced a cascading error.
    cascading_error_causes: CascadingErrorCauses,
    /// Nodes that were (soft-)killed because they did not stop within the
    /// grace period.
    grace_duration_kills: Arc<crossbeam_skiplist::SkipSet<NodeId>>,

    /// Ring buffers with the most recent stderr lines per node, used when
    /// reporting node errors.
    node_stderr_most_recent: BTreeMap<NodeId, Arc<ArrayQueue<String>>>,

    /// Zenoh publishers per output — presumably for forwarding outputs to
    /// remote subscribers, TODO confirm.
    publishers: BTreeMap<OutputId, zenoh::pubsub::Publisher<'static>>,

    /// Broadcast channel signalled when the dataflow finishes — TODO confirm
    /// subscribers.
    finished_tx: broadcast::Sender<()>,

    /// Debug option taken from the dataflow descriptor
    /// (`debug.publish_all_messages_to_zenoh`).
    publish_all_messages_to_zenoh: bool,
}
2682
2683impl RunningDataflow {
2684    fn new(
2685        dataflow_id: Uuid,
2686        daemon_id: DaemonId,
2687        dataflow_descriptor: &Descriptor,
2688    ) -> RunningDataflow {
2689        let (finished_tx, _) = broadcast::channel(1);
2690        Self {
2691            id: dataflow_id,
2692            pending_nodes: PendingNodes::new(dataflow_id, daemon_id),
2693            dataflow_started: false,
2694            subscribe_channels: HashMap::new(),
2695            drop_channels: HashMap::new(),
2696            mappings: HashMap::new(),
2697            timers: BTreeMap::new(),
2698            open_inputs: BTreeMap::new(),
2699            running_nodes: BTreeMap::new(),
2700            dynamic_nodes: BTreeSet::new(),
2701            open_external_mappings: Default::default(),
2702            pending_drop_tokens: HashMap::new(),
2703            _timer_handles: BTreeMap::new(),
2704            stop_sent: false,
2705            empty_set: BTreeSet::new(),
2706            cascading_error_causes: Default::default(),
2707            grace_duration_kills: Default::default(),
2708            node_stderr_most_recent: BTreeMap::new(),
2709            publishers: Default::default(),
2710            finished_tx,
2711            publish_all_messages_to_zenoh: dataflow_descriptor.debug.publish_all_messages_to_zenoh,
2712        }
2713    }
2714
2715    async fn start(
2716        &mut self,
2717        events_tx: &mpsc::Sender<Timestamped<Event>>,
2718        clock: &Arc<HLC>,
2719    ) -> eyre::Result<()> {
2720        for interval in self.timers.keys().copied() {
2721            if self._timer_handles.get(&interval).is_some() {
2722                continue;
2723            }
2724            let events_tx = events_tx.clone();
2725            let dataflow_id = self.id;
2726            let clock = clock.clone();
2727            let task = async move {
2728                let mut interval_stream = tokio::time::interval(interval);
2729                let hlc = HLC::default();
2730                loop {
2731                    interval_stream.tick().await;
2732
2733                    let span = tracing::span!(tracing::Level::TRACE, "tick");
2734                    let _ = span.enter();
2735
2736                    let mut parameters = BTreeMap::new();
2737                    parameters.insert(
2738                        "open_telemetry_context".to_string(),
2739                        #[cfg(feature = "telemetry")]
2740                        Parameter::String(serialize_context(&span.context())),
2741                        #[cfg(not(feature = "telemetry"))]
2742                        Parameter::String("".into()),
2743                    );
2744
2745                    let metadata = metadata::Metadata::from_parameters(
2746                        hlc.new_timestamp(),
2747                        empty_type_info(),
2748                        parameters,
2749                    );
2750
2751                    let event = Timestamped {
2752                        inner: DoraEvent::Timer {
2753                            dataflow_id,
2754                            interval,
2755                            metadata,
2756                        }
2757                        .into(),
2758                        timestamp: clock.new_timestamp(),
2759                    };
2760                    if events_tx.send(event).await.is_err() {
2761                        break;
2762                    }
2763                }
2764            };
2765            let (task, handle) = task.remote_handle();
2766            tokio::spawn(task);
2767            self._timer_handles.insert(interval, handle);
2768        }
2769
2770        Ok(())
2771    }
2772
2773    async fn stop_all(
2774        &mut self,
2775        coordinator_connection: &mut Option<TcpStream>,
2776        clock: &HLC,
2777        grace_duration: Option<Duration>,
2778        force: bool,
2779        logger: &mut DataflowLogger<'_>,
2780    ) -> eyre::Result<()> {
2781        self.pending_nodes
2782            .handle_dataflow_stop(
2783                coordinator_connection,
2784                clock,
2785                &mut self.cascading_error_causes,
2786                &self.dynamic_nodes,
2787                logger,
2788            )
2789            .await?;
2790
2791        for node in self.running_nodes.values_mut() {
2792            node.disable_restart();
2793        }
2794
2795        for (_node_id, channel) in self.subscribe_channels.drain() {
2796            let _ = send_with_timestamp(&channel, NodeEvent::Stop, clock);
2797        }
2798
2799        let running_processes: Vec<_> = self
2800            .running_nodes
2801            .iter_mut()
2802            .map(|(id, n)| (id.clone(), n.process.take()))
2803            .collect();
2804        if force {
2805            for (_, proc) in &running_processes {
2806                if let Some(proc) = proc {
2807                    proc.submit(crate::ProcessOperation::Kill);
2808                }
2809            }
2810        } else {
2811            let grace_duration_kills = self.grace_duration_kills.clone();
2812            tokio::spawn(async move {
2813                let duration = grace_duration.unwrap_or(Duration::from_millis(10000));
2814                tokio::time::sleep(duration).await;
2815
2816                for (node, proc) in &running_processes {
2817                    if let Some(proc) = proc {
2818                        if proc.submit(crate::ProcessOperation::SoftKill) {
2819                            grace_duration_kills.insert(node.clone());
2820                        }
2821                    }
2822                }
2823
2824                let kill_duration = duration / 2;
2825                tokio::time::sleep(kill_duration).await;
2826
2827                for (node, proc) in &running_processes {
2828                    if let Some(proc) = proc {
2829                        if proc.submit(crate::ProcessOperation::Kill) {
2830                            grace_duration_kills.insert(node.clone());
2831                            warn!(
2832                                "{node} was killed due to not stopping within the {:#?} grace period",
2833                                duration + kill_duration
2834                            );
2835                        }
2836                    }
2837                }
2838            });
2839        }
2840        self.stop_sent = true;
2841        Ok(())
2842    }
2843
2844    fn open_inputs(&self, node_id: &NodeId) -> &BTreeSet<DataId> {
2845        self.open_inputs.get(node_id).unwrap_or(&self.empty_set)
2846    }
2847
2848    async fn check_drop_token(&mut self, token: DropToken, clock: &HLC) -> eyre::Result<()> {
2849        match self.pending_drop_tokens.entry(token) {
2850            std::collections::hash_map::Entry::Occupied(entry) => {
2851                if entry.get().pending_nodes.is_empty() {
2852                    let (drop_token, info) = entry.remove_entry();
2853                    let result = match self.drop_channels.get_mut(&info.owner) {
2854                        Some(channel) => send_with_timestamp(
2855                            channel,
2856                            NodeDropEvent::OutputDropped { drop_token },
2857                            clock,
2858                        )
2859                        .wrap_err("send failed"),
2860                        None => Err(eyre!("no subscribe channel for node `{}`", &info.owner)),
2861                    };
2862                    if let Err(err) = result.wrap_err_with(|| {
2863                        format!(
2864                            "failed to report drop token `{drop_token:?}` to owner `{}`",
2865                            &info.owner
2866                        )
2867                    }) {
2868                        tracing::warn!("{err:?}");
2869                    }
2870                }
2871            }
2872            std::collections::hash_map::Entry::Vacant(_) => {
2873                tracing::warn!("check_drop_token called with already closed token")
2874            }
2875        }
2876
2877        Ok(())
2878    }
2879}
2880
2881fn empty_type_info() -> ArrowTypeInfo {
2882    ArrowTypeInfo {
2883        data_type: DataType::Null,
2884        len: 0,
2885        null_count: 0,
2886        validity: None,
2887        offset: 0,
2888        buffer_offsets: Vec::new(),
2889        child_data: Vec::new(),
2890    }
2891}
2892
/// Identifies an output by the emitting node and the output's data ID.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct OutputId(NodeId, DataId);
/// Identifies an input by the receiving node and the input's data ID.
type InputId = (NodeId, DataId);
2896
/// Tracks which nodes still use the data associated with a drop token.
struct DropTokenInformation {
    /// The node that created the associated drop token.
    owner: NodeId,
    /// Contains the set of pending nodes that still have access to the input
    /// associated with a drop token.
    pending_nodes: BTreeSet<NodeId>,
}
2904
/// Top-level events processed by the daemon's event loop.
#[derive(Debug)]
pub enum Event {
    /// An event raised by a locally running node (subscribe, send-out, …).
    Node {
        dataflow_id: DataflowId,
        node_id: NodeId,
        event: DaemonNodeEvent,
    },
    /// An event or request from the coordinator.
    Coordinator(CoordinatorEvent),
    /// An event received from another daemon.
    Daemon(InterDaemonEvent),
    /// A daemon-internal event (timers, logs, node exits).
    Dora(DoraEvent),
    /// An event from a dynamically connecting node.
    DynamicNode(DynamicNodeEventWrapper),
    /// Periodic trigger — presumably for coordinator heartbeats; handler not
    /// visible here, TODO confirm.
    HeartbeatInterval,
    /// Periodic trigger — presumably for metrics reporting; handler not
    /// visible here, TODO confirm.
    MetricsInterval,
    /// First ctrl-c signal received.
    CtrlC,
    /// Second ctrl-c signal received.
    SecondCtrlC,
    /// Receiving remote daemon events failed.
    DaemonError(eyre::Report),
    /// Result of spawning a single node.
    SpawnNodeResult {
        dataflow_id: DataflowId,
        node_id: NodeId,
        dynamic_node: bool,
        result: Result<RunningNode, NodeError>,
    },
    /// Result of building a dataflow.
    BuildDataflowResult {
        build_id: BuildId,
        session_id: SessionId,
        result: eyre::Result<BuildInfo>,
    },
    /// Result of spawning all nodes of a dataflow.
    SpawnDataflowResult {
        dataflow_id: Uuid,
        result: eyre::Result<()>,
    },
    /// A node of the given dataflow stopped running.
    NodeStopped {
        dataflow_id: Uuid,
        node_id: NodeId,
    },
}
2941
2942impl From<DoraEvent> for Event {
2943    fn from(event: DoraEvent) -> Self {
2944        Event::Dora(event)
2945    }
2946}
2947
2948impl Event {
2949    pub fn kind(&self) -> &'static str {
2950        match self {
2951            Event::Node { .. } => "Node",
2952            Event::Coordinator(_) => "Coordinator",
2953            Event::Daemon(_) => "Daemon",
2954            Event::Dora(_) => "Dora",
2955            Event::DynamicNode(_) => "DynamicNode",
2956            Event::HeartbeatInterval => "HeartbeatInterval",
2957            Event::MetricsInterval => "MetricsInterval",
2958            Event::CtrlC => "CtrlC",
2959            Event::SecondCtrlC => "SecondCtrlC",
2960            Event::DaemonError(_) => "DaemonError",
2961            Event::SpawnNodeResult { .. } => "SpawnNodeResult",
2962            Event::BuildDataflowResult { .. } => "BuildDataflowResult",
2963            Event::SpawnDataflowResult { .. } => "SpawnDataflowResult",
2964            Event::NodeStopped { .. } => "NodeStopped",
2965        }
2966    }
2967}
2968
/// Requests and notifications sent from a local node to the daemon.
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
pub enum DaemonNodeEvent {
    /// The node reports that it will produce no further outputs.
    OutputsDone {
        reply_sender: oneshot::Sender<DaemonReply>,
    },
    /// The node wants to receive its events over `event_sender`.
    Subscribe {
        event_sender: UnboundedSender<Timestamped<NodeEvent>>,
        reply_sender: oneshot::Sender<DaemonReply>,
    },
    /// The node wants to receive drop-token events over `event_sender`.
    SubscribeDrop {
        event_sender: UnboundedSender<Timestamped<NodeDropEvent>>,
        reply_sender: oneshot::Sender<DaemonReply>,
    },
    /// The node closes the given outputs.
    CloseOutputs {
        outputs: Vec<dora_core::config::DataId>,
        reply_sender: oneshot::Sender<DaemonReply>,
    },
    /// The node sends out data on one of its outputs.
    SendOut {
        output_id: DataId,
        metadata: metadata::Metadata,
        data: Option<DataMessage>,
    },
    /// The node reports that it no longer uses the data behind these tokens.
    ReportDrop {
        tokens: Vec<DropToken>,
    },
    /// The node's event stream was dropped; it won't process further events.
    EventStreamDropped {
        reply_sender: oneshot::Sender<DaemonReply>,
    },
}
2999
/// Daemon-internal events.
#[derive(Debug)]
pub enum DoraEvent {
    /// A timer tick for all timer inputs with the given interval.
    Timer {
        dataflow_id: DataflowId,
        interval: Duration,
        metadata: metadata::Metadata,
    },
    /// A log message emitted as a dataflow output — handler not visible here,
    /// TODO confirm.
    Logs {
        dataflow_id: DataflowId,
        output_id: OutputId,
        message: DataMessage,
        metadata: metadata::Metadata,
    },
    /// A spawned node process exited with the given status.
    SpawnedNodeResult {
        dataflow_id: DataflowId,
        node_id: NodeId,
        dynamic_node: bool,
        exit_status: NodeExitStatus,
        /// Whether the node will be restarted
        restart: bool,
    },
}
3022
/// Whether the daemon's event loop should keep running after handling an
/// event.
#[must_use]
enum RunStatus {
    Continue,
    Exit,
}
3028
3029fn send_with_timestamp<T>(
3030    sender: &UnboundedSender<Timestamped<T>>,
3031    event: T,
3032    clock: &HLC,
3033) -> Result<(), mpsc::error::SendError<Timestamped<T>>> {
3034    sender.send(Timestamped {
3035        inner: event,
3036        timestamp: clock.new_timestamp(),
3037    })
3038}
3039
3040fn set_up_ctrlc_handler(
3041    clock: Arc<HLC>,
3042) -> eyre::Result<tokio::sync::mpsc::Receiver<Timestamped<Event>>> {
3043    let (ctrlc_tx, ctrlc_rx) = mpsc::channel(1);
3044
3045    let mut ctrlc_sent = 0;
3046    ctrlc::set_handler(move || {
3047        let event = match ctrlc_sent {
3048            0 => Event::CtrlC,
3049            1 => Event::SecondCtrlC,
3050            _ => {
3051                tracing::warn!("received 3rd ctrlc signal -> aborting immediately");
3052                std::process::abort();
3053            }
3054        };
3055        if ctrlc_tx
3056            .blocking_send(Timestamped {
3057                inner: event,
3058                timestamp: clock.new_timestamp(),
3059            })
3060            .is_err()
3061        {
3062            tracing::error!("failed to report ctrl-c event to dora-coordinator");
3063        }
3064
3065        ctrlc_sent += 1;
3066    })
3067    .wrap_err("failed to set ctrl-c handler")?;
3068
3069    Ok(ctrlc_rx)
3070}
3071
/// Records which nodes failed because of another node's failure.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct CascadingErrorCauses {
    /// Maps an affected node to the node that caused its failure.
    caused_by: BTreeMap<NodeId, NodeId>,
}
3076
impl CascadingErrorCauses {
    /// Returns whether the given node failed because of another node's error.
    pub fn experienced_cascading_error(&self, node: &NodeId) -> bool {
        self.caused_by.contains_key(node)
    }

    /// Return the ID of the node that caused a cascading error for the given node, if any.
    pub fn error_caused_by(&self, node: &NodeId) -> Option<&NodeId> {
        self.caused_by.get(node)
    }

    /// Records `causing_node` as the cause of `affected_node`'s failure. Only
    /// the first reported cause per affected node is kept.
    pub fn report_cascading_error(&mut self, causing_node: NodeId, affected_node: NodeId) {
        self.caused_by.entry(affected_node).or_insert(causing_node);
    }
}
3091
3092fn runtime_node_inputs(n: &RuntimeNode) -> BTreeMap<DataId, Input> {
3093    n.operators
3094        .iter()
3095        .flat_map(|operator| {
3096            operator.config.inputs.iter().map(|(input_id, mapping)| {
3097                (
3098                    DataId::from(format!("{}/{input_id}", operator.id)),
3099                    mapping.clone(),
3100                )
3101            })
3102        })
3103        .collect()
3104}
3105
3106fn runtime_node_outputs(n: &RuntimeNode) -> BTreeSet<DataId> {
3107    n.operators
3108        .iter()
3109        .flat_map(|operator| {
3110            operator
3111                .config
3112                .outputs
3113                .iter()
3114                .map(|output_id| DataId::from(format!("{}/{output_id}", operator.id)))
3115        })
3116        .collect()
3117}
3118
/// Convenience accessors shared by custom and runtime node kinds.
trait CoreNodeKindExt {
    /// Returns the node's combined input/output configuration.
    fn run_config(&self) -> NodeRunConfig;
    /// Returns whether this node connects dynamically at runtime.
    fn dynamic(&self) -> bool;
}
3123
3124impl CoreNodeKindExt for CoreNodeKind {
3125    fn run_config(&self) -> NodeRunConfig {
3126        match self {
3127            CoreNodeKind::Runtime(n) => NodeRunConfig {
3128                inputs: runtime_node_inputs(n),
3129                outputs: runtime_node_outputs(n),
3130            },
3131            CoreNodeKind::Custom(n) => n.run_config.clone(),
3132        }
3133    }
3134
3135    fn dynamic(&self) -> bool {
3136        match self {
3137            CoreNodeKind::Runtime(_n) => false,
3138            CoreNodeKind::Custom(n) => {
3139                matches!(&n.source, NodeSource::Local) && n.path == DYNAMIC_SOURCE
3140            }
3141        }
3142    }
3143}