use aligned_vec::{AVec, ConstAlign};
use coordinator::CoordinatorEvent;
use crossbeam::queue::ArrayQueue;
use dora_core::{
    config::{DataId, Input, InputMapping, NodeId, NodeRunConfig, OperatorId},
    descriptor::{
        read_as_descriptor, CoreNodeKind, Descriptor, DescriptorExt, ResolvedNode, RuntimeNode,
        DYNAMIC_SOURCE,
    },
    topics::LOCALHOST,
    uhlc::{self, HLC},
};
use dora_message::{
    common::{
        DaemonId, DataMessage, DropToken, LogLevel, NodeError, NodeErrorCause, NodeExitStatus,
    },
    coordinator_to_cli::DataflowResult,
    coordinator_to_daemon::{DaemonCoordinatorEvent, SpawnDataflowNodes},
    daemon_to_coordinator::{
        CoordinatorRequest, DaemonCoordinatorReply, DaemonEvent, DataflowDaemonResult,
    },
    daemon_to_daemon::InterDaemonEvent,
    daemon_to_node::{DaemonReply, NodeConfig, NodeDropEvent, NodeEvent},
    metadata::{self, ArrowTypeInfo},
    node_to_daemon::{DynamicNodeEvent, Timestamped},
    DataflowId,
};
use dora_node_api::{arrow::datatypes::DataType, Parameter};
use eyre::{bail, eyre, Context, ContextCompat, Result};
use futures::{future, stream, FutureExt, TryFutureExt};
use futures_concurrency::stream::Merge;
use local_listener::DynamicNodeEventWrapper;
use log::{DaemonLogger, DataflowLogger, Logger};
use pending::PendingNodes;
use shared_memory_server::ShmemConf;
use socket_stream_utils::socket_stream_send;
use std::{
    collections::{BTreeMap, BTreeSet, HashMap},
    net::SocketAddr,
    path::{Path, PathBuf},
    pin::pin,
    sync::Arc,
    time::{Duration, Instant},
};
use sysinfo::Pid;
use tokio::{
    fs::File,
    io::AsyncReadExt,
    net::TcpStream,
    sync::{
        broadcast,
        mpsc::{self, UnboundedSender},
        oneshot::{self, Sender},
    },
};
use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
use tracing::{error, warn};
use uuid::{NoContext, Timestamp, Uuid};

mod coordinator;
mod local_listener;
mod log;
mod node_communication;
mod pending;
mod socket_stream_utils;
mod spawn;

#[cfg(feature = "telemetry")]
use dora_tracing::telemetry::serialize_context;
#[cfg(feature = "telemetry")]
use tracing_opentelemetry::OpenTelemetrySpanExt;

use crate::pending::DataflowStatus;
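
/// Number of most recent stderr lines buffered per node; they are used as the
/// error cause when a node exits unsuccessfully.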
const STDERR_LOG_LINES: usize = 10;
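
/// A daemon instance: spawns and supervises the nodes of one or more
/// dataflows, routes node outputs to local and remote receivers, and reports
/// node results to the coordinator (if one is connected).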
pub struct Daemon {
    running: HashMap<DataflowId, RunningDataflow>,
    working_dir: HashMap<DataflowId, PathBuf>,

    events_tx: mpsc::Sender<Timestamped<Event>>,

    coordinator_connection: Option<TcpStream>,
    last_coordinator_heartbeat: Instant,
    daemon_id: DaemonId,

    exit_when_done: Option<BTreeSet<(Uuid, NodeId)>>,
    exit_when_all_finished: bool,
    dataflow_node_results: BTreeMap<Uuid, BTreeMap<NodeId, Result<(), NodeError>>>,

    clock: Arc<uhlc::HLC>,

    zenoh_session: zenoh::Session,
    remote_daemon_events_tx: Option<flume::Sender<eyre::Result<Timestamped<InterDaemonEvent>>>>,

    logger: DaemonLogger,
}

type DaemonRunResult = BTreeMap<Uuid, BTreeMap<NodeId, Result<(), NodeError>>>;

impl Daemon {
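    /// Runs a daemon that registers at the given coordinator address and then
    /// handles coordinator, inter-daemon, node, and dynamic-node events until
    /// a ctrl-c signal or a `Destroy` command is received.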
    pub async fn run(
        coordinator_addr: SocketAddr,
        machine_id: Option<String>,
        local_listen_port: u16,
    ) -> eyre::Result<()> {
        let clock = Arc::new(HLC::default());

        let mut ctrlc_events = set_up_ctrlc_handler(clock.clone())?;
        let (remote_daemon_events_tx, remote_daemon_events_rx) = flume::bounded(10);
        let (daemon_id, incoming_events) = {
            let incoming_events = set_up_event_stream(
                coordinator_addr,
                &machine_id,
                &clock,
                remote_daemon_events_rx,
                local_listen_port,
            );

            let ctrl_c = pin!(ctrlc_events.recv());
            match futures::future::select(ctrl_c, pin!(incoming_events)).await {
                future::Either::Left((_ctrl_c, _)) => {
                    tracing::info!("received ctrl-c signal -> stopping daemon");
                    return Ok(());
                }
                future::Either::Right((events, _)) => events?,
            }
        };
        Self::run_general(
            (ReceiverStream::new(ctrlc_events), incoming_events).merge(),
            Some(coordinator_addr),
            daemon_id,
            None,
            clock,
            Some(remote_daemon_events_tx),
        )
        .await
        .map(|_| ())
    }
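
    /// Spawns all nodes of the dataflow at `dataflow_path` in a standalone
    /// daemon without any coordinator, waits until all nodes have finished,
    /// and returns the collected per-node results.
    ///
    /// A minimal usage sketch (assuming this crate is linked under its
    /// default library name `dora_daemon`):
    ///
    /// ```no_run
    /// # async fn example() -> eyre::Result<()> {
    /// let result =
    ///     dora_daemon::Daemon::run_dataflow(std::path::Path::new("dataflow.yml"), false).await?;
    /// println!("dataflow {} finished", result.uuid);
    /// # Ok(())
    /// # }
    /// ```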
    pub async fn run_dataflow(dataflow_path: &Path, uv: bool) -> eyre::Result<DataflowResult> {
        let working_dir = dataflow_path
            .canonicalize()
            .context("failed to canonicalize dataflow path")?
            .parent()
            .ok_or_else(|| eyre::eyre!("canonicalized dataflow path has no parent"))?
            .to_owned();

        let descriptor = read_as_descriptor(dataflow_path).await?;
        descriptor.check(&working_dir)?;
        let nodes = descriptor.resolve_aliases_and_set_defaults()?;

        let dataflow_id = Uuid::new_v7(Timestamp::now(NoContext));
        let spawn_command = SpawnDataflowNodes {
            dataflow_id,
            working_dir,
            spawn_nodes: nodes.keys().cloned().collect(),
            nodes,
            dataflow_descriptor: descriptor,
            uv,
        };

        let clock = Arc::new(HLC::default());

        let ctrlc_events = ReceiverStream::new(set_up_ctrlc_handler(clock.clone())?);

        let exit_when_done = spawn_command
            .nodes
            .values()
            .map(|n| (spawn_command.dataflow_id, n.id.clone()))
            .collect();
        let (reply_tx, reply_rx) = oneshot::channel();
        let timestamp = clock.new_timestamp();
        let coordinator_events = stream::once(async move {
            Timestamped {
                inner: Event::Coordinator(CoordinatorEvent {
                    event: DaemonCoordinatorEvent::Spawn(spawn_command),
                    reply_tx,
                }),
                timestamp,
            }
        });
        let events = (coordinator_events, ctrlc_events).merge();
        let run_result = Self::run_general(
            Box::pin(events),
            None,
            DaemonId::new(None),
            Some(exit_when_done),
            clock.clone(),
            None,
        );

        let spawn_result = reply_rx
            .map_err(|err| eyre!("failed to receive spawn result: {err}"))
            .and_then(|r| async {
                match r {
                    Some(DaemonCoordinatorReply::SpawnResult(result)) => {
                        result.map_err(|err| eyre!(err))
                    }
                    _ => Err(eyre!("unexpected spawn reply")),
                }
            });

        let (mut dataflow_results, ()) = future::try_join(run_result, spawn_result).await?;

        Ok(DataflowResult {
            uuid: dataflow_id,
            timestamp: clock.new_timestamp(),
            node_results: dataflow_results
                .remove(&dataflow_id)
                .context("no node results for dataflow_id")?,
        })
    }
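
    /// Shared main loop of `run` and `run_dataflow`: establishes the optional
    /// coordinator connections, opens the zenoh session (honoring the
    /// `ZENOH_CONFIG` env variable, with fallbacks), and then processes all
    /// incoming events via `run_inner`.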
    async fn run_general(
        external_events: impl Stream<Item = Timestamped<Event>> + Unpin,
        coordinator_addr: Option<SocketAddr>,
        daemon_id: DaemonId,
        exit_when_done: Option<BTreeSet<(Uuid, NodeId)>>,
        clock: Arc<HLC>,
        remote_daemon_events_tx: Option<flume::Sender<eyre::Result<Timestamped<InterDaemonEvent>>>>,
    ) -> eyre::Result<DaemonRunResult> {
        let coordinator_connection = match coordinator_addr {
            Some(addr) => {
                let stream = TcpStream::connect(addr)
                    .await
                    .wrap_err("failed to connect to dora-coordinator")?;
                stream
                    .set_nodelay(true)
                    .wrap_err("failed to set TCP_NODELAY")?;
                Some(stream)
            }
            None => None,
        };

        let logger_coordinator_connection = match coordinator_addr {
            Some(addr) => {
                let stream = TcpStream::connect(addr)
                    .await
                    .wrap_err("failed to connect logger to dora-coordinator")?;
                stream
                    .set_nodelay(true)
                    .wrap_err("failed to set TCP_NODELAY")?;
                Some(stream)
            }
            None => None,
        };

        let zenoh_session = match std::env::var(zenoh::Config::DEFAULT_CONFIG_PATH_ENV) {
            Ok(path) => {
                let zenoh_config = zenoh::Config::from_file(&path)
                    .map_err(|e| eyre!(e))
                    .wrap_err_with(|| format!("failed to read zenoh config from {path}"))?;
                zenoh::open(zenoh_config)
                    .await
                    .map_err(|e| eyre!(e))
                    .context("failed to open zenoh session")?
            }
            Err(std::env::VarError::NotPresent) => {
                let mut zenoh_config = zenoh::Config::default();

                if let Some(addr) = coordinator_addr {
                    if cfg!(not(target_os = "windows")) {
                        zenoh_config
                            .insert_json5("routing/peer", r#"{ mode: "linkstate" }"#)
                            .unwrap();
                    }

                    zenoh_config
                        .insert_json5(
                            "connect/endpoints",
                            &format!(
                                r#"{{ router: ["tcp/[::]:7447"], peer: ["tcp/{}:5456"] }}"#,
                                addr.ip()
                            ),
                        )
                        .unwrap();
                    zenoh_config
                        .insert_json5(
                            "listen/endpoints",
                            r#"{ router: ["tcp/[::]:7447"], peer: ["tcp/[::]:5456"] }"#,
                        )
                        .unwrap();
                    if cfg!(target_os = "macos") {
                        warn!("disabling multicast scouting on macOS. Enable it via the ZENOH_CONFIG env variable or a config file");
                        zenoh_config
                            .insert_json5("scouting/multicast", r#"{ enabled: false }"#)
                            .unwrap();
                    }
                };
                if let Ok(zenoh_session) = zenoh::open(zenoh_config).await {
                    zenoh_session
                } else {
                    warn!(
                        "failed to open zenoh session, retrying with default config + coordinator"
                    );
                    let mut zenoh_config = zenoh::Config::default();
                    if cfg!(not(target_os = "windows")) {
                        zenoh_config
                            .insert_json5("routing/peer", r#"{ mode: "linkstate" }"#)
                            .unwrap();
                    }

                    if let Some(addr) = coordinator_addr {
                        zenoh_config
                            .insert_json5(
                                "connect/endpoints",
                                &format!(
                                    r#"{{ router: ["tcp/[::]:7447"], peer: ["tcp/{}:5456"] }}"#,
                                    addr.ip()
                                ),
                            )
                            .unwrap();
                        if cfg!(target_os = "macos") {
                            warn!("disabling multicast scouting on macOS. Enable it via the ZENOH_CONFIG env variable or a config file");
                            zenoh_config
                                .insert_json5("scouting/multicast", r#"{ enabled: false }"#)
                                .unwrap();
                        }
                    }
                    if let Ok(zenoh_session) = zenoh::open(zenoh_config).await {
                        zenoh_session
                    } else {
                        warn!("failed to open zenoh session, retrying with default config");
                        let zenoh_config = zenoh::Config::default();
                        zenoh::open(zenoh_config)
                            .await
                            .map_err(|e| eyre!(e))
                            .context("failed to open zenoh session")?
                    }
                }
            }
            Err(std::env::VarError::NotUnicode(_)) => eyre::bail!(
                "{} env variable is not valid unicode",
                zenoh::Config::DEFAULT_CONFIG_PATH_ENV
            ),
        };
        let (dora_events_tx, dora_events_rx) = mpsc::channel(5);
        let daemon = Self {
            logger: Logger {
                coordinator_connection: logger_coordinator_connection,
                daemon_id: daemon_id.clone(),
                clock: clock.clone(),
            }
            .for_daemon(daemon_id.clone()),
            running: HashMap::new(),
            working_dir: HashMap::new(),
            events_tx: dora_events_tx,
            coordinator_connection,
            last_coordinator_heartbeat: Instant::now(),
            daemon_id,
            exit_when_done,
            exit_when_all_finished: false,
            dataflow_node_results: BTreeMap::new(),
            clock,
            zenoh_session,
            remote_daemon_events_tx,
        };

        let dora_events = ReceiverStream::new(dora_events_rx);
        let watchdog_clock = daemon.clock.clone();
        let watchdog_interval = tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(
            Duration::from_secs(5),
        ))
        .map(|_| Timestamped {
            inner: Event::HeartbeatInterval,
            timestamp: watchdog_clock.new_timestamp(),
        });
        let events = (external_events, dora_events, watchdog_interval).merge();
        daemon.run_inner(events).await
    }
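
    /// The daemon's event loop: updates the local HLC with each incoming
    /// event's timestamp, dispatches to the matching handler, and notifies
    /// the coordinator when the loop exits.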
    #[tracing::instrument(skip(incoming_events, self), fields(?self.daemon_id))]
    async fn run_inner(
        mut self,
        incoming_events: impl Stream<Item = Timestamped<Event>> + Unpin,
    ) -> eyre::Result<DaemonRunResult> {
        let mut events = incoming_events;

        while let Some(event) = events.next().await {
            let Timestamped { inner, timestamp } = event;
            if let Err(err) = self.clock.update_with_timestamp(&timestamp) {
                tracing::warn!("failed to update HLC with incoming event timestamp: {err}");
            }

            match inner {
                Event::Coordinator(CoordinatorEvent { event, reply_tx }) => {
                    let status = self.handle_coordinator_event(event, reply_tx).await?;

                    match status {
                        RunStatus::Continue => {}
                        RunStatus::Exit => break,
                    }
                }
                Event::Daemon(event) => {
                    self.handle_inter_daemon_event(event).await?;
                }
                Event::Node {
                    dataflow_id: dataflow,
                    node_id,
                    event,
                } => self.handle_node_event(event, dataflow, node_id).await?,
                Event::Dora(event) => match self.handle_dora_event(event).await? {
                    RunStatus::Continue => {}
                    RunStatus::Exit => break,
                },
                Event::DynamicNode(event) => self.handle_dynamic_node_event(event).await?,
                Event::HeartbeatInterval => {
                    if let Some(connection) = &mut self.coordinator_connection {
                        let msg = serde_json::to_vec(&Timestamped {
                            inner: CoordinatorRequest::Event {
                                daemon_id: self.daemon_id.clone(),
                                event: DaemonEvent::Heartbeat,
                            },
                            timestamp: self.clock.new_timestamp(),
                        })?;
                        socket_stream_send(connection, &msg)
                            .await
                            .wrap_err("failed to send watchdog message to dora-coordinator")?;

                        if self.last_coordinator_heartbeat.elapsed() > Duration::from_secs(20) {
                            bail!("lost connection to coordinator")
                        }
                    }
                }
                Event::CtrlC => {
                    tracing::info!("received ctrlc signal -> stopping all dataflows");
                    for dataflow in self.running.values_mut() {
                        let mut logger = self.logger.for_dataflow(dataflow.id);
                        dataflow
                            .stop_all(
                                &mut self.coordinator_connection,
                                &self.clock,
                                None,
                                &mut logger,
                            )
                            .await?;
                    }
                    self.exit_when_all_finished = true;
                    if self.running.is_empty() {
                        break;
                    }
                }
                Event::SecondCtrlC => {
                    tracing::warn!("received second ctrlc signal -> exit immediately");
                    bail!("received second ctrl-c signal");
                }
                Event::DaemonError(err) => {
                    tracing::error!("Daemon error: {err:?}");
                }
            }
        }

        if let Some(mut connection) = self.coordinator_connection.take() {
            let msg = serde_json::to_vec(&Timestamped {
                inner: CoordinatorRequest::Event {
                    daemon_id: self.daemon_id.clone(),
                    event: DaemonEvent::Exit,
                },
                timestamp: self.clock.new_timestamp(),
            })?;
            socket_stream_send(&mut connection, &msg)
                .await
                .wrap_err("failed to send Exit message to dora-coordinator")?;
        }

        Ok(self.dataflow_node_results)
    }
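
    /// Handles a single coordinator command (spawn, stop, logs, reload,
    /// destroy, heartbeat, ...) and sends a reply over `reply_tx` where the
    /// protocol expects one.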
    async fn handle_coordinator_event(
        &mut self,
        event: DaemonCoordinatorEvent,
        reply_tx: Sender<Option<DaemonCoordinatorReply>>,
    ) -> eyre::Result<RunStatus> {
        let status = match event {
            DaemonCoordinatorEvent::Spawn(SpawnDataflowNodes {
                dataflow_id,
                working_dir,
                nodes,
                dataflow_descriptor,
                spawn_nodes,
                uv,
            }) => {
                match dataflow_descriptor.communication.remote {
                    dora_core::config::RemoteCommunicationConfig::Tcp => {}
                }

                let working_dir = if working_dir.exists() {
                    working_dir
                } else {
                    std::env::current_dir().wrap_err("failed to get current working dir")?
                };

                let result = self
                    .spawn_dataflow(
                        dataflow_id,
                        working_dir,
                        nodes,
                        dataflow_descriptor,
                        spawn_nodes,
                        uv,
                    )
                    .await;
                if let Err(err) = &result {
                    tracing::error!("{err:?}");
                }
                let reply =
                    DaemonCoordinatorReply::SpawnResult(result.map_err(|err| format!("{err:?}")));
                let _ = reply_tx.send(Some(reply)).map_err(|_| {
                    error!("could not send `SpawnResult` reply from daemon to coordinator")
                });
                RunStatus::Continue
            }
            DaemonCoordinatorEvent::AllNodesReady {
                dataflow_id,
                exited_before_subscribe,
            } => {
                let mut logger = self.logger.for_dataflow(dataflow_id);
                logger
                    .log(
                        LogLevel::Debug,
                        None,
                        Some("daemon".into()),
                        format!(
                            "received AllNodesReady (exited_before_subscribe: {exited_before_subscribe:?})"
                        ),
                    )
                    .await;
                match self.running.get_mut(&dataflow_id) {
                    Some(dataflow) => {
                        let ready = exited_before_subscribe.is_empty();
                        dataflow
                            .pending_nodes
                            .handle_external_all_nodes_ready(
                                exited_before_subscribe,
                                &mut dataflow.cascading_error_causes,
                            )
                            .await?;
                        if ready {
                            logger
                                .log(
                                    LogLevel::Info,
                                    None,
                                    Some("daemon".into()),
                                    "coordinator reported that all nodes are ready, starting dataflow",
                                )
                                .await;
                            dataflow.start(&self.events_tx, &self.clock).await?;
                        }
                    }
                    None => {
                        tracing::warn!(
                            "received AllNodesReady for unknown dataflow (ID `{dataflow_id}`)"
                        );
                    }
                }
                let _ = reply_tx.send(None).map_err(|_| {
                    error!("could not send `AllNodesReady` reply from daemon to coordinator")
                });
                RunStatus::Continue
            }
            DaemonCoordinatorEvent::Logs {
                dataflow_id,
                node_id,
            } => {
                match self.working_dir.get(&dataflow_id) {
                    Some(working_dir) => {
                        let working_dir = working_dir.clone();
                        tokio::spawn(async move {
                            let logs = async {
                                let mut file =
                                    File::open(log::log_path(&working_dir, &dataflow_id, &node_id))
                                        .await
                                        .wrap_err(format!(
                                            "Could not open log file: {:#?}",
                                            log::log_path(&working_dir, &dataflow_id, &node_id)
                                        ))?;

                                let mut contents = vec![];
                                file.read_to_end(&mut contents)
                                    .await
                                    .wrap_err("Could not read content of log file")?;
                                Result::<Vec<u8>, eyre::Report>::Ok(contents)
                            }
                            .await
                            .map_err(|err| format!("{err:?}"));
                            let _ = reply_tx
                                .send(Some(DaemonCoordinatorReply::Logs(logs)))
                                .map_err(|_| {
                                    error!("could not send logs reply from daemon to coordinator")
                                });
                        });
                    }
                    None => {
                        tracing::warn!("received Logs for unknown dataflow (ID `{dataflow_id}`)");
                        let _ = reply_tx.send(None).map_err(|_| {
                            error!("could not send `Logs` reply from daemon to coordinator")
                        });
                    }
                }
                RunStatus::Continue
            }
            DaemonCoordinatorEvent::ReloadDataflow {
                dataflow_id,
                node_id,
                operator_id,
            } => {
                let result = self.send_reload(dataflow_id, node_id, operator_id).await;
                let reply =
                    DaemonCoordinatorReply::ReloadResult(result.map_err(|err| format!("{err:?}")));
                let _ = reply_tx
                    .send(Some(reply))
                    .map_err(|_| error!("could not send reload reply from daemon to coordinator"));
                RunStatus::Continue
            }
            DaemonCoordinatorEvent::StopDataflow {
                dataflow_id,
                grace_duration,
            } => {
                let mut logger = self.logger.for_dataflow(dataflow_id);
                let dataflow = self
                    .running
                    .get_mut(&dataflow_id)
                    .wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"));
                let (reply, future) = match dataflow {
                    Ok(dataflow) => {
                        let future = dataflow.stop_all(
                            &mut self.coordinator_connection,
                            &self.clock,
                            grace_duration,
                            &mut logger,
                        );
                        (Ok(()), Some(future))
                    }
                    Err(err) => (Err(err.to_string()), None),
                };

                let _ = reply_tx
                    .send(Some(DaemonCoordinatorReply::StopResult(reply)))
                    .map_err(|_| error!("could not send stop reply from daemon to coordinator"));

                if let Some(future) = future {
                    future.await?;
                }

                RunStatus::Continue
            }
            DaemonCoordinatorEvent::Destroy => {
                tracing::info!("received destroy command -> exiting");
                let (notify_tx, notify_rx) = oneshot::channel();
                let reply = DaemonCoordinatorReply::DestroyResult {
                    result: Ok(()),
                    notify: Some(notify_tx),
                };
                let _ = reply_tx
                    .send(Some(reply))
                    .map_err(|_| error!("could not send destroy reply from daemon to coordinator"));
                if notify_rx.await.is_err() {
                    tracing::warn!("no confirmation received for DestroyReply");
                }
                RunStatus::Exit
            }
            DaemonCoordinatorEvent::Heartbeat => {
                self.last_coordinator_heartbeat = Instant::now();
                let _ = reply_tx.send(None);
                RunStatus::Continue
            }
        };
        Ok(status)
    }
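
    /// Handles events received from other daemons via zenoh: outputs of
    /// remote nodes that map to inputs of local nodes, and notifications
    /// that such a remote output was closed.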
    async fn handle_inter_daemon_event(&mut self, event: InterDaemonEvent) -> eyre::Result<()> {
        match event {
            InterDaemonEvent::Output {
                dataflow_id,
                node_id,
                output_id,
                metadata,
                data,
            } => {
                let inner = async {
                    let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
                        format!("send out failed: no running dataflow with ID `{dataflow_id}`")
                    })?;
                    send_output_to_local_receivers(
                        node_id.clone(),
                        output_id.clone(),
                        dataflow,
                        &metadata,
                        data.map(DataMessage::Vec),
                        &self.clock,
                    )
                    .await?;
                    Result::<_, eyre::Report>::Ok(())
                };
                if let Err(err) = inner
                    .await
                    .wrap_err("failed to forward remote output to local receivers")
                {
                    let mut logger = self.logger.for_dataflow(dataflow_id).for_node(node_id);
                    logger
                        .log(LogLevel::Warn, Some("daemon".into()), format!("{err:?}"))
                        .await;
                }
                Ok(())
            }
            InterDaemonEvent::OutputClosed {
                dataflow_id,
                node_id,
                output_id,
            } => {
                let output_id = OutputId(node_id.clone(), output_id);
                let mut logger = self
                    .logger
                    .for_dataflow(dataflow_id)
                    .for_node(node_id.clone());
                logger
                    .log(
                        LogLevel::Debug,
                        Some("daemon".into()),
                        format!("received OutputClosed event for output {output_id:?}"),
                    )
                    .await;

                let inner = async {
                    let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
                        format!("send out failed: no running dataflow with ID `{dataflow_id}`")
                    })?;

                    if let Some(inputs) = dataflow.mappings.get(&output_id).cloned() {
                        for (receiver_id, input_id) in &inputs {
                            close_input(dataflow, receiver_id, input_id, &self.clock);
                        }
                    }
                    Result::<(), eyre::Report>::Ok(())
                };
                if let Err(err) = inner
                    .await
                    .wrap_err("failed to handle OutputClosed event sent by remote daemon")
                {
                    logger
                        .log(LogLevel::Warn, Some("daemon".into()), format!("{err:?}"))
                        .await;
                }
                Ok(())
            }
        }
    }
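
    /// Registers the input mappings of the given dataflow, spawns all nodes
    /// assigned to this daemon, and subscribes to the zenoh topics of
    /// outputs that are produced on remote daemons.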
    async fn spawn_dataflow(
        &mut self,
        dataflow_id: uuid::Uuid,
        working_dir: PathBuf,
        nodes: BTreeMap<NodeId, ResolvedNode>,
        dataflow_descriptor: Descriptor,
        spawn_nodes: BTreeSet<NodeId>,
        uv: bool,
    ) -> eyre::Result<()> {
        let mut logger = self.logger.for_dataflow(dataflow_id);
        let dataflow =
            RunningDataflow::new(dataflow_id, self.daemon_id.clone(), &dataflow_descriptor);
        let dataflow = match self.running.entry(dataflow_id) {
            std::collections::hash_map::Entry::Vacant(entry) => {
                self.working_dir.insert(dataflow_id, working_dir.clone());
                entry.insert(dataflow)
            }
            std::collections::hash_map::Entry::Occupied(_) => {
                bail!("there is already a running dataflow with ID `{dataflow_id}`")
            }
        };

        let mut stopped = Vec::new();

        for node in nodes.values() {
            let local = spawn_nodes.contains(&node.id);

            let inputs = node_inputs(node);
            for (input_id, input) in inputs {
                if local {
                    dataflow
                        .open_inputs
                        .entry(node.id.clone())
                        .or_default()
                        .insert(input_id.clone());
                    match input.mapping {
                        InputMapping::User(mapping) => {
                            dataflow
                                .mappings
                                .entry(OutputId(mapping.source, mapping.output))
                                .or_default()
                                .insert((node.id.clone(), input_id));
                        }
                        InputMapping::Timer { interval } => {
                            dataflow
                                .timers
                                .entry(interval)
                                .or_default()
                                .insert((node.id.clone(), input_id));
                        }
                    }
                } else if let InputMapping::User(mapping) = input.mapping {
                    dataflow
                        .open_external_mappings
                        .insert(OutputId(mapping.source, mapping.output));
                }
            }
        }

        for node in nodes.into_values() {
            let mut logger = logger.reborrow().for_node(node.id.clone());
            let local = spawn_nodes.contains(&node.id);
            if local {
                if node.kind.dynamic() {
                    dataflow.dynamic_nodes.insert(node.id.clone());
                } else {
                    dataflow.pending_nodes.insert(node.id.clone());
                }

                let node_id = node.id.clone();
                let node_stderr_most_recent = dataflow
                    .node_stderr_most_recent
                    .entry(node.id.clone())
                    .or_insert_with(|| Arc::new(ArrayQueue::new(STDERR_LOG_LINES)))
                    .clone();
                logger
                    .log(LogLevel::Info, Some("daemon".into()), "spawning")
                    .await;
                match spawn::spawn_node(
                    dataflow_id,
                    &working_dir,
                    node,
                    self.events_tx.clone(),
                    dataflow_descriptor.clone(),
                    self.clock.clone(),
                    node_stderr_most_recent,
                    uv,
                    &mut logger,
                )
                .await
                .wrap_err_with(|| format!("failed to spawn node `{node_id}`"))
                {
                    Ok(running_node) => {
                        dataflow.running_nodes.insert(node_id, running_node);
                    }
                    Err(err) => {
                        logger
                            .log(LogLevel::Error, Some("daemon".into()), format!("{err:?}"))
                            .await;
                        self.dataflow_node_results
                            .entry(dataflow_id)
                            .or_default()
                            .insert(
                                node_id.clone(),
                                Err(NodeError {
                                    timestamp: self.clock.new_timestamp(),
                                    cause: NodeErrorCause::Other {
                                        stderr: format!("spawn failed: {err:?}"),
                                    },
                                    exit_status: NodeExitStatus::Unknown,
                                }),
                            );
                        stopped.push(node_id.clone());
                    }
                }
            } else {
                dataflow.pending_nodes.set_external_nodes(true);

                for output_id in dataflow.mappings.keys().filter(|o| o.0 == node.id) {
                    let tx = self
                        .remote_daemon_events_tx
                        .clone()
                        .wrap_err("no remote_daemon_events_tx channel")?;
                    let mut finished_rx = dataflow.finished_tx.subscribe();
                    let subscribe_topic = dataflow.output_publish_topic(output_id);
                    tracing::debug!("declaring subscriber on {subscribe_topic}");
                    let subscriber = self
                        .zenoh_session
                        .declare_subscriber(subscribe_topic)
                        .await
                        .map_err(|e| eyre!(e))
                        .wrap_err_with(|| format!("failed to subscribe to {output_id:?}"))?;
                    tokio::spawn(async move {
                        let mut finished = pin!(finished_rx.recv());
                        loop {
                            let finished_or_next =
                                futures::future::select(finished, subscriber.recv_async());
                            match finished_or_next.await {
                                future::Either::Left((finished, _)) => {
                                    match finished {
                                        Err(broadcast::error::RecvError::Closed) => {
                                            tracing::debug!("dataflow finished, breaking from zenoh subscribe task");
                                            break;
                                        }
                                        other => {
                                            tracing::warn!("unexpected return value of dataflow finished_rx channel: {other:?}");
                                            break;
                                        }
                                    }
                                }
                                future::Either::Right((sample, f)) => {
                                    finished = f;
                                    let event = sample.map_err(|e| eyre!(e)).and_then(|s| {
                                        Timestamped::deserialize_inter_daemon_event(
                                            &s.payload().to_bytes(),
                                        )
                                    });
                                    if tx.send_async(event).await.is_err() {
                                        break;
                                    }
                                }
                            }
                        }
                    });
                }
            }
        }
        for node_id in stopped {
            self.handle_node_stop(dataflow_id, &node_id).await?;
        }

        Ok(())
    }
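
    /// Resolves the `NodeConfig` for a dynamic node that attaches to this
    /// daemon at runtime. Fails if the node ID is unknown, not marked as
    /// dynamic, or present in more than one running dataflow.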
    async fn handle_dynamic_node_event(
        &mut self,
        event: DynamicNodeEventWrapper,
    ) -> eyre::Result<()> {
        match event {
            DynamicNodeEventWrapper {
                event: DynamicNodeEvent::NodeConfig { node_id },
                reply_tx,
            } => {
                let number_node_id = self
                    .running
                    .iter()
                    .filter(|(_id, dataflow)| dataflow.running_nodes.contains_key(&node_id))
                    .count();

                let node_config = match number_node_id {
                    2.. => Err(format!(
                        "multiple dataflows contain dynamic node ID {node_id}. \
                         Please have only one running dataflow with the given node ID \
                         if you want to use dynamic nodes"
                    )),
                    1 => self
                        .running
                        .iter()
                        .filter(|(_id, dataflow)| dataflow.running_nodes.contains_key(&node_id))
                        .map(|(id, dataflow)| -> Result<NodeConfig> {
                            let node_config = dataflow
                                .running_nodes
                                .get(&node_id)
                                .with_context(|| {
                                    format!("no node with ID `{node_id}` within the given dataflow")
                                })?
                                .node_config
                                .clone();
                            if !node_config.dynamic {
                                bail!("node with ID `{node_id}` in {id} is not dynamic");
                            }
                            Ok(node_config)
                        })
                        .next()
                        .ok_or_else(|| eyre!("no node with ID `{node_id}`"))
                        .and_then(|r| r)
                        .map_err(|err| {
                            format!("failed to get dynamic node config within given dataflow: {err}")
                        }),
                    0 => Err(format!("no node with ID `{node_id}`")),
                };

                let reply = DaemonReply::NodeConfig {
                    result: node_config,
                };
                let _ = reply_tx
                    .send(Some(reply))
                    .map_err(|_| error!("could not send node config reply from daemon to node"));
                Ok(())
            }
        }
    }
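
    /// Handles requests sent by local nodes, e.g. event-stream subscriptions,
    /// outgoing messages, closed outputs, and drop-token reports.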
    async fn handle_node_event(
        &mut self,
        event: DaemonNodeEvent,
        dataflow_id: DataflowId,
        node_id: NodeId,
    ) -> eyre::Result<()> {
        match event {
            DaemonNodeEvent::Subscribe {
                event_sender,
                reply_sender,
            } => {
                let mut logger = self.logger.for_dataflow(dataflow_id);
                logger
                    .log(
                        LogLevel::Info,
                        Some(node_id.clone()),
                        Some("daemon".into()),
                        "node is ready",
                    )
                    .await;

                let dataflow = self.running.get_mut(&dataflow_id).ok_or_else(|| {
                    format!("subscribe failed: no running dataflow with ID `{dataflow_id}`")
                });

                match dataflow {
                    Err(err) => {
                        let _ = reply_sender.send(DaemonReply::Result(Err(err)));
                    }
                    Ok(dataflow) => {
                        Self::subscribe(dataflow, node_id.clone(), event_sender, &self.clock).await;

                        let status = dataflow
                            .pending_nodes
                            .handle_node_subscription(
                                node_id.clone(),
                                reply_sender,
                                &mut self.coordinator_connection,
                                &self.clock,
                                &mut dataflow.cascading_error_causes,
                                &mut logger,
                            )
                            .await?;
                        match status {
                            DataflowStatus::AllNodesReady => {
                                logger
                                    .log(
                                        LogLevel::Info,
                                        None,
                                        Some("daemon".into()),
                                        "all nodes are ready, starting dataflow",
                                    )
                                    .await;
                                dataflow.start(&self.events_tx, &self.clock).await?;
                            }
                            DataflowStatus::Pending => {}
                        }
                    }
                }
            }
            DaemonNodeEvent::SubscribeDrop {
                event_sender,
                reply_sender,
            } => {
                let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
                    format!("failed to subscribe: no running dataflow with ID `{dataflow_id}`")
                });
                let result = match dataflow {
                    Ok(dataflow) => {
                        dataflow.drop_channels.insert(node_id, event_sender);
                        Ok(())
                    }
                    Err(err) => Err(err.to_string()),
                };
                let _ = reply_sender.send(DaemonReply::Result(result));
            }
            DaemonNodeEvent::CloseOutputs {
                outputs,
                reply_sender,
            } => {
                let inner = async {
                    self.send_output_closed_events(dataflow_id, node_id, outputs)
                        .await
                };

                let reply = inner.await.map_err(|err| format!("{err:?}"));
                let _ = reply_sender.send(DaemonReply::Result(reply));
            }
            DaemonNodeEvent::OutputsDone { reply_sender } => {
                let result = self.handle_outputs_done(dataflow_id, &node_id).await;

                let _ = reply_sender.send(DaemonReply::Result(
                    result.map_err(|err| format!("{err:?}")),
                ));
            }
            DaemonNodeEvent::SendOut {
                output_id,
                metadata,
                data,
            } => self
                .send_out(dataflow_id, node_id, output_id, metadata, data)
                .await
                .context("failed to send out")?,
            DaemonNodeEvent::ReportDrop { tokens } => {
                let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
                    format!(
                        "failed to handle drop tokens: \
                        no running dataflow with ID `{dataflow_id}`"
                    )
                });

                match dataflow {
                    Ok(dataflow) => {
                        for token in tokens {
                            match dataflow.pending_drop_tokens.get_mut(&token) {
                                Some(info) => {
                                    if info.pending_nodes.remove(&node_id) {
                                        dataflow.check_drop_token(token, &self.clock).await?;
                                    } else {
                                        tracing::warn!(
                                            "node `{node_id}` is not pending for drop token `{token:?}`"
                                        );
                                    }
                                }
                                None => tracing::warn!("unknown drop token `{token:?}`"),
                            }
                        }
                    }
                    Err(err) => tracing::warn!("{err:?}"),
                }
            }
            DaemonNodeEvent::EventStreamDropped { reply_sender } => {
                let inner = async {
                    let dataflow = self
                        .running
                        .get_mut(&dataflow_id)
                        .wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"))?;
                    dataflow.subscribe_channels.remove(&node_id);
                    Result::<_, eyre::Error>::Ok(())
                };

                let reply = inner.await.map_err(|err| format!("{err:?}"));
                let _ = reply_sender.send(DaemonReply::Result(reply));
            }
        }
        Ok(())
    }
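
    /// Forwards a reload request to the given node (used for operator
    /// reloading); the subscribe channel is dropped if the node is gone.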
    async fn send_reload(
        &mut self,
        dataflow_id: Uuid,
        node_id: NodeId,
        operator_id: Option<OperatorId>,
    ) -> Result<(), eyre::ErrReport> {
        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
            format!("Reload failed: no running dataflow with ID `{dataflow_id}`")
        })?;
        if let Some(channel) = dataflow.subscribe_channels.get(&node_id) {
            match send_with_timestamp(channel, NodeEvent::Reload { operator_id }, &self.clock) {
                Ok(()) => {}
                Err(_) => {
                    dataflow.subscribe_channels.remove(&node_id);
                }
            }
        }
        Ok(())
    }
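
    /// Sends a node output to all local subscribers and, if the output has
    /// remote receivers (or publishing of all messages to zenoh is enabled),
    /// publishes it on the corresponding zenoh topic.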
    async fn send_out(
        &mut self,
        dataflow_id: Uuid,
        node_id: NodeId,
        output_id: DataId,
        metadata: dora_message::metadata::Metadata,
        data: Option<DataMessage>,
    ) -> Result<(), eyre::ErrReport> {
        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
            format!("send out failed: no running dataflow with ID `{dataflow_id}`")
        })?;
        let data_bytes = send_output_to_local_receivers(
            node_id.clone(),
            output_id.clone(),
            dataflow,
            &metadata,
            data,
            &self.clock,
        )
        .await?;

        let output_id = OutputId(node_id, output_id);
        let remote_receivers = dataflow.open_external_mappings.contains(&output_id)
            || dataflow.publish_all_messages_to_zenoh;
        if remote_receivers {
            let event = InterDaemonEvent::Output {
                dataflow_id,
                node_id: output_id.0.clone(),
                output_id: output_id.1.clone(),
                metadata,
                data: data_bytes,
            };
            self.send_to_remote_receivers(dataflow_id, &output_id, event)
                .await?;
        }

        Ok(())
    }
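
    /// Publishes an inter-daemon event on the zenoh topic of the given
    /// output, lazily declaring and caching the zenoh publisher on first use.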
    async fn send_to_remote_receivers(
        &mut self,
        dataflow_id: Uuid,
        output_id: &OutputId,
        event: InterDaemonEvent,
    ) -> Result<(), eyre::Error> {
        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
            format!("send out failed: no running dataflow with ID `{dataflow_id}`")
        })?;

        let publisher = match dataflow.publishers.get(output_id) {
            Some(publisher) => publisher,
            None => {
                let publish_topic = dataflow.output_publish_topic(output_id);
                tracing::debug!("declaring publisher on {publish_topic}");
                let publisher = self
                    .zenoh_session
                    .declare_publisher(publish_topic)
                    .await
                    .map_err(|e| eyre!(e))
                    .context("failed to create zenoh publisher")?;
                dataflow.publishers.insert(output_id.clone(), publisher);
                dataflow.publishers.get(output_id).unwrap()
            }
        };

        let serialized_event = Timestamped {
            inner: event,
            timestamp: self.clock.new_timestamp(),
        }
        .serialize();
        publisher
            .put(serialized_event)
            .await
            .map_err(|e| eyre!(e))
            .context("zenoh put failed")?;
        Ok(())
    }
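
    /// Closes the given outputs: sends `InputClosed` events to all local
    /// receivers and `OutputClosed` events to remote daemons.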
    async fn send_output_closed_events(
        &mut self,
        dataflow_id: DataflowId,
        node_id: NodeId,
        outputs: Vec<DataId>,
    ) -> eyre::Result<()> {
        let dataflow = self
            .running
            .get_mut(&dataflow_id)
            .wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"))?;
        let local_node_inputs: BTreeSet<_> = dataflow
            .mappings
            .iter()
            .filter(|(k, _)| k.0 == node_id && outputs.contains(&k.1))
            .flat_map(|(_, v)| v)
            .cloned()
            .collect();
        for (receiver_id, input_id) in &local_node_inputs {
            close_input(dataflow, receiver_id, input_id, &self.clock);
        }

        let mut closed = Vec::new();
        for output_id in &dataflow.open_external_mappings {
            if output_id.0 == node_id && outputs.contains(&output_id.1) {
                closed.push(output_id.clone());
            }
        }

        for output_id in closed {
            let event = InterDaemonEvent::OutputClosed {
                dataflow_id,
                node_id: output_id.0.clone(),
                output_id: output_id.1.clone(),
            };
            self.send_to_remote_receivers(dataflow_id, &output_id, event)
                .await?;
        }

        Ok(())
    }
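
    /// Registers the event channel of a newly subscribed node. Inputs that
    /// were closed before the subscription (and a pending stop, if any) are
    /// replayed so that the node does not miss them.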
    async fn subscribe(
        dataflow: &mut RunningDataflow,
        node_id: NodeId,
        event_sender: UnboundedSender<Timestamped<NodeEvent>>,
        clock: &HLC,
    ) {
        let closed_inputs = dataflow
            .mappings
            .values()
            .flatten()
            .filter(|(node, _)| node == &node_id)
            .map(|(_, input)| input)
            .filter(|input| {
                dataflow
                    .open_inputs
                    .get(&node_id)
                    .map(|open_inputs| !open_inputs.contains(*input))
                    .unwrap_or(true)
            });
        for input_id in closed_inputs {
            let _ = send_with_timestamp(
                &event_sender,
                NodeEvent::InputClosed {
                    id: input_id.clone(),
                },
                clock,
            );
        }
        if dataflow.open_inputs(&node_id).is_empty() {
            let _ = send_with_timestamp(&event_sender, NodeEvent::AllInputsClosed, clock);
        }

        if dataflow.stop_sent {
            let _ = send_with_timestamp(&event_sender, NodeEvent::Stop, clock);
        }

        dataflow.subscribe_channels.insert(node_id, event_sender);
    }
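
    /// Closes all outputs of the given node and removes its drop channel;
    /// called when a node reports that it will produce no further outputs.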
    #[tracing::instrument(skip(self), level = "trace")]
    async fn handle_outputs_done(
        &mut self,
        dataflow_id: DataflowId,
        node_id: &NodeId,
    ) -> eyre::Result<()> {
        let dataflow = self
            .running
            .get_mut(&dataflow_id)
            .ok_or_else(|| eyre!("no running dataflow with ID `{dataflow_id}`"))?;

        let outputs = dataflow
            .mappings
            .keys()
            .filter(|m| &m.0 == node_id)
            .map(|m| &m.1)
            .cloned()
            .collect();
        self.send_output_closed_events(dataflow_id, node_id.clone(), outputs)
            .await?;

        let dataflow = self
            .running
            .get_mut(&dataflow_id)
            .ok_or_else(|| eyre!("no running dataflow with ID `{dataflow_id}`"))?;
        dataflow.drop_channels.remove(node_id);
        Ok(())
    }
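
    /// Performs the bookkeeping for a stopped node: updates pending-node
    /// tracking, closes the node's outputs, and reports the dataflow as
    /// finished to the coordinator once only dynamic nodes remain.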
    async fn handle_node_stop(&mut self, dataflow_id: Uuid, node_id: &NodeId) -> eyre::Result<()> {
        let mut logger = self.logger.for_dataflow(dataflow_id);
        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
            format!("failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`")
        })?;

        dataflow
            .pending_nodes
            .handle_node_stop(
                node_id,
                &mut self.coordinator_connection,
                &self.clock,
                &mut dataflow.cascading_error_causes,
                &mut logger,
            )
            .await?;

        self.handle_outputs_done(dataflow_id, node_id).await?;

        let mut logger = self.logger.for_dataflow(dataflow_id);
        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
            format!("failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`")
        })?;
        if let Some(mut pid) = dataflow.running_nodes.remove(node_id).and_then(|n| n.pid) {
            pid.mark_as_stopped()
        }
        if dataflow
            .running_nodes
            .iter()
            .all(|(_id, n)| n.node_config.dynamic)
        {
            let result = DataflowDaemonResult {
                timestamp: self.clock.new_timestamp(),
                node_results: self
                    .dataflow_node_results
                    .get(&dataflow.id)
                    .context("failed to get dataflow node results")?
                    .clone(),
            };

            logger
                .log(
                    LogLevel::Info,
                    None,
                    Some("daemon".into()),
                    format!("dataflow finished on machine `{}`", self.daemon_id),
                )
                .await;
            if let Some(connection) = &mut self.coordinator_connection {
                let msg = serde_json::to_vec(&Timestamped {
                    inner: CoordinatorRequest::Event {
                        daemon_id: self.daemon_id.clone(),
                        event: DaemonEvent::AllNodesFinished {
                            dataflow_id,
                            result,
                        },
                    },
                    timestamp: self.clock.new_timestamp(),
                })?;
                socket_stream_send(connection, &msg)
                    .await
                    .wrap_err("failed to report dataflow finish to dora-coordinator")?;
            }
            self.running.remove(&dataflow_id);
        }

        Ok(())
    }
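
    /// Handles daemon-internal events: timer ticks and log outputs that must
    /// be forwarded as node inputs, and exit statuses of spawned nodes.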
    async fn handle_dora_event(&mut self, event: DoraEvent) -> eyre::Result<RunStatus> {
        match event {
            DoraEvent::Timer {
                dataflow_id,
                interval,
                metadata,
            } => {
                let Some(dataflow) = self.running.get_mut(&dataflow_id) else {
                    tracing::warn!("Timer event for unknown dataflow `{dataflow_id}`");
                    return Ok(RunStatus::Continue);
                };

                let Some(subscribers) = dataflow.timers.get(&interval) else {
                    return Ok(RunStatus::Continue);
                };

                let mut closed = Vec::new();
                for (receiver_id, input_id) in subscribers {
                    let Some(channel) = dataflow.subscribe_channels.get(receiver_id) else {
                        continue;
                    };

                    let send_result = send_with_timestamp(
                        channel,
                        NodeEvent::Input {
                            id: input_id.clone(),
                            metadata: metadata.clone(),
                            data: None,
                        },
                        &self.clock,
                    );
                    match send_result {
                        Ok(()) => {}
                        Err(_) => {
                            closed.push(receiver_id);
                        }
                    }
                }
                for id in closed {
                    dataflow.subscribe_channels.remove(id);
                }
            }
            DoraEvent::Logs {
                dataflow_id,
                output_id,
                message,
                metadata,
            } => {
                let Some(dataflow) = self.running.get_mut(&dataflow_id) else {
                    tracing::warn!("Logs event for unknown dataflow `{dataflow_id}`");
                    return Ok(RunStatus::Continue);
                };

                let Some(subscribers) = dataflow.mappings.get(&output_id) else {
                    tracing::warn!(
                        "No subscribers found for {:?} in {:?}",
                        output_id,
                        dataflow.mappings
                    );
                    return Ok(RunStatus::Continue);
                };

                let mut closed = Vec::new();
                for (receiver_id, input_id) in subscribers {
                    let Some(channel) = dataflow.subscribe_channels.get(receiver_id) else {
                        tracing::warn!("No subscriber channel found for {:?}", output_id);
                        continue;
                    };

                    let send_result = send_with_timestamp(
                        channel,
                        NodeEvent::Input {
                            id: input_id.clone(),
                            metadata: metadata.clone(),
                            data: Some(message.clone()),
                        },
                        &self.clock,
                    );
                    match send_result {
                        Ok(()) => {}
                        Err(_) => {
                            closed.push(receiver_id);
                        }
                    }
                }
                for id in closed {
                    dataflow.subscribe_channels.remove(id);
                }
            }
            DoraEvent::SpawnedNodeResult {
                dataflow_id,
                node_id,
                exit_status,
            } => {
                let mut logger = self
                    .logger
                    .for_dataflow(dataflow_id)
                    .for_node(node_id.clone());
                logger
                    .log(
                        LogLevel::Debug,
                        Some("daemon".into()),
                        format!("handling node stop with exit status {exit_status:?}"),
                    )
                    .await;

                let node_result = match exit_status {
                    NodeExitStatus::Success => Ok(()),
                    exit_status => {
                        let dataflow = self.running.get(&dataflow_id);
                        let caused_by_node = dataflow
                            .and_then(|dataflow| {
                                dataflow.cascading_error_causes.error_caused_by(&node_id)
                            })
                            .cloned();
                        let grace_duration_kill = dataflow
                            .map(|d| d.grace_duration_kills.contains(&node_id))
                            .unwrap_or_default();

                        let cause = match caused_by_node {
                            Some(caused_by_node) => {
                                logger
                                    .log(
                                        LogLevel::Info,
                                        Some("daemon".into()),
                                        format!("marking `{node_id}` as cascading error caused by `{caused_by_node}`"),
                                    )
                                    .await;

                                NodeErrorCause::Cascading { caused_by_node }
                            }
                            None if grace_duration_kill => NodeErrorCause::GraceDuration,
                            None => {
                                let cause = dataflow
                                    .and_then(|d| d.node_stderr_most_recent.get(&node_id))
                                    .map(|queue| {
                                        let mut s = if queue.is_full() {
                                            "[...]".into()
                                        } else {
                                            String::new()
                                        };
                                        while let Some(line) = queue.pop() {
                                            s += &line;
                                        }
                                        s
                                    })
                                    .unwrap_or_default();

                                NodeErrorCause::Other { stderr: cause }
                            }
                        };
                        Err(NodeError {
                            timestamp: self.clock.new_timestamp(),
                            cause,
                            exit_status,
                        })
                    }
                };

                logger
                    .log(
                        if node_result.is_ok() {
                            LogLevel::Info
                        } else {
                            LogLevel::Error
                        },
                        Some("daemon".into()),
                        match &node_result {
                            Ok(()) => format!("{node_id} finished successfully"),
                            Err(err) => format!("{err}"),
                        },
                    )
                    .await;

                self.dataflow_node_results
                    .entry(dataflow_id)
                    .or_default()
                    .insert(node_id.clone(), node_result);

                self.handle_node_stop(dataflow_id, &node_id).await?;

                if let Some(exit_when_done) = &mut self.exit_when_done {
                    exit_when_done.remove(&(dataflow_id, node_id));
                    if exit_when_done.is_empty() {
                        tracing::info!("exiting daemon because all required dataflows are finished");
                        return Ok(RunStatus::Exit);
                    }
                }
                if self.exit_when_all_finished && self.running.is_empty() {
                    return Ok(RunStatus::Exit);
                }
            }
        }
        Ok(RunStatus::Continue)
    }
}
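
/// Registers this daemon with the coordinator and merges coordinator events,
/// remote daemon events, and dynamic-node events into a single event stream.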
async fn set_up_event_stream(
    coordinator_addr: SocketAddr,
    machine_id: &Option<String>,
    clock: &Arc<HLC>,
    remote_daemon_events_rx: flume::Receiver<eyre::Result<Timestamped<InterDaemonEvent>>>,
    local_listen_port: u16,
) -> eyre::Result<(DaemonId, impl Stream<Item = Timestamped<Event>> + Unpin)> {
    let clock_cloned = clock.clone();
    let remote_daemon_events = remote_daemon_events_rx.into_stream().map(move |e| match e {
        Ok(e) => Timestamped {
            inner: Event::Daemon(e.inner),
            timestamp: e.timestamp,
        },
        Err(err) => Timestamped {
            inner: Event::DaemonError(err),
            timestamp: clock_cloned.new_timestamp(),
        },
    });
    let (daemon_id, coordinator_events) =
        coordinator::register(coordinator_addr, machine_id.clone(), clock)
            .await
            .wrap_err("failed to connect to dora-coordinator")?;
    let coordinator_events = coordinator_events.map(
        |Timestamped {
             inner: event,
             timestamp,
         }| Timestamped {
            inner: Event::Coordinator(event),
            timestamp,
        },
    );
    let (events_tx, events_rx) = flume::bounded(10);
    let _listen_port =
        local_listener::spawn_listener_loop((LOCALHOST, local_listen_port).into(), events_tx)
            .await?;
    let dynamic_node_events = events_rx.into_stream().map(|e| Timestamped {
        inner: Event::DynamicNode(e.inner),
        timestamp: e.timestamp,
    });
    let incoming = (
        coordinator_events,
        remote_daemon_events,
        dynamic_node_events,
    )
        .merge();
    Ok((daemon_id, incoming))
}
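
/// Sends an output to all local subscribers of the given output ID and
/// returns the raw bytes (copied out of shared memory if necessary) so that
/// the caller can forward them to remote daemons.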
async fn send_output_to_local_receivers(
    node_id: NodeId,
    output_id: DataId,
    dataflow: &mut RunningDataflow,
    metadata: &metadata::Metadata,
    data: Option<DataMessage>,
    clock: &HLC,
) -> Result<Option<AVec<u8, ConstAlign<128>>>, eyre::ErrReport> {
    let timestamp = metadata.timestamp();
    let empty_set = BTreeSet::new();
    let output_id = OutputId(node_id, output_id);
    let local_receivers = dataflow.mappings.get(&output_id).unwrap_or(&empty_set);
    let OutputId(node_id, _) = output_id;
    let mut closed = Vec::new();
    for (receiver_id, input_id) in local_receivers {
        if let Some(channel) = dataflow.subscribe_channels.get(receiver_id) {
            let item = NodeEvent::Input {
                id: input_id.clone(),
                metadata: metadata.clone(),
                data: data.clone(),
            };
            match channel.send(Timestamped {
                inner: item,
                timestamp,
            }) {
                Ok(()) => {
                    if let Some(token) = data.as_ref().and_then(|d| d.drop_token()) {
                        dataflow
                            .pending_drop_tokens
                            .entry(token)
                            .or_insert_with(|| DropTokenInformation {
                                owner: node_id.clone(),
                                pending_nodes: Default::default(),
                            })
                            .pending_nodes
                            .insert(receiver_id.clone());
                    }
                }
                Err(_) => {
                    closed.push(receiver_id);
                }
            }
        }
    }
    for id in closed {
        dataflow.subscribe_channels.remove(id);
    }
    let (data_bytes, drop_token) = match data {
        None => (None, None),
        Some(DataMessage::SharedMemory {
            shared_memory_id,
            len,
            drop_token,
        }) => {
            let memory = ShmemConf::new()
                .os_id(shared_memory_id)
                .open()
                .wrap_err("failed to map shared memory output")?;
            let data = Some(AVec::from_slice(1, &unsafe { memory.as_slice() }[..len]));
            (data, Some(drop_token))
        }
        Some(DataMessage::Vec(v)) => (Some(v), None),
    };
    if let Some(token) = drop_token {
        dataflow
            .pending_drop_tokens
            .entry(token)
            .or_insert_with(|| DropTokenInformation {
                owner: node_id.clone(),
                pending_nodes: Default::default(),
            });
        dataflow.check_drop_token(token, clock).await?;
    }
    Ok(data_bytes)
}

fn node_inputs(node: &ResolvedNode) -> BTreeMap<DataId, Input> {
    match &node.kind {
        CoreNodeKind::Custom(n) => n.run_config.inputs.clone(),
        CoreNodeKind::Runtime(n) => runtime_node_inputs(n),
    }
}
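
/// Marks an input of `receiver_id` as closed and notifies the node; once no
/// open inputs remain, an `AllInputsClosed` event is sent as well.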
fn close_input(
    dataflow: &mut RunningDataflow,
    receiver_id: &NodeId,
    input_id: &DataId,
    clock: &HLC,
) {
    if let Some(open_inputs) = dataflow.open_inputs.get_mut(receiver_id) {
        if !open_inputs.remove(input_id) {
            return;
        }
    }
    if let Some(channel) = dataflow.subscribe_channels.get(receiver_id) {
        let _ = send_with_timestamp(
            channel,
            NodeEvent::InputClosed {
                id: input_id.clone(),
            },
            clock,
        );

        if dataflow.open_inputs(receiver_id).is_empty() {
            let _ = send_with_timestamp(channel, NodeEvent::AllInputsClosed, clock);
        }
    }
}

#[derive(Debug)]
struct RunningNode {
    pid: Option<ProcessId>,
    node_config: NodeConfig,
}

#[derive(Debug)]
struct ProcessId(Option<u32>);

impl ProcessId {
    pub fn new(process_id: u32) -> Self {
        Self(Some(process_id))
    }

    pub fn mark_as_stopped(&mut self) {
        self.0 = None;
    }

    pub fn kill(&mut self) -> bool {
        if let Some(pid) = self.0 {
            let mut system = sysinfo::System::new();
            system.refresh_processes();

            if let Some(process) = system.process(Pid::from(pid as usize)) {
                process.kill();
                self.mark_as_stopped();
                return true;
            }
        }

        false
    }
}

impl Drop for ProcessId {
    fn drop(&mut self) {
        if let Some(pid) = self.0 {
            if self.kill() {
                warn!("process {pid} was killed on drop because it was still running")
            }
        }
    }
}
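
/// Bookkeeping for a dataflow with nodes running on this daemon: input and
/// output mappings, node subscription channels, pending drop tokens, and the
/// zenoh publishers used for inter-daemon communication.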
pub struct RunningDataflow {
    id: Uuid,
    pending_nodes: PendingNodes,

    subscribe_channels: HashMap<NodeId, UnboundedSender<Timestamped<NodeEvent>>>,
    drop_channels: HashMap<NodeId, UnboundedSender<Timestamped<NodeDropEvent>>>,
    mappings: HashMap<OutputId, BTreeSet<InputId>>,
    timers: BTreeMap<Duration, BTreeSet<InputId>>,
    open_inputs: BTreeMap<NodeId, BTreeSet<DataId>>,
    running_nodes: BTreeMap<NodeId, RunningNode>,

    dynamic_nodes: BTreeSet<NodeId>,

    open_external_mappings: BTreeSet<OutputId>,

    pending_drop_tokens: HashMap<DropToken, DropTokenInformation>,

    _timer_handles: Vec<futures::future::RemoteHandle<()>>,
    stop_sent: bool,

    empty_set: BTreeSet<DataId>,

    cascading_error_causes: CascadingErrorCauses,
    grace_duration_kills: Arc<crossbeam_skiplist::SkipSet<NodeId>>,

    node_stderr_most_recent: BTreeMap<NodeId, Arc<ArrayQueue<String>>>,

    publishers: BTreeMap<OutputId, zenoh::pubsub::Publisher<'static>>,

    finished_tx: broadcast::Sender<()>,

    publish_all_messages_to_zenoh: bool,
}

impl RunningDataflow {
    fn new(
        dataflow_id: Uuid,
        daemon_id: DaemonId,
        dataflow_descriptor: &Descriptor,
    ) -> RunningDataflow {
        let (finished_tx, _) = broadcast::channel(1);
        Self {
            id: dataflow_id,
            pending_nodes: PendingNodes::new(dataflow_id, daemon_id),
            subscribe_channels: HashMap::new(),
            drop_channels: HashMap::new(),
            mappings: HashMap::new(),
            timers: BTreeMap::new(),
            open_inputs: BTreeMap::new(),
            running_nodes: BTreeMap::new(),
            dynamic_nodes: BTreeSet::new(),
            open_external_mappings: Default::default(),
            pending_drop_tokens: HashMap::new(),
            _timer_handles: Vec::new(),
            stop_sent: false,
            empty_set: BTreeSet::new(),
            cascading_error_causes: Default::default(),
            grace_duration_kills: Default::default(),
            node_stderr_most_recent: BTreeMap::new(),
            publishers: Default::default(),
            finished_tx,
            publish_all_messages_to_zenoh: dataflow_descriptor.debug.publish_all_messages_to_zenoh,
        }
    }
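
    /// Starts one background task per configured timer interval; each task
    /// emits `DoraEvent::Timer` events until the daemon's event channel
    /// closes.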
    async fn start(
        &mut self,
        events_tx: &mpsc::Sender<Timestamped<Event>>,
        clock: &Arc<HLC>,
    ) -> eyre::Result<()> {
        for interval in self.timers.keys().copied() {
            let events_tx = events_tx.clone();
            let dataflow_id = self.id;
            let clock = clock.clone();
            let task = async move {
                let mut interval_stream = tokio::time::interval(interval);
                let hlc = HLC::default();
                loop {
                    interval_stream.tick().await;

                    let span = tracing::span!(tracing::Level::TRACE, "tick");
                    let _ = span.enter();

                    let mut parameters = BTreeMap::new();
                    parameters.insert(
                        "open_telemetry_context".to_string(),
                        #[cfg(feature = "telemetry")]
                        Parameter::String(serialize_context(&span.context())),
                        #[cfg(not(feature = "telemetry"))]
                        Parameter::String("".into()),
                    );

                    let metadata = metadata::Metadata::from_parameters(
                        hlc.new_timestamp(),
                        empty_type_info(),
                        parameters,
                    );

                    let event = Timestamped {
                        inner: DoraEvent::Timer {
                            dataflow_id,
                            interval,
                            metadata,
                        }
                        .into(),
                        timestamp: clock.new_timestamp(),
                    };
                    if events_tx.send(event).await.is_err() {
                        break;
                    }
                }
            };
            let (task, handle) = task.remote_handle();
            tokio::spawn(task);
            self._timer_handles.push(handle);
        }

        Ok(())
    }
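
    /// Sends a stop event to all subscribed nodes and spawns a background
    /// task that kills any node process still running after the grace period
    /// (15 seconds by default).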
    async fn stop_all(
        &mut self,
        coordinator_connection: &mut Option<TcpStream>,
        clock: &HLC,
        grace_duration: Option<Duration>,
        logger: &mut DataflowLogger<'_>,
    ) -> eyre::Result<()> {
        self.pending_nodes
            .handle_dataflow_stop(
                coordinator_connection,
                clock,
                &mut self.cascading_error_causes,
                &self.dynamic_nodes,
                logger,
            )
            .await?;

        for (_node_id, channel) in self.subscribe_channels.drain() {
            let _ = send_with_timestamp(&channel, NodeEvent::Stop, clock);
        }

        let running_processes: Vec<_> = self
            .running_nodes
            .iter_mut()
            .map(|(id, n)| (id.clone(), n.pid.take()))
            .collect();
        let grace_duration_kills = self.grace_duration_kills.clone();
        tokio::spawn(async move {
            let duration = grace_duration.unwrap_or(Duration::from_millis(15000));
            tokio::time::sleep(duration).await;

            for (node, pid) in running_processes {
                if let Some(mut pid) = pid {
                    if pid.kill() {
                        grace_duration_kills.insert(node.clone());
                        warn!(
                            "{node} was killed due to not stopping within the {:#?} grace period",
                            duration
                        )
                    }
                }
            }
        });
        self.stop_sent = true;
        Ok(())
    }

    fn open_inputs(&self, node_id: &NodeId) -> &BTreeSet<DataId> {
        self.open_inputs.get(node_id).unwrap_or(&self.empty_set)
    }
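
    /// Reports a drop token back to its owning node once no receiving node
    /// still uses the associated data.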
    async fn check_drop_token(&mut self, token: DropToken, clock: &HLC) -> eyre::Result<()> {
        match self.pending_drop_tokens.entry(token) {
            std::collections::hash_map::Entry::Occupied(entry) => {
                if entry.get().pending_nodes.is_empty() {
                    let (drop_token, info) = entry.remove_entry();
                    let result = match self.drop_channels.get_mut(&info.owner) {
                        Some(channel) => send_with_timestamp(
                            channel,
                            NodeDropEvent::OutputDropped { drop_token },
                            clock,
                        )
                        .wrap_err("send failed"),
                        None => Err(eyre!("no subscribe channel for node `{}`", &info.owner)),
                    };
                    if let Err(err) = result.wrap_err_with(|| {
                        format!(
                            "failed to report drop token `{drop_token:?}` to owner `{}`",
                            &info.owner
                        )
                    }) {
                        tracing::warn!("{err:?}");
                    }
                }
            }
            std::collections::hash_map::Entry::Vacant(_) => {
                tracing::warn!("check_drop_token called with already closed token")
            }
        }

        Ok(())
    }
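
    /// Returns the zenoh topic on which the given output is published, e.g.
    /// `dora/default/<dataflow-id>/output/<node-id>/<output-id>`.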
    fn output_publish_topic(&self, output_id: &OutputId) -> String {
        let network_id = "default";
        let dataflow_id = self.id;
        let OutputId(node_id, output_id) = output_id;
        format!("dora/{network_id}/{dataflow_id}/output/{node_id}/{output_id}")
    }
}

fn empty_type_info() -> ArrowTypeInfo {
    ArrowTypeInfo {
        data_type: DataType::Null,
        len: 0,
        null_count: 0,
        validity: None,
        offset: 0,
        buffer_offsets: Vec::new(),
        child_data: Vec::new(),
    }
}
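
/// Identifies a node output by node ID and data ID.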
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct OutputId(NodeId, DataId);
type InputId = (NodeId, DataId);

struct DropTokenInformation {
    owner: NodeId,
    pending_nodes: BTreeSet<NodeId>,
}

#[derive(Debug)]
pub enum Event {
    Node {
        dataflow_id: DataflowId,
        node_id: NodeId,
        event: DaemonNodeEvent,
    },
    Coordinator(CoordinatorEvent),
    Daemon(InterDaemonEvent),
    Dora(DoraEvent),
    DynamicNode(DynamicNodeEventWrapper),
    HeartbeatInterval,
    CtrlC,
    SecondCtrlC,
    DaemonError(eyre::Report),
}

impl From<DoraEvent> for Event {
    fn from(event: DoraEvent) -> Self {
        Event::Dora(event)
    }
}

#[derive(Debug)]
pub enum DaemonNodeEvent {
    OutputsDone {
        reply_sender: oneshot::Sender<DaemonReply>,
    },
    Subscribe {
        event_sender: UnboundedSender<Timestamped<NodeEvent>>,
        reply_sender: oneshot::Sender<DaemonReply>,
    },
    SubscribeDrop {
        event_sender: UnboundedSender<Timestamped<NodeDropEvent>>,
        reply_sender: oneshot::Sender<DaemonReply>,
    },
    CloseOutputs {
        outputs: Vec<dora_core::config::DataId>,
        reply_sender: oneshot::Sender<DaemonReply>,
    },
    SendOut {
        output_id: DataId,
        metadata: metadata::Metadata,
        data: Option<DataMessage>,
    },
    ReportDrop {
        tokens: Vec<DropToken>,
    },
    EventStreamDropped {
        reply_sender: oneshot::Sender<DaemonReply>,
    },
}

#[derive(Debug)]
pub enum DoraEvent {
    Timer {
        dataflow_id: DataflowId,
        interval: Duration,
        metadata: metadata::Metadata,
    },
    Logs {
        dataflow_id: DataflowId,
        output_id: OutputId,
        message: DataMessage,
        metadata: metadata::Metadata,
    },
    SpawnedNodeResult {
        dataflow_id: DataflowId,
        node_id: NodeId,
        exit_status: NodeExitStatus,
    },
}

#[must_use]
enum RunStatus {
    Continue,
    Exit,
}
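
/// Wraps an event in a `Timestamped` envelope with a fresh HLC timestamp and
/// sends it over the given channel.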
fn send_with_timestamp<T>(
    sender: &UnboundedSender<Timestamped<T>>,
    event: T,
    clock: &HLC,
) -> Result<(), mpsc::error::SendError<Timestamped<T>>> {
    sender.send(Timestamped {
        inner: event,
        timestamp: clock.new_timestamp(),
    })
}
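
/// Installs a ctrl-c handler that reports the first signal as `Event::CtrlC`
/// and the second as `Event::SecondCtrlC`, and aborts the process on the
/// third.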
fn set_up_ctrlc_handler(
    clock: Arc<HLC>,
) -> eyre::Result<tokio::sync::mpsc::Receiver<Timestamped<Event>>> {
    let (ctrlc_tx, ctrlc_rx) = mpsc::channel(1);

    let mut ctrlc_sent = 0;
    ctrlc::set_handler(move || {
        let event = match ctrlc_sent {
            0 => Event::CtrlC,
            1 => Event::SecondCtrlC,
            _ => {
                tracing::warn!("received 3rd ctrlc signal -> aborting immediately");
                std::process::abort();
            }
        };
        if ctrlc_tx
            .blocking_send(Timestamped {
                inner: event,
                timestamp: clock.new_timestamp(),
            })
            .is_err()
        {
            tracing::error!("failed to report ctrl-c event to dora-coordinator");
        }

        ctrlc_sent += 1;
    })
    .wrap_err("failed to set ctrl-c handler")?;

    Ok(ctrlc_rx)
}
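
/// Records which node failures were caused by the failure of another node,
/// so that follow-up failures can be reported as cascading errors instead of
/// root causes.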
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct CascadingErrorCauses {
    caused_by: BTreeMap<NodeId, NodeId>,
}

impl CascadingErrorCauses {
    pub fn experienced_cascading_error(&self, node: &NodeId) -> bool {
        self.caused_by.contains_key(node)
    }

    pub fn error_caused_by(&self, node: &NodeId) -> Option<&NodeId> {
        self.caused_by.get(node)
    }

    pub fn report_cascading_error(&mut self, causing_node: NodeId, affected_node: NodeId) {
        self.caused_by.entry(affected_node).or_insert(causing_node);
    }
}

fn runtime_node_inputs(n: &RuntimeNode) -> BTreeMap<DataId, Input> {
    n.operators
        .iter()
        .flat_map(|operator| {
            operator.config.inputs.iter().map(|(input_id, mapping)| {
                (
                    DataId::from(format!("{}/{input_id}", operator.id)),
                    mapping.clone(),
                )
            })
        })
        .collect()
}

fn runtime_node_outputs(n: &RuntimeNode) -> BTreeSet<DataId> {
    n.operators
        .iter()
        .flat_map(|operator| {
            operator
                .config
                .outputs
                .iter()
                .map(|output_id| DataId::from(format!("{}/{output_id}", operator.id)))
        })
        .collect()
}

trait CoreNodeKindExt {
    fn run_config(&self) -> NodeRunConfig;
    fn dynamic(&self) -> bool;
}

impl CoreNodeKindExt for CoreNodeKind {
    fn run_config(&self) -> NodeRunConfig {
        match self {
            CoreNodeKind::Runtime(n) => NodeRunConfig {
                inputs: runtime_node_inputs(n),
                outputs: runtime_node_outputs(n),
            },
            CoreNodeKind::Custom(n) => n.run_config.clone(),
        }
    }

    fn dynamic(&self) -> bool {
        match self {
            CoreNodeKind::Runtime(_n) => false,
            CoreNodeKind::Custom(n) => n.source == DYNAMIC_SOURCE,
        }
    }
}