1use aligned_vec::{AVec, ConstAlign};
2use coordinator::CoordinatorEvent;
3use crossbeam::queue::ArrayQueue;
4use dora_core::{
5 build::{self, BuildInfo, GitManager, PrevGitSource},
6 config::{DataId, Input, InputMapping, NodeId, NodeRunConfig, OperatorId},
7 descriptor::{
8 CoreNodeKind, DYNAMIC_SOURCE, Descriptor, DescriptorExt, ResolvedNode, RuntimeNode,
9 read_as_descriptor,
10 },
11 topics::{
12 DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT, LOCALHOST, open_zenoh_session,
13 zenoh_output_publish_topic,
14 },
15 uhlc::{self, HLC},
16};
17use dora_message::{
18 BuildId, DataflowId, SessionId,
19 common::{
20 DaemonId, DataMessage, DropToken, GitSource, LogLevel, NodeError, NodeErrorCause,
21 NodeExitStatus,
22 },
23 coordinator_to_cli::DataflowResult,
24 coordinator_to_daemon::{BuildDataflowNodes, DaemonCoordinatorEvent, SpawnDataflowNodes},
25 daemon_to_coordinator::{
26 CoordinatorRequest, DaemonCoordinatorReply, DaemonEvent, DataflowDaemonResult,
27 },
28 daemon_to_daemon::InterDaemonEvent,
29 daemon_to_node::{DaemonReply, NodeConfig, NodeDropEvent, NodeEvent},
30 descriptor::{NodeSource, RestartPolicy},
31 metadata::{self, ArrowTypeInfo},
32 node_to_daemon::{DynamicNodeEvent, Timestamped},
33};
34use dora_node_api::{Parameter, arrow::datatypes::DataType};
35use eyre::{Context, ContextCompat, Result, bail, eyre};
36use futures::{FutureExt, TryFutureExt, future, stream};
37use futures_concurrency::stream::Merge;
38use local_listener::DynamicNodeEventWrapper;
39use log::{DaemonLogger, DataflowLogger, Logger};
40use pending::PendingNodes;
41use process_wrap::tokio::TokioChildWrapper;
42use shared_memory_server::ShmemConf;
43use socket_stream_utils::socket_stream_send;
44use spawn::Spawner;
45use std::{
46 collections::{BTreeMap, BTreeSet, HashMap, VecDeque},
47 env::current_dir,
48 future::Future,
49 io,
50 net::SocketAddr,
51 path::{Path, PathBuf},
52 pin::pin,
53 sync::{
54 Arc,
55 atomic::{self, AtomicBool, AtomicU32},
56 },
57 time::{Duration, Instant},
58};
59use tokio::{
60 fs::File,
61 io::{AsyncReadExt, AsyncSeekExt},
62 net::TcpStream,
63 sync::{
64 broadcast,
65 mpsc::{self, UnboundedSender},
66 oneshot::{self, Sender},
67 },
68};
69use tokio_stream::{Stream, StreamExt, wrappers::ReceiverStream};
70use tracing::{error, warn};
71use uuid::{NoContext, Timestamp, Uuid};
72
73pub use flume;
74pub use log::LogDestination;
75
76mod coordinator;
77mod extract_err_from_stderr;
78mod local_listener;
79mod log;
80mod node_communication;
81mod pending;
82mod socket_stream_utils;
83mod spawn;
84
85#[cfg(feature = "telemetry")]
86use dora_tracing::telemetry::serialize_context;
87#[cfg(feature = "telemetry")]
88use tracing_opentelemetry::OpenTelemetrySpanExt;
89
90use crate::{extract_err_from_stderr::extract_err_from_stderr, pending::DataflowStatus};
91
// Upper bound on buffered stderr lines kept per node.
// NOTE(review): the usage site is outside this chunk — presumably this caps the
// stderr tail retained for error reporting; confirm against the spawn module.
const STDERR_LOG_LINES_MAX: usize = 500;
93
/// Central state of a dora daemon instance.
///
/// A daemon manages the node processes of the dataflows running (partly) on
/// this machine and relays events between nodes, other daemons (via zenoh),
/// and the coordinator (via TCP).
pub struct Daemon {
    /// All currently running dataflows, keyed by dataflow UUID.
    running: HashMap<DataflowId, RunningDataflow>,
    /// Working directory of each running dataflow (used e.g. to locate log files).
    working_dir: HashMap<DataflowId, PathBuf>,

    /// Sender half for feeding internally-generated events (spawn results,
    /// build results, …) back into the daemon's own event loop.
    events_tx: mpsc::Sender<Timestamped<Event>>,

    /// TCP connection to the coordinator; `None` in standalone mode (`dora run`).
    coordinator_connection: Option<TcpStream>,
    /// Time of the last heartbeat received from the coordinator; used to
    /// detect a lost coordinator connection.
    last_coordinator_heartbeat: Instant,
    daemon_id: DaemonId,

    /// When set, the daemon exits once all listed `(dataflow, node)` pairs
    /// have stopped (used by `run_dataflow`).
    exit_when_done: Option<BTreeSet<(Uuid, NodeId)>>,
    /// Set after a first ctrl-c: exit as soon as no dataflow is running anymore.
    exit_when_all_finished: bool,
    /// Per-dataflow exit results of the individual nodes; returned by `run_inner`.
    dataflow_node_results: BTreeMap<Uuid, BTreeMap<NodeId, Result<(), NodeError>>>,

    /// Hybrid logical clock used to timestamp all outgoing events.
    clock: Arc<uhlc::HLC>,

    /// Zenoh session used for inter-daemon communication.
    zenoh_session: zenoh::Session,
    remote_daemon_events_tx: Option<flume::Sender<eyre::Result<Timestamped<InterDaemonEvent>>>>,

    logger: DaemonLogger,

    /// Latest build of each session; inserting a new build for a session
    /// evicts the previous one from `builds` (see `run_inner`).
    sessions: BTreeMap<SessionId, BuildId>,
    /// Build artifacts (per-node working dirs) keyed by build ID.
    builds: BTreeMap<BuildId, BuildInfo>,
    git_manager: GitManager,
    /// Reused `sysinfo` handle for periodic process-metrics collection.
    metrics_system: sysinfo::System,
}
124
/// Per-node exit results of all dataflows handled during one daemon run,
/// keyed by dataflow UUID.
type DaemonRunResult = BTreeMap<Uuid, BTreeMap<NodeId, Result<(), NodeError>>>;
126
/// A prepared build task for a single node, produced by `build_dataflow`.
struct NodeBuildTask<F> {
    // ID of the node being built.
    node_id: NodeId,
    // Whether the node is declared as dynamic in the descriptor.
    dynamic_node: bool,
    // Future that performs the actual build when awaited.
    task: F,
}
132
133impl Daemon {
    /// Runs a daemon connected to a coordinator at `coordinator_addr`.
    ///
    /// Sets up the ctrl-c handler, registers with the coordinator to obtain a
    /// `DaemonId` and the incoming event stream, opens a dedicated TCP log
    /// connection, and then enters the shared `run_general` loop. Returns
    /// `Ok(())` immediately if ctrl-c is received during setup.
    pub async fn run(
        coordinator_addr: SocketAddr,
        machine_id: Option<String>,
        local_listen_port: u16,
    ) -> eyre::Result<()> {
        let clock = Arc::new(HLC::default());

        let mut ctrlc_events = set_up_ctrlc_handler(clock.clone())?;
        let (remote_daemon_events_tx, remote_daemon_events_rx) = flume::bounded(10);
        let (daemon_id, incoming_events) = {
            let incoming_events = set_up_event_stream(
                coordinator_addr,
                &machine_id,
                &clock,
                remote_daemon_events_rx,
                local_listen_port,
            );

            // Race setup against ctrl-c so the daemon can be aborted while it
            // is still waiting for the coordinator.
            let ctrl_c = pin!(ctrlc_events.recv());
            match futures::future::select(ctrl_c, pin!(incoming_events)).await {
                future::Either::Left((_ctrl_c, _)) => {
                    tracing::info!("received ctrl-c signal -> stopping daemon");
                    return Ok(());
                }
                future::Either::Right((events, _)) => events?,
            }
        };

        // Separate TCP connection used exclusively for forwarding log records.
        let log_destination = {
            let stream = TcpStream::connect(coordinator_addr)
                .await
                .wrap_err("failed to connect log to dora-coordinator")?;
            stream
                .set_nodelay(true)
                .wrap_err("failed to set TCP_NODELAY")?;
            LogDestination::Coordinator {
                coordinator_connection: stream,
            }
        };

        Self::run_general(
            (ReceiverStream::new(ctrlc_events), incoming_events).merge(),
            Some(coordinator_addr),
            daemon_id,
            None,
            clock.clone(),
            Some(remote_daemon_events_tx),
            Default::default(),
            log_destination,
        )
        .await
        .map(|_| ())
    }
189
190 pub async fn run_dataflow(
191 dataflow_path: &Path,
192 build_id: Option<BuildId>,
193 local_build: Option<BuildInfo>,
194 session_id: SessionId,
195 uv: bool,
196 log_destination: LogDestination,
197 write_events_to: Option<PathBuf>,
198 ) -> eyre::Result<DataflowResult> {
199 let working_dir = dataflow_path
200 .canonicalize()
201 .context("failed to canonicalize dataflow path")?
202 .parent()
203 .ok_or_else(|| eyre::eyre!("canonicalized dataflow path has no parent"))?
204 .to_owned();
205
206 let descriptor = read_as_descriptor(dataflow_path).await?;
207 if let Some(node) = descriptor.nodes.iter().find(|n| n.deploy.is_some()) {
208 eyre::bail!(
209 "node {} has a `deploy` section, which is not supported in `dora run`\n\n
210 Instead, you need to spawn a `dora coordinator` and one or more `dora daemon`
211 instances and then use `dora start`.",
212 node.id
213 )
214 }
215
216 descriptor.check(&working_dir)?;
217 let nodes = descriptor.resolve_aliases_and_set_defaults()?;
218
219 let (events_tx, events_rx) = flume::bounded(10);
220 if nodes
221 .iter()
222 .find(|(_n, resolved_nodes)| resolved_nodes.kind.dynamic())
223 .is_some()
224 {
225 let _listen_port = local_listener::spawn_listener_loop(
227 (LOCALHOST, DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT).into(),
228 events_tx,
229 )
230 .await?;
231 }
232 let dynamic_node_events = events_rx.into_stream().map(|e| Timestamped {
233 inner: Event::DynamicNode(e.inner),
234 timestamp: e.timestamp,
235 });
236
237 let dataflow_id = Uuid::new_v7(Timestamp::now(NoContext));
238 let spawn_command = SpawnDataflowNodes {
239 build_id,
240 session_id,
241 dataflow_id,
242 local_working_dir: Some(working_dir),
243 spawn_nodes: nodes.keys().cloned().collect(),
244 nodes,
245 dataflow_descriptor: descriptor,
246 uv,
247 write_events_to,
248 };
249
250 let clock = Arc::new(HLC::default());
251
252 let ctrlc_events = ReceiverStream::new(set_up_ctrlc_handler(clock.clone())?);
253
254 let exit_when_done = spawn_command
255 .nodes
256 .values()
257 .filter(|n| !n.kind.dynamic())
258 .map(|n| (spawn_command.dataflow_id, n.id.clone()))
259 .collect();
260 let (reply_tx, reply_rx) = oneshot::channel();
261 let timestamp = clock.new_timestamp();
262 let coordinator_events = stream::once(async move {
263 Timestamped {
264 inner: Event::Coordinator(CoordinatorEvent {
265 event: DaemonCoordinatorEvent::Spawn(spawn_command),
266 reply_tx,
267 }),
268 timestamp,
269 }
270 });
271 let events = (coordinator_events, ctrlc_events, dynamic_node_events).merge();
272 let run_result = Self::run_general(
273 Box::pin(events),
274 None,
275 DaemonId::new(None),
276 Some(exit_when_done),
277 clock.clone(),
278 None,
279 if let Some(local_build) = local_build {
280 let Some(build_id) = build_id else {
281 bail!("no build_id, but local_build set")
282 };
283 let mut builds = BTreeMap::new();
284 builds.insert(build_id, local_build);
285 builds
286 } else {
287 Default::default()
288 },
289 log_destination,
290 );
291
292 let spawn_result = reply_rx
293 .map_err(|err| eyre!("failed to receive spawn result: {err}"))
294 .and_then(|r| async {
295 match r {
296 Some(DaemonCoordinatorReply::TriggerSpawnResult(result)) => {
297 result.map_err(|err| eyre!(err))
298 }
299 _ => Err(eyre!("unexpected spawn reply")),
300 }
301 });
302
303 let (mut dataflow_results, ()) = future::try_join(run_result, spawn_result).await?;
304
305 Ok(DataflowResult {
306 uuid: dataflow_id,
307 timestamp: clock.new_timestamp(),
308 node_results: dataflow_results
309 .remove(&dataflow_id)
310 .context("no node results for dataflow_id")?,
311 })
312 }
313
    /// Shared daemon bootstrap used by both `run` (coordinator mode) and
    /// `run_dataflow` (standalone mode).
    ///
    /// Connects to the coordinator (if an address is given), opens the zenoh
    /// session, assembles the `Daemon` state, and merges external events,
    /// internal events, and the heartbeat/metrics timers into the stream
    /// consumed by `run_inner`.
    #[allow(clippy::too_many_arguments)]
    async fn run_general(
        external_events: impl Stream<Item = Timestamped<Event>> + Unpin,
        coordinator_addr: Option<SocketAddr>,
        daemon_id: DaemonId,
        exit_when_done: Option<BTreeSet<(Uuid, NodeId)>>,
        clock: Arc<HLC>,
        remote_daemon_events_tx: Option<flume::Sender<eyre::Result<Timestamped<InterDaemonEvent>>>>,
        builds: BTreeMap<BuildId, BuildInfo>,
        log_destination: LogDestination,
    ) -> eyre::Result<DaemonRunResult> {
        // Standalone mode has no coordinator to connect to.
        let coordinator_connection = match coordinator_addr {
            Some(addr) => {
                let stream = TcpStream::connect(addr)
                    .await
                    .wrap_err("failed to connect to dora-coordinator")?;
                stream
                    .set_nodelay(true)
                    .wrap_err("failed to set TCP_NODELAY")?;
                Some(stream)
            }
            None => None,
        };

        let zenoh_session = open_zenoh_session(coordinator_addr.map(|addr| addr.ip()))
            .await
            .wrap_err("failed to open zenoh session")?;
        // Channel for events the daemon produces for itself (spawn/build results, …).
        let (dora_events_tx, dora_events_rx) = mpsc::channel(1000);
        let daemon = Self {
            logger: Logger {
                destination: log_destination,
                daemon_id: daemon_id.clone(),
                clock: clock.clone(),
            }
            .for_daemon(daemon_id.clone()),
            running: HashMap::new(),
            working_dir: HashMap::new(),
            events_tx: dora_events_tx,
            coordinator_connection,
            last_coordinator_heartbeat: Instant::now(),
            daemon_id,
            exit_when_done,
            exit_when_all_finished: false,
            dataflow_node_results: BTreeMap::new(),
            clock,
            zenoh_session,
            remote_daemon_events_tx,
            git_manager: Default::default(),
            builds,
            sessions: Default::default(),
            metrics_system: sysinfo::System::new(),
        };

        let dora_events = ReceiverStream::new(dora_events_rx);
        // Heartbeat timer: also doubles as the coordinator-liveness check.
        let watchdog_clock = daemon.clock.clone();
        let watchdog_interval = tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(
            Duration::from_secs(5),
        ))
        .map(|_| Timestamped {
            inner: Event::HeartbeatInterval,
            timestamp: watchdog_clock.new_timestamp(),
        });

        // Timer driving periodic per-node resource metrics collection.
        let metrics_clock = daemon.clock.clone();
        let metrics_interval = tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(
            Duration::from_secs(2),
        ))
        .map(|_| Timestamped {
            inner: Event::MetricsInterval,
            timestamp: metrics_clock.new_timestamp(),
        });

        let events = (
            external_events,
            dora_events,
            watchdog_interval,
            metrics_interval,
        )
            .merge();
        daemon.run_inner(events).await
    }
396
    /// Main event loop of the daemon.
    ///
    /// Processes events from all merged sources until an exit condition is
    /// reached (coordinator `Destroy`, all awaited nodes done, second ctrl-c,
    /// or the stream ending), then notifies the coordinator of the exit (if
    /// connected) and returns the accumulated per-node results.
    #[tracing::instrument(skip(incoming_events, self), fields(?self.daemon_id))]
    async fn run_inner(
        mut self,
        incoming_events: impl Stream<Item = Timestamped<Event>> + Unpin,
    ) -> eyre::Result<DaemonRunResult> {
        let mut events = incoming_events;

        while let Some(event) = events.next().await {
            let Timestamped { inner, timestamp } = event;
            // Keep the local hybrid logical clock in sync with event sources.
            if let Err(err) = self.clock.update_with_timestamp(&timestamp) {
                tracing::warn!("failed to update HLC with incoming event timestamp: {err}");
            }

            // Measure handling time; slow events are reported after the match.
            let start = Instant::now();
            let event_kind = inner.kind();

            match inner {
                Event::Coordinator(CoordinatorEvent { event, reply_tx }) => {
                    let status = self.handle_coordinator_event(event, reply_tx).await?;

                    match status {
                        RunStatus::Continue => {}
                        RunStatus::Exit => break,
                    }
                }
                Event::Daemon(event) => {
                    self.handle_inter_daemon_event(event).await?;
                }
                Event::Node {
                    dataflow_id: dataflow,
                    node_id,
                    event,
                } => self.handle_node_event(event, dataflow, node_id).await?,
                Event::Dora(event) => self.handle_dora_event(event).await?,
                Event::DynamicNode(event) => self.handle_dynamic_node_event(event).await?,
                Event::HeartbeatInterval => {
                    // Send our heartbeat and verify the coordinator is still alive.
                    if let Some(connection) = &mut self.coordinator_connection {
                        let msg = serde_json::to_vec(&Timestamped {
                            inner: CoordinatorRequest::Event {
                                daemon_id: self.daemon_id.clone(),
                                event: DaemonEvent::Heartbeat,
                            },
                            timestamp: self.clock.new_timestamp(),
                        })?;
                        socket_stream_send(connection, &msg)
                            .await
                            .wrap_err("failed to send watchdog message to dora-coordinator")?;

                        if self.last_coordinator_heartbeat.elapsed() > Duration::from_secs(20) {
                            bail!("lost connection to coordinator")
                        }
                    }
                }
                Event::MetricsInterval => {
                    self.collect_and_send_metrics().await?;
                }
                Event::CtrlC => {
                    // First ctrl-c: request a graceful stop of every dataflow
                    // and exit once they all finished.
                    tracing::info!("received ctrlc signal -> stopping all dataflows");
                    for dataflow in self.running.values_mut() {
                        let mut logger = self.logger.for_dataflow(dataflow.id);
                        dataflow
                            .stop_all(
                                &mut self.coordinator_connection,
                                &self.clock,
                                None,
                                false,
                                &mut logger,
                            )
                            .await?;
                    }
                    self.exit_when_all_finished = true;
                    if self.running.is_empty() {
                        break;
                    }
                }
                Event::SecondCtrlC => {
                    tracing::warn!("received second ctrlc signal -> exit immediately");
                    bail!("received second ctrl-c signal");
                }
                Event::DaemonError(err) => {
                    tracing::error!("Daemon error: {err:?}");
                }
                Event::SpawnNodeResult {
                    dataflow_id,
                    node_id,
                    dynamic_node,
                    result,
                } => match result {
                    Ok(running_node) => {
                        if let Some(dataflow) = self.running.get_mut(&dataflow_id) {
                            dataflow.running_nodes.insert(node_id, running_node);
                        } else {
                            tracing::error!(
                                "failed to handle SpawnNodeResult: no running dataflow with ID {dataflow_id}"
                            );
                        }
                    }
                    Err(error) => {
                        // Spawn failure counts as the node's exit result.
                        self.dataflow_node_results
                            .entry(dataflow_id)
                            .or_default()
                            .insert(node_id.clone(), Err(error));
                        self.handle_node_stop(dataflow_id, &node_id, dynamic_node)
                            .await?;
                    }
                },
                Event::BuildDataflowResult {
                    build_id,
                    session_id,
                    result,
                } => {
                    let (build_info, result) = match result {
                        Ok(build_info) => (Some(build_info), Ok(())),
                        Err(err) => (None, Err(err)),
                    };
                    if let Some(build_info) = build_info {
                        // Store the new build; a session keeps only its
                        // latest build, so evict the previous one.
                        self.builds.insert(build_id, build_info);
                        if let Some(old_build_id) = self.sessions.insert(session_id, build_id) {
                            self.builds.remove(&old_build_id);
                        }
                    }
                    if let Some(connection) = &mut self.coordinator_connection {
                        let msg = serde_json::to_vec(&Timestamped {
                            inner: CoordinatorRequest::Event {
                                daemon_id: self.daemon_id.clone(),
                                event: DaemonEvent::BuildResult {
                                    build_id,
                                    result: result.map_err(|err| format!("{err:?}")),
                                },
                            },
                            timestamp: self.clock.new_timestamp(),
                        })?;
                        socket_stream_send(connection, &msg).await.wrap_err(
                            "failed to send BuildDataflowResult message to dora-coordinator",
                        )?;
                    }
                }
                Event::SpawnDataflowResult {
                    dataflow_id,
                    result,
                } => {
                    if let Some(connection) = &mut self.coordinator_connection {
                        let msg = serde_json::to_vec(&Timestamped {
                            inner: CoordinatorRequest::Event {
                                daemon_id: self.daemon_id.clone(),
                                event: DaemonEvent::SpawnResult {
                                    dataflow_id,
                                    result: result.map_err(|err| format!("{err:?}")),
                                },
                            },
                            timestamp: self.clock.new_timestamp(),
                        })?;
                        socket_stream_send(connection, &msg).await.wrap_err(
                            "failed to send SpawnDataflowResult message to dora-coordinator",
                        )?;
                    }
                }
                Event::NodeStopped {
                    dataflow_id,
                    node_id,
                } => {
                    // Exit checks for both standalone mode (`exit_when_done`)
                    // and post-ctrl-c shutdown (`exit_when_all_finished`).
                    if let Some(exit_when_done) = &mut self.exit_when_done {
                        exit_when_done.remove(&(dataflow_id, node_id));
                        if exit_when_done.is_empty() {
                            tracing::info!(
                                "exiting daemon because all required dataflows are finished"
                            );
                            break;
                        }
                    }
                    if self.exit_when_all_finished && self.running.is_empty() {
                        break;
                    }
                }
            }

            let elapsed = start.elapsed();
            if elapsed > Duration::from_millis(100) {
                tracing::warn!(
                    "Daemon took {}ms for handling event: {event_kind}",
                    elapsed.as_millis()
                );
            }
        }

        // Tell the coordinator we are gone before returning.
        if let Some(mut connection) = self.coordinator_connection.take() {
            let msg = serde_json::to_vec(&Timestamped {
                inner: CoordinatorRequest::Event {
                    daemon_id: self.daemon_id.clone(),
                    event: DaemonEvent::Exit,
                },
                timestamp: self.clock.new_timestamp(),
            })?;
            socket_stream_send(&mut connection, &msg)
                .await
                .wrap_err("failed to send Exit message to dora-coordinator")?;
        }

        Ok(self.dataflow_node_results)
    }
599
600 async fn handle_coordinator_event(
601 &mut self,
602 event: DaemonCoordinatorEvent,
603 reply_tx: Sender<Option<DaemonCoordinatorReply>>,
604 ) -> eyre::Result<RunStatus> {
605 let status = match event {
606 DaemonCoordinatorEvent::Build(BuildDataflowNodes {
607 build_id,
608 session_id,
609 local_working_dir,
610 git_sources,
611 prev_git_sources,
612 dataflow_descriptor,
613 nodes_on_machine,
614 uv,
615 }) => {
616 match dataflow_descriptor.communication.remote {
617 dora_core::config::RemoteCommunicationConfig::Tcp => {}
618 }
619
620 let base_working_dir = self.base_working_dir(local_working_dir, session_id)?;
621
622 let result = self
623 .build_dataflow(
624 build_id,
625 session_id,
626 base_working_dir,
627 git_sources,
628 prev_git_sources,
629 dataflow_descriptor,
630 nodes_on_machine,
631 uv,
632 )
633 .await;
634 let (trigger_result, result_task) = match result {
635 Ok(result_task) => (Ok(()), Some(result_task)),
636 Err(err) => (Err(format!("{err:?}")), None),
637 };
638 let reply = DaemonCoordinatorReply::TriggerBuildResult(trigger_result);
639 let _ = reply_tx.send(Some(reply)).map_err(|_| {
640 error!("could not send `TriggerBuildResult` reply from daemon to coordinator")
641 });
642
643 let result_tx = self.events_tx.clone();
644 let clock = self.clock.clone();
645 if let Some(result_task) = result_task {
646 tokio::spawn(async move {
647 let message = Timestamped {
648 inner: Event::BuildDataflowResult {
649 build_id,
650 session_id,
651 result: result_task.await,
652 },
653 timestamp: clock.new_timestamp(),
654 };
655 let _ = result_tx
656 .send(message)
657 .map_err(|_| {
658 error!(
659 "could not send `BuildResult` reply from daemon to coordinator"
660 )
661 })
662 .await;
663 });
664 }
665
666 RunStatus::Continue
667 }
668 DaemonCoordinatorEvent::Spawn(SpawnDataflowNodes {
669 build_id,
670 session_id,
671 dataflow_id,
672 local_working_dir,
673 nodes,
674 dataflow_descriptor,
675 spawn_nodes,
676 uv,
677 write_events_to,
678 }) => {
679 match dataflow_descriptor.communication.remote {
680 dora_core::config::RemoteCommunicationConfig::Tcp => {}
681 }
682
683 let base_working_dir = self.base_working_dir(local_working_dir, session_id)?;
684
685 let result = self
686 .spawn_dataflow(
687 build_id,
688 dataflow_id,
689 base_working_dir,
690 nodes,
691 dataflow_descriptor,
692 spawn_nodes,
693 uv,
694 write_events_to,
695 )
696 .await;
697 let (trigger_result, result_task) = match result {
698 Ok(result_task) => (Ok(()), Some(result_task)),
699 Err(err) => (Err(format!("{err:?}")), None),
700 };
701 let reply = DaemonCoordinatorReply::TriggerSpawnResult(trigger_result);
702 let _ = reply_tx.send(Some(reply)).map_err(|_| {
703 error!("could not send `TriggerSpawnResult` reply from daemon to coordinator")
704 });
705
706 let result_tx = self.events_tx.clone();
707 let clock = self.clock.clone();
708 if let Some(result_task) = result_task {
709 tokio::spawn(async move {
710 let message = Timestamped {
711 inner: Event::SpawnDataflowResult {
712 dataflow_id,
713 result: result_task.await,
714 },
715 timestamp: clock.new_timestamp(),
716 };
717 let _ = result_tx
718 .send(message)
719 .map_err(|_| {
720 error!(
721 "could not send `SpawnResult` reply from daemon to coordinator"
722 )
723 })
724 .await;
725 });
726 }
727
728 RunStatus::Continue
729 }
730 DaemonCoordinatorEvent::AllNodesReady {
731 dataflow_id,
732 exited_before_subscribe,
733 } => {
734 let mut logger = self.logger.for_dataflow(dataflow_id);
735 logger.log(LogLevel::Debug, None,
736 Some("daemon".into()),
737 format!("received AllNodesReady (exited_before_subscribe: {exited_before_subscribe:?})"
738 )).await;
739 match self.running.get_mut(&dataflow_id) {
740 Some(dataflow) => {
741 let ready = exited_before_subscribe.is_empty();
742 dataflow
743 .pending_nodes
744 .handle_external_all_nodes_ready(
745 exited_before_subscribe,
746 &mut dataflow.cascading_error_causes,
747 )
748 .await?;
749 if ready {
750 logger.log(LogLevel::Info, None,
751 Some("daemon".into()),
752 "coordinator reported that all nodes are ready, starting dataflow",
753 ).await;
754 dataflow.start(&self.events_tx, &self.clock).await?;
755 }
756 }
757 None => {
758 tracing::warn!(
759 "received AllNodesReady for unknown dataflow (ID `{dataflow_id}`)"
760 );
761 }
762 }
763 let _ = reply_tx.send(None).map_err(|_| {
764 error!("could not send `AllNodesReady` reply from daemon to coordinator")
765 });
766 RunStatus::Continue
767 }
768 DaemonCoordinatorEvent::Logs {
769 dataflow_id,
770 node_id,
771 tail,
772 } => {
773 match self.working_dir.get(&dataflow_id) {
774 Some(working_dir) => {
775 let working_dir = working_dir.clone();
776 tokio::spawn(async move {
777 let logs = async {
778 let mut file =
779 File::open(log::log_path(&working_dir, &dataflow_id, &node_id))
780 .await
781 .wrap_err(format!(
782 "Could not open log file: {:#?}",
783 log::log_path(&working_dir, &dataflow_id, &node_id)
784 ))?;
785
786 let mut contents = match tail {
787 None | Some(0) => {
788 let mut contents = vec![];
789 file.read_to_end(&mut contents).await.map(|_| contents)
790 }
791 Some(tail) => read_last_n_lines(&mut file, tail).await,
792 }
793 .wrap_err("Could not read last n lines of log file")?;
794 if !contents.ends_with(b"\n") {
795 contents.push(b'\n');
797 }
798 Result::<Vec<u8>, eyre::Report>::Ok(contents)
799 }
800 .await
801 .map_err(|err| format!("{err:?}"));
802 let _ = reply_tx
803 .send(Some(DaemonCoordinatorReply::Logs(logs)))
804 .map_err(|_| {
805 error!("could not send logs reply from daemon to coordinator")
806 });
807 });
808 }
809 None => {
810 tracing::warn!("received Logs for unknown dataflow (ID `{dataflow_id}`)");
811 let _ = reply_tx.send(None).map_err(|_| {
812 error!(
813 "could not send `AllNodesReady` reply from daemon to coordinator"
814 )
815 });
816 }
817 }
818 RunStatus::Continue
819 }
820 DaemonCoordinatorEvent::ReloadDataflow {
821 dataflow_id,
822 node_id,
823 operator_id,
824 } => {
825 let result = self.send_reload(dataflow_id, node_id, operator_id).await;
826 let reply =
827 DaemonCoordinatorReply::ReloadResult(result.map_err(|err| format!("{err:?}")));
828 let _ = reply_tx
829 .send(Some(reply))
830 .map_err(|_| error!("could not send reload reply from daemon to coordinator"));
831 RunStatus::Continue
832 }
833 DaemonCoordinatorEvent::StopDataflow {
834 dataflow_id,
835 grace_duration,
836 force,
837 } => {
838 let mut logger = self.logger.for_dataflow(dataflow_id);
839 let dataflow = self
840 .running
841 .get_mut(&dataflow_id)
842 .wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"));
843 let (reply, future) = match dataflow {
844 Ok(dataflow) => {
845 let future = dataflow.stop_all(
846 &mut self.coordinator_connection,
847 &self.clock,
848 grace_duration,
849 force,
850 &mut logger,
851 );
852 (Ok(()), Some(future))
853 }
854 Err(err) => (Err(err.to_string()), None),
855 };
856
857 let _ = reply_tx
858 .send(Some(DaemonCoordinatorReply::StopResult(reply)))
859 .map_err(|_| error!("could not send stop reply from daemon to coordinator"));
860
861 if let Some(future) = future {
862 future.await?;
863 }
864
865 RunStatus::Continue
866 }
867 DaemonCoordinatorEvent::Destroy => {
868 tracing::info!("received destroy command -> exiting");
869 let (notify_tx, notify_rx) = oneshot::channel();
870 let reply = DaemonCoordinatorReply::DestroyResult {
871 result: Ok(()),
872 notify: Some(notify_tx),
873 };
874 let _ = reply_tx
875 .send(Some(reply))
876 .map_err(|_| error!("could not send destroy reply from daemon to coordinator"));
877 if notify_rx.await.is_err() {
879 tracing::warn!("no confirmation received for DestroyReply");
880 }
881 RunStatus::Exit
882 }
883 DaemonCoordinatorEvent::Heartbeat => {
884 self.last_coordinator_heartbeat = Instant::now();
885 let _ = reply_tx.send(None);
886 RunStatus::Continue
887 }
888 };
889 Ok(status)
890 }
891
892 async fn collect_and_send_metrics(&mut self) -> eyre::Result<()> {
893 use dora_message::daemon_to_coordinator::NodeMetrics;
894 use sysinfo::{Pid, ProcessRefreshKind, ProcessesToUpdate};
895
896 if self.coordinator_connection.is_none() {
897 return Ok(());
898 }
899
900 let system = &mut self.metrics_system;
902
903 const METRICS_INTERVAL_SECS: f64 = 2.0;
905
906 for (dataflow_id, dataflow) in &self.running {
908 let mut metrics = BTreeMap::new();
909
910 let pids: Vec<Pid> = dataflow
912 .running_nodes
913 .values()
914 .filter_map(|node| {
915 node.pid
916 .as_ref()
917 .map(|pid| Pid::from_u32(pid.load(atomic::Ordering::Acquire)))
918 })
919 .collect();
920
921 if !pids.is_empty() {
922 let refresh_kind = ProcessRefreshKind::nothing()
924 .with_cpu()
925 .with_memory()
926 .with_disk_usage();
927 system.refresh_processes_specifics(
928 ProcessesToUpdate::Some(&pids),
929 true,
930 refresh_kind,
931 );
932
933 for (node_id, running_node) in &dataflow.running_nodes {
935 if let Some(pid) = running_node.pid.as_ref() {
936 let pid = pid.load(atomic::Ordering::Acquire);
937 let sys_pid = Pid::from_u32(pid);
938 if let Some(process) = system.process(sys_pid) {
939 let disk_usage = process.disk_usage();
940 metrics.insert(
942 node_id.clone(),
943 NodeMetrics {
944 pid,
945 cpu_usage: process.cpu_usage(),
946 memory_bytes: process.memory(),
947 disk_read_bytes: Some(
948 (disk_usage.read_bytes as f64 / METRICS_INTERVAL_SECS)
949 as u64,
950 ),
951 disk_write_bytes: Some(
952 (disk_usage.written_bytes as f64 / METRICS_INTERVAL_SECS)
953 as u64,
954 ),
955 },
956 );
957 }
958 }
959 }
960 }
961
962 if !metrics.is_empty() {
964 if let Some(connection) = &mut self.coordinator_connection {
965 let msg = serde_json::to_vec(&Timestamped {
966 inner: CoordinatorRequest::Event {
967 daemon_id: self.daemon_id.clone(),
968 event: DaemonEvent::NodeMetrics {
969 dataflow_id: *dataflow_id,
970 metrics,
971 },
972 },
973 timestamp: self.clock.new_timestamp(),
974 })?;
975 socket_stream_send(connection, &msg)
976 .await
977 .wrap_err("failed to send metrics to coordinator")?;
978 }
979 }
980 }
981
982 Ok(())
983 }
984
    /// Handles an event received from another daemon (via zenoh).
    ///
    /// Failures are logged to the dataflow's logger instead of being
    /// propagated, so one bad remote event cannot take down the event loop.
    async fn handle_inter_daemon_event(&mut self, event: InterDaemonEvent) -> eyre::Result<()> {
        match event {
            // Output data produced on a remote machine: forward it to the
            // local subscribers of that output.
            InterDaemonEvent::Output {
                dataflow_id,
                node_id,
                output_id,
                metadata,
                data,
            } => {
                // The async block scopes the `self.running` borrow so the
                // logger can borrow `self.logger` afterwards.
                let inner = async {
                    let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
                        format!("send out failed: no running dataflow with ID `{dataflow_id}`")
                    })?;
                    send_output_to_local_receivers(
                        node_id.clone(),
                        output_id.clone(),
                        dataflow,
                        &metadata,
                        data.map(DataMessage::Vec),
                        &self.clock,
                    )
                    .await?;
                    Result::<_, eyre::Report>::Ok(())
                };
                if let Err(err) = inner
                    .await
                    .wrap_err("failed to forward remote output to local receivers")
                {
                    let mut logger = self.logger.for_dataflow(dataflow_id).for_node(node_id);
                    logger
                        .log(LogLevel::Warn, Some("daemon".into()), format!("{err:?}"))
                        .await;
                }
                Ok(())
            }
            // A remote output will produce no more data: close the mapped
            // inputs of all local receivers.
            InterDaemonEvent::OutputClosed {
                dataflow_id,
                node_id,
                output_id,
            } => {
                let output_id = OutputId(node_id.clone(), output_id);
                let mut logger = self
                    .logger
                    .for_dataflow(dataflow_id)
                    .for_node(node_id.clone());
                logger
                    .log(
                        LogLevel::Debug,
                        Some("daemon".into()),
                        format!("received OutputClosed event for output {output_id:?}"),
                    )
                    .await;

                let inner = async {
                    let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
                        format!("send out failed: no running dataflow with ID `{dataflow_id}`")
                    })?;

                    // Clone the mapping to end the immutable borrow before
                    // mutating the dataflow in `close_input`.
                    if let Some(inputs) = dataflow.mappings.get(&output_id).cloned() {
                        for (receiver_id, input_id) in &inputs {
                            close_input(dataflow, receiver_id, input_id, &self.clock);
                        }
                    }
                    Result::<(), eyre::Report>::Ok(())
                };
                if let Err(err) = inner
                    .await
                    .wrap_err("failed to handle InputsClosed event sent by coordinator")
                {
                    logger
                        .log(LogLevel::Warn, Some("daemon".into()), format!("{err:?}"))
                        .await;
                }
                Ok(())
            }
        }
    }
1062
    /// Prepares build tasks for all `local_nodes` of a dataflow and returns a
    /// future that runs them and collects the resulting per-node working
    /// directories into a `BuildInfo`.
    ///
    /// Build preparation (git planning, logger setup, working-dir resolution)
    /// happens eagerly here; the actual builds run inside the returned future
    /// so the caller can report the trigger result first.
    #[allow(clippy::too_many_arguments)]
    async fn build_dataflow(
        &mut self,
        build_id: BuildId,
        session_id: SessionId,
        base_working_dir: PathBuf,
        git_sources: BTreeMap<NodeId, GitSource>,
        prev_git_sources: BTreeMap<NodeId, GitSource>,
        dataflow_descriptor: Descriptor,
        local_nodes: BTreeSet<NodeId>,
        uv: bool,
    ) -> eyre::Result<impl Future<Output = eyre::Result<BuildInfo>> + use<>> {
        let builder = build::Builder {
            session_id,
            base_working_dir,
            uv,
        };
        self.git_manager.clear_planned_builds(session_id);

        let nodes = dataflow_descriptor.resolve_aliases_and_set_defaults()?;

        let mut tasks = Vec::new();

        // Only nodes assigned to this machine are built here.
        for node in nodes.into_values().filter(|n| local_nodes.contains(&n.id)) {
            let dynamic_node = node.kind.dynamic();

            let node_id = node.id.clone();
            let mut logger = self.logger.for_node_build(build_id, node_id.clone());
            logger.log(LogLevel::Debug, "building").await;
            let git_source = git_sources.get(&node_id).cloned();
            let prev_git_source = prev_git_sources.get(&node_id).cloned();
            // Record whether the previous git source is still referenced by
            // this build, so its checkout can be kept or cleaned up.
            let prev_git = prev_git_source.map(|prev_source| PrevGitSource {
                still_needed_for_this_build: git_sources.values().any(|s| s == &prev_source),
                git_source: prev_source,
            });

            let logger_cloned = logger
                .try_clone_impl()
                .await
                .wrap_err("failed to clone logger")?;

            // A per-node `working_dir` override is resolved relative to the
            // base working directory.
            let mut builder = builder.clone();
            if let Some(node_working_dir) =
                node.deploy.as_ref().and_then(|d| d.working_dir.as_deref())
            {
                builder.base_working_dir = builder.base_working_dir.join(node_working_dir);
            }

            match builder
                .build_node(
                    node,
                    git_source,
                    prev_git,
                    logger_cloned,
                    &mut self.git_manager,
                )
                .await
                .wrap_err_with(|| format!("failed to build node `{node_id}`"))
            {
                Ok(result) => {
                    tasks.push(NodeBuildTask {
                        node_id,
                        task: result,
                        dynamic_node,
                    });
                }
                Err(err) => {
                    // Log the failure for this node before aborting the build.
                    logger.log(LogLevel::Error, format!("{err:?}")).await;
                    return Err(err);
                }
            }
        }

        // Deferred part: await each node's build sequentially and collect the
        // resulting working directories.
        let task = async move {
            let mut info = BuildInfo {
                node_working_dirs: Default::default(),
            };
            for task in tasks {
                let NodeBuildTask {
                    node_id,
                    dynamic_node: _,
                    task,
                } = task;
                let node = task
                    .await
                    .with_context(|| format!("failed to build node `{node_id}`"))?;
                info.node_working_dirs
                    .insert(node_id, node.node_working_dir);
            }
            Ok(info)
        };

        Ok(task)
    }
1158
    /// Registers `dataflow_id` as a running dataflow and prepares spawning of
    /// all nodes that are assigned to this daemon.
    ///
    /// Nodes not contained in `spawn_nodes` are treated as external: for each
    /// of their outputs that maps to a local input, a zenoh subscriber task is
    /// set up that forwards the remote data into the daemon event loop.
    ///
    /// Returns a future that performs the actual spawning; the caller decides
    /// when and where to await it.
    #[allow(clippy::too_many_arguments)]
    async fn spawn_dataflow(
        &mut self,
        build_id: Option<BuildId>,
        dataflow_id: DataflowId,
        base_working_dir: PathBuf,
        nodes: BTreeMap<NodeId, ResolvedNode>,
        dataflow_descriptor: Descriptor,
        spawn_nodes: BTreeSet<NodeId>,
        uv: bool,
        write_events_to: Option<PathBuf>,
    ) -> eyre::Result<impl Future<Output = eyre::Result<()>> + use<>> {
        let mut logger = self
            .logger
            .for_dataflow(dataflow_id)
            .try_clone()
            .await
            .context("failed to clone logger")?;
        let dataflow = RunningDataflow::new(
            dataflow_id,
            self.daemon_id.clone(),
            dataflow_descriptor.clone(),
        );
        // Register the dataflow; a duplicate ID is an error.
        let dataflow = match self.running.entry(dataflow_id) {
            std::collections::hash_map::Entry::Vacant(entry) => {
                self.working_dir
                    .insert(dataflow_id, base_working_dir.clone());
                entry.insert(dataflow)
            }
            std::collections::hash_map::Entry::Occupied(_) => {
                bail!("there is already a running dataflow with ID `{dataflow_id}`")
            }
        };

        // Nodes that failed to spawn; their stop handling runs after the loop.
        let mut stopped = Vec::new();

        let build_info = build_id.and_then(|build_id| self.builds.get(&build_id));
        // Git-sourced nodes require a prior `dora build` that recorded their
        // clone directories in `build_info`.
        let node_with_git_source = nodes.values().find(|n| n.has_git_source());
        if let Some(git_node) = node_with_git_source {
            if build_info.is_none() {
                eyre::bail!(
                    "node {} has git source, but no `dora build` was run yet\n\n\
                    nodes with a `git` field must be built using `dora build` before starting the \
                    dataflow",
                    git_node.id
                )
            }
        }
        let node_working_dirs = build_info
            .map(|info| info.node_working_dirs.clone())
            .unwrap_or_default();

        // Record input mappings: local inputs fill the routing tables
        // (`mappings`/`timers`); inputs of external nodes are tracked in
        // `open_external_mappings` so we know which local outputs have remote
        // receivers.
        for node in nodes.values() {
            let local = spawn_nodes.contains(&node.id);

            let inputs = node_inputs(node);
            for (input_id, input) in inputs {
                if local {
                    dataflow
                        .open_inputs
                        .entry(node.id.clone())
                        .or_default()
                        .insert(input_id.clone());
                    match input.mapping {
                        InputMapping::User(mapping) => {
                            dataflow
                                .mappings
                                .entry(OutputId(mapping.source, mapping.output))
                                .or_default()
                                .insert((node.id.clone(), input_id));
                        }
                        InputMapping::Timer { interval } => {
                            dataflow
                                .timers
                                .entry(interval)
                                .or_default()
                                .insert((node.id.clone(), input_id));
                        }
                    }
                } else if let InputMapping::User(mapping) = input.mapping {
                    dataflow
                        .open_external_mappings
                        .insert(OutputId(mapping.source, mapping.output));
                }
            }
        }

        let spawner = Spawner {
            dataflow_id,
            daemon_tx: self.events_tx.clone(),
            dataflow_descriptor,
            clock: self.clock.clone(),
            uv,
        };

        let mut tasks = Vec::new();

        for node in nodes.into_values() {
            let mut logger = logger.reborrow().for_node(node.id.clone());
            let local = spawn_nodes.contains(&node.id);
            if local {
                let dynamic_node = node.kind.dynamic();
                // Dynamic nodes connect later on their own; all other local
                // nodes must report ready before the dataflow starts.
                if dynamic_node {
                    dataflow.dynamic_nodes.insert(node.id.clone());
                } else {
                    dataflow.pending_nodes.insert(node.id.clone());
                }

                let node_id = node.id.clone();
                // Bounded ring buffer keeping the most recent stderr lines for
                // later error reports.
                let node_stderr_most_recent = dataflow
                    .node_stderr_most_recent
                    .entry(node.id.clone())
                    .or_insert_with(|| Arc::new(ArrayQueue::new(STDERR_LOG_LINES_MAX)))
                    .clone();

                let configured_node_working_dir = node_working_dirs.get(&node_id).cloned();
                if configured_node_working_dir.is_none() && node.has_git_source() {
                    eyre::bail!(
                        "node {} has git source, but no git clone directory was found for it\n\n\
                        try running `dora build` again",
                        node.id
                    )
                }
                // Precedence: build-provided dir > node's deploy dir (resolved
                // relative to the base dir) > base working dir.
                let node_working_dir = configured_node_working_dir
                    .or_else(|| {
                        node.deploy
                            .as_ref()
                            .and_then(|d| d.working_dir.as_ref().map(|d| base_working_dir.join(d)))
                    })
                    .unwrap_or(base_working_dir.clone())
                    .clone();
                let node_write_events_to = write_events_to
                    .as_ref()
                    .map(|p| p.join(format!("inputs-{}.json", node.id)));
                match spawner
                    .clone()
                    .spawn_node(
                        node,
                        node_working_dir,
                        node_stderr_most_recent,
                        node_write_events_to,
                        &mut logger,
                    )
                    .await
                    .wrap_err_with(|| format!("failed to spawn node `{node_id}`"))
                {
                    Ok(result) => {
                        tasks.push(NodeBuildTask {
                            node_id,
                            task: result,
                            dynamic_node,
                        });
                    }
                    Err(err) => {
                        // Record the spawn failure as this node's result and
                        // remember it for the stop handling below.
                        logger
                            .log(LogLevel::Error, Some("daemon".into()), format!("{err:?}"))
                            .await;
                        self.dataflow_node_results
                            .entry(dataflow_id)
                            .or_default()
                            .insert(
                                node_id.clone(),
                                Err(NodeError {
                                    timestamp: self.clock.new_timestamp(),
                                    cause: NodeErrorCause::FailedToSpawn(format!("{err:?}")),
                                    exit_status: NodeExitStatus::Unknown,
                                }),
                            );
                        stopped.push((node_id.clone(), dynamic_node));
                    }
                }
            } else {
                dataflow.pending_nodes.set_external_nodes(true);

                // Subscribe to every output of this external node that local
                // nodes depend on and forward incoming samples into the daemon
                // event loop.
                for output_id in dataflow.mappings.keys().filter(|o| o.0 == node.id) {
                    let tx = self
                        .remote_daemon_events_tx
                        .clone()
                        .wrap_err("no remote_daemon_events_tx channel")?;
                    let mut finished_rx = dataflow.finished_tx.subscribe();
                    let subscribe_topic =
                        zenoh_output_publish_topic(dataflow.id, &output_id.0, &output_id.1);
                    tracing::debug!("declaring subscriber on {subscribe_topic}");
                    let subscriber = self
                        .zenoh_session
                        .declare_subscriber(subscribe_topic)
                        .await
                        .map_err(|e| eyre!(e))
                        .wrap_err_with(|| format!("failed to subscribe to {output_id:?}"))?;
                    tokio::spawn(async move {
                        let mut finished = pin!(finished_rx.recv());
                        // Forward samples until the dataflow finishes or the
                        // receiving side goes away.
                        loop {
                            let finished_or_next =
                                futures::future::select(finished, subscriber.recv_async());
                            match finished_or_next.await {
                                future::Either::Left((finished, _)) => match finished {
                                    Err(broadcast::error::RecvError::Closed) => {
                                        tracing::debug!(
                                            "dataflow finished, breaking from zenoh subscribe task"
                                        );
                                        break;
                                    }
                                    other => {
                                        tracing::warn!(
                                            "unexpected return value of dataflow finished_rx channel: {other:?}"
                                        );
                                        break;
                                    }
                                },
                                future::Either::Right((sample, f)) => {
                                    // keep polling the same `finished` future
                                    finished = f;
                                    let event = sample.map_err(|e| eyre!(e)).and_then(|s| {
                                        Timestamped::deserialize_inter_daemon_event(
                                            &s.payload().to_bytes(),
                                        )
                                    });
                                    if tx.send_async(event).await.is_err() {
                                        break;
                                    }
                                }
                            }
                        }
                    });
                }
            }
        }
        // Finish bookkeeping for nodes that failed to spawn above.
        for (node_id, dynamic) in stopped {
            self.handle_node_stop(dataflow_id, &node_id, dynamic)
                .await?;
        }

        let spawn_result = Self::spawn_prepared_nodes(
            dataflow_id,
            logger,
            tasks,
            self.events_tx.clone(),
            self.clock.clone(),
        );

        Ok(spawn_result)
    }
1405
    /// Awaits all node preparation tasks, then spawns the prepared nodes.
    ///
    /// If any preparation fails, nothing is spawned: the failing node gets a
    /// `FailedToSpawn` result and every successfully prepared node is reported
    /// as a `Cascading` failure. Each per-node outcome is sent to the main
    /// daemon task as a `SpawnNodeResult` event.
    async fn spawn_prepared_nodes(
        dataflow_id: Uuid,
        mut logger: DataflowLogger<'_>,
        tasks: Vec<NodeBuildTask<impl Future<Output = eyre::Result<spawn::PreparedNode>>>>,
        events_tx: mpsc::Sender<Timestamped<Event>>,
        clock: Arc<HLC>,
    ) -> eyre::Result<()> {
        // Helper building the timestamped per-node result event.
        let node_result = |node_id, dynamic_node, result| Timestamped {
            inner: Event::SpawnNodeResult {
                dataflow_id,
                node_id,
                dynamic_node,
                result,
            },
            timestamp: clock.new_timestamp(),
        };
        // First node whose preparation failed, if any.
        let mut failed_to_prepare = None;
        let mut prepared_nodes = Vec::new();
        for task in tasks {
            let NodeBuildTask {
                node_id,
                dynamic_node,
                task,
            } = task;
            match task.await {
                Ok(node) => prepared_nodes.push(node),
                Err(err) => {
                    if failed_to_prepare.is_none() {
                        failed_to_prepare = Some(node_id.clone());
                    }
                    let node_err: NodeError = NodeError {
                        timestamp: clock.new_timestamp(),
                        cause: NodeErrorCause::FailedToSpawn(format!(
                            "preparing for spawn failed: {err:?}"
                        )),
                        exit_status: NodeExitStatus::Unknown,
                    };
                    let send_result = events_tx
                        .send(node_result(node_id, dynamic_node, Err(node_err)))
                        .await;
                    if send_result.is_err() {
                        tracing::error!("failed to send SpawnNodeResult to main daemon task")
                    }
                }
            }
        }

        if let Some(failed_node) = failed_to_prepare {
            // Abort the spawn: report every prepared node as a cascading
            // failure caused by the first failing node.
            for node in prepared_nodes {
                let err = NodeError {
                    timestamp: clock.new_timestamp(),
                    cause: NodeErrorCause::Cascading {
                        caused_by_node: failed_node.clone(),
                    },
                    exit_status: NodeExitStatus::Unknown,
                };
                let send_result = events_tx
                    .send(node_result(
                        node.node_id().clone(),
                        node.dynamic(),
                        Err(err),
                    ))
                    .await;
                if send_result.is_err() {
                    tracing::error!("failed to send SpawnNodeResult to main daemon task")
                }
            }
            Err(eyre!("failed to prepare node {failed_node}"))
        } else {
            // Only the first spawn error is returned to the caller; all
            // per-node results are still reported via `events_tx`.
            let mut spawn_result = Ok(());

            logger
                .log(
                    LogLevel::Info,
                    None,
                    Some("dora daemon".into()),
                    "finished building nodes, spawning...",
                )
                .await;

            for node in prepared_nodes {
                let node_id = node.node_id().clone();
                let dynamic_node = node.dynamic();
                let logger = logger
                    .reborrow()
                    .for_node(node_id.clone())
                    .try_clone()
                    .await
                    .context("failed to clone NodeLogger")?;
                let result = node.spawn(logger).await;
                let node_spawn_result = match result {
                    Ok(node) => Ok(node),
                    Err(err) => {
                        let node_err = NodeError {
                            timestamp: clock.new_timestamp(),
                            cause: NodeErrorCause::FailedToSpawn(format!("spawn failed: {err:?}")),
                            exit_status: NodeExitStatus::Unknown,
                        };
                        if spawn_result.is_ok() {
                            spawn_result = Err(err.wrap_err(format!("failed to spawn {node_id}")));
                        }
                        Err(node_err)
                    }
                };
                let send_result = events_tx
                    .send(node_result(node_id, dynamic_node, node_spawn_result))
                    .await;
                if send_result.is_err() {
                    tracing::error!("failed to send SpawnNodeResult to main daemon task")
                }
            }
            spawn_result
        }
    }
1523
1524 async fn handle_dynamic_node_event(
1525 &mut self,
1526 event: DynamicNodeEventWrapper,
1527 ) -> eyre::Result<()> {
1528 match event {
1529 DynamicNodeEventWrapper {
1530 event: DynamicNodeEvent::NodeConfig { node_id },
1531 reply_tx,
1532 } => {
1533 let number_node_id = self
1534 .running
1535 .iter()
1536 .filter(|(_id, dataflow)| dataflow.running_nodes.contains_key(&node_id))
1537 .count();
1538
1539 let node_config = match number_node_id {
1540 2.. => Err(format!(
1541 "multiple dataflows contain dynamic node id {node_id}. \
1542 Please only have one running dataflow with the specified \
1543 node id if you want to use dynamic node",
1544 )),
1545 1 => self
1546 .running
1547 .iter()
1548 .filter(|(_id, dataflow)| dataflow.running_nodes.contains_key(&node_id))
1549 .map(|(id, dataflow)| -> Result<NodeConfig> {
1550 let node_config = dataflow
1551 .running_nodes
1552 .get(&node_id)
1553 .with_context(|| {
1554 format!("no node with ID `{node_id}` within the given dataflow")
1555 })?
1556 .node_config
1557 .clone();
1558 if !node_config.dynamic {
1559 bail!("node with ID `{node_id}` in {id} is not dynamic");
1560 }
1561 Ok(node_config)
1562 })
1563 .next()
1564 .ok_or_else(|| eyre!("no node with ID `{node_id}`"))
1565 .and_then(|r| r)
1566 .map_err(|err| {
1567 format!(
1568 "failed to get dynamic node config within given dataflow: {err}"
1569 )
1570 }),
1571 0 => Err(format!("no node with ID `{node_id}`")),
1572 };
1573
1574 let reply = DaemonReply::NodeConfig {
1575 result: node_config,
1576 };
1577 let _ = reply_tx.send(Some(reply)).map_err(|_| {
1578 error!("could not send node info reply from daemon to coordinator")
1579 });
1580 Ok(())
1581 }
1582 }
1583 }
1584
    /// Handles a request or event coming from a local node over the daemon's
    /// node API.
    ///
    /// Most variants carry a `reply_sender` through which the outcome is
    /// reported back to the requesting node.
    async fn handle_node_event(
        &mut self,
        event: DaemonNodeEvent,
        dataflow_id: DataflowId,
        node_id: NodeId,
    ) -> eyre::Result<()> {
        // `None` if the dataflow or node is unknown; otherwise whether the
        // node may still be restarted according to its restart policy.
        let might_restart = || {
            let dataflow = self.running.get(&dataflow_id)?;
            let node = dataflow.running_nodes.get(&node_id)?;
            Some(match node.restart_policy {
                RestartPolicy::Never => false,
                // a runtime restart disable overrides `OnFailure`/`Always`
                _ if node.restarts_disabled() => false,
                RestartPolicy::OnFailure | RestartPolicy::Always => true,
            })
        };
        match event {
            // The node signals readiness and registers its event channel.
            DaemonNodeEvent::Subscribe {
                event_sender,
                reply_sender,
            } => {
                let mut logger = self.logger.for_dataflow(dataflow_id);
                logger
                    .log(
                        LogLevel::Info,
                        Some(node_id.clone()),
                        Some("daemon".into()),
                        "node is ready",
                    )
                    .await;

                let dataflow = self.running.get_mut(&dataflow_id).ok_or_else(|| {
                    format!("subscribe failed: no running dataflow with ID `{dataflow_id}`")
                });

                match dataflow {
                    Err(err) => {
                        let _ = reply_sender.send(DaemonReply::Result(Err(err)));
                    }
                    Ok(dataflow) => {
                        Self::subscribe(dataflow, node_id.clone(), event_sender, &self.clock).await;

                        // Start the dataflow once all pending local nodes have
                        // subscribed (only once — see `dataflow_started`).
                        let status = dataflow
                            .pending_nodes
                            .handle_node_subscription(
                                node_id.clone(),
                                reply_sender,
                                &mut self.coordinator_connection,
                                &self.clock,
                                &mut dataflow.cascading_error_causes,
                                &mut logger,
                            )
                            .await?;
                        match status {
                            DataflowStatus::AllNodesReady if !dataflow.dataflow_started => {
                                logger
                                    .log(
                                        LogLevel::Info,
                                        None,
                                        Some("daemon".into()),
                                        "all nodes are ready, starting dataflow",
                                    )
                                    .await;
                                dataflow.start(&self.events_tx, &self.clock).await?;
                                dataflow.dataflow_started = true;
                            }
                            _ => {}
                        }
                    }
                }
            }
            // The node registers a channel for drop-token notifications.
            DaemonNodeEvent::SubscribeDrop {
                event_sender,
                reply_sender,
            } => {
                let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
                    format!("failed to subscribe: no running dataflow with ID `{dataflow_id}`")
                });
                let result = match dataflow {
                    Ok(dataflow) => {
                        dataflow.drop_channels.insert(node_id, event_sender);
                        Ok(())
                    }
                    Err(err) => Err(err.to_string()),
                };
                let _ = reply_sender.send(DaemonReply::Result(result));
            }
            // The node closes some of its outputs early.
            DaemonNodeEvent::CloseOutputs {
                outputs,
                reply_sender,
            } => {
                // Keep outputs open while the node might restart; otherwise
                // downstream inputs would be closed prematurely.
                let reply = if might_restart().unwrap_or(false) {
                    self.logger
                        .for_dataflow(dataflow_id)
                        .for_node(node_id.clone())
                        .log(
                            LogLevel::Debug,
                            Some("daemon".into()),
                            "skipping CloseOutputs because node might restart",
                        )
                        .await;
                    Ok(())
                } else {
                    let inner = async {
                        self.send_output_closed_events(dataflow_id, node_id, outputs)
                            .await
                    };

                    inner.await.map_err(|err| format!("{err:?}"))
                };
                let _ = reply_sender.send(DaemonReply::Result(reply));
            }
            // The node will not send any further outputs.
            DaemonNodeEvent::OutputsDone { reply_sender } => {
                let result = self
                    .handle_outputs_done(dataflow_id, &node_id, might_restart().unwrap_or(false))
                    .await;

                let _ = reply_sender.send(DaemonReply::Result(
                    result.map_err(|err| format!("{err:?}")),
                ));
            }
            // The node sends an output message.
            DaemonNodeEvent::SendOut {
                output_id,
                metadata,
                data,
            } => self
                .send_out(dataflow_id, node_id, output_id, metadata, data)
                .await
                .context("failed to send out")?,
            // The node reports drop tokens for data it finished using.
            DaemonNodeEvent::ReportDrop { tokens } => {
                let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
                    format!(
                        "failed to get handle drop tokens: \
                        no running dataflow with ID `{dataflow_id}`"
                    )
                });

                match dataflow {
                    Ok(dataflow) => {
                        for token in tokens {
                            match dataflow.pending_drop_tokens.get_mut(&token) {
                                Some(info) => {
                                    if info.pending_nodes.remove(&node_id) {
                                        dataflow.check_drop_token(token, &self.clock).await?;
                                    } else {
                                        tracing::warn!(
                                            "node `{node_id}` is not pending for drop token `{token:?}`"
                                        );
                                    }
                                }
                                None => tracing::warn!("unknown drop token `{token:?}`"),
                            }
                        }
                    }
                    Err(err) => tracing::warn!("{err:?}"),
                }
            }
            // The node's event stream was dropped; remove its channel.
            DaemonNodeEvent::EventStreamDropped { reply_sender } => {
                let inner = async {
                    let dataflow = self
                        .running
                        .get_mut(&dataflow_id)
                        .wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"))?;
                    dataflow.subscribe_channels.remove(&node_id);
                    Result::<_, eyre::Error>::Ok(())
                };

                let reply = inner.await.map_err(|err| format!("{err:?}"));
                let _ = reply_sender.send(DaemonReply::Result(reply));
            }
        }
        Ok(())
    }
1758
1759 async fn send_reload(
1760 &mut self,
1761 dataflow_id: Uuid,
1762 node_id: NodeId,
1763 operator_id: Option<OperatorId>,
1764 ) -> Result<(), eyre::ErrReport> {
1765 let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
1766 format!("Reload failed: no running dataflow with ID `{dataflow_id}`")
1767 })?;
1768 if let Some(channel) = dataflow.subscribe_channels.get(&node_id) {
1769 match send_with_timestamp(channel, NodeEvent::Reload { operator_id }, &self.clock) {
1770 Ok(()) => {}
1771 Err(_) => {
1772 dataflow.subscribe_channels.remove(&node_id);
1773 }
1774 }
1775 }
1776 Ok(())
1777 }
1778
1779 async fn send_out(
1780 &mut self,
1781 dataflow_id: Uuid,
1782 node_id: NodeId,
1783 output_id: DataId,
1784 metadata: dora_message::metadata::Metadata,
1785 data: Option<DataMessage>,
1786 ) -> Result<(), eyre::ErrReport> {
1787 let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
1788 format!("send out failed: no running dataflow with ID `{dataflow_id}`")
1789 })?;
1790 let data_bytes = send_output_to_local_receivers(
1791 node_id.clone(),
1792 output_id.clone(),
1793 dataflow,
1794 &metadata,
1795 data,
1796 &self.clock,
1797 )
1798 .await?;
1799
1800 let output_id = OutputId(node_id, output_id);
1801 let remote_receivers = dataflow.open_external_mappings.contains(&output_id)
1802 || dataflow.publish_all_messages_to_zenoh;
1803 if remote_receivers {
1804 let event = InterDaemonEvent::Output {
1805 dataflow_id,
1806 node_id: output_id.0.clone(),
1807 output_id: output_id.1.clone(),
1808 metadata,
1809 data: data_bytes,
1810 };
1811 self.send_to_remote_receivers(dataflow_id, &output_id, event)
1812 .await?;
1813 }
1814
1815 Ok(())
1816 }
1817
1818 async fn send_to_remote_receivers(
1819 &mut self,
1820 dataflow_id: Uuid,
1821 output_id: &OutputId,
1822 event: InterDaemonEvent,
1823 ) -> Result<(), eyre::Error> {
1824 let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
1825 format!("send out failed: no running dataflow with ID `{dataflow_id}`")
1826 })?;
1827
1828 let publisher = match dataflow.publishers.get(output_id) {
1830 Some(publisher) => publisher,
1831 None => {
1832 let publish_topic =
1833 zenoh_output_publish_topic(dataflow.id, &output_id.0, &output_id.1);
1834 tracing::debug!("declaring publisher on {publish_topic}");
1835 let publisher = self
1836 .zenoh_session
1837 .declare_publisher(publish_topic)
1838 .await
1839 .map_err(|e| eyre!(e))
1840 .context("failed to create zenoh publisher")?;
1841 dataflow.publishers.insert(output_id.clone(), publisher);
1842 dataflow.publishers.get(output_id).unwrap()
1843 }
1844 };
1845
1846 let serialized_event = Timestamped {
1847 inner: event,
1848 timestamp: self.clock.new_timestamp(),
1849 }
1850 .serialize();
1851 publisher
1852 .put(serialized_event)
1853 .await
1854 .map_err(|e| eyre!(e))
1855 .context("zenoh put failed")?;
1856 Ok(())
1857 }
1858
1859 async fn send_output_closed_events(
1860 &mut self,
1861 dataflow_id: DataflowId,
1862 node_id: NodeId,
1863 outputs: Vec<DataId>,
1864 ) -> eyre::Result<()> {
1865 let dataflow = self
1866 .running
1867 .get_mut(&dataflow_id)
1868 .wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"))?;
1869 let local_node_inputs: BTreeSet<_> = dataflow
1870 .mappings
1871 .iter()
1872 .filter(|(k, _)| k.0 == node_id && outputs.contains(&k.1))
1873 .flat_map(|(_, v)| v)
1874 .cloned()
1875 .collect();
1876 for (receiver_id, input_id) in &local_node_inputs {
1877 close_input(dataflow, receiver_id, input_id, &self.clock);
1878 }
1879
1880 let mut closed = Vec::new();
1881 for output_id in &dataflow.open_external_mappings {
1882 if output_id.0 == node_id && outputs.contains(&output_id.1) {
1883 closed.push(output_id.clone());
1884 }
1885 }
1886
1887 for output_id in closed {
1888 let event = InterDaemonEvent::OutputClosed {
1889 dataflow_id,
1890 node_id: output_id.0.clone(),
1891 output_id: output_id.1.clone(),
1892 };
1893 self.send_to_remote_receivers(dataflow_id, &output_id, event)
1894 .await?;
1895 }
1896
1897 Ok(())
1898 }
1899
1900 async fn subscribe(
1901 dataflow: &mut RunningDataflow,
1902 node_id: NodeId,
1903 event_sender: UnboundedSender<Timestamped<NodeEvent>>,
1904 clock: &HLC,
1905 ) {
1906 let closed_inputs = dataflow
1908 .mappings
1909 .values()
1910 .flatten()
1911 .filter(|(node, _)| node == &node_id)
1912 .map(|(_, input)| input)
1913 .filter(|input| {
1914 dataflow
1915 .open_inputs
1916 .get(&node_id)
1917 .map(|open_inputs| !open_inputs.contains(*input))
1918 .unwrap_or(true)
1919 });
1920 for input_id in closed_inputs {
1921 let _ = send_with_timestamp(
1922 &event_sender,
1923 NodeEvent::InputClosed {
1924 id: input_id.clone(),
1925 },
1926 clock,
1927 );
1928 }
1929 if dataflow.open_inputs(&node_id).is_empty() {
1930 if let Some(node) = dataflow.running_nodes.get_mut(&node_id) {
1931 node.disable_restart();
1932 }
1933 if let Some(node) = dataflow.descriptor.nodes.iter().find(|n| n.id == node_id) {
1934 if node.inputs.is_empty() {
1935 } else {
1937 let _ = send_with_timestamp(&event_sender, NodeEvent::AllInputsClosed, clock);
1938 }
1939 }
1940 }
1941
1942 if dataflow.stop_sent {
1945 if let Some(node) = dataflow.running_nodes.get_mut(&node_id) {
1946 node.disable_restart();
1947 }
1948 let _ = send_with_timestamp(&event_sender, NodeEvent::Stop, clock);
1949 }
1950
1951 dataflow.subscribe_channels.insert(node_id, event_sender);
1952 }
1953
1954 #[tracing::instrument(skip(self), level = "trace")]
1955 async fn handle_outputs_done(
1956 &mut self,
1957 dataflow_id: DataflowId,
1958 node_id: &NodeId,
1959 might_restart: bool,
1960 ) -> eyre::Result<()> {
1961 let dataflow = self
1962 .running
1963 .get_mut(&dataflow_id)
1964 .ok_or_else(|| eyre!("no running dataflow with ID `{dataflow_id}`"))?;
1965
1966 let outputs = dataflow
1967 .mappings
1968 .keys()
1969 .filter(|m| &m.0 == node_id)
1970 .map(|m| &m.1)
1971 .cloned()
1972 .collect();
1973
1974 if might_restart {
1975 self.logger
1976 .for_dataflow(dataflow_id)
1977 .for_node(node_id.clone())
1978 .log(
1979 LogLevel::Debug,
1980 Some("daemon".into()),
1981 "keeping outputs open because node might restart",
1982 )
1983 .await;
1984 } else {
1985 self.send_output_closed_events(dataflow_id, node_id.clone(), outputs)
1986 .await?;
1987 }
1988
1989 let dataflow = self
1990 .running
1991 .get_mut(&dataflow_id)
1992 .ok_or_else(|| eyre!("no running dataflow with ID `{dataflow_id}`"))?;
1993 dataflow.drop_channels.remove(node_id);
1994 Ok(())
1995 }
1996
1997 async fn handle_node_stop(
1998 &mut self,
1999 dataflow_id: Uuid,
2000 node_id: &NodeId,
2001 dynamic_node: bool,
2002 ) -> eyre::Result<()> {
2003 let result = self
2004 .handle_node_stop_inner(dataflow_id, node_id, dynamic_node)
2005 .await;
2006 let _ = self
2007 .events_tx
2008 .send(Timestamped {
2009 inner: Event::NodeStopped {
2010 dataflow_id,
2011 node_id: node_id.clone(),
2012 },
2013 timestamp: self.clock.new_timestamp(),
2014 })
2015 .await;
2016 result
2017 }
2018
    /// Performs the bookkeeping after a node stopped: resolves pending-node
    /// state, closes the node's outputs, and — once no non-dynamic node is
    /// left running — reports the whole dataflow as finished to the
    /// coordinator and removes it from `self.running`.
    async fn handle_node_stop_inner(
        &mut self,
        dataflow_id: Uuid,
        node_id: &NodeId,
        dynamic_node: bool,
    ) -> eyre::Result<()> {
        let mut logger = self.logger.for_dataflow(dataflow_id);
        let dataflow = match self.running.get_mut(&dataflow_id) {
            Some(dataflow) => dataflow,
            // Dynamic nodes may outlive the dataflow; not an error.
            None if dynamic_node => {
                tracing::debug!(
                    "dynamic node {dataflow_id}/{node_id} stopped after dataflow was done"
                );
                return Ok(());
            }
            None => eyre::bail!(
                "failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`"
            ),
        };

        dataflow
            .pending_nodes
            .handle_node_stop(
                node_id,
                &mut self.coordinator_connection,
                &self.clock,
                &mut dataflow.cascading_error_causes,
                &mut logger,
            )
            .await?;

        // The node has exited, so it won't restart anymore.
        let might_restart = false;

        self.handle_outputs_done(dataflow_id, node_id, might_restart)
            .await?;

        // Re-borrow logger and dataflow; `handle_outputs_done` needed
        // exclusive access to `self` above.
        let mut logger = self.logger.for_dataflow(dataflow_id);
        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
            format!("failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`")
        })?;
        dataflow.running_nodes.remove(node_id);
        // The dataflow is finished once no local node is pending and only
        // dynamic nodes are still running.
        if !dataflow.pending_nodes.local_nodes_pending()
            && dataflow
                .running_nodes
                .iter()
                .all(|(_id, n)| n.node_config.dynamic)
        {
            let result = DataflowDaemonResult {
                timestamp: self.clock.new_timestamp(),
                node_results: self
                    .dataflow_node_results
                    .get(&dataflow.id)
                    .context("failed to get dataflow node results")?
                    .clone(),
            };

            // Release git clones that were held in use by this dataflow.
            self.git_manager
                .clones_in_use
                .values_mut()
                .for_each(|dataflows| {
                    dataflows.remove(&dataflow_id);
                });

            logger
                .log(
                    LogLevel::Info,
                    None,
                    Some("daemon".into()),
                    format!("dataflow finished on machine `{}`", self.daemon_id),
                )
                .await;
            // Report the aggregated node results to the coordinator.
            if let Some(connection) = &mut self.coordinator_connection {
                let msg = serde_json::to_vec(&Timestamped {
                    inner: CoordinatorRequest::Event {
                        daemon_id: self.daemon_id.clone(),
                        event: DaemonEvent::AllNodesFinished {
                            dataflow_id,
                            result,
                        },
                    },
                    timestamp: self.clock.new_timestamp(),
                })?;
                socket_stream_send(connection, &msg)
                    .await
                    .wrap_err("failed to report dataflow finish to dora-coordinator")?;
            }
            self.running.remove(&dataflow_id);
        }

        Ok(())
    }
2113
    /// Handles daemon-internal events: timer ticks, log-message inputs, and
    /// exit results of spawned node processes.
    async fn handle_dora_event(&mut self, event: DoraEvent) -> eyre::Result<()> {
        match event {
            // Deliver a timer tick as an input to all subscribed nodes.
            DoraEvent::Timer {
                dataflow_id,
                interval,
                metadata,
            } => {
                let Some(dataflow) = self.running.get_mut(&dataflow_id) else {
                    tracing::warn!("Timer event for unknown dataflow `{dataflow_id}`");
                    return Ok(());
                };

                let Some(subscribers) = dataflow.timers.get(&interval) else {
                    return Ok(());
                };

                // Channels whose send fails are treated as closed and removed
                // after the loop (can't mutate while iterating).
                let mut closed = Vec::new();
                for (receiver_id, input_id) in subscribers {
                    let Some(channel) = dataflow.subscribe_channels.get(receiver_id) else {
                        continue;
                    };

                    let send_result = send_with_timestamp(
                        channel,
                        NodeEvent::Input {
                            id: input_id.clone(),
                            metadata: metadata.clone(),
                            data: None,
                        },
                        &self.clock,
                    );
                    match send_result {
                        Ok(()) => {}
                        Err(_) => {
                            closed.push(receiver_id);
                        }
                    }
                }
                for id in closed {
                    dataflow.subscribe_channels.remove(id);
                }
            }
            // Deliver a log message as an input to the mapped receivers.
            DoraEvent::Logs {
                dataflow_id,
                output_id,
                message,
                metadata,
            } => {
                let Some(dataflow) = self.running.get_mut(&dataflow_id) else {
                    tracing::warn!("Logs event for unknown dataflow `{dataflow_id}`");
                    return Ok(());
                };

                let Some(subscribers) = dataflow.mappings.get(&output_id) else {
                    tracing::warn!(
                        "No subscribers found for {:?} in {:?}",
                        output_id,
                        dataflow.mappings
                    );
                    return Ok(());
                };

                // Same closed-channel cleanup pattern as for timer events.
                let mut closed = Vec::new();
                for (receiver_id, input_id) in subscribers {
                    let Some(channel) = dataflow.subscribe_channels.get(receiver_id) else {
                        tracing::warn!("No subscriber channel found for {:?}", output_id);
                        continue;
                    };

                    let send_result = send_with_timestamp(
                        channel,
                        NodeEvent::Input {
                            id: input_id.clone(),
                            metadata: metadata.clone(),
                            data: Some(message.clone()),
                        },
                        &self.clock,
                    );
                    match send_result {
                        Ok(()) => {}
                        Err(_) => {
                            closed.push(receiver_id);
                        }
                    }
                }
                for id in closed {
                    dataflow.subscribe_channels.remove(id);
                }
            }
            // A spawned node process exited (successfully or not).
            DoraEvent::SpawnedNodeResult {
                dataflow_id,
                node_id,
                dynamic_node,
                exit_status,
                restart,
            } => {
                let mut logger = self
                    .logger
                    .for_dataflow(dataflow_id)
                    .for_node(node_id.clone());
                logger
                    .log(
                        LogLevel::Debug,
                        Some("daemon".into()),
                        format!("handling node stop with exit status {exit_status:?} (restart: {restart})"),
                    )
                    .await;

                // Map the exit status to a node result; for failures, pick the
                // most informative error cause: cascading error > grace-period
                // kill > recent stderr output.
                let node_result = match exit_status {
                    NodeExitStatus::Success => Ok(()),
                    exit_status => {
                        let dataflow = self.running.get(&dataflow_id);
                        let caused_by_node = dataflow
                            .and_then(|dataflow| {
                                dataflow.cascading_error_causes.error_caused_by(&node_id)
                            })
                            .cloned();
                        let grace_duration_kill = dataflow
                            .map(|d| d.grace_duration_kills.contains(&node_id))
                            .unwrap_or_default();

                        let cause = match caused_by_node {
                            Some(caused_by_node) => {
                                logger
                                    .log(
                                        LogLevel::Info,
                                        Some("daemon".into()),
                                        format!("marking `{node_id}` as cascading error caused by `{caused_by_node}`")
                                    )
                                    .await;

                                NodeErrorCause::Cascading { caused_by_node }
                            }
                            None if grace_duration_kill => NodeErrorCause::GraceDuration,
                            None => {
                                // Drain the node's recent stderr ring buffer;
                                // "[...]" marks that older lines were dropped.
                                let cause = dataflow
                                    .and_then(|d| d.node_stderr_most_recent.get(&node_id))
                                    .map(|queue| {
                                        let mut lines = Vec::new();
                                        if queue.is_full() {
                                            lines.push("[...]".into());
                                        }
                                        while let Some(line) = queue.pop() {
                                            lines.push(line);
                                        }
                                        lines
                                    })
                                    .map(extract_err_from_stderr)
                                    .unwrap_or_default();

                                NodeErrorCause::Other { stderr: cause }
                            }
                        };
                        Err(NodeError {
                            timestamp: self.clock.new_timestamp(),
                            cause,
                            exit_status,
                        })
                    }
                };

                logger
                    .log(
                        if node_result.is_ok() {
                            LogLevel::Info
                        } else {
                            LogLevel::Error
                        },
                        Some("daemon".into()),
                        match &node_result {
                            Ok(()) => format!("{node_id} finished successfully"),
                            Err(err) => format!("{err}"),
                        },
                    )
                    .await;

                if restart {
                    logger
                        .log(
                            LogLevel::Info,
                            Some("daemon".into()),
                            "node will be restarted",
                        )
                        .await;
                } else {
                    // Record the final result and run the stop bookkeeping.
                    self.dataflow_node_results
                        .entry(dataflow_id)
                        .or_default()
                        .insert(node_id.clone(), node_result);

                    self.handle_node_stop(dataflow_id, &node_id, dynamic_node)
                        .await?;
                }
            }
        }
        Ok(())
    }
2311
2312 fn base_working_dir(
2313 &self,
2314 local_working_dir: Option<PathBuf>,
2315 session_id: SessionId,
2316 ) -> eyre::Result<PathBuf> {
2317 match local_working_dir {
2318 Some(working_dir) => {
2319 if working_dir.exists() {
2321 Ok(working_dir)
2322 } else {
2323 bail!(
2324 "working directory does not exist: {}",
2325 working_dir.display(),
2326 )
2327 }
2328 }
2329 None => {
2330 let daemon_working_dir =
2332 current_dir().context("failed to get daemon working dir")?;
2333 Ok(daemon_working_dir
2334 .join("_work")
2335 .join(session_id.uuid().to_string()))
2336 }
2337 }
2338 }
2339}
2340
/// Reads the last `tail` lines of `file` by scanning it backwards in chunks,
/// without loading the whole file into memory.
///
/// Trailing ASCII whitespace at the end of the file (e.g. a final newline) is
/// trimmed before counting lines. The returned bytes start at the beginning of
/// the first requested line (or at the start of the file if it has fewer than
/// `tail` lines).
async fn read_last_n_lines(file: &mut File, mut tail: usize) -> io::Result<Vec<u8>> {
    // Start at the end of the file and work backwards.
    let mut pos = file.seek(io::SeekFrom::End(0)).await?;

    // Collected result; each processed chunk is prepended via rotate_right.
    let mut output = VecDeque::<u8>::new();
    let mut extend_slice_to_start = |slice: &[u8]| {
        output.extend(slice);
        output.rotate_right(slice.len());
    };

    let mut buffer = vec![0; 2048];
    // Running estimate of the line length, used below to grow the read buffer
    // and thereby reduce the number of seek+read round trips.
    let mut estimated_line_length = 0;
    let mut at_end = true;
    'main: while tail > 0 && pos > 0 {
        // Read the chunk that ends at `pos` (shorter near the file start).
        let new_pos = pos.saturating_sub(buffer.len() as u64);
        file.seek(io::SeekFrom::Start(new_pos)).await?;
        let read_len = (pos - new_pos) as usize;
        pos = new_pos;

        file.read_exact(&mut buffer[..read_len]).await?;
        let read_buf = if at_end {
            // First (i.e. last-in-file) chunk: ignore trailing whitespace so a
            // final newline doesn't count as an extra line.
            at_end = false;
            &buffer[..read_len].trim_ascii_end()
        } else {
            &buffer[..read_len]
        };

        // Walk the newlines of this chunk from the end backwards.
        let mut iter = memchr::memrchr_iter(b'\n', read_buf);
        let mut lines = 1;
        loop {
            let Some(pos) = iter.next() else {
                // No further newline in this chunk -> keep the whole chunk and
                // continue with the previous one.
                extend_slice_to_start(read_buf);
                break;
            };
            lines += 1;
            tail -= 1;
            if tail == 0 {
                // Found the start of the first requested line; keep everything
                // after this newline and stop.
                extend_slice_to_start(&read_buf[(pos + 1)..]);
                break 'main;
            }
        }

        // Grow the buffer when the estimated remaining data would need more
        // than two reads at the current buffer size.
        estimated_line_length = estimated_line_length.max((read_buf.len() + 1).div_ceil(lines));
        let estimated_buffer_length = estimated_line_length * tail;
        if estimated_buffer_length >= buffer.len() * 2 {
            buffer.resize(buffer.len() * 2, 0);
        }
    }

    Ok(output.into())
}
2391
2392async fn set_up_event_stream(
2393 coordinator_addr: SocketAddr,
2394 machine_id: &Option<String>,
2395 clock: &Arc<HLC>,
2396 remote_daemon_events_rx: flume::Receiver<eyre::Result<Timestamped<InterDaemonEvent>>>,
2397 local_listen_port: u16,
2399) -> eyre::Result<(DaemonId, impl Stream<Item = Timestamped<Event>> + Unpin)> {
2400 let clock_cloned = clock.clone();
2401 let remote_daemon_events = remote_daemon_events_rx.into_stream().map(move |e| match e {
2402 Ok(e) => Timestamped {
2403 inner: Event::Daemon(e.inner),
2404 timestamp: e.timestamp,
2405 },
2406 Err(err) => Timestamped {
2407 inner: Event::DaemonError(err),
2408 timestamp: clock_cloned.new_timestamp(),
2409 },
2410 });
2411 let (daemon_id, coordinator_events) =
2412 coordinator::register(coordinator_addr, machine_id.clone(), clock)
2413 .await
2414 .wrap_err("failed to connect to dora-coordinator")?;
2415 let coordinator_events = coordinator_events.map(
2416 |Timestamped {
2417 inner: event,
2418 timestamp,
2419 }| Timestamped {
2420 inner: Event::Coordinator(event),
2421 timestamp,
2422 },
2423 );
2424 let (events_tx, events_rx) = flume::bounded(10);
2425 let _listen_port =
2426 local_listener::spawn_listener_loop((LOCALHOST, local_listen_port).into(), events_tx)
2427 .await?;
2428 let dynamic_node_events = events_rx.into_stream().map(|e| Timestamped {
2429 inner: Event::DynamicNode(e.inner),
2430 timestamp: e.timestamp,
2431 });
2432 let incoming = (
2433 coordinator_events,
2434 remote_daemon_events,
2435 dynamic_node_events,
2436 )
2437 .merge();
2438 Ok((daemon_id, incoming))
2439}
2440
/// Deliver an output of `node_id` to all local subscribers mapped to it.
///
/// Sends an `Input` event to every locally running receiver of
/// `(node_id, output_id)`. Receivers whose channel is closed are pruned from
/// the subscription map. For shared-memory payloads the memory is mapped and
/// copied out so the bytes can also be forwarded elsewhere (e.g. to remote
/// receivers); the associated drop token is registered and checked so the
/// owning node is notified once no receiver uses the data anymore.
///
/// Returns the raw output bytes (if any) for further forwarding.
async fn send_output_to_local_receivers(
    node_id: NodeId,
    output_id: DataId,
    dataflow: &mut RunningDataflow,
    metadata: &metadata::Metadata,
    data: Option<DataMessage>,
    clock: &HLC,
) -> Result<Option<AVec<u8, ConstAlign<128>>>, eyre::ErrReport> {
    let timestamp = metadata.timestamp();
    let empty_set = BTreeSet::new();
    let output_id = OutputId(node_id, output_id);
    let local_receivers = dataflow.mappings.get(&output_id).unwrap_or(&empty_set);
    let OutputId(node_id, _) = output_id;
    // Receivers whose subscribe channel turned out to be closed.
    let mut closed = Vec::new();
    for (receiver_id, input_id) in local_receivers {
        if let Some(channel) = dataflow.subscribe_channels.get(receiver_id) {
            let item = NodeEvent::Input {
                id: input_id.clone(),
                metadata: metadata.clone(),
                data: data.clone(),
            };
            match channel.send(Timestamped {
                inner: item,
                timestamp,
            }) {
                Ok(()) => {
                    // The receiver now holds a reference to the data -> track
                    // it under the data's drop token until it reports back.
                    if let Some(token) = data.as_ref().and_then(|d| d.drop_token()) {
                        dataflow
                            .pending_drop_tokens
                            .entry(token)
                            .or_insert_with(|| DropTokenInformation {
                                owner: node_id.clone(),
                                pending_nodes: Default::default(),
                            })
                            .pending_nodes
                            .insert(receiver_id.clone());
                    }
                }
                Err(_) => {
                    closed.push(receiver_id);
                }
            }
        }
    }
    // Drop subscriptions of receivers that went away.
    for id in closed {
        dataflow.subscribe_channels.remove(id);
    }
    // Extract the raw bytes of the message for further forwarding.
    let (data_bytes, drop_token) = match data {
        None => (None, None),
        Some(DataMessage::SharedMemory {
            shared_memory_id,
            len,
            drop_token,
        }) => {
            let memory = ShmemConf::new()
                .os_id(shared_memory_id)
                .open()
                .wrap_err("failed to map shared memory output")?;
            // SAFETY assumption: the sending node keeps the shared-memory
            // region alive (guarded by `drop_token`) while we copy from it.
            // NOTE(review): the `1` here is the runtime alignment argument of
            // `AVec::from_slice` while the return type declares a 128-byte
            // const alignment — confirm this combination is intended.
            let data = Some(AVec::from_slice(1, &unsafe { memory.as_slice() }[..len]));
            (data, Some(drop_token))
        }
        Some(DataMessage::Vec(v)) => (Some(v), None),
    };
    if let Some(token) = drop_token {
        // Ensure the token is registered even if there were no local
        // receivers, then report it back to the owner if nobody uses it.
        dataflow
            .pending_drop_tokens
            .entry(token)
            .or_insert_with(|| DropTokenInformation {
                owner: node_id.clone(),
                pending_nodes: Default::default(),
            });
        dataflow.check_drop_token(token, clock).await?;
    }
    Ok(data_bytes)
}
2518
2519fn node_inputs(node: &ResolvedNode) -> BTreeMap<DataId, Input> {
2520 match &node.kind {
2521 CoreNodeKind::Custom(n) => n.run_config.inputs.clone(),
2522 CoreNodeKind::Runtime(n) => runtime_node_inputs(n),
2523 }
2524}
2525
2526fn close_input(
2527 dataflow: &mut RunningDataflow,
2528 receiver_id: &NodeId,
2529 input_id: &DataId,
2530 clock: &HLC,
2531) {
2532 if let Some(open_inputs) = dataflow.open_inputs.get_mut(receiver_id) {
2533 if !open_inputs.remove(input_id) {
2534 return;
2535 }
2536 }
2537 if let Some(channel) = dataflow.subscribe_channels.get(receiver_id) {
2538 let _ = send_with_timestamp(
2539 channel,
2540 NodeEvent::InputClosed {
2541 id: input_id.clone(),
2542 },
2543 clock,
2544 );
2545
2546 if dataflow.open_inputs(receiver_id).is_empty() {
2547 if let Some(node) = dataflow.running_nodes.get_mut(receiver_id) {
2548 node.disable_restart();
2549 }
2550 let _ = send_with_timestamp(channel, NodeEvent::AllInputsClosed, clock);
2551 }
2552 }
2553}
2554
/// State the daemon keeps for a node of a running dataflow.
#[derive(Debug)]
pub struct RunningNode {
    // Handle for submitting kill/soft-kill operations to the node process;
    // taken out (set to `None`) during dataflow shutdown.
    process: Option<ProcessHandle>,
    // Configuration the node was spawned with.
    node_config: NodeConfig,
    // OS process id of the node, if known.
    pid: Option<Arc<AtomicU32>>,
    // Restart policy from the node's descriptor.
    restart_policy: RestartPolicy,
    // Set to `true` to prevent any further restart of this node.
    disable_restart: Arc<AtomicBool>,
}
2567
impl RunningNode {
    /// Returns whether restarting this node has been disabled.
    pub fn restarts_disabled(&self) -> bool {
        self.disable_restart.load(atomic::Ordering::Acquire)
    }

    /// Permanently disable restarts for this node (e.g. on dataflow stop or
    /// once all of its inputs are closed).
    pub fn disable_restart(&mut self) {
        self.disable_restart.store(true, atomic::Ordering::Release);
    }
}
2577
/// An operation that can be submitted for a spawned node process.
#[derive(Debug)]
enum ProcessOperation {
    /// Ask the process to shut down gracefully (SIGTERM on Unix,
    /// CTRL_BREAK_EVENT on Windows).
    SoftKill,
    /// Forcefully kill the process.
    Kill,
}
2583
impl ProcessOperation {
    /// Apply this operation to the given child process.
    ///
    /// Failures are logged as warnings instead of being propagated, as there
    /// is nothing further the caller could do about them.
    pub fn execute(&self, child: &mut dyn TokioChildWrapper) {
        match self {
            Self::SoftKill => {
                // Graceful termination is platform-specific.
                #[cfg(unix)]
                {
                    // Signal 15 == SIGTERM.
                    if let Err(err) = child.signal(15) {
                        warn!("failed to send SIGTERM to process {:?}: {err}", child.id());
                    }
                }

                #[cfg(windows)]
                unsafe {
                    let Some(pid) = child.id() else {
                        warn!("failed to get child process id");
                        return;
                    };
                    // CTRL_BREAK_EVENT is the closest Windows equivalent of a
                    // termination signal for console processes.
                    if let Err(err) = windows::Win32::System::Console::GenerateConsoleCtrlEvent(
                        windows::Win32::System::Console::CTRL_BREAK_EVENT,
                        pid,
                    ) {
                        warn!("failed to send CTRL_BREAK_EVENT to process {pid}: {err}");
                    }
                }

                #[cfg(not(any(unix, windows)))]
                {
                    warn!("killing process is not implemented on this platform");
                }
            }
            Self::Kill => {
                // Hard kill; does not wait for the process to exit.
                if let Err(err) = child.start_kill() {
                    warn!("failed to kill child process: {err}");
                }
            }
        }
    }
}
2623
/// Channel-based handle for controlling a spawned node process.
#[derive(Debug)]
struct ProcessHandle {
    // NOTE(review): the receiving side (not visible in this file section)
    // presumably applies submitted operations to the child process.
    op_tx: flume::Sender<ProcessOperation>,
}
2628
2629impl ProcessHandle {
2630 pub fn new(op_tx: flume::Sender<ProcessOperation>) -> Self {
2631 Self { op_tx }
2632 }
2633
2634 pub fn submit(&self, operation: ProcessOperation) -> bool {
2637 self.op_tx.send(operation).is_ok()
2638 }
2639}
2640
impl Drop for ProcessHandle {
    fn drop(&mut self) {
        // A successful submit means the operation channel still has a
        // receiver — presumably the process had not terminated yet, so it is
        // killed as a safety net when its handle goes away.
        if self.submit(ProcessOperation::Kill) {
            warn!("process was killed on drop because it was still running");
        }
    }
}
2648
/// All bookkeeping state of a dataflow that is (being) spawned on this daemon.
pub struct RunningDataflow {
    // Unique id of this dataflow instance.
    id: Uuid,

    // The descriptor the dataflow was spawned from.
    descriptor: Descriptor,

    // Nodes that were spawned but have not reported ready yet.
    pending_nodes: PendingNodes,

    dataflow_started: bool,

    // Per-node channels for delivering node events (inputs, stop, …).
    subscribe_channels: HashMap<NodeId, UnboundedSender<Timestamped<NodeEvent>>>,
    // Per-node channels for delivering drop-token events.
    drop_channels: HashMap<NodeId, UnboundedSender<Timestamped<NodeDropEvent>>>,
    // Maps each output to the set of local inputs subscribed to it.
    mappings: HashMap<OutputId, BTreeSet<InputId>>,
    // Timer inputs grouped by interval.
    timers: BTreeMap<Duration, BTreeSet<InputId>>,
    // Inputs that are still open, per node.
    open_inputs: BTreeMap<NodeId, BTreeSet<DataId>>,
    running_nodes: BTreeMap<NodeId, RunningNode>,

    // Nodes that connect dynamically at runtime instead of being spawned.
    dynamic_nodes: BTreeSet<NodeId>,

    // Outputs that still have receivers outside this daemon.
    open_external_mappings: BTreeSet<OutputId>,

    // Shared-memory drop tokens awaiting confirmation from receivers.
    pending_drop_tokens: HashMap<DropToken, DropTokenInformation>,

    // Keeps the spawned timer tasks alive; dropping a handle cancels the task.
    _timer_handles: BTreeMap<Duration, futures::future::RemoteHandle<()>>,
    // Whether a stop was already initiated for this dataflow.
    stop_sent: bool,

    /// Used as a default for `open_inputs` lookups of unknown nodes.
    empty_set: BTreeSet<DataId>,

    // Tracks which node failures were caused by other nodes' failures.
    cascading_error_causes: CascadingErrorCauses,
    // Nodes that had to be (soft-)killed after the stop grace period.
    grace_duration_kills: Arc<crossbeam_skiplist::SkipSet<NodeId>>,

    // Ring buffer of the most recent stderr lines per node.
    node_stderr_most_recent: BTreeMap<NodeId, Arc<ArrayQueue<String>>>,

    // Zenoh publishers for outputs forwarded to other machines.
    publishers: BTreeMap<OutputId, zenoh::pubsub::Publisher<'static>>,

    // Notifies subscribers when the dataflow has finished.
    finished_tx: broadcast::Sender<()>,

    // Debug option from the descriptor: mirror every message to zenoh.
    publish_all_messages_to_zenoh: bool,
}
2697
impl RunningDataflow {
    /// Create the bookkeeping state for a newly spawned dataflow.
    fn new(
        dataflow_id: Uuid,
        daemon_id: DaemonId,
        dataflow_descriptor: Descriptor,
    ) -> RunningDataflow {
        let (finished_tx, _) = broadcast::channel(1);
        Self {
            id: dataflow_id,
            pending_nodes: PendingNodes::new(dataflow_id, daemon_id),
            dataflow_started: false,
            subscribe_channels: HashMap::new(),
            drop_channels: HashMap::new(),
            mappings: HashMap::new(),
            timers: BTreeMap::new(),
            open_inputs: BTreeMap::new(),
            running_nodes: BTreeMap::new(),
            dynamic_nodes: BTreeSet::new(),
            open_external_mappings: Default::default(),
            pending_drop_tokens: HashMap::new(),
            _timer_handles: BTreeMap::new(),
            stop_sent: false,
            empty_set: BTreeSet::new(),
            cascading_error_causes: Default::default(),
            grace_duration_kills: Default::default(),
            node_stderr_most_recent: BTreeMap::new(),
            publishers: Default::default(),
            finished_tx,
            publish_all_messages_to_zenoh: dataflow_descriptor.debug.publish_all_messages_to_zenoh,
            descriptor: dataflow_descriptor,
        }
    }

    /// Spawn one background task per timer interval used by the dataflow.
    ///
    /// Each task ticks at its interval and emits a `DoraEvent::Timer` into
    /// `events_tx`; it stops when the event channel closes. The task's
    /// `RemoteHandle` is stored so dropping this struct cancels the task.
    async fn start(
        &mut self,
        events_tx: &mpsc::Sender<Timestamped<Event>>,
        clock: &Arc<HLC>,
    ) -> eyre::Result<()> {
        for interval in self.timers.keys().copied() {
            // Don't spawn a second task for an interval that already has one.
            if self._timer_handles.get(&interval).is_some() {
                continue;
            }
            let events_tx = events_tx.clone();
            let dataflow_id = self.id;
            let clock = clock.clone();
            let task = async move {
                let mut interval_stream = tokio::time::interval(interval);
                let hlc = HLC::default();
                loop {
                    interval_stream.tick().await;

                    let span = tracing::span!(tracing::Level::TRACE, "tick");
                    // NOTE(review): `let _ = span.enter()` drops the guard
                    // immediately, so the span is never actually entered —
                    // confirm whether `let _enter = span.enter();` was meant
                    // (binding it would however be held across `.await`).
                    let _ = span.enter();

                    let mut parameters = BTreeMap::new();
                    // With the `telemetry` feature the tracing context is
                    // serialized into the metadata; otherwise it stays empty.
                    parameters.insert(
                        "open_telemetry_context".to_string(),
                        #[cfg(feature = "telemetry")]
                        Parameter::String(serialize_context(&span.context())),
                        #[cfg(not(feature = "telemetry"))]
                        Parameter::String("".into()),
                    );

                    let metadata = metadata::Metadata::from_parameters(
                        hlc.new_timestamp(),
                        empty_type_info(),
                        parameters,
                    );

                    let event = Timestamped {
                        inner: DoraEvent::Timer {
                            dataflow_id,
                            interval,
                            metadata,
                        }
                        .into(),
                        timestamp: clock.new_timestamp(),
                    };
                    // Channel closed -> daemon event loop is gone; stop ticking.
                    if events_tx.send(event).await.is_err() {
                        break;
                    }
                }
            };
            let (task, handle) = task.remote_handle();
            tokio::spawn(task);
            self._timer_handles.insert(interval, handle);
        }

        Ok(())
    }

    /// Stop all nodes of this dataflow.
    ///
    /// Pending nodes are resolved first, restarts are disabled, and a `Stop`
    /// event is sent to every subscribed node. With `force` the processes are
    /// killed immediately; otherwise a background task soft-kills them after
    /// the grace period (default 10 s) and hard-kills them after another half
    /// of that period, recording affected nodes in `grace_duration_kills`.
    async fn stop_all(
        &mut self,
        coordinator_connection: &mut Option<TcpStream>,
        clock: &HLC,
        grace_duration: Option<Duration>,
        force: bool,
        logger: &mut DataflowLogger<'_>,
    ) -> eyre::Result<()> {
        self.pending_nodes
            .handle_dataflow_stop(
                coordinator_connection,
                clock,
                &mut self.cascading_error_causes,
                &self.dynamic_nodes,
                logger,
            )
            .await?;

        for node in self.running_nodes.values_mut() {
            node.disable_restart();
        }

        // Ask every node to stop gracefully; draining drops the channels.
        for (_node_id, channel) in self.subscribe_channels.drain() {
            let _ = send_with_timestamp(&channel, NodeEvent::Stop, clock);
        }

        // Take ownership of the process handles so the kill task below can
        // use them; `RunningNode.process` becomes `None`.
        let running_processes: Vec<_> = self
            .running_nodes
            .iter_mut()
            .map(|(id, n)| (id.clone(), n.process.take()))
            .collect();
        if force {
            for (_, proc) in &running_processes {
                if let Some(proc) = proc {
                    proc.submit(crate::ProcessOperation::Kill);
                }
            }
        } else {
            let grace_duration_kills = self.grace_duration_kills.clone();
            tokio::spawn(async move {
                let duration = grace_duration.unwrap_or(Duration::from_millis(10000));
                tokio::time::sleep(duration).await;

                // First escalation: soft kill after the grace period.
                for (node, proc) in &running_processes {
                    if let Some(proc) = proc {
                        if proc.submit(crate::ProcessOperation::SoftKill) {
                            grace_duration_kills.insert(node.clone());
                        }
                    }
                }

                let kill_duration = duration / 2;
                tokio::time::sleep(kill_duration).await;

                // Second escalation: hard kill for processes still alive.
                for (node, proc) in &running_processes {
                    if let Some(proc) = proc {
                        if proc.submit(crate::ProcessOperation::Kill) {
                            grace_duration_kills.insert(node.clone());
                            warn!(
                                "{node} was killed due to not stopping within the {:#?} grace period",
                                duration + kill_duration
                            );
                        }
                    }
                }
            });
        }
        self.stop_sent = true;
        Ok(())
    }

    /// The set of still-open inputs of `node_id` (empty for unknown nodes).
    fn open_inputs(&self, node_id: &NodeId) -> &BTreeSet<DataId> {
        self.open_inputs.get(node_id).unwrap_or(&self.empty_set)
    }

    /// If no receiver still uses the data behind `token`, remove the token
    /// and notify the owning node via an `OutputDropped` event. Failures to
    /// notify are logged, not propagated.
    async fn check_drop_token(&mut self, token: DropToken, clock: &HLC) -> eyre::Result<()> {
        match self.pending_drop_tokens.entry(token) {
            std::collections::hash_map::Entry::Occupied(entry) => {
                if entry.get().pending_nodes.is_empty() {
                    let (drop_token, info) = entry.remove_entry();
                    let result = match self.drop_channels.get_mut(&info.owner) {
                        Some(channel) => send_with_timestamp(
                            channel,
                            NodeDropEvent::OutputDropped { drop_token },
                            clock,
                        )
                        .wrap_err("send failed"),
                        None => Err(eyre!("no subscribe channel for node `{}`", &info.owner)),
                    };
                    if let Err(err) = result.wrap_err_with(|| {
                        format!(
                            "failed to report drop token `{drop_token:?}` to owner `{}`",
                            &info.owner
                        )
                    }) {
                        tracing::warn!("{err:?}");
                    }
                }
            }
            std::collections::hash_map::Entry::Vacant(_) => {
                tracing::warn!("check_drop_token called with already closed token")
            }
        }

        Ok(())
    }
}
2896
2897fn empty_type_info() -> ArrowTypeInfo {
2898 ArrowTypeInfo {
2899 data_type: DataType::Null,
2900 len: 0,
2901 null_count: 0,
2902 validity: None,
2903 offset: 0,
2904 buffer_offsets: Vec::new(),
2905 child_data: Vec::new(),
2906 }
2907}
2908
/// Identifies an output: the producing node plus the output's data id.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct OutputId(NodeId, DataId);
/// Identifies an input: the receiving node plus the input's data id.
type InputId = (NodeId, DataId);
2912
/// Tracking state for a pending drop token of an output's data.
struct DropTokenInformation {
    // The node that produced the output; it receives `OutputDropped` once
    // no receiver uses the data anymore.
    owner: NodeId,
    // Receiver nodes that still hold a reference to the data.
    pending_nodes: BTreeSet<NodeId>,
}
2920
/// Top-level event type handled by the daemon's main event loop.
#[derive(Debug)]
pub enum Event {
    /// A request/notification from a node of a running dataflow.
    Node {
        dataflow_id: DataflowId,
        node_id: NodeId,
        event: DaemonNodeEvent,
    },
    /// An event received from the coordinator.
    Coordinator(CoordinatorEvent),
    /// An event received from another daemon.
    Daemon(InterDaemonEvent),
    /// A daemon-internal event (timer ticks, logs, node exit results).
    Dora(DoraEvent),
    /// An event from a dynamically connecting node on the local listener.
    DynamicNode(DynamicNodeEventWrapper),
    HeartbeatInterval,
    MetricsInterval,
    /// First ctrl-c signal.
    CtrlC,
    /// Second ctrl-c signal.
    SecondCtrlC,
    /// Receiving remote daemon events failed.
    DaemonError(eyre::Report),
    /// Result of spawning a single node.
    SpawnNodeResult {
        dataflow_id: DataflowId,
        node_id: NodeId,
        dynamic_node: bool,
        result: Result<RunningNode, NodeError>,
    },
    /// Result of building a dataflow.
    BuildDataflowResult {
        build_id: BuildId,
        session_id: SessionId,
        result: eyre::Result<BuildInfo>,
    },
    /// Result of spawning all nodes of a dataflow.
    SpawnDataflowResult {
        dataflow_id: Uuid,
        result: eyre::Result<()>,
    },
    /// A node of the given dataflow stopped.
    NodeStopped {
        dataflow_id: Uuid,
        node_id: NodeId,
    },
}
2957
2958impl From<DoraEvent> for Event {
2959 fn from(event: DoraEvent) -> Self {
2960 Event::Dora(event)
2961 }
2962}
2963
2964impl Event {
2965 pub fn kind(&self) -> &'static str {
2966 match self {
2967 Event::Node { .. } => "Node",
2968 Event::Coordinator(_) => "Coordinator",
2969 Event::Daemon(_) => "Daemon",
2970 Event::Dora(_) => "Dora",
2971 Event::DynamicNode(_) => "DynamicNode",
2972 Event::HeartbeatInterval => "HeartbeatInterval",
2973 Event::MetricsInterval => "MetricsInterval",
2974 Event::CtrlC => "CtrlC",
2975 Event::SecondCtrlC => "SecondCtrlC",
2976 Event::DaemonError(_) => "DaemonError",
2977 Event::SpawnNodeResult { .. } => "SpawnNodeResult",
2978 Event::BuildDataflowResult { .. } => "BuildDataflowResult",
2979 Event::SpawnDataflowResult { .. } => "SpawnDataflowResult",
2980 Event::NodeStopped { .. } => "NodeStopped",
2981 }
2982 }
2983}
2984
/// Requests and notifications a node sends to its daemon.
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
pub enum DaemonNodeEvent {
    /// The node finished sending outputs.
    OutputsDone {
        reply_sender: oneshot::Sender<DaemonReply>,
    },
    /// Subscribe to node events (inputs, stop, input-closed, …).
    Subscribe {
        event_sender: UnboundedSender<Timestamped<NodeEvent>>,
        reply_sender: oneshot::Sender<DaemonReply>,
    },
    /// Subscribe to drop-token events.
    SubscribeDrop {
        event_sender: UnboundedSender<Timestamped<NodeDropEvent>>,
        reply_sender: oneshot::Sender<DaemonReply>,
    },
    /// The node closed the given outputs; no more data will be sent on them.
    CloseOutputs {
        outputs: Vec<dora_core::config::DataId>,
        reply_sender: oneshot::Sender<DaemonReply>,
    },
    /// The node published data on one of its outputs.
    SendOut {
        output_id: DataId,
        metadata: metadata::Metadata,
        data: Option<DataMessage>,
    },
    /// The node no longer uses the data behind the given drop tokens.
    ReportDrop {
        tokens: Vec<DropToken>,
    },
    /// The node dropped its event stream.
    EventStreamDropped {
        reply_sender: oneshot::Sender<DaemonReply>,
    },
}
3015
/// Daemon-internal events generated by background tasks.
#[derive(Debug)]
pub enum DoraEvent {
    /// A timer interval of a dataflow ticked.
    Timer {
        dataflow_id: DataflowId,
        interval: Duration,
        metadata: metadata::Metadata,
    },
    /// A log message produced as a dataflow output.
    Logs {
        dataflow_id: DataflowId,
        output_id: OutputId,
        message: DataMessage,
        metadata: metadata::Metadata,
    },
    /// A spawned node process exited.
    SpawnedNodeResult {
        dataflow_id: DataflowId,
        node_id: NodeId,
        dynamic_node: bool,
        exit_status: NodeExitStatus,
        // Whether the node should be restarted instead of being reported
        // as finished.
        restart: bool,
    },
}
3038
/// Control-flow signal for the daemon's main loop.
#[must_use]
enum RunStatus {
    /// Keep processing events.
    Continue,
    /// Exit the event loop.
    Exit,
}
3044
3045fn send_with_timestamp<T>(
3046 sender: &UnboundedSender<Timestamped<T>>,
3047 event: T,
3048 clock: &HLC,
3049) -> Result<(), mpsc::error::SendError<Timestamped<T>>> {
3050 sender.send(Timestamped {
3051 inner: event,
3052 timestamp: clock.new_timestamp(),
3053 })
3054}
3055
3056fn set_up_ctrlc_handler(
3057 clock: Arc<HLC>,
3058) -> eyre::Result<tokio::sync::mpsc::Receiver<Timestamped<Event>>> {
3059 let (ctrlc_tx, ctrlc_rx) = mpsc::channel(1);
3060
3061 let mut ctrlc_sent = 0;
3062 ctrlc::set_handler(move || {
3063 let event = match ctrlc_sent {
3064 0 => Event::CtrlC,
3065 1 => Event::SecondCtrlC,
3066 _ => {
3067 tracing::warn!("received 3rd ctrlc signal -> aborting immediately");
3068 std::process::abort();
3069 }
3070 };
3071 if ctrlc_tx
3072 .blocking_send(Timestamped {
3073 inner: event,
3074 timestamp: clock.new_timestamp(),
3075 })
3076 .is_err()
3077 {
3078 tracing::error!("failed to report ctrl-c event to dora-coordinator");
3079 }
3080
3081 ctrlc_sent += 1;
3082 })
3083 .wrap_err("failed to set ctrl-c handler")?;
3084
3085 Ok(ctrlc_rx)
3086}
3087
/// Records which node failures were caused by the failure of another node.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct CascadingErrorCauses {
    // Maps an affected node to the node whose failure caused it.
    caused_by: BTreeMap<NodeId, NodeId>,
}
3092
3093impl CascadingErrorCauses {
3094 pub fn experienced_cascading_error(&self, node: &NodeId) -> bool {
3095 self.caused_by.contains_key(node)
3096 }
3097
3098 pub fn error_caused_by(&self, node: &NodeId) -> Option<&NodeId> {
3100 self.caused_by.get(node)
3101 }
3102
3103 pub fn report_cascading_error(&mut self, causing_node: NodeId, affected_node: NodeId) {
3104 self.caused_by.entry(affected_node).or_insert(causing_node);
3105 }
3106}
3107
3108fn runtime_node_inputs(n: &RuntimeNode) -> BTreeMap<DataId, Input> {
3109 n.operators
3110 .iter()
3111 .flat_map(|operator| {
3112 operator.config.inputs.iter().map(|(input_id, mapping)| {
3113 (
3114 DataId::from(format!("{}/{input_id}", operator.id)),
3115 mapping.clone(),
3116 )
3117 })
3118 })
3119 .collect()
3120}
3121
3122fn runtime_node_outputs(n: &RuntimeNode) -> BTreeSet<DataId> {
3123 n.operators
3124 .iter()
3125 .flat_map(|operator| {
3126 operator
3127 .config
3128 .outputs
3129 .iter()
3130 .map(|output_id| DataId::from(format!("{}/{output_id}", operator.id)))
3131 })
3132 .collect()
3133}
3134
/// Convenience accessors shared by both node kinds of a resolved descriptor.
trait CoreNodeKindExt {
    /// The effective run configuration (inputs and outputs) of the node.
    fn run_config(&self) -> NodeRunConfig;
    /// Whether the node is a dynamic node.
    fn dynamic(&self) -> bool;
}
3139
3140impl CoreNodeKindExt for CoreNodeKind {
3141 fn run_config(&self) -> NodeRunConfig {
3142 match self {
3143 CoreNodeKind::Runtime(n) => NodeRunConfig {
3144 inputs: runtime_node_inputs(n),
3145 outputs: runtime_node_outputs(n),
3146 },
3147 CoreNodeKind::Custom(n) => n.run_config.clone(),
3148 }
3149 }
3150
3151 fn dynamic(&self) -> bool {
3152 match self {
3153 CoreNodeKind::Runtime(_n) => false,
3154 CoreNodeKind::Custom(n) => {
3155 matches!(&n.source, NodeSource::Local) && n.path == DYNAMIC_SOURCE
3156 }
3157 }
3158 }
3159}