use aligned_vec::{AVec, ConstAlign};
use coordinator::CoordinatorEvent;
use crossbeam::queue::ArrayQueue;
use dora_core::{
    build::{self, BuildInfo, GitManager, PrevGitSource},
    config::{DataId, Input, InputMapping, NodeId, NodeRunConfig, OperatorId},
    descriptor::{
        CoreNodeKind, DYNAMIC_SOURCE, Descriptor, DescriptorExt, ResolvedNode, RuntimeNode,
        read_as_descriptor,
    },
    topics::{
        DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT, LOCALHOST, open_zenoh_session,
        zenoh_output_publish_topic,
    },
    uhlc::{self, HLC},
};
use dora_message::{
    BuildId, DataflowId, SessionId,
    common::{
        DaemonId, DataMessage, DropToken, GitSource, LogLevel, NodeError, NodeErrorCause,
        NodeExitStatus,
    },
    coordinator_to_cli::DataflowResult,
    coordinator_to_daemon::{BuildDataflowNodes, DaemonCoordinatorEvent, SpawnDataflowNodes},
    daemon_to_coordinator::{
        CoordinatorRequest, DaemonCoordinatorReply, DaemonEvent, DataflowDaemonResult,
    },
    daemon_to_daemon::InterDaemonEvent,
    daemon_to_node::{DaemonReply, NodeConfig, NodeDropEvent, NodeEvent},
    descriptor::{NodeSource, RestartPolicy},
    metadata::{self, ArrowTypeInfo},
    node_to_daemon::{DynamicNodeEvent, Timestamped},
};
use dora_node_api::{Parameter, arrow::datatypes::DataType};
use eyre::{Context, ContextCompat, Result, bail, eyre};
use futures::{FutureExt, TryFutureExt, future, stream};
use futures_concurrency::stream::Merge;
use local_listener::DynamicNodeEventWrapper;
use log::{DaemonLogger, DataflowLogger, Logger};
use pending::PendingNodes;
use process_wrap::tokio::TokioChildWrapper;
use shared_memory_server::ShmemConf;
use socket_stream_utils::socket_stream_send;
use spawn::Spawner;
use std::{
    collections::{BTreeMap, BTreeSet, HashMap, VecDeque},
    env::current_dir,
    future::Future,
    io,
    net::SocketAddr,
    path::{Path, PathBuf},
    pin::pin,
    sync::{
        Arc,
        atomic::{self, AtomicBool, AtomicU32},
    },
    time::{Duration, Instant},
};
use tokio::{
    fs::File,
    io::{AsyncReadExt, AsyncSeekExt},
    net::TcpStream,
    sync::{
        broadcast,
        mpsc::{self, UnboundedSender},
        oneshot::{self, Sender},
    },
};
use tokio_stream::{Stream, StreamExt, wrappers::ReceiverStream};
use tracing::{error, warn};
use uuid::{NoContext, Timestamp, Uuid};

pub use flume;
pub use log::LogDestination;

mod coordinator;
mod local_listener;
mod log;
mod node_communication;
mod pending;
mod socket_stream_utils;
mod spawn;

#[cfg(feature = "telemetry")]
use dora_tracing::telemetry::serialize_context;
#[cfg(feature = "telemetry")]
use tracing_opentelemetry::OpenTelemetrySpanExt;

use crate::pending::DataflowStatus;

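/// Capacity of the per-node ring buffer that keeps the most recent stderr
/// lines of a node for error reporting.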
const STDERR_LOG_LINES: usize = 10;

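/// Long-running daemon state: tracks running dataflows, the coordinator
/// connection, build/session bookkeeping, and the zenoh session used for
/// inter-daemon communication.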
pub struct Daemon {
    running: HashMap<DataflowId, RunningDataflow>,
    working_dir: HashMap<DataflowId, PathBuf>,

    events_tx: mpsc::Sender<Timestamped<Event>>,

    coordinator_connection: Option<TcpStream>,
    last_coordinator_heartbeat: Instant,
    daemon_id: DaemonId,

    exit_when_done: Option<BTreeSet<(Uuid, NodeId)>>,
    exit_when_all_finished: bool,
    dataflow_node_results: BTreeMap<Uuid, BTreeMap<NodeId, Result<(), NodeError>>>,

    clock: Arc<uhlc::HLC>,

    zenoh_session: zenoh::Session,
    remote_daemon_events_tx: Option<flume::Sender<eyre::Result<Timestamped<InterDaemonEvent>>>>,

    logger: DaemonLogger,

    sessions: BTreeMap<SessionId, BuildId>,
    builds: BTreeMap<BuildId, BuildInfo>,
    git_manager: GitManager,
    metrics_system: sysinfo::System,
}

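/// Per-node results of all dataflows handled by this daemon, keyed by
/// dataflow UUID.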
type DaemonRunResult = BTreeMap<Uuid, BTreeMap<NodeId, Result<(), NodeError>>>;

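/// A not-yet-awaited preparation task (build or spawn) for a single node.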
struct NodeBuildTask<F> {
    node_id: NodeId,
    dynamic_node: bool,
    task: F,
}

impl Daemon {
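    /// Connects to the coordinator at `coordinator_addr` and runs the daemon
    /// event loop until a ctrl-c signal arrives or the coordinator sends a
    /// destroy command.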
    pub async fn run(
        coordinator_addr: SocketAddr,
        machine_id: Option<String>,
        local_listen_port: u16,
    ) -> eyre::Result<()> {
        let clock = Arc::new(HLC::default());

        let mut ctrlc_events = set_up_ctrlc_handler(clock.clone())?;
        let (remote_daemon_events_tx, remote_daemon_events_rx) = flume::bounded(10);
        let (daemon_id, incoming_events) = {
            let incoming_events = set_up_event_stream(
                coordinator_addr,
                &machine_id,
                &clock,
                remote_daemon_events_rx,
                local_listen_port,
            );

            let ctrl_c = pin!(ctrlc_events.recv());
            match futures::future::select(ctrl_c, pin!(incoming_events)).await {
                future::Either::Left((_ctrl_c, _)) => {
                    tracing::info!("received ctrl-c signal -> stopping daemon");
                    return Ok(());
                }
                future::Either::Right((events, _)) => events?,
            }
        };

        let log_destination = {
            let stream = TcpStream::connect(coordinator_addr)
                .await
                .wrap_err("failed to connect log to dora-coordinator")?;
            stream
                .set_nodelay(true)
                .wrap_err("failed to set TCP_NODELAY")?;
            LogDestination::Coordinator {
                coordinator_connection: stream,
            }
        };

        Self::run_general(
            (ReceiverStream::new(ctrlc_events), incoming_events).merge(),
            Some(coordinator_addr),
            daemon_id,
            None,
            clock.clone(),
            Some(remote_daemon_events_tx),
            Default::default(),
            log_destination,
        )
        .await
        .map(|_| ())
    }

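    /// Runs a single dataflow to completion (the `dora run` code path):
    /// spawns all nodes locally, waits for them to finish, and returns the
    /// collected per-node results.
    ///
    /// A minimal usage sketch (not a doctest; `session_id` and
    /// `log_destination` are placeholders, construct them however your
    /// caller does):
    ///
    /// ```ignore
    /// let result = Daemon::run_dataflow(
    ///     Path::new("dataflow.yml"),
    ///     None,            // build_id: no prior `dora build`
    ///     None,            // local_build
    ///     session_id,      // placeholder: the SessionId for this run
    ///     false,           // uv
    ///     log_destination, // placeholder: a LogDestination
    ///     None,            // write_events_to
    /// )
    /// .await?;
    /// ```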
    pub async fn run_dataflow(
        dataflow_path: &Path,
        build_id: Option<BuildId>,
        local_build: Option<BuildInfo>,
        session_id: SessionId,
        uv: bool,
        log_destination: LogDestination,
        write_events_to: Option<PathBuf>,
    ) -> eyre::Result<DataflowResult> {
        let working_dir = dataflow_path
            .canonicalize()
            .context("failed to canonicalize dataflow path")?
            .parent()
            .ok_or_else(|| eyre::eyre!("canonicalized dataflow path has no parent"))?
            .to_owned();

        let descriptor = read_as_descriptor(dataflow_path).await?;
        if let Some(node) = descriptor.nodes.iter().find(|n| n.deploy.is_some()) {
            eyre::bail!(
                "node {} has a `deploy` section, which is not supported in `dora run`\n\n\
                Instead, you need to spawn a `dora coordinator` and one or more `dora daemon` \
                instances and then use `dora start`.",
                node.id
            )
        }

        descriptor.check(&working_dir)?;
        let nodes = descriptor.resolve_aliases_and_set_defaults()?;

        let (events_tx, events_rx) = flume::bounded(10);
        if nodes
            .iter()
            .any(|(_n, resolved_node)| resolved_node.kind.dynamic())
        {
            let _listen_port = local_listener::spawn_listener_loop(
                (LOCALHOST, DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT).into(),
                events_tx,
            )
            .await?;
        }
        let dynamic_node_events = events_rx.into_stream().map(|e| Timestamped {
            inner: Event::DynamicNode(e.inner),
            timestamp: e.timestamp,
        });

        let dataflow_id = Uuid::new_v7(Timestamp::now(NoContext));
        let spawn_command = SpawnDataflowNodes {
            build_id,
            session_id,
            dataflow_id,
            local_working_dir: Some(working_dir),
            spawn_nodes: nodes.keys().cloned().collect(),
            nodes,
            dataflow_descriptor: descriptor,
            uv,
            write_events_to,
        };

        let clock = Arc::new(HLC::default());

        let ctrlc_events = ReceiverStream::new(set_up_ctrlc_handler(clock.clone())?);

        let exit_when_done = spawn_command
            .nodes
            .values()
            .map(|n| (spawn_command.dataflow_id, n.id.clone()))
            .collect();
        let (reply_tx, reply_rx) = oneshot::channel();
        let timestamp = clock.new_timestamp();
        let coordinator_events = stream::once(async move {
            Timestamped {
                inner: Event::Coordinator(CoordinatorEvent {
                    event: DaemonCoordinatorEvent::Spawn(spawn_command),
                    reply_tx,
                }),
                timestamp,
            }
        });
        let events = (coordinator_events, ctrlc_events, dynamic_node_events).merge();
        let run_result = Self::run_general(
            Box::pin(events),
            None,
            DaemonId::new(None),
            Some(exit_when_done),
            clock.clone(),
            None,
            if let Some(local_build) = local_build {
                let Some(build_id) = build_id else {
                    bail!("no build_id, but local_build set")
                };
                let mut builds = BTreeMap::new();
                builds.insert(build_id, local_build);
                builds
            } else {
                Default::default()
            },
            log_destination,
        );

        let spawn_result = reply_rx
            .map_err(|err| eyre!("failed to receive spawn result: {err}"))
            .and_then(|r| async {
                match r {
                    Some(DaemonCoordinatorReply::TriggerSpawnResult(result)) => {
                        result.map_err(|err| eyre!(err))
                    }
                    _ => Err(eyre!("unexpected spawn reply")),
                }
            });

        let (mut dataflow_results, ()) = future::try_join(run_result, spawn_result).await?;

        Ok(DataflowResult {
            uuid: dataflow_id,
            timestamp: clock.new_timestamp(),
            node_results: dataflow_results
                .remove(&dataflow_id)
                .context("no node results for dataflow_id")?,
        })
    }

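    /// Shared setup for both `run` and `run_dataflow`: establishes the
    /// optional coordinator connection, opens the zenoh session, merges all
    /// event sources (external events, internal dora events, heartbeat and
    /// metrics intervals), and hands them to `run_inner`.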
    #[allow(clippy::too_many_arguments)]
    async fn run_general(
        external_events: impl Stream<Item = Timestamped<Event>> + Unpin,
        coordinator_addr: Option<SocketAddr>,
        daemon_id: DaemonId,
        exit_when_done: Option<BTreeSet<(Uuid, NodeId)>>,
        clock: Arc<HLC>,
        remote_daemon_events_tx: Option<flume::Sender<eyre::Result<Timestamped<InterDaemonEvent>>>>,
        builds: BTreeMap<BuildId, BuildInfo>,
        log_destination: LogDestination,
    ) -> eyre::Result<DaemonRunResult> {
        let coordinator_connection = match coordinator_addr {
            Some(addr) => {
                let stream = TcpStream::connect(addr)
                    .await
                    .wrap_err("failed to connect to dora-coordinator")?;
                stream
                    .set_nodelay(true)
                    .wrap_err("failed to set TCP_NODELAY")?;
                Some(stream)
            }
            None => None,
        };

        let zenoh_session = open_zenoh_session(coordinator_addr.map(|addr| addr.ip()))
            .await
            .wrap_err("failed to open zenoh session")?;
        let (dora_events_tx, dora_events_rx) = mpsc::channel(5);
        let daemon = Self {
            logger: Logger {
                destination: log_destination,
                daemon_id: daemon_id.clone(),
                clock: clock.clone(),
            }
            .for_daemon(daemon_id.clone()),
            running: HashMap::new(),
            working_dir: HashMap::new(),
            events_tx: dora_events_tx,
            coordinator_connection,
            last_coordinator_heartbeat: Instant::now(),
            daemon_id,
            exit_when_done,
            exit_when_all_finished: false,
            dataflow_node_results: BTreeMap::new(),
            clock,
            zenoh_session,
            remote_daemon_events_tx,
            git_manager: Default::default(),
            builds,
            sessions: Default::default(),
            metrics_system: sysinfo::System::new(),
        };

        let dora_events = ReceiverStream::new(dora_events_rx);
        let watchdog_clock = daemon.clock.clone();
        let watchdog_interval = tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(
            Duration::from_secs(5),
        ))
        .map(|_| Timestamped {
            inner: Event::HeartbeatInterval,
            timestamp: watchdog_clock.new_timestamp(),
        });

        let metrics_clock = daemon.clock.clone();
        let metrics_interval = tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(
            Duration::from_secs(2),
        ))
        .map(|_| Timestamped {
            inner: Event::MetricsInterval,
            timestamp: metrics_clock.new_timestamp(),
        });

        let events = (
            external_events,
            dora_events,
            watchdog_interval,
            metrics_interval,
        )
            .merge();
        daemon.run_inner(events).await
    }

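    /// Main daemon event loop: updates the hybrid logical clock from incoming
    /// timestamps, dispatches each event to its handler, and warns when a
    /// single event takes longer than 100ms to process.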
    #[tracing::instrument(skip(incoming_events, self), fields(?self.daemon_id))]
    async fn run_inner(
        mut self,
        incoming_events: impl Stream<Item = Timestamped<Event>> + Unpin,
    ) -> eyre::Result<DaemonRunResult> {
        let mut events = incoming_events;

        while let Some(event) = events.next().await {
            let Timestamped { inner, timestamp } = event;
            if let Err(err) = self.clock.update_with_timestamp(&timestamp) {
                tracing::warn!("failed to update HLC with incoming event timestamp: {err}");
            }

            let start = Instant::now();
            let event_kind = inner.kind();

            match inner {
                Event::Coordinator(CoordinatorEvent { event, reply_tx }) => {
                    let status = self.handle_coordinator_event(event, reply_tx).await?;

                    match status {
                        RunStatus::Continue => {}
                        RunStatus::Exit => break,
                    }
                }
                Event::Daemon(event) => {
                    self.handle_inter_daemon_event(event).await?;
                }
                Event::Node {
                    dataflow_id: dataflow,
                    node_id,
                    event,
                } => self.handle_node_event(event, dataflow, node_id).await?,
                Event::Dora(event) => self.handle_dora_event(event).await?,
                Event::DynamicNode(event) => self.handle_dynamic_node_event(event).await?,
                Event::HeartbeatInterval => {
                    if let Some(connection) = &mut self.coordinator_connection {
                        let msg = serde_json::to_vec(&Timestamped {
                            inner: CoordinatorRequest::Event {
                                daemon_id: self.daemon_id.clone(),
                                event: DaemonEvent::Heartbeat,
                            },
                            timestamp: self.clock.new_timestamp(),
                        })?;
                        socket_stream_send(connection, &msg)
                            .await
                            .wrap_err("failed to send watchdog message to dora-coordinator")?;

                        if self.last_coordinator_heartbeat.elapsed() > Duration::from_secs(20) {
                            bail!("lost connection to coordinator")
                        }
                    }
                }
                Event::MetricsInterval => {
                    self.collect_and_send_metrics().await?;
                }
                Event::CtrlC => {
                    tracing::info!("received ctrl-c signal -> stopping all dataflows");
                    for dataflow in self.running.values_mut() {
                        let mut logger = self.logger.for_dataflow(dataflow.id);
                        dataflow
                            .stop_all(
                                &mut self.coordinator_connection,
                                &self.clock,
                                None,
                                false,
                                &mut logger,
                            )
                            .await?;
                    }
                    self.exit_when_all_finished = true;
                    if self.running.is_empty() {
                        break;
                    }
                }
                Event::SecondCtrlC => {
                    tracing::warn!("received second ctrl-c signal -> exiting immediately");
                    bail!("received second ctrl-c signal");
                }
                Event::DaemonError(err) => {
                    tracing::error!("daemon error: {err:?}");
                }
                Event::SpawnNodeResult {
                    dataflow_id,
                    node_id,
                    dynamic_node,
                    result,
                } => match result {
                    Ok(running_node) => {
                        if let Some(dataflow) = self.running.get_mut(&dataflow_id) {
                            dataflow.running_nodes.insert(node_id, running_node);
                        } else {
                            tracing::error!(
                                "failed to handle SpawnNodeResult: no running dataflow with ID {dataflow_id}"
                            );
                        }
                    }
                    Err(error) => {
                        self.dataflow_node_results
                            .entry(dataflow_id)
                            .or_default()
                            .insert(node_id.clone(), Err(error));
                        self.handle_node_stop(dataflow_id, &node_id, dynamic_node)
                            .await?;
                    }
                },
                Event::BuildDataflowResult {
                    build_id,
                    session_id,
                    result,
                } => {
                    let (build_info, result) = match result {
                        Ok(build_info) => (Some(build_info), Ok(())),
                        Err(err) => (None, Err(err)),
                    };
                    if let Some(build_info) = build_info {
                        self.builds.insert(build_id, build_info);
                        if let Some(old_build_id) = self.sessions.insert(session_id, build_id) {
                            self.builds.remove(&old_build_id);
                        }
                    }
                    if let Some(connection) = &mut self.coordinator_connection {
                        let msg = serde_json::to_vec(&Timestamped {
                            inner: CoordinatorRequest::Event {
                                daemon_id: self.daemon_id.clone(),
                                event: DaemonEvent::BuildResult {
                                    build_id,
                                    result: result.map_err(|err| format!("{err:?}")),
                                },
                            },
                            timestamp: self.clock.new_timestamp(),
                        })?;
                        socket_stream_send(connection, &msg).await.wrap_err(
                            "failed to send BuildDataflowResult message to dora-coordinator",
                        )?;
                    }
                }
                Event::SpawnDataflowResult {
                    dataflow_id,
                    result,
                } => {
                    if let Some(connection) = &mut self.coordinator_connection {
                        let msg = serde_json::to_vec(&Timestamped {
                            inner: CoordinatorRequest::Event {
                                daemon_id: self.daemon_id.clone(),
                                event: DaemonEvent::SpawnResult {
                                    dataflow_id,
                                    result: result.map_err(|err| format!("{err:?}")),
                                },
                            },
                            timestamp: self.clock.new_timestamp(),
                        })?;
                        socket_stream_send(connection, &msg).await.wrap_err(
                            "failed to send SpawnDataflowResult message to dora-coordinator",
                        )?;
                    }
                }
                Event::NodeStopped {
                    dataflow_id,
                    node_id,
                } => {
                    if let Some(exit_when_done) = &mut self.exit_when_done {
                        exit_when_done.remove(&(dataflow_id, node_id));
                        if exit_when_done.is_empty() {
                            tracing::info!(
                                "exiting daemon because all required dataflows are finished"
                            );
                            break;
                        }
                    }
                    if self.exit_when_all_finished && self.running.is_empty() {
                        break;
                    }
                }
            }

            let elapsed = start.elapsed();
            if elapsed > Duration::from_millis(100) {
                tracing::warn!(
                    "daemon took {}ms to handle event: {event_kind}",
                    elapsed.as_millis()
                );
            }
        }

        if let Some(mut connection) = self.coordinator_connection.take() {
            let msg = serde_json::to_vec(&Timestamped {
                inner: CoordinatorRequest::Event {
                    daemon_id: self.daemon_id.clone(),
                    event: DaemonEvent::Exit,
                },
                timestamp: self.clock.new_timestamp(),
            })?;
            socket_stream_send(&mut connection, &msg)
                .await
                .wrap_err("failed to send Exit message to dora-coordinator")?;
        }

        Ok(self.dataflow_node_results)
    }

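    /// Handles a request sent by the coordinator (build, spawn, stop, logs,
    /// reload, destroy, heartbeat) and reports back over the provided reply
    /// channel. Returns whether the daemon should keep running.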
    async fn handle_coordinator_event(
        &mut self,
        event: DaemonCoordinatorEvent,
        reply_tx: Sender<Option<DaemonCoordinatorReply>>,
    ) -> eyre::Result<RunStatus> {
        let status = match event {
            DaemonCoordinatorEvent::Build(BuildDataflowNodes {
                build_id,
                session_id,
                local_working_dir,
                git_sources,
                prev_git_sources,
                dataflow_descriptor,
                nodes_on_machine,
                uv,
            }) => {
                match dataflow_descriptor.communication.remote {
                    dora_core::config::RemoteCommunicationConfig::Tcp => {}
                }

                let base_working_dir = self.base_working_dir(local_working_dir, session_id)?;

                let result = self
                    .build_dataflow(
                        build_id,
                        session_id,
                        base_working_dir,
                        git_sources,
                        prev_git_sources,
                        dataflow_descriptor,
                        nodes_on_machine,
                        uv,
                    )
                    .await;
                let (trigger_result, result_task) = match result {
                    Ok(result_task) => (Ok(()), Some(result_task)),
                    Err(err) => (Err(format!("{err:?}")), None),
                };
                let reply = DaemonCoordinatorReply::TriggerBuildResult(trigger_result);
                let _ = reply_tx.send(Some(reply)).map_err(|_| {
                    error!("could not send `TriggerBuildResult` reply from daemon to coordinator")
                });

                let result_tx = self.events_tx.clone();
                let clock = self.clock.clone();
                if let Some(result_task) = result_task {
                    tokio::spawn(async move {
                        let message = Timestamped {
                            inner: Event::BuildDataflowResult {
                                build_id,
                                session_id,
                                result: result_task.await,
                            },
                            timestamp: clock.new_timestamp(),
                        };
                        let _ = result_tx
                            .send(message)
                            .map_err(|_| {
                                error!(
                                    "could not send `BuildResult` reply from daemon to coordinator"
                                )
                            })
                            .await;
                    });
                }

                RunStatus::Continue
            }
            DaemonCoordinatorEvent::Spawn(SpawnDataflowNodes {
                build_id,
                session_id,
                dataflow_id,
                local_working_dir,
                nodes,
                dataflow_descriptor,
                spawn_nodes,
                uv,
                write_events_to,
            }) => {
                match dataflow_descriptor.communication.remote {
                    dora_core::config::RemoteCommunicationConfig::Tcp => {}
                }

                let base_working_dir = self.base_working_dir(local_working_dir, session_id)?;

                let result = self
                    .spawn_dataflow(
                        build_id,
                        dataflow_id,
                        base_working_dir,
                        nodes,
                        dataflow_descriptor,
                        spawn_nodes,
                        uv,
                        write_events_to,
                    )
                    .await;
                let (trigger_result, result_task) = match result {
                    Ok(result_task) => (Ok(()), Some(result_task)),
                    Err(err) => (Err(format!("{err:?}")), None),
                };
                let reply = DaemonCoordinatorReply::TriggerSpawnResult(trigger_result);
                let _ = reply_tx.send(Some(reply)).map_err(|_| {
                    error!("could not send `TriggerSpawnResult` reply from daemon to coordinator")
                });

                let result_tx = self.events_tx.clone();
                let clock = self.clock.clone();
                if let Some(result_task) = result_task {
                    tokio::spawn(async move {
                        let message = Timestamped {
                            inner: Event::SpawnDataflowResult {
                                dataflow_id,
                                result: result_task.await,
                            },
                            timestamp: clock.new_timestamp(),
                        };
                        let _ = result_tx
                            .send(message)
                            .map_err(|_| {
                                error!(
                                    "could not send `SpawnResult` reply from daemon to coordinator"
                                )
                            })
                            .await;
                    });
                }

                RunStatus::Continue
            }
            DaemonCoordinatorEvent::AllNodesReady {
                dataflow_id,
                exited_before_subscribe,
            } => {
                let mut logger = self.logger.for_dataflow(dataflow_id);
                logger
                    .log(
                        LogLevel::Debug,
                        None,
                        Some("daemon".into()),
                        format!(
                            "received AllNodesReady (exited_before_subscribe: {exited_before_subscribe:?})"
                        ),
                    )
                    .await;
                match self.running.get_mut(&dataflow_id) {
                    Some(dataflow) => {
                        let ready = exited_before_subscribe.is_empty();
                        dataflow
                            .pending_nodes
                            .handle_external_all_nodes_ready(
                                exited_before_subscribe,
                                &mut dataflow.cascading_error_causes,
                            )
                            .await?;
                        if ready {
                            logger
                                .log(
                                    LogLevel::Info,
                                    None,
                                    Some("daemon".into()),
                                    "coordinator reported that all nodes are ready, starting dataflow",
                                )
                                .await;
                            dataflow.start(&self.events_tx, &self.clock).await?;
                        }
                    }
                    None => {
                        tracing::warn!(
                            "received AllNodesReady for unknown dataflow (ID `{dataflow_id}`)"
                        );
                    }
                }
                let _ = reply_tx.send(None).map_err(|_| {
                    error!("could not send `AllNodesReady` reply from daemon to coordinator")
                });
                RunStatus::Continue
            }
            DaemonCoordinatorEvent::Logs {
                dataflow_id,
                node_id,
                tail,
            } => {
                match self.working_dir.get(&dataflow_id) {
                    Some(working_dir) => {
                        let working_dir = working_dir.clone();
                        tokio::spawn(async move {
                            let logs = async {
                                let mut file =
                                    File::open(log::log_path(&working_dir, &dataflow_id, &node_id))
                                        .await
                                        .wrap_err(format!(
                                            "could not open log file: {:#?}",
                                            log::log_path(&working_dir, &dataflow_id, &node_id)
                                        ))?;

                                let mut contents = match tail {
                                    None | Some(0) => {
                                        let mut contents = vec![];
                                        file.read_to_end(&mut contents).await.map(|_| contents)
                                    }
                                    Some(tail) => read_last_n_lines(&mut file, tail).await,
                                }
                                .wrap_err("could not read last n lines of log file")?;
                                if !contents.ends_with(b"\n") {
                                    contents.push(b'\n');
                                }
                                Result::<Vec<u8>, eyre::Report>::Ok(contents)
                            }
                            .await
                            .map_err(|err| format!("{err:?}"));
                            let _ = reply_tx
                                .send(Some(DaemonCoordinatorReply::Logs(logs)))
                                .map_err(|_| {
                                    error!("could not send logs reply from daemon to coordinator")
                                });
                        });
                    }
                    None => {
                        tracing::warn!("received Logs for unknown dataflow (ID `{dataflow_id}`)");
                        let _ = reply_tx.send(None).map_err(|_| {
                            error!("could not send `Logs` reply from daemon to coordinator")
                        });
                    }
                }
                RunStatus::Continue
            }
            DaemonCoordinatorEvent::ReloadDataflow {
                dataflow_id,
                node_id,
                operator_id,
            } => {
                let result = self.send_reload(dataflow_id, node_id, operator_id).await;
                let reply =
                    DaemonCoordinatorReply::ReloadResult(result.map_err(|err| format!("{err:?}")));
                let _ = reply_tx
                    .send(Some(reply))
                    .map_err(|_| error!("could not send reload reply from daemon to coordinator"));
                RunStatus::Continue
            }
            DaemonCoordinatorEvent::StopDataflow {
                dataflow_id,
                grace_duration,
                force,
            } => {
                let mut logger = self.logger.for_dataflow(dataflow_id);
                let dataflow = self
                    .running
                    .get_mut(&dataflow_id)
                    .wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"));
                let (reply, future) = match dataflow {
                    Ok(dataflow) => {
                        let future = dataflow.stop_all(
                            &mut self.coordinator_connection,
                            &self.clock,
                            grace_duration,
                            force,
                            &mut logger,
                        );
                        (Ok(()), Some(future))
                    }
                    Err(err) => (Err(err.to_string()), None),
                };

                let _ = reply_tx
                    .send(Some(DaemonCoordinatorReply::StopResult(reply)))
                    .map_err(|_| error!("could not send stop reply from daemon to coordinator"));

                if let Some(future) = future {
                    future.await?;
                }

                RunStatus::Continue
            }
            DaemonCoordinatorEvent::Destroy => {
                tracing::info!("received destroy command -> exiting");
                let (notify_tx, notify_rx) = oneshot::channel();
                let reply = DaemonCoordinatorReply::DestroyResult {
                    result: Ok(()),
                    notify: Some(notify_tx),
                };
                let _ = reply_tx
                    .send(Some(reply))
                    .map_err(|_| error!("could not send destroy reply from daemon to coordinator"));
                if notify_rx.await.is_err() {
                    tracing::warn!("no confirmation received for DestroyReply");
                }
                RunStatus::Exit
            }
            DaemonCoordinatorEvent::Heartbeat => {
                self.last_coordinator_heartbeat = Instant::now();
                let _ = reply_tx.send(None);
                RunStatus::Continue
            }
        };
        Ok(status)
    }

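    /// Collects CPU, memory, and disk-usage metrics for every running node
    /// process and forwards them to the coordinator. Disk throughput is
    /// converted to bytes per second by dividing by the 2-second metrics
    /// interval.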
    async fn collect_and_send_metrics(&mut self) -> eyre::Result<()> {
        use dora_message::daemon_to_coordinator::NodeMetrics;
        use sysinfo::{Pid, ProcessRefreshKind, ProcessesToUpdate};

        if self.coordinator_connection.is_none() {
            return Ok(());
        }

        let system = &mut self.metrics_system;

        const METRICS_INTERVAL_SECS: f64 = 2.0;

        for (dataflow_id, dataflow) in &self.running {
            let mut metrics = BTreeMap::new();

            let pids: Vec<Pid> = dataflow
                .running_nodes
                .values()
                .filter_map(|node| {
                    node.pid
                        .as_ref()
                        .map(|pid| Pid::from_u32(pid.load(atomic::Ordering::Acquire)))
                })
                .collect();

            if !pids.is_empty() {
                let refresh_kind = ProcessRefreshKind::nothing()
                    .with_cpu()
                    .with_memory()
                    .with_disk_usage();
                system.refresh_processes_specifics(
                    ProcessesToUpdate::Some(&pids),
                    true,
                    refresh_kind,
                );

                for (node_id, running_node) in &dataflow.running_nodes {
                    if let Some(pid) = running_node.pid.as_ref() {
                        let pid = pid.load(atomic::Ordering::Acquire);
                        let sys_pid = Pid::from_u32(pid);
                        if let Some(process) = system.process(sys_pid) {
                            let disk_usage = process.disk_usage();
                            metrics.insert(
                                node_id.clone(),
                                NodeMetrics {
                                    pid,
                                    cpu_usage: process.cpu_usage(),
                                    memory_bytes: process.memory(),
                                    disk_read_bytes: Some(
                                        (disk_usage.read_bytes as f64 / METRICS_INTERVAL_SECS)
                                            as u64,
                                    ),
                                    disk_write_bytes: Some(
                                        (disk_usage.written_bytes as f64 / METRICS_INTERVAL_SECS)
                                            as u64,
                                    ),
                                },
                            );
                        }
                    }
                }
            }

            if !metrics.is_empty() {
                if let Some(connection) = &mut self.coordinator_connection {
                    let msg = serde_json::to_vec(&Timestamped {
                        inner: CoordinatorRequest::Event {
                            daemon_id: self.daemon_id.clone(),
                            event: DaemonEvent::NodeMetrics {
                                dataflow_id: *dataflow_id,
                                metrics,
                            },
                        },
                        timestamp: self.clock.new_timestamp(),
                    })?;
                    socket_stream_send(connection, &msg)
                        .await
                        .wrap_err("failed to send metrics to coordinator")?;
                }
            }
        }

        Ok(())
    }

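    /// Handles events from remote daemons received via zenoh: forwards remote
    /// outputs to local receivers and closes local inputs when a remote
    /// output is closed.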
    async fn handle_inter_daemon_event(&mut self, event: InterDaemonEvent) -> eyre::Result<()> {
        match event {
            InterDaemonEvent::Output {
                dataflow_id,
                node_id,
                output_id,
                metadata,
                data,
            } => {
                let inner = async {
                    let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
                        format!("send out failed: no running dataflow with ID `{dataflow_id}`")
                    })?;
                    send_output_to_local_receivers(
                        node_id.clone(),
                        output_id.clone(),
                        dataflow,
                        &metadata,
                        data.map(DataMessage::Vec),
                        &self.clock,
                    )
                    .await?;
                    Result::<_, eyre::Report>::Ok(())
                };
                if let Err(err) = inner
                    .await
                    .wrap_err("failed to forward remote output to local receivers")
                {
                    let mut logger = self.logger.for_dataflow(dataflow_id).for_node(node_id);
                    logger
                        .log(LogLevel::Warn, Some("daemon".into()), format!("{err:?}"))
                        .await;
                }
                Ok(())
            }
            InterDaemonEvent::OutputClosed {
                dataflow_id,
                node_id,
                output_id,
            } => {
                let output_id = OutputId(node_id.clone(), output_id);
                let mut logger = self
                    .logger
                    .for_dataflow(dataflow_id)
                    .for_node(node_id.clone());
                logger
                    .log(
                        LogLevel::Debug,
                        Some("daemon".into()),
                        format!("received OutputClosed event for output {output_id:?}"),
                    )
                    .await;

                let inner = async {
                    let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
                        format!("send out failed: no running dataflow with ID `{dataflow_id}`")
                    })?;

                    if let Some(inputs) = dataflow.mappings.get(&output_id).cloned() {
                        for (receiver_id, input_id) in &inputs {
                            close_input(dataflow, receiver_id, input_id, &self.clock);
                        }
                    }
                    Result::<(), eyre::Report>::Ok(())
                };
                if let Err(err) = inner
                    .await
                    .wrap_err("failed to handle OutputClosed event sent by remote daemon")
                {
                    logger
                        .log(LogLevel::Warn, Some("daemon".into()), format!("{err:?}"))
                        .await;
                }
                Ok(())
            }
        }
    }

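    /// Prepares the build of all nodes assigned to this machine and returns a
    /// future that resolves to the collected `BuildInfo` once all per-node
    /// build tasks have finished. Git-based nodes are checked out through the
    /// shared `GitManager`.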
    #[allow(clippy::too_many_arguments)]
    async fn build_dataflow(
        &mut self,
        build_id: BuildId,
        session_id: SessionId,
        base_working_dir: PathBuf,
        git_sources: BTreeMap<NodeId, GitSource>,
        prev_git_sources: BTreeMap<NodeId, GitSource>,
        dataflow_descriptor: Descriptor,
        local_nodes: BTreeSet<NodeId>,
        uv: bool,
    ) -> eyre::Result<impl Future<Output = eyre::Result<BuildInfo>> + use<>> {
        let builder = build::Builder {
            session_id,
            base_working_dir,
            uv,
        };
        self.git_manager.clear_planned_builds(session_id);

        let nodes = dataflow_descriptor.resolve_aliases_and_set_defaults()?;

        let mut tasks = Vec::new();

        for node in nodes.into_values().filter(|n| local_nodes.contains(&n.id)) {
            let dynamic_node = node.kind.dynamic();

            let node_id = node.id.clone();
            let mut logger = self.logger.for_node_build(build_id, node_id.clone());
            logger.log(LogLevel::Debug, "building").await;
            let git_source = git_sources.get(&node_id).cloned();
            let prev_git_source = prev_git_sources.get(&node_id).cloned();
            let prev_git = prev_git_source.map(|prev_source| PrevGitSource {
                still_needed_for_this_build: git_sources.values().any(|s| s == &prev_source),
                git_source: prev_source,
            });

            let logger_cloned = logger
                .try_clone_impl()
                .await
                .wrap_err("failed to clone logger")?;

            let mut builder = builder.clone();
            if let Some(node_working_dir) =
                node.deploy.as_ref().and_then(|d| d.working_dir.as_deref())
            {
                builder.base_working_dir = builder.base_working_dir.join(node_working_dir);
            }

            match builder
                .build_node(
                    node,
                    git_source,
                    prev_git,
                    logger_cloned,
                    &mut self.git_manager,
                )
                .await
                .wrap_err_with(|| format!("failed to build node `{node_id}`"))
            {
                Ok(result) => {
                    tasks.push(NodeBuildTask {
                        node_id,
                        task: result,
                        dynamic_node,
                    });
                }
                Err(err) => {
                    logger.log(LogLevel::Error, format!("{err:?}")).await;
                    return Err(err);
                }
            }
        }

        let task = async move {
            let mut info = BuildInfo {
                node_working_dirs: Default::default(),
            };
            for task in tasks {
                let NodeBuildTask {
                    node_id,
                    dynamic_node: _,
                    task,
                } = task;
                let node = task
                    .await
                    .with_context(|| format!("failed to build node `{node_id}`"))?;
                info.node_working_dirs
                    .insert(node_id, node.node_working_dir);
            }
            Ok(info)
        };

        Ok(task)
    }

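    /// Registers a new running dataflow and prepares all locally spawned
    /// nodes. For nodes running on other machines, zenoh subscribers are set
    /// up for every output that maps to a local input. Returns a future that
    /// performs the actual node spawning.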
    #[allow(clippy::too_many_arguments)]
    async fn spawn_dataflow(
        &mut self,
        build_id: Option<BuildId>,
        dataflow_id: DataflowId,
        base_working_dir: PathBuf,
        nodes: BTreeMap<NodeId, ResolvedNode>,
        dataflow_descriptor: Descriptor,
        spawn_nodes: BTreeSet<NodeId>,
        uv: bool,
        write_events_to: Option<PathBuf>,
    ) -> eyre::Result<impl Future<Output = eyre::Result<()>> + use<>> {
        let mut logger = self
            .logger
            .for_dataflow(dataflow_id)
            .try_clone()
            .await
            .context("failed to clone logger")?;
        let dataflow =
            RunningDataflow::new(dataflow_id, self.daemon_id.clone(), &dataflow_descriptor);
        let dataflow = match self.running.entry(dataflow_id) {
            std::collections::hash_map::Entry::Vacant(entry) => {
                self.working_dir
                    .insert(dataflow_id, base_working_dir.clone());
                entry.insert(dataflow)
            }
            std::collections::hash_map::Entry::Occupied(_) => {
                bail!("there is already a running dataflow with ID `{dataflow_id}`")
            }
        };

        let mut stopped = Vec::new();

        let build_info = build_id.and_then(|build_id| self.builds.get(&build_id));
        let node_with_git_source = nodes.values().find(|n| n.has_git_source());
        if let Some(git_node) = node_with_git_source {
            if build_info.is_none() {
                eyre::bail!(
                    "node {} has a git source, but no `dora build` was run yet\n\n\
                    nodes with a `git` field must be built using `dora build` before starting the \
                    dataflow",
                    git_node.id
                )
            }
        }
        let node_working_dirs = build_info
            .map(|info| info.node_working_dirs.clone())
            .unwrap_or_default();

        for node in nodes.values() {
            let local = spawn_nodes.contains(&node.id);

            let inputs = node_inputs(node);
            for (input_id, input) in inputs {
                if local {
                    dataflow
                        .open_inputs
                        .entry(node.id.clone())
                        .or_default()
                        .insert(input_id.clone());
                    match input.mapping {
                        InputMapping::User(mapping) => {
                            dataflow
                                .mappings
                                .entry(OutputId(mapping.source, mapping.output))
                                .or_default()
                                .insert((node.id.clone(), input_id));
                        }
                        InputMapping::Timer { interval } => {
                            dataflow
                                .timers
                                .entry(interval)
                                .or_default()
                                .insert((node.id.clone(), input_id));
                        }
                    }
                } else if let InputMapping::User(mapping) = input.mapping {
                    dataflow
                        .open_external_mappings
                        .insert(OutputId(mapping.source, mapping.output));
                }
            }
        }

        let spawner = Spawner {
            dataflow_id,
            daemon_tx: self.events_tx.clone(),
            dataflow_descriptor,
            clock: self.clock.clone(),
            uv,
        };

        let mut tasks = Vec::new();

        for node in nodes.into_values() {
            let mut logger = logger.reborrow().for_node(node.id.clone());
            let local = spawn_nodes.contains(&node.id);
            if local {
                let dynamic_node = node.kind.dynamic();
                if dynamic_node {
                    dataflow.dynamic_nodes.insert(node.id.clone());
                } else {
                    dataflow.pending_nodes.insert(node.id.clone());
                }

                let node_id = node.id.clone();
                let node_stderr_most_recent = dataflow
                    .node_stderr_most_recent
                    .entry(node.id.clone())
                    .or_insert_with(|| Arc::new(ArrayQueue::new(STDERR_LOG_LINES)))
                    .clone();

                let configured_node_working_dir = node_working_dirs.get(&node_id).cloned();
                if configured_node_working_dir.is_none() && node.has_git_source() {
                    eyre::bail!(
                        "node {} has a git source, but no git clone directory was found for it\n\n\
                        try running `dora build` again",
                        node.id
                    )
                }
                let node_working_dir = configured_node_working_dir
                    .or_else(|| {
                        node.deploy
                            .as_ref()
                            .and_then(|d| d.working_dir.as_ref().map(|d| base_working_dir.join(d)))
                    })
                    .unwrap_or_else(|| base_working_dir.clone());
                let node_write_events_to = write_events_to
                    .as_ref()
                    .map(|p| p.join(format!("inputs-{}.json", node.id)));
                match spawner
                    .clone()
                    .spawn_node(
                        node,
                        node_working_dir,
                        node_stderr_most_recent,
                        node_write_events_to,
                        &mut logger,
                    )
                    .await
                    .wrap_err_with(|| format!("failed to spawn node `{node_id}`"))
                {
                    Ok(result) => {
                        tasks.push(NodeBuildTask {
                            node_id,
                            task: result,
                            dynamic_node,
                        });
                    }
                    Err(err) => {
                        logger
                            .log(LogLevel::Error, Some("daemon".into()), format!("{err:?}"))
                            .await;
                        self.dataflow_node_results
                            .entry(dataflow_id)
                            .or_default()
                            .insert(
                                node_id.clone(),
                                Err(NodeError {
                                    timestamp: self.clock.new_timestamp(),
                                    cause: NodeErrorCause::FailedToSpawn(format!("{err:?}")),
                                    exit_status: NodeExitStatus::Unknown,
                                }),
                            );
                        stopped.push((node_id.clone(), dynamic_node));
                    }
                }
            } else {
                dataflow.pending_nodes.set_external_nodes(true);

                for output_id in dataflow.mappings.keys().filter(|o| o.0 == node.id) {
                    let tx = self
                        .remote_daemon_events_tx
                        .clone()
                        .wrap_err("no remote_daemon_events_tx channel")?;
                    let mut finished_rx = dataflow.finished_tx.subscribe();
                    let subscribe_topic =
                        zenoh_output_publish_topic(dataflow.id, &output_id.0, &output_id.1);
                    tracing::debug!("declaring subscriber on {subscribe_topic}");
                    let subscriber = self
                        .zenoh_session
                        .declare_subscriber(subscribe_topic)
                        .await
                        .map_err(|e| eyre!(e))
                        .wrap_err_with(|| format!("failed to subscribe to {output_id:?}"))?;
                    tokio::spawn(async move {
                        let mut finished = pin!(finished_rx.recv());
                        loop {
                            let finished_or_next =
                                futures::future::select(finished, subscriber.recv_async());
                            match finished_or_next.await {
                                future::Either::Left((finished, _)) => match finished {
                                    Err(broadcast::error::RecvError::Closed) => {
                                        tracing::debug!(
                                            "dataflow finished, breaking from zenoh subscribe task"
                                        );
                                        break;
                                    }
                                    other => {
                                        tracing::warn!(
                                            "unexpected return value of dataflow finished_rx channel: {other:?}"
                                        );
                                        break;
                                    }
                                },
                                future::Either::Right((sample, f)) => {
                                    finished = f;
                                    let event = sample.map_err(|e| eyre!(e)).and_then(|s| {
                                        Timestamped::deserialize_inter_daemon_event(
                                            &s.payload().to_bytes(),
                                        )
                                    });
                                    if tx.send_async(event).await.is_err() {
                                        break;
                                    }
                                }
                            }
                        }
                    });
                }
            }
        }
        for (node_id, dynamic) in stopped {
            self.handle_node_stop(dataflow_id, &node_id, dynamic)
                .await?;
        }

        let spawn_result = Self::spawn_prepared_nodes(
            dataflow_id,
            logger,
            tasks,
            self.events_tx.clone(),
            self.clock.clone(),
        );

        Ok(spawn_result)
    }

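    /// Awaits all node preparation tasks, then spawns the prepared nodes. If
    /// any preparation fails, the remaining prepared nodes are reported as
    /// cascading failures instead of being spawned. Each outcome is sent to
    /// the main daemon task as a `SpawnNodeResult` event.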
    async fn spawn_prepared_nodes(
        dataflow_id: Uuid,
        mut logger: DataflowLogger<'_>,
        tasks: Vec<NodeBuildTask<impl Future<Output = eyre::Result<spawn::PreparedNode>>>>,
        events_tx: mpsc::Sender<Timestamped<Event>>,
        clock: Arc<HLC>,
    ) -> eyre::Result<()> {
        let node_result = |node_id, dynamic_node, result| Timestamped {
            inner: Event::SpawnNodeResult {
                dataflow_id,
                node_id,
                dynamic_node,
                result,
            },
            timestamp: clock.new_timestamp(),
        };
        let mut failed_to_prepare = None;
        let mut prepared_nodes = Vec::new();
        for task in tasks {
            let NodeBuildTask {
                node_id,
                dynamic_node,
                task,
            } = task;
            match task.await {
                Ok(node) => prepared_nodes.push(node),
                Err(err) => {
                    if failed_to_prepare.is_none() {
                        failed_to_prepare = Some(node_id.clone());
                    }
                    let node_err: NodeError = NodeError {
                        timestamp: clock.new_timestamp(),
                        cause: NodeErrorCause::FailedToSpawn(format!(
                            "preparing for spawn failed: {err:?}"
                        )),
                        exit_status: NodeExitStatus::Unknown,
                    };
                    let send_result = events_tx
                        .send(node_result(node_id, dynamic_node, Err(node_err)))
                        .await;
                    if send_result.is_err() {
                        tracing::error!("failed to send SpawnNodeResult to main daemon task")
                    }
                }
            }
        }

        if let Some(failed_node) = failed_to_prepare {
            for node in prepared_nodes {
                let err = NodeError {
                    timestamp: clock.new_timestamp(),
                    cause: NodeErrorCause::Cascading {
                        caused_by_node: failed_node.clone(),
                    },
                    exit_status: NodeExitStatus::Unknown,
                };
                let send_result = events_tx
                    .send(node_result(
                        node.node_id().clone(),
                        node.dynamic(),
                        Err(err),
                    ))
                    .await;
                if send_result.is_err() {
                    tracing::error!("failed to send SpawnNodeResult to main daemon task")
                }
            }
            Err(eyre!("failed to prepare node {failed_node}"))
        } else {
            let mut spawn_result = Ok(());

            logger
                .log(
                    LogLevel::Info,
                    None,
                    Some("dora daemon".into()),
                    "finished building nodes, spawning...",
                )
                .await;

            for node in prepared_nodes {
                let node_id = node.node_id().clone();
                let dynamic_node = node.dynamic();
                let logger = logger
                    .reborrow()
                    .for_node(node_id.clone())
                    .try_clone()
                    .await
                    .context("failed to clone NodeLogger")?;
                let result = node.spawn(logger).await;
                let node_spawn_result = match result {
                    Ok(node) => Ok(node),
                    Err(err) => {
                        let node_err = NodeError {
                            timestamp: clock.new_timestamp(),
                            cause: NodeErrorCause::FailedToSpawn(format!("spawn failed: {err:?}")),
                            exit_status: NodeExitStatus::Unknown,
                        };
                        if spawn_result.is_ok() {
                            spawn_result = Err(err.wrap_err(format!("failed to spawn {node_id}")));
                        }
                        Err(node_err)
                    }
                };
                let send_result = events_tx
                    .send(node_result(node_id, dynamic_node, node_spawn_result))
                    .await;
                if send_result.is_err() {
                    tracing::error!("failed to send SpawnNodeResult to main daemon task")
                }
            }
            spawn_result
        }
    }

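    /// Resolves the `NodeConfig` for a dynamic node that connects to this
    /// daemon. The node ID must be unique across all running dataflows;
    /// otherwise an error is returned to the caller.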
    async fn handle_dynamic_node_event(
        &mut self,
        event: DynamicNodeEventWrapper,
    ) -> eyre::Result<()> {
        match event {
            DynamicNodeEventWrapper {
                event: DynamicNodeEvent::NodeConfig { node_id },
                reply_tx,
            } => {
                let number_node_id = self
                    .running
                    .iter()
                    .filter(|(_id, dataflow)| dataflow.running_nodes.contains_key(&node_id))
                    .count();

                let node_config = match number_node_id {
                    2.. => Err(format!(
                        "multiple dataflows contain dynamic node ID {node_id}. \
                        Please ensure that only one running dataflow contains the \
                        specified node ID when using dynamic nodes"
                    )),
                    1 => self
                        .running
                        .iter()
                        .filter(|(_id, dataflow)| dataflow.running_nodes.contains_key(&node_id))
                        .map(|(id, dataflow)| -> Result<NodeConfig> {
                            let node_config = dataflow
                                .running_nodes
                                .get(&node_id)
                                .with_context(|| {
                                    format!("no node with ID `{node_id}` within the given dataflow")
                                })?
                                .node_config
                                .clone();
                            if !node_config.dynamic {
                                bail!("node with ID `{node_id}` in {id} is not dynamic");
                            }
                            Ok(node_config)
                        })
                        .next()
                        .ok_or_else(|| eyre!("no node with ID `{node_id}`"))
                        .and_then(|r| r)
                        .map_err(|err| {
                            format!(
                                "failed to get dynamic node config within given dataflow: {err}"
                            )
                        }),
                    0 => Err(format!("no node with ID `{node_id}`")),
                };

                let reply = DaemonReply::NodeConfig {
                    result: node_config,
                };
                let _ = reply_tx.send(Some(reply)).map_err(|_| {
                    error!("could not send node config reply from daemon to dynamic node")
                });
                Ok(())
            }
        }
    }

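    /// Handles events sent by a local node (subscribe, close outputs, send
    /// out, drop reports, ...). Output-closing events are skipped when the
    /// node's restart policy allows it to come back.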
    async fn handle_node_event(
        &mut self,
        event: DaemonNodeEvent,
        dataflow_id: DataflowId,
        node_id: NodeId,
    ) -> eyre::Result<()> {
        let might_restart = || {
            let dataflow = self.running.get(&dataflow_id)?;
            let node = dataflow.running_nodes.get(&node_id)?;
            Some(match node.restart_policy {
                RestartPolicy::Never => false,
                _ if node.restarts_disabled() => false,
                RestartPolicy::OnFailure | RestartPolicy::Always => true,
            })
        };
        match event {
            DaemonNodeEvent::Subscribe {
                event_sender,
                reply_sender,
            } => {
                let mut logger = self.logger.for_dataflow(dataflow_id);
                logger
                    .log(
                        LogLevel::Info,
                        Some(node_id.clone()),
                        Some("daemon".into()),
                        "node is ready",
                    )
                    .await;

                let dataflow = self.running.get_mut(&dataflow_id).ok_or_else(|| {
                    format!("subscribe failed: no running dataflow with ID `{dataflow_id}`")
                });

                match dataflow {
                    Err(err) => {
                        let _ = reply_sender.send(DaemonReply::Result(Err(err)));
                    }
                    Ok(dataflow) => {
                        Self::subscribe(dataflow, node_id.clone(), event_sender, &self.clock).await;

                        let status = dataflow
                            .pending_nodes
                            .handle_node_subscription(
                                node_id.clone(),
                                reply_sender,
                                &mut self.coordinator_connection,
                                &self.clock,
                                &mut dataflow.cascading_error_causes,
                                &mut logger,
                            )
                            .await?;
                        match status {
                            DataflowStatus::AllNodesReady if !dataflow.dataflow_started => {
                                logger
                                    .log(
                                        LogLevel::Info,
                                        None,
                                        Some("daemon".into()),
                                        "all nodes are ready, starting dataflow",
                                    )
                                    .await;
                                dataflow.start(&self.events_tx, &self.clock).await?;
                                dataflow.dataflow_started = true;
                            }
                            _ => {}
                        }
                    }
                }
            }
            DaemonNodeEvent::SubscribeDrop {
                event_sender,
                reply_sender,
            } => {
                let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
                    format!("failed to subscribe: no running dataflow with ID `{dataflow_id}`")
                });
                let result = match dataflow {
                    Ok(dataflow) => {
                        dataflow.drop_channels.insert(node_id, event_sender);
                        Ok(())
                    }
                    Err(err) => Err(err.to_string()),
                };
                let _ = reply_sender.send(DaemonReply::Result(result));
            }
            DaemonNodeEvent::CloseOutputs {
                outputs,
                reply_sender,
            } => {
                let reply = if might_restart().unwrap_or(false) {
                    self.logger
                        .for_dataflow(dataflow_id)
                        .for_node(node_id.clone())
                        .log(
                            LogLevel::Debug,
                            Some("daemon".into()),
                            "skipping CloseOutputs because node might restart",
                        )
                        .await;
                    Ok(())
                } else {
                    let inner = async {
                        self.send_output_closed_events(dataflow_id, node_id, outputs)
                            .await
                    };

                    inner.await.map_err(|err| format!("{err:?}"))
                };
                let _ = reply_sender.send(DaemonReply::Result(reply));
            }
            DaemonNodeEvent::OutputsDone { reply_sender } => {
                let result = self
                    .handle_outputs_done(dataflow_id, &node_id, might_restart().unwrap_or(false))
                    .await;

                let _ = reply_sender.send(DaemonReply::Result(
                    result.map_err(|err| format!("{err:?}")),
                ));
            }
            DaemonNodeEvent::SendOut {
                output_id,
                metadata,
                data,
            } => self
                .send_out(dataflow_id, node_id, output_id, metadata, data)
                .await
                .context("failed to send out")?,
            DaemonNodeEvent::ReportDrop { tokens } => {
                let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
                    format!(
                        "failed to handle drop tokens: \
                        no running dataflow with ID `{dataflow_id}`"
                    )
                });

                match dataflow {
                    Ok(dataflow) => {
                        for token in tokens {
                            match dataflow.pending_drop_tokens.get_mut(&token) {
                                Some(info) => {
                                    if info.pending_nodes.remove(&node_id) {
                                        dataflow.check_drop_token(token, &self.clock).await?;
                                    } else {
                                        tracing::warn!(
                                            "node `{node_id}` is not pending for drop token `{token:?}`"
                                        );
                                    }
                                }
                                None => tracing::warn!("unknown drop token `{token:?}`"),
                            }
                        }
                    }
                    Err(err) => tracing::warn!("{err:?}"),
                }
            }
            DaemonNodeEvent::EventStreamDropped { reply_sender } => {
                let inner = async {
                    let dataflow = self
                        .running
                        .get_mut(&dataflow_id)
                        .wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"))?;
                    dataflow.subscribe_channels.remove(&node_id);
                    Result::<_, eyre::Error>::Ok(())
                };

                let reply = inner.await.map_err(|err| format!("{err:?}"));
                let _ = reply_sender.send(DaemonReply::Result(reply));
            }
        }
        Ok(())
    }

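    /// Forwards a `Reload` event to the given node's event channel, dropping
    /// the channel if the node is no longer listening.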
    async fn send_reload(
        &mut self,
        dataflow_id: Uuid,
        node_id: NodeId,
        operator_id: Option<OperatorId>,
    ) -> Result<(), eyre::ErrReport> {
        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
            format!("reload failed: no running dataflow with ID `{dataflow_id}`")
        })?;
        if let Some(channel) = dataflow.subscribe_channels.get(&node_id) {
            match send_with_timestamp(channel, NodeEvent::Reload { operator_id }, &self.clock) {
                Ok(()) => {}
                Err(_) => {
                    dataflow.subscribe_channels.remove(&node_id);
                }
            }
        }
        Ok(())
    }

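    /// Delivers a node output to all local receivers and, if the output has
    /// subscribers on other machines (or `publish_all_messages_to_zenoh` is
    /// set), publishes it to the corresponding zenoh topic as well.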
    async fn send_out(
        &mut self,
        dataflow_id: Uuid,
        node_id: NodeId,
        output_id: DataId,
        metadata: dora_message::metadata::Metadata,
        data: Option<DataMessage>,
    ) -> Result<(), eyre::ErrReport> {
        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
            format!("send out failed: no running dataflow with ID `{dataflow_id}`")
        })?;
        let data_bytes = send_output_to_local_receivers(
            node_id.clone(),
            output_id.clone(),
            dataflow,
            &metadata,
            data,
            &self.clock,
        )
        .await?;

        let output_id = OutputId(node_id, output_id);
        let remote_receivers = dataflow.open_external_mappings.contains(&output_id)
            || dataflow.publish_all_messages_to_zenoh;
        if remote_receivers {
            let event = InterDaemonEvent::Output {
                dataflow_id,
                node_id: output_id.0.clone(),
                output_id: output_id.1.clone(),
                metadata,
                data: data_bytes,
            };
            self.send_to_remote_receivers(dataflow_id, &output_id, event)
                .await?;
        }

        Ok(())
    }

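    /// Publishes an inter-daemon event on the zenoh topic for the given
    /// output, declaring (and caching) the publisher on first use.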
    async fn send_to_remote_receivers(
        &mut self,
        dataflow_id: Uuid,
        output_id: &OutputId,
        event: InterDaemonEvent,
    ) -> Result<(), eyre::Error> {
        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
            format!("send out failed: no running dataflow with ID `{dataflow_id}`")
        })?;

        let publisher = match dataflow.publishers.get(output_id) {
            Some(publisher) => publisher,
            None => {
                let publish_topic =
                    zenoh_output_publish_topic(dataflow.id, &output_id.0, &output_id.1);
                tracing::debug!("declaring publisher on {publish_topic}");
                let publisher = self
                    .zenoh_session
                    .declare_publisher(publish_topic)
                    .await
                    .map_err(|e| eyre!(e))
                    .context("failed to create zenoh publisher")?;
                dataflow.publishers.insert(output_id.clone(), publisher);
                dataflow.publishers.get(output_id).unwrap()
            }
        };

        let serialized_event = Timestamped {
            inner: event,
            timestamp: self.clock.new_timestamp(),
        }
        .serialize();
        publisher
            .put(serialized_event)
            .await
            .map_err(|e| eyre!(e))
            .context("zenoh put failed")?;
        Ok(())
    }

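    /// Closes the given outputs of a node: local inputs mapped to them are
    /// closed directly, and `OutputClosed` events are published for any
    /// mappings to remote machines.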
    async fn send_output_closed_events(
        &mut self,
        dataflow_id: DataflowId,
        node_id: NodeId,
        outputs: Vec<DataId>,
    ) -> eyre::Result<()> {
        let dataflow = self
            .running
            .get_mut(&dataflow_id)
            .wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"))?;
        let local_node_inputs: BTreeSet<_> = dataflow
            .mappings
            .iter()
            .filter(|(k, _)| k.0 == node_id && outputs.contains(&k.1))
            .flat_map(|(_, v)| v)
            .cloned()
            .collect();
        for (receiver_id, input_id) in &local_node_inputs {
            close_input(dataflow, receiver_id, input_id, &self.clock);
        }

        let mut closed = Vec::new();
        for output_id in &dataflow.open_external_mappings {
            if output_id.0 == node_id && outputs.contains(&output_id.1) {
                closed.push(output_id.clone());
            }
        }

        for output_id in closed {
            let event = InterDaemonEvent::OutputClosed {
                dataflow_id,
                node_id: output_id.0.clone(),
                output_id: output_id.1.clone(),
            };
            self.send_to_remote_receivers(dataflow_id, &output_id, event)
                .await?;
        }

        Ok(())
    }

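    /// Registers a node's event channel with the dataflow. Inputs that were
    /// already closed before the node subscribed are replayed as
    /// `InputClosed` events, and a pending `Stop` is delivered immediately.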
    async fn subscribe(
        dataflow: &mut RunningDataflow,
        node_id: NodeId,
        event_sender: UnboundedSender<Timestamped<NodeEvent>>,
        clock: &HLC,
    ) {
        let closed_inputs = dataflow
            .mappings
            .values()
            .flatten()
            .filter(|(node, _)| node == &node_id)
            .map(|(_, input)| input)
            .filter(|input| {
                dataflow
                    .open_inputs
                    .get(&node_id)
                    .map(|open_inputs| !open_inputs.contains(*input))
                    .unwrap_or(true)
            });
        for input_id in closed_inputs {
            let _ = send_with_timestamp(
                &event_sender,
                NodeEvent::InputClosed {
                    id: input_id.clone(),
                },
                clock,
            );
        }
        if dataflow.open_inputs(&node_id).is_empty() {
            if let Some(node) = dataflow.running_nodes.get_mut(&node_id) {
                node.disable_restart();
            }
            let _ = send_with_timestamp(&event_sender, NodeEvent::AllInputsClosed, clock);
        }

        if dataflow.stop_sent {
            if let Some(node) = dataflow.running_nodes.get_mut(&node_id) {
                node.disable_restart();
            }
            let _ = send_with_timestamp(&event_sender, NodeEvent::Stop, clock);
        }

        dataflow.subscribe_channels.insert(node_id, event_sender);
    }

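    /// Called when a node reports that it will send no further outputs.
    /// Unless the node might restart, all of its outputs are closed for
    /// downstream receivers.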
    #[tracing::instrument(skip(self), level = "trace")]
    async fn handle_outputs_done(
        &mut self,
        dataflow_id: DataflowId,
        node_id: &NodeId,
        might_restart: bool,
    ) -> eyre::Result<()> {
        let dataflow = self
            .running
            .get_mut(&dataflow_id)
            .ok_or_else(|| eyre!("no running dataflow with ID `{dataflow_id}`"))?;

        let outputs = dataflow
            .mappings
            .keys()
            .filter(|m| &m.0 == node_id)
            .map(|m| &m.1)
            .cloned()
            .collect();

        if might_restart {
            self.logger
                .for_dataflow(dataflow_id)
                .for_node(node_id.clone())
                .log(
                    LogLevel::Debug,
                    Some("daemon".into()),
                    "keeping outputs open because node might restart",
                )
                .await;
        } else {
            self.send_output_closed_events(dataflow_id, node_id.clone(), outputs)
                .await?;
        }

        let dataflow = self
            .running
            .get_mut(&dataflow_id)
            .ok_or_else(|| eyre!("no running dataflow with ID `{dataflow_id}`"))?;
        dataflow.drop_channels.remove(node_id);
        Ok(())
    }

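    /// Handles the stop of a node and notifies the main event loop via a
    /// `NodeStopped` event, regardless of whether the inner handling
    /// succeeded.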
    async fn handle_node_stop(
        &mut self,
        dataflow_id: Uuid,
        node_id: &NodeId,
        dynamic_node: bool,
    ) -> eyre::Result<()> {
        let result = self
            .handle_node_stop_inner(dataflow_id, node_id, dynamic_node)
            .await;
        let _ = self
            .events_tx
            .send(Timestamped {
                inner: Event::NodeStopped {
                    dataflow_id,
                    node_id: node_id.clone(),
                },
                timestamp: self.clock.new_timestamp(),
            })
            .await;
        result
    }

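    /// Removes a stopped node from its dataflow, closes its outputs, and, if
    /// only dynamic nodes remain, reports the dataflow as finished to the
    /// coordinator and cleans up its state.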
    async fn handle_node_stop_inner(
        &mut self,
        dataflow_id: Uuid,
        node_id: &NodeId,
        dynamic_node: bool,
    ) -> eyre::Result<()> {
        let mut logger = self.logger.for_dataflow(dataflow_id);
        let dataflow = match self.running.get_mut(&dataflow_id) {
            Some(dataflow) => dataflow,
            None if dynamic_node => {
                tracing::debug!(
                    "dynamic node {dataflow_id}/{node_id} stopped after dataflow was done"
                );
                return Ok(());
            }
            None => eyre::bail!(
                "failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`"
            ),
        };

        dataflow
            .pending_nodes
            .handle_node_stop(
                node_id,
                &mut self.coordinator_connection,
                &self.clock,
                &mut dataflow.cascading_error_causes,
                &mut logger,
            )
            .await?;

        // The node stopped for good, so its outputs can be closed.
        let might_restart = false;

        self.handle_outputs_done(dataflow_id, node_id, might_restart)
            .await?;

        let mut logger = self.logger.for_dataflow(dataflow_id);
        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
            format!("failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`")
        })?;
        dataflow.running_nodes.remove(node_id);
        if !dataflow.pending_nodes.local_nodes_pending()
            && dataflow
                .running_nodes
                .iter()
                .all(|(_id, n)| n.node_config.dynamic)
        {
            let result = DataflowDaemonResult {
                timestamp: self.clock.new_timestamp(),
                node_results: self
                    .dataflow_node_results
                    .get(&dataflow.id)
                    .context("failed to get dataflow node results")?
                    .clone(),
            };

            self.git_manager
                .clones_in_use
                .values_mut()
                .for_each(|dataflows| {
                    dataflows.remove(&dataflow_id);
                });

            logger
                .log(
                    LogLevel::Info,
                    None,
                    Some("daemon".into()),
                    format!("dataflow finished on machine `{}`", self.daemon_id),
                )
                .await;
            if let Some(connection) = &mut self.coordinator_connection {
                let msg = serde_json::to_vec(&Timestamped {
                    inner: CoordinatorRequest::Event {
                        daemon_id: self.daemon_id.clone(),
                        event: DaemonEvent::AllNodesFinished {
                            dataflow_id,
                            result,
                        },
                    },
                    timestamp: self.clock.new_timestamp(),
                })?;
                socket_stream_send(connection, &msg)
                    .await
                    .wrap_err("failed to report dataflow finish to dora-coordinator")?;
            }
            self.running.remove(&dataflow_id);
        }

        Ok(())
    }

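    /// Handles daemon-internal events: timer ticks, log forwarding, and the
    /// exit results of spawned node processes.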
    async fn handle_dora_event(&mut self, event: DoraEvent) -> eyre::Result<()> {
        match event {
            DoraEvent::Timer {
                dataflow_id,
                interval,
                metadata,
            } => {
                let Some(dataflow) = self.running.get_mut(&dataflow_id) else {
                    tracing::warn!("Timer event for unknown dataflow `{dataflow_id}`");
                    return Ok(());
                };

                let Some(subscribers) = dataflow.timers.get(&interval) else {
                    return Ok(());
                };

                let mut closed = Vec::new();
                for (receiver_id, input_id) in subscribers {
                    let Some(channel) = dataflow.subscribe_channels.get(receiver_id) else {
                        continue;
                    };

                    let send_result = send_with_timestamp(
                        channel,
                        NodeEvent::Input {
                            id: input_id.clone(),
                            metadata: metadata.clone(),
                            data: None,
                        },
                        &self.clock,
                    );
                    match send_result {
                        Ok(()) => {}
                        Err(_) => {
                            closed.push(receiver_id);
                        }
                    }
                }
                for id in closed {
                    dataflow.subscribe_channels.remove(id);
                }
            }
            DoraEvent::Logs {
                dataflow_id,
                output_id,
                message,
                metadata,
            } => {
                let Some(dataflow) = self.running.get_mut(&dataflow_id) else {
                    tracing::warn!("Logs event for unknown dataflow `{dataflow_id}`");
                    return Ok(());
                };

                let Some(subscribers) = dataflow.mappings.get(&output_id) else {
                    tracing::warn!(
                        "No subscribers found for {:?} in {:?}",
                        output_id,
                        dataflow.mappings
                    );
                    return Ok(());
                };

                let mut closed = Vec::new();
                for (receiver_id, input_id) in subscribers {
                    let Some(channel) = dataflow.subscribe_channels.get(receiver_id) else {
                        tracing::warn!("No subscriber channel found for {:?}", output_id);
                        continue;
                    };

                    let send_result = send_with_timestamp(
                        channel,
                        NodeEvent::Input {
                            id: input_id.clone(),
                            metadata: metadata.clone(),
                            data: Some(message.clone()),
                        },
                        &self.clock,
                    );
                    match send_result {
                        Ok(()) => {}
                        Err(_) => {
                            closed.push(receiver_id);
                        }
                    }
                }
                for id in closed {
                    dataflow.subscribe_channels.remove(id);
                }
            }
            DoraEvent::SpawnedNodeResult {
                dataflow_id,
                node_id,
                dynamic_node,
                exit_status,
                restart,
            } => {
                let mut logger = self
                    .logger
                    .for_dataflow(dataflow_id)
                    .for_node(node_id.clone());
                logger
                    .log(
                        LogLevel::Debug,
                        Some("daemon".into()),
                        format!(
                            "handling node stop with exit status {exit_status:?} (restart: {restart})"
                        ),
                    )
                    .await;

                let node_result = match exit_status {
                    NodeExitStatus::Success => Ok(()),
                    exit_status => {
                        let dataflow = self.running.get(&dataflow_id);
                        let caused_by_node = dataflow
                            .and_then(|dataflow| {
                                dataflow.cascading_error_causes.error_caused_by(&node_id)
                            })
                            .cloned();
                        let grace_duration_kill = dataflow
                            .map(|d| d.grace_duration_kills.contains(&node_id))
                            .unwrap_or_default();

                        let cause = match caused_by_node {
                            Some(caused_by_node) => {
                                logger
                                    .log(
                                        LogLevel::Info,
                                        Some("daemon".into()),
                                        format!(
                                            "marking `{node_id}` as cascading error caused by `{caused_by_node}`"
                                        ),
                                    )
                                    .await;

                                NodeErrorCause::Cascading { caused_by_node }
                            }
                            None if grace_duration_kill => NodeErrorCause::GraceDuration,
                            None => {
                                // Reconstruct the most recent stderr lines; if the
                                // queue overflowed, older lines were already dropped,
                                // so mark the truncation with `[...]`.
                                let cause = dataflow
                                    .and_then(|d| d.node_stderr_most_recent.get(&node_id))
                                    .map(|queue| {
                                        let mut s = if queue.is_full() {
                                            "[...]".into()
                                        } else {
                                            String::new()
                                        };
                                        while let Some(line) = queue.pop() {
                                            s += &line;
                                        }
                                        s
                                    })
                                    .unwrap_or_default();

                                NodeErrorCause::Other { stderr: cause }
                            }
                        };
                        Err(NodeError {
                            timestamp: self.clock.new_timestamp(),
                            cause,
                            exit_status,
                        })
                    }
                };

                logger
                    .log(
                        if node_result.is_ok() {
                            LogLevel::Info
                        } else {
                            LogLevel::Error
                        },
                        Some("daemon".into()),
                        match &node_result {
                            Ok(()) => format!("{node_id} finished successfully"),
                            Err(err) => format!("{err}"),
                        },
                    )
                    .await;

                if restart {
                    logger
                        .log(
                            LogLevel::Info,
                            Some("daemon".into()),
                            "node will be restarted",
                        )
                        .await;
                } else {
                    self.dataflow_node_results
                        .entry(dataflow_id)
                        .or_default()
                        .insert(node_id.clone(), node_result);

                    self.handle_node_stop(dataflow_id, &node_id, dynamic_node)
                        .await?;
                }
            }
        }
        Ok(())
    }

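    /// Determines the base working directory for a dataflow: the explicitly
    /// requested directory, if any, or a session-specific subdirectory under
    /// the daemon's `_work` directory otherwise.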
    fn base_working_dir(
        &self,
        local_working_dir: Option<PathBuf>,
        session_id: SessionId,
    ) -> eyre::Result<PathBuf> {
        match local_working_dir {
            Some(working_dir) => {
                if working_dir.exists() {
                    Ok(working_dir)
                } else {
                    bail!(
                        "working directory does not exist: {}",
                        working_dir.display(),
                    )
                }
            }
            None => {
                let daemon_working_dir =
                    current_dir().context("failed to get daemon working dir")?;
                Ok(daemon_working_dir
                    .join("_work")
                    .join(session_id.uuid().to_string()))
            }
        }
    }
}

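/// Reads the last `tail` lines of the given file without scanning it from the
/// start: the file is read backwards in growing chunks until enough newlines
/// have been found.
///
/// A minimal usage sketch (the log file path is hypothetical):
///
/// ```ignore
/// # async fn demo() -> std::io::Result<()> {
/// let mut file = tokio::fs::File::open("node-stderr.log").await?;
/// let tail = read_last_n_lines(&mut file, STDERR_LOG_LINES).await?;
/// println!("{}", String::from_utf8_lossy(&tail));
/// # Ok(())
/// # }
/// ```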
async fn read_last_n_lines(file: &mut File, mut tail: usize) -> io::Result<Vec<u8>> {
    let mut pos = file.seek(io::SeekFrom::End(0)).await?;

    // Collected output; chunks are prepended because the file is read backwards.
    let mut output = VecDeque::<u8>::new();
    let mut extend_slice_to_start = |slice: &[u8]| {
        output.extend(slice);
        output.rotate_right(slice.len());
    };

    let mut buffer = vec![0; 2048];
    let mut estimated_line_length = 0;
    let mut at_end = true;
    'main: while tail > 0 && pos > 0 {
        let new_pos = pos.saturating_sub(buffer.len() as u64);
        file.seek(io::SeekFrom::Start(new_pos)).await?;
        let read_len = (pos - new_pos) as usize;
        pos = new_pos;

        file.read_exact(&mut buffer[..read_len]).await?;
        let read_buf = if at_end {
            // Ignore trailing whitespace (e.g. a final newline) at the end of the file.
            at_end = false;
            buffer[..read_len].trim_ascii_end()
        } else {
            &buffer[..read_len]
        };

        // Search for newlines from the back of the chunk.
        let mut iter = memchr::memrchr_iter(b'\n', read_buf);
        let mut lines = 1;
        loop {
            let Some(pos) = iter.next() else {
                // No further newline in this chunk -> keep the whole chunk.
                extend_slice_to_start(read_buf);
                break;
            };
            lines += 1;
            tail -= 1;
            if tail == 0 {
                extend_slice_to_start(&read_buf[(pos + 1)..]);
                break 'main;
            }
        }

        // Grow the read buffer if the average line length suggests that
        // considerably more bytes are still needed.
        estimated_line_length = estimated_line_length.max((read_buf.len() + 1).div_ceil(lines));
        let estimated_buffer_length = estimated_line_length * tail;
        if estimated_buffer_length >= buffer.len() * 2 {
            buffer.resize(buffer.len() * 2, 0);
        }
    }

    Ok(output.into())
}

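/// Registers the daemon at the coordinator and merges all incoming event
/// sources (coordinator, remote daemons, dynamic nodes) into a single stream.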
async fn set_up_event_stream(
    coordinator_addr: SocketAddr,
    machine_id: &Option<String>,
    clock: &Arc<HLC>,
    remote_daemon_events_rx: flume::Receiver<eyre::Result<Timestamped<InterDaemonEvent>>>,
    local_listen_port: u16,
) -> eyre::Result<(DaemonId, impl Stream<Item = Timestamped<Event>> + Unpin)> {
    let clock_cloned = clock.clone();
    let remote_daemon_events = remote_daemon_events_rx.into_stream().map(move |e| match e {
        Ok(e) => Timestamped {
            inner: Event::Daemon(e.inner),
            timestamp: e.timestamp,
        },
        Err(err) => Timestamped {
            inner: Event::DaemonError(err),
            timestamp: clock_cloned.new_timestamp(),
        },
    });
    let (daemon_id, coordinator_events) =
        coordinator::register(coordinator_addr, machine_id.clone(), clock)
            .await
            .wrap_err("failed to connect to dora-coordinator")?;
    let coordinator_events = coordinator_events.map(
        |Timestamped {
             inner: event,
             timestamp,
         }| Timestamped {
            inner: Event::Coordinator(event),
            timestamp,
        },
    );
    let (events_tx, events_rx) = flume::bounded(10);
    let _listen_port =
        local_listener::spawn_listener_loop((LOCALHOST, local_listen_port).into(), events_tx)
            .await?;
    let dynamic_node_events = events_rx.into_stream().map(|e| Timestamped {
        inner: Event::DynamicNode(e.inner),
        timestamp: e.timestamp,
    });
    let incoming = (
        coordinator_events,
        remote_daemon_events,
        dynamic_node_events,
    )
        .merge();
    Ok((daemon_id, incoming))
}

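/// Forwards a node output to all local subscribers and returns the raw bytes
/// in case the output must also be published to remote daemons.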
async fn send_output_to_local_receivers(
    node_id: NodeId,
    output_id: DataId,
    dataflow: &mut RunningDataflow,
    metadata: &metadata::Metadata,
    data: Option<DataMessage>,
    clock: &HLC,
) -> Result<Option<AVec<u8, ConstAlign<128>>>, eyre::ErrReport> {
    let timestamp = metadata.timestamp();
    let empty_set = BTreeSet::new();
    let output_id = OutputId(node_id, output_id);
    let local_receivers = dataflow.mappings.get(&output_id).unwrap_or(&empty_set);
    let OutputId(node_id, _) = output_id;
    let mut closed = Vec::new();
    for (receiver_id, input_id) in local_receivers {
        if let Some(channel) = dataflow.subscribe_channels.get(receiver_id) {
            let item = NodeEvent::Input {
                id: input_id.clone(),
                metadata: metadata.clone(),
                data: data.clone(),
            };
            match channel.send(Timestamped {
                inner: item,
                timestamp,
            }) {
                Ok(()) => {
                    // Track the receiver so that the data is only dropped after
                    // all receivers reported the drop token back.
                    if let Some(token) = data.as_ref().and_then(|d| d.drop_token()) {
                        dataflow
                            .pending_drop_tokens
                            .entry(token)
                            .or_insert_with(|| DropTokenInformation {
                                owner: node_id.clone(),
                                pending_nodes: Default::default(),
                            })
                            .pending_nodes
                            .insert(receiver_id.clone());
                    }
                }
                Err(_) => {
                    closed.push(receiver_id);
                }
            }
        }
    }
    for id in closed {
        dataflow.subscribe_channels.remove(id);
    }
    let (data_bytes, drop_token) = match data {
        None => (None, None),
        Some(DataMessage::SharedMemory {
            shared_memory_id,
            len,
            drop_token,
        }) => {
            let memory = ShmemConf::new()
                .os_id(shared_memory_id)
                .open()
                .wrap_err("failed to map shared memory output")?;
            let data = Some(AVec::from_slice(1, &unsafe { memory.as_slice() }[..len]));
            (data, Some(drop_token))
        }
        Some(DataMessage::Vec(v)) => (Some(v), None),
    };
    if let Some(token) = drop_token {
        // Ensure that the token is tracked even if no local receiver
        // subscribed, so that the owner is notified once it can be dropped.
        dataflow
            .pending_drop_tokens
            .entry(token)
            .or_insert_with(|| DropTokenInformation {
                owner: node_id.clone(),
                pending_nodes: Default::default(),
            });
        dataflow.check_drop_token(token, clock).await?;
    }
    Ok(data_bytes)
}

fn node_inputs(node: &ResolvedNode) -> BTreeMap<DataId, Input> {
    match &node.kind {
        CoreNodeKind::Custom(n) => n.run_config.inputs.clone(),
        CoreNodeKind::Runtime(n) => runtime_node_inputs(n),
    }
}

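/// Marks an input of a node as closed and notifies the node. Once all of its
/// inputs are closed, the node is informed and is no longer restarted.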
fn close_input(
    dataflow: &mut RunningDataflow,
    receiver_id: &NodeId,
    input_id: &DataId,
    clock: &HLC,
) {
    if let Some(open_inputs) = dataflow.open_inputs.get_mut(receiver_id) {
        if !open_inputs.remove(input_id) {
            return;
        }
    }
    if let Some(channel) = dataflow.subscribe_channels.get(receiver_id) {
        let _ = send_with_timestamp(
            channel,
            NodeEvent::InputClosed {
                id: input_id.clone(),
            },
            clock,
        );

        if dataflow.open_inputs(receiver_id).is_empty() {
            if let Some(node) = dataflow.running_nodes.get_mut(receiver_id) {
                node.disable_restart();
            }
            let _ = send_with_timestamp(channel, NodeEvent::AllInputsClosed, clock);
        }
    }
}

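/// Handle to a spawned node process that is part of a running dataflow.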
#[derive(Debug)]
pub struct RunningNode {
    /// Channel-based handle for soft-killing or killing the node process.
    process: Option<ProcessHandle>,
    node_config: NodeConfig,
    /// Process ID of the spawned node, once known.
    pid: Option<Arc<AtomicU32>>,
    restart_policy: RestartPolicy,
    /// Set once the node must not be restarted anymore (e.g. on stop).
    disable_restart: Arc<AtomicBool>,
}

impl RunningNode {
    pub fn restarts_disabled(&self) -> bool {
        self.disable_restart.load(atomic::Ordering::Acquire)
    }

    pub fn disable_restart(&mut self) {
        self.disable_restart.store(true, atomic::Ordering::Release);
    }
}

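/// Operation that can be requested for a spawned node process.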
#[derive(Debug)]
enum ProcessOperation {
    /// Ask the process to shut down gracefully (SIGTERM / CTRL_BREAK_EVENT).
    SoftKill,
    /// Kill the process immediately.
    Kill,
}

impl ProcessOperation {
    pub fn execute(&self, child: &mut dyn TokioChildWrapper) {
        match self {
            Self::SoftKill => {
                #[cfg(unix)]
                {
                    // Signal 15 is SIGTERM.
                    if let Err(err) = child.signal(15) {
                        warn!("failed to send SIGTERM to process {:?}: {err}", child.id());
                    }
                }

                #[cfg(windows)]
                unsafe {
                    let Some(pid) = child.id() else {
                        warn!("failed to get child process id");
                        return;
                    };
                    if let Err(err) = windows::Win32::System::Console::GenerateConsoleCtrlEvent(
                        windows::Win32::System::Console::CTRL_BREAK_EVENT,
                        pid,
                    ) {
                        warn!("failed to send CTRL_BREAK_EVENT to process {pid}: {err}");
                    }
                }

                #[cfg(not(any(unix, windows)))]
                {
                    warn!("soft-killing a process is not implemented on this platform");
                }
            }
            Self::Kill => {
                if let Err(err) = child.start_kill() {
                    warn!("failed to kill child process: {err}");
                }
            }
        }
    }
}

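/// Sends process operations to the task that owns the spawned child process.
///
/// Dropping the handle kills the process if it is still running.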
#[derive(Debug)]
struct ProcessHandle {
    op_tx: flume::Sender<ProcessOperation>,
}

impl ProcessHandle {
    pub fn new(op_tx: flume::Sender<ProcessOperation>) -> Self {
        Self { op_tx }
    }

    /// Submits the operation, returning `true` if the process task was still
    /// alive to receive it.
    pub fn submit(&self, operation: ProcessOperation) -> bool {
        self.op_tx.send(operation).is_ok()
    }
}

impl Drop for ProcessHandle {
    fn drop(&mut self) {
        if self.submit(ProcessOperation::Kill) {
            warn!("process was killed on drop because it was still running");
        }
    }
}

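/// All daemon-local state of a currently running dataflow.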
pub struct RunningDataflow {
    id: Uuid,
    /// Nodes that are not started yet or whose start is not confirmed yet.
    pending_nodes: PendingNodes,

    dataflow_started: bool,

    subscribe_channels: HashMap<NodeId, UnboundedSender<Timestamped<NodeEvent>>>,
    drop_channels: HashMap<NodeId, UnboundedSender<Timestamped<NodeDropEvent>>>,
    mappings: HashMap<OutputId, BTreeSet<InputId>>,
    timers: BTreeMap<Duration, BTreeSet<InputId>>,
    open_inputs: BTreeMap<NodeId, BTreeSet<DataId>>,
    running_nodes: BTreeMap<NodeId, RunningNode>,

    dynamic_nodes: BTreeSet<NodeId>,

    open_external_mappings: BTreeSet<OutputId>,

    pending_drop_tokens: HashMap<DropToken, DropTokenInformation>,

    /// Keeps the timer tasks alive; they are aborted once the handles are dropped.
    _timer_handles: BTreeMap<Duration, futures::future::RemoteHandle<()>>,
    stop_sent: bool,

    /// Empty set used for `open_inputs` lookups of unknown nodes.
    empty_set: BTreeSet<DataId>,

    cascading_error_causes: CascadingErrorCauses,
    /// Nodes that were killed because they did not stop within the grace period.
    grace_duration_kills: Arc<crossbeam_skiplist::SkipSet<NodeId>>,

    node_stderr_most_recent: BTreeMap<NodeId, Arc<ArrayQueue<String>>>,

    publishers: BTreeMap<OutputId, zenoh::pubsub::Publisher<'static>>,

    finished_tx: broadcast::Sender<()>,

    publish_all_messages_to_zenoh: bool,
}

impl RunningDataflow {
    fn new(
        dataflow_id: Uuid,
        daemon_id: DaemonId,
        dataflow_descriptor: &Descriptor,
    ) -> RunningDataflow {
        let (finished_tx, _) = broadcast::channel(1);
        Self {
            id: dataflow_id,
            pending_nodes: PendingNodes::new(dataflow_id, daemon_id),
            dataflow_started: false,
            subscribe_channels: HashMap::new(),
            drop_channels: HashMap::new(),
            mappings: HashMap::new(),
            timers: BTreeMap::new(),
            open_inputs: BTreeMap::new(),
            running_nodes: BTreeMap::new(),
            dynamic_nodes: BTreeSet::new(),
            open_external_mappings: Default::default(),
            pending_drop_tokens: HashMap::new(),
            _timer_handles: BTreeMap::new(),
            stop_sent: false,
            empty_set: BTreeSet::new(),
            cascading_error_causes: Default::default(),
            grace_duration_kills: Default::default(),
            node_stderr_most_recent: BTreeMap::new(),
            publishers: Default::default(),
            finished_tx,
            publish_all_messages_to_zenoh: dataflow_descriptor.debug.publish_all_messages_to_zenoh,
        }
    }

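    /// Spawns one background task per timer interval that periodically
    /// enqueues `DoraEvent::Timer` events for this dataflow.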
    async fn start(
        &mut self,
        events_tx: &mpsc::Sender<Timestamped<Event>>,
        clock: &Arc<HLC>,
    ) -> eyre::Result<()> {
        for interval in self.timers.keys().copied() {
            if self._timer_handles.get(&interval).is_some() {
                continue;
            }
            let events_tx = events_tx.clone();
            let dataflow_id = self.id;
            let clock = clock.clone();
            let task = async move {
                let mut interval_stream = tokio::time::interval(interval);
                let hlc = HLC::default();
                loop {
                    interval_stream.tick().await;

                    let span = tracing::span!(tracing::Level::TRACE, "tick");
                    // The entered guard is dropped again immediately; the span is
                    // mainly used to derive the telemetry context below.
                    let _ = span.enter();

                    let mut parameters = BTreeMap::new();
                    parameters.insert(
                        "open_telemetry_context".to_string(),
                        #[cfg(feature = "telemetry")]
                        Parameter::String(serialize_context(&span.context())),
                        #[cfg(not(feature = "telemetry"))]
                        Parameter::String("".into()),
                    );

                    let metadata = metadata::Metadata::from_parameters(
                        hlc.new_timestamp(),
                        empty_type_info(),
                        parameters,
                    );

                    let event = Timestamped {
                        inner: DoraEvent::Timer {
                            dataflow_id,
                            interval,
                            metadata,
                        }
                        .into(),
                        timestamp: clock.new_timestamp(),
                    };
                    if events_tx.send(event).await.is_err() {
                        break;
                    }
                }
            };
            let (task, handle) = task.remote_handle();
            tokio::spawn(task);
            self._timer_handles.insert(interval, handle);
        }

        Ok(())
    }

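    /// Stops all nodes of this dataflow: sends a `Stop` event to every
    /// subscribed node and, unless `force` is set, escalates to a soft kill
    /// after the grace period and to a hard kill after 1.5x the grace period.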
    async fn stop_all(
        &mut self,
        coordinator_connection: &mut Option<TcpStream>,
        clock: &HLC,
        grace_duration: Option<Duration>,
        force: bool,
        logger: &mut DataflowLogger<'_>,
    ) -> eyre::Result<()> {
        self.pending_nodes
            .handle_dataflow_stop(
                coordinator_connection,
                clock,
                &mut self.cascading_error_causes,
                &self.dynamic_nodes,
                logger,
            )
            .await?;

        for node in self.running_nodes.values_mut() {
            node.disable_restart();
        }

        for (_node_id, channel) in self.subscribe_channels.drain() {
            let _ = send_with_timestamp(&channel, NodeEvent::Stop, clock);
        }

        let running_processes: Vec<_> = self
            .running_nodes
            .iter_mut()
            .map(|(id, n)| (id.clone(), n.process.take()))
            .collect();
        if force {
            for (_, proc) in &running_processes {
                if let Some(proc) = proc {
                    proc.submit(crate::ProcessOperation::Kill);
                }
            }
        } else {
            let grace_duration_kills = self.grace_duration_kills.clone();
            tokio::spawn(async move {
                let duration = grace_duration.unwrap_or(Duration::from_secs(10));
                tokio::time::sleep(duration).await;

                // First escalation: ask still-running processes to shut down.
                for (node, proc) in &running_processes {
                    if let Some(proc) = proc {
                        if proc.submit(crate::ProcessOperation::SoftKill) {
                            grace_duration_kills.insert(node.clone());
                        }
                    }
                }

                let kill_duration = duration / 2;
                tokio::time::sleep(kill_duration).await;

                // Second escalation: kill processes that are still running.
                for (node, proc) in &running_processes {
                    if let Some(proc) = proc {
                        if proc.submit(crate::ProcessOperation::Kill) {
                            grace_duration_kills.insert(node.clone());
                            warn!(
                                "{node} was killed due to not stopping within the {:#?} grace period",
                                duration + kill_duration
                            );
                        }
                    }
                }
            });
        }
        self.stop_sent = true;
        Ok(())
    }

    fn open_inputs(&self, node_id: &NodeId) -> &BTreeSet<DataId> {
        self.open_inputs.get(node_id).unwrap_or(&self.empty_set)
    }

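    /// Notifies the owner of a drop token once no receiving node uses the
    /// associated data anymore, so that the owner can free the memory.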
    async fn check_drop_token(&mut self, token: DropToken, clock: &HLC) -> eyre::Result<()> {
        match self.pending_drop_tokens.entry(token) {
            std::collections::hash_map::Entry::Occupied(entry) => {
                if entry.get().pending_nodes.is_empty() {
                    let (drop_token, info) = entry.remove_entry();
                    let result = match self.drop_channels.get_mut(&info.owner) {
                        Some(channel) => send_with_timestamp(
                            channel,
                            NodeDropEvent::OutputDropped { drop_token },
                            clock,
                        )
                        .wrap_err("send failed"),
                        None => Err(eyre!("no subscribe channel for node `{}`", &info.owner)),
                    };
                    if let Err(err) = result.wrap_err_with(|| {
                        format!(
                            "failed to report drop token `{drop_token:?}` to owner `{}`",
                            &info.owner
                        )
                    }) {
                        tracing::warn!("{err:?}");
                    }
                }
            }
            std::collections::hash_map::Entry::Vacant(_) => {
                tracing::warn!("check_drop_token called with already closed token")
            }
        }

        Ok(())
    }
}

fn empty_type_info() -> ArrowTypeInfo {
    ArrowTypeInfo {
        data_type: DataType::Null,
        len: 0,
        null_count: 0,
        validity: None,
        offset: 0,
        buffer_offsets: Vec::new(),
        child_data: Vec::new(),
    }
}

/// Identifies an output: the sending node and the output's data ID.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct OutputId(NodeId, DataId);
/// Identifies an input: the receiving node and the input's data ID.
type InputId = (NodeId, DataId);

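/// Bookkeeping for the drop token of an output message.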
struct DropTokenInformation {
    /// The node that created the message and owns its memory.
    owner: NodeId,
    /// Receiving nodes that did not report the token back yet.
    pending_nodes: BTreeSet<NodeId>,
}

#[derive(Debug)]
pub enum Event {
    Node {
        dataflow_id: DataflowId,
        node_id: NodeId,
        event: DaemonNodeEvent,
    },
    Coordinator(CoordinatorEvent),
    Daemon(InterDaemonEvent),
    Dora(DoraEvent),
    DynamicNode(DynamicNodeEventWrapper),
    HeartbeatInterval,
    MetricsInterval,
    CtrlC,
    SecondCtrlC,
    DaemonError(eyre::Report),
    SpawnNodeResult {
        dataflow_id: DataflowId,
        node_id: NodeId,
        dynamic_node: bool,
        result: Result<RunningNode, NodeError>,
    },
    BuildDataflowResult {
        build_id: BuildId,
        session_id: SessionId,
        result: eyre::Result<BuildInfo>,
    },
    SpawnDataflowResult {
        dataflow_id: Uuid,
        result: eyre::Result<()>,
    },
    NodeStopped {
        dataflow_id: Uuid,
        node_id: NodeId,
    },
}

impl From<DoraEvent> for Event {
    fn from(event: DoraEvent) -> Self {
        Event::Dora(event)
    }
}

impl Event {
    pub fn kind(&self) -> &'static str {
        match self {
            Event::Node { .. } => "Node",
            Event::Coordinator(_) => "Coordinator",
            Event::Daemon(_) => "Daemon",
            Event::Dora(_) => "Dora",
            Event::DynamicNode(_) => "DynamicNode",
            Event::HeartbeatInterval => "HeartbeatInterval",
            Event::MetricsInterval => "MetricsInterval",
            Event::CtrlC => "CtrlC",
            Event::SecondCtrlC => "SecondCtrlC",
            Event::DaemonError(_) => "DaemonError",
            Event::SpawnNodeResult { .. } => "SpawnNodeResult",
            Event::BuildDataflowResult { .. } => "BuildDataflowResult",
            Event::SpawnDataflowResult { .. } => "SpawnDataflowResult",
            Event::NodeStopped { .. } => "NodeStopped",
        }
    }
}

#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
pub enum DaemonNodeEvent {
    OutputsDone {
        reply_sender: oneshot::Sender<DaemonReply>,
    },
    Subscribe {
        event_sender: UnboundedSender<Timestamped<NodeEvent>>,
        reply_sender: oneshot::Sender<DaemonReply>,
    },
    SubscribeDrop {
        event_sender: UnboundedSender<Timestamped<NodeDropEvent>>,
        reply_sender: oneshot::Sender<DaemonReply>,
    },
    CloseOutputs {
        outputs: Vec<dora_core::config::DataId>,
        reply_sender: oneshot::Sender<DaemonReply>,
    },
    SendOut {
        output_id: DataId,
        metadata: metadata::Metadata,
        data: Option<DataMessage>,
    },
    ReportDrop {
        tokens: Vec<DropToken>,
    },
    EventStreamDropped {
        reply_sender: oneshot::Sender<DaemonReply>,
    },
}

#[derive(Debug)]
pub enum DoraEvent {
    Timer {
        dataflow_id: DataflowId,
        interval: Duration,
        metadata: metadata::Metadata,
    },
    Logs {
        dataflow_id: DataflowId,
        output_id: OutputId,
        message: DataMessage,
        metadata: metadata::Metadata,
    },
    SpawnedNodeResult {
        dataflow_id: DataflowId,
        node_id: NodeId,
        dynamic_node: bool,
        exit_status: NodeExitStatus,
        /// Whether the daemon is going to restart the node.
        restart: bool,
    },
}

#[must_use]
enum RunStatus {
    Continue,
    Exit,
}

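/// Wraps the event with a fresh HLC timestamp before sending it.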
fn send_with_timestamp<T>(
    sender: &UnboundedSender<Timestamped<T>>,
    event: T,
    clock: &HLC,
) -> Result<(), mpsc::error::SendError<Timestamped<T>>> {
    sender.send(Timestamped {
        inner: event,
        timestamp: clock.new_timestamp(),
    })
}

fn set_up_ctrlc_handler(
    clock: Arc<HLC>,
) -> eyre::Result<tokio::sync::mpsc::Receiver<Timestamped<Event>>> {
    let (ctrlc_tx, ctrlc_rx) = mpsc::channel(1);

    let mut ctrlc_sent = 0;
    ctrlc::set_handler(move || {
        let event = match ctrlc_sent {
            0 => Event::CtrlC,
            1 => Event::SecondCtrlC,
            _ => {
                tracing::warn!("received 3rd ctrl-c signal -> aborting immediately");
                std::process::abort();
            }
        };
        if ctrlc_tx
            .blocking_send(Timestamped {
                inner: event,
                timestamp: clock.new_timestamp(),
            })
            .is_err()
        {
            tracing::error!("failed to report ctrl-c event to daemon event loop");
        }

        ctrlc_sent += 1;
    })
    .wrap_err("failed to set ctrl-c handler")?;

    Ok(ctrlc_rx)
}

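/// Tracks which node failures were caused by earlier failures of other nodes,
/// so that only the root cause is reported as the actual error.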
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct CascadingErrorCauses {
    caused_by: BTreeMap<NodeId, NodeId>,
}

impl CascadingErrorCauses {
    pub fn experienced_cascading_error(&self, node: &NodeId) -> bool {
        self.caused_by.contains_key(node)
    }

    /// Returns the node that caused the given node's error, if any.
    pub fn error_caused_by(&self, node: &NodeId) -> Option<&NodeId> {
        self.caused_by.get(node)
    }

    pub fn report_cascading_error(&mut self, causing_node: NodeId, affected_node: NodeId) {
        // Keep the first reported cause if one is already recorded.
        self.caused_by.entry(affected_node).or_insert(causing_node);
    }
}

fn runtime_node_inputs(n: &RuntimeNode) -> BTreeMap<DataId, Input> {
    n.operators
        .iter()
        .flat_map(|operator| {
            operator.config.inputs.iter().map(|(input_id, mapping)| {
                (
                    DataId::from(format!("{}/{input_id}", operator.id)),
                    mapping.clone(),
                )
            })
        })
        .collect()
}

fn runtime_node_outputs(n: &RuntimeNode) -> BTreeSet<DataId> {
    n.operators
        .iter()
        .flat_map(|operator| {
            operator
                .config
                .outputs
                .iter()
                .map(|output_id| DataId::from(format!("{}/{output_id}", operator.id)))
        })
        .collect()
}

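/// Helper methods shared by custom and runtime node kinds.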
trait CoreNodeKindExt {
    fn run_config(&self) -> NodeRunConfig;
    fn dynamic(&self) -> bool;
}

impl CoreNodeKindExt for CoreNodeKind {
    fn run_config(&self) -> NodeRunConfig {
        match self {
            CoreNodeKind::Runtime(n) => NodeRunConfig {
                inputs: runtime_node_inputs(n),
                outputs: runtime_node_outputs(n),
            },
            CoreNodeKind::Custom(n) => n.run_config.clone(),
        }
    }

    fn dynamic(&self) -> bool {
        match self {
            CoreNodeKind::Runtime(_n) => false,
            CoreNodeKind::Custom(n) => {
                matches!(&n.source, NodeSource::Local) && n.path == DYNAMIC_SOURCE
            }
        }
    }
}