use aligned_vec::{AVec, ConstAlign};
use coordinator::CoordinatorEvent;
use crossbeam::queue::ArrayQueue;
use dora_core::{
    build::{self, BuildInfo, GitManager, PrevGitSource},
    config::{DataId, Input, InputMapping, NodeId, NodeRunConfig, OperatorId},
    descriptor::{
        CoreNodeKind, DYNAMIC_SOURCE, Descriptor, DescriptorExt, ResolvedNode, RuntimeNode,
        read_as_descriptor,
    },
    topics::LOCALHOST,
    uhlc::{self, HLC},
};
use dora_message::{
    BuildId, DataflowId, SessionId,
    common::{
        DaemonId, DataMessage, DropToken, GitSource, LogLevel, NodeError, NodeErrorCause,
        NodeExitStatus,
    },
    coordinator_to_cli::DataflowResult,
    coordinator_to_daemon::{BuildDataflowNodes, DaemonCoordinatorEvent, SpawnDataflowNodes},
    daemon_to_coordinator::{
        CoordinatorRequest, DaemonCoordinatorReply, DaemonEvent, DataflowDaemonResult,
    },
    daemon_to_daemon::InterDaemonEvent,
    daemon_to_node::{DaemonReply, NodeConfig, NodeDropEvent, NodeEvent},
    descriptor::NodeSource,
    metadata::{self, ArrowTypeInfo},
    node_to_daemon::{DynamicNodeEvent, Timestamped},
};
use dora_node_api::{Parameter, arrow::datatypes::DataType};
use eyre::{Context, ContextCompat, Result, bail, eyre};
use futures::{FutureExt, TryFutureExt, future, stream};
use futures_concurrency::stream::Merge;
use local_listener::DynamicNodeEventWrapper;
use log::{DaemonLogger, DataflowLogger, Logger};
use pending::PendingNodes;
use shared_memory_server::ShmemConf;
use socket_stream_utils::socket_stream_send;
use spawn::Spawner;
use std::{
    collections::{BTreeMap, BTreeSet, HashMap},
    env::current_dir,
    future::Future,
    net::SocketAddr,
    path::{Path, PathBuf},
    pin::pin,
    sync::Arc,
    time::{Duration, Instant},
};
use sysinfo::Pid;
use tokio::{
    fs::File,
    io::AsyncReadExt,
    net::TcpStream,
    sync::{
        broadcast,
        mpsc::{self, UnboundedSender},
        oneshot::{self, Sender},
    },
};
use tokio_stream::{Stream, StreamExt, wrappers::ReceiverStream};
use tracing::{error, warn};
use uuid::{NoContext, Timestamp, Uuid};

pub use flume;
pub use log::LogDestination;

mod coordinator;
mod local_listener;
mod log;
mod node_communication;
mod pending;
mod socket_stream_utils;
mod spawn;

#[cfg(feature = "telemetry")]
use dora_tracing::telemetry::serialize_context;
#[cfg(feature = "telemetry")]
use tracing_opentelemetry::OpenTelemetrySpanExt;

use crate::pending::DataflowStatus;

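/// Number of most recent stderr lines kept per node; they are reported as the
/// error cause when a node exits unsuccessfully.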
const STDERR_LOG_LINES: usize = 10;

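/// The dora daemon: spawns and supervises the nodes of the dataflows running
/// on the local machine and forwards their inputs and outputs.
///
/// Outputs destined for nodes on remote machines are published through the
/// shared zenoh session; the TCP connection to the coordinator (if any) is
/// used for heartbeats, logs, and lifecycle commands.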
pub struct Daemon {
    running: HashMap<DataflowId, RunningDataflow>,
    working_dir: HashMap<DataflowId, PathBuf>,

    events_tx: mpsc::Sender<Timestamped<Event>>,

    coordinator_connection: Option<TcpStream>,
    last_coordinator_heartbeat: Instant,
    daemon_id: DaemonId,

    exit_when_done: Option<BTreeSet<(Uuid, NodeId)>>,
    exit_when_all_finished: bool,
    dataflow_node_results: BTreeMap<Uuid, BTreeMap<NodeId, Result<(), NodeError>>>,

    clock: Arc<uhlc::HLC>,

    zenoh_session: zenoh::Session,
    remote_daemon_events_tx: Option<flume::Sender<eyre::Result<Timestamped<InterDaemonEvent>>>>,

    logger: DaemonLogger,

    sessions: BTreeMap<SessionId, BuildId>,
    builds: BTreeMap<BuildId, BuildInfo>,
    git_manager: GitManager,
}

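/// Maps each dataflow ID to the per-node results reported when the daemon run
/// finished.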
type DaemonRunResult = BTreeMap<Uuid, BTreeMap<NodeId, Result<(), NodeError>>>;

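/// A pending build or spawn preparation task for a single node, together with
/// the metadata needed to report its result.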
struct NodeBuildTask<F> {
    node_id: NodeId,
    dynamic_node: bool,
    task: F,
}

impl Daemon {
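    /// Runs a daemon that connects to the coordinator at `coordinator_addr`
    /// and waits for dataflows to build or spawn. Returns once the daemon
    /// receives a ctrl-c signal or a `Destroy` command from the coordinator.
    ///
    /// A minimal sketch of a daemon entrypoint (the address and port values
    /// are illustrative, not dora defaults):
    ///
    /// ```ignore
    /// let coordinator_addr: SocketAddr = "127.0.0.1:53290".parse()?;
    /// // no explicit machine ID; 9876 is an arbitrary local listen port
    /// Daemon::run(coordinator_addr, None, 9876).await?;
    /// ```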
    pub async fn run(
        coordinator_addr: SocketAddr,
        machine_id: Option<String>,
        local_listen_port: u16,
    ) -> eyre::Result<()> {
        let clock = Arc::new(HLC::default());

        let mut ctrlc_events = set_up_ctrlc_handler(clock.clone())?;
        let (remote_daemon_events_tx, remote_daemon_events_rx) = flume::bounded(10);
        let (daemon_id, incoming_events) = {
            let incoming_events = set_up_event_stream(
                coordinator_addr,
                &machine_id,
                &clock,
                remote_daemon_events_rx,
                local_listen_port,
            );

            let ctrl_c = pin!(ctrlc_events.recv());
            match futures::future::select(ctrl_c, pin!(incoming_events)).await {
                future::Either::Left((_ctrl_c, _)) => {
                    tracing::info!("received ctrl-c signal -> stopping daemon");
                    return Ok(());
                }
                future::Either::Right((events, _)) => events?,
            }
        };

        let log_destination = {
            let stream = TcpStream::connect(coordinator_addr)
                .await
                .wrap_err("failed to connect log to dora-coordinator")?;
            stream
                .set_nodelay(true)
                .wrap_err("failed to set TCP_NODELAY")?;
            LogDestination::Coordinator {
                coordinator_connection: stream,
            }
        };

        Self::run_general(
            (ReceiverStream::new(ctrlc_events), incoming_events).merge(),
            Some(coordinator_addr),
            daemon_id,
            None,
            clock.clone(),
            Some(remote_daemon_events_tx),
            Default::default(),
            log_destination,
        )
        .await
        .map(|_| ())
    }

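    /// Builds up and runs a single dataflow locally, without requiring a
    /// separate coordinator or daemon (used by `dora run`). Dataflows with
    /// `deploy` sections are rejected because they require a
    /// coordinator/daemon setup.
    ///
    /// A minimal usage sketch; the `session_id` and `log_destination` values
    /// are assumed to be obtained elsewhere, since their construction is
    /// defined outside this file:
    ///
    /// ```ignore
    /// let result = Daemon::run_dataflow(
    ///     Path::new("dataflow.yml"),
    ///     None,  // build_id: no previous `dora build`
    ///     None,  // local_build: no pre-built node info
    ///     session_id,
    ///     false, // uv
    ///     log_destination,
    /// )
    /// .await?;
    /// ```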
    pub async fn run_dataflow(
        dataflow_path: &Path,
        build_id: Option<BuildId>,
        local_build: Option<BuildInfo>,
        session_id: SessionId,
        uv: bool,
        log_destination: LogDestination,
    ) -> eyre::Result<DataflowResult> {
        let working_dir = dataflow_path
            .canonicalize()
            .context("failed to canonicalize dataflow path")?
            .parent()
            .ok_or_else(|| eyre::eyre!("canonicalized dataflow path has no parent"))?
            .to_owned();

        let descriptor = read_as_descriptor(dataflow_path).await?;
        if let Some(node) = descriptor.nodes.iter().find(|n| n.deploy.is_some()) {
            eyre::bail!(
                "node {} has a `deploy` section, which is not supported in `dora run`\n\n\
                Instead, you need to spawn a `dora coordinator` and one or more `dora daemon` \
                instances and then use `dora start`.",
                node.id
            )
        }

        descriptor.check(&working_dir)?;
        let nodes = descriptor.resolve_aliases_and_set_defaults()?;

        let dataflow_id = Uuid::new_v7(Timestamp::now(NoContext));
        let spawn_command = SpawnDataflowNodes {
            build_id,
            session_id,
            dataflow_id,
            local_working_dir: Some(working_dir),
            spawn_nodes: nodes.keys().cloned().collect(),
            nodes,
            dataflow_descriptor: descriptor,
            uv,
        };

        let clock = Arc::new(HLC::default());

        let ctrlc_events = ReceiverStream::new(set_up_ctrlc_handler(clock.clone())?);

        let exit_when_done = spawn_command
            .nodes
            .values()
            .map(|n| (spawn_command.dataflow_id, n.id.clone()))
            .collect();
        let (reply_tx, reply_rx) = oneshot::channel();
        let timestamp = clock.new_timestamp();
        let coordinator_events = stream::once(async move {
            Timestamped {
                inner: Event::Coordinator(CoordinatorEvent {
                    event: DaemonCoordinatorEvent::Spawn(spawn_command),
                    reply_tx,
                }),
                timestamp,
            }
        });
        let events = (coordinator_events, ctrlc_events).merge();
        let run_result = Self::run_general(
            Box::pin(events),
            None,
            DaemonId::new(None),
            Some(exit_when_done),
            clock.clone(),
            None,
            if let Some(local_build) = local_build {
                let Some(build_id) = build_id else {
                    bail!("no build_id, but local_build set")
                };
                let mut builds = BTreeMap::new();
                builds.insert(build_id, local_build);
                builds
            } else {
                Default::default()
            },
            log_destination,
        );

        let spawn_result = reply_rx
            .map_err(|err| eyre!("failed to receive spawn result: {err}"))
            .and_then(|r| async {
                match r {
                    Some(DaemonCoordinatorReply::TriggerSpawnResult(result)) => {
                        result.map_err(|err| eyre!(err))
                    }
                    _ => Err(eyre!("unexpected spawn reply")),
                }
            });

        let (mut dataflow_results, ()) = future::try_join(run_result, spawn_result).await?;

        Ok(DataflowResult {
            uuid: dataflow_id,
            timestamp: clock.new_timestamp(),
            node_results: dataflow_results
                .remove(&dataflow_id)
                .context("no node results for dataflow_id")?,
        })
    }

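    /// Shared implementation behind [`Self::run`] and [`Self::run_dataflow`]:
    /// sets up the coordinator connection (if any) and the zenoh session,
    /// then constructs the `Daemon` value and enters its event loop.
    ///
    /// The zenoh session is configured from the file named by the
    /// `ZENOH_CONFIG` env variable if present; otherwise a default
    /// configuration is derived from the coordinator address, with two
    /// fallback attempts if opening the session fails.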
    #[allow(clippy::too_many_arguments)]
    async fn run_general(
        external_events: impl Stream<Item = Timestamped<Event>> + Unpin,
        coordinator_addr: Option<SocketAddr>,
        daemon_id: DaemonId,
        exit_when_done: Option<BTreeSet<(Uuid, NodeId)>>,
        clock: Arc<HLC>,
        remote_daemon_events_tx: Option<flume::Sender<eyre::Result<Timestamped<InterDaemonEvent>>>>,
        builds: BTreeMap<BuildId, BuildInfo>,
        log_destination: LogDestination,
    ) -> eyre::Result<DaemonRunResult> {
        let coordinator_connection = match coordinator_addr {
            Some(addr) => {
                let stream = TcpStream::connect(addr)
                    .await
                    .wrap_err("failed to connect to dora-coordinator")?;
                stream
                    .set_nodelay(true)
                    .wrap_err("failed to set TCP_NODELAY")?;
                Some(stream)
            }
            None => None,
        };

        let zenoh_session = match std::env::var(zenoh::Config::DEFAULT_CONFIG_PATH_ENV) {
            Ok(path) => {
                let zenoh_config = zenoh::Config::from_file(&path)
                    .map_err(|e| eyre!(e))
                    .wrap_err_with(|| format!("failed to read zenoh config from {path}"))?;
                zenoh::open(zenoh_config)
                    .await
                    .map_err(|e| eyre!(e))
                    .context("failed to open zenoh session")?
            }
            Err(std::env::VarError::NotPresent) => {
                let mut zenoh_config = zenoh::Config::default();

                if let Some(addr) = coordinator_addr {
                    if cfg!(not(target_os = "windows")) {
                        zenoh_config
                            .insert_json5("routing/peer", r#"{ mode: "linkstate" }"#)
                            .unwrap();
                    }

                    zenoh_config
                        .insert_json5(
                            "connect/endpoints",
                            &format!(
                                r#"{{ router: ["tcp/[::]:7447"], peer: ["tcp/{}:5456"] }}"#,
                                addr.ip()
                            ),
                        )
                        .unwrap();
                    zenoh_config
                        .insert_json5(
                            "listen/endpoints",
                            r#"{ router: ["tcp/[::]:7447"], peer: ["tcp/[::]:5456"] }"#,
                        )
                        .unwrap();
                    if cfg!(target_os = "macos") {
                        warn!(
                            "disabling multicast on macOS systems; enable it through the ZENOH_CONFIG env variable or config file"
                        );
                        zenoh_config
                            .insert_json5("scouting/multicast", r#"{ enabled: false }"#)
                            .unwrap();
                    }
                };
                if let Ok(zenoh_session) = zenoh::open(zenoh_config).await {
                    zenoh_session
                } else {
                    warn!(
                        "failed to open zenoh session, retrying with default config + coordinator"
                    );
                    let mut zenoh_config = zenoh::Config::default();
                    if cfg!(not(target_os = "windows")) {
                        zenoh_config
                            .insert_json5("routing/peer", r#"{ mode: "linkstate" }"#)
                            .unwrap();
                    }

                    if let Some(addr) = coordinator_addr {
                        zenoh_config
                            .insert_json5(
                                "connect/endpoints",
                                &format!(
                                    r#"{{ router: ["tcp/[::]:7447"], peer: ["tcp/{}:5456"] }}"#,
                                    addr.ip()
                                ),
                            )
                            .unwrap();
                        if cfg!(target_os = "macos") {
                            warn!(
                                "disabling multicast on macOS systems; enable it through the ZENOH_CONFIG env variable or config file"
                            );
                            zenoh_config
                                .insert_json5("scouting/multicast", r#"{ enabled: false }"#)
                                .unwrap();
                        }
                    }
                    if let Ok(zenoh_session) = zenoh::open(zenoh_config).await {
                        zenoh_session
                    } else {
                        warn!("failed to open zenoh session, retrying with default config");
                        let zenoh_config = zenoh::Config::default();
                        zenoh::open(zenoh_config)
                            .await
                            .map_err(|e| eyre!(e))
                            .context("failed to open zenoh session")?
                    }
                }
            }
            Err(std::env::VarError::NotUnicode(_)) => eyre::bail!(
                "{} env variable is not valid unicode",
                zenoh::Config::DEFAULT_CONFIG_PATH_ENV
            ),
        };
        let (dora_events_tx, dora_events_rx) = mpsc::channel(5);
        let daemon = Self {
            logger: Logger {
                destination: log_destination,
                daemon_id: daemon_id.clone(),
                clock: clock.clone(),
            }
            .for_daemon(daemon_id.clone()),
            running: HashMap::new(),
            working_dir: HashMap::new(),
            events_tx: dora_events_tx,
            coordinator_connection,
            last_coordinator_heartbeat: Instant::now(),
            daemon_id,
            exit_when_done,
            exit_when_all_finished: false,
            dataflow_node_results: BTreeMap::new(),
            clock,
            zenoh_session,
            remote_daemon_events_tx,
            git_manager: Default::default(),
            builds,
            sessions: Default::default(),
        };

        let dora_events = ReceiverStream::new(dora_events_rx);
        let watchdog_clock = daemon.clock.clone();
        let watchdog_interval = tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(
            Duration::from_secs(5),
        ))
        .map(|_| Timestamped {
            inner: Event::HeartbeatInterval,
            timestamp: watchdog_clock.new_timestamp(),
        });
        let events = (external_events, dora_events, watchdog_interval).merge();
        daemon.run_inner(events).await
    }

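    /// The daemon's main event loop: merges coordinator, inter-daemon, node,
    /// and timer events into a single stream and dispatches them until an
    /// exit condition is reached (destroy command, ctrl-c, or all awaited
    /// nodes finished).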
    #[tracing::instrument(skip(incoming_events, self), fields(?self.daemon_id))]
    async fn run_inner(
        mut self,
        incoming_events: impl Stream<Item = Timestamped<Event>> + Unpin,
    ) -> eyre::Result<DaemonRunResult> {
        let mut events = incoming_events;

        while let Some(event) = events.next().await {
            let Timestamped { inner, timestamp } = event;
            if let Err(err) = self.clock.update_with_timestamp(&timestamp) {
                tracing::warn!("failed to update HLC with incoming event timestamp: {err}");
            }

            let start = Instant::now();
            let event_kind = inner.kind();

            match inner {
                Event::Coordinator(CoordinatorEvent { event, reply_tx }) => {
                    let status = self.handle_coordinator_event(event, reply_tx).await?;

                    match status {
                        RunStatus::Continue => {}
                        RunStatus::Exit => break,
                    }
                }
                Event::Daemon(event) => {
                    self.handle_inter_daemon_event(event).await?;
                }
                Event::Node {
                    dataflow_id: dataflow,
                    node_id,
                    event,
                } => self.handle_node_event(event, dataflow, node_id).await?,
                Event::Dora(event) => self.handle_dora_event(event).await?,
                Event::DynamicNode(event) => self.handle_dynamic_node_event(event).await?,
                Event::HeartbeatInterval => {
                    if let Some(connection) = &mut self.coordinator_connection {
                        let msg = serde_json::to_vec(&Timestamped {
                            inner: CoordinatorRequest::Event {
                                daemon_id: self.daemon_id.clone(),
                                event: DaemonEvent::Heartbeat,
                            },
                            timestamp: self.clock.new_timestamp(),
                        })?;
                        socket_stream_send(connection, &msg)
                            .await
                            .wrap_err("failed to send watchdog message to dora-coordinator")?;

                        if self.last_coordinator_heartbeat.elapsed() > Duration::from_secs(20) {
                            bail!("lost connection to coordinator")
                        }
                    }
                }
                Event::CtrlC => {
                    tracing::info!("received ctrl-c signal -> stopping all dataflows");
                    for dataflow in self.running.values_mut() {
                        let mut logger = self.logger.for_dataflow(dataflow.id);
                        dataflow
                            .stop_all(
                                &mut self.coordinator_connection,
                                &self.clock,
                                None,
                                &mut logger,
                            )
                            .await?;
                    }
                    self.exit_when_all_finished = true;
                    if self.running.is_empty() {
                        break;
                    }
                }
                Event::SecondCtrlC => {
                    tracing::warn!("received second ctrl-c signal -> exiting immediately");
                    bail!("received second ctrl-c signal");
                }
                Event::DaemonError(err) => {
                    tracing::error!("daemon error: {err:?}");
                }
                Event::SpawnNodeResult {
                    dataflow_id,
                    node_id,
                    dynamic_node,
                    result,
                } => match result {
                    Ok(running_node) => {
                        if let Some(dataflow) = self.running.get_mut(&dataflow_id) {
                            dataflow.running_nodes.insert(node_id, running_node);
                        } else {
                            tracing::error!(
                                "failed to handle SpawnNodeResult: no running dataflow with ID {dataflow_id}"
                            );
                        }
                    }
                    Err(error) => {
                        self.dataflow_node_results
                            .entry(dataflow_id)
                            .or_default()
                            .insert(node_id.clone(), Err(error));
                        self.handle_node_stop(dataflow_id, &node_id, dynamic_node)
                            .await?;
                    }
                },
                Event::BuildDataflowResult {
                    build_id,
                    session_id,
                    result,
                } => {
                    let (build_info, result) = match result {
                        Ok(build_info) => (Some(build_info), Ok(())),
                        Err(err) => (None, Err(err)),
                    };
                    if let Some(build_info) = build_info {
                        self.builds.insert(build_id, build_info);
                        if let Some(old_build_id) = self.sessions.insert(session_id, build_id) {
                            self.builds.remove(&old_build_id);
                        }
                    }
                    if let Some(connection) = &mut self.coordinator_connection {
                        let msg = serde_json::to_vec(&Timestamped {
                            inner: CoordinatorRequest::Event {
                                daemon_id: self.daemon_id.clone(),
                                event: DaemonEvent::BuildResult {
                                    build_id,
                                    result: result.map_err(|err| format!("{err:?}")),
                                },
                            },
                            timestamp: self.clock.new_timestamp(),
                        })?;
                        socket_stream_send(connection, &msg).await.wrap_err(
                            "failed to send BuildDataflowResult message to dora-coordinator",
                        )?;
                    }
                }
                Event::SpawnDataflowResult {
                    dataflow_id,
                    result,
                } => {
                    if let Some(connection) = &mut self.coordinator_connection {
                        let msg = serde_json::to_vec(&Timestamped {
                            inner: CoordinatorRequest::Event {
                                daemon_id: self.daemon_id.clone(),
                                event: DaemonEvent::SpawnResult {
                                    dataflow_id,
                                    result: result.map_err(|err| format!("{err:?}")),
                                },
                            },
                            timestamp: self.clock.new_timestamp(),
                        })?;
                        socket_stream_send(connection, &msg).await.wrap_err(
                            "failed to send SpawnDataflowResult message to dora-coordinator",
                        )?;
                    }
                }
                Event::NodeStopped {
                    dataflow_id,
                    node_id,
                } => {
                    if let Some(exit_when_done) = &mut self.exit_when_done {
                        exit_when_done.remove(&(dataflow_id, node_id));
                        if exit_when_done.is_empty() {
                            tracing::info!(
                                "exiting daemon because all required dataflows are finished"
                            );
                            break;
                        }
                    }
                    if self.exit_when_all_finished && self.running.is_empty() {
                        break;
                    }
                }
            }

            let elapsed = start.elapsed();
            if elapsed > Duration::from_millis(100) {
                tracing::warn!(
                    "daemon took {}ms to handle event: {event_kind}",
                    elapsed.as_millis()
                );
            }
        }

        if let Some(mut connection) = self.coordinator_connection.take() {
            let msg = serde_json::to_vec(&Timestamped {
                inner: CoordinatorRequest::Event {
                    daemon_id: self.daemon_id.clone(),
                    event: DaemonEvent::Exit,
                },
                timestamp: self.clock.new_timestamp(),
            })?;
            socket_stream_send(&mut connection, &msg)
                .await
                .wrap_err("failed to send Exit message to dora-coordinator")?;
        }

        Ok(self.dataflow_node_results)
    }

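    /// Handles a single command sent by the coordinator (build, spawn, logs,
    /// reload, stop, destroy, or heartbeat) and reports the result back
    /// through `reply_tx`. Returns `RunStatus::Exit` only for the `Destroy`
    /// command.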
    async fn handle_coordinator_event(
        &mut self,
        event: DaemonCoordinatorEvent,
        reply_tx: Sender<Option<DaemonCoordinatorReply>>,
    ) -> eyre::Result<RunStatus> {
        let status = match event {
            DaemonCoordinatorEvent::Build(BuildDataflowNodes {
                build_id,
                session_id,
                local_working_dir,
                git_sources,
                prev_git_sources,
                dataflow_descriptor,
                nodes_on_machine,
                uv,
            }) => {
                match dataflow_descriptor.communication.remote {
                    dora_core::config::RemoteCommunicationConfig::Tcp => {}
                }

                let base_working_dir = self.base_working_dir(local_working_dir, session_id)?;

                let result = self
                    .build_dataflow(
                        build_id,
                        session_id,
                        base_working_dir,
                        git_sources,
                        prev_git_sources,
                        dataflow_descriptor,
                        nodes_on_machine,
                        uv,
                    )
                    .await;
                let (trigger_result, result_task) = match result {
                    Ok(result_task) => (Ok(()), Some(result_task)),
                    Err(err) => (Err(format!("{err:?}")), None),
                };
                let reply = DaemonCoordinatorReply::TriggerBuildResult(trigger_result);
                let _ = reply_tx.send(Some(reply)).map_err(|_| {
                    error!("could not send `TriggerBuildResult` reply from daemon to coordinator")
                });

                let result_tx = self.events_tx.clone();
                let clock = self.clock.clone();
                if let Some(result_task) = result_task {
                    tokio::spawn(async move {
                        let message = Timestamped {
                            inner: Event::BuildDataflowResult {
                                build_id,
                                session_id,
                                result: result_task.await,
                            },
                            timestamp: clock.new_timestamp(),
                        };
                        let _ = result_tx
                            .send(message)
                            .map_err(|_| {
                                error!(
                                    "could not send `BuildResult` reply from daemon to coordinator"
                                )
                            })
                            .await;
                    });
                }

                RunStatus::Continue
            }
            DaemonCoordinatorEvent::Spawn(SpawnDataflowNodes {
                build_id,
                session_id,
                dataflow_id,
                local_working_dir,
                nodes,
                dataflow_descriptor,
                spawn_nodes,
                uv,
            }) => {
                match dataflow_descriptor.communication.remote {
                    dora_core::config::RemoteCommunicationConfig::Tcp => {}
                }

                let base_working_dir = self.base_working_dir(local_working_dir, session_id)?;

                let result = self
                    .spawn_dataflow(
                        build_id,
                        dataflow_id,
                        base_working_dir,
                        nodes,
                        dataflow_descriptor,
                        spawn_nodes,
                        uv,
                    )
                    .await;
                let (trigger_result, result_task) = match result {
                    Ok(result_task) => (Ok(()), Some(result_task)),
                    Err(err) => (Err(format!("{err:?}")), None),
                };
                let reply = DaemonCoordinatorReply::TriggerSpawnResult(trigger_result);
                let _ = reply_tx.send(Some(reply)).map_err(|_| {
                    error!("could not send `TriggerSpawnResult` reply from daemon to coordinator")
                });

                let result_tx = self.events_tx.clone();
                let clock = self.clock.clone();
                if let Some(result_task) = result_task {
                    tokio::spawn(async move {
                        let message = Timestamped {
                            inner: Event::SpawnDataflowResult {
                                dataflow_id,
                                result: result_task.await,
                            },
                            timestamp: clock.new_timestamp(),
                        };
                        let _ = result_tx
                            .send(message)
                            .map_err(|_| {
                                error!(
                                    "could not send `SpawnResult` reply from daemon to coordinator"
                                )
                            })
                            .await;
                    });
                }

                RunStatus::Continue
            }
            DaemonCoordinatorEvent::AllNodesReady {
                dataflow_id,
                exited_before_subscribe,
            } => {
                let mut logger = self.logger.for_dataflow(dataflow_id);
                logger
                    .log(
                        LogLevel::Debug,
                        None,
                        Some("daemon".into()),
                        format!(
                            "received AllNodesReady (exited_before_subscribe: {exited_before_subscribe:?})"
                        ),
                    )
                    .await;
                match self.running.get_mut(&dataflow_id) {
                    Some(dataflow) => {
                        let ready = exited_before_subscribe.is_empty();
                        dataflow
                            .pending_nodes
                            .handle_external_all_nodes_ready(
                                exited_before_subscribe,
                                &mut dataflow.cascading_error_causes,
                            )
                            .await?;
                        if ready {
                            logger
                                .log(
                                    LogLevel::Info,
                                    None,
                                    Some("daemon".into()),
                                    "coordinator reported that all nodes are ready, starting dataflow",
                                )
                                .await;
                            dataflow.start(&self.events_tx, &self.clock).await?;
                        }
                    }
                    None => {
                        tracing::warn!(
                            "received AllNodesReady for unknown dataflow (ID `{dataflow_id}`)"
                        );
                    }
                }
                let _ = reply_tx.send(None).map_err(|_| {
                    error!("could not send `AllNodesReady` reply from daemon to coordinator")
                });
                RunStatus::Continue
            }
            DaemonCoordinatorEvent::Logs {
                dataflow_id,
                node_id,
            } => {
                match self.working_dir.get(&dataflow_id) {
                    Some(working_dir) => {
                        let working_dir = working_dir.clone();
                        tokio::spawn(async move {
                            let logs = async {
                                let mut file =
                                    File::open(log::log_path(&working_dir, &dataflow_id, &node_id))
                                        .await
                                        .wrap_err(format!(
                                            "could not open log file: {:#?}",
                                            log::log_path(&working_dir, &dataflow_id, &node_id)
                                        ))?;

                                let mut contents = vec![];
                                file.read_to_end(&mut contents)
                                    .await
                                    .wrap_err("could not read content of log file")?;
                                Result::<Vec<u8>, eyre::Report>::Ok(contents)
                            }
                            .await
                            .map_err(|err| format!("{err:?}"));
                            let _ = reply_tx
                                .send(Some(DaemonCoordinatorReply::Logs(logs)))
                                .map_err(|_| {
                                    error!("could not send logs reply from daemon to coordinator")
                                });
                        });
                    }
                    None => {
                        tracing::warn!("received Logs for unknown dataflow (ID `{dataflow_id}`)");
                        let _ = reply_tx.send(None).map_err(|_| {
                            error!("could not send `Logs` reply from daemon to coordinator")
                        });
                    }
                }
                RunStatus::Continue
            }
            DaemonCoordinatorEvent::ReloadDataflow {
                dataflow_id,
                node_id,
                operator_id,
            } => {
                let result = self.send_reload(dataflow_id, node_id, operator_id).await;
                let reply =
                    DaemonCoordinatorReply::ReloadResult(result.map_err(|err| format!("{err:?}")));
                let _ = reply_tx
                    .send(Some(reply))
                    .map_err(|_| error!("could not send reload reply from daemon to coordinator"));
                RunStatus::Continue
            }
            DaemonCoordinatorEvent::StopDataflow {
                dataflow_id,
                grace_duration,
            } => {
                let mut logger = self.logger.for_dataflow(dataflow_id);
                let dataflow = self
                    .running
                    .get_mut(&dataflow_id)
                    .wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"));
                let (reply, future) = match dataflow {
                    Ok(dataflow) => {
                        let future = dataflow.stop_all(
                            &mut self.coordinator_connection,
                            &self.clock,
                            grace_duration,
                            &mut logger,
                        );
                        (Ok(()), Some(future))
                    }
                    Err(err) => (Err(err.to_string()), None),
                };

                let _ = reply_tx
                    .send(Some(DaemonCoordinatorReply::StopResult(reply)))
                    .map_err(|_| error!("could not send stop reply from daemon to coordinator"));

                if let Some(future) = future {
                    future.await?;
                }

                RunStatus::Continue
            }
            DaemonCoordinatorEvent::Destroy => {
                tracing::info!("received destroy command -> exiting");
                let (notify_tx, notify_rx) = oneshot::channel();
                let reply = DaemonCoordinatorReply::DestroyResult {
                    result: Ok(()),
                    notify: Some(notify_tx),
                };
                let _ = reply_tx
                    .send(Some(reply))
                    .map_err(|_| error!("could not send destroy reply from daemon to coordinator"));
                if notify_rx.await.is_err() {
                    tracing::warn!("no confirmation received for DestroyReply");
                }
                RunStatus::Exit
            }
            DaemonCoordinatorEvent::Heartbeat => {
                self.last_coordinator_heartbeat = Instant::now();
                let _ = reply_tx.send(None);
                RunStatus::Continue
            }
        };
        Ok(status)
    }

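    /// Handles events published by daemons on other machines: remote outputs
    /// are forwarded to the local receiver nodes, and `OutputClosed` events
    /// close the corresponding local inputs.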
    async fn handle_inter_daemon_event(&mut self, event: InterDaemonEvent) -> eyre::Result<()> {
        match event {
            InterDaemonEvent::Output {
                dataflow_id,
                node_id,
                output_id,
                metadata,
                data,
            } => {
                let inner = async {
                    let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
                        format!("send out failed: no running dataflow with ID `{dataflow_id}`")
                    })?;
                    send_output_to_local_receivers(
                        node_id.clone(),
                        output_id.clone(),
                        dataflow,
                        &metadata,
                        data.map(DataMessage::Vec),
                        &self.clock,
                    )
                    .await?;
                    Result::<_, eyre::Report>::Ok(())
                };
                if let Err(err) = inner
                    .await
                    .wrap_err("failed to forward remote output to local receivers")
                {
                    let mut logger = self.logger.for_dataflow(dataflow_id).for_node(node_id);
                    logger
                        .log(LogLevel::Warn, Some("daemon".into()), format!("{err:?}"))
                        .await;
                }
                Ok(())
            }
            InterDaemonEvent::OutputClosed {
                dataflow_id,
                node_id,
                output_id,
            } => {
                let output_id = OutputId(node_id.clone(), output_id);
                let mut logger = self
                    .logger
                    .for_dataflow(dataflow_id)
                    .for_node(node_id.clone());
                logger
                    .log(
                        LogLevel::Debug,
                        Some("daemon".into()),
                        format!("received OutputClosed event for output {output_id:?}"),
                    )
                    .await;

                let inner = async {
                    let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
                        format!("send out failed: no running dataflow with ID `{dataflow_id}`")
                    })?;

                    if let Some(inputs) = dataflow.mappings.get(&output_id).cloned() {
                        for (receiver_id, input_id) in &inputs {
                            close_input(dataflow, receiver_id, input_id, &self.clock);
                        }
                    }
                    Result::<(), eyre::Report>::Ok(())
                };
                if let Err(err) = inner
                    .await
                    .wrap_err("failed to handle OutputClosed event sent by remote daemon")
                {
                    logger
                        .log(LogLevel::Warn, Some("daemon".into()), format!("{err:?}"))
                        .await;
                }
                Ok(())
            }
        }
    }

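    /// Starts building all nodes of a dataflow that are assigned to this
    /// machine. Build preparation (including git clone handling) runs before
    /// this method returns; the returned future awaits the per-node build
    /// tasks and collects the resulting working directories into a
    /// [`BuildInfo`].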
    #[allow(clippy::too_many_arguments)]
    async fn build_dataflow(
        &mut self,
        build_id: BuildId,
        session_id: SessionId,
        base_working_dir: PathBuf,
        git_sources: BTreeMap<NodeId, GitSource>,
        prev_git_sources: BTreeMap<NodeId, GitSource>,
        dataflow_descriptor: Descriptor,
        local_nodes: BTreeSet<NodeId>,
        uv: bool,
    ) -> eyre::Result<impl Future<Output = eyre::Result<BuildInfo>> + use<>> {
        let builder = build::Builder {
            session_id,
            base_working_dir,
            uv,
        };
        self.git_manager.clear_planned_builds(session_id);

        let nodes = dataflow_descriptor.resolve_aliases_and_set_defaults()?;

        let mut tasks = Vec::new();

        for node in nodes.into_values().filter(|n| local_nodes.contains(&n.id)) {
            let dynamic_node = node.kind.dynamic();

            let node_id = node.id.clone();
            let mut logger = self.logger.for_node_build(build_id, node_id.clone());
            logger.log(LogLevel::Debug, "building").await;
            let git_source = git_sources.get(&node_id).cloned();
            let prev_git_source = prev_git_sources.get(&node_id).cloned();
            let prev_git = prev_git_source.map(|prev_source| PrevGitSource {
                still_needed_for_this_build: git_sources.values().any(|s| s == &prev_source),
                git_source: prev_source,
            });

            let logger_cloned = logger
                .try_clone_impl()
                .await
                .wrap_err("failed to clone logger")?;

            let mut builder = builder.clone();
            if let Some(node_working_dir) =
                node.deploy.as_ref().and_then(|d| d.working_dir.as_deref())
            {
                builder.base_working_dir = builder.base_working_dir.join(node_working_dir);
            }

            match builder
                .build_node(
                    node,
                    git_source,
                    prev_git,
                    logger_cloned,
                    &mut self.git_manager,
                )
                .await
                .wrap_err_with(|| format!("failed to build node `{node_id}`"))
            {
                Ok(result) => {
                    tasks.push(NodeBuildTask {
                        node_id,
                        task: result,
                        dynamic_node,
                    });
                }
                Err(err) => {
                    logger.log(LogLevel::Error, format!("{err:?}")).await;
                    return Err(err);
                }
            }
        }

        let task = async move {
            let mut info = BuildInfo {
                node_working_dirs: Default::default(),
            };
            for task in tasks {
                let NodeBuildTask {
                    node_id,
                    dynamic_node: _,
                    task,
                } = task;
                let node = task
                    .await
                    .with_context(|| format!("failed to build node `{node_id}`"))?;
                info.node_working_dirs
                    .insert(node_id, node.node_working_dir);
            }
            Ok(info)
        };

        Ok(task)
    }

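    /// Registers a new dataflow and prepares the spawn of all of its nodes
    /// that are assigned to this machine. For nodes running on other
    /// machines, zenoh subscribers are declared for all of their outputs that
    /// are mapped to local inputs. The returned future performs the actual
    /// node spawning (see [`Self::spawn_prepared_nodes`]).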
    #[allow(clippy::too_many_arguments)]
    async fn spawn_dataflow(
        &mut self,
        build_id: Option<BuildId>,
        dataflow_id: DataflowId,
        base_working_dir: PathBuf,
        nodes: BTreeMap<NodeId, ResolvedNode>,
        dataflow_descriptor: Descriptor,
        spawn_nodes: BTreeSet<NodeId>,
        uv: bool,
    ) -> eyre::Result<impl Future<Output = eyre::Result<()>> + use<>> {
        let mut logger = self
            .logger
            .for_dataflow(dataflow_id)
            .try_clone()
            .await
            .context("failed to clone logger")?;
        let dataflow =
            RunningDataflow::new(dataflow_id, self.daemon_id.clone(), &dataflow_descriptor);
        let dataflow = match self.running.entry(dataflow_id) {
            std::collections::hash_map::Entry::Vacant(entry) => {
                self.working_dir
                    .insert(dataflow_id, base_working_dir.clone());
                entry.insert(dataflow)
            }
            std::collections::hash_map::Entry::Occupied(_) => {
                bail!("there is already a running dataflow with ID `{dataflow_id}`")
            }
        };

        let mut stopped = Vec::new();

        let build_info = build_id.and_then(|build_id| self.builds.get(&build_id));
        let node_with_git_source = nodes.values().find(|n| n.has_git_source());
        if let Some(git_node) = node_with_git_source {
            if build_info.is_none() {
                eyre::bail!(
                    "node {} has a git source, but no `dora build` was run yet\n\n\
                    nodes with a `git` field must be built using `dora build` before starting the \
                    dataflow",
                    git_node.id
                )
            }
        }
        let node_working_dirs = build_info
            .map(|info| info.node_working_dirs.clone())
            .unwrap_or_default();

        for node in nodes.values() {
            let local = spawn_nodes.contains(&node.id);

            let inputs = node_inputs(node);
            for (input_id, input) in inputs {
                if local {
                    dataflow
                        .open_inputs
                        .entry(node.id.clone())
                        .or_default()
                        .insert(input_id.clone());
                    match input.mapping {
                        InputMapping::User(mapping) => {
                            dataflow
                                .mappings
                                .entry(OutputId(mapping.source, mapping.output))
                                .or_default()
                                .insert((node.id.clone(), input_id));
                        }
                        InputMapping::Timer { interval } => {
                            dataflow
                                .timers
                                .entry(interval)
                                .or_default()
                                .insert((node.id.clone(), input_id));
                        }
                    }
                } else if let InputMapping::User(mapping) = input.mapping {
                    dataflow
                        .open_external_mappings
                        .insert(OutputId(mapping.source, mapping.output));
                }
            }
        }

        let spawner = Spawner {
            dataflow_id,
            daemon_tx: self.events_tx.clone(),
            dataflow_descriptor,
            clock: self.clock.clone(),
            uv,
        };

        let mut tasks = Vec::new();

        for node in nodes.into_values() {
            let mut logger = logger.reborrow().for_node(node.id.clone());
            let local = spawn_nodes.contains(&node.id);
            if local {
                let dynamic_node = node.kind.dynamic();
                if dynamic_node {
                    dataflow.dynamic_nodes.insert(node.id.clone());
                } else {
                    dataflow.pending_nodes.insert(node.id.clone());
                }

                let node_id = node.id.clone();
                let node_stderr_most_recent = dataflow
                    .node_stderr_most_recent
                    .entry(node.id.clone())
                    .or_insert_with(|| Arc::new(ArrayQueue::new(STDERR_LOG_LINES)))
                    .clone();

                let configured_node_working_dir = node_working_dirs.get(&node_id).cloned();
                if configured_node_working_dir.is_none() && node.has_git_source() {
                    eyre::bail!(
                        "node {} has a git source, but no git clone directory was found for it\n\n\
                        try running `dora build` again",
                        node.id
                    )
                }
                let node_working_dir = configured_node_working_dir
                    .or_else(|| {
                        node.deploy
                            .as_ref()
                            .and_then(|d| d.working_dir.as_ref().map(|d| base_working_dir.join(d)))
                    })
                    .unwrap_or_else(|| base_working_dir.clone());
                match spawner
                    .clone()
                    .spawn_node(node, node_working_dir, node_stderr_most_recent, &mut logger)
                    .await
                    .wrap_err_with(|| format!("failed to spawn node `{node_id}`"))
                {
                    Ok(result) => {
                        tasks.push(NodeBuildTask {
                            node_id,
                            task: result,
                            dynamic_node,
                        });
                    }
                    Err(err) => {
                        logger
                            .log(LogLevel::Error, Some("daemon".into()), format!("{err:?}"))
                            .await;
                        self.dataflow_node_results
                            .entry(dataflow_id)
                            .or_default()
                            .insert(
                                node_id.clone(),
                                Err(NodeError {
                                    timestamp: self.clock.new_timestamp(),
                                    cause: NodeErrorCause::FailedToSpawn(format!("{err:?}")),
                                    exit_status: NodeExitStatus::Unknown,
                                }),
                            );
                        stopped.push((node_id.clone(), dynamic_node));
                    }
                }
            } else {
                dataflow.pending_nodes.set_external_nodes(true);

                for output_id in dataflow.mappings.keys().filter(|o| o.0 == node.id) {
                    let tx = self
                        .remote_daemon_events_tx
                        .clone()
                        .wrap_err("no remote_daemon_events_tx channel")?;
                    let mut finished_rx = dataflow.finished_tx.subscribe();
                    let subscribe_topic = dataflow.output_publish_topic(output_id);
                    tracing::debug!("declaring subscriber on {subscribe_topic}");
                    let subscriber = self
                        .zenoh_session
                        .declare_subscriber(subscribe_topic)
                        .await
                        .map_err(|e| eyre!(e))
                        .wrap_err_with(|| format!("failed to subscribe to {output_id:?}"))?;
                    tokio::spawn(async move {
                        let mut finished = pin!(finished_rx.recv());
                        loop {
                            let finished_or_next =
                                futures::future::select(finished, subscriber.recv_async());
                            match finished_or_next.await {
                                future::Either::Left((finished, _)) => match finished {
                                    Err(broadcast::error::RecvError::Closed) => {
                                        tracing::debug!(
                                            "dataflow finished, breaking from zenoh subscribe task"
                                        );
                                        break;
                                    }
                                    other => {
                                        tracing::warn!(
                                            "unexpected return value of dataflow finished_rx channel: {other:?}"
                                        );
                                        break;
                                    }
                                },
                                future::Either::Right((sample, f)) => {
                                    finished = f;
                                    let event = sample.map_err(|e| eyre!(e)).and_then(|s| {
                                        Timestamped::deserialize_inter_daemon_event(
                                            &s.payload().to_bytes(),
                                        )
                                    });
                                    if tx.send_async(event).await.is_err() {
                                        break;
                                    }
                                }
                            }
                        }
                    });
                }
            }
        }
        for (node_id, dynamic) in stopped {
            self.handle_node_stop(dataflow_id, &node_id, dynamic)
                .await?;
        }

        let spawn_result = Self::spawn_prepared_nodes(
            dataflow_id,
            logger,
            tasks,
            self.events_tx.clone(),
            self.clock.clone(),
        );

        Ok(spawn_result)
    }

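    /// Awaits the per-node preparation tasks and then spawns the prepared
    /// nodes. If any node fails to prepare, the already prepared nodes are
    /// reported as cascading failures instead of being spawned. Each outcome
    /// is sent back to the main daemon task as a `SpawnNodeResult` event.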
    async fn spawn_prepared_nodes(
        dataflow_id: Uuid,
        mut logger: DataflowLogger<'_>,
        tasks: Vec<NodeBuildTask<impl Future<Output = eyre::Result<spawn::PreparedNode>>>>,
        events_tx: mpsc::Sender<Timestamped<Event>>,
        clock: Arc<HLC>,
    ) -> eyre::Result<()> {
        let node_result = |node_id, dynamic_node, result| Timestamped {
            inner: Event::SpawnNodeResult {
                dataflow_id,
                node_id,
                dynamic_node,
                result,
            },
            timestamp: clock.new_timestamp(),
        };
        let mut failed_to_prepare = None;
        let mut prepared_nodes = Vec::new();
        for task in tasks {
            let NodeBuildTask {
                node_id,
                dynamic_node,
                task,
            } = task;
            match task.await {
                Ok(node) => prepared_nodes.push(node),
                Err(err) => {
                    if failed_to_prepare.is_none() {
                        failed_to_prepare = Some(node_id.clone());
                    }
                    let node_err: NodeError = NodeError {
                        timestamp: clock.new_timestamp(),
                        cause: NodeErrorCause::FailedToSpawn(format!(
                            "preparing for spawn failed: {err:?}"
                        )),
                        exit_status: NodeExitStatus::Unknown,
                    };
                    let send_result = events_tx
                        .send(node_result(node_id, dynamic_node, Err(node_err)))
                        .await;
                    if send_result.is_err() {
                        tracing::error!("failed to send SpawnNodeResult to main daemon task")
                    }
                }
            }
        }

        if let Some(failed_node) = failed_to_prepare {
            for node in prepared_nodes {
                let err = NodeError {
                    timestamp: clock.new_timestamp(),
                    cause: NodeErrorCause::Cascading {
                        caused_by_node: failed_node.clone(),
                    },
                    exit_status: NodeExitStatus::Unknown,
                };
                let send_result = events_tx
                    .send(node_result(
                        node.node_id().clone(),
                        node.dynamic(),
                        Err(err),
                    ))
                    .await;
                if send_result.is_err() {
                    tracing::error!("failed to send SpawnNodeResult to main daemon task")
                }
            }
            Err(eyre!("failed to prepare node {failed_node}"))
        } else {
            let mut spawn_result = Ok(());

            logger
                .log(
                    LogLevel::Info,
                    None,
                    Some("dora daemon".into()),
                    "finished building nodes, spawning...",
                )
                .await;

            for node in prepared_nodes {
                let node_id = node.node_id().clone();
                let dynamic_node = node.dynamic();
                let mut logger = logger.reborrow().for_node(node_id.clone());
                let result = node.spawn(&mut logger).await;
                let node_spawn_result = match result {
                    Ok(node) => Ok(node),
                    Err(err) => {
                        let node_err = NodeError {
                            timestamp: clock.new_timestamp(),
                            cause: NodeErrorCause::FailedToSpawn(format!("spawn failed: {err:?}")),
                            exit_status: NodeExitStatus::Unknown,
                        };
                        if spawn_result.is_ok() {
                            spawn_result = Err(err.wrap_err(format!("failed to spawn {node_id}")));
                        }
                        Err(node_err)
                    }
                };
                let send_result = events_tx
                    .send(node_result(node_id, dynamic_node, node_spawn_result))
                    .await;
                if send_result.is_err() {
                    tracing::error!("failed to send SpawnNodeResult to main daemon task")
                }
            }
            spawn_result
        }
    }

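    /// Handles requests from dynamic nodes, i.e. nodes that are started
    /// manually and attach to a running dataflow: looks up the node's
    /// [`NodeConfig`] and reports an error if the node ID is unknown, not
    /// dynamic, or ambiguous across multiple running dataflows.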
    async fn handle_dynamic_node_event(
        &mut self,
        event: DynamicNodeEventWrapper,
    ) -> eyre::Result<()> {
        match event {
            DynamicNodeEventWrapper {
                event: DynamicNodeEvent::NodeConfig { node_id },
                reply_tx,
            } => {
                let matching_dataflows = self
                    .running
                    .iter()
                    .filter(|(_id, dataflow)| dataflow.running_nodes.contains_key(&node_id))
                    .count();

                let node_config = match matching_dataflows {
                    2.. => Err(format!(
                        "multiple dataflows contain dynamic node ID `{node_id}`; \
                        please run only one dataflow with the given node ID \
                        when using dynamic nodes",
                    )),
                    1 => self
                        .running
                        .iter()
                        .filter(|(_id, dataflow)| dataflow.running_nodes.contains_key(&node_id))
                        .map(|(id, dataflow)| -> Result<NodeConfig> {
                            let node_config = dataflow
                                .running_nodes
                                .get(&node_id)
                                .with_context(|| {
                                    format!("no node with ID `{node_id}` within the given dataflow")
                                })?
                                .node_config
                                .clone();
                            if !node_config.dynamic {
                                bail!("node with ID `{node_id}` in {id} is not dynamic");
                            }
                            Ok(node_config)
                        })
                        .next()
                        .ok_or_else(|| eyre!("no node with ID `{node_id}`"))
                        .and_then(|r| r)
                        .map_err(|err| {
                            format!(
                                "failed to get dynamic node config within given dataflow: {err}"
                            )
                        }),
                    0 => Err(format!("no node with ID `{node_id}`")),
                };

                let reply = DaemonReply::NodeConfig {
                    result: node_config,
                };
                let _ = reply_tx.send(Some(reply)).map_err(|_| {
                    error!("could not send node config reply from daemon to dynamic node")
                });
                Ok(())
            }
        }
    }

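    /// Handles an event sent by a local node: subscription to the event or
    /// drop streams, output-related notifications (`SendOut`, `CloseOutputs`,
    /// `OutputsDone`), and drop-token reports.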
    async fn handle_node_event(
        &mut self,
        event: DaemonNodeEvent,
        dataflow_id: DataflowId,
        node_id: NodeId,
    ) -> eyre::Result<()> {
        match event {
            DaemonNodeEvent::Subscribe {
                event_sender,
                reply_sender,
            } => {
                let mut logger = self.logger.for_dataflow(dataflow_id);
                logger
                    .log(
                        LogLevel::Info,
                        Some(node_id.clone()),
                        Some("daemon".into()),
                        "node is ready",
                    )
                    .await;

                let dataflow = self.running.get_mut(&dataflow_id).ok_or_else(|| {
                    format!("subscribe failed: no running dataflow with ID `{dataflow_id}`")
                });

                match dataflow {
                    Err(err) => {
                        let _ = reply_sender.send(DaemonReply::Result(Err(err)));
                    }
                    Ok(dataflow) => {
                        Self::subscribe(dataflow, node_id.clone(), event_sender, &self.clock).await;

                        let status = dataflow
                            .pending_nodes
                            .handle_node_subscription(
                                node_id.clone(),
                                reply_sender,
                                &mut self.coordinator_connection,
                                &self.clock,
                                &mut dataflow.cascading_error_causes,
                                &mut logger,
                            )
                            .await?;
                        match status {
                            DataflowStatus::AllNodesReady => {
                                logger
                                    .log(
                                        LogLevel::Info,
                                        None,
                                        Some("daemon".into()),
                                        "all nodes are ready, starting dataflow",
                                    )
                                    .await;
                                dataflow.start(&self.events_tx, &self.clock).await?;
                            }
                            DataflowStatus::Pending => {}
                        }
                    }
                }
            }
            DaemonNodeEvent::SubscribeDrop {
                event_sender,
                reply_sender,
            } => {
                let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
                    format!("failed to subscribe: no running dataflow with ID `{dataflow_id}`")
                });
                let result = match dataflow {
                    Ok(dataflow) => {
                        dataflow.drop_channels.insert(node_id, event_sender);
                        Ok(())
                    }
                    Err(err) => Err(err.to_string()),
                };
                let _ = reply_sender.send(DaemonReply::Result(result));
            }
            DaemonNodeEvent::CloseOutputs {
                outputs,
                reply_sender,
            } => {
                let inner = async {
                    self.send_output_closed_events(dataflow_id, node_id, outputs)
                        .await
                };

                let reply = inner.await.map_err(|err| format!("{err:?}"));
                let _ = reply_sender.send(DaemonReply::Result(reply));
            }
            DaemonNodeEvent::OutputsDone { reply_sender } => {
                let result = self.handle_outputs_done(dataflow_id, &node_id).await;

                let _ = reply_sender.send(DaemonReply::Result(
                    result.map_err(|err| format!("{err:?}")),
                ));
            }
            DaemonNodeEvent::SendOut {
                output_id,
                metadata,
                data,
            } => self
                .send_out(dataflow_id, node_id, output_id, metadata, data)
                .await
                .context("failed to send out")?,
            DaemonNodeEvent::ReportDrop { tokens } => {
                let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
                    format!(
                        "failed to handle drop tokens: \
                        no running dataflow with ID `{dataflow_id}`"
                    )
                });

                match dataflow {
                    Ok(dataflow) => {
                        for token in tokens {
                            match dataflow.pending_drop_tokens.get_mut(&token) {
                                Some(info) => {
                                    if info.pending_nodes.remove(&node_id) {
                                        dataflow.check_drop_token(token, &self.clock).await?;
                                    } else {
                                        tracing::warn!(
                                            "node `{node_id}` is not pending for drop token `{token:?}`"
                                        );
                                    }
                                }
                                None => tracing::warn!("unknown drop token `{token:?}`"),
                            }
                        }
                    }
                    Err(err) => tracing::warn!("{err:?}"),
                }
            }
            DaemonNodeEvent::EventStreamDropped { reply_sender } => {
                let inner = async {
                    let dataflow = self
                        .running
                        .get_mut(&dataflow_id)
                        .wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"))?;
                    dataflow.subscribe_channels.remove(&node_id);
                    Result::<_, eyre::Error>::Ok(())
                };

                let reply = inner.await.map_err(|err| format!("{err:?}"));
                let _ = reply_sender.send(DaemonReply::Result(reply));
            }
        }
        Ok(())
    }

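    /// Forwards a `Reload` event to the given node, removing its subscription
    /// channel if the node is no longer reachable.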
    async fn send_reload(
        &mut self,
        dataflow_id: Uuid,
        node_id: NodeId,
        operator_id: Option<OperatorId>,
    ) -> Result<(), eyre::ErrReport> {
        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
            format!("reload failed: no running dataflow with ID `{dataflow_id}`")
        })?;
        if let Some(channel) = dataflow.subscribe_channels.get(&node_id) {
            match send_with_timestamp(channel, NodeEvent::Reload { operator_id }, &self.clock) {
                Ok(()) => {}
                Err(_) => {
                    dataflow.subscribe_channels.remove(&node_id);
                }
            }
        }
        Ok(())
    }

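    /// Delivers a node output to all local receivers and, if the output is
    /// mapped to inputs on other machines (or `publish_all_messages_to_zenoh`
    /// is set), publishes it to the remote daemons via zenoh.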
    async fn send_out(
        &mut self,
        dataflow_id: Uuid,
        node_id: NodeId,
        output_id: DataId,
        metadata: dora_message::metadata::Metadata,
        data: Option<DataMessage>,
    ) -> Result<(), eyre::ErrReport> {
        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
            format!("send out failed: no running dataflow with ID `{dataflow_id}`")
        })?;
        let data_bytes = send_output_to_local_receivers(
            node_id.clone(),
            output_id.clone(),
            dataflow,
            &metadata,
            data,
            &self.clock,
        )
        .await?;

        let output_id = OutputId(node_id, output_id);
        let remote_receivers = dataflow.open_external_mappings.contains(&output_id)
            || dataflow.publish_all_messages_to_zenoh;
        if remote_receivers {
            let event = InterDaemonEvent::Output {
                dataflow_id,
                node_id: output_id.0.clone(),
                output_id: output_id.1.clone(),
                metadata,
                data: data_bytes,
            };
            self.send_to_remote_receivers(dataflow_id, &output_id, event)
                .await?;
        }

        Ok(())
    }

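    /// Publishes an inter-daemon event on the zenoh topic of the given
    /// output, lazily declaring (and caching) the publisher on first use.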
    async fn send_to_remote_receivers(
        &mut self,
        dataflow_id: Uuid,
        output_id: &OutputId,
        event: InterDaemonEvent,
    ) -> Result<(), eyre::Error> {
        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
            format!("send out failed: no running dataflow with ID `{dataflow_id}`")
        })?;

        let publisher = match dataflow.publishers.get(output_id) {
            Some(publisher) => publisher,
            None => {
                let publish_topic = dataflow.output_publish_topic(output_id);
                tracing::debug!("declaring publisher on {publish_topic}");
                let publisher = self
                    .zenoh_session
                    .declare_publisher(publish_topic)
                    .await
                    .map_err(|e| eyre!(e))
                    .context("failed to create zenoh publisher")?;
                dataflow.publishers.insert(output_id.clone(), publisher);
                dataflow.publishers.get(output_id).unwrap()
            }
        };

        let serialized_event = Timestamped {
            inner: event,
            timestamp: self.clock.new_timestamp(),
        }
        .serialize();
        publisher
            .put(serialized_event)
            .await
            .map_err(|e| eyre!(e))
            .context("zenoh put failed")?;
        Ok(())
    }

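    /// Closes the given outputs of a node: local receiver inputs are closed
    /// directly and `OutputClosed` events are published for receivers on
    /// other machines.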
    async fn send_output_closed_events(
        &mut self,
        dataflow_id: DataflowId,
        node_id: NodeId,
        outputs: Vec<DataId>,
    ) -> eyre::Result<()> {
        let dataflow = self
            .running
            .get_mut(&dataflow_id)
            .wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"))?;
        let local_node_inputs: BTreeSet<_> = dataflow
            .mappings
            .iter()
            .filter(|(k, _)| k.0 == node_id && outputs.contains(&k.1))
            .flat_map(|(_, v)| v)
            .cloned()
            .collect();
        for (receiver_id, input_id) in &local_node_inputs {
            close_input(dataflow, receiver_id, input_id, &self.clock);
        }

        let mut closed = Vec::new();
        for output_id in &dataflow.open_external_mappings {
            if output_id.0 == node_id && outputs.contains(&output_id.1) {
                closed.push(output_id.clone());
            }
        }

        for output_id in closed {
            let event = InterDaemonEvent::OutputClosed {
                dataflow_id,
                node_id: output_id.0.clone(),
                output_id: output_id.1.clone(),
            };
            self.send_to_remote_receivers(dataflow_id, &output_id, event)
                .await?;
        }

        Ok(())
    }

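    /// Registers the event channel of a newly subscribed node. Inputs that
    /// were already closed (and a pending `Stop`, if one was sent) are
    /// replayed to the node first so that it does not miss them.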
    async fn subscribe(
        dataflow: &mut RunningDataflow,
        node_id: NodeId,
        event_sender: UnboundedSender<Timestamped<NodeEvent>>,
        clock: &HLC,
    ) {
        let closed_inputs = dataflow
            .mappings
            .values()
            .flatten()
            .filter(|(node, _)| node == &node_id)
            .map(|(_, input)| input)
            .filter(|input| {
                dataflow
                    .open_inputs
                    .get(&node_id)
                    .map(|open_inputs| !open_inputs.contains(*input))
                    .unwrap_or(true)
            });
        for input_id in closed_inputs {
            let _ = send_with_timestamp(
                &event_sender,
                NodeEvent::InputClosed {
                    id: input_id.clone(),
                },
                clock,
            );
        }
        if dataflow.open_inputs(&node_id).is_empty() {
            let _ = send_with_timestamp(&event_sender, NodeEvent::AllInputsClosed, clock);
        }

        if dataflow.stop_sent {
            let _ = send_with_timestamp(&event_sender, NodeEvent::Stop, clock);
        }

        dataflow.subscribe_channels.insert(node_id, event_sender);
    }

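    /// Called when a node reports that it will send no more outputs: closes
    /// all of the node's outputs and removes its drop channel.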
    #[tracing::instrument(skip(self), level = "trace")]
    async fn handle_outputs_done(
        &mut self,
        dataflow_id: DataflowId,
        node_id: &NodeId,
    ) -> eyre::Result<()> {
        let dataflow = self
            .running
            .get_mut(&dataflow_id)
            .ok_or_else(|| eyre!("no running dataflow with ID `{dataflow_id}`"))?;

        let outputs = dataflow
            .mappings
            .keys()
            .filter(|m| &m.0 == node_id)
            .map(|m| &m.1)
            .cloned()
            .collect();
        self.send_output_closed_events(dataflow_id, node_id.clone(), outputs)
            .await?;

        let dataflow = self
            .running
            .get_mut(&dataflow_id)
            .ok_or_else(|| eyre!("no running dataflow with ID `{dataflow_id}`"))?;
        dataflow.drop_channels.remove(node_id);
        Ok(())
    }

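    /// Wraps [`Self::handle_node_stop_inner`] and always notifies the main
    /// event loop with a `NodeStopped` event, even if the inner handling
    /// failed.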
    async fn handle_node_stop(
        &mut self,
        dataflow_id: Uuid,
        node_id: &NodeId,
        dynamic_node: bool,
    ) -> eyre::Result<()> {
        let result = self
            .handle_node_stop_inner(dataflow_id, node_id, dynamic_node)
            .await;
        let _ = self
            .events_tx
            .send(Timestamped {
                inner: Event::NodeStopped {
                    dataflow_id,
                    node_id: node_id.clone(),
                },
                timestamp: self.clock.new_timestamp(),
            })
            .await;
        result
    }

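    /// Performs the bookkeeping for a stopped node. If no non-dynamic nodes
    /// are pending or running afterwards, the dataflow is considered
    /// finished: its results are reported to the coordinator and it is
    /// removed from the running set.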
    async fn handle_node_stop_inner(
        &mut self,
        dataflow_id: Uuid,
        node_id: &NodeId,
        dynamic_node: bool,
    ) -> eyre::Result<()> {
        let mut logger = self.logger.for_dataflow(dataflow_id);
        let dataflow = match self.running.get_mut(&dataflow_id) {
            Some(dataflow) => dataflow,
            None if dynamic_node => {
                tracing::debug!(
                    "dynamic node {dataflow_id}/{node_id} stopped after dataflow was done"
                );
                return Ok(());
            }
            None => eyre::bail!(
                "failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`"
            ),
        };

        dataflow
            .pending_nodes
            .handle_node_stop(
                node_id,
                &mut self.coordinator_connection,
                &self.clock,
                &mut dataflow.cascading_error_causes,
                &mut logger,
            )
            .await?;

        self.handle_outputs_done(dataflow_id, node_id).await?;

        let mut logger = self.logger.for_dataflow(dataflow_id);
        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
            format!("failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`")
        })?;
        if let Some(mut pid) = dataflow.running_nodes.remove(node_id).and_then(|n| n.pid) {
            pid.mark_as_stopped()
        }
        if !dataflow.pending_nodes.local_nodes_pending()
            && dataflow
                .running_nodes
                .iter()
                .all(|(_id, n)| n.node_config.dynamic)
        {
            let result = DataflowDaemonResult {
                timestamp: self.clock.new_timestamp(),
                node_results: self
                    .dataflow_node_results
                    .get(&dataflow.id)
                    .context("failed to get dataflow node results")?
                    .clone(),
            };

            self.git_manager
                .clones_in_use
                .values_mut()
                .for_each(|dataflows| {
                    dataflows.remove(&dataflow_id);
                });

            logger
                .log(
                    LogLevel::Info,
                    None,
                    Some("daemon".into()),
                    format!("dataflow finished on machine `{}`", self.daemon_id),
                )
                .await;
            if let Some(connection) = &mut self.coordinator_connection {
                let msg = serde_json::to_vec(&Timestamped {
                    inner: CoordinatorRequest::Event {
                        daemon_id: self.daemon_id.clone(),
                        event: DaemonEvent::AllNodesFinished {
                            dataflow_id,
                            result,
                        },
                    },
                    timestamp: self.clock.new_timestamp(),
                })?;
                socket_stream_send(connection, &msg)
                    .await
                    .wrap_err("failed to report dataflow finish to dora-coordinator")?;
            }
            self.running.remove(&dataflow_id);
        }

        Ok(())
    }

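    /// Handles daemon-internal events: timer ticks and log outputs are
    /// forwarded as inputs to the subscribed nodes, and `SpawnedNodeResult`
    /// records a node's exit status (deriving the error cause from cascading
    /// failures, grace-duration kills, or the captured stderr lines).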
    async fn handle_dora_event(&mut self, event: DoraEvent) -> eyre::Result<()> {
        match event {
            DoraEvent::Timer {
                dataflow_id,
                interval,
                metadata,
            } => {
                let Some(dataflow) = self.running.get_mut(&dataflow_id) else {
                    tracing::warn!("Timer event for unknown dataflow `{dataflow_id}`");
                    return Ok(());
                };

                let Some(subscribers) = dataflow.timers.get(&interval) else {
                    return Ok(());
                };

                let mut closed = Vec::new();
                for (receiver_id, input_id) in subscribers {
                    let Some(channel) = dataflow.subscribe_channels.get(receiver_id) else {
                        continue;
                    };

                    let send_result = send_with_timestamp(
                        channel,
                        NodeEvent::Input {
                            id: input_id.clone(),
                            metadata: metadata.clone(),
                            data: None,
                        },
                        &self.clock,
                    );
                    match send_result {
                        Ok(()) => {}
                        Err(_) => {
                            closed.push(receiver_id);
                        }
                    }
                }
                for id in closed {
                    dataflow.subscribe_channels.remove(id);
                }
            }
            DoraEvent::Logs {
                dataflow_id,
                output_id,
                message,
                metadata,
            } => {
                let Some(dataflow) = self.running.get_mut(&dataflow_id) else {
                    tracing::warn!("Logs event for unknown dataflow `{dataflow_id}`");
                    return Ok(());
                };

                let Some(subscribers) = dataflow.mappings.get(&output_id) else {
                    tracing::warn!(
                        "No subscribers found for {:?} in {:?}",
                        output_id,
                        dataflow.mappings
                    );
                    return Ok(());
                };

                let mut closed = Vec::new();
                for (receiver_id, input_id) in subscribers {
                    let Some(channel) = dataflow.subscribe_channels.get(receiver_id) else {
                        tracing::warn!("No subscriber channel found for {:?}", output_id);
                        continue;
                    };

                    let send_result = send_with_timestamp(
                        channel,
                        NodeEvent::Input {
                            id: input_id.clone(),
                            metadata: metadata.clone(),
                            data: Some(message.clone()),
                        },
                        &self.clock,
                    );
                    match send_result {
                        Ok(()) => {}
                        Err(_) => {
                            closed.push(receiver_id);
                        }
                    }
                }
                for id in closed {
                    dataflow.subscribe_channels.remove(id);
                }
            }
            DoraEvent::SpawnedNodeResult {
                dataflow_id,
                node_id,
                dynamic_node,
                exit_status,
            } => {
                let mut logger = self
                    .logger
                    .for_dataflow(dataflow_id)
                    .for_node(node_id.clone());
                logger
                    .log(
                        LogLevel::Debug,
                        Some("daemon".into()),
                        format!("handling node stop with exit status {exit_status:?}"),
                    )
                    .await;

                let node_result = match exit_status {
                    NodeExitStatus::Success => Ok(()),
                    exit_status => {
                        let dataflow = self.running.get(&dataflow_id);
                        let caused_by_node = dataflow
                            .and_then(|dataflow| {
                                dataflow.cascading_error_causes.error_caused_by(&node_id)
                            })
                            .cloned();
                        let grace_duration_kill = dataflow
                            .map(|d| d.grace_duration_kills.contains(&node_id))
                            .unwrap_or_default();

                        let cause = match caused_by_node {
                            Some(caused_by_node) => {
                                logger
                                    .log(
                                        LogLevel::Info,
                                        Some("daemon".into()),
                                        format!("marking `{node_id}` as cascading error caused by `{caused_by_node}`")
                                    )
                                    .await;

                                NodeErrorCause::Cascading { caused_by_node }
                            }
                            None if grace_duration_kill => NodeErrorCause::GraceDuration,
                            None => {
                                let cause = dataflow
                                    .and_then(|d| d.node_stderr_most_recent.get(&node_id))
                                    .map(|queue| {
                                        let mut s = if queue.is_full() {
                                            "[...]".into()
                                        } else {
                                            String::new()
                                        };
                                        while let Some(line) = queue.pop() {
                                            s += &line;
                                        }
                                        s
                                    })
                                    .unwrap_or_default();

                                NodeErrorCause::Other { stderr: cause }
                            }
                        };
                        Err(NodeError {
                            timestamp: self.clock.new_timestamp(),
                            cause,
                            exit_status,
                        })
                    }
                };

                logger
                    .log(
                        if node_result.is_ok() {
                            LogLevel::Info
                        } else {
                            LogLevel::Error
                        },
                        Some("daemon".into()),
                        match &node_result {
                            Ok(()) => format!("{node_id} finished successfully"),
                            Err(err) => format!("{err}"),
                        },
                    )
                    .await;

                self.dataflow_node_results
                    .entry(dataflow_id)
                    .or_default()
                    .insert(node_id.clone(), node_result);

                self.handle_node_stop(dataflow_id, &node_id, dynamic_node)
                    .await?;
            }
        }
        Ok(())
    }

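    /// Returns the base working directory: the given `local_working_dir` if it
    /// exists, otherwise a per-session `_work` directory under the daemon's
    /// current working directory.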
    fn base_working_dir(
        &self,
        local_working_dir: Option<PathBuf>,
        session_id: SessionId,
    ) -> eyre::Result<PathBuf> {
        match local_working_dir {
            Some(working_dir) => {
                if working_dir.exists() {
                    Ok(working_dir)
                } else {
                    bail!(
                        "working directory does not exist: {}",
                        working_dir.display(),
                    )
                }
            }
            None => {
                let daemon_working_dir =
                    current_dir().context("failed to get daemon working dir")?;
                Ok(daemon_working_dir
                    .join("_work")
                    .join(session_id.uuid().to_string()))
            }
        }
    }
}

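/// Registers this daemon with the coordinator and merges coordinator events,
/// inter-daemon events, and dynamic-node events into one timestamped stream.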
async fn set_up_event_stream(
    coordinator_addr: SocketAddr,
    machine_id: &Option<String>,
    clock: &Arc<HLC>,
    remote_daemon_events_rx: flume::Receiver<eyre::Result<Timestamped<InterDaemonEvent>>>,
    local_listen_port: u16,
) -> eyre::Result<(DaemonId, impl Stream<Item = Timestamped<Event>> + Unpin)> {
    let clock_cloned = clock.clone();
    let remote_daemon_events = remote_daemon_events_rx.into_stream().map(move |e| match e {
        Ok(e) => Timestamped {
            inner: Event::Daemon(e.inner),
            timestamp: e.timestamp,
        },
        Err(err) => Timestamped {
            inner: Event::DaemonError(err),
            timestamp: clock_cloned.new_timestamp(),
        },
    });
    let (daemon_id, coordinator_events) =
        coordinator::register(coordinator_addr, machine_id.clone(), clock)
            .await
            .wrap_err("failed to connect to dora-coordinator")?;
    let coordinator_events = coordinator_events.map(
        |Timestamped {
             inner: event,
             timestamp,
         }| Timestamped {
            inner: Event::Coordinator(event),
            timestamp,
        },
    );
    let (events_tx, events_rx) = flume::bounded(10);
    let _listen_port =
        local_listener::spawn_listener_loop((LOCALHOST, local_listen_port).into(), events_tx)
            .await?;
    let dynamic_node_events = events_rx.into_stream().map(|e| Timestamped {
        inner: Event::DynamicNode(e.inner),
        timestamp: e.timestamp,
    });
    let incoming = (
        coordinator_events,
        remote_daemon_events,
        dynamic_node_events,
    )
        .merge();
    Ok((daemon_id, incoming))
}

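/// Delivers an output message to all local subscribers, tracking drop tokens
/// for shared-memory data, and returns the raw bytes so the caller can also
/// forward them to remote receivers.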
async fn send_output_to_local_receivers(
    node_id: NodeId,
    output_id: DataId,
    dataflow: &mut RunningDataflow,
    metadata: &metadata::Metadata,
    data: Option<DataMessage>,
    clock: &HLC,
) -> Result<Option<AVec<u8, ConstAlign<128>>>, eyre::ErrReport> {
    let timestamp = metadata.timestamp();
    let empty_set = BTreeSet::new();
    let output_id = OutputId(node_id, output_id);
    let local_receivers = dataflow.mappings.get(&output_id).unwrap_or(&empty_set);
    let OutputId(node_id, _) = output_id;
    let mut closed = Vec::new();
    for (receiver_id, input_id) in local_receivers {
        if let Some(channel) = dataflow.subscribe_channels.get(receiver_id) {
            let item = NodeEvent::Input {
                id: input_id.clone(),
                metadata: metadata.clone(),
                data: data.clone(),
            };
            match channel.send(Timestamped {
                inner: item,
                timestamp,
            }) {
                Ok(()) => {
                    if let Some(token) = data.as_ref().and_then(|d| d.drop_token()) {
                        dataflow
                            .pending_drop_tokens
                            .entry(token)
                            .or_insert_with(|| DropTokenInformation {
                                owner: node_id.clone(),
                                pending_nodes: Default::default(),
                            })
                            .pending_nodes
                            .insert(receiver_id.clone());
                    }
                }
                Err(_) => {
                    closed.push(receiver_id);
                }
            }
        }
    }
    for id in closed {
        dataflow.subscribe_channels.remove(id);
    }
    let (data_bytes, drop_token) = match data {
        None => (None, None),
        Some(DataMessage::SharedMemory {
            shared_memory_id,
            len,
            drop_token,
        }) => {
            let memory = ShmemConf::new()
                .os_id(shared_memory_id)
                .open()
                .wrap_err("failed to map shared memory output")?;
            let data = Some(AVec::from_slice(1, &unsafe { memory.as_slice() }[..len]));
            (data, Some(drop_token))
        }
        Some(DataMessage::Vec(v)) => (Some(v), None),
    };
    if let Some(token) = drop_token {
        dataflow
            .pending_drop_tokens
            .entry(token)
            .or_insert_with(|| DropTokenInformation {
                owner: node_id.clone(),
                pending_nodes: Default::default(),
            });
        dataflow.check_drop_token(token, clock).await?;
    }
    Ok(data_bytes)
}

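/// Returns the inputs of a resolved node; for runtime nodes, the inputs of all
/// operators are flattened into a single map.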
fn node_inputs(node: &ResolvedNode) -> BTreeMap<DataId, Input> {
    match &node.kind {
        CoreNodeKind::Custom(n) => n.run_config.inputs.clone(),
        CoreNodeKind::Runtime(n) => runtime_node_inputs(n),
    }
}

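/// Closes one input of a receiver node and notifies it; once the last input
/// closes, an `AllInputsClosed` event is sent as well.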
fn close_input(
    dataflow: &mut RunningDataflow,
    receiver_id: &NodeId,
    input_id: &DataId,
    clock: &HLC,
) {
    if let Some(open_inputs) = dataflow.open_inputs.get_mut(receiver_id) {
        if !open_inputs.remove(input_id) {
            return;
        }
    }
    if let Some(channel) = dataflow.subscribe_channels.get(receiver_id) {
        let _ = send_with_timestamp(
            channel,
            NodeEvent::InputClosed {
                id: input_id.clone(),
            },
            clock,
        );

        if dataflow.open_inputs(receiver_id).is_empty() {
            let _ = send_with_timestamp(channel, NodeEvent::AllInputsClosed, clock);
        }
    }
}

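/// A node that was spawned as part of a running dataflow.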
#[derive(Debug)]
pub struct RunningNode {
    pid: Option<ProcessId>,
    node_config: NodeConfig,
}

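/// The OS process ID of a spawned node, or `None` once the process stopped.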
#[derive(Debug)]
struct ProcessId(Option<u32>);

impl ProcessId {
    pub fn new(process_id: u32) -> Self {
        Self(Some(process_id))
    }

    pub fn mark_as_stopped(&mut self) {
        self.0 = None;
    }

    pub fn kill(&mut self) -> bool {
        if let Some(pid) = self.0 {
            let pid = Pid::from(pid as usize);
            let mut system = sysinfo::System::new();
            system.refresh_process(pid);

            if let Some(process) = system.process(pid) {
                process.kill();
                self.mark_as_stopped();
                return true;
            }
        }

        false
    }
}

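/// Kills the process on drop if it is still running, to avoid leaking child
/// processes.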
impl Drop for ProcessId {
    fn drop(&mut self) {
        if let Some(pid) = self.0 {
            if self.kill() {
                warn!("process {pid} was killed on drop because it was still running")
            }
        }
    }
}

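/// The daemon-side state of a dataflow that is currently running.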
pub struct RunningDataflow {
    id: Uuid,
    /// Nodes that are not fully started yet.
    pending_nodes: PendingNodes,

    subscribe_channels: HashMap<NodeId, UnboundedSender<Timestamped<NodeEvent>>>,
    drop_channels: HashMap<NodeId, UnboundedSender<Timestamped<NodeDropEvent>>>,
    mappings: HashMap<OutputId, BTreeSet<InputId>>,
    timers: BTreeMap<Duration, BTreeSet<InputId>>,
    open_inputs: BTreeMap<NodeId, BTreeSet<DataId>>,
    running_nodes: BTreeMap<NodeId, RunningNode>,

    /// Nodes that are marked as dynamic and connect to the daemon at runtime.
    dynamic_nodes: BTreeSet<NodeId>,

    open_external_mappings: BTreeSet<OutputId>,

    pending_drop_tokens: HashMap<DropToken, DropTokenInformation>,

    /// Keeps the timer tasks alive for as long as the dataflow is running.
    _timer_handles: BTreeMap<Duration, futures::future::RemoteHandle<()>>,
    stop_sent: bool,

    /// Returned by `open_inputs` for nodes without any open inputs.
    empty_set: BTreeSet<DataId>,

    cascading_error_causes: CascadingErrorCauses,
    grace_duration_kills: Arc<crossbeam_skiplist::SkipSet<NodeId>>,

    /// The most recent stderr lines of each node, used for error reporting.
    node_stderr_most_recent: BTreeMap<NodeId, Arc<ArrayQueue<String>>>,

    publishers: BTreeMap<OutputId, zenoh::pubsub::Publisher<'static>>,

    finished_tx: broadcast::Sender<()>,

    publish_all_messages_to_zenoh: bool,
}

impl RunningDataflow {
    fn new(
        dataflow_id: Uuid,
        daemon_id: DaemonId,
        dataflow_descriptor: &Descriptor,
    ) -> RunningDataflow {
        let (finished_tx, _) = broadcast::channel(1);
        Self {
            id: dataflow_id,
            pending_nodes: PendingNodes::new(dataflow_id, daemon_id),
            subscribe_channels: HashMap::new(),
            drop_channels: HashMap::new(),
            mappings: HashMap::new(),
            timers: BTreeMap::new(),
            open_inputs: BTreeMap::new(),
            running_nodes: BTreeMap::new(),
            dynamic_nodes: BTreeSet::new(),
            open_external_mappings: Default::default(),
            pending_drop_tokens: HashMap::new(),
            _timer_handles: BTreeMap::new(),
            stop_sent: false,
            empty_set: BTreeSet::new(),
            cascading_error_causes: Default::default(),
            grace_duration_kills: Default::default(),
            node_stderr_most_recent: BTreeMap::new(),
            publishers: Default::default(),
            finished_tx,
            publish_all_messages_to_zenoh: dataflow_descriptor.debug.publish_all_messages_to_zenoh,
        }
    }

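    /// Spawns one timer task per interval; each task periodically enqueues a
    /// `DoraEvent::Timer` event until the daemon's event channel closes.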
    async fn start(
        &mut self,
        events_tx: &mpsc::Sender<Timestamped<Event>>,
        clock: &Arc<HLC>,
    ) -> eyre::Result<()> {
        for interval in self.timers.keys().copied() {
            if self._timer_handles.get(&interval).is_some() {
                continue;
            }
            let events_tx = events_tx.clone();
            let dataflow_id = self.id;
            let clock = clock.clone();
            let task = async move {
                let mut interval_stream = tokio::time::interval(interval);
                let hlc = HLC::default();
                loop {
                    interval_stream.tick().await;

                    let span = tracing::span!(tracing::Level::TRACE, "tick");
                    // Enter the span only for the synchronous part of the tick;
                    // holding the `Entered` guard across an `.await` would make
                    // this future `!Send` (and `let _ = span.enter()` would drop
                    // the guard immediately, never entering the span).
                    let metadata = span.in_scope(|| {
                        let mut parameters = BTreeMap::new();
                        parameters.insert(
                            "open_telemetry_context".to_string(),
                            #[cfg(feature = "telemetry")]
                            Parameter::String(serialize_context(&span.context())),
                            #[cfg(not(feature = "telemetry"))]
                            Parameter::String("".into()),
                        );

                        metadata::Metadata::from_parameters(
                            hlc.new_timestamp(),
                            empty_type_info(),
                            parameters,
                        )
                    });

                    let event = Timestamped {
                        inner: DoraEvent::Timer {
                            dataflow_id,
                            interval,
                            metadata,
                        }
                        .into(),
                        timestamp: clock.new_timestamp(),
                    };
                    if events_tx.send(event).await.is_err() {
                        break;
                    }
                }
            };
            let (task, handle) = task.remote_handle();
            tokio::spawn(task);
            self._timer_handles.insert(interval, handle);
        }

        Ok(())
    }

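    /// Asks all running nodes to stop and spawns a background task that kills
    /// any process still alive after the grace period (15 seconds by default).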
    async fn stop_all(
        &mut self,
        coordinator_connection: &mut Option<TcpStream>,
        clock: &HLC,
        grace_duration: Option<Duration>,
        logger: &mut DataflowLogger<'_>,
    ) -> eyre::Result<()> {
        self.pending_nodes
            .handle_dataflow_stop(
                coordinator_connection,
                clock,
                &mut self.cascading_error_causes,
                &self.dynamic_nodes,
                logger,
            )
            .await?;

        for (_node_id, channel) in self.subscribe_channels.drain() {
            let _ = send_with_timestamp(&channel, NodeEvent::Stop, clock);
        }

        let running_processes: Vec<_> = self
            .running_nodes
            .iter_mut()
            .map(|(id, n)| (id.clone(), n.pid.take()))
            .collect();
        let grace_duration_kills = self.grace_duration_kills.clone();
        tokio::spawn(async move {
            let duration = grace_duration.unwrap_or(Duration::from_secs(15));
            tokio::time::sleep(duration).await;

            for (node, pid) in running_processes {
                if let Some(mut pid) = pid {
                    if pid.kill() {
                        grace_duration_kills.insert(node.clone());
                        warn!(
                            "{node} was killed due to not stopping within the {:#?} grace period",
                            duration
                        )
                    }
                }
            }
        });
        self.stop_sent = true;
        Ok(())
    }

    fn open_inputs(&self, node_id: &NodeId) -> &BTreeSet<DataId> {
        self.open_inputs.get(node_id).unwrap_or(&self.empty_set)
    }

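    /// Notifies the owner of a drop token once no local receiver still uses
    /// the associated data, so the owner can reclaim the allocation.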
    async fn check_drop_token(&mut self, token: DropToken, clock: &HLC) -> eyre::Result<()> {
        match self.pending_drop_tokens.entry(token) {
            std::collections::hash_map::Entry::Occupied(entry) => {
                if entry.get().pending_nodes.is_empty() {
                    let (drop_token, info) = entry.remove_entry();
                    let result = match self.drop_channels.get_mut(&info.owner) {
                        Some(channel) => send_with_timestamp(
                            channel,
                            NodeDropEvent::OutputDropped { drop_token },
                            clock,
                        )
                        .wrap_err("send failed"),
                        None => Err(eyre!("no drop channel for node `{}`", &info.owner)),
                    };
                    if let Err(err) = result.wrap_err_with(|| {
                        format!(
                            "failed to report drop token `{drop_token:?}` to owner `{}`",
                            &info.owner
                        )
                    }) {
                        tracing::warn!("{err:?}");
                    }
                }
            }
            std::collections::hash_map::Entry::Vacant(_) => {
                tracing::warn!("check_drop_token called with already closed token")
            }
        }

        Ok(())
    }

    fn output_publish_topic(&self, output_id: &OutputId) -> String {
        let network_id = "default";
        let dataflow_id = self.id;
        let OutputId(node_id, output_id) = output_id;
        format!("dora/{network_id}/{dataflow_id}/output/{node_id}/{output_id}")
    }
}

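/// Arrow type info for events that carry no data, such as timer ticks.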
fn empty_type_info() -> ArrowTypeInfo {
    ArrowTypeInfo {
        data_type: DataType::Null,
        len: 0,
        null_count: 0,
        validity: None,
        offset: 0,
        buffer_offsets: Vec::new(),
        child_data: Vec::new(),
    }
}

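/// A node output, identified by node ID and output data ID.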
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct OutputId(NodeId, DataId);
type InputId = (NodeId, DataId);

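/// Bookkeeping for a drop token that was handed out to local receivers.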
struct DropTokenInformation {
    /// The node that owns the data associated with the drop token.
    owner: NodeId,
    /// Receivers that were sent the data and did not report the token back yet.
    pending_nodes: BTreeSet<NodeId>,
}

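/// All event types processed by the daemon's main event loop.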
#[derive(Debug)]
pub enum Event {
    Node {
        dataflow_id: DataflowId,
        node_id: NodeId,
        event: DaemonNodeEvent,
    },
    Coordinator(CoordinatorEvent),
    Daemon(InterDaemonEvent),
    Dora(DoraEvent),
    DynamicNode(DynamicNodeEventWrapper),
    HeartbeatInterval,
    CtrlC,
    SecondCtrlC,
    DaemonError(eyre::Report),
    SpawnNodeResult {
        dataflow_id: DataflowId,
        node_id: NodeId,
        dynamic_node: bool,
        result: Result<RunningNode, NodeError>,
    },
    BuildDataflowResult {
        build_id: BuildId,
        session_id: SessionId,
        result: eyre::Result<BuildInfo>,
    },
    SpawnDataflowResult {
        dataflow_id: Uuid,
        result: eyre::Result<()>,
    },
    NodeStopped {
        dataflow_id: Uuid,
        node_id: NodeId,
    },
}

impl From<DoraEvent> for Event {
    fn from(event: DoraEvent) -> Self {
        Event::Dora(event)
    }
}

impl Event {
    pub fn kind(&self) -> &'static str {
        match self {
            Event::Node { .. } => "Node",
            Event::Coordinator(_) => "Coordinator",
            Event::Daemon(_) => "Daemon",
            Event::Dora(_) => "Dora",
            Event::DynamicNode(_) => "DynamicNode",
            Event::HeartbeatInterval => "HeartbeatInterval",
            Event::CtrlC => "CtrlC",
            Event::SecondCtrlC => "SecondCtrlC",
            Event::DaemonError(_) => "DaemonError",
            Event::SpawnNodeResult { .. } => "SpawnNodeResult",
            Event::BuildDataflowResult { .. } => "BuildDataflowResult",
            Event::SpawnDataflowResult { .. } => "SpawnDataflowResult",
            Event::NodeStopped { .. } => "NodeStopped",
        }
    }
}

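/// Requests and notifications sent by a node to its local daemon.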
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
pub enum DaemonNodeEvent {
    OutputsDone {
        reply_sender: oneshot::Sender<DaemonReply>,
    },
    Subscribe {
        event_sender: UnboundedSender<Timestamped<NodeEvent>>,
        reply_sender: oneshot::Sender<DaemonReply>,
    },
    SubscribeDrop {
        event_sender: UnboundedSender<Timestamped<NodeDropEvent>>,
        reply_sender: oneshot::Sender<DaemonReply>,
    },
    CloseOutputs {
        outputs: Vec<dora_core::config::DataId>,
        reply_sender: oneshot::Sender<DaemonReply>,
    },
    SendOut {
        output_id: DataId,
        metadata: metadata::Metadata,
        data: Option<DataMessage>,
    },
    ReportDrop {
        tokens: Vec<DropToken>,
    },
    EventStreamDropped {
        reply_sender: oneshot::Sender<DaemonReply>,
    },
}

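/// Events generated by the daemon itself: timer ticks, forwarded log
/// messages, and node exit statuses.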
#[derive(Debug)]
pub enum DoraEvent {
    Timer {
        dataflow_id: DataflowId,
        interval: Duration,
        metadata: metadata::Metadata,
    },
    Logs {
        dataflow_id: DataflowId,
        output_id: OutputId,
        message: DataMessage,
        metadata: metadata::Metadata,
    },
    SpawnedNodeResult {
        dataflow_id: DataflowId,
        node_id: NodeId,
        dynamic_node: bool,
        exit_status: NodeExitStatus,
    },
}

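/// Tells the daemon event loop whether to keep running or shut down.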
#[must_use]
enum RunStatus {
    Continue,
    Exit,
}

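/// Wraps `event` in a `Timestamped` envelope using a fresh HLC timestamp and
/// sends it over the given channel.
///
/// Minimal usage sketch (hypothetical setup, not part of the daemon):
///
/// ```ignore
/// let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
/// let clock = uhlc::HLC::default();
/// let _ = send_with_timestamp(&tx, NodeEvent::Stop, &clock);
/// ```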
fn send_with_timestamp<T>(
    sender: &UnboundedSender<Timestamped<T>>,
    event: T,
    clock: &HLC,
) -> Result<(), mpsc::error::SendError<Timestamped<T>>> {
    sender.send(Timestamped {
        inner: event,
        timestamp: clock.new_timestamp(),
    })
}

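/// Installs a ctrl-c handler: the first two signals are forwarded as `CtrlC`
/// and `SecondCtrlC` events; a third aborts the process immediately.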
fn set_up_ctrlc_handler(
    clock: Arc<HLC>,
) -> eyre::Result<tokio::sync::mpsc::Receiver<Timestamped<Event>>> {
    let (ctrlc_tx, ctrlc_rx) = mpsc::channel(1);

    let mut ctrlc_sent = 0;
    ctrlc::set_handler(move || {
        let event = match ctrlc_sent {
            0 => Event::CtrlC,
            1 => Event::SecondCtrlC,
            _ => {
                tracing::warn!("received 3rd ctrl-c signal -> aborting immediately");
                std::process::abort();
            }
        };
        if ctrlc_tx
            .blocking_send(Timestamped {
                inner: event,
                timestamp: clock.new_timestamp(),
            })
            .is_err()
        {
            tracing::error!("failed to forward ctrl-c event to daemon event loop");
        }

        ctrlc_sent += 1;
    })
    .wrap_err("failed to set ctrl-c handler")?;

    Ok(ctrlc_rx)
}

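/// Records which node failures were triggered by the failure of another node,
/// so that only root causes are reported as primary errors.
///
/// Minimal usage sketch (hypothetical node IDs):
///
/// ```ignore
/// let mut causes = CascadingErrorCauses::default();
/// causes.report_cascading_error(camera_node, tracker_node);
/// assert_eq!(causes.error_caused_by(&tracker_node), Some(&camera_node));
/// ```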
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct CascadingErrorCauses {
    caused_by: BTreeMap<NodeId, NodeId>,
}

impl CascadingErrorCauses {
    pub fn experienced_cascading_error(&self, node: &NodeId) -> bool {
        self.caused_by.contains_key(node)
    }

    pub fn error_caused_by(&self, node: &NodeId) -> Option<&NodeId> {
        self.caused_by.get(node)
    }

    pub fn report_cascading_error(&mut self, causing_node: NodeId, affected_node: NodeId) {
        self.caused_by.entry(affected_node).or_insert(causing_node);
    }
}

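/// Flattens the operator inputs of a runtime node, namespacing each input ID
/// with its operator ID.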
fn runtime_node_inputs(n: &RuntimeNode) -> BTreeMap<DataId, Input> {
    n.operators
        .iter()
        .flat_map(|operator| {
            operator.config.inputs.iter().map(|(input_id, mapping)| {
                (
                    DataId::from(format!("{}/{input_id}", operator.id)),
                    mapping.clone(),
                )
            })
        })
        .collect()
}

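/// Flattens the operator outputs of a runtime node, namespacing each output ID
/// with its operator ID.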
fn runtime_node_outputs(n: &RuntimeNode) -> BTreeSet<DataId> {
    n.operators
        .iter()
        .flat_map(|operator| {
            operator
                .config
                .outputs
                .iter()
                .map(|output_id| DataId::from(format!("{}/{output_id}", operator.id)))
        })
        .collect()
}

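/// Helpers for treating custom and runtime nodes uniformly.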
trait CoreNodeKindExt {
    fn run_config(&self) -> NodeRunConfig;
    fn dynamic(&self) -> bool;
}

impl CoreNodeKindExt for CoreNodeKind {
    fn run_config(&self) -> NodeRunConfig {
        match self {
            CoreNodeKind::Runtime(n) => NodeRunConfig {
                inputs: runtime_node_inputs(n),
                outputs: runtime_node_outputs(n),
            },
            CoreNodeKind::Custom(n) => n.run_config.clone(),
        }
    }

    fn dynamic(&self) -> bool {
        match self {
            CoreNodeKind::Runtime(_n) => false,
            CoreNodeKind::Custom(n) => {
                matches!(&n.source, NodeSource::Local) && n.path == DYNAMIC_SOURCE
            }
        }
    }
}