dora_daemon/lib.rs

use aligned_vec::{AVec, ConstAlign};
use coordinator::CoordinatorEvent;
use crossbeam::queue::ArrayQueue;
use dora_core::{
    build::{self, BuildInfo, GitManager, PrevGitSource},
    config::{DataId, Input, InputMapping, NodeId, NodeRunConfig, OperatorId},
    descriptor::{
        CoreNodeKind, DYNAMIC_SOURCE, Descriptor, DescriptorExt, ResolvedNode, RuntimeNode,
        read_as_descriptor,
    },
    topics::LOCALHOST,
    uhlc::{self, HLC},
};
use dora_message::{
    BuildId, DataflowId, SessionId,
    common::{
        DaemonId, DataMessage, DropToken, GitSource, LogLevel, NodeError, NodeErrorCause,
        NodeExitStatus,
    },
    coordinator_to_cli::DataflowResult,
    coordinator_to_daemon::{BuildDataflowNodes, DaemonCoordinatorEvent, SpawnDataflowNodes},
    daemon_to_coordinator::{
        CoordinatorRequest, DaemonCoordinatorReply, DaemonEvent, DataflowDaemonResult,
    },
    daemon_to_daemon::InterDaemonEvent,
    daemon_to_node::{DaemonReply, NodeConfig, NodeDropEvent, NodeEvent},
    descriptor::NodeSource,
    metadata::{self, ArrowTypeInfo},
    node_to_daemon::{DynamicNodeEvent, Timestamped},
};
use dora_node_api::{Parameter, arrow::datatypes::DataType};
use eyre::{Context, ContextCompat, Result, bail, eyre};
use futures::{FutureExt, TryFutureExt, future, stream};
use futures_concurrency::stream::Merge;
use local_listener::DynamicNodeEventWrapper;
use log::{DaemonLogger, DataflowLogger, Logger};
use pending::PendingNodes;
use shared_memory_server::ShmemConf;
use socket_stream_utils::socket_stream_send;
use spawn::Spawner;
use std::{
    collections::{BTreeMap, BTreeSet, HashMap},
    env::current_dir,
    future::Future,
    net::SocketAddr,
    path::{Path, PathBuf},
    pin::pin,
    sync::Arc,
    time::{Duration, Instant},
};
use sysinfo::Pid;
use tokio::{
    fs::File,
    io::AsyncReadExt,
    net::TcpStream,
    sync::{
        broadcast,
        mpsc::{self, UnboundedSender},
        oneshot::{self, Sender},
    },
};
use tokio_stream::{Stream, StreamExt, wrappers::ReceiverStream};
use tracing::{error, warn};
use uuid::{NoContext, Timestamp, Uuid};

pub use flume;
pub use log::LogDestination;

mod coordinator;
mod local_listener;
mod log;
mod node_communication;
mod pending;
mod socket_stream_utils;
mod spawn;

#[cfg(feature = "telemetry")]
use dora_tracing::telemetry::serialize_context;
#[cfg(feature = "telemetry")]
use tracing_opentelemetry::OpenTelemetrySpanExt;

use crate::pending::DataflowStatus;

const STDERR_LOG_LINES: usize = 10;

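/// A dora daemon, responsible for the dataflow nodes running on one machine.
///
/// Tracks the currently running dataflows, the connection to the
/// coordinator, and the build/session state required for spawning nodes.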
pub struct Daemon {
    running: HashMap<DataflowId, RunningDataflow>,
    working_dir: HashMap<DataflowId, PathBuf>,

    events_tx: mpsc::Sender<Timestamped<Event>>,

    coordinator_connection: Option<TcpStream>,
    last_coordinator_heartbeat: Instant,
    daemon_id: DaemonId,

    /// used for testing and examples
    exit_when_done: Option<BTreeSet<(Uuid, NodeId)>>,
    /// set on ctrl-c
    exit_when_all_finished: bool,
    /// used to record results of local nodes
    dataflow_node_results: BTreeMap<Uuid, BTreeMap<NodeId, Result<(), NodeError>>>,

    clock: Arc<uhlc::HLC>,

    zenoh_session: zenoh::Session,
    remote_daemon_events_tx: Option<flume::Sender<eyre::Result<Timestamped<InterDaemonEvent>>>>,

    logger: DaemonLogger,

    sessions: BTreeMap<SessionId, BuildId>,
    builds: BTreeMap<BuildId, BuildInfo>,
    git_manager: GitManager,
}

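/// The per-node results of all dataflows run by this daemon, keyed by
/// dataflow UUID.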
type DaemonRunResult = BTreeMap<Uuid, BTreeMap<NodeId, Result<(), NodeError>>>;

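/// A pending per-node task (used both when building and when preparing to
/// spawn), together with the node's ID and whether the node is dynamic.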
struct NodeBuildTask<F> {
    node_id: NodeId,
    dynamic_node: bool,
    task: F,
}

impl Daemon {
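    /// Runs a daemon that connects to the coordinator at `coordinator_addr`
    /// and listens locally on `local_listen_port`.
    ///
    /// A minimal usage sketch; `coordinator_addr` and `LOCAL_LISTEN_PORT`
    /// are placeholders provided by the caller:
    ///
    /// ```ignore
    /// Daemon::run(coordinator_addr, None, LOCAL_LISTEN_PORT).await?;
    /// ```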
    pub async fn run(
        coordinator_addr: SocketAddr,
        machine_id: Option<String>,
        local_listen_port: u16,
    ) -> eyre::Result<()> {
        let clock = Arc::new(HLC::default());

        let mut ctrlc_events = set_up_ctrlc_handler(clock.clone())?;
        let (remote_daemon_events_tx, remote_daemon_events_rx) = flume::bounded(10);
        let (daemon_id, incoming_events) = {
            let incoming_events = set_up_event_stream(
                coordinator_addr,
                &machine_id,
                &clock,
                remote_daemon_events_rx,
                local_listen_port,
            );
            // finish early if ctrl-c is pressed during event stream setup
            let ctrl_c = pin!(ctrlc_events.recv());
            match futures::future::select(ctrl_c, pin!(incoming_events)).await {
                future::Either::Left((_ctrl_c, _)) => {
                    tracing::info!("received ctrl-c signal -> stopping daemon");
                    return Ok(());
                }
                future::Either::Right((events, _)) => events?,
            }
        };

        let log_destination = {
            // additional connection for logging
            let stream = TcpStream::connect(coordinator_addr)
                .await
                .wrap_err("failed to connect log to dora-coordinator")?;
            stream
                .set_nodelay(true)
                .wrap_err("failed to set TCP_NODELAY")?;
            LogDestination::Coordinator {
                coordinator_connection: stream,
            }
        };

        Self::run_general(
            (ReceiverStream::new(ctrlc_events), incoming_events).merge(),
            Some(coordinator_addr),
            daemon_id,
            None,
            clock.clone(),
            Some(remote_daemon_events_tx),
            Default::default(),
            log_destination,
        )
        .await
        .map(|_| ())
    }

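    /// Runs a single dataflow on this daemon alone, without a separate
    /// coordinator or `dora start` (the `dora run` code path).
    ///
    /// A minimal usage sketch; `session_id` and `log_destination` are
    /// assumed to be created by the caller:
    ///
    /// ```ignore
    /// let result = Daemon::run_dataflow(
    ///     Path::new("dataflow.yml"),
    ///     None,  // build_id: no prior `dora build`
    ///     None,  // local_build
    ///     session_id,
    ///     false, // uv
    ///     log_destination,
    /// )
    /// .await?;
    /// ```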
    pub async fn run_dataflow(
        dataflow_path: &Path,
        build_id: Option<BuildId>,
        local_build: Option<BuildInfo>,
        session_id: SessionId,
        uv: bool,
        log_destination: LogDestination,
    ) -> eyre::Result<DataflowResult> {
        let working_dir = dataflow_path
            .canonicalize()
            .context("failed to canonicalize dataflow path")?
            .parent()
            .ok_or_else(|| eyre::eyre!("canonicalized dataflow path has no parent"))?
            .to_owned();

        let descriptor = read_as_descriptor(dataflow_path).await?;
        if let Some(node) = descriptor.nodes.iter().find(|n| n.deploy.is_some()) {
            eyre::bail!(
                "node {} has a `deploy` section, which is not supported in `dora run`\n\n
                Instead, you need to spawn a `dora coordinator` and one or more `dora daemon`
                instances and then use `dora start`.",
                node.id
            )
        }

        descriptor.check(&working_dir)?;
        let nodes = descriptor.resolve_aliases_and_set_defaults()?;

        let dataflow_id = Uuid::new_v7(Timestamp::now(NoContext));
        let spawn_command = SpawnDataflowNodes {
            build_id,
            session_id,
            dataflow_id,
            local_working_dir: Some(working_dir),
            spawn_nodes: nodes.keys().cloned().collect(),
            nodes,
            dataflow_descriptor: descriptor,
            uv,
        };

        let clock = Arc::new(HLC::default());

        let ctrlc_events = ReceiverStream::new(set_up_ctrlc_handler(clock.clone())?);

        let exit_when_done = spawn_command
            .nodes
            .values()
            .map(|n| (spawn_command.dataflow_id, n.id.clone()))
            .collect();
        let (reply_tx, reply_rx) = oneshot::channel();
        let timestamp = clock.new_timestamp();
        let coordinator_events = stream::once(async move {
            Timestamped {
                inner: Event::Coordinator(CoordinatorEvent {
                    event: DaemonCoordinatorEvent::Spawn(spawn_command),
                    reply_tx,
                }),
                timestamp,
            }
        });
        let events = (coordinator_events, ctrlc_events).merge();
        let run_result = Self::run_general(
            Box::pin(events),
            None,
            DaemonId::new(None),
            Some(exit_when_done),
            clock.clone(),
            None,
            if let Some(local_build) = local_build {
                let Some(build_id) = build_id else {
                    bail!("no build_id, but local_build set")
                };
                let mut builds = BTreeMap::new();
                builds.insert(build_id, local_build);
                builds
            } else {
                Default::default()
            },
            log_destination,
        );

        let spawn_result = reply_rx
            .map_err(|err| eyre!("failed to receive spawn result: {err}"))
            .and_then(|r| async {
                match r {
                    Some(DaemonCoordinatorReply::TriggerSpawnResult(result)) => {
                        result.map_err(|err| eyre!(err))
                    }
                    _ => Err(eyre!("unexpected spawn reply")),
                }
            });

        let (mut dataflow_results, ()) = future::try_join(run_result, spawn_result).await?;

        Ok(DataflowResult {
            uuid: dataflow_id,
            timestamp: clock.new_timestamp(),
            node_results: dataflow_results
                .remove(&dataflow_id)
                .context("no node results for dataflow_id")?,
        })
    }

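    /// Shared setup and main-loop entry point used by both `run` and
    /// `run_dataflow`: connects to the coordinator (if an address is given),
    /// opens the zenoh session for inter-daemon communication, and then
    /// processes events until an exit condition is reached.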
    #[allow(clippy::too_many_arguments)]
    async fn run_general(
        external_events: impl Stream<Item = Timestamped<Event>> + Unpin,
        coordinator_addr: Option<SocketAddr>,
        daemon_id: DaemonId,
        exit_when_done: Option<BTreeSet<(Uuid, NodeId)>>,
        clock: Arc<HLC>,
        remote_daemon_events_tx: Option<flume::Sender<eyre::Result<Timestamped<InterDaemonEvent>>>>,
        builds: BTreeMap<BuildId, BuildInfo>,
        log_destination: LogDestination,
    ) -> eyre::Result<DaemonRunResult> {
        let coordinator_connection = match coordinator_addr {
            Some(addr) => {
                let stream = TcpStream::connect(addr)
                    .await
                    .wrap_err("failed to connect to dora-coordinator")?;
                stream
                    .set_nodelay(true)
                    .wrap_err("failed to set TCP_NODELAY")?;
                Some(stream)
            }
            None => None,
        };

        let zenoh_session = match std::env::var(zenoh::Config::DEFAULT_CONFIG_PATH_ENV) {
            Ok(path) => {
                let zenoh_config = zenoh::Config::from_file(&path)
                    .map_err(|e| eyre!(e))
                    .wrap_err_with(|| format!("failed to read zenoh config from {path}"))?;
                zenoh::open(zenoh_config)
                    .await
                    .map_err(|e| eyre!(e))
                    .context("failed to open zenoh session")?
            }
            Err(std::env::VarError::NotPresent) => {
                let mut zenoh_config = zenoh::Config::default();

                if let Some(addr) = coordinator_addr {
                    // linkstate makes it possible to connect two daemons on different networks through a public daemon
                    // TODO: there is currently a CI/CD error with linkstate on Windows
                    if cfg!(not(target_os = "windows")) {
                        zenoh_config
                            .insert_json5("routing/peer", r#"{ mode: "linkstate" }"#)
                            .unwrap();
                    }

                    zenoh_config
                        .insert_json5(
                            "connect/endpoints",
                            &format!(
                                r#"{{ router: ["tcp/[::]:7447"], peer: ["tcp/{}:5456"] }}"#,
                                addr.ip()
                            ),
                        )
                        .unwrap();
                    zenoh_config
                        .insert_json5(
                            "listen/endpoints",
                            r#"{ router: ["tcp/[::]:7447"], peer: ["tcp/[::]:5456"] }"#,
                        )
                        .unwrap();
                    if cfg!(target_os = "macos") {
                        warn!(
                            "disabling multicast on macOS systems. Enable it with the ZENOH_CONFIG env variable or file"
                        );
                        zenoh_config
                            .insert_json5("scouting/multicast", r#"{ enabled: false }"#)
                            .unwrap();
                    }
                };
                if let Ok(zenoh_session) = zenoh::open(zenoh_config).await {
                    zenoh_session
                } else {
                    warn!(
                        "failed to open zenoh session, retrying with default config + coordinator"
                    );
                    let mut zenoh_config = zenoh::Config::default();
                    // linkstate makes it possible to connect two daemons on different networks through a public daemon
                    // TODO: there is currently a CI/CD error with linkstate on Windows
                    if cfg!(not(target_os = "windows")) {
                        zenoh_config
                            .insert_json5("routing/peer", r#"{ mode: "linkstate" }"#)
                            .unwrap();
                    }

                    if let Some(addr) = coordinator_addr {
                        zenoh_config
                            .insert_json5(
                                "connect/endpoints",
                                &format!(
                                    r#"{{ router: ["tcp/[::]:7447"], peer: ["tcp/{}:5456"] }}"#,
                                    addr.ip()
                                ),
                            )
                            .unwrap();
                        if cfg!(target_os = "macos") {
                            warn!(
                                "disabling multicast on macOS systems. Enable it with the ZENOH_CONFIG env variable or file"
                            );
                            zenoh_config
                                .insert_json5("scouting/multicast", r#"{ enabled: false }"#)
                                .unwrap();
                        }
                    }
                    if let Ok(zenoh_session) = zenoh::open(zenoh_config).await {
                        zenoh_session
                    } else {
                        warn!("failed to open zenoh session, retrying with default config");
                        let zenoh_config = zenoh::Config::default();
                        zenoh::open(zenoh_config)
                            .await
                            .map_err(|e| eyre!(e))
                            .context("failed to open zenoh session")?
                    }
                }
            }
            Err(std::env::VarError::NotUnicode(_)) => eyre::bail!(
                "{} env variable is not valid unicode",
                zenoh::Config::DEFAULT_CONFIG_PATH_ENV
            ),
        };
        let (dora_events_tx, dora_events_rx) = mpsc::channel(5);
        let daemon = Self {
            logger: Logger {
                destination: log_destination,
                daemon_id: daemon_id.clone(),
                clock: clock.clone(),
            }
            .for_daemon(daemon_id.clone()),
            running: HashMap::new(),
            working_dir: HashMap::new(),
            events_tx: dora_events_tx,
            coordinator_connection,
            last_coordinator_heartbeat: Instant::now(),
            daemon_id,
            exit_when_done,
            exit_when_all_finished: false,
            dataflow_node_results: BTreeMap::new(),
            clock,
            zenoh_session,
            remote_daemon_events_tx,
            git_manager: Default::default(),
            builds,
            sessions: Default::default(),
        };

        let dora_events = ReceiverStream::new(dora_events_rx);
        let watchdog_clock = daemon.clock.clone();
        let watchdog_interval = tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(
            Duration::from_secs(5),
        ))
        .map(|_| Timestamped {
            inner: Event::HeartbeatInterval,
            timestamp: watchdog_clock.new_timestamp(),
        });
        let events = (external_events, dora_events, watchdog_interval).merge();
        daemon.run_inner(events).await
    }

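    /// The daemon's main event loop: dispatches coordinator, inter-daemon,
    /// node, and timer events until an exit condition is reached, then
    /// notifies the coordinator and returns the collected node results.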
    #[tracing::instrument(skip(incoming_events, self), fields(?self.daemon_id))]
    async fn run_inner(
        mut self,
        incoming_events: impl Stream<Item = Timestamped<Event>> + Unpin,
    ) -> eyre::Result<DaemonRunResult> {
        let mut events = incoming_events;

        while let Some(event) = events.next().await {
            let Timestamped { inner, timestamp } = event;
            if let Err(err) = self.clock.update_with_timestamp(&timestamp) {
                tracing::warn!("failed to update HLC with incoming event timestamp: {err}");
            }

            // used below for checking the duration of event handling
            let start = Instant::now();
            let event_kind = inner.kind();

            match inner {
                Event::Coordinator(CoordinatorEvent { event, reply_tx }) => {
                    let status = self.handle_coordinator_event(event, reply_tx).await?;

                    match status {
                        RunStatus::Continue => {}
                        RunStatus::Exit => break,
                    }
                }
                Event::Daemon(event) => {
                    self.handle_inter_daemon_event(event).await?;
                }
                Event::Node {
                    dataflow_id: dataflow,
                    node_id,
                    event,
                } => self.handle_node_event(event, dataflow, node_id).await?,
                Event::Dora(event) => self.handle_dora_event(event).await?,
                Event::DynamicNode(event) => self.handle_dynamic_node_event(event).await?,
                Event::HeartbeatInterval => {
                    if let Some(connection) = &mut self.coordinator_connection {
                        let msg = serde_json::to_vec(&Timestamped {
                            inner: CoordinatorRequest::Event {
                                daemon_id: self.daemon_id.clone(),
                                event: DaemonEvent::Heartbeat,
                            },
                            timestamp: self.clock.new_timestamp(),
                        })?;
                        socket_stream_send(connection, &msg)
                            .await
                            .wrap_err("failed to send watchdog message to dora-coordinator")?;

                        if self.last_coordinator_heartbeat.elapsed() > Duration::from_secs(20) {
                            bail!("lost connection to coordinator")
                        }
                    }
                }
                Event::CtrlC => {
                    tracing::info!("received ctrl-c signal -> stopping all dataflows");
                    for dataflow in self.running.values_mut() {
                        let mut logger = self.logger.for_dataflow(dataflow.id);
                        dataflow
                            .stop_all(
                                &mut self.coordinator_connection,
                                &self.clock,
                                None,
                                &mut logger,
                            )
                            .await?;
                    }
                    self.exit_when_all_finished = true;
                    if self.running.is_empty() {
                        break;
                    }
                }
                Event::SecondCtrlC => {
                    tracing::warn!("received second ctrl-c signal -> exiting immediately");
                    bail!("received second ctrl-c signal");
                }
                Event::DaemonError(err) => {
                    tracing::error!("Daemon error: {err:?}");
                }
                Event::SpawnNodeResult {
                    dataflow_id,
                    node_id,
                    dynamic_node,
                    result,
                } => match result {
                    Ok(running_node) => {
                        if let Some(dataflow) = self.running.get_mut(&dataflow_id) {
                            dataflow.running_nodes.insert(node_id, running_node);
                        } else {
                            tracing::error!(
                                "failed to handle SpawnNodeResult: no running dataflow with ID {dataflow_id}"
                            );
                        }
                    }
                    Err(error) => {
                        self.dataflow_node_results
                            .entry(dataflow_id)
                            .or_default()
                            .insert(node_id.clone(), Err(error));
                        self.handle_node_stop(dataflow_id, &node_id, dynamic_node)
                            .await?;
                    }
                },
                Event::BuildDataflowResult {
                    build_id,
                    session_id,
                    result,
                } => {
                    let (build_info, result) = match result {
                        Ok(build_info) => (Some(build_info), Ok(())),
                        Err(err) => (None, Err(err)),
                    };
                    if let Some(build_info) = build_info {
                        self.builds.insert(build_id, build_info);
                        if let Some(old_build_id) = self.sessions.insert(session_id, build_id) {
                            self.builds.remove(&old_build_id);
                        }
                    }
                    if let Some(connection) = &mut self.coordinator_connection {
                        let msg = serde_json::to_vec(&Timestamped {
                            inner: CoordinatorRequest::Event {
                                daemon_id: self.daemon_id.clone(),
                                event: DaemonEvent::BuildResult {
                                    build_id,
                                    result: result.map_err(|err| format!("{err:?}")),
                                },
                            },
                            timestamp: self.clock.new_timestamp(),
                        })?;
                        socket_stream_send(connection, &msg).await.wrap_err(
                            "failed to send BuildDataflowResult message to dora-coordinator",
                        )?;
                    }
                }
                Event::SpawnDataflowResult {
                    dataflow_id,
                    result,
                } => {
                    if let Some(connection) = &mut self.coordinator_connection {
                        let msg = serde_json::to_vec(&Timestamped {
                            inner: CoordinatorRequest::Event {
                                daemon_id: self.daemon_id.clone(),
                                event: DaemonEvent::SpawnResult {
                                    dataflow_id,
                                    result: result.map_err(|err| format!("{err:?}")),
                                },
                            },
                            timestamp: self.clock.new_timestamp(),
                        })?;
                        socket_stream_send(connection, &msg).await.wrap_err(
                            "failed to send SpawnDataflowResult message to dora-coordinator",
                        )?;
                    }
                }
                Event::NodeStopped {
                    dataflow_id,
                    node_id,
                } => {
                    if let Some(exit_when_done) = &mut self.exit_when_done {
                        exit_when_done.remove(&(dataflow_id, node_id));
                        if exit_when_done.is_empty() {
                            tracing::info!(
                                "exiting daemon because all required dataflows are finished"
                            );
                            break;
                        }
                    }
                    if self.exit_when_all_finished && self.running.is_empty() {
                        break;
                    }
                }
            }

            // warn if event handling took too long -> the main loop should never be blocked for too long
            let elapsed = start.elapsed();
            if elapsed > Duration::from_millis(100) {
                tracing::warn!(
                    "Daemon took {}ms for handling event: {event_kind}",
                    elapsed.as_millis()
                );
            }
        }

        if let Some(mut connection) = self.coordinator_connection.take() {
            let msg = serde_json::to_vec(&Timestamped {
                inner: CoordinatorRequest::Event {
                    daemon_id: self.daemon_id.clone(),
                    event: DaemonEvent::Exit,
                },
                timestamp: self.clock.new_timestamp(),
            })?;
            socket_stream_send(&mut connection, &msg)
                .await
                .wrap_err("failed to send Exit message to dora-coordinator")?;
        }

        Ok(self.dataflow_node_results)
    }

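    /// Handles a single request from the coordinator (build, spawn, reload,
    /// stop, logs, destroy, or heartbeat) and sends the reply through
    /// `reply_tx`.
    ///
    /// Returns whether the daemon should continue running or exit.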
    async fn handle_coordinator_event(
        &mut self,
        event: DaemonCoordinatorEvent,
        reply_tx: Sender<Option<DaemonCoordinatorReply>>,
    ) -> eyre::Result<RunStatus> {
        let status = match event {
            DaemonCoordinatorEvent::Build(BuildDataflowNodes {
                build_id,
                session_id,
                local_working_dir,
                git_sources,
                prev_git_sources,
                dataflow_descriptor,
                nodes_on_machine,
                uv,
            }) => {
                match dataflow_descriptor.communication.remote {
                    dora_core::config::RemoteCommunicationConfig::Tcp => {}
                }

                let base_working_dir = self.base_working_dir(local_working_dir, session_id)?;

                let result = self
                    .build_dataflow(
                        build_id,
                        session_id,
                        base_working_dir,
                        git_sources,
                        prev_git_sources,
                        dataflow_descriptor,
                        nodes_on_machine,
                        uv,
                    )
                    .await;
                let (trigger_result, result_task) = match result {
                    Ok(result_task) => (Ok(()), Some(result_task)),
                    Err(err) => (Err(format!("{err:?}")), None),
                };
                let reply = DaemonCoordinatorReply::TriggerBuildResult(trigger_result);
                let _ = reply_tx.send(Some(reply)).map_err(|_| {
                    error!("could not send `TriggerBuildResult` reply from daemon to coordinator")
                });

                let result_tx = self.events_tx.clone();
                let clock = self.clock.clone();
                if let Some(result_task) = result_task {
                    tokio::spawn(async move {
                        let message = Timestamped {
                            inner: Event::BuildDataflowResult {
                                build_id,
                                session_id,
                                result: result_task.await,
                            },
                            timestamp: clock.new_timestamp(),
                        };
                        let _ = result_tx
                            .send(message)
                            .map_err(|_| {
                                error!(
                                    "could not send `BuildResult` reply from daemon to coordinator"
                                )
                            })
                            .await;
                    });
                }

                RunStatus::Continue
            }
            DaemonCoordinatorEvent::Spawn(SpawnDataflowNodes {
                build_id,
                session_id,
                dataflow_id,
                local_working_dir,
                nodes,
                dataflow_descriptor,
                spawn_nodes,
                uv,
            }) => {
                match dataflow_descriptor.communication.remote {
                    dora_core::config::RemoteCommunicationConfig::Tcp => {}
                }

                let base_working_dir = self.base_working_dir(local_working_dir, session_id)?;

                let result = self
                    .spawn_dataflow(
                        build_id,
                        dataflow_id,
                        base_working_dir,
                        nodes,
                        dataflow_descriptor,
                        spawn_nodes,
                        uv,
                    )
                    .await;
                let (trigger_result, result_task) = match result {
                    Ok(result_task) => (Ok(()), Some(result_task)),
                    Err(err) => (Err(format!("{err:?}")), None),
                };
                let reply = DaemonCoordinatorReply::TriggerSpawnResult(trigger_result);
                let _ = reply_tx.send(Some(reply)).map_err(|_| {
                    error!("could not send `TriggerSpawnResult` reply from daemon to coordinator")
                });

                let result_tx = self.events_tx.clone();
                let clock = self.clock.clone();
                if let Some(result_task) = result_task {
                    tokio::spawn(async move {
                        let message = Timestamped {
                            inner: Event::SpawnDataflowResult {
                                dataflow_id,
                                result: result_task.await,
                            },
                            timestamp: clock.new_timestamp(),
                        };
                        let _ = result_tx
                            .send(message)
                            .map_err(|_| {
                                error!(
                                    "could not send `SpawnResult` reply from daemon to coordinator"
                                )
                            })
                            .await;
                    });
                }

                RunStatus::Continue
            }
            DaemonCoordinatorEvent::AllNodesReady {
                dataflow_id,
                exited_before_subscribe,
            } => {
                let mut logger = self.logger.for_dataflow(dataflow_id);
                logger
                    .log(
                        LogLevel::Debug,
                        None,
                        Some("daemon".into()),
                        format!(
                            "received AllNodesReady (exited_before_subscribe: {exited_before_subscribe:?})"
                        ),
                    )
                    .await;
                match self.running.get_mut(&dataflow_id) {
                    Some(dataflow) => {
                        let ready = exited_before_subscribe.is_empty();
                        dataflow
                            .pending_nodes
                            .handle_external_all_nodes_ready(
                                exited_before_subscribe,
                                &mut dataflow.cascading_error_causes,
                            )
                            .await?;
                        if ready {
                            logger
                                .log(
                                    LogLevel::Info,
                                    None,
                                    Some("daemon".into()),
                                    "coordinator reported that all nodes are ready, starting dataflow",
                                )
                                .await;
                            dataflow.start(&self.events_tx, &self.clock).await?;
                        }
                    }
                    None => {
                        tracing::warn!(
                            "received AllNodesReady for unknown dataflow (ID `{dataflow_id}`)"
                        );
                    }
                }
                let _ = reply_tx.send(None).map_err(|_| {
                    error!("could not send `AllNodesReady` reply from daemon to coordinator")
                });
                RunStatus::Continue
            }
            DaemonCoordinatorEvent::Logs {
                dataflow_id,
                node_id,
            } => {
                match self.working_dir.get(&dataflow_id) {
                    Some(working_dir) => {
                        let working_dir = working_dir.clone();
                        tokio::spawn(async move {
                            let logs = async {
                                let mut file =
                                    File::open(log::log_path(&working_dir, &dataflow_id, &node_id))
                                        .await
                                        .wrap_err(format!(
                                            "Could not open log file: {:#?}",
                                            log::log_path(&working_dir, &dataflow_id, &node_id)
                                        ))?;

                                let mut contents = vec![];
                                file.read_to_end(&mut contents)
                                    .await
                                    .wrap_err("Could not read content of log file")?;
                                Result::<Vec<u8>, eyre::Report>::Ok(contents)
                            }
                            .await
                            .map_err(|err| format!("{err:?}"));
                            let _ = reply_tx
                                .send(Some(DaemonCoordinatorReply::Logs(logs)))
                                .map_err(|_| {
                                    error!("could not send logs reply from daemon to coordinator")
                                });
                        });
                    }
                    None => {
                        tracing::warn!("received Logs for unknown dataflow (ID `{dataflow_id}`)");
                        let _ = reply_tx.send(None).map_err(|_| {
                            error!(
                                "could not send `Logs` reply from daemon to coordinator"
                            )
                        });
                    }
                }
                RunStatus::Continue
            }
            DaemonCoordinatorEvent::ReloadDataflow {
                dataflow_id,
                node_id,
                operator_id,
            } => {
                let result = self.send_reload(dataflow_id, node_id, operator_id).await;
                let reply =
                    DaemonCoordinatorReply::ReloadResult(result.map_err(|err| format!("{err:?}")));
                let _ = reply_tx
                    .send(Some(reply))
                    .map_err(|_| error!("could not send reload reply from daemon to coordinator"));
                RunStatus::Continue
            }
            DaemonCoordinatorEvent::StopDataflow {
                dataflow_id,
                grace_duration,
            } => {
                let mut logger = self.logger.for_dataflow(dataflow_id);
                let dataflow = self
                    .running
                    .get_mut(&dataflow_id)
                    .wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"));
                let (reply, future) = match dataflow {
                    Ok(dataflow) => {
                        let future = dataflow.stop_all(
                            &mut self.coordinator_connection,
                            &self.clock,
                            grace_duration,
                            &mut logger,
                        );
                        (Ok(()), Some(future))
                    }
                    Err(err) => (Err(err.to_string()), None),
                };

                let _ = reply_tx
                    .send(Some(DaemonCoordinatorReply::StopResult(reply)))
                    .map_err(|_| error!("could not send stop reply from daemon to coordinator"));

                if let Some(future) = future {
                    future.await?;
                }

                RunStatus::Continue
            }
            DaemonCoordinatorEvent::Destroy => {
                tracing::info!("received destroy command -> exiting");
                let (notify_tx, notify_rx) = oneshot::channel();
                let reply = DaemonCoordinatorReply::DestroyResult {
                    result: Ok(()),
                    notify: Some(notify_tx),
                };
                let _ = reply_tx
                    .send(Some(reply))
                    .map_err(|_| error!("could not send destroy reply from daemon to coordinator"));
                // wait until the reply is sent out
                if notify_rx.await.is_err() {
                    tracing::warn!("no confirmation received for DestroyReply");
                }
                RunStatus::Exit
            }
            DaemonCoordinatorEvent::Heartbeat => {
                self.last_coordinator_heartbeat = Instant::now();
                let _ = reply_tx.send(None);
                RunStatus::Continue
            }
        };
        Ok(status)
    }

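    /// Handles events received from other daemons: forwards remote node
    /// outputs to local receivers and closes the corresponding local inputs
    /// when a remote output is closed.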
    async fn handle_inter_daemon_event(&mut self, event: InterDaemonEvent) -> eyre::Result<()> {
        match event {
            InterDaemonEvent::Output {
                dataflow_id,
                node_id,
                output_id,
                metadata,
                data,
            } => {
                let inner = async {
                    let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
                        format!("send out failed: no running dataflow with ID `{dataflow_id}`")
                    })?;
                    send_output_to_local_receivers(
                        node_id.clone(),
                        output_id.clone(),
                        dataflow,
                        &metadata,
                        data.map(DataMessage::Vec),
                        &self.clock,
                    )
                    .await?;
                    Result::<_, eyre::Report>::Ok(())
                };
                if let Err(err) = inner
                    .await
                    .wrap_err("failed to forward remote output to local receivers")
                {
                    let mut logger = self.logger.for_dataflow(dataflow_id).for_node(node_id);
                    logger
                        .log(LogLevel::Warn, Some("daemon".into()), format!("{err:?}"))
                        .await;
                }
                Ok(())
            }
            InterDaemonEvent::OutputClosed {
                dataflow_id,
                node_id,
                output_id,
            } => {
                let output_id = OutputId(node_id.clone(), output_id);
                let mut logger = self
                    .logger
                    .for_dataflow(dataflow_id)
                    .for_node(node_id.clone());
                logger
                    .log(
                        LogLevel::Debug,
                        Some("daemon".into()),
                        format!("received OutputClosed event for output {output_id:?}"),
                    )
                    .await;

                let inner = async {
                    let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
                        format!("send out failed: no running dataflow with ID `{dataflow_id}`")
                    })?;

                    if let Some(inputs) = dataflow.mappings.get(&output_id).cloned() {
                        for (receiver_id, input_id) in &inputs {
                            close_input(dataflow, receiver_id, input_id, &self.clock);
                        }
                    }
                    Result::<(), eyre::Report>::Ok(())
                };
                if let Err(err) = inner
                    .await
                    .wrap_err("failed to handle OutputClosed event sent by remote daemon")
                {
                    logger
                        .log(LogLevel::Warn, Some("daemon".into()), format!("{err:?}"))
                        .await;
                }
                Ok(())
            }
        }
    }

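    /// Starts build tasks for all nodes of the dataflow that are assigned to
    /// this machine and returns a future that resolves to the combined
    /// `BuildInfo` once every node has been built.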
    #[allow(clippy::too_many_arguments)]
    async fn build_dataflow(
        &mut self,
        build_id: BuildId,
        session_id: SessionId,
        base_working_dir: PathBuf,
        git_sources: BTreeMap<NodeId, GitSource>,
        prev_git_sources: BTreeMap<NodeId, GitSource>,
        dataflow_descriptor: Descriptor,
        local_nodes: BTreeSet<NodeId>,
        uv: bool,
    ) -> eyre::Result<impl Future<Output = eyre::Result<BuildInfo>> + use<>> {
        let builder = build::Builder {
            session_id,
            base_working_dir,
            uv,
        };
        self.git_manager.clear_planned_builds(session_id);

        let nodes = dataflow_descriptor.resolve_aliases_and_set_defaults()?;

        let mut tasks = Vec::new();

        // build nodes
        for node in nodes.into_values().filter(|n| local_nodes.contains(&n.id)) {
            let dynamic_node = node.kind.dynamic();

            let node_id = node.id.clone();
            let mut logger = self.logger.for_node_build(build_id, node_id.clone());
            logger.log(LogLevel::Debug, "building").await;
            let git_source = git_sources.get(&node_id).cloned();
            let prev_git_source = prev_git_sources.get(&node_id).cloned();
            let prev_git = prev_git_source.map(|prev_source| PrevGitSource {
                still_needed_for_this_build: git_sources.values().any(|s| s == &prev_source),
                git_source: prev_source,
            });

            let logger_cloned = logger
                .try_clone_impl()
                .await
                .wrap_err("failed to clone logger")?;

            let mut builder = builder.clone();
            if let Some(node_working_dir) =
                node.deploy.as_ref().and_then(|d| d.working_dir.as_deref())
            {
                builder.base_working_dir = builder.base_working_dir.join(node_working_dir);
            }

            match builder
                .build_node(
                    node,
                    git_source,
                    prev_git,
                    logger_cloned,
                    &mut self.git_manager,
                )
                .await
                .wrap_err_with(|| format!("failed to build node `{node_id}`"))
            {
                Ok(result) => {
                    tasks.push(NodeBuildTask {
                        node_id,
                        task: result,
                        dynamic_node,
                    });
                }
                Err(err) => {
                    logger.log(LogLevel::Error, format!("{err:?}")).await;
                    return Err(err);
                }
            }
        }

        let task = async move {
            let mut info = BuildInfo {
                node_working_dirs: Default::default(),
            };
            for task in tasks {
                let NodeBuildTask {
                    node_id,
                    dynamic_node: _,
                    task,
                } = task;
                let node = task
                    .await
                    .with_context(|| format!("failed to build node `{node_id}`"))?;
                info.node_working_dirs
                    .insert(node_id, node.node_working_dir);
            }
            Ok(info)
        };

        Ok(task)
    }

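    /// Registers a new running dataflow, computes its input/output mappings,
    /// prepares the spawning of all local nodes, and subscribes via zenoh to
    /// remote outputs that are mapped to local inputs.
    ///
    /// Returns a future that performs the actual spawning once all nodes are
    /// prepared (see `spawn_prepared_nodes`).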
    #[allow(clippy::too_many_arguments)]
    async fn spawn_dataflow(
        &mut self,
        build_id: Option<BuildId>,
        dataflow_id: DataflowId,
        base_working_dir: PathBuf,
        nodes: BTreeMap<NodeId, ResolvedNode>,
        dataflow_descriptor: Descriptor,
        spawn_nodes: BTreeSet<NodeId>,
        uv: bool,
    ) -> eyre::Result<impl Future<Output = eyre::Result<()>> + use<>> {
        let mut logger = self
            .logger
            .for_dataflow(dataflow_id)
            .try_clone()
            .await
            .context("failed to clone logger")?;
        let dataflow =
            RunningDataflow::new(dataflow_id, self.daemon_id.clone(), &dataflow_descriptor);
        let dataflow = match self.running.entry(dataflow_id) {
            std::collections::hash_map::Entry::Vacant(entry) => {
                self.working_dir
                    .insert(dataflow_id, base_working_dir.clone());
                entry.insert(dataflow)
            }
            std::collections::hash_map::Entry::Occupied(_) => {
                bail!("there is already a running dataflow with ID `{dataflow_id}`")
            }
        };

        let mut stopped = Vec::new();

        let build_info = build_id.and_then(|build_id| self.builds.get(&build_id));
        let node_with_git_source = nodes.values().find(|n| n.has_git_source());
        if let Some(git_node) = node_with_git_source {
            if build_info.is_none() {
                eyre::bail!(
                    "node {} has a git source, but no `dora build` was run yet\n\n\
                    nodes with a `git` field must be built using `dora build` before starting the \
                    dataflow",
                    git_node.id
                )
            }
        }
        let node_working_dirs = build_info
            .map(|info| info.node_working_dirs.clone())
            .unwrap_or_default();

        // calculate info about mappings
        for node in nodes.values() {
            let local = spawn_nodes.contains(&node.id);

            let inputs = node_inputs(node);
            for (input_id, input) in inputs {
                if local {
                    dataflow
                        .open_inputs
                        .entry(node.id.clone())
                        .or_default()
                        .insert(input_id.clone());
                    match input.mapping {
                        InputMapping::User(mapping) => {
                            dataflow
                                .mappings
                                .entry(OutputId(mapping.source, mapping.output))
                                .or_default()
                                .insert((node.id.clone(), input_id));
                        }
                        InputMapping::Timer { interval } => {
                            dataflow
                                .timers
                                .entry(interval)
                                .or_default()
                                .insert((node.id.clone(), input_id));
                        }
                    }
                } else if let InputMapping::User(mapping) = input.mapping {
                    dataflow
                        .open_external_mappings
                        .insert(OutputId(mapping.source, mapping.output));
                }
            }
        }

        let spawner = Spawner {
            dataflow_id,
            daemon_tx: self.events_tx.clone(),
            dataflow_descriptor,
            clock: self.clock.clone(),
            uv,
        };

        let mut tasks = Vec::new();

        // spawn nodes and set up subscriptions
        for node in nodes.into_values() {
            let mut logger = logger.reborrow().for_node(node.id.clone());
            let local = spawn_nodes.contains(&node.id);
            if local {
                let dynamic_node = node.kind.dynamic();
                if dynamic_node {
                    dataflow.dynamic_nodes.insert(node.id.clone());
                } else {
                    dataflow.pending_nodes.insert(node.id.clone());
                }

                let node_id = node.id.clone();
                let node_stderr_most_recent = dataflow
                    .node_stderr_most_recent
                    .entry(node.id.clone())
                    .or_insert_with(|| Arc::new(ArrayQueue::new(STDERR_LOG_LINES)))
                    .clone();

                let configured_node_working_dir = node_working_dirs.get(&node_id).cloned();
                if configured_node_working_dir.is_none() && node.has_git_source() {
                    eyre::bail!(
                        "node {} has a git source, but no git clone directory was found for it\n\n\
1211                        try running `dora build` again",
1212                        node.id
1213                    )
1214                }
1215                let node_working_dir = configured_node_working_dir
1216                    .or_else(|| {
1217                        node.deploy
1218                            .as_ref()
1219                            .and_then(|d| d.working_dir.as_ref().map(|d| base_working_dir.join(d)))
1220                    })
1221                    .unwrap_or(base_working_dir.clone())
1222                    .clone();
1223                match spawner
1224                    .clone()
1225                    .spawn_node(node, node_working_dir, node_stderr_most_recent, &mut logger)
1226                    .await
1227                    .wrap_err_with(|| format!("failed to spawn node `{node_id}`"))
1228                {
1229                    Ok(result) => {
1230                        tasks.push(NodeBuildTask {
                            node_id,
                            task: result,
                            dynamic_node,
                        });
                    }
                    Err(err) => {
                        logger
                            .log(LogLevel::Error, Some("daemon".into()), format!("{err:?}"))
                            .await;
                        self.dataflow_node_results
                            .entry(dataflow_id)
                            .or_default()
                            .insert(
                                node_id.clone(),
                                Err(NodeError {
                                    timestamp: self.clock.new_timestamp(),
                                    cause: NodeErrorCause::FailedToSpawn(format!("{err:?}")),
                                    exit_status: NodeExitStatus::Unknown,
                                }),
                            );
                        stopped.push((node_id.clone(), dynamic_node));
                    }
                }
            } else {
                // wait until node is ready before starting
                dataflow.pending_nodes.set_external_nodes(true);

                // subscribe to all node outputs that are mapped to some local inputs
                for output_id in dataflow.mappings.keys().filter(|o| o.0 == node.id) {
                    let tx = self
                        .remote_daemon_events_tx
                        .clone()
                        .wrap_err("no remote_daemon_events_tx channel")?;
                    let mut finished_rx = dataflow.finished_tx.subscribe();
                    let subscribe_topic = dataflow.output_publish_topic(output_id);
                    tracing::debug!("declaring subscriber on {subscribe_topic}");
                    let subscriber = self
                        .zenoh_session
                        .declare_subscriber(subscribe_topic)
                        .await
                        .map_err(|e| eyre!(e))
                        .wrap_err_with(|| format!("failed to subscribe to {output_id:?}"))?;
                    tokio::spawn(async move {
                        let mut finished = pin!(finished_rx.recv());
                        loop {
                            let finished_or_next =
                                futures::future::select(finished, subscriber.recv_async());
                            match finished_or_next.await {
                                future::Either::Left((finished, _)) => match finished {
                                    Err(broadcast::error::RecvError::Closed) => {
                                        tracing::debug!(
                                            "dataflow finished, breaking from zenoh subscribe task"
                                        );
                                        break;
                                    }
                                    other => {
                                        tracing::warn!(
                                            "unexpected return value of dataflow finished_rx channel: {other:?}"
                                        );
                                        break;
                                    }
                                },
                                future::Either::Right((sample, f)) => {
                                    finished = f;
                                    let event = sample.map_err(|e| eyre!(e)).and_then(|s| {
                                        Timestamped::deserialize_inter_daemon_event(
                                            &s.payload().to_bytes(),
                                        )
                                    });
                                    if tx.send_async(event).await.is_err() {
                                        // daemon finished
                                        break;
                                    }
                                }
                            }
                        }
                    });
                }
            }
        }
        for (node_id, dynamic) in stopped {
            self.handle_node_stop(dataflow_id, &node_id, dynamic)
                .await?;
        }

        let spawn_result = Self::spawn_prepared_nodes(
            dataflow_id,
            logger,
            tasks,
            self.events_tx.clone(),
            self.clock.clone(),
        );

        Ok(spawn_result)
    }

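    /// Awaits all node preparation tasks, then spawns the prepared nodes.
    ///
    /// Spawning only happens if *all* nodes were prepared successfully. If any
    /// preparation fails, the remaining prepared nodes are reported as failed
    /// with a cascading error instead of being spawned.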
    async fn spawn_prepared_nodes(
        dataflow_id: Uuid,
        mut logger: DataflowLogger<'_>,
        tasks: Vec<NodeBuildTask<impl Future<Output = eyre::Result<spawn::PreparedNode>>>>,
        events_tx: mpsc::Sender<Timestamped<Event>>,
        clock: Arc<HLC>,
    ) -> eyre::Result<()> {
        let node_result = |node_id, dynamic_node, result| Timestamped {
            inner: Event::SpawnNodeResult {
                dataflow_id,
                node_id,
                dynamic_node,
                result,
            },
            timestamp: clock.new_timestamp(),
        };
        let mut failed_to_prepare = None;
        let mut prepared_nodes = Vec::new();
        for task in tasks {
            let NodeBuildTask {
                node_id,
                dynamic_node,
                task,
            } = task;
            match task.await {
                Ok(node) => prepared_nodes.push(node),
                Err(err) => {
                    if failed_to_prepare.is_none() {
                        failed_to_prepare = Some(node_id.clone());
                    }
                    let node_err: NodeError = NodeError {
                        timestamp: clock.new_timestamp(),
                        cause: NodeErrorCause::FailedToSpawn(format!(
                            "preparing for spawn failed: {err:?}"
                        )),
                        exit_status: NodeExitStatus::Unknown,
                    };
                    let send_result = events_tx
                        .send(node_result(node_id, dynamic_node, Err(node_err)))
                        .await;
                    if send_result.is_err() {
                        tracing::error!("failed to send SpawnNodeResult to main daemon task")
                    }
                }
            }
        }

        // once all nodes are prepared, do the actual spawning
        if let Some(failed_node) = failed_to_prepare {
            // don't spawn any nodes when an error occurred before
            for node in prepared_nodes {
                let err = NodeError {
                    timestamp: clock.new_timestamp(),
                    cause: NodeErrorCause::Cascading {
                        caused_by_node: failed_node.clone(),
                    },
                    exit_status: NodeExitStatus::Unknown,
                };
                let send_result = events_tx
                    .send(node_result(
                        node.node_id().clone(),
                        node.dynamic(),
                        Err(err),
                    ))
                    .await;
                if send_result.is_err() {
                    tracing::error!("failed to send SpawnNodeResult to main daemon task")
                }
            }
            Err(eyre!("failed to prepare node {failed_node}"))
        } else {
            let mut spawn_result = Ok(());

            logger
                .log(
                    LogLevel::Info,
                    None,
                    Some("dora daemon".into()),
                    "finished building nodes, spawning...",
                )
                .await;

            // spawn the nodes
            for node in prepared_nodes {
                let node_id = node.node_id().clone();
                let dynamic_node = node.dynamic();
                let mut logger = logger.reborrow().for_node(node_id.clone());
                let result = node.spawn(&mut logger).await;
                let node_spawn_result = match result {
                    Ok(node) => Ok(node),
                    Err(err) => {
                        let node_err = NodeError {
                            timestamp: clock.new_timestamp(),
                            cause: NodeErrorCause::FailedToSpawn(format!("spawn failed: {err:?}")),
                            exit_status: NodeExitStatus::Unknown,
                        };
                        if spawn_result.is_ok() {
                            spawn_result = Err(err.wrap_err(format!("failed to spawn {node_id}")));
                        }
                        Err(node_err)
                    }
                };
                let send_result = events_tx
                    .send(node_result(node_id, dynamic_node, node_spawn_result))
                    .await;
                if send_result.is_err() {
                    tracing::error!("failed to send SpawnNodeResult to main daemon task")
                }
            }
            spawn_result
        }
    }

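    /// Handles a connection request from a dynamic node, replying with the
    /// node's `NodeConfig`.
    ///
    /// The lookup fails if the node ID is unknown, not marked as dynamic, or
    /// ambiguous because multiple running dataflows contain a node with the
    /// given ID.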
    async fn handle_dynamic_node_event(
        &mut self,
        event: DynamicNodeEventWrapper,
    ) -> eyre::Result<()> {
        match event {
            DynamicNodeEventWrapper {
                event: DynamicNodeEvent::NodeConfig { node_id },
                reply_tx,
            } => {
                // count the running dataflows that contain a node with this ID
                let matching_dataflows = self
                    .running
                    .iter()
                    .filter(|(_id, dataflow)| dataflow.running_nodes.contains_key(&node_id))
                    .count();

                let node_config = match matching_dataflows {
                    2.. => Err(format!(
                        "multiple dataflows contain dynamic node id {node_id}. \
                        Please only have one running dataflow with the specified \
                        node id if you want to use a dynamic node",
                    )),
                    1 => self
                        .running
                        .iter()
                        .filter(|(_id, dataflow)| dataflow.running_nodes.contains_key(&node_id))
                        .map(|(id, dataflow)| -> Result<NodeConfig> {
                            let node_config = dataflow
                                .running_nodes
                                .get(&node_id)
                                .with_context(|| {
                                    format!("no node with ID `{node_id}` within the given dataflow")
                                })?
                                .node_config
                                .clone();
                            if !node_config.dynamic {
                                bail!("node with ID `{node_id}` in {id} is not dynamic");
                            }
                            Ok(node_config)
                        })
                        .next()
                        .ok_or_else(|| eyre!("no node with ID `{node_id}`"))
                        .and_then(|r| r)
                        .map_err(|err| {
                            format!(
                                "failed to get dynamic node config within given dataflow: {err}"
                            )
                        }),
                    0 => Err(format!("no node with ID `{node_id}`")),
                };

                let reply = DaemonReply::NodeConfig {
                    result: node_config,
                };
                let _ = reply_tx.send(Some(reply)).map_err(|_| {
                    error!("could not send node config reply to dynamic node")
                });
                Ok(())
            }
        }
    }

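    /// Dispatches an event that a local node sent to the daemon, e.g. a
    /// subscribe request, an output message, or a drop-token report.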
    async fn handle_node_event(
        &mut self,
        event: DaemonNodeEvent,
        dataflow_id: DataflowId,
        node_id: NodeId,
    ) -> eyre::Result<()> {
        match event {
            DaemonNodeEvent::Subscribe {
                event_sender,
                reply_sender,
            } => {
                let mut logger = self.logger.for_dataflow(dataflow_id);
                logger
                    .log(
                        LogLevel::Info,
                        Some(node_id.clone()),
                        Some("daemon".into()),
                        "node is ready",
                    )
                    .await;

                let dataflow = self.running.get_mut(&dataflow_id).ok_or_else(|| {
                    format!("subscribe failed: no running dataflow with ID `{dataflow_id}`")
                });

                match dataflow {
                    Err(err) => {
                        let _ = reply_sender.send(DaemonReply::Result(Err(err)));
                    }
                    Ok(dataflow) => {
                        Self::subscribe(dataflow, node_id.clone(), event_sender, &self.clock).await;

                        let status = dataflow
                            .pending_nodes
                            .handle_node_subscription(
                                node_id.clone(),
                                reply_sender,
                                &mut self.coordinator_connection,
                                &self.clock,
                                &mut dataflow.cascading_error_causes,
                                &mut logger,
                            )
                            .await?;
                        match status {
                            DataflowStatus::AllNodesReady => {
                                logger
                                    .log(
                                        LogLevel::Info,
                                        None,
                                        Some("daemon".into()),
                                        "all nodes are ready, starting dataflow",
                                    )
                                    .await;
                                dataflow.start(&self.events_tx, &self.clock).await?;
                            }
                            DataflowStatus::Pending => {}
                        }
                    }
                }
            }
            DaemonNodeEvent::SubscribeDrop {
                event_sender,
                reply_sender,
            } => {
                let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
                    format!("failed to subscribe: no running dataflow with ID `{dataflow_id}`")
                });
                let result = match dataflow {
                    Ok(dataflow) => {
                        dataflow.drop_channels.insert(node_id, event_sender);
                        Ok(())
                    }
                    Err(err) => Err(err.to_string()),
                };
                let _ = reply_sender.send(DaemonReply::Result(result));
            }
            DaemonNodeEvent::CloseOutputs {
                outputs,
                reply_sender,
            } => {
                // notify downstream nodes
                let inner = async {
                    self.send_output_closed_events(dataflow_id, node_id, outputs)
                        .await
                };

                let reply = inner.await.map_err(|err| format!("{err:?}"));
                let _ = reply_sender.send(DaemonReply::Result(reply));
            }
            DaemonNodeEvent::OutputsDone { reply_sender } => {
                let result = self.handle_outputs_done(dataflow_id, &node_id).await;

                let _ = reply_sender.send(DaemonReply::Result(
                    result.map_err(|err| format!("{err:?}")),
                ));
            }
            DaemonNodeEvent::SendOut {
                output_id,
                metadata,
                data,
            } => self
                .send_out(dataflow_id, node_id, output_id, metadata, data)
                .await
                .context("failed to send out")?,
            DaemonNodeEvent::ReportDrop { tokens } => {
                let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
                    format!(
                        "failed to handle drop tokens: \
                        no running dataflow with ID `{dataflow_id}`"
                    )
                });

                match dataflow {
                    Ok(dataflow) => {
                        for token in tokens {
                            match dataflow.pending_drop_tokens.get_mut(&token) {
                                Some(info) => {
                                    if info.pending_nodes.remove(&node_id) {
                                        dataflow.check_drop_token(token, &self.clock).await?;
                                    } else {
                                        tracing::warn!(
                                            "node `{node_id}` is not pending for drop token `{token:?}`"
                                        );
                                    }
                                }
                                None => tracing::warn!("unknown drop token `{token:?}`"),
                            }
                        }
                    }
                    Err(err) => tracing::warn!("{err:?}"),
                }
            }
            DaemonNodeEvent::EventStreamDropped { reply_sender } => {
                let inner = async {
                    let dataflow = self
                        .running
                        .get_mut(&dataflow_id)
                        .wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"))?;
                    dataflow.subscribe_channels.remove(&node_id);
                    Result::<_, eyre::Error>::Ok(())
                };

                let reply = inner.await.map_err(|err| format!("{err:?}"));
                let _ = reply_sender.send(DaemonReply::Result(reply));
            }
        }
        Ok(())
    }

    async fn send_reload(
        &mut self,
        dataflow_id: Uuid,
        node_id: NodeId,
        operator_id: Option<OperatorId>,
    ) -> Result<(), eyre::ErrReport> {
        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
            format!("Reload failed: no running dataflow with ID `{dataflow_id}`")
        })?;
        if let Some(channel) = dataflow.subscribe_channels.get(&node_id) {
            match send_with_timestamp(channel, NodeEvent::Reload { operator_id }, &self.clock) {
                Ok(()) => {}
                Err(_) => {
                    dataflow.subscribe_channels.remove(&node_id);
                }
            }
        }
        Ok(())
    }

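    /// Forwards a node output to all of its receivers.
    ///
    /// The message is first delivered to local subscribers. If the output is
    /// mapped to inputs on other machines (or if
    /// `publish_all_messages_to_zenoh` is enabled), it is additionally
    /// published via zenoh.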
    async fn send_out(
        &mut self,
        dataflow_id: Uuid,
        node_id: NodeId,
        output_id: DataId,
        metadata: dora_message::metadata::Metadata,
        data: Option<DataMessage>,
    ) -> Result<(), eyre::ErrReport> {
        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
            format!("send out failed: no running dataflow with ID `{dataflow_id}`")
        })?;
        let data_bytes = send_output_to_local_receivers(
            node_id.clone(),
            output_id.clone(),
            dataflow,
            &metadata,
            data,
            &self.clock,
        )
        .await?;

        let output_id = OutputId(node_id, output_id);
        let remote_receivers = dataflow.open_external_mappings.contains(&output_id)
            || dataflow.publish_all_messages_to_zenoh;
        if remote_receivers {
            let event = InterDaemonEvent::Output {
                dataflow_id,
                node_id: output_id.0.clone(),
                output_id: output_id.1.clone(),
                metadata,
                data: data_bytes,
            };
            self.send_to_remote_receivers(dataflow_id, &output_id, event)
                .await?;
        }

        Ok(())
    }

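    /// Publishes an inter-daemon event for the given output via zenoh.
    ///
    /// Publishers are declared lazily on first use and cached in
    /// `dataflow.publishers` for subsequent messages.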
    async fn send_to_remote_receivers(
        &mut self,
        dataflow_id: Uuid,
        output_id: &OutputId,
        event: InterDaemonEvent,
    ) -> Result<(), eyre::Error> {
        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
            format!(
                "sending to remote receivers failed: \
                no running dataflow with ID `{dataflow_id}`"
            )
        })?;

        // publish via zenoh
        let publisher = match dataflow.publishers.get(output_id) {
            Some(publisher) => publisher,
            None => {
                let publish_topic = dataflow.output_publish_topic(output_id);
                tracing::debug!("declaring publisher on {publish_topic}");
                let publisher = self
                    .zenoh_session
                    .declare_publisher(publish_topic)
                    .await
                    .map_err(|e| eyre!(e))
                    .context("failed to create zenoh publisher")?;
                dataflow.publishers.insert(output_id.clone(), publisher);
                dataflow.publishers.get(output_id).unwrap()
            }
        };

        let serialized_event = Timestamped {
            inner: event,
            timestamp: self.clock.new_timestamp(),
        }
        .serialize();
        publisher
            .put(serialized_event)
            .await
            .map_err(|e| eyre!(e))
            .context("zenoh put failed")?;
        Ok(())
    }

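    /// Reports the given outputs as closed to all of their receivers, both
    /// local subscribers and (via zenoh) nodes on remote daemons.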
    async fn send_output_closed_events(
        &mut self,
        dataflow_id: DataflowId,
        node_id: NodeId,
        outputs: Vec<DataId>,
    ) -> eyre::Result<()> {
        let dataflow = self
            .running
            .get_mut(&dataflow_id)
            .wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"))?;
        let local_node_inputs: BTreeSet<_> = dataflow
            .mappings
            .iter()
            .filter(|(k, _)| k.0 == node_id && outputs.contains(&k.1))
            .flat_map(|(_, v)| v)
            .cloned()
            .collect();
        for (receiver_id, input_id) in &local_node_inputs {
            close_input(dataflow, receiver_id, input_id, &self.clock);
        }

        let mut closed = Vec::new();
        for output_id in &dataflow.open_external_mappings {
            if output_id.0 == node_id && outputs.contains(&output_id.1) {
                closed.push(output_id.clone());
            }
        }

        for output_id in closed {
            let event = InterDaemonEvent::OutputClosed {
                dataflow_id,
                node_id: output_id.0.clone(),
                output_id: output_id.1.clone(),
            };
            self.send_to_remote_receivers(dataflow_id, &output_id, event)
                .await?;
        }

        Ok(())
    }

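    /// Registers the event channel of a newly subscribed node.
    ///
    /// Events that happened before the subscription (closed inputs, an
    /// already-sent stop event) are replayed to the new channel so that the
    /// node does not miss them.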
    async fn subscribe(
        dataflow: &mut RunningDataflow,
        node_id: NodeId,
        event_sender: UnboundedSender<Timestamped<NodeEvent>>,
        clock: &HLC,
    ) {
        // some inputs might have been closed already -> report those events
        let closed_inputs = dataflow
            .mappings
            .values()
            .flatten()
            .filter(|(node, _)| node == &node_id)
            .map(|(_, input)| input)
            .filter(|input| {
                dataflow
                    .open_inputs
                    .get(&node_id)
                    .map(|open_inputs| !open_inputs.contains(*input))
                    .unwrap_or(true)
            });
        for input_id in closed_inputs {
            let _ = send_with_timestamp(
                &event_sender,
                NodeEvent::InputClosed {
                    id: input_id.clone(),
                },
                clock,
            );
        }
        if dataflow.open_inputs(&node_id).is_empty() {
            let _ = send_with_timestamp(&event_sender, NodeEvent::AllInputsClosed, clock);
        }

        // if a stop event was already sent for the dataflow, send it to
        // the newly connected node too
        if dataflow.stop_sent {
            let _ = send_with_timestamp(&event_sender, NodeEvent::Stop, clock);
        }

        dataflow.subscribe_channels.insert(node_id, event_sender);
    }

    #[tracing::instrument(skip(self), level = "trace")]
    async fn handle_outputs_done(
        &mut self,
        dataflow_id: DataflowId,
        node_id: &NodeId,
    ) -> eyre::Result<()> {
        let dataflow = self
            .running
            .get_mut(&dataflow_id)
            .ok_or_else(|| eyre!("no running dataflow with ID `{dataflow_id}`"))?;

        let outputs = dataflow
            .mappings
            .keys()
            .filter(|m| &m.0 == node_id)
            .map(|m| &m.1)
            .cloned()
            .collect();
        self.send_output_closed_events(dataflow_id, node_id.clone(), outputs)
            .await?;

        // look up the dataflow again because `send_output_closed_events` required `&mut self`
        let dataflow = self
            .running
            .get_mut(&dataflow_id)
            .ok_or_else(|| eyre!("no running dataflow with ID `{dataflow_id}`"))?;
        dataflow.drop_channels.remove(node_id);
        Ok(())
    }

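    /// Handles a node stop and notifies the main daemon task via a
    /// `NodeStopped` event, regardless of whether the stop handling succeeded.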
    async fn handle_node_stop(
        &mut self,
        dataflow_id: Uuid,
        node_id: &NodeId,
        dynamic_node: bool,
    ) -> eyre::Result<()> {
        let result = self
            .handle_node_stop_inner(dataflow_id, node_id, dynamic_node)
            .await;
        let _ = self
            .events_tx
            .send(Timestamped {
                inner: Event::NodeStopped {
                    dataflow_id,
                    node_id: node_id.clone(),
                },
                timestamp: self.clock.new_timestamp(),
            })
            .await;
        result
    }

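    /// Performs the actual node stop handling: closes the node's outputs,
    /// updates the pending-node bookkeeping, and reports the dataflow as
    /// finished to the coordinator once no (non-dynamic) nodes are left.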
    async fn handle_node_stop_inner(
        &mut self,
        dataflow_id: Uuid,
        node_id: &NodeId,
        dynamic_node: bool,
    ) -> eyre::Result<()> {
        let mut logger = self.logger.for_dataflow(dataflow_id);
        let dataflow = match self.running.get_mut(&dataflow_id) {
            Some(dataflow) => dataflow,
            None if dynamic_node => {
                // The dataflow might be done already as we don't wait for dynamic nodes. In this
                // case, we don't need to do anything to handle the node stop.
                tracing::debug!(
                    "dynamic node {dataflow_id}/{node_id} stopped after dataflow was done"
                );
                return Ok(());
            }
            None => eyre::bail!(
                "failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`"
            ),
        };

        dataflow
            .pending_nodes
            .handle_node_stop(
                node_id,
                &mut self.coordinator_connection,
                &self.clock,
                &mut dataflow.cascading_error_causes,
                &mut logger,
            )
            .await?;

        self.handle_outputs_done(dataflow_id, node_id).await?;

        let mut logger = self.logger.for_dataflow(dataflow_id);
        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
            format!("failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`")
        })?;
        if let Some(mut pid) = dataflow.running_nodes.remove(node_id).and_then(|n| n.pid) {
            pid.mark_as_stopped();
        }
        // the dataflow is done once no local nodes are pending and only
        // dynamic nodes (which we don't wait for) remain running
        if !dataflow.pending_nodes.local_nodes_pending()
            && dataflow
                .running_nodes
                .iter()
                .all(|(_id, n)| n.node_config.dynamic)
        {
            let result = DataflowDaemonResult {
                timestamp: self.clock.new_timestamp(),
                node_results: self
                    .dataflow_node_results
                    .get(&dataflow.id)
                    .context("failed to get dataflow node results")?
                    .clone(),
            };

            self.git_manager
                .clones_in_use
                .values_mut()
                .for_each(|dataflows| {
                    dataflows.remove(&dataflow_id);
                });

            logger
                .log(
                    LogLevel::Info,
                    None,
                    Some("daemon".into()),
                    format!("dataflow finished on machine `{}`", self.daemon_id),
                )
                .await;
            if let Some(connection) = &mut self.coordinator_connection {
                let msg = serde_json::to_vec(&Timestamped {
                    inner: CoordinatorRequest::Event {
                        daemon_id: self.daemon_id.clone(),
                        event: DaemonEvent::AllNodesFinished {
                            dataflow_id,
                            result,
                        },
                    },
                    timestamp: self.clock.new_timestamp(),
                })?;
                socket_stream_send(connection, &msg)
                    .await
                    .wrap_err("failed to report dataflow finish to dora-coordinator")?;
            }
            self.running.remove(&dataflow_id);
        }

        Ok(())
    }

    async fn handle_dora_event(&mut self, event: DoraEvent) -> eyre::Result<()> {
        match event {
            DoraEvent::Timer {
                dataflow_id,
                interval,
                metadata,
            } => {
                let Some(dataflow) = self.running.get_mut(&dataflow_id) else {
                    tracing::warn!("Timer event for unknown dataflow `{dataflow_id}`");
                    return Ok(());
                };

                let Some(subscribers) = dataflow.timers.get(&interval) else {
                    return Ok(());
                };

                let mut closed = Vec::new();
                for (receiver_id, input_id) in subscribers {
                    let Some(channel) = dataflow.subscribe_channels.get(receiver_id) else {
                        continue;
                    };

                    let send_result = send_with_timestamp(
                        channel,
                        NodeEvent::Input {
                            id: input_id.clone(),
                            metadata: metadata.clone(),
                            data: None,
                        },
                        &self.clock,
                    );
                    match send_result {
                        Ok(()) => {}
                        Err(_) => {
                            closed.push(receiver_id);
                        }
                    }
                }
                for id in closed {
                    dataflow.subscribe_channels.remove(id);
                }
            }
            DoraEvent::Logs {
                dataflow_id,
                output_id,
                message,
                metadata,
            } => {
                let Some(dataflow) = self.running.get_mut(&dataflow_id) else {
                    tracing::warn!("Logs event for unknown dataflow `{dataflow_id}`");
                    return Ok(());
                };

                let Some(subscribers) = dataflow.mappings.get(&output_id) else {
                    tracing::warn!(
                        "No subscribers found for {:?} in {:?}",
                        output_id,
                        dataflow.mappings
                    );
                    return Ok(());
                };

                let mut closed = Vec::new();
                for (receiver_id, input_id) in subscribers {
                    let Some(channel) = dataflow.subscribe_channels.get(receiver_id) else {
                        tracing::warn!("No subscriber channel found for {:?}", output_id);
                        continue;
                    };

                    let send_result = send_with_timestamp(
                        channel,
                        NodeEvent::Input {
                            id: input_id.clone(),
                            metadata: metadata.clone(),
                            data: Some(message.clone()),
                        },
                        &self.clock,
                    );
                    match send_result {
                        Ok(()) => {}
                        Err(_) => {
                            closed.push(receiver_id);
                        }
                    }
                }
                for id in closed {
                    dataflow.subscribe_channels.remove(id);
                }
            }
            DoraEvent::SpawnedNodeResult {
                dataflow_id,
                node_id,
                dynamic_node,
                exit_status,
            } => {
                let mut logger = self
                    .logger
                    .for_dataflow(dataflow_id)
                    .for_node(node_id.clone());
                logger
                    .log(
                        LogLevel::Debug,
                        Some("daemon".into()),
                        format!("handling node stop with exit status {exit_status:?}"),
                    )
                    .await;

                let node_result = match exit_status {
                    NodeExitStatus::Success => Ok(()),
                    exit_status => {
                        let dataflow = self.running.get(&dataflow_id);
                        let caused_by_node = dataflow
                            .and_then(|dataflow| {
                                dataflow.cascading_error_causes.error_caused_by(&node_id)
                            })
                            .cloned();
                        let grace_duration_kill = dataflow
                            .map(|d| d.grace_duration_kills.contains(&node_id))
                            .unwrap_or_default();

                        let cause = match caused_by_node {
                            Some(caused_by_node) => {
                                logger
                                    .log(
                                        LogLevel::Info,
                                        Some("daemon".into()),
                                        format!("marking `{node_id}` as cascading error caused by `{caused_by_node}`")
                                    )
                                    .await;

                                NodeErrorCause::Cascading { caused_by_node }
                            }
                            None if grace_duration_kill => NodeErrorCause::GraceDuration,
                            None => {
                                // reconstruct the most recent stderr output; a full
                                // queue means older lines were dropped
                                let cause = dataflow
                                    .and_then(|d| d.node_stderr_most_recent.get(&node_id))
                                    .map(|queue| {
                                        let mut s = if queue.is_full() {
                                            "[...]".into()
                                        } else {
                                            String::new()
                                        };
                                        while let Some(line) = queue.pop() {
                                            s += &line;
                                        }
                                        s
                                    })
                                    .unwrap_or_default();

                                NodeErrorCause::Other { stderr: cause }
                            }
                        };
                        Err(NodeError {
                            timestamp: self.clock.new_timestamp(),
                            cause,
                            exit_status,
                        })
                    }
                };

                logger
                    .log(
                        if node_result.is_ok() {
                            LogLevel::Info
                        } else {
                            LogLevel::Error
                        },
                        Some("daemon".into()),
                        match &node_result {
                            Ok(()) => format!("{node_id} finished successfully"),
                            Err(err) => format!("{err}"),
                        },
                    )
                    .await;

                self.dataflow_node_results
                    .entry(dataflow_id)
                    .or_default()
                    .insert(node_id.clone(), node_result);

                self.handle_node_stop(dataflow_id, &node_id, dynamic_node)
                    .await?;
            }
        }
        Ok(())
    }

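    /// Determines the base working directory for a dataflow: the
    /// caller-provided directory if given, otherwise a session-specific
    /// subfolder of the daemon's own working directory.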
    fn base_working_dir(
        &self,
        local_working_dir: Option<PathBuf>,
        session_id: SessionId,
    ) -> eyre::Result<PathBuf> {
        match local_working_dir {
            Some(working_dir) => {
                // check that working directory exists
                if working_dir.exists() {
                    Ok(working_dir)
                } else {
                    bail!(
                        "working directory does not exist: {}",
                        working_dir.display(),
                    )
                }
            }
            None => {
                // use subfolder of daemon working dir
                let daemon_working_dir =
                    current_dir().context("failed to get daemon working dir")?;
                Ok(daemon_working_dir
                    .join("_work")
                    .join(session_id.uuid().to_string()))
            }
        }
    }
}

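/// Registers the daemon at the coordinator and merges all incoming event
/// sources (coordinator events, remote daemon events received from other
/// daemons, and dynamic node events from the local listener) into a single
/// stream.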
async fn set_up_event_stream(
    coordinator_addr: SocketAddr,
    machine_id: &Option<String>,
    clock: &Arc<HLC>,
    remote_daemon_events_rx: flume::Receiver<eyre::Result<Timestamped<InterDaemonEvent>>>,
    // used for dynamic nodes
    local_listen_port: u16,
) -> eyre::Result<(DaemonId, impl Stream<Item = Timestamped<Event>> + Unpin)> {
    let clock_cloned = clock.clone();
    let remote_daemon_events = remote_daemon_events_rx.into_stream().map(move |e| match e {
        Ok(e) => Timestamped {
            inner: Event::Daemon(e.inner),
            timestamp: e.timestamp,
        },
        Err(err) => Timestamped {
            inner: Event::DaemonError(err),
            timestamp: clock_cloned.new_timestamp(),
        },
    });
    let (daemon_id, coordinator_events) =
        coordinator::register(coordinator_addr, machine_id.clone(), clock)
            .await
            .wrap_err("failed to connect to dora-coordinator")?;
    let coordinator_events = coordinator_events.map(
        |Timestamped {
             inner: event,
             timestamp,
         }| Timestamped {
            inner: Event::Coordinator(event),
            timestamp,
        },
    );
    let (events_tx, events_rx) = flume::bounded(10);
    let _listen_port =
        local_listener::spawn_listener_loop((LOCALHOST, local_listen_port).into(), events_tx)
            .await?;
    let dynamic_node_events = events_rx.into_stream().map(|e| Timestamped {
        inner: Event::DynamicNode(e.inner),
        timestamp: e.timestamp,
    });
    let incoming = (
        coordinator_events,
        remote_daemon_events,
        dynamic_node_events,
    )
        .merge();
    Ok((daemon_id, incoming))
}

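/// Sends an output message to all local subscribers and returns the raw data
/// bytes so that the caller can forward them to remote daemons if needed.
///
/// For shared-memory messages, the data is copied out of the mapped region and
/// the associated drop token is tracked until all local receivers report it
/// back.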
async fn send_output_to_local_receivers(
    node_id: NodeId,
    output_id: DataId,
    dataflow: &mut RunningDataflow,
    metadata: &metadata::Metadata,
    data: Option<DataMessage>,
    clock: &HLC,
) -> Result<Option<AVec<u8, ConstAlign<128>>>, eyre::ErrReport> {
    let timestamp = metadata.timestamp();
    let empty_set = BTreeSet::new();
    let output_id = OutputId(node_id, output_id);
    let local_receivers = dataflow.mappings.get(&output_id).unwrap_or(&empty_set);
    let OutputId(node_id, _) = output_id;
    let mut closed = Vec::new();
    for (receiver_id, input_id) in local_receivers {
        if let Some(channel) = dataflow.subscribe_channels.get(receiver_id) {
            let item = NodeEvent::Input {
                id: input_id.clone(),
                metadata: metadata.clone(),
                data: data.clone(),
            };
            match channel.send(Timestamped {
                inner: item,
                timestamp,
            }) {
                Ok(()) => {
                    if let Some(token) = data.as_ref().and_then(|d| d.drop_token()) {
                        dataflow
                            .pending_drop_tokens
                            .entry(token)
                            .or_insert_with(|| DropTokenInformation {
                                owner: node_id.clone(),
                                pending_nodes: Default::default(),
                            })
                            .pending_nodes
                            .insert(receiver_id.clone());
                    }
                }
                Err(_) => {
                    closed.push(receiver_id);
                }
            }
        }
    }
    for id in closed {
        dataflow.subscribe_channels.remove(id);
    }
    let (data_bytes, drop_token) = match data {
        None => (None, None),
        Some(DataMessage::SharedMemory {
            shared_memory_id,
            len,
            drop_token,
        }) => {
            let memory = ShmemConf::new()
                .os_id(shared_memory_id)
                .open()
                .wrap_err("failed to map shared memory output")?;
            let data = Some(AVec::from_slice(1, &unsafe { memory.as_slice() }[..len]));
            (data, Some(drop_token))
        }
        Some(DataMessage::Vec(v)) => (Some(v), None),
    };
    if let Some(token) = drop_token {
        // insert token into `pending_drop_tokens` even if there are no local subscribers
        dataflow
            .pending_drop_tokens
            .entry(token)
            .or_insert_with(|| DropTokenInformation {
                owner: node_id.clone(),
                pending_nodes: Default::default(),
            });
        // check if all local subscribers are finished with the token
        dataflow.check_drop_token(token, clock).await?;
    }
    Ok(data_bytes)
}

fn node_inputs(node: &ResolvedNode) -> BTreeMap<DataId, Input> {
    match &node.kind {
        CoreNodeKind::Custom(n) => n.run_config.inputs.clone(),
        CoreNodeKind::Runtime(n) => runtime_node_inputs(n),
    }
}

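/// Marks an input as closed and notifies the receiving node.
///
/// Sends an `InputClosed` event for the input and an `AllInputsClosed` event
/// once the receiver has no open inputs left.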
fn close_input(
    dataflow: &mut RunningDataflow,
    receiver_id: &NodeId,
    input_id: &DataId,
    clock: &HLC,
) {
    if let Some(open_inputs) = dataflow.open_inputs.get_mut(receiver_id) {
        if !open_inputs.remove(input_id) {
            return;
        }
    }
    if let Some(channel) = dataflow.subscribe_channels.get(receiver_id) {
        let _ = send_with_timestamp(
            channel,
            NodeEvent::InputClosed {
                id: input_id.clone(),
            },
            clock,
        );

        if dataflow.open_inputs(receiver_id).is_empty() {
            let _ = send_with_timestamp(channel, NodeEvent::AllInputsClosed, clock);
        }
    }
}

#[derive(Debug)]
pub struct RunningNode {
    pid: Option<ProcessId>,
    node_config: NodeConfig,
}

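/// Tracks the OS process ID of a spawned node.
///
/// Set to `None` once the process stopped; dropping a `ProcessId` kills the
/// process if it is still running.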
#[derive(Debug)]
struct ProcessId(Option<u32>);

impl ProcessId {
    pub fn new(process_id: u32) -> Self {
        Self(Some(process_id))
    }

    pub fn mark_as_stopped(&mut self) {
        self.0 = None;
    }

    pub fn kill(&mut self) -> bool {
        if let Some(pid) = self.0 {
            let pid = Pid::from(pid as usize);
            let mut system = sysinfo::System::new();
            system.refresh_process(pid);

            if let Some(process) = system.process(pid) {
                process.kill();
                self.mark_as_stopped();
                return true;
            }
        }

        false
    }
}

impl Drop for ProcessId {
    fn drop(&mut self) {
        // kill the process if it's still running
        if let Some(pid) = self.0 {
            if self.kill() {
                warn!("process {pid} was killed on drop because it was still running")
            }
        }
    }
}


pub struct RunningDataflow {
    id: Uuid,
    /// Local nodes that are not started yet
    pending_nodes: PendingNodes,

    subscribe_channels: HashMap<NodeId, UnboundedSender<Timestamped<NodeEvent>>>,
    drop_channels: HashMap<NodeId, UnboundedSender<Timestamped<NodeDropEvent>>>,
    mappings: HashMap<OutputId, BTreeSet<InputId>>,
    timers: BTreeMap<Duration, BTreeSet<InputId>>,
    open_inputs: BTreeMap<NodeId, BTreeSet<DataId>>,
    running_nodes: BTreeMap<NodeId, RunningNode>,

    /// List of all dynamic node IDs.
    ///
    /// We want to treat dynamic nodes differently in some cases, so we need
    /// to know which nodes are dynamic.
    dynamic_nodes: BTreeSet<NodeId>,

    open_external_mappings: BTreeSet<OutputId>,

    pending_drop_tokens: HashMap<DropToken, DropTokenInformation>,

    /// Keep handles to all timer tasks of this dataflow to cancel them on drop.
    _timer_handles: BTreeMap<Duration, futures::future::RemoteHandle<()>>,
    stop_sent: bool,

    /// Used in `open_inputs`.
    ///
    /// TODO: replace this with a constant once `BTreeSet::new` is `const` on stable.
    empty_set: BTreeSet<DataId>,

    /// Contains the node that caused the error for nodes that experienced a cascading error.
    cascading_error_causes: CascadingErrorCauses,
    grace_duration_kills: Arc<crossbeam_skiplist::SkipSet<NodeId>>,

    node_stderr_most_recent: BTreeMap<NodeId, Arc<ArrayQueue<String>>>,

    publishers: BTreeMap<OutputId, zenoh::pubsub::Publisher<'static>>,

    finished_tx: broadcast::Sender<()>,

    publish_all_messages_to_zenoh: bool,
}

impl RunningDataflow {
    fn new(
        dataflow_id: Uuid,
        daemon_id: DaemonId,
        dataflow_descriptor: &Descriptor,
    ) -> RunningDataflow {
        let (finished_tx, _) = broadcast::channel(1);
        Self {
            id: dataflow_id,
            pending_nodes: PendingNodes::new(dataflow_id, daemon_id),
            subscribe_channels: HashMap::new(),
            drop_channels: HashMap::new(),
            mappings: HashMap::new(),
            timers: BTreeMap::new(),
            open_inputs: BTreeMap::new(),
            running_nodes: BTreeMap::new(),
            dynamic_nodes: BTreeSet::new(),
            open_external_mappings: Default::default(),
            pending_drop_tokens: HashMap::new(),
            _timer_handles: BTreeMap::new(),
            stop_sent: false,
            empty_set: BTreeSet::new(),
            cascading_error_causes: Default::default(),
            grace_duration_kills: Default::default(),
            node_stderr_most_recent: BTreeMap::new(),
            publishers: Default::default(),
            finished_tx,
            publish_all_messages_to_zenoh: dataflow_descriptor.debug.publish_all_messages_to_zenoh,
        }
    }

    async fn start(
        &mut self,
        events_tx: &mpsc::Sender<Timestamped<Event>>,
        clock: &Arc<HLC>,
    ) -> eyre::Result<()> {
        for interval in self.timers.keys().copied() {
            if self._timer_handles.get(&interval).is_some() {
                continue;
            }
            let events_tx = events_tx.clone();
            let dataflow_id = self.id;
            let clock = clock.clone();
            let task = async move {
                let mut interval_stream = tokio::time::interval(interval);
                let hlc = HLC::default();
                loop {
                    interval_stream.tick().await;

                    let span = tracing::span!(tracing::Level::TRACE, "tick");
                    // the `enter` guard is dropped immediately: it must not be
                    // held across the `.await` points below (the guard is not
                    // `Send`); the span is mainly used to carry the telemetry
                    // context
                    let _ = span.enter();

                    let mut parameters = BTreeMap::new();
                    parameters.insert(
                        "open_telemetry_context".to_string(),
                        #[cfg(feature = "telemetry")]
                        Parameter::String(serialize_context(&span.context())),
                        #[cfg(not(feature = "telemetry"))]
                        Parameter::String("".into()),
                    );

                    let metadata = metadata::Metadata::from_parameters(
                        hlc.new_timestamp(),
                        empty_type_info(),
                        parameters,
                    );

                    let event = Timestamped {
                        inner: DoraEvent::Timer {
                            dataflow_id,
                            interval,
                            metadata,
                        }
                        .into(),
                        timestamp: clock.new_timestamp(),
                    };
                    if events_tx.send(event).await.is_err() {
                        break;
                    }
                }
            };
            let (task, handle) = task.remote_handle();
            tokio::spawn(task);
            self._timer_handles.insert(interval, handle);
        }

        Ok(())
    }

    async fn stop_all(
        &mut self,
        coordinator_connection: &mut Option<TcpStream>,
        clock: &HLC,
        grace_duration: Option<Duration>,
        logger: &mut DataflowLogger<'_>,
    ) -> eyre::Result<()> {
        self.pending_nodes
            .handle_dataflow_stop(
                coordinator_connection,
                clock,
                &mut self.cascading_error_causes,
                &self.dynamic_nodes,
                logger,
            )
            .await?;

        for (_node_id, channel) in self.subscribe_channels.drain() {
            let _ = send_with_timestamp(&channel, NodeEvent::Stop, clock);
        }

        let running_processes: Vec<_> = self
            .running_nodes
            .iter_mut()
            .map(|(id, n)| (id.clone(), n.pid.take()))
            .collect();
        let grace_duration_kills = self.grace_duration_kills.clone();
        tokio::spawn(async move {
            // give nodes a 15-second grace period by default before killing them
            let duration = grace_duration.unwrap_or(Duration::from_secs(15));
            tokio::time::sleep(duration).await;

            for (node, pid) in running_processes {
                if let Some(mut pid) = pid {
                    if pid.kill() {
                        grace_duration_kills.insert(node.clone());
                        warn!(
                            "{node} was killed due to not stopping within the {:#?} grace period",
                            duration
                        )
                    }
                }
            }
        });
        self.stop_sent = true;
        Ok(())
    }

    fn open_inputs(&self, node_id: &NodeId) -> &BTreeSet<DataId> {
        self.open_inputs.get(node_id).unwrap_or(&self.empty_set)
    }

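    /// Reports a drop token back to its owner once no local node is still
    /// using the associated data, allowing the owner to free the memory.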
    async fn check_drop_token(&mut self, token: DropToken, clock: &HLC) -> eyre::Result<()> {
        match self.pending_drop_tokens.entry(token) {
            std::collections::hash_map::Entry::Occupied(entry) => {
                if entry.get().pending_nodes.is_empty() {
                    let (drop_token, info) = entry.remove_entry();
                    let result = match self.drop_channels.get_mut(&info.owner) {
                        Some(channel) => send_with_timestamp(
                            channel,
                            NodeDropEvent::OutputDropped { drop_token },
                            clock,
                        )
                        .wrap_err("send failed"),
                        None => Err(eyre!("no subscribe channel for node `{}`", &info.owner)),
                    };
                    if let Err(err) = result.wrap_err_with(|| {
                        format!(
                            "failed to report drop token `{drop_token:?}` to owner `{}`",
                            &info.owner
                        )
                    }) {
                        tracing::warn!("{err:?}");
                    }
                }
            }
            std::collections::hash_map::Entry::Vacant(_) => {
                tracing::warn!("check_drop_token called with already closed token")
            }
        }

        Ok(())
    }

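    /// Returns the zenoh key expression on which the given output is
    /// published, e.g. `dora/default/<dataflow-uuid>/output/camera/image` for
    /// an output `image` of a node `camera`.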
    fn output_publish_topic(&self, output_id: &OutputId) -> String {
        let network_id = "default";
        let dataflow_id = self.id;
        let OutputId(node_id, output_id) = output_id;
        format!("dora/{network_id}/{dataflow_id}/output/{node_id}/{output_id}")
    }
}

fn empty_type_info() -> ArrowTypeInfo {
    ArrowTypeInfo {
        data_type: DataType::Null,
        len: 0,
        null_count: 0,
        validity: None,
        offset: 0,
        buffer_offsets: Vec::new(),
        child_data: Vec::new(),
    }
}

#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct OutputId(NodeId, DataId);
type InputId = (NodeId, DataId);

struct DropTokenInformation {
    /// The node that created the associated drop token.
    owner: NodeId,
    /// Contains the set of pending nodes that still have access to the input
    /// associated with a drop token.
    pending_nodes: BTreeSet<NodeId>,
}

#[derive(Debug)]
pub enum Event {
    Node {
        dataflow_id: DataflowId,
        node_id: NodeId,
        event: DaemonNodeEvent,
    },
    Coordinator(CoordinatorEvent),
    Daemon(InterDaemonEvent),
    Dora(DoraEvent),
    DynamicNode(DynamicNodeEventWrapper),
    HeartbeatInterval,
    CtrlC,
    SecondCtrlC,
    DaemonError(eyre::Report),
    SpawnNodeResult {
        dataflow_id: DataflowId,
        node_id: NodeId,
        dynamic_node: bool,
        result: Result<RunningNode, NodeError>,
    },
    BuildDataflowResult {
        build_id: BuildId,
        session_id: SessionId,
        result: eyre::Result<BuildInfo>,
    },
    SpawnDataflowResult {
        dataflow_id: Uuid,
        result: eyre::Result<()>,
    },
    NodeStopped {
        dataflow_id: Uuid,
        node_id: NodeId,
    },
}

impl From<DoraEvent> for Event {
    fn from(event: DoraEvent) -> Self {
        Event::Dora(event)
    }
}

impl Event {
    pub fn kind(&self) -> &'static str {
        match self {
            Event::Node { .. } => "Node",
            Event::Coordinator(_) => "Coordinator",
            Event::Daemon(_) => "Daemon",
            Event::Dora(_) => "Dora",
            Event::DynamicNode(_) => "DynamicNode",
            Event::HeartbeatInterval => "HeartbeatInterval",
            Event::CtrlC => "CtrlC",
            Event::SecondCtrlC => "SecondCtrlC",
            Event::DaemonError(_) => "DaemonError",
            Event::SpawnNodeResult { .. } => "SpawnNodeResult",
            Event::BuildDataflowResult { .. } => "BuildDataflowResult",
            Event::SpawnDataflowResult { .. } => "SpawnDataflowResult",
            Event::NodeStopped { .. } => "NodeStopped",
        }
    }
}
2707
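// Illustrative test sketch (module and test names are ours): `Event::kind`
// returns the variant name used for logging, which should stay stable.
#[cfg(test)]
mod event_kind_tests {
    use super::*;

    #[test]
    fn kind_matches_variant_name() {
        assert_eq!(Event::CtrlC.kind(), "CtrlC");
        assert_eq!(Event::SecondCtrlC.kind(), "SecondCtrlC");
        assert_eq!(Event::HeartbeatInterval.kind(), "HeartbeatInterval");
    }
}

/// Requests and notifications sent from a running node to its daemon.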
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
pub enum DaemonNodeEvent {
    OutputsDone {
        reply_sender: oneshot::Sender<DaemonReply>,
    },
    Subscribe {
        event_sender: UnboundedSender<Timestamped<NodeEvent>>,
        reply_sender: oneshot::Sender<DaemonReply>,
    },
    SubscribeDrop {
        event_sender: UnboundedSender<Timestamped<NodeDropEvent>>,
        reply_sender: oneshot::Sender<DaemonReply>,
    },
    CloseOutputs {
        outputs: Vec<dora_core::config::DataId>,
        reply_sender: oneshot::Sender<DaemonReply>,
    },
    SendOut {
        output_id: DataId,
        metadata: metadata::Metadata,
        data: Option<DataMessage>,
    },
    ReportDrop {
        tokens: Vec<DropToken>,
    },
    EventStreamDropped {
        reply_sender: oneshot::Sender<DaemonReply>,
    },
}

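/// Events generated by the daemon itself, such as timer ticks, forwarded log
/// messages, and exit statuses of spawned nodes.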
#[derive(Debug)]
pub enum DoraEvent {
    Timer {
        dataflow_id: DataflowId,
        interval: Duration,
        metadata: metadata::Metadata,
    },
    Logs {
        dataflow_id: DataflowId,
        output_id: OutputId,
        message: DataMessage,
        metadata: metadata::Metadata,
    },
    SpawnedNodeResult {
        dataflow_id: DataflowId,
        node_id: NodeId,
        dynamic_node: bool,
        exit_status: NodeExitStatus,
    },
}

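/// Controls whether the daemon's event loop keeps running after handling an
/// event.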
#[must_use]
enum RunStatus {
    Continue,
    Exit,
}

fn send_with_timestamp<T>(
    sender: &UnboundedSender<Timestamped<T>>,
    event: T,
    clock: &HLC,
) -> Result<(), mpsc::error::SendError<Timestamped<T>>> {
    sender.send(Timestamped {
        inner: event,
        timestamp: clock.new_timestamp(),
    })
}

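// Illustrative test sketch (module and test names are ours): each event sent
// through `send_with_timestamp` should carry a fresh HLC timestamp, so
// receivers can order events. Sending on and polling an unbounded channel
// needs no async runtime.
#[cfg(test)]
mod send_with_timestamp_tests {
    use super::*;

    #[test]
    fn attaches_fresh_timestamp() {
        let clock = HLC::default();
        let (tx, mut rx) = mpsc::unbounded_channel::<Timestamped<u32>>();

        let before = clock.new_timestamp();
        send_with_timestamp(&tx, 42, &clock).unwrap();

        let received = rx.try_recv().unwrap();
        assert_eq!(received.inner, 42);
        assert!(received.timestamp > before);
    }
}

/// Registers a process-wide ctrl-c handler that forwards the signals as
/// daemon events: the first ctrl-c becomes [`Event::CtrlC`], the second
/// [`Event::SecondCtrlC`], and a third aborts the process.
///
/// Usage sketch (illustrative):
///
/// ```ignore
/// let mut ctrlc_events = set_up_ctrlc_handler(clock.clone())?;
/// while let Some(event) = ctrlc_events.recv().await { /* handle */ }
/// ```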
fn set_up_ctrlc_handler(
    clock: Arc<HLC>,
) -> eyre::Result<tokio::sync::mpsc::Receiver<Timestamped<Event>>> {
    let (ctrlc_tx, ctrlc_rx) = mpsc::channel(1);

    let mut ctrlc_sent = 0;
    ctrlc::set_handler(move || {
        let event = match ctrlc_sent {
            0 => Event::CtrlC,
            1 => Event::SecondCtrlC,
            _ => {
                tracing::warn!("received third ctrl-c signal -> aborting immediately");
                std::process::abort();
            }
        };
        if ctrlc_tx
            .blocking_send(Timestamped {
                inner: event,
                timestamp: clock.new_timestamp(),
            })
            .is_err()
        {
            tracing::error!("failed to forward ctrl-c event to daemon event loop");
        }

        ctrlc_sent += 1;
    })
    .wrap_err("failed to set ctrl-c handler")?;

    Ok(ctrlc_rx)
}

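/// Records which node failures were caused by the failure of another node,
/// mapping each affected node to the node that caused its error.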
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct CascadingErrorCauses {
    caused_by: BTreeMap<NodeId, NodeId>,
}

impl CascadingErrorCauses {
    pub fn experienced_cascading_error(&self, node: &NodeId) -> bool {
        self.caused_by.contains_key(node)
    }

    /// Return the ID of the node that caused a cascading error for the given node, if any.
    pub fn error_caused_by(&self, node: &NodeId) -> Option<&NodeId> {
        self.caused_by.get(node)
    }

    pub fn report_cascading_error(&mut self, causing_node: NodeId, affected_node: NodeId) {
        self.caused_by.entry(affected_node).or_insert(causing_node);
    }
}

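// Illustrative test sketch (module, test, and node names are ours; assumes
// `NodeId: From<String>` as used elsewhere in dora): because
// `report_cascading_error` uses `or_insert`, only the first reported cause is
// kept for a given affected node.
#[cfg(test)]
mod cascading_error_tests {
    use super::*;

    #[test]
    fn first_reported_cause_wins() {
        let first = NodeId::from("first-failure".to_owned());
        let second = NodeId::from("second-failure".to_owned());
        let affected = NodeId::from("affected".to_owned());

        let mut causes = CascadingErrorCauses::default();
        causes.report_cascading_error(first.clone(), affected.clone());
        causes.report_cascading_error(second, affected.clone());

        assert!(causes.experienced_cascading_error(&affected));
        assert_eq!(causes.error_caused_by(&affected), Some(&first));
    }
}

/// Collects the inputs of all operators of a runtime node, namespacing each
/// input ID as `{operator_id}/{input_id}`.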
fn runtime_node_inputs(n: &RuntimeNode) -> BTreeMap<DataId, Input> {
    n.operators
        .iter()
        .flat_map(|operator| {
            operator.config.inputs.iter().map(|(input_id, mapping)| {
                (
                    DataId::from(format!("{}/{input_id}", operator.id)),
                    mapping.clone(),
                )
            })
        })
        .collect()
}

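/// Collects the outputs of all operators of a runtime node, namespacing each
/// output ID as `{operator_id}/{output_id}`.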
fn runtime_node_outputs(n: &RuntimeNode) -> BTreeSet<DataId> {
    n.operators
        .iter()
        .flat_map(|operator| {
            operator
                .config
                .outputs
                .iter()
                .map(|output_id| DataId::from(format!("{}/{output_id}", operator.id)))
        })
        .collect()
}

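/// Extension trait unifying runtime nodes (operator bundles) and custom
/// nodes: `run_config` yields the node's combined input/output configuration,
/// and `dynamic` reports whether the node is a dynamic node, i.e. a local
/// custom node whose path is the special `DYNAMIC_SOURCE` marker. Dynamic
/// nodes are started by the user instead of being spawned by the daemon.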
trait CoreNodeKindExt {
    fn run_config(&self) -> NodeRunConfig;
    fn dynamic(&self) -> bool;
}

impl CoreNodeKindExt for CoreNodeKind {
    fn run_config(&self) -> NodeRunConfig {
        match self {
            CoreNodeKind::Runtime(n) => NodeRunConfig {
                inputs: runtime_node_inputs(n),
                outputs: runtime_node_outputs(n),
            },
            CoreNodeKind::Custom(n) => n.run_config.clone(),
        }
    }

    fn dynamic(&self) -> bool {
        match self {
            CoreNodeKind::Runtime(_n) => false,
            CoreNodeKind::Custom(n) => {
                matches!(&n.source, NodeSource::Local) && n.path == DYNAMIC_SOURCE
            }
        }
    }
}