dora_daemon/lib.rs

use aligned_vec::{AVec, ConstAlign};
use coordinator::CoordinatorEvent;
use crossbeam::queue::ArrayQueue;
use dora_core::{
    build::{self, BuildInfo, GitManager, PrevGitSource},
    config::{DataId, Input, InputMapping, NodeId, NodeRunConfig, OperatorId},
    descriptor::{
        read_as_descriptor, CoreNodeKind, Descriptor, DescriptorExt, ResolvedNode, RuntimeNode,
        DYNAMIC_SOURCE,
    },
    topics::LOCALHOST,
    uhlc::{self, HLC},
};
use dora_message::{
    common::{
        DaemonId, DataMessage, DropToken, GitSource, LogLevel, NodeError, NodeErrorCause,
        NodeExitStatus,
    },
    coordinator_to_cli::DataflowResult,
    coordinator_to_daemon::{BuildDataflowNodes, DaemonCoordinatorEvent, SpawnDataflowNodes},
    daemon_to_coordinator::{
        CoordinatorRequest, DaemonCoordinatorReply, DaemonEvent, DataflowDaemonResult,
    },
    daemon_to_daemon::InterDaemonEvent,
    daemon_to_node::{DaemonReply, NodeConfig, NodeDropEvent, NodeEvent},
    descriptor::NodeSource,
    metadata::{self, ArrowTypeInfo},
    node_to_daemon::{DynamicNodeEvent, Timestamped},
    BuildId, DataflowId, SessionId,
};
use dora_node_api::{arrow::datatypes::DataType, Parameter};
use eyre::{bail, eyre, Context, ContextCompat, Result};
use futures::{future, stream, FutureExt, TryFutureExt};
use futures_concurrency::stream::Merge;
use local_listener::DynamicNodeEventWrapper;
use log::{DaemonLogger, DataflowLogger, Logger};
use pending::PendingNodes;
use shared_memory_server::ShmemConf;
use socket_stream_utils::socket_stream_send;
use spawn::Spawner;
use std::{
    collections::{BTreeMap, BTreeSet, HashMap},
    env::current_dir,
    future::Future,
    net::SocketAddr,
    path::{Path, PathBuf},
    pin::pin,
    sync::Arc,
    time::{Duration, Instant},
};
use sysinfo::Pid;
use tokio::{
    fs::File,
    io::AsyncReadExt,
    net::TcpStream,
    sync::{
        broadcast,
        mpsc::{self, UnboundedSender},
        oneshot::{self, Sender},
    },
};
use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
use tracing::{error, warn};
use uuid::{NoContext, Timestamp, Uuid};

pub use flume;
pub use log::LogDestination;

mod coordinator;
mod local_listener;
mod log;
mod node_communication;
mod pending;
mod socket_stream_utils;
mod spawn;

#[cfg(feature = "telemetry")]
use dora_tracing::telemetry::serialize_context;
#[cfg(feature = "telemetry")]
use tracing_opentelemetry::OpenTelemetrySpanExt;

use crate::pending::DataflowStatus;

const STDERR_LOG_LINES: usize = 10;

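/// Long-running state of a dora daemon instance: the dataflows it currently
/// runs, its connection to the coordinator, the zenoh session used for
/// inter-daemon communication, and the build artifacts kept for spawn requests.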
pub struct Daemon {
    running: HashMap<DataflowId, RunningDataflow>,
    working_dir: HashMap<DataflowId, PathBuf>,

    events_tx: mpsc::Sender<Timestamped<Event>>,

    coordinator_connection: Option<TcpStream>,
    last_coordinator_heartbeat: Instant,
    daemon_id: DaemonId,

    /// used for testing and examples
    exit_when_done: Option<BTreeSet<(Uuid, NodeId)>>,
    /// set on ctrl-c
    exit_when_all_finished: bool,
    /// used to record results of local nodes
    dataflow_node_results: BTreeMap<Uuid, BTreeMap<NodeId, Result<(), NodeError>>>,

    clock: Arc<uhlc::HLC>,

    zenoh_session: zenoh::Session,
    remote_daemon_events_tx: Option<flume::Sender<eyre::Result<Timestamped<InterDaemonEvent>>>>,

    logger: DaemonLogger,

    sessions: BTreeMap<SessionId, BuildId>,
    builds: BTreeMap<BuildId, BuildInfo>,
    git_manager: GitManager,
}

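/// Per-node results collected while this daemon runs, keyed by dataflow UUID.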
type DaemonRunResult = BTreeMap<Uuid, BTreeMap<NodeId, Result<(), NodeError>>>;

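/// A pending build or spawn-preparation task for a single node, kept together
/// with the node's ID and dynamic flag so that failures can be attributed later.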
struct NodeBuildTask<F> {
    node_id: NodeId,
    dynamic_node: bool,
    task: F,
}

impl Daemon {
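    /// Runs a daemon that connects to the coordinator at `coordinator_addr` and
    /// processes coordinator, node, and inter-daemon events until it exits.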
    pub async fn run(
        coordinator_addr: SocketAddr,
        machine_id: Option<String>,
        local_listen_port: u16,
    ) -> eyre::Result<()> {
        let clock = Arc::new(HLC::default());

        let mut ctrlc_events = set_up_ctrlc_handler(clock.clone())?;
        let (remote_daemon_events_tx, remote_daemon_events_rx) = flume::bounded(10);
        let (daemon_id, incoming_events) = {
            let incoming_events = set_up_event_stream(
                coordinator_addr,
                &machine_id,
                &clock,
                remote_daemon_events_rx,
                local_listen_port,
            );

            // finish early if ctrl-c is pressed during event stream setup
            let ctrl_c = pin!(ctrlc_events.recv());
            match futures::future::select(ctrl_c, pin!(incoming_events)).await {
                future::Either::Left((_ctrl_c, _)) => {
                    tracing::info!("received ctrl-c signal -> stopping daemon");
                    return Ok(());
                }
                future::Either::Right((events, _)) => events?,
            }
        };

        let log_destination = {
            // additional connection for logging
            let stream = TcpStream::connect(coordinator_addr)
                .await
                .wrap_err("failed to connect log to dora-coordinator")?;
            stream
                .set_nodelay(true)
                .wrap_err("failed to set TCP_NODELAY")?;
            LogDestination::Coordinator {
                coordinator_connection: stream,
            }
        };

        Self::run_general(
            (ReceiverStream::new(ctrlc_events), incoming_events).merge(),
            Some(coordinator_addr),
            daemon_id,
            None,
            clock,
            Some(remote_daemon_events_tx),
            Default::default(),
            log_destination,
        )
        .await
        .map(|_| ())
    }

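    /// Runs a single dataflow to completion without a separate coordinator or
    /// daemon process (the `dora run` code path) and returns its per-node results.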
    pub async fn run_dataflow(
        dataflow_path: &Path,
        build_id: Option<BuildId>,
        local_build: Option<BuildInfo>,
        session_id: SessionId,
        uv: bool,
        log_destination: LogDestination,
    ) -> eyre::Result<DataflowResult> {
        let working_dir = dataflow_path
            .canonicalize()
            .context("failed to canonicalize dataflow path")?
            .parent()
            .ok_or_else(|| eyre::eyre!("canonicalized dataflow path has no parent"))?
            .to_owned();

        let descriptor = read_as_descriptor(dataflow_path).await?;
        if let Some(node) = descriptor.nodes.iter().find(|n| n.deploy.is_some()) {
            eyre::bail!(
                "node {} has a `deploy` section, which is not supported in `dora run`.\n\n\
                Instead, you need to spawn a `dora coordinator` and one or more `dora daemon` \
                instances and then use `dora start`.",
                node.id
            )
        }

        descriptor.check(&working_dir)?;
        let nodes = descriptor.resolve_aliases_and_set_defaults()?;

        let dataflow_id = Uuid::new_v7(Timestamp::now(NoContext));
        let spawn_command = SpawnDataflowNodes {
            build_id,
            session_id,
            dataflow_id,
            local_working_dir: Some(working_dir),
            spawn_nodes: nodes.keys().cloned().collect(),
            nodes,
            dataflow_descriptor: descriptor,
            uv,
        };

        let clock = Arc::new(HLC::default());

        let ctrlc_events = ReceiverStream::new(set_up_ctrlc_handler(clock.clone())?);

        let exit_when_done = spawn_command
            .nodes
            .values()
            .map(|n| (spawn_command.dataflow_id, n.id.clone()))
            .collect();
        let (reply_tx, reply_rx) = oneshot::channel();
        let timestamp = clock.new_timestamp();
        let coordinator_events = stream::once(async move {
            Timestamped {
                inner: Event::Coordinator(CoordinatorEvent {
                    event: DaemonCoordinatorEvent::Spawn(spawn_command),
                    reply_tx,
                }),
                timestamp,
            }
        });
        let events = (coordinator_events, ctrlc_events).merge();
        let run_result = Self::run_general(
            Box::pin(events),
            None,
            DaemonId::new(None),
            Some(exit_when_done),
            clock.clone(),
            None,
            if let Some(local_build) = local_build {
                let Some(build_id) = build_id else {
                    bail!("no build_id, but local_build set")
                };
                let mut builds = BTreeMap::new();
                builds.insert(build_id, local_build);
                builds
            } else {
                Default::default()
            },
            log_destination,
        );

        let spawn_result = reply_rx
            .map_err(|err| eyre!("failed to receive spawn result: {err}"))
            .and_then(|r| async {
                match r {
                    Some(DaemonCoordinatorReply::TriggerSpawnResult(result)) => {
                        result.map_err(|err| eyre!(err))
                    }
                    _ => Err(eyre!("unexpected spawn reply")),
                }
            });

        let (mut dataflow_results, ()) = future::try_join(run_result, spawn_result).await?;

        Ok(DataflowResult {
            uuid: dataflow_id,
            timestamp: clock.new_timestamp(),
            node_results: dataflow_results
                .remove(&dataflow_id)
                .context("no node results for dataflow_id")?,
        })
    }

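    /// Shared implementation behind [`Self::run`] and [`Self::run_dataflow`]:
    /// connects to the coordinator (if any), opens the zenoh session, and then
    /// drives the daemon's main event loop.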
    #[allow(clippy::too_many_arguments)]
    async fn run_general(
        external_events: impl Stream<Item = Timestamped<Event>> + Unpin,
        coordinator_addr: Option<SocketAddr>,
        daemon_id: DaemonId,
        exit_when_done: Option<BTreeSet<(Uuid, NodeId)>>,
        clock: Arc<HLC>,
        remote_daemon_events_tx: Option<flume::Sender<eyre::Result<Timestamped<InterDaemonEvent>>>>,
        builds: BTreeMap<BuildId, BuildInfo>,
        log_destination: LogDestination,
    ) -> eyre::Result<DaemonRunResult> {
        let coordinator_connection = match coordinator_addr {
            Some(addr) => {
                let stream = TcpStream::connect(addr)
                    .await
                    .wrap_err("failed to connect to dora-coordinator")?;
                stream
                    .set_nodelay(true)
                    .wrap_err("failed to set TCP_NODELAY")?;
                Some(stream)
            }
            None => None,
        };

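        // Open the zenoh session used for inter-daemon communication: prefer an
        // explicit config file given through the `ZENOH_CONFIG` env variable,
        // otherwise derive a config from the coordinator address and fall back to
        // progressively simpler configs if opening the session fails.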
        let zenoh_session = match std::env::var(zenoh::Config::DEFAULT_CONFIG_PATH_ENV) {
            Ok(path) => {
                let zenoh_config = zenoh::Config::from_file(&path)
                    .map_err(|e| eyre!(e))
                    .wrap_err_with(|| format!("failed to read zenoh config from {path}"))?;
                zenoh::open(zenoh_config)
                    .await
                    .map_err(|e| eyre!(e))
                    .context("failed to open zenoh session")?
            }
            Err(std::env::VarError::NotPresent) => {
                let mut zenoh_config = zenoh::Config::default();

                if let Some(addr) = coordinator_addr {
                    // Linkstate makes it possible to connect two daemons on different networks through a public daemon.
                    // TODO: There is currently a CI/CD error with linkstate on Windows.
                    if cfg!(not(target_os = "windows")) {
                        zenoh_config
                            .insert_json5("routing/peer", r#"{ mode: "linkstate" }"#)
                            .unwrap();
                    }

                    zenoh_config
                        .insert_json5(
                            "connect/endpoints",
                            &format!(
                                r#"{{ router: ["tcp/[::]:7447"], peer: ["tcp/{}:5456"] }}"#,
                                addr.ip()
                            ),
                        )
                        .unwrap();
                    zenoh_config
                        .insert_json5(
                            "listen/endpoints",
                            r#"{ router: ["tcp/[::]:7447"], peer: ["tcp/[::]:5456"] }"#,
                        )
                        .unwrap();
                    if cfg!(target_os = "macos") {
                        warn!("disabling multicast on macOS systems. Enable it with the ZENOH_CONFIG env variable or file");
                        zenoh_config
                            .insert_json5("scouting/multicast", r#"{ enabled: false }"#)
                            .unwrap();
                    }
                };
                if let Ok(zenoh_session) = zenoh::open(zenoh_config).await {
                    zenoh_session
                } else {
                    warn!(
                        "failed to open zenoh session, retrying with default config + coordinator"
                    );
                    let mut zenoh_config = zenoh::Config::default();
                    // Linkstate makes it possible to connect two daemons on different networks through a public daemon.
                    // TODO: There is currently a CI/CD error with linkstate on Windows.
                    if cfg!(not(target_os = "windows")) {
                        zenoh_config
                            .insert_json5("routing/peer", r#"{ mode: "linkstate" }"#)
                            .unwrap();
                    }

                    if let Some(addr) = coordinator_addr {
                        zenoh_config
                            .insert_json5(
                                "connect/endpoints",
                                &format!(
                                    r#"{{ router: ["tcp/[::]:7447"], peer: ["tcp/{}:5456"] }}"#,
                                    addr.ip()
                                ),
                            )
                            .unwrap();
                        if cfg!(target_os = "macos") {
                            warn!("disabling multicast on macOS systems. Enable it with the ZENOH_CONFIG env variable or file");
                            zenoh_config
                                .insert_json5("scouting/multicast", r#"{ enabled: false }"#)
                                .unwrap();
                        }
                    }
                    if let Ok(zenoh_session) = zenoh::open(zenoh_config).await {
                        zenoh_session
                    } else {
                        warn!("failed to open zenoh session, retrying with default config");
                        let zenoh_config = zenoh::Config::default();
                        zenoh::open(zenoh_config)
                            .await
                            .map_err(|e| eyre!(e))
                            .context("failed to open zenoh session")?
                    }
                }
            }
            Err(std::env::VarError::NotUnicode(_)) => eyre::bail!(
                "{} env variable is not valid unicode",
                zenoh::Config::DEFAULT_CONFIG_PATH_ENV
            ),
        };
        let (dora_events_tx, dora_events_rx) = mpsc::channel(5);
        let daemon = Self {
            logger: Logger {
                destination: log_destination,
                daemon_id: daemon_id.clone(),
                clock: clock.clone(),
            }
            .for_daemon(daemon_id.clone()),
            running: HashMap::new(),
            working_dir: HashMap::new(),
            events_tx: dora_events_tx,
            coordinator_connection,
            last_coordinator_heartbeat: Instant::now(),
            daemon_id,
            exit_when_done,
            exit_when_all_finished: false,
            dataflow_node_results: BTreeMap::new(),
            clock,
            zenoh_session,
            remote_daemon_events_tx,
            git_manager: Default::default(),
            builds,
            sessions: Default::default(),
        };

        let dora_events = ReceiverStream::new(dora_events_rx);
        let watchdog_clock = daemon.clock.clone();
        let watchdog_interval = tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(
            Duration::from_secs(5),
        ))
        .map(|_| Timestamped {
            inner: Event::HeartbeatInterval,
            timestamp: watchdog_clock.new_timestamp(),
        });
        let events = (external_events, dora_events, watchdog_interval).merge();
        daemon.run_inner(events).await
    }

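    /// Main event loop of the daemon: applies each incoming event to the daemon
    /// state and reports results to the coordinator until an exit condition is met.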
    #[tracing::instrument(skip(incoming_events, self), fields(?self.daemon_id))]
    async fn run_inner(
        mut self,
        incoming_events: impl Stream<Item = Timestamped<Event>> + Unpin,
    ) -> eyre::Result<DaemonRunResult> {
        let mut events = incoming_events;

        while let Some(event) = events.next().await {
            let Timestamped { inner, timestamp } = event;
            if let Err(err) = self.clock.update_with_timestamp(&timestamp) {
                tracing::warn!("failed to update HLC with incoming event timestamp: {err}");
            }

            // used below for checking the duration of event handling
            let start = Instant::now();
            let event_kind = inner.kind();

            match inner {
                Event::Coordinator(CoordinatorEvent { event, reply_tx }) => {
                    let status = self.handle_coordinator_event(event, reply_tx).await?;

                    match status {
                        RunStatus::Continue => {}
                        RunStatus::Exit => break,
                    }
                }
                Event::Daemon(event) => {
                    self.handle_inter_daemon_event(event).await?;
                }
                Event::Node {
                    dataflow_id: dataflow,
                    node_id,
                    event,
                } => self.handle_node_event(event, dataflow, node_id).await?,
                Event::Dora(event) => self.handle_dora_event(event).await?,
                Event::DynamicNode(event) => self.handle_dynamic_node_event(event).await?,
                Event::HeartbeatInterval => {
                    if let Some(connection) = &mut self.coordinator_connection {
                        let msg = serde_json::to_vec(&Timestamped {
                            inner: CoordinatorRequest::Event {
                                daemon_id: self.daemon_id.clone(),
                                event: DaemonEvent::Heartbeat,
                            },
                            timestamp: self.clock.new_timestamp(),
                        })?;
                        socket_stream_send(connection, &msg)
                            .await
                            .wrap_err("failed to send watchdog message to dora-coordinator")?;

                        if self.last_coordinator_heartbeat.elapsed() > Duration::from_secs(20) {
                            bail!("lost connection to coordinator")
                        }
                    }
                }
                Event::CtrlC => {
                    tracing::info!("received ctrl-c signal -> stopping all dataflows");
                    for dataflow in self.running.values_mut() {
                        let mut logger = self.logger.for_dataflow(dataflow.id);
                        dataflow
                            .stop_all(
                                &mut self.coordinator_connection,
                                &self.clock,
                                None,
                                &mut logger,
                            )
                            .await?;
                    }
                    self.exit_when_all_finished = true;
                    if self.running.is_empty() {
                        break;
                    }
                }
                Event::SecondCtrlC => {
                    tracing::warn!("received second ctrl-c signal -> exit immediately");
                    bail!("received second ctrl-c signal");
                }
                Event::DaemonError(err) => {
                    tracing::error!("Daemon error: {err:?}");
                }
                Event::SpawnNodeResult {
                    dataflow_id,
                    node_id,
                    dynamic_node,
                    result,
                } => match result {
                    Ok(running_node) => {
                        if let Some(dataflow) = self.running.get_mut(&dataflow_id) {
                            dataflow.running_nodes.insert(node_id, running_node);
                        } else {
                            tracing::error!("failed to handle SpawnNodeResult: no running dataflow with ID {dataflow_id}");
                        }
                    }
                    Err(error) => {
                        self.dataflow_node_results
                            .entry(dataflow_id)
                            .or_default()
                            .insert(node_id.clone(), Err(error));
                        self.handle_node_stop(dataflow_id, &node_id, dynamic_node)
                            .await?;
                    }
                },
                Event::BuildDataflowResult {
                    build_id,
                    session_id,
                    result,
                } => {
                    let (build_info, result) = match result {
                        Ok(build_info) => (Some(build_info), Ok(())),
                        Err(err) => (None, Err(err)),
                    };
                    if let Some(build_info) = build_info {
                        self.builds.insert(build_id, build_info);
                        if let Some(old_build_id) = self.sessions.insert(session_id, build_id) {
                            self.builds.remove(&old_build_id);
                        }
                    }
                    if let Some(connection) = &mut self.coordinator_connection {
                        let msg = serde_json::to_vec(&Timestamped {
                            inner: CoordinatorRequest::Event {
                                daemon_id: self.daemon_id.clone(),
                                event: DaemonEvent::BuildResult {
                                    build_id,
                                    result: result.map_err(|err| format!("{err:?}")),
                                },
                            },
                            timestamp: self.clock.new_timestamp(),
                        })?;
                        socket_stream_send(connection, &msg).await.wrap_err(
                            "failed to send BuildDataflowResult message to dora-coordinator",
                        )?;
                    }
                }
                Event::SpawnDataflowResult {
                    dataflow_id,
                    result,
                } => {
                    if let Some(connection) = &mut self.coordinator_connection {
                        let msg = serde_json::to_vec(&Timestamped {
                            inner: CoordinatorRequest::Event {
                                daemon_id: self.daemon_id.clone(),
                                event: DaemonEvent::SpawnResult {
                                    dataflow_id,
                                    result: result.map_err(|err| format!("{err:?}")),
                                },
                            },
                            timestamp: self.clock.new_timestamp(),
                        })?;
                        socket_stream_send(connection, &msg).await.wrap_err(
                            "failed to send SpawnDataflowResult message to dora-coordinator",
                        )?;
                    }
                }
                Event::NodeStopped {
                    dataflow_id,
                    node_id,
                } => {
                    if let Some(exit_when_done) = &mut self.exit_when_done {
                        exit_when_done.remove(&(dataflow_id, node_id));
                        if exit_when_done.is_empty() {
                            tracing::info!(
                                "exiting daemon because all required dataflows are finished"
                            );
                            break;
                        }
                    }
                    if self.exit_when_all_finished && self.running.is_empty() {
                        break;
                    }
                }
            }

            // warn if event handling took too long -> the main loop should never be blocked for too long
            let elapsed = start.elapsed();
            if elapsed > Duration::from_millis(100) {
                tracing::warn!(
                    "Daemon took {}ms for handling event: {event_kind}",
                    elapsed.as_millis()
                );
            }
        }

        if let Some(mut connection) = self.coordinator_connection.take() {
            let msg = serde_json::to_vec(&Timestamped {
                inner: CoordinatorRequest::Event {
                    daemon_id: self.daemon_id.clone(),
                    event: DaemonEvent::Exit,
                },
                timestamp: self.clock.new_timestamp(),
            })?;
            socket_stream_send(&mut connection, &msg)
                .await
                .wrap_err("failed to send Exit message to dora-coordinator")?;
        }

        Ok(self.dataflow_node_results)
    }

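    /// Handles a single request from the coordinator (build, spawn, logs, reload,
    /// stop, destroy, heartbeat) and sends the corresponding reply over `reply_tx`.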
    async fn handle_coordinator_event(
        &mut self,
        event: DaemonCoordinatorEvent,
        reply_tx: Sender<Option<DaemonCoordinatorReply>>,
    ) -> eyre::Result<RunStatus> {
        let status = match event {
            DaemonCoordinatorEvent::Build(BuildDataflowNodes {
                build_id,
                session_id,
                local_working_dir,
                git_sources,
                prev_git_sources,
                dataflow_descriptor,
                nodes_on_machine,
                uv,
            }) => {
                match dataflow_descriptor.communication.remote {
                    dora_core::config::RemoteCommunicationConfig::Tcp => {}
                }

                let base_working_dir = self.base_working_dir(local_working_dir, session_id)?;

                let result = self
                    .build_dataflow(
                        build_id,
                        session_id,
                        base_working_dir,
                        git_sources,
                        prev_git_sources,
                        dataflow_descriptor,
                        nodes_on_machine,
                        uv,
                    )
                    .await;
                let (trigger_result, result_task) = match result {
                    Ok(result_task) => (Ok(()), Some(result_task)),
                    Err(err) => (Err(format!("{err:?}")), None),
                };
                let reply = DaemonCoordinatorReply::TriggerBuildResult(trigger_result);
                let _ = reply_tx.send(Some(reply)).map_err(|_| {
                    error!("could not send `TriggerBuildResult` reply from daemon to coordinator")
                });

                let result_tx = self.events_tx.clone();
                let clock = self.clock.clone();
                if let Some(result_task) = result_task {
                    tokio::spawn(async move {
                        let message = Timestamped {
                            inner: Event::BuildDataflowResult {
                                build_id,
                                session_id,
                                result: result_task.await,
                            },
                            timestamp: clock.new_timestamp(),
                        };
                        let _ = result_tx
                            .send(message)
                            .map_err(|_| {
                                error!(
                                    "could not send `BuildResult` reply from daemon to coordinator"
                                )
                            })
                            .await;
                    });
                }

                RunStatus::Continue
            }
            DaemonCoordinatorEvent::Spawn(SpawnDataflowNodes {
                build_id,
                session_id,
                dataflow_id,
                local_working_dir,
                nodes,
                dataflow_descriptor,
                spawn_nodes,
                uv,
            }) => {
                match dataflow_descriptor.communication.remote {
                    dora_core::config::RemoteCommunicationConfig::Tcp => {}
                }

                let base_working_dir = self.base_working_dir(local_working_dir, session_id)?;

                let result = self
                    .spawn_dataflow(
                        build_id,
                        dataflow_id,
                        base_working_dir,
                        nodes,
                        dataflow_descriptor,
                        spawn_nodes,
                        uv,
                    )
                    .await;
                let (trigger_result, result_task) = match result {
                    Ok(result_task) => (Ok(()), Some(result_task)),
                    Err(err) => (Err(format!("{err:?}")), None),
                };
                let reply = DaemonCoordinatorReply::TriggerSpawnResult(trigger_result);
                let _ = reply_tx.send(Some(reply)).map_err(|_| {
                    error!("could not send `TriggerSpawnResult` reply from daemon to coordinator")
                });

                let result_tx = self.events_tx.clone();
                let clock = self.clock.clone();
                if let Some(result_task) = result_task {
                    tokio::spawn(async move {
                        let message = Timestamped {
                            inner: Event::SpawnDataflowResult {
                                dataflow_id,
                                result: result_task.await,
                            },
                            timestamp: clock.new_timestamp(),
                        };
                        let _ = result_tx
                            .send(message)
                            .map_err(|_| {
                                error!(
                                    "could not send `SpawnResult` reply from daemon to coordinator"
                                )
                            })
                            .await;
                    });
                }

                RunStatus::Continue
            }
            DaemonCoordinatorEvent::AllNodesReady {
                dataflow_id,
                exited_before_subscribe,
            } => {
                let mut logger = self.logger.for_dataflow(dataflow_id);
                logger
                    .log(
                        LogLevel::Debug,
                        None,
                        Some("daemon".into()),
                        format!(
                            "received AllNodesReady (exited_before_subscribe: {exited_before_subscribe:?})"
                        ),
                    )
                    .await;
                match self.running.get_mut(&dataflow_id) {
                    Some(dataflow) => {
                        let ready = exited_before_subscribe.is_empty();
                        dataflow
                            .pending_nodes
                            .handle_external_all_nodes_ready(
                                exited_before_subscribe,
                                &mut dataflow.cascading_error_causes,
                            )
                            .await?;
                        if ready {
                            logger
                                .log(
                                    LogLevel::Info,
                                    None,
                                    Some("daemon".into()),
                                    "coordinator reported that all nodes are ready, starting dataflow",
                                )
                                .await;
                            dataflow.start(&self.events_tx, &self.clock).await?;
                        }
                    }
                    None => {
                        tracing::warn!(
                            "received AllNodesReady for unknown dataflow (ID `{dataflow_id}`)"
                        );
                    }
                }
                let _ = reply_tx.send(None).map_err(|_| {
                    error!("could not send `AllNodesReady` reply from daemon to coordinator")
                });
                RunStatus::Continue
            }
            DaemonCoordinatorEvent::Logs {
                dataflow_id,
                node_id,
            } => {
                match self.working_dir.get(&dataflow_id) {
                    Some(working_dir) => {
                        let working_dir = working_dir.clone();
                        tokio::spawn(async move {
                            let logs = async {
                                let mut file =
                                    File::open(log::log_path(&working_dir, &dataflow_id, &node_id))
                                        .await
                                        .wrap_err(format!(
                                            "Could not open log file: {:#?}",
                                            log::log_path(&working_dir, &dataflow_id, &node_id)
                                        ))?;

                                let mut contents = vec![];
                                file.read_to_end(&mut contents)
                                    .await
                                    .wrap_err("Could not read content of log file")?;
                                Result::<Vec<u8>, eyre::Report>::Ok(contents)
                            }
                            .await
                            .map_err(|err| format!("{err:?}"));
                            let _ = reply_tx
                                .send(Some(DaemonCoordinatorReply::Logs(logs)))
                                .map_err(|_| {
                                    error!("could not send logs reply from daemon to coordinator")
                                });
                        });
                    }
                    None => {
                        tracing::warn!("received Logs for unknown dataflow (ID `{dataflow_id}`)");
                        let _ = reply_tx.send(None).map_err(|_| {
                            error!(
                                "could not send `Logs` reply from daemon to coordinator"
                            )
                        });
                    }
                }
                RunStatus::Continue
            }
            DaemonCoordinatorEvent::ReloadDataflow {
                dataflow_id,
                node_id,
                operator_id,
            } => {
                let result = self.send_reload(dataflow_id, node_id, operator_id).await;
                let reply =
                    DaemonCoordinatorReply::ReloadResult(result.map_err(|err| format!("{err:?}")));
                let _ = reply_tx
                    .send(Some(reply))
                    .map_err(|_| error!("could not send reload reply from daemon to coordinator"));
                RunStatus::Continue
            }
            DaemonCoordinatorEvent::StopDataflow {
                dataflow_id,
                grace_duration,
            } => {
                let mut logger = self.logger.for_dataflow(dataflow_id);
                let dataflow = self
                    .running
                    .get_mut(&dataflow_id)
                    .wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"));
                let (reply, future) = match dataflow {
                    Ok(dataflow) => {
                        let future = dataflow.stop_all(
                            &mut self.coordinator_connection,
                            &self.clock,
                            grace_duration,
                            &mut logger,
                        );
                        (Ok(()), Some(future))
                    }
                    Err(err) => (Err(err.to_string()), None),
                };

                let _ = reply_tx
                    .send(Some(DaemonCoordinatorReply::StopResult(reply)))
                    .map_err(|_| error!("could not send stop reply from daemon to coordinator"));

                if let Some(future) = future {
                    future.await?;
                }

                RunStatus::Continue
            }
            DaemonCoordinatorEvent::Destroy => {
                tracing::info!("received destroy command -> exiting");
                let (notify_tx, notify_rx) = oneshot::channel();
                let reply = DaemonCoordinatorReply::DestroyResult {
                    result: Ok(()),
                    notify: Some(notify_tx),
                };
                let _ = reply_tx
                    .send(Some(reply))
                    .map_err(|_| error!("could not send destroy reply from daemon to coordinator"));
                // wait until the reply is sent out
                if notify_rx.await.is_err() {
                    tracing::warn!("no confirmation received for DestroyReply");
                }
                RunStatus::Exit
            }
            DaemonCoordinatorEvent::Heartbeat => {
                self.last_coordinator_heartbeat = Instant::now();
                let _ = reply_tx.send(None);
                RunStatus::Continue
            }
        };
        Ok(status)
    }

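    /// Handles events received from other daemons: remote node outputs that map to
    /// local inputs and notifications that a remote output was closed.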
    async fn handle_inter_daemon_event(&mut self, event: InterDaemonEvent) -> eyre::Result<()> {
        match event {
            InterDaemonEvent::Output {
                dataflow_id,
                node_id,
                output_id,
                metadata,
                data,
            } => {
                let inner = async {
                    let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
                        format!("send out failed: no running dataflow with ID `{dataflow_id}`")
                    })?;
                    send_output_to_local_receivers(
                        node_id.clone(),
                        output_id.clone(),
                        dataflow,
                        &metadata,
                        data.map(DataMessage::Vec),
                        &self.clock,
                    )
                    .await?;
                    Result::<_, eyre::Report>::Ok(())
                };
                if let Err(err) = inner
                    .await
                    .wrap_err("failed to forward remote output to local receivers")
                {
                    let mut logger = self.logger.for_dataflow(dataflow_id).for_node(node_id);
                    logger
                        .log(LogLevel::Warn, Some("daemon".into()), format!("{err:?}"))
                        .await;
                }
                Ok(())
            }
            InterDaemonEvent::OutputClosed {
                dataflow_id,
                node_id,
                output_id,
            } => {
                let output_id = OutputId(node_id.clone(), output_id);
                let mut logger = self
                    .logger
                    .for_dataflow(dataflow_id)
                    .for_node(node_id.clone());
                logger
                    .log(
                        LogLevel::Debug,
                        Some("daemon".into()),
                        format!("received OutputClosed event for output {output_id:?}"),
                    )
                    .await;

                let inner = async {
                    let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
                        format!("send out failed: no running dataflow with ID `{dataflow_id}`")
                    })?;

                    if let Some(inputs) = dataflow.mappings.get(&output_id).cloned() {
                        for (receiver_id, input_id) in &inputs {
                            close_input(dataflow, receiver_id, input_id, &self.clock);
                        }
                    }
                    Result::<(), eyre::Report>::Ok(())
                };
                if let Err(err) = inner
                    .await
                    .wrap_err("failed to handle InputsClosed event sent by coordinator")
                {
                    logger
                        .log(LogLevel::Warn, Some("daemon".into()), format!("{err:?}"))
                        .await;
                }
                Ok(())
            }
        }
    }

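    /// Prepares build tasks for all nodes of the dataflow that are assigned to
    /// this machine and returns a future that resolves to the resulting [`BuildInfo`].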
    #[allow(clippy::too_many_arguments)]
    async fn build_dataflow(
        &mut self,
        build_id: BuildId,
        session_id: SessionId,
        base_working_dir: PathBuf,
        git_sources: BTreeMap<NodeId, GitSource>,
        prev_git_sources: BTreeMap<NodeId, GitSource>,
        dataflow_descriptor: Descriptor,
        local_nodes: BTreeSet<NodeId>,
        uv: bool,
    ) -> eyre::Result<impl Future<Output = eyre::Result<BuildInfo>>> {
        let builder = build::Builder {
            session_id,
            base_working_dir,
            uv,
        };
        self.git_manager.clear_planned_builds(session_id);

        let nodes = dataflow_descriptor.resolve_aliases_and_set_defaults()?;

        let mut tasks = Vec::new();

        // build nodes
        for node in nodes.into_values().filter(|n| local_nodes.contains(&n.id)) {
            let dynamic_node = node.kind.dynamic();

            let node_id = node.id.clone();
            let mut logger = self.logger.for_node_build(build_id, node_id.clone());
            logger.log(LogLevel::Info, "building").await;
            let git_source = git_sources.get(&node_id).cloned();
            let prev_git_source = prev_git_sources.get(&node_id).cloned();
            let prev_git = prev_git_source.map(|prev_source| PrevGitSource {
                still_needed_for_this_build: git_sources.values().any(|s| s == &prev_source),
                git_source: prev_source,
            });

            let logger_cloned = logger
                .try_clone_impl()
                .await
                .wrap_err("failed to clone logger")?;

            let mut builder = builder.clone();
            if let Some(node_working_dir) =
                node.deploy.as_ref().and_then(|d| d.working_dir.as_deref())
            {
                builder.base_working_dir = builder.base_working_dir.join(node_working_dir);
            }

            match builder
                .build_node(
                    node,
                    git_source,
                    prev_git,
                    logger_cloned,
                    &mut self.git_manager,
                )
                .await
                .wrap_err_with(|| format!("failed to build node `{node_id}`"))
            {
                Ok(result) => {
                    tasks.push(NodeBuildTask {
                        node_id,
                        task: result,
                        dynamic_node,
                    });
                }
                Err(err) => {
                    logger.log(LogLevel::Error, format!("{err:?}")).await;
                    return Err(err);
                }
            }
        }

        let task = async move {
            let mut info = BuildInfo {
                node_working_dirs: Default::default(),
            };
            for task in tasks {
                let NodeBuildTask {
                    node_id,
                    dynamic_node,
                    task,
                } = task;
                let node = task
                    .await
                    .with_context(|| format!("failed to build node `{node_id}`"))?;
                info.node_working_dirs
                    .insert(node_id, node.node_working_dir);
            }
            Ok(info)
        };

        Ok(task)
    }

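    /// Registers a new dataflow, records its input mappings and remote output
    /// subscriptions, and prepares spawn tasks for all local nodes; the returned
    /// future performs the actual spawning.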
1088    #[allow(clippy::too_many_arguments)]
1089    async fn spawn_dataflow(
1090        &mut self,
1091        build_id: Option<BuildId>,
1092        dataflow_id: DataflowId,
1093        base_working_dir: PathBuf,
1094        nodes: BTreeMap<NodeId, ResolvedNode>,
1095        dataflow_descriptor: Descriptor,
1096        spawn_nodes: BTreeSet<NodeId>,
1097        uv: bool,
1098    ) -> eyre::Result<impl Future<Output = eyre::Result<()>>> {
1099        let mut logger = self
1100            .logger
1101            .for_dataflow(dataflow_id)
1102            .try_clone()
1103            .await
1104            .context("failed to clone logger")?;
1105        let dataflow =
1106            RunningDataflow::new(dataflow_id, self.daemon_id.clone(), &dataflow_descriptor);
1107        let dataflow = match self.running.entry(dataflow_id) {
1108            std::collections::hash_map::Entry::Vacant(entry) => {
1109                self.working_dir
1110                    .insert(dataflow_id, base_working_dir.clone());
1111                entry.insert(dataflow)
1112            }
1113            std::collections::hash_map::Entry::Occupied(_) => {
1114                bail!("there is already a running dataflow with ID `{dataflow_id}`")
1115            }
1116        };
1117
1118        let mut stopped = Vec::new();
1119
1120        let node_working_dirs = build_id
1121            .and_then(|build_id| self.builds.get(&build_id))
1122            .map(|info| info.node_working_dirs.clone())
1123            .unwrap_or_default();
1124
1125        // calculate info about mappings
1126        for node in nodes.values() {
1127            let local = spawn_nodes.contains(&node.id);
1128
1129            let inputs = node_inputs(node);
1130            for (input_id, input) in inputs {
1131                if local {
1132                    dataflow
1133                        .open_inputs
1134                        .entry(node.id.clone())
1135                        .or_default()
1136                        .insert(input_id.clone());
1137                    match input.mapping {
1138                        InputMapping::User(mapping) => {
1139                            dataflow
1140                                .mappings
1141                                .entry(OutputId(mapping.source, mapping.output))
1142                                .or_default()
1143                                .insert((node.id.clone(), input_id));
1144                        }
1145                        InputMapping::Timer { interval } => {
1146                            dataflow
1147                                .timers
1148                                .entry(interval)
1149                                .or_default()
1150                                .insert((node.id.clone(), input_id));
1151                        }
1152                    }
1153                } else if let InputMapping::User(mapping) = input.mapping {
1154                    dataflow
1155                        .open_external_mappings
1156                        .insert(OutputId(mapping.source, mapping.output));
1157                }
1158            }
1159        }
1160
1161        let spawner = Spawner {
1162            dataflow_id,
1163            daemon_tx: self.events_tx.clone(),
1164            dataflow_descriptor,
1165            clock: self.clock.clone(),
1166            uv,
1167        };
1168
1169        let mut tasks = Vec::new();
1170
1171        // spawn nodes and set up subscriptions
1172        for node in nodes.into_values() {
1173            let mut logger = logger.reborrow().for_node(node.id.clone());
1174            let local = spawn_nodes.contains(&node.id);
1175            if local {
1176                let dynamic_node = node.kind.dynamic();
1177                if dynamic_node {
1178                    dataflow.dynamic_nodes.insert(node.id.clone());
1179                } else {
1180                    dataflow.pending_nodes.insert(node.id.clone());
1181                }
1182
1183                let node_id = node.id.clone();
1184                let node_stderr_most_recent = dataflow
1185                    .node_stderr_most_recent
1186                    .entry(node.id.clone())
1187                    .or_insert_with(|| Arc::new(ArrayQueue::new(STDERR_LOG_LINES)))
1188                    .clone();
1189                logger
1190                    .log(LogLevel::Info, Some("daemon".into()), "spawning")
1191                    .await;
1192                let node_working_dir = node_working_dirs
1193                    .get(&node_id)
1194                    .cloned()
1195                    .or_else(|| {
1196                        node.deploy
1197                            .as_ref()
1198                            .and_then(|d| d.working_dir.as_ref().map(|d| base_working_dir.join(d)))
1199                    })
1200                    .unwrap_or(base_working_dir.clone())
1201                    .clone();
1202                match spawner
1203                    .clone()
1204                    .spawn_node(node, node_working_dir, node_stderr_most_recent, &mut logger)
1205                    .await
1206                    .wrap_err_with(|| format!("failed to spawn node `{node_id}`"))
1207                {
1208                    Ok(result) => {
1209                        tasks.push(NodeBuildTask {
1210                            node_id,
1211                            task: result,
1212                            dynamic_node,
1213                        });
1214                    }
1215                    Err(err) => {
1216                        logger
1217                            .log(LogLevel::Error, Some("daemon".into()), format!("{err:?}"))
1218                            .await;
1219                        self.dataflow_node_results
1220                            .entry(dataflow_id)
1221                            .or_default()
1222                            .insert(
1223                                node_id.clone(),
1224                                Err(NodeError {
1225                                    timestamp: self.clock.new_timestamp(),
1226                                    cause: NodeErrorCause::FailedToSpawn(format!("{err:?}")),
1227                                    exit_status: NodeExitStatus::Unknown,
1228                                }),
1229                            );
1230                        stopped.push((node_id.clone(), dynamic_node));
1231                    }
1232                }
1233            } else {
1234                // wait until node is ready before starting
1235                dataflow.pending_nodes.set_external_nodes(true);
1236
1237                // subscribe to all node outputs that are mapped to some local inputs
1238                for output_id in dataflow.mappings.keys().filter(|o| o.0 == node.id) {
1239                    let tx = self
1240                        .remote_daemon_events_tx
1241                        .clone()
1242                        .wrap_err("no remote_daemon_events_tx channel")?;
1243                    let mut finished_rx = dataflow.finished_tx.subscribe();
1244                    let subscribe_topic = dataflow.output_publish_topic(output_id);
1245                    tracing::debug!("declaring subscriber on {subscribe_topic}");
1246                    let subscriber = self
1247                        .zenoh_session
1248                        .declare_subscriber(subscribe_topic)
1249                        .await
1250                        .map_err(|e| eyre!(e))
1251                        .wrap_err_with(|| format!("failed to subscribe to {output_id:?}"))?;
1252                    tokio::spawn(async move {
1253                        let mut finished = pin!(finished_rx.recv());
1254                        loop {
1255                            let finished_or_next =
1256                                futures::future::select(finished, subscriber.recv_async());
1257                            match finished_or_next.await {
1258                                future::Either::Left((finished, _)) => {
1259                                    match finished {
1260                                        Err(broadcast::error::RecvError::Closed) => {
1261                                            tracing::debug!("dataflow finished, breaking from zenoh subscribe task");
1262                                            break;
1263                                        }
1264                                        other => {
1265                                            tracing::warn!("unexpected return value of dataflow finished_rx channel: {other:?}");
1266                                            break;
1267                                        }
1268                                    }
1269                                }
1270                                future::Either::Right((sample, f)) => {
1271                                    finished = f;
1272                                    let event = sample.map_err(|e| eyre!(e)).and_then(|s| {
1273                                        Timestamped::deserialize_inter_daemon_event(
1274                                            &s.payload().to_bytes(),
1275                                        )
1276                                    });
1277                                    if tx.send_async(event).await.is_err() {
1278                                        // daemon finished
1279                                        break;
1280                                    }
1281                                }
1282                            }
1283                        }
1284                    });
1285                }
1286            }
1287        }
1288        for (node_id, dynamic) in stopped {
1289            self.handle_node_stop(dataflow_id, &node_id, dynamic)
1290                .await?;
1291        }
1292
1293        let spawn_result = Self::spawn_prepared_nodes(
1294            dataflow_id,
1295            logger,
1296            tasks,
1297            self.events_tx.clone(),
1298            self.clock.clone(),
1299        );
1300
1301        Ok(spawn_result)
1302    }
1303
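    /// Awaits all node preparation tasks and spawns the prepared nodes afterwards.
    ///
    /// If any preparation task fails, nothing is spawned: the failing node is reported
    /// with a `FailedToSpawn` error and every already prepared node is reported as a
    /// cascading failure caused by it. All outcomes are forwarded to the main daemon
    /// task as `SpawnNodeResult` events.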
1304    async fn spawn_prepared_nodes(
1305        dataflow_id: Uuid,
1306        mut logger: DataflowLogger<'_>,
1307        tasks: Vec<NodeBuildTask<impl Future<Output = eyre::Result<spawn::PreparedNode>>>>,
1308        events_tx: mpsc::Sender<Timestamped<Event>>,
1309        clock: Arc<HLC>,
1310    ) -> eyre::Result<()> {
1311        let node_result = |node_id, dynamic_node, result| Timestamped {
1312            inner: Event::SpawnNodeResult {
1313                dataflow_id,
1314                node_id,
1315                dynamic_node,
1316                result,
1317            },
1318            timestamp: clock.new_timestamp(),
1319        };
1320        let mut failed_to_prepare = None;
1321        let mut prepared_nodes = Vec::new();
1322        for task in tasks {
1323            let NodeBuildTask {
1324                node_id,
1325                dynamic_node,
1326                task,
1327            } = task;
1328            match task.await {
1329                Ok(node) => prepared_nodes.push(node),
1330                Err(err) => {
1331                    if failed_to_prepare.is_none() {
1332                        failed_to_prepare = Some(node_id.clone());
1333                    }
1334                    let node_err: NodeError = NodeError {
1335                        timestamp: clock.new_timestamp(),
1336                        cause: NodeErrorCause::FailedToSpawn(format!(
1337                            "preparing for spawn failed: {err:?}"
1338                        )),
1339                        exit_status: NodeExitStatus::Unknown,
1340                    };
1341                    let send_result = events_tx
1342                        .send(node_result(node_id, dynamic_node, Err(node_err)))
1343                        .await;
1344                    if send_result.is_err() {
1345                        tracing::error!("failed to send SpawnNodeResult to main daemon task")
1346                    }
1347                }
1348            }
1349        }
1350
1351        // once all nodes are prepared, do the actual spawning
1352        if let Some(failed_node) = failed_to_prepare {
1353            // don't spawn any nodes when an error occurred before
1354            for node in prepared_nodes {
1355                let err = NodeError {
1356                    timestamp: clock.new_timestamp(),
1357                    cause: NodeErrorCause::Cascading {
1358                        caused_by_node: failed_node.clone(),
1359                    },
1360                    exit_status: NodeExitStatus::Unknown,
1361                };
1362                let send_result = events_tx
1363                    .send(node_result(
1364                        node.node_id().clone(),
1365                        node.dynamic(),
1366                        Err(err),
1367                    ))
1368                    .await;
1369                if send_result.is_err() {
1370                    tracing::error!("failed to send SpawnNodeResult to main daemon task")
1371                }
1372            }
1373            Err(eyre!("failed to prepare node {failed_node}"))
1374        } else {
1375            let mut spawn_result = Ok(());
1376
1377            logger
1378                .log(
1379                    LogLevel::Info,
1380                    None,
1381                    Some("dora daemon".into()),
1382                    "finished building nodes, spawning...",
1383                )
1384                .await;
1385
1386            // spawn the nodes
1387            for node in prepared_nodes {
1388                let node_id = node.node_id().clone();
1389                let dynamic_node = node.dynamic();
1390                let mut logger = logger.reborrow().for_node(node_id.clone());
1391                let result = node.spawn(&mut logger).await;
1392                let node_spawn_result = match result {
1393                    Ok(node) => Ok(node),
1394                    Err(err) => {
1395                        let node_err = NodeError {
1396                            timestamp: clock.new_timestamp(),
1397                            cause: NodeErrorCause::FailedToSpawn(format!("spawn failed: {err:?}")),
1398                            exit_status: NodeExitStatus::Unknown,
1399                        };
1400                        if spawn_result.is_ok() {
1401                            spawn_result = Err(err.wrap_err(format!("failed to spawn {node_id}")));
1402                        }
1403                        Err(node_err)
1404                    }
1405                };
1406                let send_result = events_tx
1407                    .send(node_result(node_id, dynamic_node, node_spawn_result))
1408                    .await;
1409                if send_result.is_err() {
1410                    tracing::error!("failed to send SpawnNodeResult to main daemon task")
1411                }
1412            }
1413            spawn_result
1414        }
1415    }
1416
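    /// Handles requests from dynamic nodes connecting through the local listener.
    ///
    /// Currently this resolves `NodeConfig` requests: the config is returned only if
    /// exactly one running dataflow contains a dynamic node with the requested ID.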
1417    async fn handle_dynamic_node_event(
1418        &mut self,
1419        event: DynamicNodeEventWrapper,
1420    ) -> eyre::Result<()> {
1421        match event {
1422            DynamicNodeEventWrapper {
1423                event: DynamicNodeEvent::NodeConfig { node_id },
1424                reply_tx,
1425            } => {
1426                let number_node_id = self
1427                    .running
1428                    .iter()
1429                    .filter(|(_id, dataflow)| dataflow.running_nodes.contains_key(&node_id))
1430                    .count();
1431
1432                let node_config = match number_node_id {
1433                    2.. => Err(format!(
1434                        "multiple dataflows contain the dynamic node id {node_id}. \
1435                        Please run only one dataflow with the specified node id \
1436                        when using a dynamic node",
1437                    )),
1438                    1 => self
1439                        .running
1440                        .iter()
1441                        .filter(|(_id, dataflow)| dataflow.running_nodes.contains_key(&node_id))
1442                        .map(|(id, dataflow)| -> Result<NodeConfig> {
1443                            let node_config = dataflow
1444                                .running_nodes
1445                                .get(&node_id)
1446                                .with_context(|| {
1447                                    format!("no node with ID `{node_id}` within the given dataflow")
1448                                })?
1449                                .node_config
1450                                .clone();
1451                            if !node_config.dynamic {
1452                                bail!("node with ID `{node_id}` in {id} is not dynamic");
1453                            }
1454                            Ok(node_config)
1455                        })
1456                        .next()
1457                        .ok_or_else(|| eyre!("no node with ID `{node_id}`"))
1458                        .and_then(|r| r)
1459                        .map_err(|err| {
1460                            format!(
1461                                "failed to get dynamic node config within given dataflow: {err}"
1462                            )
1463                        }),
1464                    0 => Err(format!("no node with ID `{node_id}`")),
1465                };
1466
1467                let reply = DaemonReply::NodeConfig {
1468                    result: node_config,
1469                };
1470                let _ = reply_tx.send(Some(reply)).map_err(|_| {
1471                    error!("could not send node config reply from daemon to dynamic node")
1472                });
1473                Ok(())
1474            }
1475        }
1476    }
1477
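    /// Handles an event reported by a local node of the given dataflow, such as
    /// subscribing to inputs, sending outputs, closing outputs, or reporting drop tokens.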
1478    async fn handle_node_event(
1479        &mut self,
1480        event: DaemonNodeEvent,
1481        dataflow_id: DataflowId,
1482        node_id: NodeId,
1483    ) -> eyre::Result<()> {
1484        match event {
1485            DaemonNodeEvent::Subscribe {
1486                event_sender,
1487                reply_sender,
1488            } => {
1489                let mut logger = self.logger.for_dataflow(dataflow_id);
1490                logger
1491                    .log(
1492                        LogLevel::Info,
1493                        Some(node_id.clone()),
1494                        Some("daemon".into()),
1495                        "node is ready",
1496                    )
1497                    .await;
1498
1499                let dataflow = self.running.get_mut(&dataflow_id).ok_or_else(|| {
1500                    format!("subscribe failed: no running dataflow with ID `{dataflow_id}`")
1501                });
1502
1503                match dataflow {
1504                    Err(err) => {
1505                        let _ = reply_sender.send(DaemonReply::Result(Err(err)));
1506                    }
1507                    Ok(dataflow) => {
1508                        Self::subscribe(dataflow, node_id.clone(), event_sender, &self.clock).await;
1509
1510                        let status = dataflow
1511                            .pending_nodes
1512                            .handle_node_subscription(
1513                                node_id.clone(),
1514                                reply_sender,
1515                                &mut self.coordinator_connection,
1516                                &self.clock,
1517                                &mut dataflow.cascading_error_causes,
1518                                &mut logger,
1519                            )
1520                            .await?;
1521                        match status {
1522                            DataflowStatus::AllNodesReady => {
1523                                logger
1524                                    .log(
1525                                        LogLevel::Info,
1526                                        None,
1527                                        Some("daemon".into()),
1528                                        "all nodes are ready, starting dataflow",
1529                                    )
1530                                    .await;
1531                                dataflow.start(&self.events_tx, &self.clock).await?;
1532                            }
1533                            DataflowStatus::Pending => {}
1534                        }
1535                    }
1536                }
1537            }
1538            DaemonNodeEvent::SubscribeDrop {
1539                event_sender,
1540                reply_sender,
1541            } => {
1542                let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
1543                    format!("failed to subscribe to drop events: no running dataflow with ID `{dataflow_id}`")
1544                });
1545                let result = match dataflow {
1546                    Ok(dataflow) => {
1547                        dataflow.drop_channels.insert(node_id, event_sender);
1548                        Ok(())
1549                    }
1550                    Err(err) => Err(err.to_string()),
1551                };
1552                let _ = reply_sender.send(DaemonReply::Result(result));
1553            }
1554            DaemonNodeEvent::CloseOutputs {
1555                outputs,
1556                reply_sender,
1557            } => {
1558                // notify downstream nodes
1559                let inner = async {
1560                    self.send_output_closed_events(dataflow_id, node_id, outputs)
1561                        .await
1562                };
1563
1564                let reply = inner.await.map_err(|err| format!("{err:?}"));
1565                let _ = reply_sender.send(DaemonReply::Result(reply));
1566            }
1567            DaemonNodeEvent::OutputsDone { reply_sender } => {
1568                let result = self.handle_outputs_done(dataflow_id, &node_id).await;
1569
1570                let _ = reply_sender.send(DaemonReply::Result(
1571                    result.map_err(|err| format!("{err:?}")),
1572                ));
1573            }
1574            DaemonNodeEvent::SendOut {
1575                output_id,
1576                metadata,
1577                data,
1578            } => self
1579                .send_out(dataflow_id, node_id, output_id, metadata, data)
1580                .await
1581                .context("failed to send out")?,
1582            DaemonNodeEvent::ReportDrop { tokens } => {
1583                let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
1584                    format!(
1585                        "failed to handle drop tokens: \
1586                        no running dataflow with ID `{dataflow_id}`"
1587                    )
1588                });
1589
1590                match dataflow {
1591                    Ok(dataflow) => {
1592                        for token in tokens {
1593                            match dataflow.pending_drop_tokens.get_mut(&token) {
1594                                Some(info) => {
1595                                    if info.pending_nodes.remove(&node_id) {
1596                                        dataflow.check_drop_token(token, &self.clock).await?;
1597                                    } else {
1598                                        tracing::warn!(
1599                                            "node `{node_id}` is not pending for drop token `{token:?}`"
1600                                        );
1601                                    }
1602                                }
1603                                None => tracing::warn!("unknown drop token `{token:?}`"),
1604                            }
1605                        }
1606                    }
1607                    Err(err) => tracing::warn!("{err:?}"),
1608                }
1609            }
1610            DaemonNodeEvent::EventStreamDropped { reply_sender } => {
1611                let inner = async {
1612                    let dataflow = self
1613                        .running
1614                        .get_mut(&dataflow_id)
1615                        .wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"))?;
1616                    dataflow.subscribe_channels.remove(&node_id);
1617                    Result::<_, eyre::Error>::Ok(())
1618                };
1619
1620                let reply = inner.await.map_err(|err| format!("{err:?}"));
1621                let _ = reply_sender.send(DaemonReply::Result(reply));
1622            }
1623        }
1624        Ok(())
1625    }
1626
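    /// Sends a `Reload` event to the given node, e.g. to hot-reload an operator.
    /// The node's subscribe channel is removed if sending fails.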
1627    async fn send_reload(
1628        &mut self,
1629        dataflow_id: Uuid,
1630        node_id: NodeId,
1631        operator_id: Option<OperatorId>,
1632    ) -> Result<(), eyre::ErrReport> {
1633        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
1634            format!("Reload failed: no running dataflow with ID `{dataflow_id}`")
1635        })?;
1636        if let Some(channel) = dataflow.subscribe_channels.get(&node_id) {
1637            match send_with_timestamp(channel, NodeEvent::Reload { operator_id }, &self.clock) {
1638                Ok(()) => {}
1639                Err(_) => {
1640                    dataflow.subscribe_channels.remove(&node_id);
1641                }
1642            }
1643        }
1644        Ok(())
1645    }
1646
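    /// Forwards a node output to all local subscribers and, if the output is mapped to
    /// remote receivers (or `publish_all_messages_to_zenoh` is set), publishes it to
    /// other daemons via zenoh.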
1647    async fn send_out(
1648        &mut self,
1649        dataflow_id: Uuid,
1650        node_id: NodeId,
1651        output_id: DataId,
1652        metadata: dora_message::metadata::Metadata,
1653        data: Option<DataMessage>,
1654    ) -> Result<(), eyre::ErrReport> {
1655        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
1656            format!("send out failed: no running dataflow with ID `{dataflow_id}`")
1657        })?;
1658        let data_bytes = send_output_to_local_receivers(
1659            node_id.clone(),
1660            output_id.clone(),
1661            dataflow,
1662            &metadata,
1663            data,
1664            &self.clock,
1665        )
1666        .await?;
1667
1668        let output_id = OutputId(node_id, output_id);
1669        let remote_receivers = dataflow.open_external_mappings.contains(&output_id)
1670            || dataflow.publish_all_messages_to_zenoh;
1671        if remote_receivers {
1672            let event = InterDaemonEvent::Output {
1673                dataflow_id,
1674                node_id: output_id.0.clone(),
1675                output_id: output_id.1.clone(),
1676                metadata,
1677                data: data_bytes,
1678            };
1679            self.send_to_remote_receivers(dataflow_id, &output_id, event)
1680                .await?;
1681        }
1682
1683        Ok(())
1684    }
1685
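    /// Publishes an inter-daemon event on the zenoh topic of the given output,
    /// creating and caching the zenoh publisher on first use.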
1686    async fn send_to_remote_receivers(
1687        &mut self,
1688        dataflow_id: Uuid,
1689        output_id: &OutputId,
1690        event: InterDaemonEvent,
1691    ) -> Result<(), eyre::Error> {
1692        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
1693            format!("send out failed: no running dataflow with ID `{dataflow_id}`")
1694        })?;
1695
1696        // publish via zenoh
1697        let publisher = match dataflow.publishers.get(output_id) {
1698            Some(publisher) => publisher,
1699            None => {
1700                let publish_topic = dataflow.output_publish_topic(output_id);
1701                tracing::debug!("declaring publisher on {publish_topic}");
1702                let publisher = self
1703                    .zenoh_session
1704                    .declare_publisher(publish_topic)
1705                    .await
1706                    .map_err(|e| eyre!(e))
1707                    .context("failed to create zenoh publisher")?;
1708                dataflow.publishers.insert(output_id.clone(), publisher);
1709                dataflow.publishers.get(output_id).unwrap()
1710            }
1711        };
1712
1713        let serialized_event = Timestamped {
1714            inner: event,
1715            timestamp: self.clock.new_timestamp(),
1716        }
1717        .serialize();
1718        publisher
1719            .put(serialized_event)
1720            .await
1721            .map_err(|e| eyre!(e))
1722            .context("zenoh put failed")?;
1723        Ok(())
1724    }
1725
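    /// Reports the given outputs as closed: closes the corresponding inputs of local
    /// receiver nodes and notifies remote daemons about closed external mappings.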
1726    async fn send_output_closed_events(
1727        &mut self,
1728        dataflow_id: DataflowId,
1729        node_id: NodeId,
1730        outputs: Vec<DataId>,
1731    ) -> eyre::Result<()> {
1732        let dataflow = self
1733            .running
1734            .get_mut(&dataflow_id)
1735            .wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"))?;
1736        let local_node_inputs: BTreeSet<_> = dataflow
1737            .mappings
1738            .iter()
1739            .filter(|(k, _)| k.0 == node_id && outputs.contains(&k.1))
1740            .flat_map(|(_, v)| v)
1741            .cloned()
1742            .collect();
1743        for (receiver_id, input_id) in &local_node_inputs {
1744            close_input(dataflow, receiver_id, input_id, &self.clock);
1745        }
1746
1747        let mut closed = Vec::new();
1748        for output_id in &dataflow.open_external_mappings {
1749            if output_id.0 == node_id && outputs.contains(&output_id.1) {
1750                closed.push(output_id.clone());
1751            }
1752        }
1753
1754        for output_id in closed {
1755            let event = InterDaemonEvent::OutputClosed {
1756                dataflow_id,
1757                node_id: output_id.0.clone(),
1758                output_id: output_id.1.clone(),
1759            };
1760            self.send_to_remote_receivers(dataflow_id, &output_id, event)
1761                .await?;
1762        }
1763
1764        Ok(())
1765    }
1766
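    /// Registers the event channel of a newly subscribed node and replays state that
    /// the node might have missed (already closed inputs and an already sent stop event).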
1767    async fn subscribe(
1768        dataflow: &mut RunningDataflow,
1769        node_id: NodeId,
1770        event_sender: UnboundedSender<Timestamped<NodeEvent>>,
1771        clock: &HLC,
1772    ) {
1773        // some inputs might have been closed already -> report those events
1774        let closed_inputs = dataflow
1775            .mappings
1776            .values()
1777            .flatten()
1778            .filter(|(node, _)| node == &node_id)
1779            .map(|(_, input)| input)
1780            .filter(|input| {
1781                dataflow
1782                    .open_inputs
1783                    .get(&node_id)
1784                    .map(|open_inputs| !open_inputs.contains(*input))
1785                    .unwrap_or(true)
1786            });
1787        for input_id in closed_inputs {
1788            let _ = send_with_timestamp(
1789                &event_sender,
1790                NodeEvent::InputClosed {
1791                    id: input_id.clone(),
1792                },
1793                clock,
1794            );
1795        }
1796        if dataflow.open_inputs(&node_id).is_empty() {
1797            let _ = send_with_timestamp(&event_sender, NodeEvent::AllInputsClosed, clock);
1798        }
1799
1800        // if a stop event was already sent for the dataflow, send it to
1801        // the newly connected node too
1802        if dataflow.stop_sent {
1803            let _ = send_with_timestamp(&event_sender, NodeEvent::Stop, clock);
1804        }
1805
1806        dataflow.subscribe_channels.insert(node_id, event_sender);
1807    }
1808
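    /// Marks all outputs of the given node as closed and removes its drop channel.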
1809    #[tracing::instrument(skip(self), level = "trace")]
1810    async fn handle_outputs_done(
1811        &mut self,
1812        dataflow_id: DataflowId,
1813        node_id: &NodeId,
1814    ) -> eyre::Result<()> {
1815        let dataflow = self
1816            .running
1817            .get_mut(&dataflow_id)
1818            .ok_or_else(|| eyre!("no running dataflow with ID `{dataflow_id}`"))?;
1819
1820        let outputs = dataflow
1821            .mappings
1822            .keys()
1823            .filter(|m| &m.0 == node_id)
1824            .map(|m| &m.1)
1825            .cloned()
1826            .collect();
1827        self.send_output_closed_events(dataflow_id, node_id.clone(), outputs)
1828            .await?;
1829
1830        let dataflow = self
1831            .running
1832            .get_mut(&dataflow_id)
1833            .ok_or_else(|| eyre!("no running dataflow with ID `{dataflow_id}`"))?;
1834        dataflow.drop_channels.remove(node_id);
1835        Ok(())
1836    }
1837
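    /// Handles the stop of a node and notifies the main event loop through a
    /// `NodeStopped` event, regardless of whether the stop handling succeeded.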
1838    async fn handle_node_stop(
1839        &mut self,
1840        dataflow_id: Uuid,
1841        node_id: &NodeId,
1842        dynamic_node: bool,
1843    ) -> eyre::Result<()> {
1844        let result = self
1845            .handle_node_stop_inner(dataflow_id, node_id, dynamic_node)
1846            .await;
1847        let _ = self
1848            .events_tx
1849            .send(Timestamped {
1850                inner: Event::NodeStopped {
1851                    dataflow_id,
1852                    node_id: node_id.clone(),
1853                },
1854                timestamp: self.clock.new_timestamp(),
1855            })
1856            .await;
1857        result
1858    }
1859
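    /// Performs the actual node stop handling: updates the pending-node bookkeeping,
    /// closes the node's outputs, and, once only dynamic nodes are left running,
    /// reports the dataflow result to the coordinator and removes the dataflow.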
1860    async fn handle_node_stop_inner(
1861        &mut self,
1862        dataflow_id: Uuid,
1863        node_id: &NodeId,
1864        dynamic_node: bool,
1865    ) -> eyre::Result<()> {
1866        let mut logger = self.logger.for_dataflow(dataflow_id);
1867        let dataflow = match self.running.get_mut(&dataflow_id) {
1868            Some(dataflow) => dataflow,
1869            None if dynamic_node => {
1870                // The dataflow might be done already as we don't wait for dynamic nodes. In this
1871                // case, we don't need to do anything to handle the node stop.
1872                tracing::debug!(
1873                    "dynamic node {dataflow_id}/{node_id} stopped after dataflow was done"
1874                );
1875                return Ok(());
1876            }
1877            None => eyre::bail!(
1878                "failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`"
1879            ),
1880        };
1881
1882        dataflow
1883            .pending_nodes
1884            .handle_node_stop(
1885                node_id,
1886                &mut self.coordinator_connection,
1887                &self.clock,
1888                &mut dataflow.cascading_error_causes,
1889                &mut logger,
1890            )
1891            .await?;
1892
1893        self.handle_outputs_done(dataflow_id, node_id).await?;
1894
1895        let mut logger = self.logger.for_dataflow(dataflow_id);
1896        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
1897            format!("failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`")
1898        })?;
1899        if let Some(mut pid) = dataflow.running_nodes.remove(node_id).and_then(|n| n.pid) {
1900            pid.mark_as_stopped()
1901        }
1902        if !dataflow.pending_nodes.local_nodes_pending()
1903            && dataflow
1904                .running_nodes
1905                .iter()
1906                .all(|(_id, n)| n.node_config.dynamic)
1907        {
1908            let result = DataflowDaemonResult {
1909                timestamp: self.clock.new_timestamp(),
1910                node_results: self
1911                    .dataflow_node_results
1912                    .get(&dataflow.id)
1913                    .context("failed to get dataflow node results")?
1914                    .clone(),
1915            };
1916
1917            self.git_manager
1918                .clones_in_use
1919                .values_mut()
1920                .for_each(|dataflows| {
1921                    dataflows.remove(&dataflow_id);
1922                });
1923
1924            logger
1925                .log(
1926                    LogLevel::Info,
1927                    None,
1928                    Some("daemon".into()),
1929                    format!("dataflow finished on machine `{}`", self.daemon_id),
1930                )
1931                .await;
1932            if let Some(connection) = &mut self.coordinator_connection {
1933                let msg = serde_json::to_vec(&Timestamped {
1934                    inner: CoordinatorRequest::Event {
1935                        daemon_id: self.daemon_id.clone(),
1936                        event: DaemonEvent::AllNodesFinished {
1937                            dataflow_id,
1938                            result,
1939                        },
1940                    },
1941                    timestamp: self.clock.new_timestamp(),
1942                })?;
1943                socket_stream_send(connection, &msg)
1944                    .await
1945                    .wrap_err("failed to report dataflow finish to dora-coordinator")?;
1946            }
1947            self.running.remove(&dataflow_id);
1948        }
1949
1950        Ok(())
1951    }
1952
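    /// Handles daemon-internal events such as timer ticks, log forwarding, and the
    /// exit status of spawned nodes.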
1953    async fn handle_dora_event(&mut self, event: DoraEvent) -> eyre::Result<()> {
1954        match event {
1955            DoraEvent::Timer {
1956                dataflow_id,
1957                interval,
1958                metadata,
1959            } => {
1960                let Some(dataflow) = self.running.get_mut(&dataflow_id) else {
1961                    tracing::warn!("Timer event for unknown dataflow `{dataflow_id}`");
1962                    return Ok(());
1963                };
1964
1965                let Some(subscribers) = dataflow.timers.get(&interval) else {
1966                    return Ok(());
1967                };
1968
1969                let mut closed = Vec::new();
1970                for (receiver_id, input_id) in subscribers {
1971                    let Some(channel) = dataflow.subscribe_channels.get(receiver_id) else {
1972                        continue;
1973                    };
1974
1975                    let send_result = send_with_timestamp(
1976                        channel,
1977                        NodeEvent::Input {
1978                            id: input_id.clone(),
1979                            metadata: metadata.clone(),
1980                            data: None,
1981                        },
1982                        &self.clock,
1983                    );
1984                    match send_result {
1985                        Ok(()) => {}
1986                        Err(_) => {
1987                            closed.push(receiver_id);
1988                        }
1989                    }
1990                }
1991                for id in closed {
1992                    dataflow.subscribe_channels.remove(id);
1993                }
1994            }
1995            DoraEvent::Logs {
1996                dataflow_id,
1997                output_id,
1998                message,
1999                metadata,
2000            } => {
2001                let Some(dataflow) = self.running.get_mut(&dataflow_id) else {
2002                    tracing::warn!("Logs event for unknown dataflow `{dataflow_id}`");
2003                    return Ok(());
2004                };
2005
2006                let Some(subscribers) = dataflow.mappings.get(&output_id) else {
2007                    tracing::warn!(
2008                        "No subscribers found for {:?} in {:?}",
2009                        output_id,
2010                        dataflow.mappings
2011                    );
2012                    return Ok(());
2013                };
2014
2015                let mut closed = Vec::new();
2016                for (receiver_id, input_id) in subscribers {
2017                    let Some(channel) = dataflow.subscribe_channels.get(receiver_id) else {
2018                        tracing::warn!("No subscriber channel found for {:?}", output_id);
2019                        continue;
2020                    };
2021
2022                    let send_result = send_with_timestamp(
2023                        channel,
2024                        NodeEvent::Input {
2025                            id: input_id.clone(),
2026                            metadata: metadata.clone(),
2027                            data: Some(message.clone()),
2028                        },
2029                        &self.clock,
2030                    );
2031                    match send_result {
2032                        Ok(()) => {}
2033                        Err(_) => {
2034                            closed.push(receiver_id);
2035                        }
2036                    }
2037                }
2038                for id in closed {
2039                    dataflow.subscribe_channels.remove(id);
2040                }
2041            }
2042            DoraEvent::SpawnedNodeResult {
2043                dataflow_id,
2044                node_id,
2045                dynamic_node,
2046                exit_status,
2047            } => {
2048                let mut logger = self
2049                    .logger
2050                    .for_dataflow(dataflow_id)
2051                    .for_node(node_id.clone());
2052                logger
2053                    .log(
2054                        LogLevel::Debug,
2055                        Some("daemon".into()),
2056                        format!("handling node stop with exit status {exit_status:?}"),
2057                    )
2058                    .await;
2059
2060                let node_result = match exit_status {
2061                    NodeExitStatus::Success => Ok(()),
2062                    exit_status => {
2063                        let dataflow = self.running.get(&dataflow_id);
2064                        let caused_by_node = dataflow
2065                            .and_then(|dataflow| {
2066                                dataflow.cascading_error_causes.error_caused_by(&node_id)
2067                            })
2068                            .cloned();
2069                        let grace_duration_kill = dataflow
2070                            .map(|d| d.grace_duration_kills.contains(&node_id))
2071                            .unwrap_or_default();
2072
2073                        let cause = match caused_by_node {
2074                            Some(caused_by_node) => {
2075                                logger
2076                                    .log(
2077                                        LogLevel::Info,
2078                                        Some("daemon".into()),
2079                                        format!("marking `{node_id}` as cascading error caused by `{caused_by_node}`")
2080                                    )
2081                                    .await;
2082
2083                                NodeErrorCause::Cascading { caused_by_node }
2084                            }
2085                            None if grace_duration_kill => NodeErrorCause::GraceDuration,
2086                            None => {
2087                                let cause = dataflow
2088                                    .and_then(|d| d.node_stderr_most_recent.get(&node_id))
2089                                    .map(|queue| {
2090                                        let mut s = if queue.is_full() {
2091                                            "[...]".into()
2092                                        } else {
2093                                            String::new()
2094                                        };
2095                                        while let Some(line) = queue.pop() {
2096                                            s += &line;
2097                                        }
2098                                        s
2099                                    })
2100                                    .unwrap_or_default();
2101
2102                                NodeErrorCause::Other { stderr: cause }
2103                            }
2104                        };
2105                        Err(NodeError {
2106                            timestamp: self.clock.new_timestamp(),
2107                            cause,
2108                            exit_status,
2109                        })
2110                    }
2111                };
2112
2113                logger
2114                    .log(
2115                        if node_result.is_ok() {
2116                            LogLevel::Info
2117                        } else {
2118                            LogLevel::Error
2119                        },
2120                        Some("daemon".into()),
2121                        match &node_result {
2122                            Ok(()) => format!("{node_id} finished successfully"),
2123                            Err(err) => format!("{err}"),
2124                        },
2125                    )
2126                    .await;
2127
2128                self.dataflow_node_results
2129                    .entry(dataflow_id)
2130                    .or_default()
2131                    .insert(node_id.clone(), node_result);
2132
2133                self.handle_node_stop(dataflow_id, &node_id, dynamic_node)
2134                    .await?;
2135            }
2136        }
2137        Ok(())
2138    }
2139
2140    fn base_working_dir(
2141        &self,
2142        local_working_dir: Option<PathBuf>,
2143        session_id: SessionId,
2144    ) -> eyre::Result<PathBuf> {
2145        match local_working_dir {
2146            Some(working_dir) => {
2147                // check that working directory exists
2148                if working_dir.exists() {
2149                    Ok(working_dir)
2150                } else {
2151                    bail!(
2152                        "working directory does not exist: {}",
2153                        working_dir.display(),
2154                    )
2155                }
2156            }
2157            None => {
2158                // use subfolder of daemon working dir
2159                let daemon_working_dir =
2160                    current_dir().context("failed to get daemon working dir")?;
2161                Ok(daemon_working_dir
2162                    .join("_work")
2163                    .join(session_id.uuid().to_string()))
2164            }
2165        }
2166    }
2167}
2168
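/// Registers this daemon at the coordinator and merges coordinator events, remote
/// daemon events, and dynamic node events into a single event stream.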
2169async fn set_up_event_stream(
2170    coordinator_addr: SocketAddr,
2171    machine_id: &Option<String>,
2172    clock: &Arc<HLC>,
2173    remote_daemon_events_rx: flume::Receiver<eyre::Result<Timestamped<InterDaemonEvent>>>,
2174    // used for dynamic nodes
2175    local_listen_port: u16,
2176) -> eyre::Result<(DaemonId, impl Stream<Item = Timestamped<Event>> + Unpin)> {
2177    let clock_cloned = clock.clone();
2178    let remote_daemon_events = remote_daemon_events_rx.into_stream().map(move |e| match e {
2179        Ok(e) => Timestamped {
2180            inner: Event::Daemon(e.inner),
2181            timestamp: e.timestamp,
2182        },
2183        Err(err) => Timestamped {
2184            inner: Event::DaemonError(err),
2185            timestamp: clock_cloned.new_timestamp(),
2186        },
2187    });
2188    let (daemon_id, coordinator_events) =
2189        coordinator::register(coordinator_addr, machine_id.clone(), clock)
2190            .await
2191            .wrap_err("failed to connect to dora-coordinator")?;
2192    let coordinator_events = coordinator_events.map(
2193        |Timestamped {
2194             inner: event,
2195             timestamp,
2196         }| Timestamped {
2197            inner: Event::Coordinator(event),
2198            timestamp,
2199        },
2200    );
2201    let (events_tx, events_rx) = flume::bounded(10);
2202    let _listen_port =
2203        local_listener::spawn_listener_loop((LOCALHOST, local_listen_port).into(), events_tx)
2204            .await?;
2205    let dynamic_node_events = events_rx.into_stream().map(|e| Timestamped {
2206        inner: Event::DynamicNode(e.inner),
2207        timestamp: e.timestamp,
2208    });
2209    let incoming = (
2210        coordinator_events,
2211        remote_daemon_events,
2212        dynamic_node_events,
2213    )
2214        .merge();
2215    Ok((daemon_id, incoming))
2216}
2217
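/// Delivers an output message to all local subscriber nodes and returns the raw data
/// bytes (read from shared memory if necessary) so that the caller can forward them to
/// remote daemons. Drop tokens of shared-memory messages are tracked until all local
/// receivers have reported them back.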
2218async fn send_output_to_local_receivers(
2219    node_id: NodeId,
2220    output_id: DataId,
2221    dataflow: &mut RunningDataflow,
2222    metadata: &metadata::Metadata,
2223    data: Option<DataMessage>,
2224    clock: &HLC,
2225) -> Result<Option<AVec<u8, ConstAlign<128>>>, eyre::ErrReport> {
2226    let timestamp = metadata.timestamp();
2227    let empty_set = BTreeSet::new();
2228    let output_id = OutputId(node_id, output_id);
2229    let local_receivers = dataflow.mappings.get(&output_id).unwrap_or(&empty_set);
2230    let OutputId(node_id, _) = output_id;
2231    let mut closed = Vec::new();
2232    for (receiver_id, input_id) in local_receivers {
2233        if let Some(channel) = dataflow.subscribe_channels.get(receiver_id) {
2234            let item = NodeEvent::Input {
2235                id: input_id.clone(),
2236                metadata: metadata.clone(),
2237                data: data.clone(),
2238            };
2239            match channel.send(Timestamped {
2240                inner: item,
2241                timestamp,
2242            }) {
2243                Ok(()) => {
2244                    if let Some(token) = data.as_ref().and_then(|d| d.drop_token()) {
2245                        dataflow
2246                            .pending_drop_tokens
2247                            .entry(token)
2248                            .or_insert_with(|| DropTokenInformation {
2249                                owner: node_id.clone(),
2250                                pending_nodes: Default::default(),
2251                            })
2252                            .pending_nodes
2253                            .insert(receiver_id.clone());
2254                    }
2255                }
2256                Err(_) => {
2257                    closed.push(receiver_id);
2258                }
2259            }
2260        }
2261    }
2262    for id in closed {
2263        dataflow.subscribe_channels.remove(id);
2264    }
2265    let (data_bytes, drop_token) = match data {
2266        None => (None, None),
2267        Some(DataMessage::SharedMemory {
2268            shared_memory_id,
2269            len,
2270            drop_token,
2271        }) => {
2272            let memory = ShmemConf::new()
2273                .os_id(shared_memory_id)
2274                .open()
2275                .wrap_err("failed to map shared memory output")?;
2276            let data = Some(AVec::from_slice(1, &unsafe { memory.as_slice() }[..len]));
2277            (data, Some(drop_token))
2278        }
2279        Some(DataMessage::Vec(v)) => (Some(v), None),
2280    };
2281    if let Some(token) = drop_token {
2282        // insert token into `pending_drop_tokens` even if there are no local subscribers
2283        dataflow
2284            .pending_drop_tokens
2285            .entry(token)
2286            .or_insert_with(|| DropTokenInformation {
2287                owner: node_id.clone(),
2288                pending_nodes: Default::default(),
2289            });
2290        // check if all local subscribers are finished with the token
2291        dataflow.check_drop_token(token, clock).await?;
2292    }
2293    Ok(data_bytes)
2294}
2295
2296fn node_inputs(node: &ResolvedNode) -> BTreeMap<DataId, Input> {
2297    match &node.kind {
2298        CoreNodeKind::Custom(n) => n.run_config.inputs.clone(),
2299        CoreNodeKind::Runtime(n) => runtime_node_inputs(n),
2300    }
2301}
2302
2303fn close_input(
2304    dataflow: &mut RunningDataflow,
2305    receiver_id: &NodeId,
2306    input_id: &DataId,
2307    clock: &HLC,
2308) {
2309    if let Some(open_inputs) = dataflow.open_inputs.get_mut(receiver_id) {
2310        if !open_inputs.remove(input_id) {
2311            return;
2312        }
2313    }
2314    if let Some(channel) = dataflow.subscribe_channels.get(receiver_id) {
2315        let _ = send_with_timestamp(
2316            channel,
2317            NodeEvent::InputClosed {
2318                id: input_id.clone(),
2319            },
2320            clock,
2321        );
2322
2323        if dataflow.open_inputs(receiver_id).is_empty() {
2324            let _ = send_with_timestamp(channel, NodeEvent::AllInputsClosed, clock);
2325        }
2326    }
2327}
2328
2329#[derive(Debug)]
2330pub struct RunningNode {
2331    pid: Option<ProcessId>,
2332    node_config: NodeConfig,
2333}
2334
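/// Operating system process ID of a spawned node.
///
/// The process is killed on drop unless it was marked as stopped before.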
2335#[derive(Debug)]
2336struct ProcessId(Option<u32>);
2337
2338impl ProcessId {
2339    pub fn new(process_id: u32) -> Self {
2340        Self(Some(process_id))
2341    }
2342
2343    pub fn mark_as_stopped(&mut self) {
2344        self.0 = None;
2345    }
2346
2347    pub fn kill(&mut self) -> bool {
2348        if let Some(pid) = self.0 {
2349            let pid = Pid::from(pid as usize);
2350            let mut system = sysinfo::System::new();
2351            system.refresh_process(pid);
2352
2353            if let Some(process) = system.process(pid) {
2354                process.kill();
2355                self.mark_as_stopped();
2356                return true;
2357            }
2358        }
2359
2360        false
2361    }
2362}
2363
2364impl Drop for ProcessId {
2365    fn drop(&mut self) {
2366        // kill the process if it's still running
2367        if let Some(pid) = self.0 {
2368            if self.kill() {
2369                warn!("process {pid} was killed on drop because it was still running")
2370            }
2371        }
2372    }
2373}
2374
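/// Bookkeeping for a dataflow that is currently running (at least partially) on this daemon.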
2375pub struct RunningDataflow {
2376    id: Uuid,
2377    /// Local nodes that are not started yet
2378    pending_nodes: PendingNodes,
2379
2380    subscribe_channels: HashMap<NodeId, UnboundedSender<Timestamped<NodeEvent>>>,
2381    drop_channels: HashMap<NodeId, UnboundedSender<Timestamped<NodeDropEvent>>>,
2382    mappings: HashMap<OutputId, BTreeSet<InputId>>,
2383    timers: BTreeMap<Duration, BTreeSet<InputId>>,
2384    open_inputs: BTreeMap<NodeId, BTreeSet<DataId>>,
2385    running_nodes: BTreeMap<NodeId, RunningNode>,
2386
2387    /// List of all dynamic node IDs.
2388    ///
2389    /// We want to treat dynamic nodes differently in some cases, so we need
2390    /// to know which nodes are dynamic.
2391    dynamic_nodes: BTreeSet<NodeId>,
2392
2393    open_external_mappings: BTreeSet<OutputId>,
2394
2395    pending_drop_tokens: HashMap<DropToken, DropTokenInformation>,
2396
2397    /// Keep handles to all timer tasks of this dataflow to cancel them on drop.
2398    _timer_handles: Vec<futures::future::RemoteHandle<()>>,
2399    stop_sent: bool,
2400
2401    /// Used in `open_inputs`.
2402    ///
2403    /// TODO: replace this with a constant once `BTreeSet::new` is `const` on stable.
2404    empty_set: BTreeSet<DataId>,
2405
2406    /// Contains the node that caused the error for nodes that experienced a cascading error.
2407    cascading_error_causes: CascadingErrorCauses,
2408    grace_duration_kills: Arc<crossbeam_skiplist::SkipSet<NodeId>>,
2409
2410    node_stderr_most_recent: BTreeMap<NodeId, Arc<ArrayQueue<String>>>,
2411
2412    publishers: BTreeMap<OutputId, zenoh::pubsub::Publisher<'static>>,
2413
2414    finished_tx: broadcast::Sender<()>,
2415
2416    publish_all_messages_to_zenoh: bool,
2417}
2418
2419impl RunningDataflow {
2420    fn new(
2421        dataflow_id: Uuid,
2422        daemon_id: DaemonId,
2423        dataflow_descriptor: &Descriptor,
2424    ) -> RunningDataflow {
2425        let (finished_tx, _) = broadcast::channel(1);
2426        Self {
2427            id: dataflow_id,
2428            pending_nodes: PendingNodes::new(dataflow_id, daemon_id),
2429            subscribe_channels: HashMap::new(),
2430            drop_channels: HashMap::new(),
2431            mappings: HashMap::new(),
2432            timers: BTreeMap::new(),
2433            open_inputs: BTreeMap::new(),
2434            running_nodes: BTreeMap::new(),
2435            dynamic_nodes: BTreeSet::new(),
2436            open_external_mappings: Default::default(),
2437            pending_drop_tokens: HashMap::new(),
2438            _timer_handles: Vec::new(),
2439            stop_sent: false,
2440            empty_set: BTreeSet::new(),
2441            cascading_error_causes: Default::default(),
2442            grace_duration_kills: Default::default(),
2443            node_stderr_most_recent: BTreeMap::new(),
2444            publishers: Default::default(),
2445            finished_tx,
2446            publish_all_messages_to_zenoh: dataflow_descriptor.debug.publish_all_messages_to_zenoh,
2447        }
2448    }
2449
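    /// Starts the periodic timer tasks of this dataflow. Each tick is sent to the
    /// daemon event loop as a `DoraEvent::Timer`; the tasks are cancelled when the
    /// dataflow is dropped because only their remote handles are kept.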
2450    async fn start(
2451        &mut self,
2452        events_tx: &mpsc::Sender<Timestamped<Event>>,
2453        clock: &Arc<HLC>,
2454    ) -> eyre::Result<()> {
2455        for interval in self.timers.keys().copied() {
2456            let events_tx = events_tx.clone();
2457            let dataflow_id = self.id;
2458            let clock = clock.clone();
2459            let task = async move {
2460                let mut interval_stream = tokio::time::interval(interval);
2461                let hlc = HLC::default();
2462                loop {
2463                    interval_stream.tick().await;
2464
2465                    let span = tracing::span!(tracing::Level::TRACE, "tick");
2466                    let _span_guard = span.enter(); // keep the guard alive; `let _ = ...` would drop it immediately and exit the span
2467
2468                    let mut parameters = BTreeMap::new();
2469                    parameters.insert(
2470                        "open_telemetry_context".to_string(),
2471                        #[cfg(feature = "telemetry")]
2472                        Parameter::String(serialize_context(&span.context())),
2473                        #[cfg(not(feature = "telemetry"))]
2474                        Parameter::String("".into()),
2475                    );
2476
2477                    let metadata = metadata::Metadata::from_parameters(
2478                        hlc.new_timestamp(),
2479                        empty_type_info(),
2480                        parameters,
2481                    );
2482
2483                    let event = Timestamped {
2484                        inner: DoraEvent::Timer {
2485                            dataflow_id,
2486                            interval,
2487                            metadata,
2488                        }
2489                        .into(),
2490                        timestamp: clock.new_timestamp(),
2491                    };
2492                    if events_tx.send(event).await.is_err() {
2493                        break;
2494                    }
2495                }
2496            };
2497            let (task, handle) = task.remote_handle();
2498            tokio::spawn(task);
2499            self._timer_handles.push(handle);
2500        }
2501
2502        Ok(())
2503    }
2504
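    /// Sends a stop event to all subscribed nodes and kills every node process that
    /// has not exited after the grace period (15 seconds by default).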
2505    async fn stop_all(
2506        &mut self,
2507        coordinator_connection: &mut Option<TcpStream>,
2508        clock: &HLC,
2509        grace_duration: Option<Duration>,
2510        logger: &mut DataflowLogger<'_>,
2511    ) -> eyre::Result<()> {
2512        self.pending_nodes
2513            .handle_dataflow_stop(
2514                coordinator_connection,
2515                clock,
2516                &mut self.cascading_error_causes,
2517                &self.dynamic_nodes,
2518                logger,
2519            )
2520            .await?;
2521
2522        for (_node_id, channel) in self.subscribe_channels.drain() {
2523            let _ = send_with_timestamp(&channel, NodeEvent::Stop, clock);
2524        }
2525
2526        let running_processes: Vec<_> = self
2527            .running_nodes
2528            .iter_mut()
2529            .map(|(id, n)| (id.clone(), n.pid.take()))
2530            .collect();
2531        let grace_duration_kills = self.grace_duration_kills.clone();
2532        tokio::spawn(async move {
2533            let duration = grace_duration.unwrap_or(Duration::from_secs(15));
2534            tokio::time::sleep(duration).await;
2535
2536            for (node, pid) in running_processes {
2537                if let Some(mut pid) = pid {
2538                    if pid.kill() {
2539                        grace_duration_kills.insert(node.clone());
2540                        warn!(
2541                            "{node} was killed due to not stopping within the {:#?} grace period",
2542                            duration
2543                        )
2544                    }
2545                }
2546            }
2547        });
2548        self.stop_sent = true;
2549        Ok(())
2550    }
2551
2552    fn open_inputs(&self, node_id: &NodeId) -> &BTreeSet<DataId> {
2553        self.open_inputs.get(node_id).unwrap_or(&self.empty_set)
2554    }
2555
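    /// Checks whether all local receivers have reported the given drop token back and,
    /// if so, notifies the owning node that the associated output data may be dropped.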
2556    async fn check_drop_token(&mut self, token: DropToken, clock: &HLC) -> eyre::Result<()> {
2557        match self.pending_drop_tokens.entry(token) {
2558            std::collections::hash_map::Entry::Occupied(entry) => {
2559                if entry.get().pending_nodes.is_empty() {
2560                    let (drop_token, info) = entry.remove_entry();
2561                    let result = match self.drop_channels.get_mut(&info.owner) {
2562                        Some(channel) => send_with_timestamp(
2563                            channel,
2564                            NodeDropEvent::OutputDropped { drop_token },
2565                            clock,
2566                        )
2567                        .wrap_err("send failed"),
2568                        None => Err(eyre!("no subscribe channel for node `{}`", &info.owner)),
2569                    };
2570                    if let Err(err) = result.wrap_err_with(|| {
2571                        format!(
2572                            "failed to report drop token `{drop_token:?}` to owner `{}`",
2573                            &info.owner
2574                        )
2575                    }) {
2576                        tracing::warn!("{err:?}");
2577                    }
2578                }
2579            }
2580            std::collections::hash_map::Entry::Vacant(_) => {
2581                tracing::warn!("check_drop_token called with already closed token")
2582            }
2583        }
2584
2585        Ok(())
2586    }
2587
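    /// Returns the zenoh key expression used for publishing the given output, in the
    /// form `dora/<network_id>/<dataflow_id>/output/<node_id>/<output_id>` (the network
    /// ID is currently fixed to `default`).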
2588    fn output_publish_topic(&self, output_id: &OutputId) -> String {
2589        let network_id = "default";
2590        let dataflow_id = self.id;
2591        let OutputId(node_id, output_id) = output_id;
2592        format!("dora/{network_id}/{dataflow_id}/output/{node_id}/{output_id}")
2593    }
2594}
2595
2596fn empty_type_info() -> ArrowTypeInfo {
2597    ArrowTypeInfo {
2598        data_type: DataType::Null,
2599        len: 0,
2600        null_count: 0,
2601        validity: None,
2602        offset: 0,
2603        buffer_offsets: Vec::new(),
2604        child_data: Vec::new(),
2605    }
2606}
2607
2608#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
2609pub struct OutputId(NodeId, DataId);
2610type InputId = (NodeId, DataId);
2611
2612struct DropTokenInformation {
2613    /// The node that created the associated drop token.
2614    owner: NodeId,
2615    /// Contains the set of pending nodes that still have access to the input
2616    /// associated with a drop token.
2617    pending_nodes: BTreeSet<NodeId>,
2618}
2619
2620#[derive(Debug)]
2621pub enum Event {
2622    Node {
2623        dataflow_id: DataflowId,
2624        node_id: NodeId,
2625        event: DaemonNodeEvent,
2626    },
2627    Coordinator(CoordinatorEvent),
2628    Daemon(InterDaemonEvent),
2629    Dora(DoraEvent),
2630    DynamicNode(DynamicNodeEventWrapper),
2631    HeartbeatInterval,
2632    CtrlC,
2633    SecondCtrlC,
2634    DaemonError(eyre::Report),
2635    SpawnNodeResult {
2636        dataflow_id: DataflowId,
2637        node_id: NodeId,
2638        dynamic_node: bool,
2639        result: Result<RunningNode, NodeError>,
2640    },
2641    BuildDataflowResult {
2642        build_id: BuildId,
2643        session_id: SessionId,
2644        result: eyre::Result<BuildInfo>,
2645    },
2646    SpawnDataflowResult {
2647        dataflow_id: Uuid,
2648        result: eyre::Result<()>,
2649    },
2650    NodeStopped {
2651        dataflow_id: Uuid,
2652        node_id: NodeId,
2653    },
2654}
2655
2656impl From<DoraEvent> for Event {
2657    fn from(event: DoraEvent) -> Self {
2658        Event::Dora(event)
2659    }
2660}
2661
2662impl Event {
2663    pub fn kind(&self) -> &'static str {
2664        match self {
2665            Event::Node { .. } => "Node",
2666            Event::Coordinator(_) => "Coordinator",
2667            Event::Daemon(_) => "Daemon",
2668            Event::Dora(_) => "Dora",
2669            Event::DynamicNode(_) => "DynamicNode",
2670            Event::HeartbeatInterval => "HeartbeatInterval",
2671            Event::CtrlC => "CtrlC",
2672            Event::SecondCtrlC => "SecondCtrlC",
2673            Event::DaemonError(_) => "DaemonError",
2674            Event::SpawnNodeResult { .. } => "SpawnNodeResult",
2675            Event::BuildDataflowResult { .. } => "BuildDataflowResult",
2676            Event::SpawnDataflowResult { .. } => "SpawnDataflowResult",
2677            Event::NodeStopped { .. } => "NodeStopped",
2678        }
2679    }
2680}
2681
2682#[derive(Debug)]
2683pub enum DaemonNodeEvent {
2684    OutputsDone {
2685        reply_sender: oneshot::Sender<DaemonReply>,
2686    },
2687    Subscribe {
2688        event_sender: UnboundedSender<Timestamped<NodeEvent>>,
2689        reply_sender: oneshot::Sender<DaemonReply>,
2690    },
2691    SubscribeDrop {
2692        event_sender: UnboundedSender<Timestamped<NodeDropEvent>>,
2693        reply_sender: oneshot::Sender<DaemonReply>,
2694    },
2695    CloseOutputs {
2696        outputs: Vec<dora_core::config::DataId>,
2697        reply_sender: oneshot::Sender<DaemonReply>,
2698    },
2699    SendOut {
2700        output_id: DataId,
2701        metadata: metadata::Metadata,
2702        data: Option<DataMessage>,
2703    },
2704    ReportDrop {
2705        tokens: Vec<DropToken>,
2706    },
2707    EventStreamDropped {
2708        reply_sender: oneshot::Sender<DaemonReply>,
2709    },
2710}

/// Events generated by the daemon itself, e.g. timer ticks, log outputs, and
/// exit statuses of spawned nodes.
#[derive(Debug)]
pub enum DoraEvent {
    Timer {
        dataflow_id: DataflowId,
        interval: Duration,
        metadata: metadata::Metadata,
    },
    Logs {
        dataflow_id: DataflowId,
        output_id: OutputId,
        message: DataMessage,
        metadata: metadata::Metadata,
    },
    SpawnedNodeResult {
        dataflow_id: DataflowId,
        node_id: NodeId,
        dynamic_node: bool,
        exit_status: NodeExitStatus,
    },
}

/// Signals whether the event loop should keep processing events or exit.
#[must_use]
enum RunStatus {
    Continue,
    Exit,
}

/// Wraps `event` with a fresh timestamp from the given HLC clock and sends it
/// over the channel.
fn send_with_timestamp<T>(
    sender: &UnboundedSender<Timestamped<T>>,
    event: T,
    clock: &HLC,
) -> Result<(), mpsc::error::SendError<Timestamped<T>>> {
    sender.send(Timestamped {
        inner: event,
        timestamp: clock.new_timestamp(),
    })
}
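
// A minimal sketch of how `send_with_timestamp` is used, assuming a default
// `uhlc::HLC` and a plain `u32` payload; illustrative only, not part of the
// upstream test suite.
#[cfg(test)]
mod send_with_timestamp_tests {
    use super::*;

    #[test]
    fn wraps_event_with_fresh_timestamp() {
        let (tx, mut rx) = mpsc::unbounded_channel::<Timestamped<u32>>();
        let clock = HLC::default();

        send_with_timestamp(&tx, 42, &clock).expect("receiver should still be alive");

        let received = rx.try_recv().expect("event should be queued");
        assert_eq!(received.inner, 42);
    }
}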

/// Registers a ctrl-c handler that forwards the first signal as [`Event::CtrlC`]
/// and the second as [`Event::SecondCtrlC`]; a third signal aborts the process.
fn set_up_ctrlc_handler(
    clock: Arc<HLC>,
) -> eyre::Result<tokio::sync::mpsc::Receiver<Timestamped<Event>>> {
    let (ctrlc_tx, ctrlc_rx) = mpsc::channel(1);

    let mut ctrlc_sent = 0;
    ctrlc::set_handler(move || {
        let event = match ctrlc_sent {
            0 => Event::CtrlC,
            1 => Event::SecondCtrlC,
            _ => {
                tracing::warn!("received 3rd ctrl-c signal -> aborting immediately");
                std::process::abort();
            }
        };
        if ctrlc_tx
            .blocking_send(Timestamped {
                inner: event,
                timestamp: clock.new_timestamp(),
            })
            .is_err()
        {
            tracing::error!("failed to forward ctrl-c event to the daemon event loop");
        }

        ctrlc_sent += 1;
    })
    .wrap_err("failed to set ctrl-c handler")?;

    Ok(ctrlc_rx)
}

/// Tracks which node failures were caused by the failure of another node
/// (cascading errors).
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct CascadingErrorCauses {
    caused_by: BTreeMap<NodeId, NodeId>,
}

impl CascadingErrorCauses {
    /// Returns whether the given node failed because of another node's failure.
    pub fn experienced_cascading_error(&self, node: &NodeId) -> bool {
        self.caused_by.contains_key(node)
    }

    /// Return the ID of the node that caused a cascading error for the given node, if any.
    pub fn error_caused_by(&self, node: &NodeId) -> Option<&NodeId> {
        self.caused_by.get(node)
    }

    /// Records that `affected_node` failed because of `causing_node`; only the
    /// first reported cause for each affected node is kept.
    pub fn report_cascading_error(&mut self, causing_node: NodeId, affected_node: NodeId) {
        self.caused_by.entry(affected_node).or_insert(causing_node);
    }
}
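
// A behavior sketch for `CascadingErrorCauses` (illustrative only, not part of
// the upstream test suite): because `report_cascading_error` uses
// `entry(..).or_insert(..)`, only the first reported cause for a node is kept.
// Assumes `NodeId` can be constructed via `From<String>`.
#[cfg(test)]
mod cascading_error_tests {
    use super::*;

    #[test]
    fn first_reported_cause_wins() {
        let mut causes = CascadingErrorCauses::default();
        let first_cause = NodeId::from("camera".to_string());
        let second_cause = NodeId::from("lidar".to_string());
        let affected = NodeId::from("plotter".to_string());

        causes.report_cascading_error(first_cause.clone(), affected.clone());
        causes.report_cascading_error(second_cause, affected.clone());

        assert!(causes.experienced_cascading_error(&affected));
        assert_eq!(causes.error_caused_by(&affected), Some(&first_cause));
    }
}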

/// Collects the inputs of all operators of a runtime node, prefixing each input
/// ID with the operator ID (`<operator_id>/<input_id>`).
fn runtime_node_inputs(n: &RuntimeNode) -> BTreeMap<DataId, Input> {
    n.operators
        .iter()
        .flat_map(|operator| {
            operator.config.inputs.iter().map(|(input_id, mapping)| {
                (
                    DataId::from(format!("{}/{input_id}", operator.id)),
                    mapping.clone(),
                )
            })
        })
        .collect()
}

/// Collects the outputs of all operators of a runtime node, prefixing each
/// output ID with the operator ID (`<operator_id>/<output_id>`).
fn runtime_node_outputs(n: &RuntimeNode) -> BTreeSet<DataId> {
    n.operators
        .iter()
        .flat_map(|operator| {
            operator
                .config
                .outputs
                .iter()
                .map(|output_id| DataId::from(format!("{}/{output_id}", operator.id)))
        })
        .collect()
}

trait CoreNodeKindExt {
    /// Returns the input/output configuration of the node.
    fn run_config(&self) -> NodeRunConfig;
    /// Returns whether the node is a dynamic node, i.e. a local custom node
    /// whose path is the special `DYNAMIC_SOURCE` marker.
    fn dynamic(&self) -> bool;
}

impl CoreNodeKindExt for CoreNodeKind {
    fn run_config(&self) -> NodeRunConfig {
        match self {
            CoreNodeKind::Runtime(n) => NodeRunConfig {
                inputs: runtime_node_inputs(n),
                outputs: runtime_node_outputs(n),
            },
            CoreNodeKind::Custom(n) => n.run_config.clone(),
        }
    }

    fn dynamic(&self) -> bool {
        match self {
            CoreNodeKind::Runtime(_n) => false,
            CoreNodeKind::Custom(n) => {
                matches!(&n.source, NodeSource::Local) && n.path == DYNAMIC_SOURCE
            }
        }
    }
}