1use aligned_vec::{AVec, ConstAlign};
2use crossbeam::queue::ArrayQueue;
3use dora_core::{
4 build::{self, BuildInfo, PrevGitSource, TracingBuildLogger},
5 config::{DataId, Input, InputMapping, NodeId, NodeRunConfig, OperatorId},
6 descriptor::{
7 CoreNodeKind, DYNAMIC_SOURCE, Descriptor, DescriptorExt, ResolvedNode, RuntimeNode,
8 read_as_descriptor,
9 },
10 topics::{
11 DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT, LOCALHOST, open_zenoh_session,
12 zenoh_output_publish_topic,
13 },
14 uhlc::HLC,
15};
16use dora_message::{
17 BuildId, DataflowId, SessionId,
18 common::{
19 DaemonId, DataMessage, DropToken, GitSource, LogLevel, NodeError, NodeErrorCause,
20 NodeExitStatus,
21 },
22 coordinator_to_cli::DataflowResult,
23 coordinator_to_daemon::SpawnDataflowNodes,
24 daemon_to_coordinator::DataflowDaemonResult,
25 daemon_to_daemon::InterDaemonEvent,
26 daemon_to_node::{DaemonReply, NodeConfig, NodeDropEvent, NodeEvent},
27 descriptor::{NodeSource, RestartPolicy},
28 metadata::{self, ArrowTypeInfo},
29 node_to_daemon::{DynamicNodeEvent, Timestamped},
30 tarpc,
31};
32use dora_node_api::{Parameter, arrow::datatypes::DataType};
33use eyre::{Context, ContextCompat, Result, bail, eyre};
34use futures::{FutureExt, TryFutureExt, future, stream};
35use futures_concurrency::stream::Merge;
36use local_listener::DynamicNodeEventWrapper;
37use log::{DaemonLogger, DataflowLogger, Logger};
38use pending::PendingNodes;
39use process_wrap::tokio::TokioChildWrapper;
40use shared_memory_extended::ShmemConf;
41use spawn::Spawner;
42use std::{
43 collections::{BTreeMap, BTreeSet, HashMap, VecDeque},
44 env::current_dir,
45 future::Future,
46 io,
47 net::SocketAddr,
48 path::{Path, PathBuf},
49 pin::pin,
50 sync::{
51 Arc,
52 atomic::{self, AtomicBool, AtomicU32},
53 },
54 time::{Duration, Instant},
55};
56use tokio::{
57 fs::File,
58 io::{AsyncReadExt, AsyncSeekExt},
59 net::TcpStream,
60 sync::{
61 broadcast,
62 mpsc::{self, UnboundedSender},
63 oneshot::{self, Sender},
64 },
65};
66use tokio_stream::{Stream, StreamExt, wrappers::ReceiverStream};
67use tracing::{error, warn};
68use uuid::{NoContext, Timestamp, Uuid};
69
70pub use flume;
71pub use log::LogDestination;
72
73mod coordinator;
74mod extract_err_from_stderr;
75mod local_listener;
76mod log;
77mod node_communication;
78mod pending;
79mod socket_stream_utils;
80mod spawn;
81pub(crate) mod state;
82
83#[cfg(feature = "telemetry")]
84use dora_tracing::telemetry::serialize_context;
85#[cfg(feature = "telemetry")]
86use tracing_opentelemetry::OpenTelemetrySpanExt;
87
88use crate::{extract_err_from_stderr::extract_err_from_stderr, pending::DataflowStatus};
89
/// Capacity of the bounded per-node queue that keeps the most recent stderr
/// lines (it sizes each node's `node_stderr_most_recent` queue on spawn).
const STDERR_LOG_LINES_MAX: usize = 500;
91
/// An always-empty set of data IDs with `'static` lifetime.
// NOTE(review): no use site is visible in this part of the file; presumably it
// is handed out as a borrowed fallback when no real set exists — confirm.
static EMPTY_SET: BTreeSet<DataId> = BTreeSet::new();
93
/// The daemon itself: owns the event loop (`run_inner`) together with the
/// bookkeeping that decides when that loop terminates.
pub struct Daemon {
    /// Shared daemon state (clock, running dataflows, coordinator client, ...).
    pub(crate) state: Arc<state::DaemonState>,

    /// When set, `(dataflow, node)` pairs are removed as `NodeStopped` events
    /// arrive; the event loop exits as soon as the set becomes empty.
    exit_when_done: Option<BTreeSet<(Uuid, NodeId)>>,
    /// Set by `trigger_manual_stop`; once `true`, the event loop exits on the
    /// next `NodeStopped` event that finds no dataflow running anymore.
    exit_when_all_finished: bool,

    /// Daemon-level logger from which dataflow/node loggers are derived.
    logger: DaemonLogger,

    /// `sysinfo` handle that is refreshed on every metrics collection.
    metrics_system: sysinfo::System,
}
107
/// Outcome of a daemon run: for every dataflow ID, the result of each of its
/// nodes (`Ok(())` or the `NodeError` recorded for that node).
type DaemonRunResult = BTreeMap<Uuid, BTreeMap<NodeId, Result<(), NodeError>>>;
109
/// A pending per-node task together with the node it belongs to.
///
/// Used both for build tasks (`build_dataflow`) and for spawn tasks
/// (`spawn_dataflow`).
struct NodeBuildTask<F> {
    /// ID of the node this task was created for.
    node_id: NodeId,
    /// Whether the node is a dynamic node (`node.kind.dynamic()`).
    dynamic_node: bool,
    /// The task itself; awaited later by whoever collected it.
    task: F,
}
115
116impl Daemon {
117 pub async fn run(
118 coordinator_addr: SocketAddr,
119 machine_id: Option<String>,
120 local_listen_port: u16,
121 ) -> eyre::Result<()> {
122 let clock = Arc::new(HLC::default());
123
124 let mut ctrlc_events = set_up_ctrlc_handler(clock.clone())?;
125 let (remote_daemon_events_tx, remote_daemon_events_rx) = flume::bounded(10);
126
127 let (dora_events_tx, dora_events_rx) = mpsc::channel(1000);
129
130 let zenoh_session = open_zenoh_session(Some(coordinator_addr.ip()))
131 .await
132 .wrap_err("failed to open zenoh session")?;
133
134 let daemon_state = Arc::new(state::DaemonState::new(
137 clock.clone(),
138 dora_events_tx,
139 Some(zenoh_session),
140 Some(remote_daemon_events_tx),
141 ));
142
143 let ((daemon_id, coordinator_client), incoming_events) = {
144 let incoming_events = set_up_event_stream(
145 coordinator_addr,
146 &machine_id,
147 &clock,
148 daemon_state.clone(),
149 remote_daemon_events_rx,
150 local_listen_port,
151 );
152
153 let ctrl_c = pin!(ctrlc_events.recv());
155 match futures::future::select(ctrl_c, pin!(incoming_events)).await {
156 future::Either::Left((_ctrl_c, _)) => {
157 tracing::info!("received ctrl-c signal -> stopping daemon");
158 return Ok(());
159 }
160 future::Either::Right((events, _)) => events?,
161 }
162 };
163
164 daemon_state.set_daemon_id(daemon_id.clone());
165 daemon_state.set_coordinator_client(coordinator_client);
166
167 let log_destination = {
168 let stream = TcpStream::connect(coordinator_addr)
170 .await
171 .wrap_err("failed to connect log to dora-coordinator")?;
172 stream
173 .set_nodelay(true)
174 .wrap_err("failed to set TCP_NODELAY")?;
175 LogDestination::Coordinator {
176 coordinator_connection: stream,
177 }
178 };
179
180 Self::run_general(
181 (ReceiverStream::new(ctrlc_events), incoming_events).merge(),
182 daemon_id,
183 None,
184 daemon_state,
185 dora_events_rx,
186 log_destination,
187 )
188 .await
189 .map(|_| ())
190 }
191
192 pub async fn run_dataflow(
193 dataflow_path: &Path,
194 build_id: Option<BuildId>,
195 local_build: Option<BuildInfo>,
196 session_id: SessionId,
197 uv: bool,
198 log_destination: LogDestination,
199 write_events_to: Option<PathBuf>,
200 stop_after: Option<Duration>,
201 ) -> eyre::Result<DataflowResult> {
202 let working_dir = dataflow_path
203 .canonicalize()
204 .context("failed to canonicalize dataflow path")?
205 .parent()
206 .ok_or_else(|| eyre::eyre!("canonicalized dataflow path has no parent"))?
207 .to_owned();
208
209 let descriptor = read_as_descriptor(dataflow_path).await?;
210 if let Some(node) = descriptor.nodes.iter().find(|n| n.deploy.is_some()) {
211 eyre::bail!(
212 "node {} has a `deploy` section, which is not supported in `dora run`\n\n
213 Instead, you need to spawn a `dora coordinator` and one or more `dora daemon`
214 instances and then use `dora start`.",
215 node.id
216 )
217 }
218
219 descriptor.check(&working_dir)?;
220 let nodes = descriptor.resolve_aliases_and_set_defaults()?;
221
222 let (events_tx, events_rx) = flume::bounded(10);
223 if nodes
224 .iter()
225 .find(|(_n, resolved_nodes)| resolved_nodes.kind.dynamic())
226 .is_some()
227 {
228 let _listen_port = local_listener::spawn_listener_loop(
230 (LOCALHOST, DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT).into(),
231 events_tx,
232 )
233 .await?;
234 }
235 let dynamic_node_events = events_rx.into_stream().map(|e| Timestamped {
236 inner: Event::DynamicNode(e.inner),
237 timestamp: e.timestamp,
238 });
239
240 let dataflow_id = Uuid::new_v7(Timestamp::now(NoContext));
241 let spawn_command = SpawnDataflowNodes {
242 build_id,
243 session_id,
244 dataflow_id,
245 local_working_dir: Some(working_dir),
246 spawn_nodes: nodes.keys().cloned().collect(),
247 nodes,
248 dataflow_descriptor: descriptor,
249 uv,
250 write_events_to,
251 };
252
253 let clock = Arc::new(HLC::default());
254
255 let ctrlc_events = ReceiverStream::new(set_up_ctrlc_handler(clock.clone())?);
256
257 let timeout_events = if let Some(duration) = stop_after {
259 let clock = clock.clone();
260 let (tx, rx) = tokio::sync::mpsc::channel(1);
261 tokio::spawn(async move {
262 tokio::time::sleep(duration).await;
263 tracing::info!("stop-after timeout reached ({duration:?}) -> stopping dataflow");
264 let _ = tx
265 .send(Timestamped {
266 inner: Event::StopAfter(duration),
267 timestamp: clock.new_timestamp(),
268 })
269 .await;
270 });
271 ReceiverStream::new(rx)
272 } else {
273 ReceiverStream::new(tokio::sync::mpsc::channel(1).1)
275 };
276
277 let all_nodes_dynamic = spawn_command.nodes.values().all(|n| n.kind.dynamic());
278 let exit_when_done: BTreeSet<(Uuid, NodeId)> = spawn_command
279 .nodes
280 .values()
281 .filter(|n| !n.kind.dynamic())
282 .map(|n| (spawn_command.dataflow_id, n.id.clone()))
283 .collect();
284
285 let (reply_tx, reply_rx) = oneshot::channel();
288 let spawn_event_clock = clock.clone();
289 let spawn_event = stream::once(async move {
290 Timestamped {
291 inner: Event::SpawnRequest {
292 build_id: spawn_command.build_id,
293 dataflow_id: spawn_command.dataflow_id,
294 local_working_dir: spawn_command.local_working_dir,
295 nodes: spawn_command.nodes,
296 dataflow_descriptor: spawn_command.dataflow_descriptor,
297 spawn_nodes: spawn_command.spawn_nodes,
298 uv: spawn_command.uv,
299 write_events_to: spawn_command.write_events_to,
300 reply_tx,
301 },
302 timestamp: spawn_event_clock.new_timestamp(),
303 }
304 });
305 let events = (
306 spawn_event,
307 ctrlc_events,
308 timeout_events,
309 dynamic_node_events,
310 )
311 .merge();
312
313 let builds_map: BTreeMap<BuildId, BuildInfo> = if let Some(local_build) = local_build {
314 let Some(build_id) = build_id else {
315 bail!("no build_id, but local_build set")
316 };
317 let mut builds = BTreeMap::new();
318 builds.insert(build_id, local_build);
319 builds
320 } else {
321 Default::default()
322 };
323
324 let zenoh_session = open_zenoh_session(None)
325 .await
326 .wrap_err("failed to open zenoh session")?;
327 let daemon_id = DaemonId::new(None);
328 let (dora_events_tx, dora_events_rx) = mpsc::channel(1000);
329 let daemon_state = Arc::new(state::DaemonState::new_standalone(
330 clock.clone(),
331 daemon_id.clone(),
332 dora_events_tx,
333 zenoh_session,
334 builds_map,
335 ));
336
337 let run_result = Self::run_general(
338 Box::pin(events),
339 daemon_id,
340 Some(exit_when_done),
341 daemon_state,
342 dora_events_rx,
343 log_destination,
344 );
345
346 let spawn_result = reply_rx
347 .map_err(|err| eyre!("failed to receive spawn result: {err}"))
348 .and_then(|r| async { r.map_err(|err| eyre!(err)) });
349
350 let (mut dataflow_results, ()) = future::try_join(run_result, spawn_result).await?;
351
352 let node_results = match dataflow_results.remove(&dataflow_id) {
353 Some(results) => results,
354 None if all_nodes_dynamic => {
355 BTreeMap::new()
358 }
359 None => {
360 return Err(eyre::eyre!("no node results for dataflow_id {dataflow_id}"));
361 }
362 };
363
364 Ok(DataflowResult {
365 uuid: dataflow_id,
366 timestamp: clock.new_timestamp(),
367 node_results,
368 })
369 }
370
371 #[allow(clippy::too_many_arguments)]
374 async fn run_general(
375 external_events: impl Stream<Item = Timestamped<Event>> + Unpin,
376 daemon_id: DaemonId,
377 exit_when_done: Option<BTreeSet<(Uuid, NodeId)>>,
378 daemon_state: Arc<state::DaemonState>,
379 dora_events_rx: mpsc::Receiver<Timestamped<Event>>,
380 log_destination: LogDestination,
381 ) -> eyre::Result<DaemonRunResult> {
382 let clock = daemon_state.clock.clone();
383 let daemon = Self {
384 logger: Logger {
385 destination: log_destination,
386 daemon_id: daemon_id.clone(),
387 clock: clock.clone(),
388 }
389 .for_daemon(daemon_id.clone()),
390 state: daemon_state,
391 exit_when_done,
392 exit_when_all_finished: false,
393 metrics_system: sysinfo::System::new(),
394 };
395
396 let dora_events = ReceiverStream::new(dora_events_rx);
397 let watchdog_clock = daemon.state.clock.clone();
398 let watchdog_interval = tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(
399 Duration::from_secs(5),
400 ))
401 .map(move |_| Timestamped {
402 inner: Event::HeartbeatInterval,
403 timestamp: watchdog_clock.new_timestamp(),
404 });
405
406 let metrics_clock = daemon.state.clock.clone();
407 let metrics_interval = tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(
408 Duration::from_secs(2), ))
410 .map(move |_| Timestamped {
411 inner: Event::MetricsInterval,
412 timestamp: metrics_clock.new_timestamp(),
413 });
414
415 let events = (
416 external_events,
417 dora_events,
418 watchdog_interval,
419 metrics_interval,
420 )
421 .merge();
422 daemon.run_inner(events).await
423 }
424
425 #[tracing::instrument(skip(incoming_events, self), fields(daemon_id = ?self.state.daemon_id()))]
426 async fn run_inner(
427 mut self,
428 incoming_events: impl Stream<Item = Timestamped<Event>> + Unpin,
429 ) -> eyre::Result<DaemonRunResult> {
430 let mut events = incoming_events;
431
432 while let Some(event) = events.next().await {
433 let Timestamped { inner, timestamp } = event;
434 if let Err(err) = self.state.clock.update_with_timestamp(×tamp) {
435 tracing::warn!("failed to update HLC with incoming event timestamp: {err}");
436 }
437
438 let start = Instant::now();
440 let event_kind = inner.kind();
441
442 match inner {
443 Event::Daemon(event) => {
444 self.handle_inter_daemon_event(event).await?;
445 }
446 Event::Node {
447 dataflow_id: dataflow,
448 node_id,
449 event,
450 } => self.handle_node_event(event, dataflow, node_id).await?,
451 Event::Dora(event) => self.handle_dora_event(event).await?,
452 Event::DynamicNode(event) => self.handle_dynamic_node_event(event).await?,
453 Event::HeartbeatInterval => {
454 if let Some(client) = self.state.coordinator_client() {
455 let client = client.clone();
458 tokio::spawn(async move {
459 let _ = client.heartbeat(tarpc::context::current()).await;
460 });
461
462 let last_hb = self.state.last_coordinator_heartbeat.lock().await;
463 if last_hb.elapsed() > Duration::from_secs(20) {
464 bail!("lost connection to coordinator")
465 }
466 }
467 }
468 Event::Destroy => {
469 tracing::info!("received destroy command -> exiting");
470 break;
471 }
472 Event::SpawnRequest {
473 build_id,
474 dataflow_id,
475 local_working_dir,
476 nodes,
477 dataflow_descriptor,
478 spawn_nodes,
479 uv,
480 write_events_to,
481 reply_tx,
482 } => {
483 let result = self
484 .handle_spawn_request(
485 build_id,
486 dataflow_id,
487 local_working_dir,
488 nodes,
489 dataflow_descriptor,
490 spawn_nodes,
491 uv,
492 write_events_to,
493 )
494 .await;
495 let _ = reply_tx.send(result);
496 }
497 Event::StopDataflowRequest {
498 dataflow_id,
499 grace_duration,
500 force,
501 reply_tx,
502 } => {
503 let result = async {
504 {
507 let mut logger = self.logger.for_dataflow(dataflow_id);
508 let mut dataflow =
509 self.state.running.get_mut(&dataflow_id).ok_or_else(|| {
510 format!("no running dataflow with ID `{dataflow_id}`")
511 })?;
512 let df = &mut *dataflow;
513 df.pending_nodes
514 .handle_dataflow_stop(
515 &self.state.coordinator_client().cloned(),
516 &self.state.clock,
517 &mut df.cascading_error_causes,
518 &df.dynamic_nodes,
519 &mut logger,
520 )
521 .await
522 .map_err(|err| format!("{err:?}"))?;
523 }
524 let finish_when = {
526 let mut dataflow =
527 self.state.running.get_mut(&dataflow_id).ok_or_else(|| {
528 format!("no running dataflow with ID `{dataflow_id}`")
529 })?;
530 dataflow
531 .stop_all_after_pending(&self.state, grace_duration, force)
532 .map_err(|err| format!("{err:?}"))?
533 };
534 if matches!(finish_when, FinishDataflowWhen::Now) {
536 self.state
537 .finish_dataflow(dataflow_id)
538 .await
539 .map_err(|err| format!("{err:?}"))?;
540 }
541 Ok(())
542 }
543 .await;
544 let _ = reply_tx.send(result);
545 }
546 Event::MetricsInterval => {
547 self.collect_and_send_metrics().await?;
548 }
549 Event::CtrlC => {
550 tracing::info!("received ctrlc signal -> stopping all dataflows");
551 self.trigger_manual_stop().await?;
552 if self.state.running.is_empty() {
553 break;
554 }
555 }
556 Event::SecondCtrlC => {
557 tracing::warn!("received second ctrlc signal -> exit immediately");
558 bail!("received second ctrl-c signal");
559 }
560 Event::StopAfter(duration) => {
561 tracing::info!("stopping after {duration:?} as requested");
562 self.trigger_manual_stop().await?;
563 if self.state.running.is_empty() {
564 break;
565 }
566 }
567 Event::DaemonError(err) => {
568 bail!(err.wrap_err("fatal error from background task"));
569 }
570 Event::SpawnNodeResult {
571 dataflow_id,
572 node_id,
573 dynamic_node,
574 result,
575 } => match result {
576 Ok(running_node) => {
577 if let Some(mut dataflow) = self.state.running.get_mut(&dataflow_id) {
578 if let Some(ref abort_handle) = running_node.listener_abort_handle {
579 dataflow
580 ._listener_tasks
581 .push(ListenerTask(abort_handle.clone()));
582 }
583 dataflow.running_nodes.insert(node_id, running_node);
584 } else {
585 tracing::error!(
586 "failed to handle SpawnNodeResult: no running dataflow with ID {dataflow_id}"
587 );
588 }
589 }
590 Err(error) => {
591 self.state
592 .dataflow_node_results
593 .entry(dataflow_id)
594 .or_default()
595 .insert(node_id.clone(), Err(error));
596 self.handle_node_stop(dataflow_id, &node_id, dynamic_node)
597 .await?;
598 }
599 },
600 Event::SpawnDataflowResult {
601 dataflow_id,
602 result,
603 } => {
604 if let Some(client) = self.state.coordinator_client() {
605 let client = client.clone();
606 let result = result.map_err(|err| format!("{err:?}"));
607 tokio::spawn(async move {
608 if let Err(err) = client
609 .spawn_result(tarpc::context::current(), dataflow_id, result)
610 .await
611 {
612 tracing::error!(
613 ?err,
614 "failed to send spawn_result notification to coordinator"
615 );
616 }
617 });
618 }
619 }
620 Event::NodeStopped {
621 dataflow_id,
622 node_id,
623 } => {
624 if let Some(exit_when_done) = &mut self.exit_when_done {
625 exit_when_done.remove(&(dataflow_id, node_id));
626 if exit_when_done.is_empty() {
627 tracing::info!(
628 "exiting daemon because all required dataflows are finished"
629 );
630 break;
631 }
632 }
633 if self.exit_when_all_finished && self.state.running.is_empty() {
635 tracing::info!("exiting daemon because all dataflows are finished");
636 break;
637 }
638 }
639 }
640
641 let elapsed = start.elapsed();
643 if elapsed > Duration::from_millis(100) {
644 tracing::warn!(
645 "Daemon took {}ms for handling event: {event_kind}",
646 elapsed.as_millis()
647 );
648 }
649 }
650
651 if let Some(client) = self.state.coordinator_client() {
652 let ctx = tarpc::context::current();
653 if let Err(err) = client.daemon_exit(ctx).await {
654 tracing::error!(
655 ?err,
656 "failed to send daemon_exit notification to coordinator"
657 );
658 }
659 }
660
661 Ok(self
662 .state
663 .dataflow_node_results
664 .iter()
665 .map(|entry| (*entry.key(), entry.value().clone()))
666 .collect())
667 }
668
669 async fn trigger_manual_stop(&mut self) -> eyre::Result<()> {
670 let dataflow_ids: Vec<DataflowId> = self
673 .state
674 .running
675 .iter()
676 .map(|entry| *entry.key())
677 .collect();
678
679 let mut dataflows_to_finish = Vec::new();
680 for dataflow_id in dataflow_ids {
681 if let Some(mut dataflow) = self.state.running.get_mut(&dataflow_id) {
682 let mut logger = self.logger.for_dataflow(dataflow_id);
683 let finish_when = dataflow
684 .stop_all(&self.state, None, false, &mut logger)
685 .await?;
686 if matches!(finish_when, FinishDataflowWhen::Now) {
687 dataflows_to_finish.push(dataflow_id);
688 }
689 }
690 }
691
692 for dataflow_id in dataflows_to_finish {
694 self.state.finish_dataflow(dataflow_id).await?;
695 }
696
697 self.exit_when_all_finished = true;
698 Ok(())
699 }
700
701 #[allow(clippy::too_many_arguments)]
702 async fn handle_spawn_request(
703 &mut self,
704 build_id: Option<BuildId>,
705 dataflow_id: DataflowId,
706 local_working_dir: Option<PathBuf>,
707 nodes: BTreeMap<NodeId, ResolvedNode>,
708 dataflow_descriptor: Descriptor,
709 spawn_nodes: BTreeSet<NodeId>,
710 uv: bool,
711 write_events_to: Option<PathBuf>,
712 ) -> Result<(), String> {
713 match dataflow_descriptor.communication.remote {
714 dora_core::config::RemoteCommunicationConfig::Tcp => {}
715 }
716
717 let base_working_dir = match local_working_dir {
719 Some(working_dir) => {
720 if working_dir.exists() {
721 Ok(working_dir)
722 } else {
723 Err(format!(
724 "working directory does not exist: {}",
725 working_dir.display(),
726 ))
727 }
728 }
729 None => {
730 let daemon_working_dir =
731 current_dir().map_err(|e| format!("failed to get daemon working dir: {e}"))?;
732 Ok(daemon_working_dir)
733 }
734 }?;
735
736 let result = self
737 .spawn_dataflow(
738 build_id,
739 dataflow_id,
740 base_working_dir,
741 nodes,
742 dataflow_descriptor,
743 spawn_nodes,
744 uv,
745 write_events_to,
746 )
747 .await;
748 let (trigger_result, result_task) = match result {
749 Ok(result_task) => (Ok(()), Some(result_task)),
750 Err(err) => (Err(format!("{err:?}")), None),
751 };
752
753 if let Some(result_task) = result_task {
755 let state = self.state.clone();
756 tokio::spawn(async move {
757 let result = result_task.await;
758 let spawn_result = result
759 .as_ref()
760 .map(|_| ())
761 .map_err(|err| format!("{err:?}"));
762
763 if let Some(client) = state.coordinator_client() {
764 let ctx = tarpc::context::current();
765 if let Err(err) = client.spawn_result(ctx, dataflow_id, spawn_result).await {
766 tracing::error!(
767 ?err,
768 "failed to send spawn_result notification to coordinator"
769 );
770 }
771 }
772 });
773 }
774
775 trigger_result
776 }
777
778 async fn collect_and_send_metrics(&mut self) -> eyre::Result<()> {
779 use dora_message::daemon_to_coordinator::NodeMetrics;
780 use sysinfo::{Pid, ProcessRefreshKind, ProcessesToUpdate};
781
782 if self.state.coordinator_client().is_none() {
783 return Ok(());
784 }
785
786 let system = &mut self.metrics_system;
788
789 const METRICS_INTERVAL_SECS: f64 = 2.0;
791
792 for entry in self.state.running.iter() {
794 let (dataflow_id, dataflow) = (entry.key(), entry.value());
795 let mut metrics = BTreeMap::new();
796
797 let pids: Vec<Pid> = dataflow
799 .running_nodes
800 .values()
801 .filter_map(|node| {
802 node.pid
803 .as_ref()
804 .map(|pid| Pid::from_u32(pid.load(atomic::Ordering::Acquire)))
805 })
806 .collect();
807
808 if !pids.is_empty() {
809 let refresh_kind = ProcessRefreshKind::nothing()
811 .with_cpu()
812 .with_memory()
813 .with_disk_usage();
814 system.refresh_processes_specifics(
815 ProcessesToUpdate::Some(&pids),
816 true,
817 refresh_kind,
818 );
819
820 for (node_id, running_node) in &dataflow.running_nodes {
822 if let Some(pid) = running_node.pid.as_ref() {
823 let pid = pid.load(atomic::Ordering::Acquire);
824 let sys_pid = Pid::from_u32(pid);
825 if let Some(process) = system.process(sys_pid) {
826 let disk_usage = process.disk_usage();
827 metrics.insert(
829 node_id.clone(),
830 NodeMetrics {
831 pid,
832 cpu_usage: process.cpu_usage(),
833 memory_bytes: process.memory(),
834 disk_read_bytes: Some(
835 (disk_usage.read_bytes as f64 / METRICS_INTERVAL_SECS)
836 as u64,
837 ),
838 disk_write_bytes: Some(
839 (disk_usage.written_bytes as f64 / METRICS_INTERVAL_SECS)
840 as u64,
841 ),
842 },
843 );
844 }
845 }
846 }
847 }
848
849 if !metrics.is_empty() {
851 if let Some(client) = self.state.coordinator_client() {
852 let client = client.clone();
853 let dataflow_id = *dataflow_id;
854 tokio::spawn(async move {
855 let _ = client
856 .node_metrics(tarpc::context::current(), dataflow_id, metrics)
857 .await;
858 });
859 }
860 }
861 }
862
863 Ok(())
864 }
865
866 async fn handle_inter_daemon_event(&mut self, event: InterDaemonEvent) -> eyre::Result<()> {
867 match event {
868 InterDaemonEvent::Output {
869 dataflow_id,
870 node_id,
871 output_id,
872 metadata,
873 data,
874 } => {
875 let inner = async {
876 let mut dataflow =
877 self.state.running.get_mut(&dataflow_id).wrap_err_with(|| {
878 format!("send out failed: no running dataflow with ID `{dataflow_id}`")
879 })?;
880 send_output_to_local_receivers(
881 node_id.clone(),
882 output_id.clone(),
883 &mut *dataflow,
884 &metadata,
885 data.map(DataMessage::Vec),
886 &self.state.clock,
887 )
888 .await?;
889 Result::<_, eyre::Report>::Ok(())
890 };
891 if let Err(err) = inner
892 .await
893 .wrap_err("failed to forward remote output to local receivers")
894 {
895 let mut logger = self.logger.for_dataflow(dataflow_id).for_node(node_id);
896 logger
897 .log(LogLevel::Warn, Some("daemon".into()), format!("{err:?}"))
898 .await;
899 }
900 Ok(())
901 }
902 InterDaemonEvent::OutputClosed {
903 dataflow_id,
904 node_id,
905 output_id,
906 } => {
907 let output_id = OutputId(node_id.clone(), output_id);
908 let mut logger = self
909 .logger
910 .for_dataflow(dataflow_id)
911 .for_node(node_id.clone());
912 logger
913 .log(
914 LogLevel::Debug,
915 Some("daemon".into()),
916 format!("received OutputClosed event for output {output_id:?}"),
917 )
918 .await;
919
920 let result = match self.state.running.get_mut(&dataflow_id) {
921 Some(mut dataflow) => {
922 if let Some(inputs) = dataflow.mappings.get(&output_id).cloned() {
923 for (receiver_id, input_id) in &inputs {
924 close_input(
925 &mut *dataflow,
926 receiver_id,
927 input_id,
928 &self.state.clock,
929 );
930 }
931 }
932 Ok(())
933 }
934 None => Err(eyre::eyre!(
935 "send out failed: no running dataflow with ID `{dataflow_id}`"
936 )),
937 };
938 if let Err(err) =
939 result.wrap_err("failed to handle InputsClosed event sent by coordinator")
940 {
941 logger
942 .log(LogLevel::Warn, Some("daemon".into()), format!("{err:?}"))
943 .await;
944 }
945 Ok(())
946 }
947 InterDaemonEvent::NodeFailed {
948 dataflow_id,
949 source_node_id,
950 error,
951 } => {
952 tracing::debug!(
953 "received NodeFailed event from {source_node_id} for dataflow {dataflow_id}"
954 );
955
956 let result = match self.state.running.get_mut(&dataflow_id) {
957 Some(mut dataflow) => {
958 if dataflow.handled_node_failed.insert(source_node_id.clone()) {
962 let (_outputs, _remote_receivers) =
963 Self::find_and_notify_local_receivers(
964 &mut dataflow,
965 &source_node_id,
966 &error,
967 &self.state.clock,
968 );
969 }
970 Ok(())
971 }
972 None => Err(eyre::eyre!(
973 "failed to handle NodeFailed: no running dataflow with ID `{dataflow_id}`"
974 )),
975 };
976 if let Err(err) =
977 result.wrap_err("failed to handle NodeFailed event sent by remote daemon")
978 {
979 tracing::warn!("Failed to handle NodeFailed event: {err:?}");
980 }
981 Ok(())
982 }
983 }
984 }
985
986 #[allow(clippy::too_many_arguments)]
987 async fn build_dataflow(
988 &mut self,
989 build_id: BuildId,
990 session_id: SessionId,
991 base_working_dir: PathBuf,
992 git_sources: BTreeMap<NodeId, GitSource>,
993 prev_git_sources: BTreeMap<NodeId, GitSource>,
994 dataflow_descriptor: Descriptor,
995 local_nodes: BTreeSet<NodeId>,
996 uv: bool,
997 ) -> eyre::Result<impl Future<Output = eyre::Result<BuildInfo>> + use<>> {
998 let builder = build::Builder {
999 session_id,
1000 base_working_dir,
1001 uv,
1002 };
1003 {
1004 let mut git_manager = self.state.git_manager.lock().await;
1005 git_manager.clear_planned_builds(session_id);
1006 }
1007
1008 let nodes = dataflow_descriptor.resolve_aliases_and_set_defaults()?;
1009
1010 let mut tasks = Vec::new();
1011
1012 for node in nodes.into_values().filter(|n| local_nodes.contains(&n.id)) {
1014 let dynamic_node = node.kind.dynamic();
1015
1016 let node_id = node.id.clone();
1017 let mut logger = self.logger.for_node_build(build_id, node_id.clone());
1018 logger.log(LogLevel::Debug, "building").await;
1019 let git_source = git_sources.get(&node_id).cloned();
1020 let prev_git_source = prev_git_sources.get(&node_id).cloned();
1021 let prev_git = prev_git_source.map(|prev_source| PrevGitSource {
1022 still_needed_for_this_build: git_sources.values().any(|s| s == &prev_source),
1023 git_source: prev_source,
1024 });
1025
1026 let logger_cloned = logger
1027 .try_clone_impl()
1028 .await
1029 .wrap_err("failed to clone logger")?;
1030
1031 let mut builder = builder.clone();
1032 if let Some(node_working_dir) =
1033 node.deploy.as_ref().and_then(|d| d.working_dir.as_deref())
1034 {
1035 builder.base_working_dir = builder.base_working_dir.join(node_working_dir);
1036 }
1037
1038 let mut git_manager = self.state.git_manager.lock().await;
1039 match builder
1040 .build_node(node, git_source, prev_git, logger_cloned, &mut git_manager)
1041 .await
1042 .wrap_err_with(|| format!("failed to build node `{node_id}`"))
1043 {
1044 Ok(result) => {
1045 tasks.push(NodeBuildTask {
1046 node_id,
1047 task: result,
1048 dynamic_node,
1049 });
1050 }
1051 Err(err) => {
1052 logger.log(LogLevel::Error, format!("{err:?}")).await;
1053 return Err(err);
1054 }
1055 }
1056 }
1057
1058 let task = async move {
1059 let mut info = BuildInfo {
1060 node_working_dirs: Default::default(),
1061 };
1062 for task in tasks {
1063 let NodeBuildTask {
1064 node_id,
1065 dynamic_node: _,
1066 task,
1067 } = task;
1068 let node = task
1069 .await
1070 .with_context(|| format!("failed to build node `{node_id}`"))?;
1071 info.node_working_dirs
1072 .insert(node_id, node.node_working_dir);
1073 }
1074 Ok(info)
1075 };
1076
1077 Ok(task)
1078 }
1079
1080 #[allow(clippy::too_many_arguments)]
1081 async fn spawn_dataflow(
1082 &mut self,
1083 build_id: Option<BuildId>,
1084 dataflow_id: DataflowId,
1085 base_working_dir: PathBuf,
1086 nodes: BTreeMap<NodeId, ResolvedNode>,
1087 dataflow_descriptor: Descriptor,
1088 spawn_nodes: BTreeSet<NodeId>,
1089 uv: bool,
1090 write_events_to: Option<PathBuf>,
1091 ) -> eyre::Result<impl Future<Output = eyre::Result<()>> + use<>> {
1092 let mut logger = self
1093 .logger
1094 .for_dataflow(dataflow_id)
1095 .try_clone()
1096 .await
1097 .context("failed to clone logger")?;
1098 let dataflow = RunningDataflow::new(
1099 dataflow_id,
1100 self.state.daemon_id().clone(),
1101 dataflow_descriptor.clone(),
1102 self.state.events_tx.clone(),
1103 self.state.clock.clone(),
1104 );
1105 let mut dataflow = match self.state.running.entry(dataflow_id) {
1106 dashmap::Entry::Vacant(entry) => {
1107 self.state
1108 .working_dir
1109 .insert(dataflow_id, base_working_dir.clone());
1110 entry.insert(dataflow)
1111 }
1112 dashmap::Entry::Occupied(_) => {
1113 bail!("there is already a running dataflow with ID `{dataflow_id}`")
1114 }
1115 };
1116
1117 let mut stopped = Vec::new();
1118
1119 let build_info = build_id.and_then(|build_id| self.state.builds.get(&build_id));
1120 let node_with_git_source = nodes.values().find(|n| n.has_git_source());
1121 if let Some(git_node) = node_with_git_source {
1122 if build_info.is_none() {
1123 eyre::bail!(
1124 "node {} has git source, but no `dora build` was run yet\n\n\
1125 nodes with a `git` field must be built using `dora build` before starting the \
1126 dataflow",
1127 git_node.id
1128 )
1129 }
1130 }
1131 let node_working_dirs = build_info
1132 .map(|info| info.node_working_dirs.clone())
1133 .unwrap_or_default();
1134
1135 for node in nodes.values() {
1137 let local = spawn_nodes.contains(&node.id);
1138
1139 let inputs = node_inputs(node);
1140 for (input_id, input) in inputs {
1141 if local {
1142 dataflow
1143 .open_inputs
1144 .entry(node.id.clone())
1145 .or_default()
1146 .insert(input_id.clone());
1147 match input.mapping {
1148 InputMapping::User(mapping) => {
1149 dataflow
1150 .mappings
1151 .entry(OutputId(mapping.source, mapping.output))
1152 .or_default()
1153 .insert((node.id.clone(), input_id));
1154 }
1155 InputMapping::Timer { interval } => {
1156 dataflow
1157 .timers
1158 .entry(interval)
1159 .or_default()
1160 .insert((node.id.clone(), input_id));
1161 }
1162 }
1163 } else if let InputMapping::User(mapping) = input.mapping {
1164 dataflow
1165 .open_external_mappings
1166 .insert(OutputId(mapping.source, mapping.output));
1167 }
1168 }
1169 }
1170
1171 let spawner = Spawner {
1172 dataflow_id,
1173 daemon_tx: self.state.events_tx.clone(),
1174 dataflow_descriptor,
1175 clock: self.state.clock.clone(),
1176 uv,
1177 };
1178
1179 let mut tasks = Vec::new();
1180
1181 for node in nodes.into_values() {
1183 let mut logger = logger.reborrow().for_node(node.id.clone());
1184 let local = spawn_nodes.contains(&node.id);
1185 if local {
1186 let dynamic_node = node.kind.dynamic();
1187 if dynamic_node {
1188 dataflow.dynamic_nodes.insert(node.id.clone());
1189 } else {
1190 dataflow.pending_nodes.insert(node.id.clone());
1191 }
1192
1193 let node_id = node.id.clone();
1194 let node_stderr_most_recent = dataflow
1195 .node_stderr_most_recent
1196 .entry(node.id.clone())
1197 .or_insert_with(|| Arc::new(ArrayQueue::new(STDERR_LOG_LINES_MAX)))
1198 .clone();
1199
1200 let configured_node_working_dir = node_working_dirs.get(&node_id).cloned();
1201 if configured_node_working_dir.is_none() && node.has_git_source() {
1202 eyre::bail!(
1203 "node {} has git source, but no git clone directory was found for it\n\n\
1204 try running `dora build` again",
1205 node.id
1206 )
1207 }
1208 let node_working_dir = configured_node_working_dir
1209 .or_else(|| {
1210 node.deploy
1211 .as_ref()
1212 .and_then(|d| d.working_dir.as_ref().map(|d| base_working_dir.join(d)))
1213 })
1214 .unwrap_or(base_working_dir.clone())
1215 .clone();
1216 let node_write_events_to = write_events_to
1217 .as_ref()
1218 .map(|p| p.join(format!("inputs-{}.json", node.id)));
1219 match spawner
1220 .clone()
1221 .spawn_node(
1222 node,
1223 node_working_dir,
1224 node_stderr_most_recent,
1225 node_write_events_to,
1226 &mut logger,
1227 )
1228 .await
1229 .wrap_err_with(|| format!("failed to spawn node `{node_id}`"))
1230 {
1231 Ok(result) => {
1232 tasks.push(NodeBuildTask {
1233 node_id,
1234 task: result,
1235 dynamic_node,
1236 });
1237 }
1238 Err(err) => {
1239 logger
1240 .log(LogLevel::Error, Some("daemon".into()), format!("{err:?}"))
1241 .await;
1242 self.state
1243 .dataflow_node_results
1244 .entry(dataflow_id)
1245 .or_default()
1246 .insert(
1247 node_id.clone(),
1248 Err(NodeError {
1249 timestamp: self.state.clock.new_timestamp(),
1250 cause: NodeErrorCause::FailedToSpawn(format!("{err:?}")),
1251 exit_status: NodeExitStatus::Unknown,
1252 }),
1253 );
1254 stopped.push((node_id.clone(), dynamic_node));
1255 }
1256 }
1257 } else {
1258 dataflow.pending_nodes.set_external_nodes(true);
1260
1261 for output_id in dataflow.mappings.keys().filter(|o| o.0 == node.id) {
1263 let tx = self
1264 .state
1265 .remote_daemon_events_tx
1266 .clone()
1267 .wrap_err("no remote_daemon_events_tx channel")?;
1268 let mut finished_rx = dataflow.finished_tx.subscribe();
1269 let subscribe_topic =
1270 zenoh_output_publish_topic(dataflow.id, &output_id.0, &output_id.1);
1271 tracing::debug!("declaring subscriber on {subscribe_topic}");
1272 let zenoh = self
1273 .state
1274 .zenoh_session
1275 .as_ref()
1276 .wrap_err("no zenoh session")?;
1277 let subscriber = zenoh
1278 .declare_subscriber(subscribe_topic)
1279 .await
1280 .map_err(|e| eyre!(e))
1281 .wrap_err_with(|| format!("failed to subscribe to {output_id:?}"))?;
1282 tokio::spawn(async move {
1283 let mut finished = pin!(finished_rx.recv());
1284 loop {
1285 let finished_or_next =
1286 futures::future::select(finished, subscriber.recv_async());
1287 match finished_or_next.await {
1288 future::Either::Left((finished, _)) => match finished {
1289 Err(broadcast::error::RecvError::Closed) => {
1290 tracing::debug!(
1291 "dataflow finished, breaking from zenoh subscribe task"
1292 );
1293 break;
1294 }
1295 other => {
1296 tracing::warn!(
1297 "unexpected return value of dataflow finished_rx channel: {other:?}"
1298 );
1299 break;
1300 }
1301 },
1302 future::Either::Right((sample, f)) => {
1303 finished = f;
1304 let event = sample.map_err(|e| eyre!(e)).and_then(|s| {
1305 Timestamped::deserialize_inter_daemon_event(
1306 &s.payload().to_bytes(),
1307 )
1308 });
1309 if tx.send_async(event).await.is_err() {
1310 break;
1312 }
1313 }
1314 }
1315 }
1316 });
1317 }
1318 }
1319 }
1320 drop(dataflow);
1321 for (node_id, dynamic) in stopped {
1322 self.handle_node_stop(dataflow_id, &node_id, dynamic)
1323 .await?;
1324 }
1325
1326 let spawn_result = Self::spawn_prepared_nodes(
1327 dataflow_id,
1328 logger,
1329 tasks,
1330 self.state.events_tx.clone(),
1331 self.state.clock.clone(),
1332 );
1333
1334 Ok(spawn_result)
1335 }
1336
1337 async fn spawn_prepared_nodes(
1338 dataflow_id: Uuid,
1339 mut logger: DataflowLogger<'_>,
1340 tasks: Vec<NodeBuildTask<impl Future<Output = eyre::Result<spawn::PreparedNode>>>>,
1341 events_tx: mpsc::Sender<Timestamped<Event>>,
1342 clock: Arc<HLC>,
1343 ) -> eyre::Result<()> {
1344 let node_result = |node_id, dynamic_node, result| Timestamped {
1345 inner: Event::SpawnNodeResult {
1346 dataflow_id,
1347 node_id,
1348 dynamic_node,
1349 result,
1350 },
1351 timestamp: clock.new_timestamp(),
1352 };
1353 let mut failed_to_prepare = None;
1354 let mut prepared_nodes = Vec::new();
1355 for task in tasks {
1356 let NodeBuildTask {
1357 node_id,
1358 dynamic_node,
1359 task,
1360 } = task;
1361 match task.await {
1362 Ok(node) => prepared_nodes.push(node),
1363 Err(err) => {
1364 if failed_to_prepare.is_none() {
1365 failed_to_prepare = Some(node_id.clone());
1366 }
1367 let node_err: NodeError = NodeError {
1368 timestamp: clock.new_timestamp(),
1369 cause: NodeErrorCause::FailedToSpawn(format!(
1370 "preparing for spawn failed: {err:?}"
1371 )),
1372 exit_status: NodeExitStatus::Unknown,
1373 };
1374 let send_result = events_tx
1375 .send(node_result(node_id, dynamic_node, Err(node_err)))
1376 .await;
1377 if send_result.is_err() {
1378 tracing::error!("failed to send SpawnNodeResult to main daemon task")
1379 }
1380 }
1381 }
1382 }
1383
1384 if let Some(failed_node) = failed_to_prepare {
1386 for node in prepared_nodes {
1388 let err = NodeError {
1389 timestamp: clock.new_timestamp(),
1390 cause: NodeErrorCause::Cascading {
1391 caused_by_node: failed_node.clone(),
1392 },
1393 exit_status: NodeExitStatus::Unknown,
1394 };
1395 let send_result = events_tx
1396 .send(node_result(
1397 node.node_id().clone(),
1398 node.dynamic(),
1399 Err(err),
1400 ))
1401 .await;
1402 if send_result.is_err() {
1403 tracing::error!("failed to send SpawnNodeResult to main daemon task")
1404 }
1405 }
1406 Err(eyre!("failed to prepare node {failed_node}"))
1407 } else {
1408 let mut spawn_result = Ok(());
1409
1410 logger
1411 .log(
1412 LogLevel::Info,
1413 None,
1414 Some("dora daemon".into()),
1415 "finished building nodes, spawning...",
1416 )
1417 .await;
1418
1419 for node in prepared_nodes {
1421 let node_id = node.node_id().clone();
1422 let dynamic_node = node.dynamic();
1423 let logger = logger
1424 .reborrow()
1425 .for_node(node_id.clone())
1426 .try_clone()
1427 .await
1428 .context("failed to clone NodeLogger")?;
1429 let result = node.spawn(logger).await;
1430 let node_spawn_result = match result {
1431 Ok(node) => Ok(node),
1432 Err(err) => {
1433 let node_err = NodeError {
1434 timestamp: clock.new_timestamp(),
1435 cause: NodeErrorCause::FailedToSpawn(format!("spawn failed: {err:?}")),
1436 exit_status: NodeExitStatus::Unknown,
1437 };
1438 if spawn_result.is_ok() {
1439 spawn_result = Err(err.wrap_err(format!("failed to spawn {node_id}")));
1440 }
1441 Err(node_err)
1442 }
1443 };
1444 let send_result = events_tx
1445 .send(node_result(node_id, dynamic_node, node_spawn_result))
1446 .await;
1447 if send_result.is_err() {
1448 tracing::error!("failed to send SpawnNodeResult to main daemon task")
1449 }
1450 }
1451 spawn_result
1452 }
1453 }
1454
1455 async fn handle_dynamic_node_event(
1456 &mut self,
1457 event: DynamicNodeEventWrapper,
1458 ) -> eyre::Result<()> {
1459 match event {
1460 DynamicNodeEventWrapper {
1461 event: DynamicNodeEvent::NodeConfig { node_id },
1462 reply_tx,
1463 } => {
1464 let number_node_id = self
1465 .state
1466 .running
1467 .iter()
1468 .filter(|entry| entry.value().running_nodes.contains_key(&node_id))
1469 .count();
1470
1471 let node_config = match number_node_id {
1472 2.. => Err(format!(
1473 "multiple dataflows contain dynamic node id {node_id}. \
1474 Please only have one running dataflow with the specified \
1475 node id if you want to use dynamic node",
1476 )),
1477 1 => self
1478 .state
1479 .running
1480 .iter()
1481 .filter(|entry| entry.value().running_nodes.contains_key(&node_id))
1482 .map(|entry| -> Result<NodeConfig> {
1483 let (id, dataflow) = (entry.key(), entry.value());
1484 let node_config = dataflow
1485 .running_nodes
1486 .get(&node_id)
1487 .with_context(|| {
1488 format!("no node with ID `{node_id}` within the given dataflow")
1489 })?
1490 .node_config
1491 .clone();
1492 if !node_config.dynamic {
1493 bail!("node with ID `{node_id}` in {id} is not dynamic");
1494 }
1495 Ok(node_config)
1496 })
1497 .next()
1498 .ok_or_else(|| eyre!("no node with ID `{node_id}`"))
1499 .and_then(|r| r)
1500 .map_err(|err| {
1501 format!(
1502 "failed to get dynamic node config within given dataflow: {err}"
1503 )
1504 }),
1505 0 => Err(format!("no node with ID `{node_id}`")),
1506 };
1507
1508 let reply = DaemonReply::NodeConfig {
1509 result: node_config,
1510 };
1511 let _ = reply_tx.send(Some(reply)).map_err(|_| {
1512 error!("could not send node info reply from daemon to coordinator")
1513 });
1514 Ok(())
1515 }
1516 }
1517 }
1518
1519 async fn handle_node_event(
1520 &mut self,
1521 event: DaemonNodeEvent,
1522 dataflow_id: DataflowId,
1523 node_id: NodeId,
1524 ) -> eyre::Result<()> {
1525 let might_restart = || {
1526 let dataflow = self.state.running.get(&dataflow_id)?;
1527 let node = dataflow.running_nodes.get(&node_id)?;
1528 Some(match node.restart_policy {
1529 RestartPolicy::Never => false,
1530 _ if node.restarts_disabled() => false,
1531 RestartPolicy::OnFailure | RestartPolicy::Always => true,
1532 })
1533 };
1534 match event {
1535 DaemonNodeEvent::Subscribe {
1536 event_sender,
1537 reply_sender,
1538 } => {
1539 let mut logger = self.logger.for_dataflow(dataflow_id);
1540 logger
1541 .log(
1542 LogLevel::Info,
1543 Some(node_id.clone()),
1544 Some("daemon".into()),
1545 "node is ready",
1546 )
1547 .await;
1548
1549 let dataflow = self.state.running.get_mut(&dataflow_id).ok_or_else(|| {
1550 format!("subscribe failed: no running dataflow with ID `{dataflow_id}`")
1551 });
1552
1553 match dataflow {
1554 Err(err) => {
1555 let _ = reply_sender.send(DaemonReply::Result(Err(err)));
1556 }
1557 Ok(mut dataflow) => {
1558 Self::subscribe(
1559 &mut *dataflow,
1560 node_id.clone(),
1561 event_sender,
1562 &self.state.clock,
1563 )
1564 .await;
1565
1566 let df = &mut *dataflow;
1567 let status = df
1568 .pending_nodes
1569 .handle_node_subscription(
1570 node_id.clone(),
1571 reply_sender,
1572 &self.state.coordinator_client().cloned(),
1573 &self.state.clock,
1574 &mut df.cascading_error_causes,
1575 &mut logger,
1576 )
1577 .await?;
1578 match status {
1579 DataflowStatus::AllNodesReady if !dataflow.dataflow_started => {
1580 logger
1581 .log(
1582 LogLevel::Info,
1583 None,
1584 Some("daemon".into()),
1585 "all nodes are ready, starting dataflow",
1586 )
1587 .await;
1588 dataflow
1589 .start(&self.state.events_tx, &self.state.clock)
1590 .await?;
1591 dataflow.dataflow_started = true;
1592 }
1593 _ => {}
1594 }
1595 }
1596 }
1597 }
1598 DaemonNodeEvent::SubscribeDrop {
1599 event_sender,
1600 reply_sender,
1601 } => {
1602 let dataflow = self.state.running.get_mut(&dataflow_id).wrap_err_with(|| {
1603 format!("failed to subscribe: no running dataflow with ID `{dataflow_id}`")
1604 });
1605 let result = match dataflow {
1606 Ok(mut dataflow) => {
1607 dataflow.drop_channels.insert(node_id, event_sender);
1608 Ok(())
1609 }
1610 Err(err) => Err(err.to_string()),
1611 };
1612 let _ = reply_sender.send(DaemonReply::Result(result));
1613 }
1614 DaemonNodeEvent::CloseOutputs {
1615 outputs,
1616 reply_sender,
1617 } => {
1618 let reply = if might_restart().unwrap_or(false) {
1619 self.logger
1620 .for_dataflow(dataflow_id)
1621 .for_node(node_id.clone())
1622 .log(
1623 LogLevel::Debug,
1624 Some("daemon".into()),
1625 "skipping CloseOutputs because node might restart",
1626 )
1627 .await;
1628 Ok(())
1629 } else {
1630 let inner = async {
1632 self.send_output_closed_events(dataflow_id, node_id, outputs)
1633 .await
1634 };
1635
1636 inner.await.map_err(|err| format!("{err:?}"))
1637 };
1638 let _ = reply_sender.send(DaemonReply::Result(reply));
1639 }
1640 DaemonNodeEvent::OutputsDone { reply_sender } => {
1641 let result = self
1642 .handle_outputs_done(dataflow_id, &node_id, might_restart().unwrap_or(false))
1643 .await;
1644
1645 let _ = reply_sender.send(DaemonReply::Result(
1646 result.map_err(|err| format!("{err:?}")),
1647 ));
1648 }
1649 DaemonNodeEvent::SendOut {
1650 output_id,
1651 metadata,
1652 data,
1653 } => self
1654 .send_out(dataflow_id, node_id, output_id, metadata, data)
1655 .await
1656 .context("failed to send out")?,
1657 DaemonNodeEvent::ReportDrop { tokens } => {
1658 let dataflow = self.state.running.get_mut(&dataflow_id).wrap_err_with(|| {
1659 format!(
1660 "failed to get handle drop tokens: \
1661 no running dataflow with ID `{dataflow_id}`"
1662 )
1663 });
1664
1665 match dataflow {
1666 Ok(mut dataflow) => {
1667 for token in tokens {
1668 match dataflow.pending_drop_tokens.get_mut(&token) {
1669 Some(info) => {
1670 if info.pending_nodes.remove(&node_id) {
1671 dataflow.check_drop_token(token, &self.state.clock).await?;
1672 } else {
1673 tracing::warn!(
1674 "node `{node_id}` is not pending for drop token `{token:?}`"
1675 );
1676 }
1677 }
1678 None => tracing::warn!("unknown drop token `{token:?}`"),
1679 }
1680 }
1681 }
1682 Err(err) => tracing::warn!("{err:?}"),
1683 }
1684 }
1685 DaemonNodeEvent::EventStreamDropped { reply_sender } => {
1686 let reply = match self.state.running.get_mut(&dataflow_id) {
1687 Some(mut dataflow) => {
1688 dataflow.subscribe_channels.remove(&node_id);
1689 Ok(())
1690 }
1691 None => Err(format!("no running dataflow with ID `{dataflow_id}`")),
1692 };
1693 let _ = reply_sender.send(DaemonReply::Result(reply));
1694 }
1695 }
1696 Ok(())
1697 }
1698
1699 async fn send_reload(
1700 &mut self,
1701 dataflow_id: Uuid,
1702 node_id: NodeId,
1703 operator_id: Option<OperatorId>,
1704 ) -> Result<(), eyre::ErrReport> {
1705 let mut dataflow = self.state.running.get_mut(&dataflow_id).wrap_err_with(|| {
1706 format!("Reload failed: no running dataflow with ID `{dataflow_id}`")
1707 })?;
1708 if let Some(channel) = dataflow.subscribe_channels.get(&node_id) {
1709 match send_with_timestamp(
1710 channel,
1711 NodeEvent::Reload { operator_id },
1712 &self.state.clock,
1713 ) {
1714 Ok(()) => {}
1715 Err(_) => {
1716 dataflow.subscribe_channels.remove(&node_id);
1717 }
1718 }
1719 }
1720 Ok(())
1721 }
1722
1723 async fn send_out(
1724 &mut self,
1725 dataflow_id: Uuid,
1726 node_id: NodeId,
1727 output_id: DataId,
1728 metadata: dora_message::metadata::Metadata,
1729 data: Option<DataMessage>,
1730 ) -> Result<(), eyre::ErrReport> {
1731 let mut dataflow = self.state.running.get_mut(&dataflow_id).wrap_err_with(|| {
1732 format!("send out failed: no running dataflow with ID `{dataflow_id}`")
1733 })?;
1734 let data_bytes = send_output_to_local_receivers(
1735 node_id.clone(),
1736 output_id.clone(),
1737 &mut *dataflow,
1738 &metadata,
1739 data,
1740 &self.state.clock,
1741 )
1742 .await?;
1743
1744 let output_id = OutputId(node_id, output_id);
1745 let remote_receivers = dataflow.open_external_mappings.contains(&output_id)
1746 || dataflow.publish_all_messages_to_zenoh;
1747 drop(dataflow);
1748 if remote_receivers {
1749 let event = InterDaemonEvent::Output {
1750 dataflow_id,
1751 node_id: output_id.0.clone(),
1752 output_id: output_id.1.clone(),
1753 metadata,
1754 data: data_bytes,
1755 };
1756 self.send_to_remote_receivers(dataflow_id, &output_id, event)
1757 .await?;
1758 }
1759
1760 Ok(())
1761 }
1762
1763 fn find_and_notify_local_receivers(
1767 dataflow: &mut RunningDataflow,
1768 source_node_id: &NodeId,
1769 error_message: &str,
1770 clock: &HLC,
1771 ) -> (Vec<DataId>, BTreeSet<NodeId>) {
1772 let outputs: Vec<DataId> = dataflow
1774 .mappings
1775 .keys()
1776 .filter(|m| &m.0 == source_node_id)
1777 .map(|m| m.1.clone())
1778 .collect();
1779
1780 let mut affected_by_receiver: BTreeMap<NodeId, Vec<DataId>> = BTreeMap::new();
1782
1783 for output_id in &outputs {
1784 let output_key = OutputId(source_node_id.clone(), output_id.clone());
1785 if let Some(receivers) = dataflow.mappings.get(&output_key) {
1786 for (receiver_node_id, input_id) in receivers {
1787 affected_by_receiver
1788 .entry(receiver_node_id.clone())
1789 .or_default()
1790 .push(input_id.clone());
1791 }
1792 }
1793 }
1794
1795 let remote_receivers: BTreeSet<NodeId> = affected_by_receiver
1797 .keys()
1798 .filter(|receiver_node_id| !dataflow.subscribe_channels.contains_key(receiver_node_id))
1799 .cloned()
1800 .collect();
1801
1802 for (receiver_node_id, affected_input_ids) in affected_by_receiver {
1804 if let Some(channel) = dataflow.subscribe_channels.get(&receiver_node_id) {
1805 let event = NodeEvent::NodeFailed {
1807 affected_input_ids,
1808 error: error_message.to_string(),
1809 source_node_id: source_node_id.clone(),
1810 };
1811 let _ = send_with_timestamp(channel, event, clock);
1812 }
1813 }
1814
1815 (outputs, remote_receivers)
1816 }
1817
1818 async fn send_node_failed_events(
1825 &mut self,
1826 dataflow_id: Uuid,
1827 source_node_id: NodeId,
1828 error_message: String,
1829 ) -> Result<(), eyre::ErrReport> {
1830 let mut dataflow = self.state.running.get_mut(&dataflow_id).wrap_err_with(|| {
1831 format!("propagate error failed: no running dataflow with ID `{dataflow_id}`")
1832 })?;
1833
1834 tracing::warn!(
1835 "Node {}/{} failed: {}. Propagating error to downstream nodes.",
1836 dataflow_id,
1837 source_node_id,
1838 error_message
1839 );
1840
1841 let (outputs, remote_receivers) = Self::find_and_notify_local_receivers(
1842 &mut dataflow,
1843 &source_node_id,
1844 &error_message,
1845 &self.state.clock,
1846 );
1847
1848 drop(dataflow);
1850
1851 if !remote_receivers.is_empty() {
1854 if outputs.is_empty() {
1856 return Ok(());
1857 }
1858
1859 for output_id in &outputs {
1862 let output_key = OutputId(source_node_id.clone(), output_id.clone());
1863 let event = InterDaemonEvent::NodeFailed {
1864 dataflow_id,
1865 source_node_id: source_node_id.clone(),
1866 error: error_message.clone(),
1867 };
1868 if let Err(err) = self
1869 .send_to_remote_receivers(dataflow_id, &output_key, event)
1870 .await
1871 {
1872 tracing::warn!(
1873 "Failed to send error event to remote receivers via output {}: {err:?}",
1874 output_id
1875 );
1876 }
1877 }
1878 }
1879
1880 Ok(())
1881 }
1882
1883 async fn send_to_remote_receivers(
1884 &mut self,
1885 dataflow_id: Uuid,
1886 output_id: &OutputId,
1887 event: InterDaemonEvent,
1888 ) -> Result<(), eyre::Error> {
1889 {
1892 let has_publisher = self
1893 .state
1894 .running
1895 .get(&dataflow_id)
1896 .map(|d| d.publishers.contains_key(output_id))
1897 .unwrap_or(false);
1898 if !has_publisher {
1899 let publish_topic = {
1900 let dataflow = self.state.running.get(&dataflow_id).wrap_err_with(|| {
1901 format!("send out failed: no running dataflow with ID `{dataflow_id}`")
1902 })?;
1903 zenoh_output_publish_topic(dataflow.id, &output_id.0, &output_id.1)
1904 };
1905 tracing::debug!("declaring publisher on {publish_topic}");
1907 let zenoh = self
1908 .state
1909 .zenoh_session
1910 .as_ref()
1911 .wrap_err("no zenoh session")?;
1912 let publisher = zenoh
1913 .declare_publisher(publish_topic)
1914 .await
1915 .map_err(|e| eyre!(e))
1916 .context("failed to create zenoh publisher")?;
1917 let mut dataflow =
1919 self.state.running.get_mut(&dataflow_id).wrap_err_with(|| {
1920 format!("send out failed: no running dataflow with ID `{dataflow_id}`")
1921 })?;
1922 dataflow.publishers.insert(output_id.clone(), publisher);
1923 }
1924 }
1925
1926 let dataflow = self.state.running.get(&dataflow_id).wrap_err_with(|| {
1929 format!("send out failed: no running dataflow with ID `{dataflow_id}`")
1930 })?;
1931 let publisher = dataflow
1932 .publishers
1933 .get(output_id)
1934 .wrap_err("publisher disappeared")?;
1935 let serialized_event = Timestamped {
1936 inner: event,
1937 timestamp: self.state.clock.new_timestamp(),
1938 }
1939 .serialize();
1940 publisher
1941 .put(serialized_event)
1942 .await
1943 .map_err(|e| eyre!(e))
1944 .context("zenoh put failed")?;
1945 Ok(())
1946 }
1947
1948 async fn send_output_closed_events(
1949 &mut self,
1950 dataflow_id: DataflowId,
1951 node_id: NodeId,
1952 outputs: Vec<DataId>,
1953 ) -> eyre::Result<()> {
1954 let (local_node_inputs, closed) = {
1955 let mut dataflow = self
1956 .state
1957 .running
1958 .get_mut(&dataflow_id)
1959 .wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"))?;
1960 let local_node_inputs: BTreeSet<_> = dataflow
1961 .mappings
1962 .iter()
1963 .filter(|(k, _)| k.0 == node_id && outputs.contains(&k.1))
1964 .flat_map(|(_, v)| v)
1965 .cloned()
1966 .collect();
1967 for (receiver_id, input_id) in &local_node_inputs {
1968 close_input(&mut *dataflow, receiver_id, input_id, &self.state.clock);
1969 }
1970
1971 let mut closed = Vec::new();
1972 for output_id in &dataflow.open_external_mappings {
1973 if output_id.0 == node_id && outputs.contains(&output_id.1) {
1974 closed.push(output_id.clone());
1975 }
1976 }
1977 (local_node_inputs, closed)
1978 };
1979
1980 for output_id in closed {
1981 let event = InterDaemonEvent::OutputClosed {
1982 dataflow_id,
1983 node_id: output_id.0.clone(),
1984 output_id: output_id.1.clone(),
1985 };
1986 self.send_to_remote_receivers(dataflow_id, &output_id, event)
1987 .await?;
1988 }
1989
1990 Ok(())
1991 }
1992
1993 async fn subscribe(
1994 dataflow: &mut RunningDataflow,
1995 node_id: NodeId,
1996 event_sender: UnboundedSender<Timestamped<NodeEvent>>,
1997 clock: &HLC,
1998 ) {
1999 let closed_inputs = dataflow
2001 .mappings
2002 .values()
2003 .flatten()
2004 .filter(|(node, _)| node == &node_id)
2005 .map(|(_, input)| input)
2006 .filter(|input| {
2007 dataflow
2008 .open_inputs
2009 .get(&node_id)
2010 .map(|open_inputs| !open_inputs.contains(*input))
2011 .unwrap_or(true)
2012 });
2013 for input_id in closed_inputs {
2014 let _ = send_with_timestamp(
2015 &event_sender,
2016 NodeEvent::InputClosed {
2017 id: input_id.clone(),
2018 },
2019 clock,
2020 );
2021 }
2022 if dataflow.open_inputs(&node_id).is_empty() {
2023 if let Some(node) = dataflow.running_nodes.get_mut(&node_id) {
2024 node.disable_restart();
2025 }
2026 if let Some(node) = dataflow.descriptor.nodes.iter().find(|n| n.id == node_id) {
2027 if node.inputs.is_empty() {
2028 } else {
2030 let _ = send_with_timestamp(&event_sender, NodeEvent::AllInputsClosed, clock);
2031 }
2032 }
2033 }
2034
2035 if dataflow.stop_sent {
2038 if let Some(node) = dataflow.running_nodes.get_mut(&node_id) {
2039 node.disable_restart();
2040 }
2041 let _ = send_with_timestamp(&event_sender, NodeEvent::Stop, clock);
2042 }
2043
2044 dataflow.subscribe_channels.insert(node_id, event_sender);
2045 }
2046
2047 #[tracing::instrument(skip(self), level = "trace")]
2048 async fn handle_outputs_done(
2049 &mut self,
2050 dataflow_id: DataflowId,
2051 node_id: &NodeId,
2052 might_restart: bool,
2053 ) -> eyre::Result<()> {
2054 let outputs = {
2055 let dataflow = self
2056 .state
2057 .running
2058 .get(&dataflow_id)
2059 .ok_or_else(|| eyre!("no running dataflow with ID `{dataflow_id}`"))?;
2060 dataflow
2061 .mappings
2062 .keys()
2063 .filter(|m| &m.0 == node_id)
2064 .map(|m| &m.1)
2065 .cloned()
2066 .collect()
2067 };
2068
2069 if might_restart {
2070 self.logger
2071 .for_dataflow(dataflow_id)
2072 .for_node(node_id.clone())
2073 .log(
2074 LogLevel::Debug,
2075 Some("daemon".into()),
2076 "keeping outputs open because node might restart",
2077 )
2078 .await;
2079 } else {
2080 self.send_output_closed_events(dataflow_id, node_id.clone(), outputs)
2081 .await?;
2082 }
2083
2084 if let Some(mut dataflow) = self.state.running.get_mut(&dataflow_id) {
2085 dataflow.drop_channels.remove(node_id);
2086 }
2087 Ok(())
2088 }
2089
2090 async fn handle_node_stop(
2091 &mut self,
2092 dataflow_id: Uuid,
2093 node_id: &NodeId,
2094 dynamic_node: bool,
2095 ) -> eyre::Result<()> {
2096 let result = self
2097 .handle_node_stop_inner(dataflow_id, node_id, dynamic_node)
2098 .await;
2099
2100 if let Err(err) = self
2103 .state
2104 .events_tx
2105 .send(Timestamped {
2106 inner: Event::NodeStopped {
2107 dataflow_id,
2108 node_id: node_id.clone(),
2109 },
2110 timestamp: self.state.clock.new_timestamp(),
2111 })
2112 .await
2113 {
2114 tracing::error!(
2115 "Failed to send NodeStopped event for {}/{}: {err:?}. Daemon may not exit properly.",
2116 dataflow_id,
2117 node_id
2118 );
2119 }
2120
2121 result
2122 }
2123
2124 async fn handle_node_stop_inner(
2125 &mut self,
2126 dataflow_id: Uuid,
2127 node_id: &NodeId,
2128 dynamic_node: bool,
2129 ) -> eyre::Result<()> {
2130 let mut logger = self.logger.for_dataflow(dataflow_id);
2131 let exited_before_subscribe = {
2132 let mut dataflow = match self.state.running.get_mut(&dataflow_id) {
2133 Some(dataflow) => dataflow,
2134 None if dynamic_node => {
2135 tracing::debug!(
2138 "dynamic node {dataflow_id}/{node_id} stopped after dataflow was done"
2139 );
2140 return Ok(());
2141 }
2142 None => eyre::bail!(
2143 "failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`"
2144 ),
2145 };
2146
2147 let df = &mut *dataflow;
2148 df.pending_nodes
2149 .handle_node_stop_sync(node_id, &mut df.cascading_error_causes)
2150 };
2151 if exited_before_subscribe {
2153 logger
2154 .log(
2155 LogLevel::Warn,
2156 Some(node_id.clone()),
2157 Some("daemon::pending".into()),
2158 "node exited before initializing dora connection",
2159 )
2160 .await;
2161 }
2162 let needs_report = self
2166 .state
2167 .running
2168 .get_mut(&dataflow_id)
2169 .map(|mut dataflow| {
2170 let df = &mut *dataflow;
2171 df.pending_nodes
2172 .check_and_answer_subscribers(&mut df.cascading_error_causes)
2173 })
2174 .unwrap_or(false);
2175 if needs_report {
2176 if let Some(client) = self.state.coordinator_client() {
2178 if let Some(dataflow) = self.state.running.get(&dataflow_id) {
2179 let exited = dataflow.pending_nodes.exited_before_subscribe().to_vec();
2180 drop(dataflow);
2181 logger
2183 .log(
2184 LogLevel::Info,
2185 None,
2186 Some("daemon".into()),
2187 format!(
2188 "all local nodes are ready (exit before subscribe: {exited:?}), \
2189 waiting for remote nodes",
2190 ),
2191 )
2192 .await;
2193 let client = client.clone();
2194 let events_tx = self.state.events_tx.clone();
2195 let clock = self.state.clock.clone();
2196 tokio::spawn(async move {
2197 if let Err(err) = client
2198 .all_nodes_ready(tarpc::context::current(), dataflow_id, exited)
2199 .await
2200 {
2201 let _ = events_tx
2202 .send(Timestamped {
2203 inner: Event::DaemonError(
2204 eyre::eyre!(err)
2205 .wrap_err("failed to send AllNodesReady RPC"),
2206 ),
2207 timestamp: clock.new_timestamp(),
2208 })
2209 .await;
2210 }
2211 });
2212 }
2213 }
2214 }
2215
2216 let might_restart = false;
2218
2219 self.handle_outputs_done(dataflow_id, node_id, might_restart)
2220 .await?;
2221
2222 let should_finish = {
2223 let mut dataflow = self.state.running.get_mut(&dataflow_id).wrap_err_with(|| {
2224 format!(
2225 "failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`"
2226 )
2227 })?;
2228 dataflow.running_nodes.remove(node_id);
2229 !dataflow.pending_nodes.local_nodes_pending()
2231 && dataflow
2232 .running_nodes
2233 .iter()
2234 .all(|(_id, n)| n.node_config.dynamic)
2235 };
2236
2237 if should_finish {
2238 self.finish_dataflow(dataflow_id).await?;
2239 }
2240
2241 Ok(())
2242 }
2243
2244 async fn finish_dataflow(&mut self, dataflow_id: Uuid) -> eyre::Result<()> {
2249 let mut logger = self.logger.for_dataflow(dataflow_id);
2250
2251 let result = DataflowDaemonResult {
2254 timestamp: self.state.clock.new_timestamp(),
2255 node_results: self
2256 .state
2257 .dataflow_node_results
2258 .get(&dataflow_id)
2259 .map(|entry| entry.value().clone())
2260 .unwrap_or_default(),
2261 };
2262
2263 {
2264 let mut git_manager = self.state.git_manager.lock().await;
2265 git_manager
2266 .clones_in_use
2267 .values_mut()
2268 .for_each(|dataflows| {
2269 dataflows.remove(&dataflow_id);
2270 });
2271 }
2272
2273 logger
2274 .log(
2275 LogLevel::Info,
2276 None,
2277 Some("daemon".into()),
2278 format!("dataflow finished on machine `{}`", self.state.daemon_id()),
2279 )
2280 .await;
2281
2282 if let Some(client) = self.state.coordinator_client() {
2283 let client = client.clone();
2284 tokio::spawn(async move {
2285 if let Err(err) = client
2286 .all_nodes_finished(tarpc::context::current(), dataflow_id, result)
2287 .await
2288 {
2289 tracing::error!(
2290 ?err,
2291 "failed to send all_nodes_finished notification to coordinator"
2292 );
2293 }
2294 });
2295 }
2296 self.state.running.remove(&dataflow_id);
2297
2298 Ok(())
2299 }
2300
2301 async fn handle_dora_event(&mut self, event: DoraEvent) -> eyre::Result<()> {
2302 match event {
2303 DoraEvent::Timer {
2304 dataflow_id,
2305 interval,
2306 metadata,
2307 } => {
2308 let Some(mut dataflow) = self.state.running.get_mut(&dataflow_id) else {
2309 tracing::warn!("Timer event for unknown dataflow `{dataflow_id}`");
2310 return Ok(());
2311 };
2312
2313 let Some(subscribers) = dataflow.timers.get(&interval).cloned() else {
2314 return Ok(());
2315 };
2316
2317 let mut closed = Vec::new();
2318 for (receiver_id, input_id) in &subscribers {
2319 let Some(channel) = dataflow.subscribe_channels.get(receiver_id) else {
2320 continue;
2321 };
2322
2323 let send_result = send_with_timestamp(
2324 channel,
2325 NodeEvent::Input {
2326 id: input_id.clone(),
2327 metadata: metadata.clone(),
2328 data: None,
2329 },
2330 &self.state.clock,
2331 );
2332 match send_result {
2333 Ok(()) => {}
2334 Err(_) => {
2335 closed.push(receiver_id.clone());
2336 }
2337 }
2338 }
2339 for id in &closed {
2340 dataflow.subscribe_channels.remove(id);
2341 }
2342 }
2343 DoraEvent::Logs {
2344 dataflow_id,
2345 output_id,
2346 message,
2347 metadata,
2348 } => {
2349 let Some(mut dataflow) = self.state.running.get_mut(&dataflow_id) else {
2350 tracing::warn!("Logs event for unknown dataflow `{dataflow_id}`");
2351 return Ok(());
2352 };
2353
2354 let Some(subscribers) = dataflow.mappings.get(&output_id).cloned() else {
2355 tracing::warn!(
2356 "No subscribers found for {:?} in {:?}",
2357 output_id,
2358 dataflow.mappings
2359 );
2360 return Ok(());
2361 };
2362
2363 let mut closed = Vec::new();
2364 for (receiver_id, input_id) in &subscribers {
2365 let Some(channel) = dataflow.subscribe_channels.get(receiver_id) else {
2366 tracing::warn!("No subscriber channel found for {:?}", output_id);
2367 continue;
2368 };
2369
2370 let send_result = send_with_timestamp(
2371 channel,
2372 NodeEvent::Input {
2373 id: input_id.clone(),
2374 metadata: metadata.clone(),
2375 data: Some(message.clone()),
2376 },
2377 &self.state.clock,
2378 );
2379 match send_result {
2380 Ok(()) => {}
2381 Err(_) => {
2382 closed.push(receiver_id.clone());
2383 }
2384 }
2385 }
2386 for id in &closed {
2387 dataflow.subscribe_channels.remove(id);
2388 }
2389 }
2390 DoraEvent::SpawnedNodeResult {
2391 dataflow_id,
2392 node_id,
2393 dynamic_node,
2394 exit_status,
2395 restart,
2396 } => {
2397 let mut logger = self
2398 .logger
2399 .for_dataflow(dataflow_id)
2400 .for_node(node_id.clone());
2401 logger
2402 .log(
2403 LogLevel::Debug,
2404 Some("daemon".into()),
2405 format!("handling node stop with exit status {exit_status:?} (restart: {restart})"),
2406 )
2407 .await;
2408
2409 let node_result = match exit_status {
2410 NodeExitStatus::Success => Ok(()),
2411 exit_status => {
2412 let (caused_by_node, grace_duration_kill, stderr_lines) =
2414 if let Some(dataflow) = self.state.running.get(&dataflow_id) {
2415 let caused_by = dataflow
2416 .cascading_error_causes
2417 .error_caused_by(&node_id)
2418 .cloned();
2419
2420 let grace_kill = dataflow.grace_duration_kills.contains(&node_id);
2421 let stderr =
2422 dataflow.node_stderr_most_recent.get(&node_id).map(|queue| {
2423 let mut lines = Vec::new();
2424 if queue.is_full() {
2425 lines.push("[...]".into());
2426 }
2427 while let Some(line) = queue.pop() {
2428 lines.push(line);
2429 }
2430 lines
2431 });
2432 (caused_by, grace_kill, stderr)
2433 } else {
2434 (None, false, None)
2435 };
2436
2437 let cause = match caused_by_node {
2438 Some(caused_by_node) => {
2439 logger
2440 .log(
2441 LogLevel::Info,
2442 Some("daemon".into()),
2443 format!("marking `{node_id}` as cascading error caused by `{caused_by_node}`")
2444 )
2445 .await;
2446
2447 NodeErrorCause::Cascading { caused_by_node }
2448 }
2449 None if grace_duration_kill => NodeErrorCause::GraceDuration,
2450 None => {
2451 let cause = stderr_lines
2452 .map(extract_err_from_stderr)
2453 .unwrap_or_default();
2454
2455 NodeErrorCause::Other { stderr: cause }
2456 }
2457 };
2458 Err(NodeError {
2459 timestamp: self.state.clock.new_timestamp(),
2460 cause,
2461 exit_status,
2462 })
2463 }
2464 };
2465
2466 logger
2467 .log(
2468 if node_result.is_ok() {
2469 LogLevel::Info
2470 } else {
2471 LogLevel::Error
2472 },
2473 Some("daemon".into()),
2474 match &node_result {
2475 Ok(()) => format!("{node_id} finished successfully"),
2476 Err(err) => format!("{err}"),
2477 },
2478 )
2479 .await;
2480
2481 drop(logger);
2482
2483 if let Err(node_error) = &node_result {
2491 if matches!(node_error.cause, NodeErrorCause::Other { .. }) {
2492 let error_message = format!("{node_error}");
2493 if let Err(err) = self
2494 .send_node_failed_events(dataflow_id, node_id.clone(), error_message)
2495 .await
2496 {
2497 tracing::warn!(
2498 "Failed to propagate error for node {}/{}: {err:?}",
2499 dataflow_id,
2500 node_id
2501 );
2502 }
2503 }
2504 }
2505
2506 if restart {
2507 self.logger
2508 .for_dataflow(dataflow_id)
2509 .for_node(node_id.clone())
2510 .log(
2511 LogLevel::Info,
2512 Some("daemon".into()),
2513 "node will be restarted",
2514 )
2515 .await;
2516 } else {
2517 self.state
2518 .dataflow_node_results
2519 .entry(dataflow_id)
2520 .or_default()
2521 .insert(node_id.clone(), node_result);
2522
2523 self.handle_node_stop(dataflow_id, &node_id, dynamic_node)
2524 .await?;
2525 }
2526 }
2527 }
2528 Ok(())
2529 }
2530
2531 fn base_working_dir(
2532 &self,
2533 local_working_dir: Option<PathBuf>,
2534 session_id: SessionId,
2535 ) -> eyre::Result<PathBuf> {
2536 Self::base_working_dir_static(local_working_dir, session_id)
2537 }
2538
2539 pub(crate) fn base_working_dir_static(
2541 local_working_dir: Option<PathBuf>,
2542 session_id: SessionId,
2543 ) -> eyre::Result<PathBuf> {
2544 match local_working_dir {
2545 Some(working_dir) => {
2546 if working_dir.exists() {
2548 Ok(working_dir)
2549 } else {
2550 bail!(
2551 "working directory does not exist: {}",
2552 working_dir.display(),
2553 )
2554 }
2555 }
2556 None => {
2557 let daemon_working_dir =
2559 current_dir().context("failed to get daemon working dir")?;
2560 Ok(daemon_working_dir
2561 .join("_work")
2562 .join(session_id.uuid().to_string()))
2563 }
2564 }
2565 }
2566
2567 #[allow(clippy::too_many_arguments)]
2570 pub(crate) async fn build_dataflow_static(
2571 state: &state::DaemonState,
2572 build_id: BuildId,
2573 session_id: SessionId,
2574 base_working_dir: PathBuf,
2575 git_sources: BTreeMap<NodeId, GitSource>,
2576 prev_git_sources: BTreeMap<NodeId, GitSource>,
2577 dataflow_descriptor: Descriptor,
2578 local_nodes: BTreeSet<NodeId>,
2579 uv: bool,
2580 ) -> eyre::Result<impl Future<Output = eyre::Result<BuildInfo>> + use<>> {
2581 let builder = build::Builder {
2582 session_id,
2583 base_working_dir,
2584 uv,
2585 };
2586 {
2587 let mut git_manager = state.git_manager.lock().await;
2588 git_manager.clear_planned_builds(session_id);
2589 }
2590
2591 let nodes = dataflow_descriptor.resolve_aliases_and_set_defaults()?;
2592
2593 let mut tasks = Vec::new();
2594
2595 for node in nodes.into_values().filter(|n| local_nodes.contains(&n.id)) {
2597 let node_id = node.id.clone();
2598 let git_source = git_sources.get(&node_id).cloned();
2599 let prev_git_source = prev_git_sources.get(&node_id).cloned();
2600 let prev_git = prev_git_source.map(|prev_source| PrevGitSource {
2601 still_needed_for_this_build: git_sources.values().any(|s| s == &prev_source),
2602 git_source: prev_source,
2603 });
2604
2605 let mut builder = builder.clone();
2606 if let Some(node_working_dir) =
2607 node.deploy.as_ref().and_then(|d| d.working_dir.as_deref())
2608 {
2609 builder.base_working_dir = builder.base_working_dir.join(node_working_dir);
2610 }
2611
2612 let logger = build::TracingBuildLogger::new(build_id, node_id.clone());
2614
2615 let mut git_manager = state.git_manager.lock().await;
2616 match builder
2617 .build_node(node, git_source, prev_git, logger, &mut git_manager)
2618 .await
2619 .wrap_err_with(|| format!("failed to build node `{node_id}`"))
2620 {
2621 Ok(result) => {
2622 tasks.push(NodeBuildTask {
2623 node_id,
2624 task: result,
2625 dynamic_node: false,
2626 });
2627 }
2628 Err(err) => {
2629 tracing::error!("build failed for node {node_id}: {err:?}");
2630 return Err(err);
2631 }
2632 }
2633 }
2634
2635 let task = async move {
2636 let mut info = BuildInfo {
2637 node_working_dirs: Default::default(),
2638 };
2639 for task in tasks {
2640 let NodeBuildTask {
2641 node_id,
2642 dynamic_node: _,
2643 task,
2644 } = task;
2645 let node = task
2646 .await
2647 .with_context(|| format!("failed to build node `{node_id}`"))?;
2648 info.node_working_dirs
2649 .insert(node_id, node.node_working_dir);
2650 }
2651 Ok(info)
2652 };
2653
2654 Ok(task)
2655 }
2656}
2657
/// Reads up to the last `tail` lines of `file` and returns them as raw bytes.
///
/// The file is scanned backwards from its end in chunks. ASCII whitespace at the
/// very end of the file is trimmed before lines are counted, so the returned bytes
/// do not end with a trailing newline. If the file has fewer than `tail` lines,
/// everything up to the trimmed end is returned. A `tail` of `0` yields no bytes.
///
/// # Errors
///
/// Propagates any I/O error from seeking or reading `file`.
async fn read_last_n_lines(file: &mut File, mut tail: usize) -> io::Result<Vec<u8>> {
    // `pos` marks the start of the region that was already consumed (initially EOF).
    let mut pos = file.seek(io::SeekFrom::End(0)).await?;

    let mut output = VecDeque::<u8>::new();
    // Prepends `slice` to `output`: append at the back, then rotate it to the front.
    let mut extend_slice_to_start = |slice: &[u8]| {
        output.extend(slice);
        output.rotate_right(slice.len());
    };

    let mut buffer = vec![0; 2048];
    let mut estimated_line_length = 0;
    // True only for the first chunk, i.e. the one that ends at EOF.
    let mut at_end = true;
    'main: while tail > 0 && pos > 0 {
        // Step back by at most one buffer length, clamping at the file start.
        let new_pos = pos.saturating_sub(buffer.len() as u64);
        file.seek(io::SeekFrom::Start(new_pos)).await?;
        let read_len = (pos - new_pos) as usize;
        pos = new_pos;

        file.read_exact(&mut buffer[..read_len]).await?;
        let read_buf = if at_end {
            at_end = false;
            // Ignore trailing whitespace (e.g. the final newline) at the end of the file.
            &buffer[..read_len].trim_ascii_end()
        } else {
            &buffer[..read_len]
        };

        // Walk the newlines of this chunk from back to front.
        let mut iter = memchr::memrchr_iter(b'\n', read_buf);
        let mut lines = 1;
        loop {
            // Note: this `pos` shadows the file position; it is an index into `read_buf`.
            let Some(pos) = iter.next() else {
                // No further newline in this chunk: keep all of it and read the previous chunk.
                extend_slice_to_start(read_buf);
                break;
            };
            lines += 1;
            tail -= 1;
            if tail == 0 {
                // Found the start of the last requested line: keep only what follows the newline.
                extend_slice_to_start(&read_buf[(pos + 1)..]);
                break 'main;
            }
        }

        // Grow the read buffer when the remaining lines are estimated to need at least
        // twice the current buffer size.
        estimated_line_length = estimated_line_length.max((read_buf.len() + 1).div_ceil(lines));
        let estimated_buffer_length = estimated_line_length * tail;
        if estimated_buffer_length >= buffer.len() * 2 {
            buffer.resize(buffer.len() * 2, 0);
        }
    }

    Ok(output.into())
}
2708
2709async fn set_up_event_stream(
2713 coordinator_addr: SocketAddr,
2714 machine_id: &Option<String>,
2715 clock: &Arc<HLC>,
2716 state: Arc<state::DaemonState>,
2717 remote_daemon_events_rx: flume::Receiver<eyre::Result<Timestamped<InterDaemonEvent>>>,
2718 local_listen_port: u16,
2720) -> eyre::Result<(
2721 (
2722 DaemonId,
2723 dora_message::daemon_to_coordinator::CoordinatorNotifyClient,
2724 ),
2725 impl Stream<Item = Timestamped<Event>> + Unpin,
2726)> {
2727 let clock_cloned = clock.clone();
2728 let remote_daemon_events = remote_daemon_events_rx.into_stream().map(move |e| match e {
2729 Ok(e) => Timestamped {
2730 inner: Event::Daemon(e.inner),
2731 timestamp: e.timestamp,
2732 },
2733 Err(err) => Timestamped {
2734 inner: Event::DaemonError(err),
2735 timestamp: clock_cloned.new_timestamp(),
2736 },
2737 });
2738
2739 let register_result = coordinator::register(coordinator_addr, machine_id.clone(), clock, state)
2740 .await
2741 .wrap_err("failed to connect to dora-coordinator")?;
2742 let rpc_server_handle = register_result.rpc_server_handle;
2744 let daemon_id = register_result.daemon_id;
2745 let coordinator_client = register_result.coordinator_client;
2746 tokio::spawn(async move {
2747 if let Err(err) = rpc_server_handle.await {
2748 tracing::error!("coordinator RPC server task panicked: {err}");
2749 }
2750 });
2751
2752 let (events_tx, events_rx) = flume::bounded(10);
2753 let _listen_port =
2754 local_listener::spawn_listener_loop((LOCALHOST, local_listen_port).into(), events_tx)
2755 .await?;
2756 let dynamic_node_events = events_rx.into_stream().map(|e| Timestamped {
2757 inner: Event::DynamicNode(e.inner),
2758 timestamp: e.timestamp,
2759 });
2760 let incoming = (remote_daemon_events, dynamic_node_events).merge();
2761 Ok(((daemon_id, coordinator_client), incoming))
2762}
2763
/// Forwards an output of `node_id` to all local receivers mapped to it.
///
/// For every `(receiver, input)` pair registered in `dataflow.mappings` for this output,
/// a [`NodeEvent::Input`] is sent over the receiver's subscribe channel, stamped with the
/// timestamp of `metadata`. Receivers whose channel is closed are removed from
/// `dataflow.subscribe_channels`. If the data carries a drop token, each receiver that
/// got the message is recorded as pending for that token.
///
/// Returns the payload as an owned byte vector (copied out of shared memory if needed),
/// or `None` if the output carried no data.
///
/// # Errors
///
/// Fails if the shared memory region cannot be opened or if `check_drop_token` fails.
async fn send_output_to_local_receivers(
    node_id: NodeId,
    output_id: DataId,
    dataflow: &mut RunningDataflow,
    metadata: &metadata::Metadata,
    data: Option<DataMessage>,
    clock: &HLC,
) -> Result<Option<AVec<u8, ConstAlign<128>>>, eyre::ErrReport> {
    let timestamp = metadata.timestamp();
    // Fallback for outputs without any local receiver.
    let empty_set = BTreeSet::new();
    // Build the lookup key, then take the node id back out of it for later use.
    let output_id = OutputId(node_id, output_id);
    let local_receivers = dataflow.mappings.get(&output_id).unwrap_or(&empty_set);
    let OutputId(node_id, _) = output_id;
    let mut closed = Vec::new();
    for (receiver_id, input_id) in local_receivers {
        if let Some(channel) = dataflow.subscribe_channels.get(receiver_id) {
            let item = NodeEvent::Input {
                id: input_id.clone(),
                metadata: metadata.clone(),
                data: data.clone(),
            };
            match channel.send(Timestamped {
                inner: item,
                timestamp,
            }) {
                Ok(()) => {
                    // The receiver now holds the data, so it must report the drop token
                    // before the owner can be notified.
                    if let Some(token) = data.as_ref().and_then(|d| d.drop_token()) {
                        dataflow
                            .pending_drop_tokens
                            .entry(token)
                            .or_insert_with(|| DropTokenInformation {
                                owner: node_id.clone(),
                                pending_nodes: Default::default(),
                            })
                            .pending_nodes
                            .insert(receiver_id.clone());
                    }
                }
                Err(_) => {
                    // Sending only fails when the receiving side of the channel is gone.
                    closed.push(receiver_id);
                }
            }
        }
    }
    // Removal is deferred until after the loop.
    for id in closed {
        dataflow.subscribe_channels.remove(id);
    }
    let (data_bytes, drop_token) = match data {
        None => (None, None),
        Some(DataMessage::SharedMemory {
            shared_memory_id,
            len,
            drop_token,
        }) => {
            let memory = ShmemConf::new()
                .os_id(shared_memory_id)
                .open()
                .wrap_err("failed to map shared memory output")?;
            // NOTE(review): `as_slice` is unsafe; presumably soundness relies on the sending
            // node not modifying the region before the drop token is reported -- confirm.
            let data = Some(AVec::from_slice(1, &unsafe { memory.as_slice() }[..len]));
            (data, Some(drop_token))
        }
        Some(DataMessage::Vec(v)) => (Some(v), None),
    };
    if let Some(token) = drop_token {
        // Make sure the token is registered even if no local receiver got the message,
        // so that `check_drop_token` finds an entry for it.
        dataflow
            .pending_drop_tokens
            .entry(token)
            .or_insert_with(|| DropTokenInformation {
                owner: node_id.clone(),
                pending_nodes: Default::default(),
            });
        // Reports the token to its owner right away if no node is pending for it.
        dataflow.check_drop_token(token, clock).await?;
    }
    Ok(data_bytes)
}
2841
2842fn node_inputs(node: &ResolvedNode) -> BTreeMap<DataId, Input> {
2843 match &node.kind {
2844 CoreNodeKind::Custom(n) => n.run_config.inputs.clone(),
2845 CoreNodeKind::Runtime(n) => runtime_node_inputs(n),
2846 }
2847}
2848
2849fn close_input(
2850 dataflow: &mut RunningDataflow,
2851 receiver_id: &NodeId,
2852 input_id: &DataId,
2853 clock: &HLC,
2854) {
2855 if let Some(open_inputs) = dataflow.open_inputs.get_mut(receiver_id) {
2856 if !open_inputs.remove(input_id) {
2857 return;
2858 }
2859 }
2860 if let Some(channel) = dataflow.subscribe_channels.get(receiver_id) {
2861 let _ = send_with_timestamp(
2862 channel,
2863 NodeEvent::InputClosed {
2864 id: input_id.clone(),
2865 },
2866 clock,
2867 );
2868
2869 if dataflow.open_inputs(receiver_id).is_empty() {
2870 if let Some(node) = dataflow.running_nodes.get_mut(receiver_id) {
2871 node.disable_restart();
2872 }
2873 let _ = send_with_timestamp(channel, NodeEvent::AllInputsClosed, clock);
2874 }
2875 }
2876}
2877
/// Daemon-side bookkeeping for a single node of a running dataflow.
#[derive(Debug)]
pub struct RunningNode {
    /// Handle for submitting kill operations to the node's process.
    ///
    /// Taken out (set to `None`) when the dataflow is stopped.
    process: Option<ProcessHandle>,
    /// Configuration of the node; its `dynamic` flag is consulted when deciding
    /// whether a dataflow can finish.
    node_config: NodeConfig,
    // presumably the OS process id of the spawned node, shared with the spawner -- TODO confirm
    pid: Option<Arc<AtomicU32>>,
    /// Restart policy configured for this node.
    restart_policy: RestartPolicy,
    /// Shared flag that is set once the node must not be restarted anymore.
    ///
    /// See [`RunningNode::disable_restart`] and [`RunningNode::restarts_disabled`].
    disable_restart: Arc<AtomicBool>,
    // NOTE(review): looks like the abort handle of a per-node listener task -- confirm
    listener_abort_handle: Option<tokio::task::AbortHandle>,
}
2893
2894impl RunningNode {
2895 pub fn restarts_disabled(&self) -> bool {
2896 self.disable_restart.load(atomic::Ordering::Acquire)
2897 }
2898
2899 pub fn disable_restart(&mut self) {
2900 self.disable_restart.store(true, atomic::Ordering::Release);
2901 }
2902}
2903
/// Operation that can be applied to the child process of a node.
#[derive(Debug)]
enum ProcessOperation {
    /// Ask the process to stop (SIGTERM on Unix, CTRL_BREAK_EVENT on Windows).
    SoftKill,
    /// Forcefully kill the process.
    Kill,
}
2909
impl ProcessOperation {
    /// Applies this operation to `child`.
    ///
    /// Failures are only logged as warnings; nothing is returned to the caller.
    pub fn execute(&self, child: &mut dyn TokioChildWrapper) {
        match self {
            Self::SoftKill => {
                #[cfg(unix)]
                {
                    // 15 is the signal number of SIGTERM.
                    if let Err(err) = child.signal(15) {
                        warn!("failed to send SIGTERM to process {:?}: {err}", child.id());
                    }
                }

                #[cfg(windows)]
                unsafe {
                    // NOTE(review): the whole block is `unsafe` for the Win32 call below;
                    // presumably it requires the child to be in its own process group -- confirm.
                    let Some(pid) = child.id() else {
                        warn!("failed to get child process id");
                        return;
                    };
                    if let Err(err) = windows::Win32::System::Console::GenerateConsoleCtrlEvent(
                        windows::Win32::System::Console::CTRL_BREAK_EVENT,
                        pid,
                    ) {
                        warn!("failed to send CTRL_BREAK_EVENT to process {pid}: {err}");
                    }
                }

                #[cfg(not(any(unix, windows)))]
                {
                    // No soft-kill mechanism on other platforms; only a warning is logged.
                    warn!("killing process is not implemented on this platform");
                }
            }
            Self::Kill => {
                if let Err(err) = child.start_kill() {
                    warn!("failed to kill child process: {err}");
                }
            }
        }
    }
}
2949
/// Handle through which [`ProcessOperation`]s are submitted for a node's process.
///
/// Dropping the handle submits a final [`ProcessOperation::Kill`].
#[derive(Debug)]
struct ProcessHandle {
    /// Sending side of the operation channel.
    // presumably the receiver is owned by the task supervising the child process -- TODO confirm
    op_tx: flume::Sender<ProcessOperation>,
}
2954
2955impl ProcessHandle {
2956 pub fn new(op_tx: flume::Sender<ProcessOperation>) -> Self {
2957 Self { op_tx }
2958 }
2959
2960 pub fn submit(&self, operation: ProcessOperation) -> bool {
2963 self.op_tx.send(operation).is_ok()
2964 }
2965}
2966
2967impl Drop for ProcessHandle {
2968 fn drop(&mut self) {
2969 if self.submit(ProcessOperation::Kill) {
2970 warn!("process was killed on drop because it was still running");
2971 }
2972 }
2973}
2974
/// Owns the abort handle of a spawned tokio task and aborts that task when dropped.
struct ListenerTask(tokio::task::AbortHandle);
2977
2978impl Drop for ListenerTask {
2979 fn drop(&mut self) {
2980 self.0.abort();
2981 }
2982}
2983
/// State that the daemon keeps for one running dataflow.
pub struct RunningDataflow {
    /// Unique id of the dataflow.
    id: Uuid,

    /// Descriptor the dataflow was created from.
    descriptor: Descriptor,

    /// Tracks nodes that are still pending; consulted on stop and when deciding
    /// whether the dataflow can finish.
    pending_nodes: PendingNodes,

    // presumably set once all nodes are ready and the dataflow was started -- TODO confirm
    dataflow_started: bool,

    /// Event channels of subscribed nodes, used to deliver [`NodeEvent`]s.
    ///
    /// Entries are removed when sending fails and drained when the dataflow is stopped.
    subscribe_channels: HashMap<NodeId, UnboundedSender<Timestamped<NodeEvent>>>,
    /// Channels used to report dropped outputs ([`NodeDropEvent`]) to the owning node.
    drop_channels: HashMap<NodeId, UnboundedSender<Timestamped<NodeDropEvent>>>,
    /// Maps each output to the local `(receiver node, input)` pairs that consume it.
    mappings: HashMap<OutputId, BTreeSet<InputId>>,
    /// Timer intervals together with the inputs that are driven by them.
    timers: BTreeMap<Duration, BTreeSet<InputId>>,
    /// Inputs per node that are still open.
    open_inputs: BTreeMap<NodeId, BTreeSet<DataId>>,
    /// Nodes of this dataflow that the daemon currently tracks as running.
    running_nodes: BTreeMap<NodeId, RunningNode>,

    /// Ids of the dynamic nodes of this dataflow.
    dynamic_nodes: BTreeSet<NodeId>,

    // NOTE(review): looks like outputs that are mapped to nodes on other daemons -- confirm
    open_external_mappings: BTreeSet<OutputId>,

    /// Drop tokens that were not yet reported back to their owner.
    pending_drop_tokens: HashMap<DropToken, DropTokenInformation>,

    /// Handles of the spawned timer tasks, keyed by interval.
    // presumably kept only so that the tasks are cancelled when the dataflow is dropped -- TODO confirm
    _timer_handles: BTreeMap<Duration, futures::future::RemoteHandle<()>>,
    /// Listener tasks owned by this dataflow; each one is aborted when dropped.
    _listener_tasks: Vec<ListenerTask>,
    /// Set to `true` once a stop was sent to the nodes.
    stop_sent: bool,

    /// Records which node failures were caused by the failure of another node.
    cascading_error_causes: CascadingErrorCauses,
    /// Nodes that received a kill operation after the stop grace duration.
    ///
    /// Shared with the background task that performs these kills.
    grace_duration_kills: Arc<crossbeam_skiplist::SkipSet<NodeId>>,

    // presumably nodes whose failure was already propagated to other nodes -- TODO confirm
    handled_node_failed: BTreeSet<NodeId>,

    /// Most recent stderr lines per node, used to describe the cause of a node error.
    node_stderr_most_recent: BTreeMap<NodeId, Arc<ArrayQueue<String>>>,

    /// Zenoh publishers per output.
    publishers: BTreeMap<OutputId, zenoh::pubsub::Publisher<'static>>,

    // NOTE(review): looks like a notification channel for "dataflow finished" -- confirm
    finished_tx: broadcast::Sender<()>,

    /// Copy of the `debug.publish_all_messages_to_zenoh` setting of the descriptor.
    publish_all_messages_to_zenoh: bool,
}
3039
/// Tells the caller of a stop operation when the dataflow should be finished.
#[must_use]
pub enum FinishDataflowWhen {
    /// The dataflow can be finished right away.
    Now,
    /// Finishing must wait until the remaining nodes have stopped.
    WaitForNodes,
}
3049
3050impl RunningDataflow {
3051 fn new(
3052 dataflow_id: Uuid,
3053 daemon_id: DaemonId,
3054 dataflow_descriptor: Descriptor,
3055 events_tx: tokio::sync::mpsc::Sender<Timestamped<Event>>,
3056 clock: Arc<HLC>,
3057 ) -> RunningDataflow {
3058 let (finished_tx, _) = broadcast::channel(1);
3059 Self {
3060 id: dataflow_id,
3061 pending_nodes: PendingNodes::new(dataflow_id, daemon_id, events_tx, clock),
3062 dataflow_started: false,
3063 subscribe_channels: HashMap::new(),
3064 drop_channels: HashMap::new(),
3065 mappings: HashMap::new(),
3066 timers: BTreeMap::new(),
3067 open_inputs: BTreeMap::new(),
3068 running_nodes: BTreeMap::new(),
3069 dynamic_nodes: BTreeSet::new(),
3070 open_external_mappings: Default::default(),
3071 pending_drop_tokens: HashMap::new(),
3072 _timer_handles: BTreeMap::new(),
3073 _listener_tasks: Vec::new(),
3074 stop_sent: false,
3075 cascading_error_causes: Default::default(),
3076 grace_duration_kills: Default::default(),
3077 handled_node_failed: BTreeSet::new(),
3078 node_stderr_most_recent: BTreeMap::new(),
3079 publishers: Default::default(),
3080 finished_tx,
3081 publish_all_messages_to_zenoh: dataflow_descriptor.debug.publish_all_messages_to_zenoh,
3082 descriptor: dataflow_descriptor,
3083 }
3084 }
3085
3086 async fn start(
3087 &mut self,
3088 events_tx: &mpsc::Sender<Timestamped<Event>>,
3089 clock: &Arc<HLC>,
3090 ) -> eyre::Result<()> {
3091 for interval in self.timers.keys().copied() {
3092 if self._timer_handles.get(&interval).is_some() {
3093 continue;
3094 }
3095 let events_tx = events_tx.clone();
3096 let dataflow_id = self.id;
3097 let clock = clock.clone();
3098 let task = async move {
3099 let mut interval_stream = tokio::time::interval(interval);
3100 let hlc = HLC::default();
3101 loop {
3102 interval_stream.tick().await;
3103
3104 let span = tracing::span!(tracing::Level::TRACE, "tick");
3105 let _ = span.enter();
3106
3107 let mut parameters = BTreeMap::new();
3108 parameters.insert(
3109 "open_telemetry_context".to_string(),
3110 #[cfg(feature = "telemetry")]
3111 Parameter::String(serialize_context(&span.context())),
3112 #[cfg(not(feature = "telemetry"))]
3113 Parameter::String("".into()),
3114 );
3115
3116 let metadata = metadata::Metadata::from_parameters(
3117 hlc.new_timestamp(),
3118 empty_type_info(),
3119 parameters,
3120 );
3121
3122 let event = Timestamped {
3123 inner: DoraEvent::Timer {
3124 dataflow_id,
3125 interval,
3126 metadata,
3127 }
3128 .into(),
3129 timestamp: clock.new_timestamp(),
3130 };
3131 if events_tx.send(event).await.is_err() {
3132 break;
3133 }
3134 }
3135 };
3136 let (task, handle) = task.remote_handle();
3137 tokio::spawn(task);
3138 self._timer_handles.insert(interval, handle);
3139 }
3140
3141 Ok(())
3142 }
3143
3144 async fn stop_all(
3145 &mut self,
3146 state: &state::DaemonState,
3147 grace_duration: Option<Duration>,
3148 force: bool,
3149 logger: &mut DataflowLogger<'_>,
3150 ) -> eyre::Result<FinishDataflowWhen> {
3151 self.pending_nodes
3152 .handle_dataflow_stop(
3153 &state.coordinator_client().cloned(),
3154 &state.clock,
3155 &mut self.cascading_error_causes,
3156 &self.dynamic_nodes,
3157 logger,
3158 )
3159 .await?;
3160
3161 for node in self.running_nodes.values_mut() {
3162 node.disable_restart();
3163 }
3164
3165 for (_node_id, channel) in self.subscribe_channels.drain() {
3166 let _ = send_with_timestamp(&channel, NodeEvent::Stop, &state.clock);
3167 }
3168
3169 let running_processes: Vec<_> = self
3170 .running_nodes
3171 .iter_mut()
3172 .map(|(id, n)| (id.clone(), n.process.take()))
3173 .collect();
3174 if force {
3175 for (_, proc) in &running_processes {
3176 if let Some(proc) = proc {
3177 proc.submit(crate::ProcessOperation::Kill);
3178 }
3179 }
3180 } else {
3181 let grace_duration_kills = self.grace_duration_kills.clone();
3182 tokio::spawn(async move {
3183 let duration = grace_duration.unwrap_or(Duration::from_millis(10000));
3184 tokio::time::sleep(duration).await;
3185
3186 for (node, proc) in &running_processes {
3187 if let Some(proc) = proc {
3188 grace_duration_kills.insert(node.clone());
3189 proc.submit(crate::ProcessOperation::SoftKill);
3190 }
3191 }
3192
3193 let kill_duration = duration / 2;
3194 tokio::time::sleep(kill_duration).await;
3195
3196 for (node, proc) in &running_processes {
3197 if let Some(proc) = proc {
3198 grace_duration_kills.insert(node.clone());
3199 proc.submit(crate::ProcessOperation::Kill);
3200 warn!(
3201 "{node} was killed due to not stopping within the {:#?} grace period",
3202 duration + kill_duration
3203 );
3204 }
3205 }
3206 });
3207 }
3208 self.stop_sent = true;
3209
3210 Ok(self.should_finish_immediately())
3212 }
3213
3214 fn stop_all_after_pending(
3218 &mut self,
3219 state: &state::DaemonState,
3220 grace_duration: Option<Duration>,
3221 force: bool,
3222 ) -> eyre::Result<FinishDataflowWhen> {
3223 for node in self.running_nodes.values_mut() {
3224 node.disable_restart();
3225 }
3226
3227 for (_node_id, channel) in self.subscribe_channels.drain() {
3228 let _ = send_with_timestamp(&channel, NodeEvent::Stop, &state.clock);
3229 }
3230
3231 let running_processes: Vec<_> = self
3232 .running_nodes
3233 .iter_mut()
3234 .map(|(id, n)| (id.clone(), n.process.take()))
3235 .collect();
3236 if force {
3237 for (_, proc) in &running_processes {
3238 if let Some(proc) = proc {
3239 proc.submit(crate::ProcessOperation::Kill);
3240 }
3241 }
3242 } else {
3243 let grace_duration_kills = self.grace_duration_kills.clone();
3244 tokio::spawn(async move {
3245 let duration = grace_duration.unwrap_or(Duration::from_millis(10000));
3246 tokio::time::sleep(duration).await;
3247
3248 for (node, proc) in &running_processes {
3249 if let Some(proc) = proc {
3250 grace_duration_kills.insert(node.clone());
3251 proc.submit(crate::ProcessOperation::SoftKill);
3252 }
3253 }
3254
3255 let kill_duration = duration / 2;
3256 tokio::time::sleep(kill_duration).await;
3257
3258 for (node, proc) in &running_processes {
3259 if let Some(proc) = proc {
3260 grace_duration_kills.insert(node.clone());
3261 proc.submit(crate::ProcessOperation::Kill);
3262 warn!(
3263 "{node} was killed due to not stopping within the {:#?} grace period",
3264 duration + kill_duration
3265 );
3266 }
3267 }
3268 });
3269 }
3270 self.stop_sent = true;
3271
3272 Ok(self.should_finish_immediately())
3273 }
3274
3275 fn should_finish_immediately(&self) -> FinishDataflowWhen {
3279 if !self.pending_nodes.local_nodes_pending()
3284 && self
3285 .running_nodes
3286 .iter()
3287 .all(|(_id, n)| n.node_config.dynamic)
3288 && self.stop_sent
3289 {
3290 FinishDataflowWhen::Now
3291 } else {
3292 FinishDataflowWhen::WaitForNodes
3293 }
3294 }
3295
3296 fn open_inputs(&self, node_id: &NodeId) -> &BTreeSet<DataId> {
3297 self.open_inputs.get(node_id).unwrap_or(&EMPTY_SET)
3298 }
3299
3300 async fn check_drop_token(&mut self, token: DropToken, clock: &HLC) -> eyre::Result<()> {
3301 match self.pending_drop_tokens.entry(token) {
3302 std::collections::hash_map::Entry::Occupied(entry) => {
3303 if entry.get().pending_nodes.is_empty() {
3304 let (drop_token, info) = entry.remove_entry();
3305 let result = match self.drop_channels.get_mut(&info.owner) {
3306 Some(channel) => send_with_timestamp(
3307 channel,
3308 NodeDropEvent::OutputDropped { drop_token },
3309 clock,
3310 )
3311 .wrap_err("send failed"),
3312 None => Err(eyre!("no subscribe channel for node `{}`", &info.owner)),
3313 };
3314 if let Err(err) = result.wrap_err_with(|| {
3315 format!(
3316 "failed to report drop token `{drop_token:?}` to owner `{}`",
3317 &info.owner
3318 )
3319 }) {
3320 tracing::warn!("{err:?}");
3321 }
3322 }
3323 }
3324 std::collections::hash_map::Entry::Vacant(_) => {
3325 tracing::warn!("check_drop_token called with already closed token")
3326 }
3327 }
3328
3329 Ok(())
3330 }
3331}
3332
3333fn empty_type_info() -> ArrowTypeInfo {
3334 ArrowTypeInfo {
3335 data_type: DataType::Null,
3336 len: 0,
3337 null_count: 0,
3338 validity: None,
3339 offset: 0,
3340 buffer_offsets: Vec::new(),
3341 child_data: Vec::new(),
3342 }
3343}
3344
/// Identifies an output of a dataflow: the id of the producing node and the id of
/// the output on that node.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct OutputId(NodeId, DataId);
/// Identifies an input of a dataflow: the id of the receiving node and the id of
/// the input on that node.
type InputId = (NodeId, DataId);
3348
/// Bookkeeping for a drop token that was not yet reported back to its owner.
struct DropTokenInformation {
    /// The node that created the token and is notified once the data was dropped.
    owner: NodeId,
    /// Nodes that received the data and have not been cleared for this token yet.
    /// The owner is notified once this set is empty.
    pending_nodes: BTreeSet<NodeId>,
}
3356
/// Events handled by the daemon.
///
/// [`Event::kind`] returns a static name for each variant.
#[derive(Debug)]
pub enum Event {
    /// Event for a specific node of a specific dataflow.
    Node {
        dataflow_id: DataflowId,
        node_id: NodeId,
        event: DaemonNodeEvent,
    },
    /// Event received from another daemon.
    Daemon(InterDaemonEvent),
    /// Event generated by the daemon itself (timers, logs, node results).
    Dora(DoraEvent),
    /// Event from a dynamic node, received through the local listener.
    DynamicNode(DynamicNodeEventWrapper),
    // presumably emitted periodically to trigger a heartbeat -- TODO confirm
    HeartbeatInterval,
    // presumably emitted periodically to trigger metrics collection -- TODO confirm
    MetricsInterval,
    /// First ctrl-c signal.
    CtrlC,
    // NOTE(review): looks like a request to stop after the given duration -- confirm
    StopAfter(Duration),
    /// Second ctrl-c signal.
    SecondCtrlC,
    /// Error while receiving events from other daemons.
    DaemonError(eyre::Report),
    // presumably a request to shut the daemon down -- TODO confirm
    Destroy,
    /// Request to spawn nodes of a dataflow; the outcome is reported through `reply_tx`.
    SpawnRequest {
        build_id: Option<BuildId>,
        dataflow_id: DataflowId,
        local_working_dir: Option<PathBuf>,
        nodes: BTreeMap<NodeId, ResolvedNode>,
        dataflow_descriptor: Descriptor,
        spawn_nodes: BTreeSet<NodeId>,
        uv: bool,
        write_events_to: Option<PathBuf>,
        reply_tx: oneshot::Sender<Result<(), String>>,
    },
    /// Request to stop a dataflow; the outcome is reported through `reply_tx`.
    StopDataflowRequest {
        dataflow_id: DataflowId,
        grace_duration: Option<Duration>,
        force: bool,
        reply_tx: oneshot::Sender<Result<(), String>>,
    },
    /// Result of spawning a single node.
    SpawnNodeResult {
        dataflow_id: DataflowId,
        node_id: NodeId,
        dynamic_node: bool,
        result: Result<RunningNode, NodeError>,
    },
    /// Result of spawning a whole dataflow.
    SpawnDataflowResult {
        dataflow_id: Uuid,
        result: eyre::Result<()>,
    },
    /// A node of a dataflow has stopped.
    NodeStopped {
        dataflow_id: Uuid,
        node_id: NodeId,
    },
}
3409
3410impl From<DoraEvent> for Event {
3411 fn from(event: DoraEvent) -> Self {
3412 Event::Dora(event)
3413 }
3414}
3415
3416impl Event {
3417 pub fn kind(&self) -> &'static str {
3418 match self {
3419 Event::Node { .. } => "Node",
3420 Event::Daemon(_) => "Daemon",
3421 Event::Dora(_) => "Dora",
3422 Event::DynamicNode(_) => "DynamicNode",
3423 Event::HeartbeatInterval => "HeartbeatInterval",
3424 Event::MetricsInterval => "MetricsInterval",
3425 Event::CtrlC => "CtrlC",
3426 Event::StopAfter(_) => "StopAfter",
3427 Event::SecondCtrlC => "SecondCtrlC",
3428 Event::DaemonError(_) => "DaemonError",
3429 Event::Destroy => "Destroy",
3430 Event::SpawnRequest { .. } => "SpawnRequest",
3431 Event::StopDataflowRequest { .. } => "StopDataflowRequest",
3432 Event::SpawnNodeResult { .. } => "SpawnNodeResult",
3433 Event::SpawnDataflowResult { .. } => "SpawnDataflowResult",
3434 Event::NodeStopped { .. } => "NodeStopped",
3435 }
3436 }
3437}
3438
/// Requests and notifications that a node sends to the daemon.
///
/// Variants with a `reply_sender` expect a [`DaemonReply`] in return.
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
pub enum DaemonNodeEvent {
    // presumably signals that the node will not send any further outputs -- TODO confirm
    OutputsDone {
        reply_sender: oneshot::Sender<DaemonReply>,
    },
    /// The node subscribes to [`NodeEvent`]s; events are delivered through `event_sender`.
    Subscribe {
        event_sender: UnboundedSender<Timestamped<NodeEvent>>,
        reply_sender: oneshot::Sender<DaemonReply>,
    },
    /// The node subscribes to [`NodeDropEvent`]s; events are delivered through `event_sender`.
    SubscribeDrop {
        event_sender: UnboundedSender<Timestamped<NodeDropEvent>>,
        reply_sender: oneshot::Sender<DaemonReply>,
    },
    /// The node closes the given outputs.
    CloseOutputs {
        outputs: Vec<dora_core::config::DataId>,
        reply_sender: oneshot::Sender<DaemonReply>,
    },
    /// The node sends an output, optionally with data.
    SendOut {
        output_id: DataId,
        metadata: metadata::Metadata,
        data: Option<DataMessage>,
    },
    /// The node reports drop tokens.
    // presumably for data that it received and no longer needs -- TODO confirm
    ReportDrop {
        tokens: Vec<DropToken>,
    },
    // NOTE(review): looks like the node dropped its event stream -- confirm
    EventStreamDropped {
        reply_sender: oneshot::Sender<DaemonReply>,
    },
}
3469
/// Events that the daemon generates for itself.
#[derive(Debug)]
pub enum DoraEvent {
    /// A timer with the given interval ticked for the given dataflow.
    Timer {
        dataflow_id: DataflowId,
        interval: Duration,
        metadata: metadata::Metadata,
    },
    /// A message that is forwarded to the local receivers of `output_id`.
    // presumably log output of a node -- TODO confirm
    Logs {
        dataflow_id: DataflowId,
        output_id: OutputId,
        message: DataMessage,
        metadata: metadata::Metadata,
    },
    /// A spawned node exited with the given status.
    ///
    /// When `restart` is `true`, the node is going to be restarted and its result is
    /// not recorded as final.
    SpawnedNodeResult {
        dataflow_id: DataflowId,
        node_id: NodeId,
        dynamic_node: bool,
        exit_status: NodeExitStatus,
        restart: bool,
    },
}
3492
/// Tells a caller whether to keep running or to exit; must not be ignored.
// presumably returned by the daemon's event handlers to control the main loop -- TODO confirm
#[must_use]
enum RunStatus {
    /// Keep running.
    Continue,
    /// Stop running.
    Exit,
}
3498
3499fn send_with_timestamp<T>(
3500 sender: &UnboundedSender<Timestamped<T>>,
3501 event: T,
3502 clock: &HLC,
3503) -> Result<(), mpsc::error::SendError<Timestamped<T>>> {
3504 sender.send(Timestamped {
3505 inner: event,
3506 timestamp: clock.new_timestamp(),
3507 })
3508}
3509
3510fn set_up_ctrlc_handler(
3511 clock: Arc<HLC>,
3512) -> eyre::Result<tokio::sync::mpsc::Receiver<Timestamped<Event>>> {
3513 let (ctrlc_tx, ctrlc_rx) = mpsc::channel(1);
3514
3515 let mut ctrlc_sent = 0;
3516 ctrlc::set_handler(move || {
3517 let event = match ctrlc_sent {
3518 0 => Event::CtrlC,
3519 1 => Event::SecondCtrlC,
3520 _ => {
3521 tracing::warn!("received 3rd ctrlc signal -> aborting immediately");
3522 std::process::abort();
3523 }
3524 };
3525 if ctrlc_tx
3526 .blocking_send(Timestamped {
3527 inner: event,
3528 timestamp: clock.new_timestamp(),
3529 })
3530 .is_err()
3531 {
3532 tracing::error!("failed to report ctrl-c event to dora-coordinator");
3533 }
3534
3535 ctrlc_sent += 1;
3536 })
3537 .wrap_err("failed to set ctrl-c handler")?;
3538
3539 Ok(ctrlc_rx)
3540}
3541
/// Records which node failures were caused by the failure of another node.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct CascadingErrorCauses {
    /// Maps an affected node to the node that caused its error.
    caused_by: BTreeMap<NodeId, NodeId>,
}
3546
3547impl CascadingErrorCauses {
3548 pub fn experienced_cascading_error(&self, node: &NodeId) -> bool {
3549 self.caused_by.contains_key(node)
3550 }
3551
3552 pub fn error_caused_by(&self, node: &NodeId) -> Option<&NodeId> {
3554 self.caused_by.get(node)
3555 }
3556
3557 pub fn report_cascading_error(&mut self, causing_node: NodeId, affected_node: NodeId) {
3558 self.caused_by.entry(affected_node).or_insert(causing_node);
3559 }
3560}
3561
3562fn runtime_node_inputs(n: &RuntimeNode) -> BTreeMap<DataId, Input> {
3563 n.operators
3564 .iter()
3565 .flat_map(|operator| {
3566 operator.config.inputs.iter().map(|(input_id, mapping)| {
3567 (
3568 DataId::from(format!("{}/{input_id}", operator.id)),
3569 mapping.clone(),
3570 )
3571 })
3572 })
3573 .collect()
3574}
3575
3576fn runtime_node_outputs(n: &RuntimeNode) -> BTreeSet<DataId> {
3577 n.operators
3578 .iter()
3579 .flat_map(|operator| {
3580 operator
3581 .config
3582 .outputs
3583 .iter()
3584 .map(|output_id| DataId::from(format!("{}/{output_id}", operator.id)))
3585 })
3586 .collect()
3587}
3588
/// Daemon-local helper methods for [`CoreNodeKind`].
trait CoreNodeKindExt {
    /// Returns the inputs and outputs of the node as a [`NodeRunConfig`].
    fn run_config(&self) -> NodeRunConfig;
    /// Returns `true` if the node is a dynamic node.
    fn dynamic(&self) -> bool;
}
3593
3594impl CoreNodeKindExt for CoreNodeKind {
3595 fn run_config(&self) -> NodeRunConfig {
3596 match self {
3597 CoreNodeKind::Runtime(n) => NodeRunConfig {
3598 inputs: runtime_node_inputs(n),
3599 outputs: runtime_node_outputs(n),
3600 },
3601 CoreNodeKind::Custom(n) => n.run_config.clone(),
3602 }
3603 }
3604
3605 fn dynamic(&self) -> bool {
3606 match self {
3607 CoreNodeKind::Runtime(_n) => false,
3608 CoreNodeKind::Custom(n) => {
3609 matches!(&n.source, NodeSource::Local) && n.path == DYNAMIC_SOURCE
3610 }
3611 }
3612 }
3613}