1use aligned_vec::{AVec, ConstAlign};
2use coordinator::CoordinatorEvent;
3use crossbeam::queue::ArrayQueue;
4use dora_core::{
5 config::{DataId, Input, InputMapping, NodeId, NodeRunConfig, OperatorId},
6 descriptor::{
7 read_as_descriptor, CoreNodeKind, Descriptor, DescriptorExt, ResolvedNode, RuntimeNode,
8 DYNAMIC_SOURCE,
9 },
10 topics::LOCALHOST,
11 uhlc::{self, HLC},
12};
13use dora_message::{
14 common::{
15 DaemonId, DataMessage, DropToken, LogLevel, NodeError, NodeErrorCause, NodeExitStatus,
16 },
17 coordinator_to_cli::DataflowResult,
18 coordinator_to_daemon::{DaemonCoordinatorEvent, SpawnDataflowNodes},
19 daemon_to_coordinator::{
20 CoordinatorRequest, DaemonCoordinatorReply, DaemonEvent, DataflowDaemonResult,
21 },
22 daemon_to_daemon::InterDaemonEvent,
23 daemon_to_node::{DaemonReply, NodeConfig, NodeDropEvent, NodeEvent},
24 metadata::{self, ArrowTypeInfo},
25 node_to_daemon::{DynamicNodeEvent, Timestamped},
26 DataflowId,
27};
28use dora_node_api::{arrow::datatypes::DataType, Parameter};
29use eyre::{bail, eyre, Context, ContextCompat, Result};
30use futures::{future, stream, FutureExt, TryFutureExt};
31use futures_concurrency::stream::Merge;
32use local_listener::DynamicNodeEventWrapper;
33use log::{DaemonLogger, DataflowLogger, Logger};
34use pending::PendingNodes;
35use shared_memory_server::ShmemConf;
36use socket_stream_utils::socket_stream_send;
37use std::{
38 collections::{BTreeMap, BTreeSet, HashMap},
39 net::SocketAddr,
40 path::{Path, PathBuf},
41 pin::pin,
42 sync::Arc,
43 time::{Duration, Instant},
44};
45use sysinfo::Pid;
46use tokio::{
47 fs::File,
48 io::AsyncReadExt,
49 net::TcpStream,
50 sync::{
51 broadcast,
52 mpsc::{self, UnboundedSender},
53 oneshot::{self, Sender},
54 },
55};
56use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
57use tracing::{error, warn};
58use uuid::{NoContext, Timestamp, Uuid};
59
// Submodules of the daemon.
mod coordinator; // coordinator connection handling (`CoordinatorEvent`)
mod local_listener; // local listener for dynamic node events
mod log; // logging helpers (`Logger`, `log_path`, …)
mod node_communication;
mod pending; // tracking of nodes that are not subscribed yet (`PendingNodes`)
mod socket_stream_utils; // length-delimited socket send/receive helpers
mod spawn; // node process spawning
67
68#[cfg(feature = "telemetry")]
69use dora_tracing::telemetry::serialize_context;
70#[cfg(feature = "telemetry")]
71use tracing_opentelemetry::OpenTelemetrySpanExt;
72
73use crate::pending::DataflowStatus;
74
/// Capacity of the per-node ring buffer that keeps the most recent stderr lines
/// (see `spawn_dataflow`, where it sizes the `ArrayQueue`).
const STDERR_LOG_LINES: usize = 10;
76
/// State of a running dora daemon instance.
pub struct Daemon {
    /// All currently running dataflows, keyed by their UUID.
    running: HashMap<DataflowId, RunningDataflow>,
    /// Working directory of each dataflow (used e.g. to locate node log files).
    working_dir: HashMap<DataflowId, PathBuf>,

    /// Sender for daemon-internal events; cloned into spawned node tasks.
    events_tx: mpsc::Sender<Timestamped<Event>>,

    /// Control connection to the coordinator; `None` when running standalone.
    coordinator_connection: Option<TcpStream>,
    /// Time the last coordinator heartbeat was received; checked in the
    /// heartbeat interval handler to detect a lost coordinator connection.
    last_coordinator_heartbeat: Instant,
    daemon_id: DaemonId,

    /// If set, the daemon exits once all listed (dataflow, node) pairs finished
    /// (used by the standalone `run_dataflow` mode).
    exit_when_done: Option<BTreeSet<(Uuid, NodeId)>>,
    /// Set on ctrl-c: exit as soon as all running dataflows finished.
    exit_when_all_finished: bool,
    /// Per-dataflow results of all finished nodes; returned from the run loop.
    dataflow_node_results: BTreeMap<Uuid, BTreeMap<NodeId, Result<(), NodeError>>>,

    /// Hybrid logical clock used to timestamp all outgoing messages.
    clock: Arc<uhlc::HLC>,

    /// Zenoh session used for inter-daemon communication.
    zenoh_session: zenoh::Session,
    /// Channel for forwarding events received from remote daemons; `None` when
    /// no remote daemons are expected (standalone mode).
    remote_daemon_events_tx: Option<flume::Sender<eyre::Result<Timestamped<InterDaemonEvent>>>>,

    logger: DaemonLogger,
}
101
/// Per-dataflow map from node ID to that node's run result.
type DaemonRunResult = BTreeMap<Uuid, BTreeMap<NodeId, Result<(), NodeError>>>;
103
104impl Daemon {
    /// Runs a daemon that registers with a coordinator at the given address.
    ///
    /// Sets up the ctrl-c handler and the incoming event stream, then enters
    /// the shared run loop (`run_general`). A ctrl-c received while the event
    /// stream is still being set up stops the daemon immediately.
    pub async fn run(
        coordinator_addr: SocketAddr,
        machine_id: Option<String>,
        local_listen_port: u16,
    ) -> eyre::Result<()> {
        let clock = Arc::new(HLC::default());

        let mut ctrlc_events = set_up_ctrlc_handler(clock.clone())?;
        let (remote_daemon_events_tx, remote_daemon_events_rx) = flume::bounded(10);
        let (daemon_id, incoming_events) = {
            let incoming_events = set_up_event_stream(
                coordinator_addr,
                &machine_id,
                &clock,
                remote_daemon_events_rx,
                local_listen_port,
            );

            // Race event-stream setup against a first ctrl-c so the daemon can
            // be aborted while it is still connecting.
            let ctrl_c = pin!(ctrlc_events.recv());
            match futures::future::select(ctrl_c, pin!(incoming_events)).await {
                future::Either::Left((_ctrl_c, _)) => {
                    tracing::info!("received ctrl-c signal -> stopping daemon");
                    return Ok(());
                }
                future::Either::Right((events, _)) => events?,
            }
        };
        Self::run_general(
            (ReceiverStream::new(ctrlc_events), incoming_events).merge(),
            Some(coordinator_addr),
            daemon_id,
            None,
            clock,
            Some(remote_daemon_events_tx),
        )
        .await
        .map(|_| ())
    }
144
    /// Runs a single dataflow to completion without a coordinator
    /// (standalone mode) and returns its result.
    ///
    /// The dataflow descriptor is read from `dataflow_path`; all nodes are
    /// spawned locally and the daemon exits once every node finished.
    pub async fn run_dataflow(dataflow_path: &Path, uv: bool) -> eyre::Result<DataflowResult> {
        // The dataflow's parent directory becomes the working directory.
        let working_dir = dataflow_path
            .canonicalize()
            .context("failed to canonicalize dataflow path")?
            .parent()
            .ok_or_else(|| eyre::eyre!("canonicalized dataflow path has no parent"))?
            .to_owned();

        let descriptor = read_as_descriptor(dataflow_path).await?;
        descriptor.check(&working_dir)?;
        let nodes = descriptor.resolve_aliases_and_set_defaults()?;

        let dataflow_id = Uuid::new_v7(Timestamp::now(NoContext));
        let spawn_command = SpawnDataflowNodes {
            dataflow_id,
            working_dir,
            spawn_nodes: nodes.keys().cloned().collect(),
            nodes,
            dataflow_descriptor: descriptor,
            uv,
        };

        let clock = Arc::new(HLC::default());

        let ctrlc_events = ReceiverStream::new(set_up_ctrlc_handler(clock.clone())?);

        // Exit once all nodes of this single dataflow are done.
        let exit_when_done = spawn_command
            .nodes
            .values()
            .map(|n| (spawn_command.dataflow_id, n.id.clone()))
            .collect();
        let (reply_tx, reply_rx) = oneshot::channel();
        let timestamp = clock.new_timestamp();
        // Feed a single synthetic `Spawn` event into the run loop, as if it
        // had been sent by a coordinator.
        let coordinator_events = stream::once(async move {
            Timestamped {
                inner: Event::Coordinator(CoordinatorEvent {
                    event: DaemonCoordinatorEvent::Spawn(spawn_command),
                    reply_tx,
                }),
                timestamp,
            }
        });
        let events = (coordinator_events, ctrlc_events).merge();
        let run_result = Self::run_general(
            Box::pin(events),
            None,
            DaemonId::new(None),
            Some(exit_when_done),
            clock.clone(),
            None,
        );

        // Resolves once the run loop replied to the synthetic `Spawn` event.
        let spawn_result = reply_rx
            .map_err(|err| eyre!("failed to receive spawn result: {err}"))
            .and_then(|r| async {
                match r {
                    Some(DaemonCoordinatorReply::SpawnResult(result)) => {
                        result.map_err(|err| eyre!(err))
                    }
                    _ => Err(eyre!("unexpected spawn reply")),
                }
            });

        // Drive the run loop and the spawn-reply future concurrently; fail
        // fast if either errors.
        let (mut dataflow_results, ()) = future::try_join(run_result, spawn_result).await?;

        Ok(DataflowResult {
            uuid: dataflow_id,
            timestamp: clock.new_timestamp(),
            node_results: dataflow_results
                .remove(&dataflow_id)
                .context("no node results for dataflow_id")?,
        })
    }
218
    /// Shared run logic for both daemon modes (coordinator-attached and
    /// standalone `run_dataflow`).
    ///
    /// Establishes the coordinator connections (if an address is given), opens
    /// the zenoh session used for inter-daemon communication, constructs the
    /// `Daemon` value, and then drives the main event loop via `run_inner`.
    async fn run_general(
        external_events: impl Stream<Item = Timestamped<Event>> + Unpin,
        coordinator_addr: Option<SocketAddr>,
        daemon_id: DaemonId,
        exit_when_done: Option<BTreeSet<(Uuid, NodeId)>>,
        clock: Arc<HLC>,
        remote_daemon_events_tx: Option<flume::Sender<eyre::Result<Timestamped<InterDaemonEvent>>>>,
    ) -> eyre::Result<DaemonRunResult> {
        // Main control connection to the coordinator.
        let coordinator_connection = match coordinator_addr {
            Some(addr) => {
                let stream = TcpStream::connect(addr)
                    .await
                    .wrap_err("failed to connect to dora-coordinator")?;
                stream
                    .set_nodelay(true)
                    .wrap_err("failed to set TCP_NODELAY")?;
                Some(stream)
            }
            None => None,
        };

        // Separate connection for log messages, so they don't interleave with
        // control traffic on the main connection.
        let logger_coordinator_connection = match coordinator_addr {
            Some(addr) => {
                let stream = TcpStream::connect(addr)
                    .await
                    .wrap_err("failed to connect log to dora-coordinator")?;
                stream
                    .set_nodelay(true)
                    .wrap_err("failed to set TCP_NODELAY")?;
                Some(stream)
            }
            None => None,
        };

        // Zenoh session setup: an explicit config file (via zenoh's config env
        // variable) takes precedence. Otherwise, build a default config that
        // connects to the coordinator's machine, with two fallback attempts
        // (progressively simpler configs) if opening the session fails.
        let zenoh_session = match std::env::var(zenoh::Config::DEFAULT_CONFIG_PATH_ENV) {
            Ok(path) => {
                let zenoh_config = zenoh::Config::from_file(&path)
                    .map_err(|e| eyre!(e))
                    .wrap_err_with(|| format!("failed to read zenoh config from {path}"))?;
                zenoh::open(zenoh_config)
                    .await
                    .map_err(|e| eyre!(e))
                    .context("failed to open zenoh session")?
            }
            Err(std::env::VarError::NotPresent) => {
                let mut zenoh_config = zenoh::Config::default();

                if let Some(addr) = coordinator_addr {
                    zenoh_config
                        .insert_json5("routing/peer", r#"{ mode: "linkstate" }"#)
                        .unwrap();

                    // Connect to a zenoh router/peer on the coordinator's host.
                    zenoh_config
                        .insert_json5(
                            "connect/endpoints",
                            &format!(
                                r#"{{ router: ["tcp/[::]:7447"], peer: ["tcp/{}:5456"] }}"#,
                                addr.ip()
                            ),
                        )
                        .unwrap();
                    zenoh_config
                        .insert_json5(
                            "listen/endpoints",
                            r#"{ router: ["tcp/[::]:7447"], peer: ["tcp/[::]:5456"] }"#,
                        )
                        .unwrap();
                    if cfg!(not(target_os = "linux")) {
                        warn!("disabling multicast on non-linux systems. Enable it with the ZENOH_CONFIG env variable or file");
                        zenoh_config
                            .insert_json5("scouting/multicast", r#"{ enabled: false }"#)
                            .unwrap();
                    }
                };
                if let Ok(zenoh_session) = zenoh::open(zenoh_config).await {
                    zenoh_session
                } else {
                    // First fallback: default config + coordinator connect
                    // endpoint, but without the listen endpoints.
                    warn!(
                        "failed to open zenoh session, retrying with default config + coordinator"
                    );
                    let mut zenoh_config = zenoh::Config::default();
                    zenoh_config
                        .insert_json5("routing/peer", r#"{ mode: "linkstate" }"#)
                        .unwrap();

                    if let Some(addr) = coordinator_addr {
                        zenoh_config
                            .insert_json5(
                                "connect/endpoints",
                                &format!(
                                    r#"{{ router: ["tcp/[::]:7447"], peer: ["tcp/{}:5456"] }}"#,
                                    addr.ip()
                                ),
                            )
                            .unwrap();
                        if cfg!(not(target_os = "linux")) {
                            warn!("disabling multicast on non-linux systems. Enable it with the ZENOH_CONFIG env variable or file");
                            zenoh_config
                                .insert_json5("scouting/multicast", r#"{ enabled: false }"#)
                                .unwrap();
                        }
                    }
                    if let Ok(zenoh_session) = zenoh::open(zenoh_config).await {
                        zenoh_session
                    } else {
                        // Last resort: completely default zenoh config.
                        warn!("failed to open zenoh session, retrying with default config");
                        let zenoh_config = zenoh::Config::default();
                        zenoh::open(zenoh_config)
                            .await
                            .map_err(|e| eyre!(e))
                            .context("failed to open zenoh session")?
                    }
                }
            }
            Err(std::env::VarError::NotUnicode(_)) => eyre::bail!(
                "{} env variable is not valid unicode",
                zenoh::Config::DEFAULT_CONFIG_PATH_ENV
            ),
        };
        let (dora_events_tx, dora_events_rx) = mpsc::channel(5);
        let daemon = Self {
            logger: Logger {
                coordinator_connection: logger_coordinator_connection,
                daemon_id: daemon_id.clone(),
                clock: clock.clone(),
            }
            .for_daemon(daemon_id.clone()),
            running: HashMap::new(),
            working_dir: HashMap::new(),
            events_tx: dora_events_tx,
            coordinator_connection,
            last_coordinator_heartbeat: Instant::now(),
            daemon_id,
            exit_when_done,
            exit_when_all_finished: false,
            dataflow_node_results: BTreeMap::new(),
            clock,
            zenoh_session,
            remote_daemon_events_tx,
        };

        let dora_events = ReceiverStream::new(dora_events_rx);
        // Periodic timer that triggers coordinator heartbeats every 5 seconds.
        let watchdog_clock = daemon.clock.clone();
        let watchdog_interval = tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(
            Duration::from_secs(5),
        ))
        .map(|_| Timestamped {
            inner: Event::HeartbeatInterval,
            timestamp: watchdog_clock.new_timestamp(),
        });
        let events = (external_events, dora_events, watchdog_interval).merge();
        daemon.run_inner(events).await
    }
374
375 #[tracing::instrument(skip(incoming_events, self), fields(?self.daemon_id))]
376 async fn run_inner(
377 mut self,
378 incoming_events: impl Stream<Item = Timestamped<Event>> + Unpin,
379 ) -> eyre::Result<DaemonRunResult> {
380 let mut events = incoming_events;
381
382 while let Some(event) = events.next().await {
383 let Timestamped { inner, timestamp } = event;
384 if let Err(err) = self.clock.update_with_timestamp(×tamp) {
385 tracing::warn!("failed to update HLC with incoming event timestamp: {err}");
386 }
387
388 match inner {
389 Event::Coordinator(CoordinatorEvent { event, reply_tx }) => {
390 let status = self.handle_coordinator_event(event, reply_tx).await?;
391
392 match status {
393 RunStatus::Continue => {}
394 RunStatus::Exit => break,
395 }
396 }
397 Event::Daemon(event) => {
398 self.handle_inter_daemon_event(event).await?;
399 }
400 Event::Node {
401 dataflow_id: dataflow,
402 node_id,
403 event,
404 } => self.handle_node_event(event, dataflow, node_id).await?,
405 Event::Dora(event) => match self.handle_dora_event(event).await? {
406 RunStatus::Continue => {}
407 RunStatus::Exit => break,
408 },
409 Event::DynamicNode(event) => self.handle_dynamic_node_event(event).await?,
410 Event::HeartbeatInterval => {
411 if let Some(connection) = &mut self.coordinator_connection {
412 let msg = serde_json::to_vec(&Timestamped {
413 inner: CoordinatorRequest::Event {
414 daemon_id: self.daemon_id.clone(),
415 event: DaemonEvent::Heartbeat,
416 },
417 timestamp: self.clock.new_timestamp(),
418 })?;
419 socket_stream_send(connection, &msg)
420 .await
421 .wrap_err("failed to send watchdog message to dora-coordinator")?;
422
423 if self.last_coordinator_heartbeat.elapsed() > Duration::from_secs(20) {
424 bail!("lost connection to coordinator")
425 }
426 }
427 }
428 Event::CtrlC => {
429 tracing::info!("received ctrlc signal -> stopping all dataflows");
430 for dataflow in self.running.values_mut() {
431 let mut logger = self.logger.for_dataflow(dataflow.id);
432 dataflow
433 .stop_all(
434 &mut self.coordinator_connection,
435 &self.clock,
436 None,
437 &mut logger,
438 )
439 .await?;
440 }
441 self.exit_when_all_finished = true;
442 if self.running.is_empty() {
443 break;
444 }
445 }
446 Event::SecondCtrlC => {
447 tracing::warn!("received second ctrlc signal -> exit immediately");
448 bail!("received second ctrl-c signal");
449 }
450 Event::DaemonError(err) => {
451 tracing::error!("Daemon error: {err:?}");
452 }
453 }
454 }
455
456 if let Some(mut connection) = self.coordinator_connection.take() {
457 let msg = serde_json::to_vec(&Timestamped {
458 inner: CoordinatorRequest::Event {
459 daemon_id: self.daemon_id.clone(),
460 event: DaemonEvent::Exit,
461 },
462 timestamp: self.clock.new_timestamp(),
463 })?;
464 socket_stream_send(&mut connection, &msg)
465 .await
466 .wrap_err("failed to send Exit message to dora-coordinator")?;
467 }
468
469 Ok(self.dataflow_node_results)
470 }
471
472 async fn handle_coordinator_event(
473 &mut self,
474 event: DaemonCoordinatorEvent,
475 reply_tx: Sender<Option<DaemonCoordinatorReply>>,
476 ) -> eyre::Result<RunStatus> {
477 let status = match event {
478 DaemonCoordinatorEvent::Spawn(SpawnDataflowNodes {
479 dataflow_id,
480 working_dir,
481 nodes,
482 dataflow_descriptor,
483 spawn_nodes,
484 uv,
485 }) => {
486 match dataflow_descriptor.communication.remote {
487 dora_core::config::RemoteCommunicationConfig::Tcp => {}
488 }
489
490 let working_dir = if working_dir.exists() {
492 working_dir
493 } else {
494 std::env::current_dir().wrap_err("failed to get current working dir")?
495 };
496
497 let result = self
498 .spawn_dataflow(
499 dataflow_id,
500 working_dir,
501 nodes,
502 dataflow_descriptor,
503 spawn_nodes,
504 uv,
505 )
506 .await;
507 if let Err(err) = &result {
508 tracing::error!("{err:?}");
509 }
510 let reply =
511 DaemonCoordinatorReply::SpawnResult(result.map_err(|err| format!("{err:?}")));
512 let _ = reply_tx.send(Some(reply)).map_err(|_| {
513 error!("could not send `SpawnResult` reply from daemon to coordinator")
514 });
515 RunStatus::Continue
516 }
517 DaemonCoordinatorEvent::AllNodesReady {
518 dataflow_id,
519 exited_before_subscribe,
520 } => {
521 let mut logger = self.logger.for_dataflow(dataflow_id);
522 logger.log(LogLevel::Debug, None,
523 Some("daemon".into()),
524 format!("received AllNodesReady (exited_before_subscribe: {exited_before_subscribe:?})"
525 )).await;
526 match self.running.get_mut(&dataflow_id) {
527 Some(dataflow) => {
528 let ready = exited_before_subscribe.is_empty();
529 dataflow
530 .pending_nodes
531 .handle_external_all_nodes_ready(
532 exited_before_subscribe,
533 &mut dataflow.cascading_error_causes,
534 )
535 .await?;
536 if ready {
537 logger.log(LogLevel::Info, None,
538 Some("daemon".into()),
539 "coordinator reported that all nodes are ready, starting dataflow",
540 ).await;
541 dataflow.start(&self.events_tx, &self.clock).await?;
542 }
543 }
544 None => {
545 tracing::warn!(
546 "received AllNodesReady for unknown dataflow (ID `{dataflow_id}`)"
547 );
548 }
549 }
550 let _ = reply_tx.send(None).map_err(|_| {
551 error!("could not send `AllNodesReady` reply from daemon to coordinator")
552 });
553 RunStatus::Continue
554 }
555 DaemonCoordinatorEvent::Logs {
556 dataflow_id,
557 node_id,
558 } => {
559 match self.working_dir.get(&dataflow_id) {
560 Some(working_dir) => {
561 let working_dir = working_dir.clone();
562 tokio::spawn(async move {
563 let logs = async {
564 let mut file =
565 File::open(log::log_path(&working_dir, &dataflow_id, &node_id))
566 .await
567 .wrap_err(format!(
568 "Could not open log file: {:#?}",
569 log::log_path(&working_dir, &dataflow_id, &node_id)
570 ))?;
571
572 let mut contents = vec![];
573 file.read_to_end(&mut contents)
574 .await
575 .wrap_err("Could not read content of log file")?;
576 Result::<Vec<u8>, eyre::Report>::Ok(contents)
577 }
578 .await
579 .map_err(|err| format!("{err:?}"));
580 let _ = reply_tx
581 .send(Some(DaemonCoordinatorReply::Logs(logs)))
582 .map_err(|_| {
583 error!("could not send logs reply from daemon to coordinator")
584 });
585 });
586 }
587 None => {
588 tracing::warn!("received Logs for unknown dataflow (ID `{dataflow_id}`)");
589 let _ = reply_tx.send(None).map_err(|_| {
590 error!(
591 "could not send `AllNodesReady` reply from daemon to coordinator"
592 )
593 });
594 }
595 }
596 RunStatus::Continue
597 }
598 DaemonCoordinatorEvent::ReloadDataflow {
599 dataflow_id,
600 node_id,
601 operator_id,
602 } => {
603 let result = self.send_reload(dataflow_id, node_id, operator_id).await;
604 let reply =
605 DaemonCoordinatorReply::ReloadResult(result.map_err(|err| format!("{err:?}")));
606 let _ = reply_tx
607 .send(Some(reply))
608 .map_err(|_| error!("could not send reload reply from daemon to coordinator"));
609 RunStatus::Continue
610 }
611 DaemonCoordinatorEvent::StopDataflow {
612 dataflow_id,
613 grace_duration,
614 } => {
615 let mut logger = self.logger.for_dataflow(dataflow_id);
616 let dataflow = self
617 .running
618 .get_mut(&dataflow_id)
619 .wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"));
620 let (reply, future) = match dataflow {
621 Ok(dataflow) => {
622 let future = dataflow.stop_all(
623 &mut self.coordinator_connection,
624 &self.clock,
625 grace_duration,
626 &mut logger,
627 );
628 (Ok(()), Some(future))
629 }
630 Err(err) => (Err(err.to_string()), None),
631 };
632
633 let _ = reply_tx
634 .send(Some(DaemonCoordinatorReply::StopResult(reply)))
635 .map_err(|_| error!("could not send stop reply from daemon to coordinator"));
636
637 if let Some(future) = future {
638 future.await?;
639 }
640
641 RunStatus::Continue
642 }
643 DaemonCoordinatorEvent::Destroy => {
644 tracing::info!("received destroy command -> exiting");
645 let (notify_tx, notify_rx) = oneshot::channel();
646 let reply = DaemonCoordinatorReply::DestroyResult {
647 result: Ok(()),
648 notify: Some(notify_tx),
649 };
650 let _ = reply_tx
651 .send(Some(reply))
652 .map_err(|_| error!("could not send destroy reply from daemon to coordinator"));
653 if notify_rx.await.is_err() {
655 tracing::warn!("no confirmation received for DestroyReply");
656 }
657 RunStatus::Exit
658 }
659 DaemonCoordinatorEvent::Heartbeat => {
660 self.last_coordinator_heartbeat = Instant::now();
661 let _ = reply_tx.send(None);
662 RunStatus::Continue
663 }
664 };
665 Ok(status)
666 }
667
    /// Handles an event received from a remote daemon (via zenoh).
    ///
    /// Errors while forwarding are logged to the dataflow logger rather than
    /// aborting the daemon, since a single bad remote message should not take
    /// down unrelated dataflows.
    async fn handle_inter_daemon_event(&mut self, event: InterDaemonEvent) -> eyre::Result<()> {
        match event {
            // An output produced on a remote daemon that has local receivers.
            InterDaemonEvent::Output {
                dataflow_id,
                node_id,
                output_id,
                metadata,
                data,
            } => {
                let inner = async {
                    let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
                        format!("send out failed: no running dataflow with ID `{dataflow_id}`")
                    })?;
                    send_output_to_local_receivers(
                        node_id.clone(),
                        output_id.clone(),
                        dataflow,
                        &metadata,
                        data.map(DataMessage::Vec),
                        &self.clock,
                    )
                    .await?;
                    Result::<_, eyre::Report>::Ok(())
                };
                if let Err(err) = inner
                    .await
                    .wrap_err("failed to forward remote output to local receivers")
                {
                    let mut logger = self.logger.for_dataflow(dataflow_id).for_node(node_id);
                    logger
                        .log(LogLevel::Warn, Some("daemon".into()), format!("{err:?}"))
                        .await;
                }
                Ok(())
            }
            // A remote node closed one of its outputs -> close the mapped
            // local inputs.
            InterDaemonEvent::OutputClosed {
                dataflow_id,
                node_id,
                output_id,
            } => {
                let output_id = OutputId(node_id.clone(), output_id);
                let mut logger = self
                    .logger
                    .for_dataflow(dataflow_id)
                    .for_node(node_id.clone());
                logger
                    .log(
                        LogLevel::Debug,
                        Some("daemon".into()),
                        format!("received OutputClosed event for output {output_id:?}"),
                    )
                    .await;

                let inner = async {
                    let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
                        format!("send out failed: no running dataflow with ID `{dataflow_id}`")
                    })?;

                    // Close every local input that is mapped to this output.
                    if let Some(inputs) = dataflow.mappings.get(&output_id).cloned() {
                        for (receiver_id, input_id) in &inputs {
                            close_input(dataflow, receiver_id, input_id, &self.clock);
                        }
                    }
                    Result::<(), eyre::Report>::Ok(())
                };
                if let Err(err) = inner
                    .await
                    .wrap_err("failed to handle InputsClosed event sent by coordinator")
                {
                    logger
                        .log(LogLevel::Warn, Some("daemon".into()), format!("{err:?}"))
                        .await;
                }
                Ok(())
            }
        }
    }
745
    /// Registers a new dataflow and spawns its local nodes.
    ///
    /// `spawn_nodes` contains the IDs of the nodes assigned to this daemon;
    /// all other nodes in `nodes` run on remote daemons and are handled by
    /// subscribing to their outputs via zenoh. Spawn failures of individual
    /// nodes are recorded in `dataflow_node_results` instead of aborting the
    /// whole dataflow.
    async fn spawn_dataflow(
        &mut self,
        dataflow_id: uuid::Uuid,
        working_dir: PathBuf,
        nodes: BTreeMap<NodeId, ResolvedNode>,
        dataflow_descriptor: Descriptor,
        spawn_nodes: BTreeSet<NodeId>,
        uv: bool,
    ) -> eyre::Result<()> {
        let mut logger = self.logger.for_dataflow(dataflow_id);
        let dataflow =
            RunningDataflow::new(dataflow_id, self.daemon_id.clone(), &dataflow_descriptor);
        // Reject duplicate dataflow IDs; otherwise register the dataflow and
        // remember its working directory (needed later for log requests).
        let dataflow = match self.running.entry(dataflow_id) {
            std::collections::hash_map::Entry::Vacant(entry) => {
                self.working_dir.insert(dataflow_id, working_dir.clone());
                entry.insert(dataflow)
            }
            std::collections::hash_map::Entry::Occupied(_) => {
                bail!("there is already a running dataflow with ID `{dataflow_id}`")
            }
        };

        // Nodes whose spawn failed; their stop is handled at the end.
        let mut stopped = Vec::new();

        // First pass: build the input/output routing tables.
        for node in nodes.values() {
            let local = spawn_nodes.contains(&node.id);

            let inputs = node_inputs(node);
            for (input_id, input) in inputs {
                if local {
                    dataflow
                        .open_inputs
                        .entry(node.id.clone())
                        .or_default()
                        .insert(input_id.clone());
                    match input.mapping {
                        InputMapping::User(mapping) => {
                            dataflow
                                .mappings
                                .entry(OutputId(mapping.source, mapping.output))
                                .or_default()
                                .insert((node.id.clone(), input_id));
                        }
                        InputMapping::Timer { interval } => {
                            dataflow
                                .timers
                                .entry(interval)
                                .or_default()
                                .insert((node.id.clone(), input_id));
                        }
                    }
                } else if let InputMapping::User(mapping) = input.mapping {
                    // Input of a remote node -> its source output must be
                    // published externally.
                    dataflow
                        .open_external_mappings
                        .insert(OutputId(mapping.source, mapping.output));
                }
            }
        }

        // Second pass: spawn local nodes / subscribe to remote outputs.
        for node in nodes.into_values() {
            let mut logger = logger.reborrow().for_node(node.id.clone());
            let local = spawn_nodes.contains(&node.id);
            if local {
                // Dynamic nodes attach later on their own; all others must
                // subscribe before the dataflow can start.
                if node.kind.dynamic() {
                    dataflow.dynamic_nodes.insert(node.id.clone());
                } else {
                    dataflow.pending_nodes.insert(node.id.clone());
                }

                let node_id = node.id.clone();
                // Ring buffer holding the node's most recent stderr lines.
                let node_stderr_most_recent = dataflow
                    .node_stderr_most_recent
                    .entry(node.id.clone())
                    .or_insert_with(|| Arc::new(ArrayQueue::new(STDERR_LOG_LINES)))
                    .clone();
                logger
                    .log(LogLevel::Info, Some("daemon".into()), "spawning")
                    .await;
                match spawn::spawn_node(
                    dataflow_id,
                    &working_dir,
                    node,
                    self.events_tx.clone(),
                    dataflow_descriptor.clone(),
                    self.clock.clone(),
                    node_stderr_most_recent,
                    uv,
                    &mut logger,
                )
                .await
                .wrap_err_with(|| format!("failed to spawn node `{node_id}`"))
                {
                    Ok(running_node) => {
                        dataflow.running_nodes.insert(node_id, running_node);
                    }
                    Err(err) => {
                        // Record the spawn failure as this node's result and
                        // mark it for stop handling below.
                        logger
                            .log(LogLevel::Error, Some("daemon".into()), format!("{err:?}"))
                            .await;
                        self.dataflow_node_results
                            .entry(dataflow_id)
                            .or_default()
                            .insert(
                                node_id.clone(),
                                Err(NodeError {
                                    timestamp: self.clock.new_timestamp(),
                                    cause: NodeErrorCause::Other {
                                        stderr: format!("spawn failed: {err:?}"),
                                    },
                                    exit_status: NodeExitStatus::Unknown,
                                }),
                            );
                        stopped.push(node_id.clone());
                    }
                }
            } else {
                dataflow.pending_nodes.set_external_nodes(true);

                // Subscribe via zenoh to every remote output that has a local
                // receiver; each subscription runs in its own task until the
                // dataflow finishes.
                for output_id in dataflow.mappings.keys().filter(|o| o.0 == node.id) {
                    let tx = self
                        .remote_daemon_events_tx
                        .clone()
                        .wrap_err("no remote_daemon_events_tx channel")?;
                    let mut finished_rx = dataflow.finished_tx.subscribe();
                    let subscribe_topic = dataflow.output_publish_topic(output_id);
                    tracing::debug!("declaring subscriber on {subscribe_topic}");
                    let subscriber = self
                        .zenoh_session
                        .declare_subscriber(subscribe_topic)
                        .await
                        .map_err(|e| eyre!(e))
                        .wrap_err_with(|| format!("failed to subscribe to {output_id:?}"))?;
                    tokio::spawn(async move {
                        let mut finished = pin!(finished_rx.recv());
                        loop {
                            // Race "dataflow finished" against the next sample.
                            let finished_or_next =
                                futures::future::select(finished, subscriber.recv_async());
                            match finished_or_next.await {
                                future::Either::Left((finished, _)) => {
                                    match finished {
                                        // Sender dropped -> dataflow is done.
                                        Err(broadcast::error::RecvError::Closed) => {
                                            tracing::debug!("dataflow finished, breaking from zenoh subscribe task");
                                            break;
                                        }
                                        other => {
                                            tracing::warn!("unexpected return value of dataflow finished_rx channel: {other:?}");
                                            break;
                                        }
                                    }
                                }
                                future::Either::Right((sample, f)) => {
                                    // Keep the still-pending finished future
                                    // for the next loop iteration.
                                    finished = f;
                                    let event = sample.map_err(|e| eyre!(e)).and_then(|s| {
                                        Timestamped::deserialize_inter_daemon_event(
                                            &s.payload().to_bytes(),
                                        )
                                    });
                                    if tx.send_async(event).await.is_err() {
                                        break;
                                    }
                                }
                            }
                        }
                    });
                }
            }
        }
        for node_id in stopped {
            self.handle_node_stop(dataflow_id, &node_id).await?;
        }

        Ok(())
    }
924
925 async fn handle_dynamic_node_event(
926 &mut self,
927 event: DynamicNodeEventWrapper,
928 ) -> eyre::Result<()> {
929 match event {
930 DynamicNodeEventWrapper {
931 event: DynamicNodeEvent::NodeConfig { node_id },
932 reply_tx,
933 } => {
934 let number_node_id = self
935 .running
936 .iter()
937 .filter(|(_id, dataflow)| dataflow.running_nodes.contains_key(&node_id))
938 .count();
939
940 let node_config = match number_node_id {
941 2.. => Err(format!(
942 "multiple dataflows contains dynamic node id {node_id}. \
943 Please only have one running dataflow with the specified \
944 node id if you want to use dynamic node",
945 )),
946 1 => self
947 .running
948 .iter()
949 .filter(|(_id, dataflow)| dataflow.running_nodes.contains_key(&node_id))
950 .map(|(id, dataflow)| -> Result<NodeConfig> {
951 let node_config = dataflow
952 .running_nodes
953 .get(&node_id)
954 .context("no node with ID `{node_id}` within the given dataflow")?
955 .node_config
956 .clone();
957 if !node_config.dynamic {
958 bail!("node with ID `{node_id}` in {id} is not dynamic");
959 }
960 Ok(node_config)
961 })
962 .next()
963 .ok_or_else(|| eyre!("no node with ID `{node_id}`"))
964 .and_then(|r| r)
965 .map_err(|err| {
966 format!(
967 "failed to get dynamic node config within given dataflow: {err}"
968 )
969 }),
970 0 => Err("no node with ID `{node_id}`".to_string()),
971 };
972
973 let reply = DaemonReply::NodeConfig {
974 result: node_config,
975 };
976 let _ = reply_tx.send(Some(reply)).map_err(|_| {
977 error!("could not send node info reply from daemon to coordinator")
978 });
979 Ok(())
980 }
981 }
982 }
983
    /// Handles an event sent by a local node of the given dataflow.
    ///
    /// Most arms reply to the node through the event's embedded reply channel;
    /// reply-send failures are deliberately ignored (`let _ =`) since a node
    /// may have exited in the meantime.
    async fn handle_node_event(
        &mut self,
        event: DaemonNodeEvent,
        dataflow_id: DataflowId,
        node_id: NodeId,
    ) -> eyre::Result<()> {
        match event {
            // The node connected its event stream and is ready to run.
            DaemonNodeEvent::Subscribe {
                event_sender,
                reply_sender,
            } => {
                let mut logger = self.logger.for_dataflow(dataflow_id);
                logger
                    .log(
                        LogLevel::Info,
                        Some(node_id.clone()),
                        Some("daemon".into()),
                        "node is ready",
                    )
                    .await;

                let dataflow = self.running.get_mut(&dataflow_id).ok_or_else(|| {
                    format!("subscribe failed: no running dataflow with ID `{dataflow_id}`")
                });

                match dataflow {
                    Err(err) => {
                        let _ = reply_sender.send(DaemonReply::Result(Err(err)));
                    }
                    Ok(dataflow) => {
                        Self::subscribe(dataflow, node_id.clone(), event_sender, &self.clock).await;

                        // May flip the dataflow to "all nodes ready".
                        let status = dataflow
                            .pending_nodes
                            .handle_node_subscription(
                                node_id.clone(),
                                reply_sender,
                                &mut self.coordinator_connection,
                                &self.clock,
                                &mut dataflow.cascading_error_causes,
                                &mut logger,
                            )
                            .await?;
                        match status {
                            DataflowStatus::AllNodesReady => {
                                logger
                                    .log(
                                        LogLevel::Info,
                                        None,
                                        Some("daemon".into()),
                                        "all nodes are ready, starting dataflow",
                                    )
                                    .await;
                                dataflow.start(&self.events_tx, &self.clock).await?;
                            }
                            DataflowStatus::Pending => {}
                        }
                    }
                }
            }
            // The node wants to receive drop events for its outputs.
            DaemonNodeEvent::SubscribeDrop {
                event_sender,
                reply_sender,
            } => {
                let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
                    format!("failed to subscribe: no running dataflow with ID `{dataflow_id}`")
                });
                let result = match dataflow {
                    Ok(dataflow) => {
                        dataflow.drop_channels.insert(node_id, event_sender);
                        Ok(())
                    }
                    Err(err) => Err(err.to_string()),
                };
                let _ = reply_sender.send(DaemonReply::Result(result));
            }
            // The node closed a subset of its outputs.
            DaemonNodeEvent::CloseOutputs {
                outputs,
                reply_sender,
            } => {
                let inner = async {
                    self.send_output_closed_events(dataflow_id, node_id, outputs)
                        .await
                };

                let reply = inner.await.map_err(|err| format!("{err:?}"));
                let _ = reply_sender.send(DaemonReply::Result(reply));
            }
            // The node finished sending all of its outputs.
            DaemonNodeEvent::OutputsDone { reply_sender } => {
                let result = self.handle_outputs_done(dataflow_id, &node_id).await;

                let _ = reply_sender.send(DaemonReply::Result(
                    result.map_err(|err| format!("{err:?}")),
                ));
            }
            // The node produced an output message.
            DaemonNodeEvent::SendOut {
                output_id,
                metadata,
                data,
            } => self
                .send_out(dataflow_id, node_id, output_id, metadata, data)
                .await
                .context("failed to send out")?,
            // The node reports that it dropped shared-memory data tokens.
            DaemonNodeEvent::ReportDrop { tokens } => {
                let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
                    format!(
                        "failed to get handle drop tokens: \
                        no running dataflow with ID `{dataflow_id}`"
                    )
                });

                match dataflow {
                    Ok(dataflow) => {
                        for token in tokens {
                            match dataflow.pending_drop_tokens.get_mut(&token) {
                                Some(info) => {
                                    if info.pending_nodes.remove(&node_id) {
                                        dataflow.check_drop_token(token, &self.clock).await?;
                                    } else {
                                        tracing::warn!(
                                            "node `{node_id}` is not pending for drop token `{token:?}`"
                                        );
                                    }
                                }
                                None => tracing::warn!("unknown drop token `{token:?}`"),
                            }
                        }
                    }
                    Err(err) => tracing::warn!("{err:?}"),
                }
            }
            // The node dropped its event stream -> remove its subscription.
            DaemonNodeEvent::EventStreamDropped { reply_sender } => {
                let inner = async {
                    let dataflow = self
                        .running
                        .get_mut(&dataflow_id)
                        .wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"))?;
                    dataflow.subscribe_channels.remove(&node_id);
                    Result::<_, eyre::Error>::Ok(())
                };

                let reply = inner.await.map_err(|err| format!("{err:?}"));
                let _ = reply_sender.send(DaemonReply::Result(reply));
            }
        }
        Ok(())
    }
1132
1133 async fn send_reload(
1134 &mut self,
1135 dataflow_id: Uuid,
1136 node_id: NodeId,
1137 operator_id: Option<OperatorId>,
1138 ) -> Result<(), eyre::ErrReport> {
1139 let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
1140 format!("Reload failed: no running dataflow with ID `{dataflow_id}`")
1141 })?;
1142 if let Some(channel) = dataflow.subscribe_channels.get(&node_id) {
1143 match send_with_timestamp(channel, NodeEvent::Reload { operator_id }, &self.clock) {
1144 Ok(()) => {}
1145 Err(_) => {
1146 dataflow.subscribe_channels.remove(&node_id);
1147 }
1148 }
1149 }
1150 Ok(())
1151 }
1152
    /// Deliver an output message from `node_id` to all of its receivers.
    ///
    /// Local receivers (nodes running on this daemon) are served first via
    /// their subscribe channels. If the output also has external receivers —
    /// or the dataflow is configured to mirror everything to zenoh — the raw
    /// payload bytes are additionally published to remote daemons.
    async fn send_out(
        &mut self,
        dataflow_id: Uuid,
        node_id: NodeId,
        output_id: DataId,
        metadata: dora_message::metadata::Metadata,
        data: Option<DataMessage>,
    ) -> Result<(), eyre::ErrReport> {
        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
            format!("send out failed: no running dataflow with ID `{dataflow_id}`")
        })?;
        // Local delivery also extracts the raw bytes (copying out of shared
        // memory if needed) so they can be reused for the remote publish below.
        let data_bytes = send_output_to_local_receivers(
            node_id.clone(),
            output_id.clone(),
            dataflow,
            &metadata,
            data,
            &self.clock,
        )
        .await?;

        let output_id = OutputId(node_id, output_id);
        let remote_receivers = dataflow.open_external_mappings.contains(&output_id)
            || dataflow.publish_all_messages_to_zenoh;
        if remote_receivers {
            let event = InterDaemonEvent::Output {
                dataflow_id,
                node_id: output_id.0.clone(),
                output_id: output_id.1.clone(),
                metadata,
                data: data_bytes,
            };
            self.send_to_remote_receivers(dataflow_id, &output_id, event)
                .await?;
        }

        Ok(())
    }
1191
    /// Publish an inter-daemon event on the zenoh topic of a dataflow output.
    ///
    /// A zenoh publisher is declared lazily on first use for each output and
    /// cached in `dataflow.publishers` for subsequent messages.
    async fn send_to_remote_receivers(
        &mut self,
        dataflow_id: Uuid,
        output_id: &OutputId,
        event: InterDaemonEvent,
    ) -> Result<(), eyre::Error> {
        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
            format!("send out failed: no running dataflow with ID `{dataflow_id}`")
        })?;

        let publisher = match dataflow.publishers.get(output_id) {
            Some(publisher) => publisher,
            None => {
                // First message on this output: declare a publisher and cache it.
                let publish_topic = dataflow.output_publish_topic(output_id);
                tracing::debug!("declaring publisher on {publish_topic}");
                let publisher = self
                    .zenoh_session
                    .declare_publisher(publish_topic)
                    .await
                    .map_err(|e| eyre!(e))
                    .context("failed to create zenoh publisher")?;
                dataflow.publishers.insert(output_id.clone(), publisher);
                // The entry was just inserted, so this lookup cannot fail.
                dataflow.publishers.get(output_id).unwrap()
            }
        };

        let serialized_event = Timestamped {
            inner: event,
            timestamp: self.clock.new_timestamp(),
        }
        .serialize();
        publisher
            .put(serialized_event)
            .await
            .map_err(|e| eyre!(e))
            .context("zenoh put failed")?;
        Ok(())
    }
1231
    /// Notify all receivers that the given outputs of `node_id` are closed.
    ///
    /// Local receivers get `InputClosed` (and possibly `AllInputsClosed`)
    /// events via `close_input`; for outputs that have external receivers an
    /// `OutputClosed` event is published to the remote daemons.
    async fn send_output_closed_events(
        &mut self,
        dataflow_id: DataflowId,
        node_id: NodeId,
        outputs: Vec<DataId>,
    ) -> eyre::Result<()> {
        let dataflow = self
            .running
            .get_mut(&dataflow_id)
            .wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"))?;
        // All local `(receiver, input)` pairs fed by one of the closed outputs.
        let local_node_inputs: BTreeSet<_> = dataflow
            .mappings
            .iter()
            .filter(|(k, _)| k.0 == node_id && outputs.contains(&k.1))
            .flat_map(|(_, v)| v)
            .cloned()
            .collect();
        for (receiver_id, input_id) in &local_node_inputs {
            close_input(dataflow, receiver_id, input_id, &self.clock);
        }

        // Collect first: `send_to_remote_receivers` needs `&mut self`, so the
        // `dataflow` borrow must end before the send loop below.
        let mut closed = Vec::new();
        for output_id in &dataflow.open_external_mappings {
            if output_id.0 == node_id && outputs.contains(&output_id.1) {
                closed.push(output_id.clone());
            }
        }

        for output_id in closed {
            let event = InterDaemonEvent::OutputClosed {
                dataflow_id,
                node_id: output_id.0.clone(),
                output_id: output_id.1.clone(),
            };
            self.send_to_remote_receivers(dataflow_id, &output_id, event)
                .await?;
        }

        Ok(())
    }
1272
1273 async fn subscribe(
1274 dataflow: &mut RunningDataflow,
1275 node_id: NodeId,
1276 event_sender: UnboundedSender<Timestamped<NodeEvent>>,
1277 clock: &HLC,
1278 ) {
1279 let closed_inputs = dataflow
1281 .mappings
1282 .values()
1283 .flatten()
1284 .filter(|(node, _)| node == &node_id)
1285 .map(|(_, input)| input)
1286 .filter(|input| {
1287 dataflow
1288 .open_inputs
1289 .get(&node_id)
1290 .map(|open_inputs| !open_inputs.contains(*input))
1291 .unwrap_or(true)
1292 });
1293 for input_id in closed_inputs {
1294 let _ = send_with_timestamp(
1295 &event_sender,
1296 NodeEvent::InputClosed {
1297 id: input_id.clone(),
1298 },
1299 clock,
1300 );
1301 }
1302 if dataflow.open_inputs(&node_id).is_empty() {
1303 let _ = send_with_timestamp(&event_sender, NodeEvent::AllInputsClosed, clock);
1304 }
1305
1306 if dataflow.stop_sent {
1309 let _ = send_with_timestamp(&event_sender, NodeEvent::Stop, clock);
1310 }
1311
1312 dataflow.subscribe_channels.insert(node_id, event_sender);
1313 }
1314
1315 #[tracing::instrument(skip(self), level = "trace")]
1316 async fn handle_outputs_done(
1317 &mut self,
1318 dataflow_id: DataflowId,
1319 node_id: &NodeId,
1320 ) -> eyre::Result<()> {
1321 let dataflow = self
1322 .running
1323 .get_mut(&dataflow_id)
1324 .ok_or_else(|| eyre!("no running dataflow with ID `{dataflow_id}`"))?;
1325
1326 let outputs = dataflow
1327 .mappings
1328 .keys()
1329 .filter(|m| &m.0 == node_id)
1330 .map(|m| &m.1)
1331 .cloned()
1332 .collect();
1333 self.send_output_closed_events(dataflow_id, node_id.clone(), outputs)
1334 .await?;
1335
1336 let dataflow = self
1337 .running
1338 .get_mut(&dataflow_id)
1339 .ok_or_else(|| eyre!("no running dataflow with ID `{dataflow_id}`"))?;
1340 dataflow.drop_channels.remove(node_id);
1341 Ok(())
1342 }
1343
    /// Process the stop of a single node and finish the dataflow when it was
    /// the last one.
    ///
    /// Steps: notify the pending-nodes tracker, close the node's outputs via
    /// `handle_outputs_done`, clear its PID bookkeeping, and — once only
    /// dynamic nodes remain running — report `AllNodesFinished` to the
    /// coordinator and drop the dataflow state.
    async fn handle_node_stop(&mut self, dataflow_id: Uuid, node_id: &NodeId) -> eyre::Result<()> {
        let mut logger = self.logger.for_dataflow(dataflow_id);
        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
            format!("failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`")
        })?;

        dataflow
            .pending_nodes
            .handle_node_stop(
                node_id,
                &mut self.coordinator_connection,
                &self.clock,
                &mut dataflow.cascading_error_causes,
                &mut logger,
            )
            .await?;

        self.handle_outputs_done(dataflow_id, node_id).await?;

        // Re-borrow: `handle_outputs_done` needed `&mut self`, so the earlier
        // logger/dataflow borrows could not be kept across the call.
        let mut logger = self.logger.for_dataflow(dataflow_id);
        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
            format!("failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`")
        })?;
        // The process exited on its own -> clear the PID so the kill-on-drop
        // fallback in `ProcessId::drop` does not fire.
        if let Some(mut pid) = dataflow.running_nodes.remove(node_id).and_then(|n| n.pid) {
            pid.mark_as_stopped()
        }
        // Dynamic nodes attach externally, so the dataflow counts as finished
        // on this daemon once only dynamic nodes remain.
        if dataflow
            .running_nodes
            .iter()
            .all(|(_id, n)| n.node_config.dynamic)
        {
            let result = DataflowDaemonResult {
                timestamp: self.clock.new_timestamp(),
                node_results: self
                    .dataflow_node_results
                    .get(&dataflow.id)
                    .context("failed to get dataflow node results")?
                    .clone(),
            };

            logger
                .log(
                    LogLevel::Info,
                    None,
                    Some("daemon".into()),
                    format!("dataflow finished on machine `{}`", self.daemon_id),
                )
                .await;
            if let Some(connection) = &mut self.coordinator_connection {
                let msg = serde_json::to_vec(&Timestamped {
                    inner: CoordinatorRequest::Event {
                        daemon_id: self.daemon_id.clone(),
                        event: DaemonEvent::AllNodesFinished {
                            dataflow_id,
                            result,
                        },
                    },
                    timestamp: self.clock.new_timestamp(),
                })?;
                socket_stream_send(connection, &msg)
                    .await
                    .wrap_err("failed to report dataflow finish to dora-coordinator")?;
            }
            self.running.remove(&dataflow_id);
        }

        Ok(())
    }
1412
    /// Handle an internally generated event: a timer tick, a log message to
    /// forward as an input, or a spawned node's exit.
    ///
    /// Returns [`RunStatus::Exit`] when the daemon should shut down because
    /// all dataflows it was waiting for have finished.
    async fn handle_dora_event(&mut self, event: DoraEvent) -> eyre::Result<RunStatus> {
        match event {
            DoraEvent::Timer {
                dataflow_id,
                interval,
                metadata,
            } => {
                let Some(dataflow) = self.running.get_mut(&dataflow_id) else {
                    tracing::warn!("Timer event for unknown dataflow `{dataflow_id}`");
                    return Ok(RunStatus::Continue);
                };

                let Some(subscribers) = dataflow.timers.get(&interval) else {
                    return Ok(RunStatus::Continue);
                };

                // Deliver a data-less input to every timer subscriber; defer
                // removal of dead channels to avoid mutating while iterating.
                let mut closed = Vec::new();
                for (receiver_id, input_id) in subscribers {
                    let Some(channel) = dataflow.subscribe_channels.get(receiver_id) else {
                        continue;
                    };

                    let send_result = send_with_timestamp(
                        channel,
                        NodeEvent::Input {
                            id: input_id.clone(),
                            metadata: metadata.clone(),
                            data: None,
                        },
                        &self.clock,
                    );
                    match send_result {
                        Ok(()) => {}
                        Err(_) => {
                            closed.push(receiver_id);
                        }
                    }
                }
                for id in closed {
                    dataflow.subscribe_channels.remove(id);
                }
            }
            DoraEvent::Logs {
                dataflow_id,
                output_id,
                message,
                metadata,
            } => {
                let Some(dataflow) = self.running.get_mut(&dataflow_id) else {
                    tracing::warn!("Logs event for unknown dataflow `{dataflow_id}`");
                    return Ok(RunStatus::Continue);
                };

                let Some(subscribers) = dataflow.mappings.get(&output_id) else {
                    tracing::warn!(
                        "No subscribers found for {:?} in {:?}",
                        output_id,
                        dataflow.mappings
                    );
                    return Ok(RunStatus::Continue);
                };

                // Forward the log message as an input to every subscriber;
                // defer removal of dead channels as above.
                let mut closed = Vec::new();
                for (receiver_id, input_id) in subscribers {
                    let Some(channel) = dataflow.subscribe_channels.get(receiver_id) else {
                        tracing::warn!("No subscriber channel found for {:?}", output_id);
                        continue;
                    };

                    let send_result = send_with_timestamp(
                        channel,
                        NodeEvent::Input {
                            id: input_id.clone(),
                            metadata: metadata.clone(),
                            data: Some(message.clone()),
                        },
                        &self.clock,
                    );
                    match send_result {
                        Ok(()) => {}
                        Err(_) => {
                            closed.push(receiver_id);
                        }
                    }
                }
                for id in closed {
                    dataflow.subscribe_channels.remove(id);
                }
            }
            DoraEvent::SpawnedNodeResult {
                dataflow_id,
                node_id,
                exit_status,
            } => {
                let mut logger = self
                    .logger
                    .for_dataflow(dataflow_id)
                    .for_node(node_id.clone());
                logger
                    .log(
                        LogLevel::Debug,
                        Some("daemon".into()),
                        format!("handling node stop with exit status {exit_status:?}"),
                    )
                    .await;

                // Turn the exit status into a node result, classifying the
                // failure cause for non-success exits.
                let node_result = match exit_status {
                    NodeExitStatus::Success => Ok(()),
                    exit_status => {
                        let dataflow = self.running.get(&dataflow_id);
                        let caused_by_node = dataflow
                            .and_then(|dataflow| {
                                dataflow.cascading_error_causes.error_caused_by(&node_id)
                            })
                            .cloned();
                        let grace_duration_kill = dataflow
                            .map(|d| d.grace_duration_kills.contains(&node_id))
                            .unwrap_or_default();

                        let cause = match caused_by_node {
                            // The node failed because another node failed first.
                            Some(caused_by_node) => {
                                logger
                                    .log(
                                        LogLevel::Info,
                                        Some("daemon".into()),
                                        format!("marking `{node_id}` as cascading error caused by `{caused_by_node}`")
                                    )
                                    .await;

                                NodeErrorCause::Cascading { caused_by_node }
                            }
                            // The daemon killed the node after the stop grace period.
                            None if grace_duration_kill => NodeErrorCause::GraceDuration,
                            // Otherwise report the node's most recent stderr output.
                            None => {
                                let cause = dataflow
                                    .and_then(|d| d.node_stderr_most_recent.get(&node_id))
                                    .map(|queue| {
                                        // A full ring buffer means older lines
                                        // were dropped; mark the truncation.
                                        let mut s = if queue.is_full() {
                                            "[...]".into()
                                        } else {
                                            String::new()
                                        };
                                        while let Some(line) = queue.pop() {
                                            s += &line;
                                        }
                                        s
                                    })
                                    .unwrap_or_default();

                                NodeErrorCause::Other { stderr: cause }
                            }
                        };
                        Err(NodeError {
                            timestamp: self.clock.new_timestamp(),
                            cause,
                            exit_status,
                        })
                    }
                };

                logger
                    .log(
                        if node_result.is_ok() {
                            LogLevel::Info
                        } else {
                            LogLevel::Error
                        },
                        Some("daemon".into()),
                        match &node_result {
                            Ok(()) => format!("{node_id} finished successfully"),
                            Err(err) => format!("{err}"),
                        },
                    )
                    .await;

                self.dataflow_node_results
                    .entry(dataflow_id)
                    .or_default()
                    .insert(node_id.clone(), node_result);

                self.handle_node_stop(dataflow_id, &node_id).await?;

                // Exit when all dataflows this daemon was started for are done.
                if let Some(exit_when_done) = &mut self.exit_when_done {
                    exit_when_done.remove(&(dataflow_id, node_id));
                    if exit_when_done.is_empty() {
                        tracing::info!(
                            "exiting daemon because all required dataflows are finished"
                        );
                        return Ok(RunStatus::Exit);
                    }
                }
                if self.exit_when_all_finished && self.running.is_empty() {
                    return Ok(RunStatus::Exit);
                }
            }
        }
        Ok(RunStatus::Continue)
    }
1610}
1611
/// Connect to the coordinator and build the daemon's merged event stream.
///
/// Registers this daemon at `coordinator_addr` (obtaining its [`DaemonId`]),
/// spawns a local listener for dynamic-node connections on
/// `local_listen_port`, and merges three sources into a single stream:
/// coordinator events, events from remote daemons, and dynamic-node events.
async fn set_up_event_stream(
    coordinator_addr: SocketAddr,
    machine_id: &Option<String>,
    clock: &Arc<HLC>,
    remote_daemon_events_rx: flume::Receiver<eyre::Result<Timestamped<InterDaemonEvent>>>,
    local_listen_port: u16,
) -> eyre::Result<(DaemonId, impl Stream<Item = Timestamped<Event>> + Unpin)> {
    let clock_cloned = clock.clone();
    // Wrap remote-daemon results: receive errors become `DaemonError` events
    // stamped with a fresh timestamp (a failed message carries none we can use).
    let remote_daemon_events = remote_daemon_events_rx.into_stream().map(move |e| match e {
        Ok(e) => Timestamped {
            inner: Event::Daemon(e.inner),
            timestamp: e.timestamp,
        },
        Err(err) => Timestamped {
            inner: Event::DaemonError(err),
            timestamp: clock_cloned.new_timestamp(),
        },
    });
    let (daemon_id, coordinator_events) =
        coordinator::register(coordinator_addr, machine_id.clone(), clock)
            .await
            .wrap_err("failed to connect to dora-coordinator")?;
    let coordinator_events = coordinator_events.map(
        |Timestamped {
             inner: event,
             timestamp,
         }| Timestamped {
            inner: Event::Coordinator(event),
            timestamp,
        },
    );
    let (events_tx, events_rx) = flume::bounded(10);
    let _listen_port =
        local_listener::spawn_listener_loop((LOCALHOST, local_listen_port).into(), events_tx)
            .await?;
    let dynamic_node_events = events_rx.into_stream().map(|e| Timestamped {
        inner: Event::DynamicNode(e.inner),
        timestamp: e.timestamp,
    });
    let incoming = (
        coordinator_events,
        remote_daemon_events,
        dynamic_node_events,
    )
        .merge();
    Ok((daemon_id, incoming))
}
1660
/// Deliver an output message to all receivers running on this daemon.
///
/// Sends a `NodeEvent::Input` to every mapped local `(receiver, input)` pair
/// and registers pending drop tokens for shared-memory payloads. Returns the
/// raw message bytes (copied out of shared memory if needed) so the caller
/// can forward them to remote daemons; `None` for data-less messages.
async fn send_output_to_local_receivers(
    node_id: NodeId,
    output_id: DataId,
    dataflow: &mut RunningDataflow,
    metadata: &metadata::Metadata,
    data: Option<DataMessage>,
    clock: &HLC,
) -> Result<Option<AVec<u8, ConstAlign<128>>>, eyre::ErrReport> {
    let timestamp = metadata.timestamp();
    let empty_set = BTreeSet::new();
    let output_id = OutputId(node_id, output_id);
    // Outputs without any mapping simply have no local receivers.
    let local_receivers = dataflow.mappings.get(&output_id).unwrap_or(&empty_set);
    let OutputId(node_id, _) = output_id;
    let mut closed = Vec::new();
    for (receiver_id, input_id) in local_receivers {
        if let Some(channel) = dataflow.subscribe_channels.get(receiver_id) {
            let item = NodeEvent::Input {
                id: input_id.clone(),
                metadata: metadata.clone(),
                data: data.clone(),
            };
            match channel.send(Timestamped {
                inner: item,
                timestamp,
            }) {
                Ok(()) => {
                    // Shared-memory data: record that this receiver holds a
                    // reference until it reports the drop token back.
                    if let Some(token) = data.as_ref().and_then(|d| d.drop_token()) {
                        dataflow
                            .pending_drop_tokens
                            .entry(token)
                            .or_insert_with(|| DropTokenInformation {
                                owner: node_id.clone(),
                                pending_nodes: Default::default(),
                            })
                            .pending_nodes
                            .insert(receiver_id.clone());
                    }
                }
                Err(_) => {
                    // Receiver gone -> remove below (not here, to avoid
                    // mutating the map while iterating over it).
                    closed.push(receiver_id);
                }
            }
        }
    }
    for id in closed {
        dataflow.subscribe_channels.remove(id);
    }
    // Extract the raw bytes for potential remote forwarding.
    let (data_bytes, drop_token) = match data {
        None => (None, None),
        Some(DataMessage::SharedMemory {
            shared_memory_id,
            len,
            drop_token,
        }) => {
            let memory = ShmemConf::new()
                .os_id(shared_memory_id)
                .open()
                .wrap_err("failed to map shared memory output")?;
            // SAFETY assumption: the sending node keeps the shared-memory
            // region valid and at least `len` bytes long until the drop token
            // is reported back — TODO confirm against the node API contract.
            let data = Some(AVec::from_slice(1, &unsafe { memory.as_slice() }[..len]));
            (data, Some(drop_token))
        }
        Some(DataMessage::Vec(v)) => (Some(v), None),
    };
    if let Some(token) = drop_token {
        // Ensure the token is tracked even when there were no local receivers,
        // then report it back to the owner right away if nobody holds it.
        dataflow
            .pending_drop_tokens
            .entry(token)
            .or_insert_with(|| DropTokenInformation {
                owner: node_id.clone(),
                pending_nodes: Default::default(),
            });
        dataflow.check_drop_token(token, clock).await?;
    }
    Ok(data_bytes)
}
1738
1739fn node_inputs(node: &ResolvedNode) -> BTreeMap<DataId, Input> {
1740 match &node.kind {
1741 CoreNodeKind::Custom(n) => n.run_config.inputs.clone(),
1742 CoreNodeKind::Runtime(n) => runtime_node_inputs(n),
1743 }
1744}
1745
1746fn close_input(
1747 dataflow: &mut RunningDataflow,
1748 receiver_id: &NodeId,
1749 input_id: &DataId,
1750 clock: &HLC,
1751) {
1752 if let Some(open_inputs) = dataflow.open_inputs.get_mut(receiver_id) {
1753 if !open_inputs.remove(input_id) {
1754 return;
1755 }
1756 }
1757 if let Some(channel) = dataflow.subscribe_channels.get(receiver_id) {
1758 let _ = send_with_timestamp(
1759 channel,
1760 NodeEvent::InputClosed {
1761 id: input_id.clone(),
1762 },
1763 clock,
1764 );
1765
1766 if dataflow.open_inputs(receiver_id).is_empty() {
1767 let _ = send_with_timestamp(channel, NodeEvent::AllInputsClosed, clock);
1768 }
1769 }
1770}
1771
/// Daemon-side handle to a spawned node process.
#[derive(Debug)]
struct RunningNode {
    /// OS process handle; `None` presumably for nodes without a process
    /// managed by this daemon — TODO confirm at the spawn site.
    pid: Option<ProcessId>,
    /// The config the node was started with (includes the `dynamic` flag
    /// used to decide when a dataflow is finished).
    node_config: NodeConfig,
}
1777
/// Wrapper around an OS process ID; `None` once the process stopped or was
/// killed. The `Drop` impl kills the process if it is still tracked.
#[derive(Debug)]
struct ProcessId(Option<u32>);
1780
1781impl ProcessId {
1782 pub fn new(process_id: u32) -> Self {
1783 Self(Some(process_id))
1784 }
1785
1786 pub fn mark_as_stopped(&mut self) {
1787 self.0 = None;
1788 }
1789
1790 pub fn kill(&mut self) -> bool {
1791 if let Some(pid) = self.0 {
1792 let mut system = sysinfo::System::new();
1793 system.refresh_processes();
1794
1795 if let Some(process) = system.process(Pid::from(pid as usize)) {
1796 process.kill();
1797 self.mark_as_stopped();
1798 return true;
1799 }
1800 }
1801
1802 false
1803 }
1804}
1805
1806impl Drop for ProcessId {
1807 fn drop(&mut self) {
1808 if let Some(pid) = self.0 {
1810 if self.kill() {
1811 warn!("process {pid} was killed on drop because it was still running")
1812 }
1813 }
1814 }
1815}
1816
/// All daemon-local state of a single running dataflow instance.
pub struct RunningDataflow {
    /// Unique ID of this dataflow instance.
    id: Uuid,
    /// Tracks spawned nodes that have not reported ready/stopped yet.
    pending_nodes: PendingNodes,

    /// Per-node channels for delivering `NodeEvent`s (inputs, stop, …).
    subscribe_channels: HashMap<NodeId, UnboundedSender<Timestamped<NodeEvent>>>,
    /// Per-node channels for reporting dropped output tokens back to owners.
    drop_channels: HashMap<NodeId, UnboundedSender<Timestamped<NodeDropEvent>>>,
    /// Maps each output to the local `(node, input)` pairs it feeds.
    mappings: HashMap<OutputId, BTreeSet<InputId>>,
    /// Timer interval -> inputs that receive a tick event on that interval.
    timers: BTreeMap<Duration, BTreeSet<InputId>>,
    /// Inputs that are still open, per node.
    open_inputs: BTreeMap<NodeId, BTreeSet<DataId>>,
    /// Handles (PID + config) of the node processes this daemon manages.
    running_nodes: BTreeMap<NodeId, RunningNode>,

    /// Nodes that attach dynamically instead of being spawned by the daemon.
    dynamic_nodes: BTreeSet<NodeId>,

    /// Outputs that have receivers on other daemons (trigger zenoh publish).
    open_external_mappings: BTreeSet<OutputId>,

    /// Drop tokens whose shared-memory data is still referenced somewhere.
    pending_drop_tokens: HashMap<DropToken, DropTokenInformation>,

    /// Keeps the spawned timer tasks alive; dropping these stops the timers.
    _timer_handles: Vec<futures::future::RemoteHandle<()>>,
    /// Whether a `Stop` event was already broadcast to the nodes.
    stop_sent: bool,

    /// Reusable empty set returned by `open_inputs()` for unknown nodes.
    empty_set: BTreeSet<DataId>,

    /// Records which node failures were caused by other nodes' failures.
    cascading_error_causes: CascadingErrorCauses,
    /// Nodes killed after the stop grace period expired (shared with the
    /// background kill task spawned in `stop_all`).
    grace_duration_kills: Arc<crossbeam_skiplist::SkipSet<NodeId>>,

    /// Ring buffers of the most recent stderr lines per node, used to build
    /// error reports on non-success exits.
    node_stderr_most_recent: BTreeMap<NodeId, Arc<ArrayQueue<String>>>,

    /// Cached zenoh publishers, declared lazily per output.
    publishers: BTreeMap<OutputId, zenoh::pubsub::Publisher<'static>>,

    /// Broadcast channel created in `new`; presumably signalled when the
    /// dataflow finishes — TODO confirm at the subscriber site (not visible
    /// in this part of the file).
    finished_tx: broadcast::Sender<()>,

    /// Debug option: mirror every output to zenoh, even local-only ones.
    publish_all_messages_to_zenoh: bool,
}
1860
impl RunningDataflow {
    /// Create empty daemon-local state for a freshly spawned dataflow.
    fn new(
        dataflow_id: Uuid,
        daemon_id: DaemonId,
        dataflow_descriptor: &Descriptor,
    ) -> RunningDataflow {
        let (finished_tx, _) = broadcast::channel(1);
        Self {
            id: dataflow_id,
            pending_nodes: PendingNodes::new(dataflow_id, daemon_id),
            subscribe_channels: HashMap::new(),
            drop_channels: HashMap::new(),
            mappings: HashMap::new(),
            timers: BTreeMap::new(),
            open_inputs: BTreeMap::new(),
            running_nodes: BTreeMap::new(),
            dynamic_nodes: BTreeSet::new(),
            open_external_mappings: Default::default(),
            pending_drop_tokens: HashMap::new(),
            _timer_handles: Vec::new(),
            stop_sent: false,
            empty_set: BTreeSet::new(),
            cascading_error_causes: Default::default(),
            grace_duration_kills: Default::default(),
            node_stderr_most_recent: BTreeMap::new(),
            publishers: Default::default(),
            finished_tx,
            publish_all_messages_to_zenoh: dataflow_descriptor.debug.publish_all_messages_to_zenoh,
        }
    }

    /// Spawn one background task per configured timer interval.
    ///
    /// Each task ticks on its interval and emits a `DoraEvent::Timer` into
    /// `events_tx` until the event channel closes. The tasks are tied to
    /// this dataflow through `RemoteHandle`s stored in `_timer_handles`, so
    /// dropping the dataflow stops all timers.
    async fn start(
        &mut self,
        events_tx: &mpsc::Sender<Timestamped<Event>>,
        clock: &Arc<HLC>,
    ) -> eyre::Result<()> {
        for interval in self.timers.keys().copied() {
            let events_tx = events_tx.clone();
            let dataflow_id = self.id;
            let clock = clock.clone();
            let task = async move {
                let mut interval_stream = tokio::time::interval(interval);
                let hlc = HLC::default();
                loop {
                    interval_stream.tick().await;

                    let span = tracing::span!(tracing::Level::TRACE, "tick");
                    // NOTE(review): `let _ = span.enter()` drops the entered
                    // guard immediately, so the span is never actually
                    // entered. Binding the guard (`let _enter = …`) would
                    // make this future !Send across the `.await` below, so a
                    // real fix needs restructuring — confirm intent.
                    let _ = span.enter();

                    let mut parameters = BTreeMap::new();
                    parameters.insert(
                        "open_telemetry_context".to_string(),
                        #[cfg(feature = "telemetry")]
                        Parameter::String(serialize_context(&span.context())),
                        #[cfg(not(feature = "telemetry"))]
                        Parameter::String("".into()),
                    );

                    let metadata = metadata::Metadata::from_parameters(
                        hlc.new_timestamp(),
                        empty_type_info(),
                        parameters,
                    );

                    let event = Timestamped {
                        inner: DoraEvent::Timer {
                            dataflow_id,
                            interval,
                            metadata,
                        }
                        .into(),
                        timestamp: clock.new_timestamp(),
                    };
                    // Event channel closed -> daemon is shutting down.
                    if events_tx.send(event).await.is_err() {
                        break;
                    }
                }
            };
            let (task, handle) = task.remote_handle();
            tokio::spawn(task);
            self._timer_handles.push(handle);
        }

        Ok(())
    }

    /// Send a `Stop` event to every subscribed node and schedule a forced
    /// kill of all remaining processes after the grace period.
    ///
    /// Nodes killed by the grace-period task are recorded in
    /// `grace_duration_kills` so their exit can be classified as
    /// `NodeErrorCause::GraceDuration` later.
    async fn stop_all(
        &mut self,
        coordinator_connection: &mut Option<TcpStream>,
        clock: &HLC,
        grace_duration: Option<Duration>,
        logger: &mut DataflowLogger<'_>,
    ) -> eyre::Result<()> {
        self.pending_nodes
            .handle_dataflow_stop(
                coordinator_connection,
                clock,
                &mut self.cascading_error_causes,
                &self.dynamic_nodes,
                logger,
            )
            .await?;

        for (_node_id, channel) in self.subscribe_channels.drain() {
            let _ = send_with_timestamp(&channel, NodeEvent::Stop, clock);
        }

        // Take the PIDs out of the running-node entries so the background
        // task below owns them (the kill-on-drop fallback moves with them).
        let running_processes: Vec<_> = self
            .running_nodes
            .iter_mut()
            .map(|(id, n)| (id.clone(), n.pid.take()))
            .collect();
        let grace_duration_kills = self.grace_duration_kills.clone();
        tokio::spawn(async move {
            // Default grace period: 15 seconds.
            let duration = grace_duration.unwrap_or(Duration::from_millis(15000));
            tokio::time::sleep(duration).await;

            for (node, pid) in running_processes {
                if let Some(mut pid) = pid {
                    if pid.kill() {
                        grace_duration_kills.insert(node.clone());
                        warn!(
                            "{node} was killed due to not stopping within the {:#?} grace period",
                            duration
                        )
                    }
                }
            }
        });
        self.stop_sent = true;
        Ok(())
    }

    /// The set of still-open inputs of `node_id` (empty set for unknown nodes).
    fn open_inputs(&self, node_id: &NodeId) -> &BTreeSet<DataId> {
        self.open_inputs.get(node_id).unwrap_or(&self.empty_set)
    }

    /// If no receiver still holds `token`, remove it and report the drop
    /// back to the owning node so it can free the shared memory.
    ///
    /// Failures to reach the owner are logged as warnings, not errors.
    async fn check_drop_token(&mut self, token: DropToken, clock: &HLC) -> eyre::Result<()> {
        match self.pending_drop_tokens.entry(token) {
            std::collections::hash_map::Entry::Occupied(entry) => {
                if entry.get().pending_nodes.is_empty() {
                    let (drop_token, info) = entry.remove_entry();
                    let result = match self.drop_channels.get_mut(&info.owner) {
                        Some(channel) => send_with_timestamp(
                            channel,
                            NodeDropEvent::OutputDropped { drop_token },
                            clock,
                        )
                        .wrap_err("send failed"),
                        None => Err(eyre!("no subscribe channel for node `{}`", &info.owner)),
                    };
                    if let Err(err) = result.wrap_err_with(|| {
                        format!(
                            "failed to report drop token `{drop_token:?}` to owner `{}`",
                            &info.owner
                        )
                    }) {
                        tracing::warn!("{err:?}");
                    }
                }
            }
            std::collections::hash_map::Entry::Vacant(_) => {
                tracing::warn!("check_drop_token called with already closed token")
            }
        }

        Ok(())
    }

    /// Zenoh topic on which messages of the given output are published.
    fn output_publish_topic(&self, output_id: &OutputId) -> String {
        let network_id = "default";
        let dataflow_id = self.id;
        let OutputId(node_id, output_id) = output_id;
        format!("dora/{network_id}/{dataflow_id}/output/{node_id}/{output_id}")
    }
}
2037
2038fn empty_type_info() -> ArrowTypeInfo {
2039 ArrowTypeInfo {
2040 data_type: DataType::Null,
2041 len: 0,
2042 null_count: 0,
2043 validity: None,
2044 offset: 0,
2045 buffer_offsets: Vec::new(),
2046 child_data: Vec::new(),
2047 }
2048}
2049
/// Identifies a node output as `(node id, output data id)`.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct OutputId(NodeId, DataId);
/// Identifies a node input as `(node id, input data id)`.
type InputId = (NodeId, DataId);
2053
/// Bookkeeping for a drop token of an output message that is still in use.
struct DropTokenInformation {
    /// The node that produced the associated output message and must be
    /// notified once the data can be freed.
    owner: NodeId,
    /// Local receiver nodes that still hold a reference to the data.
    pending_nodes: BTreeSet<NodeId>,
}
2061
/// Top-level event type processed by the daemon's main event loop.
#[derive(Debug)]
pub enum Event {
    /// A request from a locally running node (e.g. send-out, subscribe).
    Node {
        dataflow_id: DataflowId,
        node_id: NodeId,
        event: DaemonNodeEvent,
    },
    /// A control message from the coordinator.
    Coordinator(CoordinatorEvent),
    /// An event forwarded from another daemon (e.g. remote outputs).
    Daemon(InterDaemonEvent),
    /// An internally generated event (timers, logs, node exits).
    Dora(DoraEvent),
    /// An event from a dynamically attaching node.
    DynamicNode(DynamicNodeEventWrapper),
    /// Periodic trigger, presumably for coordinator heartbeats — confirm at
    /// the event-loop handling site (not visible here).
    HeartbeatInterval,
    /// Emitted on the first ctrl-c signal (see `set_up_ctrlc_handler`).
    CtrlC,
    /// Emitted on the second ctrl-c signal.
    SecondCtrlC,
    /// Receiving events from another daemon failed.
    DaemonError(eyre::Report),
}
2078
2079impl From<DoraEvent> for Event {
2080 fn from(event: DoraEvent) -> Self {
2081 Event::Dora(event)
2082 }
2083}
2084
/// Requests sent by a locally running node to the daemon.
#[derive(Debug)]
pub enum DaemonNodeEvent {
    /// The node will produce no further outputs; close them for receivers.
    OutputsDone {
        reply_sender: oneshot::Sender<DaemonReply>,
    },
    /// Register a channel over which the node receives `NodeEvent`s.
    Subscribe {
        event_sender: UnboundedSender<Timestamped<NodeEvent>>,
        reply_sender: oneshot::Sender<DaemonReply>,
    },
    /// Register a channel over which the node is told when its output data
    /// was dropped by all receivers.
    SubscribeDrop {
        event_sender: UnboundedSender<Timestamped<NodeDropEvent>>,
        reply_sender: oneshot::Sender<DaemonReply>,
    },
    /// Close the given subset of the node's outputs.
    CloseOutputs {
        outputs: Vec<dora_core::config::DataId>,
        reply_sender: oneshot::Sender<DaemonReply>,
    },
    /// Publish an output message to all of its receivers.
    SendOut {
        output_id: DataId,
        metadata: metadata::Metadata,
        data: Option<DataMessage>,
    },
    /// The node no longer references the data behind these drop tokens.
    ReportDrop {
        tokens: Vec<DropToken>,
    },
    /// The node dropped its event stream; remove its subscribe channel.
    EventStreamDropped {
        reply_sender: oneshot::Sender<DaemonReply>,
    },
}
2114
/// Events generated by the daemon itself (handled in `handle_dora_event`).
#[derive(Debug)]
pub enum DoraEvent {
    /// A timer tick that should be delivered as a data-less input to all
    /// nodes subscribed to this interval.
    Timer {
        dataflow_id: DataflowId,
        interval: Duration,
        metadata: metadata::Metadata,
    },
    /// A log message to be forwarded as an input to the subscribers of
    /// `output_id`.
    Logs {
        dataflow_id: DataflowId,
        output_id: OutputId,
        message: DataMessage,
        metadata: metadata::Metadata,
    },
    /// A node process spawned by this daemon exited.
    SpawnedNodeResult {
        dataflow_id: DataflowId,
        node_id: NodeId,
        exit_status: NodeExitStatus,
    },
}
2134
/// Outcome of one event-loop iteration: keep processing events or shut the
/// daemon down.
#[must_use]
enum RunStatus {
    Continue,
    Exit,
}
2140
2141fn send_with_timestamp<T>(
2142 sender: &UnboundedSender<Timestamped<T>>,
2143 event: T,
2144 clock: &HLC,
2145) -> Result<(), mpsc::error::SendError<Timestamped<T>>> {
2146 sender.send(Timestamped {
2147 inner: event,
2148 timestamp: clock.new_timestamp(),
2149 })
2150}
2151
2152fn set_up_ctrlc_handler(
2153 clock: Arc<HLC>,
2154) -> eyre::Result<tokio::sync::mpsc::Receiver<Timestamped<Event>>> {
2155 let (ctrlc_tx, ctrlc_rx) = mpsc::channel(1);
2156
2157 let mut ctrlc_sent = 0;
2158 ctrlc::set_handler(move || {
2159 let event = match ctrlc_sent {
2160 0 => Event::CtrlC,
2161 1 => Event::SecondCtrlC,
2162 _ => {
2163 tracing::warn!("received 3rd ctrlc signal -> aborting immediately");
2164 std::process::abort();
2165 }
2166 };
2167 if ctrlc_tx
2168 .blocking_send(Timestamped {
2169 inner: event,
2170 timestamp: clock.new_timestamp(),
2171 })
2172 .is_err()
2173 {
2174 tracing::error!("failed to report ctrl-c event to dora-coordinator");
2175 }
2176
2177 ctrlc_sent += 1;
2178 })
2179 .wrap_err("failed to set ctrl-c handler")?;
2180
2181 Ok(ctrlc_rx)
2182}
2183
/// Records which node failures were triggered by failures of other nodes
/// (maps affected node -> causing node).
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct CascadingErrorCauses {
    caused_by: BTreeMap<NodeId, NodeId>,
}
2188
2189impl CascadingErrorCauses {
2190 pub fn experienced_cascading_error(&self, node: &NodeId) -> bool {
2191 self.caused_by.contains_key(node)
2192 }
2193
2194 pub fn error_caused_by(&self, node: &NodeId) -> Option<&NodeId> {
2196 self.caused_by.get(node)
2197 }
2198
2199 pub fn report_cascading_error(&mut self, causing_node: NodeId, affected_node: NodeId) {
2200 self.caused_by.entry(affected_node).or_insert(causing_node);
2201 }
2202}
2203
2204fn runtime_node_inputs(n: &RuntimeNode) -> BTreeMap<DataId, Input> {
2205 n.operators
2206 .iter()
2207 .flat_map(|operator| {
2208 operator.config.inputs.iter().map(|(input_id, mapping)| {
2209 (
2210 DataId::from(format!("{}/{input_id}", operator.id)),
2211 mapping.clone(),
2212 )
2213 })
2214 })
2215 .collect()
2216}
2217
2218fn runtime_node_outputs(n: &RuntimeNode) -> BTreeSet<DataId> {
2219 n.operators
2220 .iter()
2221 .flat_map(|operator| {
2222 operator
2223 .config
2224 .outputs
2225 .iter()
2226 .map(|output_id| DataId::from(format!("{}/{output_id}", operator.id)))
2227 })
2228 .collect()
2229}
2230
/// Helpers shared by node kinds: derive the effective run config and whether
/// the node attaches dynamically at runtime.
trait CoreNodeKindExt {
    fn run_config(&self) -> NodeRunConfig;
    fn dynamic(&self) -> bool;
}
2235
2236impl CoreNodeKindExt for CoreNodeKind {
2237 fn run_config(&self) -> NodeRunConfig {
2238 match self {
2239 CoreNodeKind::Runtime(n) => NodeRunConfig {
2240 inputs: runtime_node_inputs(n),
2241 outputs: runtime_node_outputs(n),
2242 },
2243 CoreNodeKind::Custom(n) => n.run_config.clone(),
2244 }
2245 }
2246
2247 fn dynamic(&self) -> bool {
2248 match self {
2249 CoreNodeKind::Runtime(_n) => false,
2250 CoreNodeKind::Custom(n) => n.source == DYNAMIC_SOURCE,
2251 }
2252 }
2253}