dora_daemon/lib.rs

use aligned_vec::{AVec, ConstAlign};
use coordinator::CoordinatorEvent;
use crossbeam::queue::ArrayQueue;
use dora_core::{
    config::{DataId, Input, InputMapping, NodeId, NodeRunConfig, OperatorId},
    descriptor::{
        read_as_descriptor, CoreNodeKind, Descriptor, DescriptorExt, ResolvedNode, RuntimeNode,
        DYNAMIC_SOURCE,
    },
    topics::LOCALHOST,
    uhlc::{self, HLC},
};
use dora_message::{
    common::{
        DaemonId, DataMessage, DropToken, LogLevel, NodeError, NodeErrorCause, NodeExitStatus,
    },
    coordinator_to_cli::DataflowResult,
    coordinator_to_daemon::{DaemonCoordinatorEvent, SpawnDataflowNodes},
    daemon_to_coordinator::{
        CoordinatorRequest, DaemonCoordinatorReply, DaemonEvent, DataflowDaemonResult,
    },
    daemon_to_daemon::InterDaemonEvent,
    daemon_to_node::{DaemonReply, NodeConfig, NodeDropEvent, NodeEvent},
    metadata::{self, ArrowTypeInfo},
    node_to_daemon::{DynamicNodeEvent, Timestamped},
    DataflowId,
};
use dora_node_api::{arrow::datatypes::DataType, Parameter};
use eyre::{bail, eyre, Context, ContextCompat, Result};
use futures::{future, stream, FutureExt, TryFutureExt};
use futures_concurrency::stream::Merge;
use local_listener::DynamicNodeEventWrapper;
use log::{DaemonLogger, DataflowLogger, Logger};
use pending::PendingNodes;
use shared_memory_server::ShmemConf;
use socket_stream_utils::socket_stream_send;
use std::{
    collections::{BTreeMap, BTreeSet, HashMap},
    net::SocketAddr,
    path::{Path, PathBuf},
    pin::pin,
    sync::Arc,
    time::{Duration, Instant},
};
use sysinfo::Pid;
use tokio::{
    fs::File,
    io::AsyncReadExt,
    net::TcpStream,
    sync::{
        broadcast,
        mpsc::{self, UnboundedSender},
        oneshot::{self, Sender},
    },
};
use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
use tracing::{error, warn};
use uuid::{NoContext, Timestamp, Uuid};

mod coordinator;
mod local_listener;
mod log;
mod node_communication;
mod pending;
mod socket_stream_utils;
mod spawn;

#[cfg(feature = "telemetry")]
use dora_tracing::telemetry::serialize_context;
#[cfg(feature = "telemetry")]
use tracing_opentelemetry::OpenTelemetrySpanExt;

use crate::pending::DataflowStatus;

const STDERR_LOG_LINES: usize = 10;

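/// State of a running `dora` daemon: the currently running dataflows, the
/// connection to the coordinator, and the channels used to exchange events
/// with local nodes and remote daemons.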
pub struct Daemon {
    running: HashMap<DataflowId, RunningDataflow>,
    working_dir: HashMap<DataflowId, PathBuf>,

    events_tx: mpsc::Sender<Timestamped<Event>>,

    coordinator_connection: Option<TcpStream>,
    last_coordinator_heartbeat: Instant,
    daemon_id: DaemonId,

    /// used for testing and examples
    exit_when_done: Option<BTreeSet<(Uuid, NodeId)>>,
    /// set on ctrl-c
    exit_when_all_finished: bool,
    /// used to record results of local nodes
    dataflow_node_results: BTreeMap<Uuid, BTreeMap<NodeId, Result<(), NodeError>>>,

    clock: Arc<uhlc::HLC>,

    zenoh_session: zenoh::Session,
    remote_daemon_events_tx: Option<flume::Sender<eyre::Result<Timestamped<InterDaemonEvent>>>>,

    logger: DaemonLogger,
}

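/// Per-dataflow map of the results recorded for each local node.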
type DaemonRunResult = BTreeMap<Uuid, BTreeMap<NodeId, Result<(), NodeError>>>;

impl Daemon {
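    /// Runs the daemon in distributed mode: registers with the coordinator
    /// at `coordinator_addr` and processes incoming events until a ctrl-c
    /// signal or a `Destroy` command is received.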
    pub async fn run(
        coordinator_addr: SocketAddr,
        machine_id: Option<String>,
        local_listen_port: u16,
    ) -> eyre::Result<()> {
        let clock = Arc::new(HLC::default());

        let mut ctrlc_events = set_up_ctrlc_handler(clock.clone())?;
        let (remote_daemon_events_tx, remote_daemon_events_rx) = flume::bounded(10);
        let (daemon_id, incoming_events) = {
            let incoming_events = set_up_event_stream(
                coordinator_addr,
                &machine_id,
                &clock,
                remote_daemon_events_rx,
                local_listen_port,
            );

            // finish early if ctrl-c is pressed during event stream setup
            let ctrl_c = pin!(ctrlc_events.recv());
            match futures::future::select(ctrl_c, pin!(incoming_events)).await {
                future::Either::Left((_ctrl_c, _)) => {
                    tracing::info!("received ctrl-c signal -> stopping daemon");
                    return Ok(());
                }
                future::Either::Right((events, _)) => events?,
            }
        };
        Self::run_general(
            (ReceiverStream::new(ctrlc_events), incoming_events).merge(),
            Some(coordinator_addr),
            daemon_id,
            None,
            clock,
            Some(remote_daemon_events_tx),
        )
        .await
        .map(|_| ())
    }

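    /// Runs a single dataflow to completion without a coordinator: spawns
    /// all nodes locally and exits once every node has finished, returning
    /// the collected node results.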
    pub async fn run_dataflow(dataflow_path: &Path, uv: bool) -> eyre::Result<DataflowResult> {
        let working_dir = dataflow_path
            .canonicalize()
            .context("failed to canonicalize dataflow path")?
            .parent()
            .ok_or_else(|| eyre::eyre!("canonicalized dataflow path has no parent"))?
            .to_owned();

        let descriptor = read_as_descriptor(dataflow_path).await?;
        descriptor.check(&working_dir)?;
        let nodes = descriptor.resolve_aliases_and_set_defaults()?;

        let dataflow_id = Uuid::new_v7(Timestamp::now(NoContext));
        let spawn_command = SpawnDataflowNodes {
            dataflow_id,
            working_dir,
            spawn_nodes: nodes.keys().cloned().collect(),
            nodes,
            dataflow_descriptor: descriptor,
            uv,
        };

        let clock = Arc::new(HLC::default());

        let ctrlc_events = ReceiverStream::new(set_up_ctrlc_handler(clock.clone())?);

        let exit_when_done = spawn_command
            .nodes
            .values()
            .map(|n| (spawn_command.dataflow_id, n.id.clone()))
            .collect();
        let (reply_tx, reply_rx) = oneshot::channel();
        let timestamp = clock.new_timestamp();
        let coordinator_events = stream::once(async move {
            Timestamped {
                inner: Event::Coordinator(CoordinatorEvent {
                    event: DaemonCoordinatorEvent::Spawn(spawn_command),
                    reply_tx,
                }),
                timestamp,
            }
        });
        let events = (coordinator_events, ctrlc_events).merge();
        let run_result = Self::run_general(
            Box::pin(events),
            None,
            DaemonId::new(None),
            Some(exit_when_done),
            clock.clone(),
            None,
        );

        let spawn_result = reply_rx
            .map_err(|err| eyre!("failed to receive spawn result: {err}"))
            .and_then(|r| async {
                match r {
                    Some(DaemonCoordinatorReply::SpawnResult(result)) => {
                        result.map_err(|err| eyre!(err))
                    }
                    _ => Err(eyre!("unexpected spawn reply")),
                }
            });

        let (mut dataflow_results, ()) = future::try_join(run_result, spawn_result).await?;

        Ok(DataflowResult {
            uuid: dataflow_id,
            timestamp: clock.new_timestamp(),
            node_results: dataflow_results
                .remove(&dataflow_id)
                .context("no node results for dataflow_id")?,
        })
    }

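    /// Common setup path for `run` and `run_dataflow`: establishes the
    /// coordinator and zenoh connections, builds the `Daemon` state, and
    /// drives the main event loop.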
    async fn run_general(
        external_events: impl Stream<Item = Timestamped<Event>> + Unpin,
        coordinator_addr: Option<SocketAddr>,
        daemon_id: DaemonId,
        exit_when_done: Option<BTreeSet<(Uuid, NodeId)>>,
        clock: Arc<HLC>,
        remote_daemon_events_tx: Option<flume::Sender<eyre::Result<Timestamped<InterDaemonEvent>>>>,
    ) -> eyre::Result<DaemonRunResult> {
        let coordinator_connection = match coordinator_addr {
            Some(addr) => {
                let stream = TcpStream::connect(addr)
                    .await
                    .wrap_err("failed to connect to dora-coordinator")?;
                stream
                    .set_nodelay(true)
                    .wrap_err("failed to set TCP_NODELAY")?;
                Some(stream)
            }
            None => None,
        };

        // additional connection for logging
        let logger_coordinator_connection = match coordinator_addr {
            Some(addr) => {
                let stream = TcpStream::connect(addr)
                    .await
                    .wrap_err("failed to open logging connection to dora-coordinator")?;
                stream
                    .set_nodelay(true)
                    .wrap_err("failed to set TCP_NODELAY")?;
                Some(stream)
            }
            None => None,
        };

        let zenoh_config = match std::env::var(zenoh::Config::DEFAULT_CONFIG_PATH_ENV) {
            Ok(path) => zenoh::Config::from_file(&path)
                .map_err(|e| eyre!(e))
                .wrap_err_with(|| format!("failed to read zenoh config from {path}"))?,
            Err(std::env::VarError::NotPresent) => zenoh::Config::default(),
            Err(std::env::VarError::NotUnicode(_)) => eyre::bail!(
                "{} env variable is not valid unicode",
                zenoh::Config::DEFAULT_CONFIG_PATH_ENV
            ),
        };
        let zenoh_session = zenoh::open(zenoh_config)
            .await
            .map_err(|e| eyre!(e))
            .context("failed to open zenoh session")?;

        let (dora_events_tx, dora_events_rx) = mpsc::channel(5);
        let daemon = Self {
            logger: Logger {
                coordinator_connection: logger_coordinator_connection,
                daemon_id: daemon_id.clone(),
                clock: clock.clone(),
            }
            .for_daemon(daemon_id.clone()),
            running: HashMap::new(),
            working_dir: HashMap::new(),
            events_tx: dora_events_tx,
            coordinator_connection,
            last_coordinator_heartbeat: Instant::now(),
            daemon_id,
            exit_when_done,
            exit_when_all_finished: false,
            dataflow_node_results: BTreeMap::new(),
            clock,
            zenoh_session,
            remote_daemon_events_tx,
        };

        let dora_events = ReceiverStream::new(dora_events_rx);
        let watchdog_clock = daemon.clock.clone();
        let watchdog_interval = tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(
            Duration::from_secs(5),
        ))
        .map(|_| Timestamped {
            inner: Event::HeartbeatInterval,
            timestamp: watchdog_clock.new_timestamp(),
        });
        let events = (external_events, dora_events, watchdog_interval).merge();
        daemon.run_inner(events).await
    }

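    /// Main event loop: updates the HLC from each incoming timestamp and
    /// dispatches coordinator, inter-daemon, node, dora, and timer events
    /// until an exit condition is reached; finally reports `Exit` to the
    /// coordinator if a connection exists.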
    #[tracing::instrument(skip(incoming_events, self), fields(?self.daemon_id))]
    async fn run_inner(
        mut self,
        incoming_events: impl Stream<Item = Timestamped<Event>> + Unpin,
    ) -> eyre::Result<DaemonRunResult> {
        let mut events = incoming_events;

        while let Some(event) = events.next().await {
            let Timestamped { inner, timestamp } = event;
            if let Err(err) = self.clock.update_with_timestamp(&timestamp) {
                tracing::warn!("failed to update HLC with incoming event timestamp: {err}");
            }

            match inner {
                Event::Coordinator(CoordinatorEvent { event, reply_tx }) => {
                    let status = self.handle_coordinator_event(event, reply_tx).await?;

                    match status {
                        RunStatus::Continue => {}
                        RunStatus::Exit => break,
                    }
                }
                Event::Daemon(event) => {
                    self.handle_inter_daemon_event(event).await?;
                }
                Event::Node {
                    dataflow_id: dataflow,
                    node_id,
                    event,
                } => self.handle_node_event(event, dataflow, node_id).await?,
                Event::Dora(event) => match self.handle_dora_event(event).await? {
                    RunStatus::Continue => {}
                    RunStatus::Exit => break,
                },
                Event::DynamicNode(event) => self.handle_dynamic_node_event(event).await?,
                Event::HeartbeatInterval => {
                    if let Some(connection) = &mut self.coordinator_connection {
                        let msg = serde_json::to_vec(&Timestamped {
                            inner: CoordinatorRequest::Event {
                                daemon_id: self.daemon_id.clone(),
                                event: DaemonEvent::Heartbeat,
                            },
                            timestamp: self.clock.new_timestamp(),
                        })?;
                        socket_stream_send(connection, &msg)
                            .await
                            .wrap_err("failed to send watchdog message to dora-coordinator")?;

                        if self.last_coordinator_heartbeat.elapsed() > Duration::from_secs(20) {
                            bail!("lost connection to coordinator")
                        }
                    }
                }
                Event::CtrlC => {
                    tracing::info!("received ctrl-c signal -> stopping all dataflows");
                    for dataflow in self.running.values_mut() {
                        let mut logger = self.logger.for_dataflow(dataflow.id);
                        dataflow
                            .stop_all(
                                &mut self.coordinator_connection,
                                &self.clock,
                                None,
                                &mut logger,
                            )
                            .await?;
                    }
                    self.exit_when_all_finished = true;
                    if self.running.is_empty() {
                        break;
                    }
                }
                Event::SecondCtrlC => {
                    tracing::warn!("received second ctrl-c signal -> exit immediately");
                    bail!("received second ctrl-c signal");
                }
                Event::DaemonError(err) => {
                    tracing::error!("Daemon error: {err:?}");
                }
            }
        }

        if let Some(mut connection) = self.coordinator_connection.take() {
            let msg = serde_json::to_vec(&Timestamped {
                inner: CoordinatorRequest::Event {
                    daemon_id: self.daemon_id.clone(),
                    event: DaemonEvent::Exit,
                },
                timestamp: self.clock.new_timestamp(),
            })?;
            socket_stream_send(&mut connection, &msg)
                .await
                .wrap_err("failed to send Exit message to dora-coordinator")?;
        }

        Ok(self.dataflow_node_results)
    }

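    /// Handles a single coordinator command (spawn, all-nodes-ready, logs,
    /// reload, stop, destroy, heartbeat) and sends the reply back through
    /// `reply_tx`.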
    async fn handle_coordinator_event(
        &mut self,
        event: DaemonCoordinatorEvent,
        reply_tx: Sender<Option<DaemonCoordinatorReply>>,
    ) -> eyre::Result<RunStatus> {
        let status = match event {
            DaemonCoordinatorEvent::Spawn(SpawnDataflowNodes {
                dataflow_id,
                working_dir,
                nodes,
                dataflow_descriptor,
                spawn_nodes,
                uv,
            }) => {
                match dataflow_descriptor.communication.remote {
                    dora_core::config::RemoteCommunicationConfig::Tcp => {}
                }

                // use the given working directory if it exists, otherwise fall back to the daemon's current working directory
                let working_dir = if working_dir.exists() {
                    working_dir
                } else {
                    std::env::current_dir().wrap_err("failed to get current working dir")?
                };

                let result = self
                    .spawn_dataflow(
                        dataflow_id,
                        working_dir,
                        nodes,
                        dataflow_descriptor,
                        spawn_nodes,
                        uv,
                    )
                    .await;
                if let Err(err) = &result {
                    tracing::error!("{err:?}");
                }
                let reply =
                    DaemonCoordinatorReply::SpawnResult(result.map_err(|err| format!("{err:?}")));
                let _ = reply_tx.send(Some(reply)).map_err(|_| {
                    error!("could not send `SpawnResult` reply from daemon to coordinator")
                });
                RunStatus::Continue
            }
            DaemonCoordinatorEvent::AllNodesReady {
                dataflow_id,
                exited_before_subscribe,
            } => {
                let mut logger = self.logger.for_dataflow(dataflow_id);
                logger
                    .log(
                        LogLevel::Debug,
                        None,
                        Some("daemon".into()),
                        format!(
                            "received AllNodesReady (exited_before_subscribe: {exited_before_subscribe:?})"
                        ),
                    )
                    .await;
                match self.running.get_mut(&dataflow_id) {
                    Some(dataflow) => {
                        let ready = exited_before_subscribe.is_empty();
                        dataflow
                            .pending_nodes
                            .handle_external_all_nodes_ready(
                                exited_before_subscribe,
                                &mut dataflow.cascading_error_causes,
                            )
                            .await?;
                        if ready {
                            logger
                                .log(
                                    LogLevel::Info,
                                    None,
                                    Some("daemon".into()),
                                    "coordinator reported that all nodes are ready, starting dataflow",
                                )
                                .await;
                            dataflow.start(&self.events_tx, &self.clock).await?;
                        }
                    }
                    None => {
                        tracing::warn!(
                            "received AllNodesReady for unknown dataflow (ID `{dataflow_id}`)"
                        );
                    }
                }
                let _ = reply_tx.send(None).map_err(|_| {
                    error!("could not send `AllNodesReady` reply from daemon to coordinator")
                });
                RunStatus::Continue
            }
            DaemonCoordinatorEvent::Logs {
                dataflow_id,
                node_id,
            } => {
                match self.working_dir.get(&dataflow_id) {
                    Some(working_dir) => {
                        let working_dir = working_dir.clone();
                        tokio::spawn(async move {
                            let logs = async {
                                let mut file =
                                    File::open(log::log_path(&working_dir, &dataflow_id, &node_id))
                                        .await
                                        .wrap_err(format!(
                                            "Could not open log file: {:#?}",
                                            log::log_path(&working_dir, &dataflow_id, &node_id)
                                        ))?;

                                let mut contents = vec![];
                                file.read_to_end(&mut contents)
                                    .await
                                    .wrap_err("Could not read content of log file")?;
                                Result::<Vec<u8>, eyre::Report>::Ok(contents)
                            }
                            .await
                            .map_err(|err| format!("{err:?}"));
                            let _ = reply_tx
                                .send(Some(DaemonCoordinatorReply::Logs(logs)))
                                .map_err(|_| {
                                    error!("could not send logs reply from daemon to coordinator")
                                });
                        });
                    }
                    None => {
                        tracing::warn!("received Logs for unknown dataflow (ID `{dataflow_id}`)");
                        let _ = reply_tx.send(None).map_err(|_| {
                            error!(
                                "could not send `Logs` reply from daemon to coordinator"
                            )
                        });
                    }
                }
                RunStatus::Continue
            }
            DaemonCoordinatorEvent::ReloadDataflow {
                dataflow_id,
                node_id,
                operator_id,
            } => {
                let result = self.send_reload(dataflow_id, node_id, operator_id).await;
                let reply =
                    DaemonCoordinatorReply::ReloadResult(result.map_err(|err| format!("{err:?}")));
                let _ = reply_tx
                    .send(Some(reply))
                    .map_err(|_| error!("could not send reload reply from daemon to coordinator"));
                RunStatus::Continue
            }
            DaemonCoordinatorEvent::StopDataflow {
                dataflow_id,
                grace_duration,
            } => {
                let mut logger = self.logger.for_dataflow(dataflow_id);
                let dataflow = self
                    .running
                    .get_mut(&dataflow_id)
                    .wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"));
                let (reply, future) = match dataflow {
                    Ok(dataflow) => {
                        let future = dataflow.stop_all(
                            &mut self.coordinator_connection,
                            &self.clock,
                            grace_duration,
                            &mut logger,
                        );
                        (Ok(()), Some(future))
                    }
                    Err(err) => (Err(err.to_string()), None),
                };

                let _ = reply_tx
                    .send(Some(DaemonCoordinatorReply::StopResult(reply)))
                    .map_err(|_| error!("could not send stop reply from daemon to coordinator"));

                if let Some(future) = future {
                    future.await?;
                }

                RunStatus::Continue
            }
            DaemonCoordinatorEvent::Destroy => {
                tracing::info!("received destroy command -> exiting");
                let (notify_tx, notify_rx) = oneshot::channel();
                let reply = DaemonCoordinatorReply::DestroyResult {
                    result: Ok(()),
                    notify: Some(notify_tx),
                };
                let _ = reply_tx
                    .send(Some(reply))
                    .map_err(|_| error!("could not send destroy reply from daemon to coordinator"));
                // wait until the reply is sent out
                if notify_rx.await.is_err() {
                    tracing::warn!("no confirmation received for DestroyReply");
                }
                RunStatus::Exit
            }
            DaemonCoordinatorEvent::Heartbeat => {
                self.last_coordinator_heartbeat = Instant::now();
                let _ = reply_tx.send(None);
                RunStatus::Continue
            }
        };
        Ok(status)
    }

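    /// Handles events from remote daemons: forwards remote outputs to local
    /// receivers and closes local inputs when a remote output is closed.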
    async fn handle_inter_daemon_event(&mut self, event: InterDaemonEvent) -> eyre::Result<()> {
        match event {
            InterDaemonEvent::Output {
                dataflow_id,
                node_id,
                output_id,
                metadata,
                data,
            } => {
                let inner = async {
                    let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
                        format!("send out failed: no running dataflow with ID `{dataflow_id}`")
                    })?;
                    send_output_to_local_receivers(
                        node_id.clone(),
                        output_id.clone(),
                        dataflow,
                        &metadata,
                        data.map(DataMessage::Vec),
                        &self.clock,
                    )
                    .await?;
                    Result::<_, eyre::Report>::Ok(())
                };
                if let Err(err) = inner
                    .await
                    .wrap_err("failed to forward remote output to local receivers")
                {
                    let mut logger = self.logger.for_dataflow(dataflow_id).for_node(node_id);
                    logger
                        .log(LogLevel::Warn, Some("daemon".into()), format!("{err:?}"))
                        .await;
                }
                Ok(())
            }
            InterDaemonEvent::OutputClosed {
                dataflow_id,
                node_id,
                output_id,
            } => {
                let output_id = OutputId(node_id.clone(), output_id);
                let mut logger = self
                    .logger
                    .for_dataflow(dataflow_id)
                    .for_node(node_id.clone());
                logger
                    .log(
                        LogLevel::Debug,
                        Some("daemon".into()),
                        format!("received OutputClosed event for output {output_id:?}"),
                    )
                    .await;

                let inner = async {
                    let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
                        format!("close input failed: no running dataflow with ID `{dataflow_id}`")
                    })?;

                    if let Some(inputs) = dataflow.mappings.get(&output_id).cloned() {
                        for (receiver_id, input_id) in &inputs {
                            close_input(dataflow, receiver_id, input_id, &self.clock);
                        }
                    }
                    Result::<(), eyre::Report>::Ok(())
                };
                if let Err(err) = inner
                    .await
                    .wrap_err("failed to handle OutputClosed event sent by remote daemon")
                {
                    logger
                        .log(LogLevel::Warn, Some("daemon".into()), format!("{err:?}"))
                        .await;
                }
                Ok(())
            }
        }
    }

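    /// Registers a new dataflow: records its input/output mappings, spawns
    /// the locally assigned nodes, and declares zenoh subscribers for
    /// remote outputs that are mapped to local inputs.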
    async fn spawn_dataflow(
        &mut self,
        dataflow_id: uuid::Uuid,
        working_dir: PathBuf,
        nodes: BTreeMap<NodeId, ResolvedNode>,
        dataflow_descriptor: Descriptor,
        spawn_nodes: BTreeSet<NodeId>,
        uv: bool,
    ) -> eyre::Result<()> {
        let mut logger = self.logger.for_dataflow(dataflow_id);
        let dataflow =
            RunningDataflow::new(dataflow_id, self.daemon_id.clone(), &dataflow_descriptor);
        let dataflow = match self.running.entry(dataflow_id) {
            std::collections::hash_map::Entry::Vacant(entry) => {
                self.working_dir.insert(dataflow_id, working_dir.clone());
                entry.insert(dataflow)
            }
            std::collections::hash_map::Entry::Occupied(_) => {
                bail!("there is already a running dataflow with ID `{dataflow_id}`")
            }
        };

        let mut stopped = Vec::new();

        // calculate info about mappings
        for node in nodes.values() {
            let local = spawn_nodes.contains(&node.id);

            let inputs = node_inputs(node);
            for (input_id, input) in inputs {
                if local {
                    dataflow
                        .open_inputs
                        .entry(node.id.clone())
                        .or_default()
                        .insert(input_id.clone());
                    match input.mapping {
                        InputMapping::User(mapping) => {
                            dataflow
                                .mappings
                                .entry(OutputId(mapping.source, mapping.output))
                                .or_default()
                                .insert((node.id.clone(), input_id));
                        }
                        InputMapping::Timer { interval } => {
                            dataflow
                                .timers
                                .entry(interval)
                                .or_default()
                                .insert((node.id.clone(), input_id));
                        }
                    }
                } else if let InputMapping::User(mapping) = input.mapping {
                    dataflow
                        .open_external_mappings
                        .insert(OutputId(mapping.source, mapping.output));
                }
            }
        }

        // spawn nodes and set up subscriptions
        for node in nodes.into_values() {
            let mut logger = logger.reborrow().for_node(node.id.clone());
            let local = spawn_nodes.contains(&node.id);
            if local {
                if node.kind.dynamic() {
                    dataflow.dynamic_nodes.insert(node.id.clone());
                } else {
                    dataflow.pending_nodes.insert(node.id.clone());
                }

                let node_id = node.id.clone();
                let node_stderr_most_recent = dataflow
                    .node_stderr_most_recent
                    .entry(node.id.clone())
                    .or_insert_with(|| Arc::new(ArrayQueue::new(STDERR_LOG_LINES)))
                    .clone();
                logger
                    .log(LogLevel::Info, Some("daemon".into()), "spawning")
                    .await;
                match spawn::spawn_node(
                    dataflow_id,
                    &working_dir,
                    node,
                    self.events_tx.clone(),
                    dataflow_descriptor.clone(),
                    self.clock.clone(),
                    node_stderr_most_recent,
                    uv,
                    &mut logger,
                )
                .await
                .wrap_err_with(|| format!("failed to spawn node `{node_id}`"))
                {
                    Ok(running_node) => {
                        dataflow.running_nodes.insert(node_id, running_node);
                    }
                    Err(err) => {
                        logger
                            .log(LogLevel::Error, Some("daemon".into()), format!("{err:?}"))
                            .await;
                        self.dataflow_node_results
                            .entry(dataflow_id)
                            .or_default()
                            .insert(
                                node_id.clone(),
                                Err(NodeError {
                                    timestamp: self.clock.new_timestamp(),
                                    cause: NodeErrorCause::Other {
                                        stderr: format!("spawn failed: {err:?}"),
                                    },
                                    exit_status: NodeExitStatus::Unknown,
                                }),
                            );
                        stopped.push(node_id.clone());
                    }
                }
            } else {
                // external nodes must report ready before the dataflow is started
                dataflow.pending_nodes.set_external_nodes(true);

                // subscribe to all node outputs that are mapped to some local inputs
                for output_id in dataflow.mappings.keys().filter(|o| o.0 == node.id) {
                    let tx = self
                        .remote_daemon_events_tx
                        .clone()
                        .wrap_err("no remote_daemon_events_tx channel")?;
                    let mut finished_rx = dataflow.finished_tx.subscribe();
                    let subscribe_topic = dataflow.output_publish_topic(output_id);
                    tracing::debug!("declaring subscriber on {subscribe_topic}");
                    let subscriber = self
                        .zenoh_session
                        .declare_subscriber(subscribe_topic)
                        .await
                        .map_err(|e| eyre!(e))
                        .wrap_err_with(|| format!("failed to subscribe to {output_id:?}"))?;
                    tokio::spawn(async move {
                        let mut finished = pin!(finished_rx.recv());
                        loop {
                            let finished_or_next =
                                futures::future::select(finished, subscriber.recv_async());
                            match finished_or_next.await {
                                future::Either::Left((finished, _)) => {
                                    match finished {
                                        Err(broadcast::error::RecvError::Closed) => {
                                            tracing::debug!("dataflow finished, breaking from zenoh subscribe task");
                                            break;
                                        }
                                        other => {
                                            tracing::warn!("unexpected return value of dataflow finished_rx channel: {other:?}");
                                            break;
                                        }
                                    }
                                }
                                future::Either::Right((sample, f)) => {
                                    finished = f;
                                    let event = sample.map_err(|e| eyre!(e)).and_then(|s| {
                                        Timestamped::deserialize_inter_daemon_event(
                                            &s.payload().to_bytes(),
                                        )
                                    });
                                    if tx.send_async(event).await.is_err() {
                                        // daemon finished
                                        break;
                                    }
                                }
                            }
                        }
                    });
                }
            }
        }
        for node_id in stopped {
            self.handle_node_stop(dataflow_id, &node_id).await?;
        }

        Ok(())
    }

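    /// Looks up the `NodeConfig` for a dynamic node that connected at
    /// runtime; errors if the node ID is unknown, not dynamic, or present
    /// in more than one running dataflow.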
    async fn handle_dynamic_node_event(
        &mut self,
        event: DynamicNodeEventWrapper,
    ) -> eyre::Result<()> {
        match event {
            DynamicNodeEventWrapper {
                event: DynamicNodeEvent::NodeConfig { node_id },
                reply_tx,
            } => {
                let matching_dataflows = self
                    .running
                    .iter()
                    .filter(|(_id, dataflow)| dataflow.running_nodes.contains_key(&node_id))
                    .count();

                let node_config = match matching_dataflows {
                    2.. => Err(format!(
                        "multiple dataflows contain a dynamic node with ID {node_id}. \
                        Please make sure that only one running dataflow contains \
                        the given node ID when using dynamic nodes",
                    )),
                    1 => self
                        .running
                        .iter()
                        .filter(|(_id, dataflow)| dataflow.running_nodes.contains_key(&node_id))
                        .map(|(id, dataflow)| -> Result<NodeConfig> {
                            let node_config = dataflow
                                .running_nodes
                                .get(&node_id)
                                .with_context(|| {
                                    format!("no node with ID `{node_id}` within the given dataflow")
                                })?
                                .node_config
                                .clone();
                            if !node_config.dynamic {
                                bail!("node with ID `{node_id}` in {id} is not dynamic");
                            }
                            Ok(node_config)
                        })
                        .next()
                        .ok_or_else(|| eyre!("no node with ID `{node_id}`"))
                        .and_then(|r| r)
                        .map_err(|err| {
                            format!(
                                "failed to get dynamic node config within given dataflow: {err}"
                            )
                        }),
                    0 => Err(format!("no node with ID `{node_id}`")),
                };

                let reply = DaemonReply::NodeConfig {
                    result: node_config,
                };
                let _ = reply_tx.send(Some(reply)).map_err(|_| {
                    error!("could not send node config reply from daemon to dynamic node")
                });
                Ok(())
            }
        }
    }

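    /// Handles requests from local nodes: event-stream and drop-stream
    /// subscriptions, output sends, output closing, and drop-token reports.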
    async fn handle_node_event(
        &mut self,
        event: DaemonNodeEvent,
        dataflow_id: DataflowId,
        node_id: NodeId,
    ) -> eyre::Result<()> {
        match event {
            DaemonNodeEvent::Subscribe {
                event_sender,
                reply_sender,
            } => {
                let mut logger = self.logger.for_dataflow(dataflow_id);
                logger
                    .log(
                        LogLevel::Info,
                        Some(node_id.clone()),
                        Some("daemon".into()),
                        "node is ready",
                    )
                    .await;

                let dataflow = self.running.get_mut(&dataflow_id).ok_or_else(|| {
                    format!("subscribe failed: no running dataflow with ID `{dataflow_id}`")
                });

                match dataflow {
                    Err(err) => {
                        let _ = reply_sender.send(DaemonReply::Result(Err(err)));
                    }
                    Ok(dataflow) => {
                        Self::subscribe(dataflow, node_id.clone(), event_sender, &self.clock).await;

                        let status = dataflow
                            .pending_nodes
                            .handle_node_subscription(
                                node_id.clone(),
                                reply_sender,
                                &mut self.coordinator_connection,
                                &self.clock,
                                &mut dataflow.cascading_error_causes,
                                &mut logger,
                            )
                            .await?;
                        match status {
                            DataflowStatus::AllNodesReady => {
                                logger
                                    .log(
                                        LogLevel::Info,
                                        None,
                                        Some("daemon".into()),
                                        "all nodes are ready, starting dataflow",
                                    )
                                    .await;
                                dataflow.start(&self.events_tx, &self.clock).await?;
                            }
                            DataflowStatus::Pending => {}
                        }
                    }
                }
            }
            DaemonNodeEvent::SubscribeDrop {
                event_sender,
                reply_sender,
            } => {
                let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
                    format!(
                        "failed to subscribe to drop events: no running dataflow with ID `{dataflow_id}`"
                    )
                });
                let result = match dataflow {
                    Ok(dataflow) => {
                        dataflow.drop_channels.insert(node_id, event_sender);
                        Ok(())
                    }
                    Err(err) => Err(err.to_string()),
                };
                let _ = reply_sender.send(DaemonReply::Result(result));
            }
            DaemonNodeEvent::CloseOutputs {
                outputs,
                reply_sender,
            } => {
                // notify downstream nodes
                let inner = async {
                    self.send_output_closed_events(dataflow_id, node_id, outputs)
                        .await
                };

                let reply = inner.await.map_err(|err| format!("{err:?}"));
                let _ = reply_sender.send(DaemonReply::Result(reply));
            }
            DaemonNodeEvent::OutputsDone { reply_sender } => {
                let result = self.handle_outputs_done(dataflow_id, &node_id).await;

                let _ = reply_sender.send(DaemonReply::Result(
                    result.map_err(|err| format!("{err:?}")),
                ));
            }
            DaemonNodeEvent::SendOut {
                output_id,
                metadata,
                data,
            } => self
                .send_out(dataflow_id, node_id, output_id, metadata, data)
                .await
                .context("failed to send out")?,
            DaemonNodeEvent::ReportDrop { tokens } => {
                let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
                    format!(
                        "failed to handle drop tokens: \
                        no running dataflow with ID `{dataflow_id}`"
                    )
                });

                match dataflow {
                    Ok(dataflow) => {
                        for token in tokens {
                            match dataflow.pending_drop_tokens.get_mut(&token) {
                                Some(info) => {
                                    if info.pending_nodes.remove(&node_id) {
                                        dataflow.check_drop_token(token, &self.clock).await?;
                                    } else {
                                        tracing::warn!(
                                            "node `{node_id}` is not pending for drop token `{token:?}`"
                                        );
                                    }
                                }
                                None => tracing::warn!("unknown drop token `{token:?}`"),
                            }
                        }
                    }
                    Err(err) => tracing::warn!("{err:?}"),
                }
            }
            DaemonNodeEvent::EventStreamDropped { reply_sender } => {
                let inner = async {
                    let dataflow = self
                        .running
                        .get_mut(&dataflow_id)
                        .wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"))?;
                    dataflow.subscribe_channels.remove(&node_id);
                    Result::<_, eyre::Error>::Ok(())
                };

                let reply = inner.await.map_err(|err| format!("{err:?}"));
                let _ = reply_sender.send(DaemonReply::Result(reply));
            }
        }
        Ok(())
    }

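    /// Forwards a `Reload` event to the given node; if the send fails, the
    /// node's subscribe channel is assumed closed and removed.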
    async fn send_reload(
        &mut self,
        dataflow_id: Uuid,
        node_id: NodeId,
        operator_id: Option<OperatorId>,
    ) -> Result<(), eyre::ErrReport> {
        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
            format!("Reload failed: no running dataflow with ID `{dataflow_id}`")
        })?;
        if let Some(channel) = dataflow.subscribe_channels.get(&node_id) {
            match send_with_timestamp(channel, NodeEvent::Reload { operator_id }, &self.clock) {
                Ok(()) => {}
                Err(_) => {
                    dataflow.subscribe_channels.remove(&node_id);
                }
            }
        }
        Ok(())
    }

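    /// Delivers a node output to all local receivers and, if the output has
    /// remote receivers (or all messages are published to zenoh), forwards
    /// it to other daemons as well.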
    async fn send_out(
        &mut self,
        dataflow_id: Uuid,
        node_id: NodeId,
        output_id: DataId,
        metadata: dora_message::metadata::Metadata,
        data: Option<DataMessage>,
    ) -> Result<(), eyre::ErrReport> {
        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
            format!("send out failed: no running dataflow with ID `{dataflow_id}`")
        })?;
        let data_bytes = send_output_to_local_receivers(
            node_id.clone(),
            output_id.clone(),
            dataflow,
            &metadata,
            data,
            &self.clock,
        )
        .await?;

        let output_id = OutputId(node_id, output_id);
        let remote_receivers = dataflow.open_external_mappings.contains(&output_id)
            || dataflow.publish_all_messages_to_zenoh;
        if remote_receivers {
            let event = InterDaemonEvent::Output {
                dataflow_id,
                node_id: output_id.0.clone(),
                output_id: output_id.1.clone(),
                metadata,
                data: data_bytes,
            };
            self.send_to_remote_receivers(dataflow_id, &output_id, event)
                .await?;
        }

        Ok(())
    }

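    /// Publishes an inter-daemon event on the zenoh topic of the given
    /// output, lazily declaring the publisher on first use.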
    async fn send_to_remote_receivers(
        &mut self,
        dataflow_id: Uuid,
        output_id: &OutputId,
        event: InterDaemonEvent,
    ) -> Result<(), eyre::Error> {
        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
            format!("send out failed: no running dataflow with ID `{dataflow_id}`")
        })?;

        // publish via zenoh
        let publisher = match dataflow.publishers.get(output_id) {
            Some(publisher) => publisher,
            None => {
                let publish_topic = dataflow.output_publish_topic(output_id);
                tracing::debug!("declaring publisher on {publish_topic}");
                let publisher = self
                    .zenoh_session
                    .declare_publisher(publish_topic)
                    .await
                    .map_err(|e| eyre!(e))
                    .context("failed to create zenoh publisher")?;
                dataflow.publishers.insert(output_id.clone(), publisher);
                dataflow.publishers.get(output_id).unwrap()
            }
        };

        let serialized_event = Timestamped {
            inner: event,
            timestamp: self.clock.new_timestamp(),
        }
        .serialize();
        publisher
            .put(serialized_event)
            .await
            .map_err(|e| eyre!(e))
            .context("zenoh put failed")?;
        Ok(())
    }

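    /// Closes all local inputs that are fed by the given outputs and
    /// notifies remote daemons about closed outputs with external mappings.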
    async fn send_output_closed_events(
        &mut self,
        dataflow_id: DataflowId,
        node_id: NodeId,
        outputs: Vec<DataId>,
    ) -> eyre::Result<()> {
        let dataflow = self
            .running
            .get_mut(&dataflow_id)
            .wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"))?;
        let local_node_inputs: BTreeSet<_> = dataflow
            .mappings
            .iter()
            .filter(|(k, _)| k.0 == node_id && outputs.contains(&k.1))
            .flat_map(|(_, v)| v)
            .cloned()
            .collect();
        for (receiver_id, input_id) in &local_node_inputs {
            close_input(dataflow, receiver_id, input_id, &self.clock);
        }

        let mut closed = Vec::new();
        for output_id in &dataflow.open_external_mappings {
            if output_id.0 == node_id && outputs.contains(&output_id.1) {
                closed.push(output_id.clone());
            }
        }

        for output_id in closed {
            let event = InterDaemonEvent::OutputClosed {
                dataflow_id,
                node_id: output_id.0.clone(),
                output_id: output_id.1.clone(),
            };
            self.send_to_remote_receivers(dataflow_id, &output_id, event)
                .await?;
        }

        Ok(())
    }

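    /// Registers a node's event channel, first replaying `InputClosed`,
    /// `AllInputsClosed`, and `Stop` events that occurred before the node
    /// subscribed.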
    async fn subscribe(
        dataflow: &mut RunningDataflow,
        node_id: NodeId,
        event_sender: UnboundedSender<Timestamped<NodeEvent>>,
        clock: &HLC,
    ) {
        // some inputs might have been closed already -> report those events
        let closed_inputs = dataflow
            .mappings
            .values()
            .flatten()
            .filter(|(node, _)| node == &node_id)
            .map(|(_, input)| input)
            .filter(|input| {
                dataflow
                    .open_inputs
                    .get(&node_id)
                    .map(|open_inputs| !open_inputs.contains(*input))
                    .unwrap_or(true)
            });
        for input_id in closed_inputs {
            let _ = send_with_timestamp(
                &event_sender,
                NodeEvent::InputClosed {
                    id: input_id.clone(),
                },
                clock,
            );
        }
        if dataflow.open_inputs(&node_id).is_empty() {
            let _ = send_with_timestamp(&event_sender, NodeEvent::AllInputsClosed, clock);
        }

        // if a stop event was already sent for the dataflow, send it to
        // the newly connected node too
        if dataflow.stop_sent {
            let _ = send_with_timestamp(&event_sender, NodeEvent::Stop, clock);
        }

        dataflow.subscribe_channels.insert(node_id, event_sender);
    }

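    /// Treats all outputs of the given node as closed and removes the
    /// node's drop channel.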
    #[tracing::instrument(skip(self), level = "trace")]
    async fn handle_outputs_done(
        &mut self,
        dataflow_id: DataflowId,
        node_id: &NodeId,
    ) -> eyre::Result<()> {
        let dataflow = self
            .running
            .get_mut(&dataflow_id)
            .ok_or_else(|| eyre!("no running dataflow with ID `{dataflow_id}`"))?;

        let outputs = dataflow
            .mappings
            .keys()
            .filter(|m| &m.0 == node_id)
            .map(|m| &m.1)
            .cloned()
            .collect();
        self.send_output_closed_events(dataflow_id, node_id.clone(), outputs)
            .await?;

        let dataflow = self
            .running
            .get_mut(&dataflow_id)
            .ok_or_else(|| eyre!("no running dataflow with ID `{dataflow_id}`"))?;
        dataflow.drop_channels.remove(node_id);
        Ok(())
    }

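    /// Handles the exit of a local node: updates pending-node bookkeeping,
    /// closes the node's outputs, and reports the dataflow as finished to
    /// the coordinator once only dynamic nodes remain.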
    async fn handle_node_stop(&mut self, dataflow_id: Uuid, node_id: &NodeId) -> eyre::Result<()> {
        let mut logger = self.logger.for_dataflow(dataflow_id);
        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
            format!("failed to handle node stop: no running dataflow with ID `{dataflow_id}`")
        })?;

        dataflow
            .pending_nodes
            .handle_node_stop(
                node_id,
                &mut self.coordinator_connection,
                &self.clock,
                &mut dataflow.cascading_error_causes,
                &mut logger,
            )
            .await?;

        self.handle_outputs_done(dataflow_id, node_id).await?;

        let mut logger = self.logger.for_dataflow(dataflow_id);
        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
            format!("failed to handle node stop: no running dataflow with ID `{dataflow_id}`")
        })?;
        if let Some(mut pid) = dataflow.running_nodes.remove(node_id).and_then(|n| n.pid) {
            // the node exited on its own, so disarm the kill-on-drop guard
            pid.mark_as_stopped();
        }
        if dataflow
            .running_nodes
            .iter()
            .all(|(_id, n)| n.node_config.dynamic)
        {
            let result = DataflowDaemonResult {
                timestamp: self.clock.new_timestamp(),
                node_results: self
                    .dataflow_node_results
                    .get(&dataflow.id)
                    .context("failed to get dataflow node results")?
                    .clone(),
            };

            logger
                .log(
                    LogLevel::Info,
                    None,
                    Some("daemon".into()),
                    format!("dataflow finished on machine `{}`", self.daemon_id),
                )
                .await;
            if let Some(connection) = &mut self.coordinator_connection {
                let msg = serde_json::to_vec(&Timestamped {
                    inner: CoordinatorRequest::Event {
                        daemon_id: self.daemon_id.clone(),
                        event: DaemonEvent::AllNodesFinished {
                            dataflow_id,
                            result,
                        },
                    },
                    timestamp: self.clock.new_timestamp(),
                })?;
                socket_stream_send(connection, &msg)
                    .await
                    .wrap_err("failed to report dataflow finish to dora-coordinator")?;
            }
            self.running.remove(&dataflow_id);
        }

        Ok(())
    }

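    /// Dispatch a daemon-internal event: timer ticks and log messages are
    /// fanned out to subscribed nodes, and exit results of spawned nodes are
    /// recorded before checking whether the daemon should keep running.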
    async fn handle_dora_event(&mut self, event: DoraEvent) -> eyre::Result<RunStatus> {
        match event {
            DoraEvent::Timer {
                dataflow_id,
                interval,
                metadata,
            } => {
                let Some(dataflow) = self.running.get_mut(&dataflow_id) else {
                    tracing::warn!("Timer event for unknown dataflow `{dataflow_id}`");
                    return Ok(RunStatus::Continue);
                };

                let Some(subscribers) = dataflow.timers.get(&interval) else {
                    return Ok(RunStatus::Continue);
                };

                let mut closed = Vec::new();
                for (receiver_id, input_id) in subscribers {
                    let Some(channel) = dataflow.subscribe_channels.get(receiver_id) else {
                        continue;
                    };

                    let send_result = send_with_timestamp(
                        channel,
                        NodeEvent::Input {
                            id: input_id.clone(),
                            metadata: metadata.clone(),
                            data: None,
                        },
                        &self.clock,
                    );
                    match send_result {
                        Ok(()) => {}
                        Err(_) => {
                            closed.push(receiver_id);
                        }
                    }
                }
                for id in closed {
                    dataflow.subscribe_channels.remove(id);
                }
            }
            DoraEvent::Logs {
                dataflow_id,
                output_id,
                message,
                metadata,
            } => {
                let Some(dataflow) = self.running.get_mut(&dataflow_id) else {
                    tracing::warn!("Logs event for unknown dataflow `{dataflow_id}`");
                    return Ok(RunStatus::Continue);
                };

                let Some(subscribers) = dataflow.mappings.get(&output_id) else {
                    tracing::warn!(
                        "No subscribers found for {:?} in {:?}",
                        output_id,
                        dataflow.mappings
                    );
                    return Ok(RunStatus::Continue);
                };

                let mut closed = Vec::new();
                for (receiver_id, input_id) in subscribers {
                    let Some(channel) = dataflow.subscribe_channels.get(receiver_id) else {
                        tracing::warn!("no subscriber channel found for receiver `{receiver_id}` of {output_id:?}");
                        continue;
                    };

                    let send_result = send_with_timestamp(
                        channel,
                        NodeEvent::Input {
                            id: input_id.clone(),
                            metadata: metadata.clone(),
                            data: Some(message.clone()),
                        },
                        &self.clock,
                    );
                    match send_result {
                        Ok(()) => {}
                        Err(_) => {
                            closed.push(receiver_id);
                        }
                    }
                }
                for id in closed {
                    dataflow.subscribe_channels.remove(id);
                }
            }
            DoraEvent::SpawnedNodeResult {
                dataflow_id,
                node_id,
                exit_status,
            } => {
                let mut logger = self
                    .logger
                    .for_dataflow(dataflow_id)
                    .for_node(node_id.clone());
                logger
                    .log(
                        LogLevel::Debug,
                        Some("daemon".into()),
                        format!("handling node stop with exit status {exit_status:?}"),
                    )
                    .await;

                let node_result = match exit_status {
                    NodeExitStatus::Success => Ok(()),
                    exit_status => {
                        let dataflow = self.running.get(&dataflow_id);
                        let caused_by_node = dataflow
                            .and_then(|dataflow| {
                                dataflow.cascading_error_causes.error_caused_by(&node_id)
                            })
                            .cloned();
                        let grace_duration_kill = dataflow
                            .map(|d| d.grace_duration_kills.contains(&node_id))
                            .unwrap_or_default();

                        let cause = match caused_by_node {
                            Some(caused_by_node) => {
                                logger
                                    .log(
                                        LogLevel::Info,
                                        Some("daemon".into()),
                                        format!("marking `{node_id}` as cascading error caused by `{caused_by_node}`")
                                    )
                                    .await;

                                NodeErrorCause::Cascading { caused_by_node }
                            }
                            None if grace_duration_kill => NodeErrorCause::GraceDuration,
                            None => {
                                let cause = dataflow
                                    .and_then(|d| d.node_stderr_most_recent.get(&node_id))
                                    .map(|queue| {
                                        let mut s = if queue.is_full() {
                                            "[...]".into()
                                        } else {
                                            String::new()
                                        };
                                        while let Some(line) = queue.pop() {
                                            s += &line;
                                        }
                                        s
                                    })
                                    .unwrap_or_default();

                                NodeErrorCause::Other { stderr: cause }
                            }
                        };
                        Err(NodeError {
                            timestamp: self.clock.new_timestamp(),
                            cause,
                            exit_status,
                        })
                    }
                };

                logger
                    .log(
                        if node_result.is_ok() {
                            LogLevel::Info
                        } else {
                            LogLevel::Error
                        },
                        Some("daemon".into()),
                        match &node_result {
                            Ok(()) => format!("{node_id} finished successfully"),
                            Err(err) => format!("{err}"),
                        },
                    )
                    .await;

                self.dataflow_node_results
                    .entry(dataflow_id)
                    .or_default()
                    .insert(node_id.clone(), node_result);

                self.handle_node_stop(dataflow_id, &node_id).await?;

                if let Some(exit_when_done) = &mut self.exit_when_done {
                    exit_when_done.remove(&(dataflow_id, node_id));
                    if exit_when_done.is_empty() {
                        tracing::info!(
                            "exiting daemon because all required dataflows are finished"
                        );
                        return Ok(RunStatus::Exit);
                    }
                }
                if self.exit_when_all_finished && self.running.is_empty() {
                    return Ok(RunStatus::Exit);
                }
            }
        }
        Ok(RunStatus::Continue)
    }
}

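/// Register with the coordinator and merge coordinator, remote-daemon, and
/// dynamic-node events into a single incoming event stream.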
async fn set_up_event_stream(
    coordinator_addr: SocketAddr,
    machine_id: &Option<String>,
    clock: &Arc<HLC>,
    remote_daemon_events_rx: flume::Receiver<eyre::Result<Timestamped<InterDaemonEvent>>>,
    // used for dynamic nodes
    local_listen_port: u16,
) -> eyre::Result<(DaemonId, impl Stream<Item = Timestamped<Event>> + Unpin)> {
    let clock_cloned = clock.clone();
    let remote_daemon_events = remote_daemon_events_rx.into_stream().map(move |e| match e {
        Ok(e) => Timestamped {
            inner: Event::Daemon(e.inner),
            timestamp: e.timestamp,
        },
        Err(err) => Timestamped {
            inner: Event::DaemonError(err),
            timestamp: clock_cloned.new_timestamp(),
        },
    });
    let (daemon_id, coordinator_events) =
        coordinator::register(coordinator_addr, machine_id.clone(), clock)
            .await
            .wrap_err("failed to connect to dora-coordinator")?;
    let coordinator_events = coordinator_events.map(
        |Timestamped {
             inner: event,
             timestamp,
         }| Timestamped {
            inner: Event::Coordinator(event),
            timestamp,
        },
    );
    let (events_tx, events_rx) = flume::bounded(10);
    let _listen_port =
        local_listener::spawn_listener_loop((LOCALHOST, local_listen_port).into(), events_tx)
            .await?;
    let dynamic_node_events = events_rx.into_stream().map(|e| Timestamped {
        inner: Event::DynamicNode(e.inner),
        timestamp: e.timestamp,
    });
    let incoming = (
        coordinator_events,
        remote_daemon_events,
        dynamic_node_events,
    )
        .merge();
    Ok((daemon_id, incoming))
}

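/// Deliver an output to all subscribed local receivers and return the raw
/// bytes, copying them out of shared memory if necessary.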
async fn send_output_to_local_receivers(
    node_id: NodeId,
    output_id: DataId,
    dataflow: &mut RunningDataflow,
    metadata: &metadata::Metadata,
    data: Option<DataMessage>,
    clock: &HLC,
) -> Result<Option<AVec<u8, ConstAlign<128>>>, eyre::ErrReport> {
    let timestamp = metadata.timestamp();
    let empty_set = BTreeSet::new();
    let output_id = OutputId(node_id, output_id);
    let local_receivers = dataflow.mappings.get(&output_id).unwrap_or(&empty_set);
    // reclaim the `node_id` that was moved into the `OutputId` key above
    let OutputId(node_id, _) = output_id;
    let mut closed = Vec::new();
    for (receiver_id, input_id) in local_receivers {
        if let Some(channel) = dataflow.subscribe_channels.get(receiver_id) {
            let item = NodeEvent::Input {
                id: input_id.clone(),
                metadata: metadata.clone(),
                data: data.clone(),
            };
            match channel.send(Timestamped {
                inner: item,
                timestamp,
            }) {
                Ok(()) => {
                    if let Some(token) = data.as_ref().and_then(|d| d.drop_token()) {
                        dataflow
                            .pending_drop_tokens
                            .entry(token)
                            .or_insert_with(|| DropTokenInformation {
                                owner: node_id.clone(),
                                pending_nodes: Default::default(),
                            })
                            .pending_nodes
                            .insert(receiver_id.clone());
                    }
                }
                Err(_) => {
                    closed.push(receiver_id);
                }
            }
        }
    }
    for id in closed {
        dataflow.subscribe_channels.remove(id);
    }
    let (data_bytes, drop_token) = match data {
        None => (None, None),
        Some(DataMessage::SharedMemory {
            shared_memory_id,
            len,
            drop_token,
        }) => {
            let memory = ShmemConf::new()
                .os_id(shared_memory_id)
                .open()
                .wrap_err("failed to map shared memory output")?;
            // SAFETY: `memory` keeps the shared-memory mapping alive while
            // the first `len` bytes are copied into the aligned buffer
            let data = Some(AVec::from_slice(1, &unsafe { memory.as_slice() }[..len]));
            (data, Some(drop_token))
        }
        Some(DataMessage::Vec(v)) => (Some(v), None),
    };
    if let Some(token) = drop_token {
        // insert token into `pending_drop_tokens` even if there are no local subscribers
        dataflow
            .pending_drop_tokens
            .entry(token)
            .or_insert_with(|| DropTokenInformation {
                owner: node_id.clone(),
                pending_nodes: Default::default(),
            });
        // check if all local subscribers are finished with the token
        dataflow.check_drop_token(token, clock).await?;
    }
    Ok(data_bytes)
}
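
// Drop-token lifecycle (sketch): a shared-memory output stays alive until all
// local receivers have reported its `DropToken` back to the daemon. The token
// is tracked in `pending_drop_tokens` above; once the set of pending
// receivers is empty, `check_drop_token` notifies the owning node via
// `NodeDropEvent::OutputDropped` so it can reuse or free the memory region.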

fn node_inputs(node: &ResolvedNode) -> BTreeMap<DataId, Input> {
    match &node.kind {
        CoreNodeKind::Custom(n) => n.run_config.inputs.clone(),
        CoreNodeKind::Runtime(n) => runtime_node_inputs(n),
    }
}

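/// Close a single input of the given receiver node, sending `InputClosed`
/// (and `AllInputsClosed` once no open inputs remain).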
fn close_input(
    dataflow: &mut RunningDataflow,
    receiver_id: &NodeId,
    input_id: &DataId,
    clock: &HLC,
) {
    if let Some(open_inputs) = dataflow.open_inputs.get_mut(receiver_id) {
        if !open_inputs.remove(input_id) {
            return;
        }
    }
    if let Some(channel) = dataflow.subscribe_channels.get(receiver_id) {
        let _ = send_with_timestamp(
            channel,
            NodeEvent::InputClosed {
                id: input_id.clone(),
            },
            clock,
        );

        if dataflow.open_inputs(receiver_id).is_empty() {
            let _ = send_with_timestamp(channel, NodeEvent::AllInputsClosed, clock);
        }
    }
}

#[derive(Debug)]
struct RunningNode {
    pid: Option<ProcessId>,
    node_config: NodeConfig,
}

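/// OS process ID of a spawned node. Unless marked as stopped, the process is
/// killed when this handle is dropped.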
#[derive(Debug)]
struct ProcessId(Option<u32>);

impl ProcessId {
    pub fn new(process_id: u32) -> Self {
        Self(Some(process_id))
    }

    pub fn mark_as_stopped(&mut self) {
        self.0 = None;
    }

    pub fn kill(&mut self) -> bool {
        if let Some(pid) = self.0 {
            let mut system = sysinfo::System::new();
            system.refresh_processes();

            if let Some(process) = system.process(Pid::from(pid as usize)) {
                process.kill();
                self.mark_as_stopped();
                return true;
            }
        }

        false
    }
}

impl Drop for ProcessId {
    fn drop(&mut self) {
        // kill the process if it's still running
        if let Some(pid) = self.0 {
            if self.kill() {
                warn!("process {pid} was killed on drop because it was still running")
            }
        }
    }
}
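
// Illustrative usage (with a hypothetical `child_pid`): marking the process
// as stopped disarms the kill-on-drop safety net above.
//
//     let mut pid = ProcessId::new(child_pid);
//     // ... the node process exits on its own ...
//     pid.mark_as_stopped();
//     drop(pid); // no-op: there is nothing left to kill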

pub struct RunningDataflow {
    id: Uuid,
    /// Local nodes that are not started yet
    pending_nodes: PendingNodes,

    subscribe_channels: HashMap<NodeId, UnboundedSender<Timestamped<NodeEvent>>>,
    drop_channels: HashMap<NodeId, UnboundedSender<Timestamped<NodeDropEvent>>>,
    mappings: HashMap<OutputId, BTreeSet<InputId>>,
    timers: BTreeMap<Duration, BTreeSet<InputId>>,
    open_inputs: BTreeMap<NodeId, BTreeSet<DataId>>,
    running_nodes: BTreeMap<NodeId, RunningNode>,

    /// List of all dynamic node IDs.
    ///
    /// We want to treat dynamic nodes differently in some cases, so we need
    /// to know which nodes are dynamic.
    dynamic_nodes: BTreeSet<NodeId>,

    open_external_mappings: BTreeSet<OutputId>,

    pending_drop_tokens: HashMap<DropToken, DropTokenInformation>,

    /// Keep handles to all timer tasks of this dataflow to cancel them on drop.
    _timer_handles: Vec<futures::future::RemoteHandle<()>>,
    stop_sent: bool,

    /// Used in `open_inputs`.
    ///
    /// TODO: replace this with a constant once `BTreeSet::new` is `const` on stable.
    empty_set: BTreeSet<DataId>,

    /// Contains the node that caused the error for nodes that experienced a cascading error.
    cascading_error_causes: CascadingErrorCauses,
    grace_duration_kills: Arc<crossbeam_skiplist::SkipSet<NodeId>>,

    node_stderr_most_recent: BTreeMap<NodeId, Arc<ArrayQueue<String>>>,

    publishers: BTreeMap<OutputId, zenoh::pubsub::Publisher<'static>>,

    finished_tx: broadcast::Sender<()>,

    publish_all_messages_to_zenoh: bool,
}

impl RunningDataflow {
    fn new(
        dataflow_id: Uuid,
        daemon_id: DaemonId,
        dataflow_descriptor: &Descriptor,
    ) -> RunningDataflow {
        let (finished_tx, _) = broadcast::channel(1);
        Self {
            id: dataflow_id,
            pending_nodes: PendingNodes::new(dataflow_id, daemon_id),
            subscribe_channels: HashMap::new(),
            drop_channels: HashMap::new(),
            mappings: HashMap::new(),
            timers: BTreeMap::new(),
            open_inputs: BTreeMap::new(),
            running_nodes: BTreeMap::new(),
            dynamic_nodes: BTreeSet::new(),
            open_external_mappings: Default::default(),
            pending_drop_tokens: HashMap::new(),
            _timer_handles: Vec::new(),
            stop_sent: false,
            empty_set: BTreeSet::new(),
            cascading_error_causes: Default::default(),
            grace_duration_kills: Default::default(),
            node_stderr_most_recent: BTreeMap::new(),
            publishers: Default::default(),
            finished_tx,
            publish_all_messages_to_zenoh: dataflow_descriptor.debug.publish_all_messages_to_zenoh,
        }
    }

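    /// Spawn one background task per timer interval; each task emits
    /// `DoraEvent::Timer` events until the daemon event channel closes. The
    /// remote handles are stored so the tasks are cancelled on drop.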
    async fn start(
        &mut self,
        events_tx: &mpsc::Sender<Timestamped<Event>>,
        clock: &Arc<HLC>,
    ) -> eyre::Result<()> {
        for interval in self.timers.keys().copied() {
            let events_tx = events_tx.clone();
            let dataflow_id = self.id;
            let clock = clock.clone();
            let task = async move {
                let mut interval_stream = tokio::time::interval(interval);
                let hlc = HLC::default();
                loop {
                    interval_stream.tick().await;

                    let span = tracing::span!(tracing::Level::TRACE, "tick");
                    // bind the guard to a named variable; `let _ = …` would
                    // drop it immediately and never enter the span
                    let _enter = span.enter();

                    let mut parameters = BTreeMap::new();
                    parameters.insert(
                        "open_telemetry_context".to_string(),
                        #[cfg(feature = "telemetry")]
                        Parameter::String(serialize_context(&span.context())),
                        #[cfg(not(feature = "telemetry"))]
                        Parameter::String("".into()),
                    );

                    let metadata = metadata::Metadata::from_parameters(
                        hlc.new_timestamp(),
                        empty_type_info(),
                        parameters,
                    );

                    let event = Timestamped {
                        inner: DoraEvent::Timer {
                            dataflow_id,
                            interval,
                            metadata,
                        }
                        .into(),
                        timestamp: clock.new_timestamp(),
                    };
                    if events_tx.send(event).await.is_err() {
                        break;
                    }
                }
            };
            let (task, handle) = task.remote_handle();
            tokio::spawn(task);
            self._timer_handles.push(handle);
        }

        Ok(())
    }

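    /// Send a `Stop` event to all subscribed nodes and kill any node process
    /// that is still running after the grace period (15 seconds by default).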
    async fn stop_all(
        &mut self,
        coordinator_connection: &mut Option<TcpStream>,
        clock: &HLC,
        grace_duration: Option<Duration>,
        logger: &mut DataflowLogger<'_>,
    ) -> eyre::Result<()> {
        self.pending_nodes
            .handle_dataflow_stop(
                coordinator_connection,
                clock,
                &mut self.cascading_error_causes,
                &self.dynamic_nodes,
                logger,
            )
            .await?;

        for (_node_id, channel) in self.subscribe_channels.drain() {
            let _ = send_with_timestamp(&channel, NodeEvent::Stop, clock);
        }

        let running_processes: Vec<_> = self
            .running_nodes
            .iter_mut()
            .map(|(id, n)| (id.clone(), n.pid.take()))
            .collect();
        let grace_duration_kills = self.grace_duration_kills.clone();
        tokio::spawn(async move {
            let duration = grace_duration.unwrap_or(Duration::from_secs(15));
            tokio::time::sleep(duration).await;

            for (node, pid) in running_processes {
                if let Some(mut pid) = pid {
                    if pid.kill() {
                        grace_duration_kills.insert(node.clone());
                        warn!(
                            "{node} was killed due to not stopping within the {:#?} grace period",
                            duration
                        )
                    }
                }
            }
        });
        self.stop_sent = true;
        Ok(())
    }

    fn open_inputs(&self, node_id: &NodeId) -> &BTreeSet<DataId> {
        self.open_inputs.get(node_id).unwrap_or(&self.empty_set)
    }

    async fn check_drop_token(&mut self, token: DropToken, clock: &HLC) -> eyre::Result<()> {
        match self.pending_drop_tokens.entry(token) {
            std::collections::hash_map::Entry::Occupied(entry) => {
                if entry.get().pending_nodes.is_empty() {
                    let (drop_token, info) = entry.remove_entry();
                    let result = match self.drop_channels.get_mut(&info.owner) {
                        Some(channel) => send_with_timestamp(
                            channel,
                            NodeDropEvent::OutputDropped { drop_token },
                            clock,
                        )
                        .wrap_err("send failed"),
                        None => Err(eyre!("no subscribe channel for node `{}`", &info.owner)),
                    };
                    if let Err(err) = result.wrap_err_with(|| {
                        format!(
                            "failed to report drop token `{drop_token:?}` to owner `{}`",
                            &info.owner
                        )
                    }) {
                        tracing::warn!("{err:?}");
                    }
                }
            }
            std::collections::hash_map::Entry::Vacant(_) => {
                tracing::warn!("check_drop_token called with already closed token")
            }
        }

        Ok(())
    }

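    /// Zenoh key expression on which the given output is published, following
    /// the pattern `dora/<network>/<dataflow_id>/output/<node_id>/<output_id>`.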
    fn output_publish_topic(&self, output_id: &OutputId) -> String {
        let network_id = "default";
        let dataflow_id = self.id;
        let OutputId(node_id, output_id) = output_id;
        format!("dora/{network_id}/{dataflow_id}/output/{node_id}/{output_id}")
    }
}

fn empty_type_info() -> ArrowTypeInfo {
    ArrowTypeInfo {
        data_type: DataType::Null,
        len: 0,
        null_count: 0,
        validity: None,
        offset: 0,
        buffer_offsets: Vec::new(),
        child_data: Vec::new(),
    }
}

#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct OutputId(NodeId, DataId);
type InputId = (NodeId, DataId);

struct DropTokenInformation {
    /// The node that created the associated drop token.
    owner: NodeId,
    /// Contains the set of pending nodes that still have access to the input
    /// associated with a drop token.
    pending_nodes: BTreeSet<NodeId>,
}

#[derive(Debug)]
pub enum Event {
    Node {
        dataflow_id: DataflowId,
        node_id: NodeId,
        event: DaemonNodeEvent,
    },
    Coordinator(CoordinatorEvent),
    Daemon(InterDaemonEvent),
    Dora(DoraEvent),
    DynamicNode(DynamicNodeEventWrapper),
    HeartbeatInterval,
    CtrlC,
    SecondCtrlC,
    DaemonError(eyre::Report),
}

impl From<DoraEvent> for Event {
    fn from(event: DoraEvent) -> Self {
        Event::Dora(event)
    }
}

#[derive(Debug)]
pub enum DaemonNodeEvent {
    OutputsDone {
        reply_sender: oneshot::Sender<DaemonReply>,
    },
    Subscribe {
        event_sender: UnboundedSender<Timestamped<NodeEvent>>,
        reply_sender: oneshot::Sender<DaemonReply>,
    },
    SubscribeDrop {
        event_sender: UnboundedSender<Timestamped<NodeDropEvent>>,
        reply_sender: oneshot::Sender<DaemonReply>,
    },
    CloseOutputs {
        outputs: Vec<dora_core::config::DataId>,
        reply_sender: oneshot::Sender<DaemonReply>,
    },
    SendOut {
        output_id: DataId,
        metadata: metadata::Metadata,
        data: Option<DataMessage>,
    },
    ReportDrop {
        tokens: Vec<DropToken>,
    },
    EventStreamDropped {
        reply_sender: oneshot::Sender<DaemonReply>,
    },
}

#[derive(Debug)]
pub enum DoraEvent {
    Timer {
        dataflow_id: DataflowId,
        interval: Duration,
        metadata: metadata::Metadata,
    },
    Logs {
        dataflow_id: DataflowId,
        output_id: OutputId,
        message: DataMessage,
        metadata: metadata::Metadata,
    },
    SpawnedNodeResult {
        dataflow_id: DataflowId,
        node_id: NodeId,
        exit_status: NodeExitStatus,
    },
}

#[must_use]
enum RunStatus {
    Continue,
    Exit,
}

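/// Send `event` over the channel, stamped with a fresh timestamp from the
/// hybrid logical clock.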
fn send_with_timestamp<T>(
    sender: &UnboundedSender<Timestamped<T>>,
    event: T,
    clock: &HLC,
) -> Result<(), mpsc::error::SendError<Timestamped<T>>> {
    sender.send(Timestamped {
        inner: event,
        timestamp: clock.new_timestamp(),
    })
}

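/// Forward ctrl-c signals into the daemon event stream: the first signal
/// becomes `Event::CtrlC`, the second `Event::SecondCtrlC`, and a third
/// aborts the process immediately.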
fn set_up_ctrlc_handler(
    clock: Arc<HLC>,
) -> eyre::Result<tokio::sync::mpsc::Receiver<Timestamped<Event>>> {
    let (ctrlc_tx, ctrlc_rx) = mpsc::channel(1);

    let mut ctrlc_sent = 0;
    ctrlc::set_handler(move || {
        let event = match ctrlc_sent {
            0 => Event::CtrlC,
            1 => Event::SecondCtrlC,
            _ => {
                tracing::warn!("received third ctrl-c signal, aborting immediately");
                std::process::abort();
            }
        };
        if ctrlc_tx
            .blocking_send(Timestamped {
                inner: event,
                timestamp: clock.new_timestamp(),
            })
            .is_err()
        {
            tracing::error!("failed to forward ctrl-c event to the daemon event loop");
        }

        ctrlc_sent += 1;
    })
    .wrap_err("failed to set ctrl-c handler")?;

    Ok(ctrlc_rx)
}

#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct CascadingErrorCauses {
    caused_by: BTreeMap<NodeId, NodeId>,
}

impl CascadingErrorCauses {
    pub fn experienced_cascading_error(&self, node: &NodeId) -> bool {
        self.caused_by.contains_key(node)
    }

    /// Return the ID of the node that caused a cascading error for the given node, if any.
    pub fn error_caused_by(&self, node: &NodeId) -> Option<&NodeId> {
        self.caused_by.get(node)
    }

    pub fn report_cascading_error(&mut self, causing_node: NodeId, affected_node: NodeId) {
        self.caused_by.entry(affected_node).or_insert(causing_node);
    }
}
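
// A minimal sanity check of the cascading-error bookkeeping (added for
// illustration; it assumes `NodeId` implements `From<String>`, as it does
// elsewhere in dora-core).
#[cfg(test)]
mod cascading_error_causes_tests {
    use super::*;

    #[test]
    fn first_reported_cause_wins() {
        let mut causes = CascadingErrorCauses::default();
        let a = NodeId::from("node-a".to_string());
        let b = NodeId::from("node-b".to_string());
        let c = NodeId::from("node-c".to_string());

        assert!(!causes.experienced_cascading_error(&c));

        causes.report_cascading_error(a.clone(), c.clone());
        // a second report for the same affected node is ignored (`or_insert`)
        causes.report_cascading_error(b, c.clone());

        assert!(causes.experienced_cascading_error(&c));
        assert_eq!(causes.error_caused_by(&c), Some(&a));
    }
}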

fn runtime_node_inputs(n: &RuntimeNode) -> BTreeMap<DataId, Input> {
    n.operators
        .iter()
        .flat_map(|operator| {
            operator.config.inputs.iter().map(|(input_id, mapping)| {
                (
                    DataId::from(format!("{}/{input_id}", operator.id)),
                    mapping.clone(),
                )
            })
        })
        .collect()
}

fn runtime_node_outputs(n: &RuntimeNode) -> BTreeSet<DataId> {
    n.operators
        .iter()
        .flat_map(|operator| {
            operator
                .config
                .outputs
                .iter()
                .map(|output_id| DataId::from(format!("{}/{output_id}", operator.id)))
        })
        .collect()
}
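
// Both helpers flatten operator-level IDs into the runtime node's namespace:
// an operator `op` with input `tick` and output `status` (illustrative names)
// contributes the data IDs `op/tick` and `op/status`.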

trait CoreNodeKindExt {
    fn run_config(&self) -> NodeRunConfig;
    fn dynamic(&self) -> bool;
}

impl CoreNodeKindExt for CoreNodeKind {
    fn run_config(&self) -> NodeRunConfig {
        match self {
            CoreNodeKind::Runtime(n) => NodeRunConfig {
                inputs: runtime_node_inputs(n),
                outputs: runtime_node_outputs(n),
            },
            CoreNodeKind::Custom(n) => n.run_config.clone(),
        }
    }

    fn dynamic(&self) -> bool {
        match self {
            CoreNodeKind::Runtime(_n) => false,
            CoreNodeKind::Custom(n) => n.source == DYNAMIC_SOURCE,
        }
    }
}