use aligned_vec::{AVec, ConstAlign};
use coordinator::CoordinatorEvent;
use crossbeam::queue::ArrayQueue;
use dora_core::{
    build::{self, BuildInfo, GitManager, PrevGitSource},
    config::{DataId, Input, InputMapping, NodeId, NodeRunConfig, OperatorId},
    descriptor::{
        read_as_descriptor, CoreNodeKind, Descriptor, DescriptorExt, ResolvedNode, RuntimeNode,
        DYNAMIC_SOURCE,
    },
    topics::LOCALHOST,
    uhlc::{self, HLC},
};
use dora_message::{
    common::{
        DaemonId, DataMessage, DropToken, GitSource, LogLevel, NodeError, NodeErrorCause,
        NodeExitStatus,
    },
    coordinator_to_cli::DataflowResult,
    coordinator_to_daemon::{BuildDataflowNodes, DaemonCoordinatorEvent, SpawnDataflowNodes},
    daemon_to_coordinator::{
        CoordinatorRequest, DaemonCoordinatorReply, DaemonEvent, DataflowDaemonResult,
    },
    daemon_to_daemon::InterDaemonEvent,
    daemon_to_node::{DaemonReply, NodeConfig, NodeDropEvent, NodeEvent},
    descriptor::NodeSource,
    metadata::{self, ArrowTypeInfo},
    node_to_daemon::{DynamicNodeEvent, Timestamped},
    BuildId, DataflowId, SessionId,
};
use dora_node_api::{arrow::datatypes::DataType, Parameter};
use eyre::{bail, eyre, Context, ContextCompat, Result};
use futures::{future, stream, FutureExt, TryFutureExt};
use futures_concurrency::stream::Merge;
use local_listener::DynamicNodeEventWrapper;
use log::{DaemonLogger, DataflowLogger, Logger};
use pending::PendingNodes;
use shared_memory_server::ShmemConf;
use socket_stream_utils::socket_stream_send;
use spawn::Spawner;
use std::{
    collections::{BTreeMap, BTreeSet, HashMap},
    env::current_dir,
    future::Future,
    net::SocketAddr,
    path::{Path, PathBuf},
    pin::pin,
    sync::Arc,
    time::{Duration, Instant},
};
use sysinfo::Pid;
use tokio::{
    fs::File,
    io::AsyncReadExt,
    net::TcpStream,
    sync::{
        broadcast,
        mpsc::{self, UnboundedSender},
        oneshot::{self, Sender},
    },
};
use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
use tracing::{error, warn};
use uuid::{NoContext, Timestamp, Uuid};

pub use flume;
pub use log::LogDestination;

mod coordinator;
mod local_listener;
mod log;
mod node_communication;
mod pending;
mod socket_stream_utils;
mod spawn;

#[cfg(feature = "telemetry")]
use dora_tracing::telemetry::serialize_context;
#[cfg(feature = "telemetry")]
use tracing_opentelemetry::OpenTelemetrySpanExt;

use crate::pending::DataflowStatus;

const STDERR_LOG_LINES: usize = 10;

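/// Central daemon state: tracks running dataflows, the coordinator connection,
/// build/session bookkeeping, and the channels used to route events.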
pub struct Daemon {
    running: HashMap<DataflowId, RunningDataflow>,
    working_dir: HashMap<DataflowId, PathBuf>,

    events_tx: mpsc::Sender<Timestamped<Event>>,

    coordinator_connection: Option<TcpStream>,
    last_coordinator_heartbeat: Instant,
    daemon_id: DaemonId,

    exit_when_done: Option<BTreeSet<(Uuid, NodeId)>>,
    exit_when_all_finished: bool,
    dataflow_node_results: BTreeMap<Uuid, BTreeMap<NodeId, Result<(), NodeError>>>,

    clock: Arc<uhlc::HLC>,

    zenoh_session: zenoh::Session,
    remote_daemon_events_tx: Option<flume::Sender<eyre::Result<Timestamped<InterDaemonEvent>>>>,

    logger: DaemonLogger,

    sessions: BTreeMap<SessionId, BuildId>,
    builds: BTreeMap<BuildId, BuildInfo>,
    git_manager: GitManager,
}

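/// Per-dataflow node results returned when the daemon's event loop exits.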
type DaemonRunResult = BTreeMap<Uuid, BTreeMap<NodeId, Result<(), NodeError>>>;

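/// A pending build/prepare task for a single node, together with the metadata
/// needed to report its result.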
struct NodeBuildTask<F> {
    node_id: NodeId,
    dynamic_node: bool,
    task: F,
}

impl Daemon {
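    /// Registers with the coordinator at `coordinator_addr` and runs the daemon
    /// event loop until a ctrl-c signal or a coordinator-initiated exit.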
    pub async fn run(
        coordinator_addr: SocketAddr,
        machine_id: Option<String>,
        local_listen_port: u16,
    ) -> eyre::Result<()> {
        let clock = Arc::new(HLC::default());

        let mut ctrlc_events = set_up_ctrlc_handler(clock.clone())?;
        let (remote_daemon_events_tx, remote_daemon_events_rx) = flume::bounded(10);
        let (daemon_id, incoming_events) = {
            let incoming_events = set_up_event_stream(
                coordinator_addr,
                &machine_id,
                &clock,
                remote_daemon_events_rx,
                local_listen_port,
            );

            let ctrl_c = pin!(ctrlc_events.recv());
            match futures::future::select(ctrl_c, pin!(incoming_events)).await {
                future::Either::Left((_ctrl_c, _)) => {
                    tracing::info!("received ctrl-c signal -> stopping daemon");
                    return Ok(());
                }
                future::Either::Right((events, _)) => events?,
            }
        };

        let log_destination = {
            let stream = TcpStream::connect(coordinator_addr)
                .await
                .wrap_err("failed to connect log to dora-coordinator")?;
            stream
                .set_nodelay(true)
                .wrap_err("failed to set TCP_NODELAY")?;
            LogDestination::Coordinator {
                coordinator_connection: stream,
            }
        };

        Self::run_general(
            (ReceiverStream::new(ctrlc_events), incoming_events).merge(),
            Some(coordinator_addr),
            daemon_id,
            None,
            clock,
            Some(remote_daemon_events_tx),
            Default::default(),
            log_destination,
        )
        .await
        .map(|_| ())
    }

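    /// Runs a single dataflow to completion without a separate coordinator
    /// (the `dora run` code path); `deploy` sections are rejected here.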
    pub async fn run_dataflow(
        dataflow_path: &Path,
        build_id: Option<BuildId>,
        local_build: Option<BuildInfo>,
        session_id: SessionId,
        uv: bool,
        log_destination: LogDestination,
    ) -> eyre::Result<DataflowResult> {
        let working_dir = dataflow_path
            .canonicalize()
            .context("failed to canonicalize dataflow path")?
            .parent()
            .ok_or_else(|| eyre::eyre!("canonicalized dataflow path has no parent"))?
            .to_owned();

        let descriptor = read_as_descriptor(dataflow_path).await?;
        if let Some(node) = descriptor.nodes.iter().find(|n| n.deploy.is_some()) {
            eyre::bail!(
                "node {} has a `deploy` section, which is not supported in `dora run`\n\n\
                Instead, you need to spawn a `dora coordinator` and one or more `dora daemon` \
                instances and then use `dora start`.",
                node.id
            )
        }

        descriptor.check(&working_dir)?;
        let nodes = descriptor.resolve_aliases_and_set_defaults()?;

        let dataflow_id = Uuid::new_v7(Timestamp::now(NoContext));
        let spawn_command = SpawnDataflowNodes {
            build_id,
            session_id,
            dataflow_id,
            local_working_dir: Some(working_dir),
            spawn_nodes: nodes.keys().cloned().collect(),
            nodes,
            dataflow_descriptor: descriptor,
            uv,
        };

        let clock = Arc::new(HLC::default());

        let ctrlc_events = ReceiverStream::new(set_up_ctrlc_handler(clock.clone())?);

        let exit_when_done = spawn_command
            .nodes
            .values()
            .map(|n| (spawn_command.dataflow_id, n.id.clone()))
            .collect();
        let (reply_tx, reply_rx) = oneshot::channel();
        let timestamp = clock.new_timestamp();
        let coordinator_events = stream::once(async move {
            Timestamped {
                inner: Event::Coordinator(CoordinatorEvent {
                    event: DaemonCoordinatorEvent::Spawn(spawn_command),
                    reply_tx,
                }),
                timestamp,
            }
        });
        let events = (coordinator_events, ctrlc_events).merge();
        let run_result = Self::run_general(
            Box::pin(events),
            None,
            DaemonId::new(None),
            Some(exit_when_done),
            clock.clone(),
            None,
            if let Some(local_build) = local_build {
                let Some(build_id) = build_id else {
                    bail!("no build_id, but local_build set")
                };
                let mut builds = BTreeMap::new();
                builds.insert(build_id, local_build);
                builds
            } else {
                Default::default()
            },
            log_destination,
        );

        let spawn_result = reply_rx
            .map_err(|err| eyre!("failed to receive spawn result: {err}"))
            .and_then(|r| async {
                match r {
                    Some(DaemonCoordinatorReply::TriggerSpawnResult(result)) => {
                        result.map_err(|err| eyre!(err))
                    }
                    _ => Err(eyre!("unexpected spawn reply")),
                }
            });

        let (mut dataflow_results, ()) = future::try_join(run_result, spawn_result).await?;

        Ok(DataflowResult {
            uuid: dataflow_id,
            timestamp: clock.new_timestamp(),
            node_results: dataflow_results
                .remove(&dataflow_id)
                .context("no node results for dataflow_id")?,
        })
    }

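    /// Shared event-loop setup: connects to the coordinator (if any), opens the
    /// zenoh session for inter-daemon communication, and starts `run_inner`.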
    #[allow(clippy::too_many_arguments)]
    async fn run_general(
        external_events: impl Stream<Item = Timestamped<Event>> + Unpin,
        coordinator_addr: Option<SocketAddr>,
        daemon_id: DaemonId,
        exit_when_done: Option<BTreeSet<(Uuid, NodeId)>>,
        clock: Arc<HLC>,
        remote_daemon_events_tx: Option<flume::Sender<eyre::Result<Timestamped<InterDaemonEvent>>>>,
        builds: BTreeMap<BuildId, BuildInfo>,
        log_destination: LogDestination,
    ) -> eyre::Result<DaemonRunResult> {
        let coordinator_connection = match coordinator_addr {
            Some(addr) => {
                let stream = TcpStream::connect(addr)
                    .await
                    .wrap_err("failed to connect to dora-coordinator")?;
                stream
                    .set_nodelay(true)
                    .wrap_err("failed to set TCP_NODELAY")?;
                Some(stream)
            }
            None => None,
        };

        let zenoh_session = match std::env::var(zenoh::Config::DEFAULT_CONFIG_PATH_ENV) {
            Ok(path) => {
                let zenoh_config = zenoh::Config::from_file(&path)
                    .map_err(|e| eyre!(e))
                    .wrap_err_with(|| format!("failed to read zenoh config from {path}"))?;
                zenoh::open(zenoh_config)
                    .await
                    .map_err(|e| eyre!(e))
                    .context("failed to open zenoh session")?
            }
            Err(std::env::VarError::NotPresent) => {
                let mut zenoh_config = zenoh::Config::default();

                if let Some(addr) = coordinator_addr {
                    if cfg!(not(target_os = "windows")) {
                        zenoh_config
                            .insert_json5("routing/peer", r#"{ mode: "linkstate" }"#)
                            .unwrap();
                    }

                    zenoh_config
                        .insert_json5(
                            "connect/endpoints",
                            &format!(
                                r#"{{ router: ["tcp/[::]:7447"], peer: ["tcp/{}:5456"] }}"#,
                                addr.ip()
                            ),
                        )
                        .unwrap();
                    zenoh_config
                        .insert_json5(
                            "listen/endpoints",
                            r#"{ router: ["tcp/[::]:7447"], peer: ["tcp/[::]:5456"] }"#,
                        )
                        .unwrap();
                    if cfg!(target_os = "macos") {
                        warn!("disabling multicast scouting on macOS; enable it via the ZENOH_CONFIG env variable or config file");
                        zenoh_config
                            .insert_json5("scouting/multicast", r#"{ enabled: false }"#)
                            .unwrap();
                    }
                };
                if let Ok(zenoh_session) = zenoh::open(zenoh_config).await {
                    zenoh_session
                } else {
                    warn!(
                        "failed to open zenoh session, retrying with default config + coordinator"
                    );
                    let mut zenoh_config = zenoh::Config::default();
                    if cfg!(not(target_os = "windows")) {
                        zenoh_config
                            .insert_json5("routing/peer", r#"{ mode: "linkstate" }"#)
                            .unwrap();
                    }

                    if let Some(addr) = coordinator_addr {
                        zenoh_config
                            .insert_json5(
                                "connect/endpoints",
                                &format!(
                                    r#"{{ router: ["tcp/[::]:7447"], peer: ["tcp/{}:5456"] }}"#,
                                    addr.ip()
                                ),
                            )
                            .unwrap();
                        if cfg!(target_os = "macos") {
                            warn!("disabling multicast scouting on macOS; enable it via the ZENOH_CONFIG env variable or config file");
                            zenoh_config
                                .insert_json5("scouting/multicast", r#"{ enabled: false }"#)
                                .unwrap();
                        }
                    }
                    if let Ok(zenoh_session) = zenoh::open(zenoh_config).await {
                        zenoh_session
                    } else {
                        warn!("failed to open zenoh session, retrying with default config");
                        let zenoh_config = zenoh::Config::default();
                        zenoh::open(zenoh_config)
                            .await
                            .map_err(|e| eyre!(e))
                            .context("failed to open zenoh session")?
                    }
                }
            }
            Err(std::env::VarError::NotUnicode(_)) => eyre::bail!(
                "{} env variable is not valid unicode",
                zenoh::Config::DEFAULT_CONFIG_PATH_ENV
            ),
        };
        let (dora_events_tx, dora_events_rx) = mpsc::channel(5);
        let daemon = Self {
            logger: Logger {
                destination: log_destination,
                daemon_id: daemon_id.clone(),
                clock: clock.clone(),
            }
            .for_daemon(daemon_id.clone()),
            running: HashMap::new(),
            working_dir: HashMap::new(),
            events_tx: dora_events_tx,
            coordinator_connection,
            last_coordinator_heartbeat: Instant::now(),
            daemon_id,
            exit_when_done,
            exit_when_all_finished: false,
            dataflow_node_results: BTreeMap::new(),
            clock,
            zenoh_session,
            remote_daemon_events_tx,
            git_manager: Default::default(),
            builds,
            sessions: Default::default(),
        };

        let dora_events = ReceiverStream::new(dora_events_rx);
        let watchdog_clock = daemon.clock.clone();
        let watchdog_interval = tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(
            Duration::from_secs(5),
        ))
        .map(|_| Timestamped {
            inner: Event::HeartbeatInterval,
            timestamp: watchdog_clock.new_timestamp(),
        });
        let events = (external_events, dora_events, watchdog_interval).merge();
        daemon.run_inner(events).await
    }

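    /// The daemon's main event loop; returns the collected per-node results
    /// once an exit condition is reached.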
    #[tracing::instrument(skip(incoming_events, self), fields(?self.daemon_id))]
    async fn run_inner(
        mut self,
        incoming_events: impl Stream<Item = Timestamped<Event>> + Unpin,
    ) -> eyre::Result<DaemonRunResult> {
        let mut events = incoming_events;

        while let Some(event) = events.next().await {
            let Timestamped { inner, timestamp } = event;
            if let Err(err) = self.clock.update_with_timestamp(&timestamp) {
                tracing::warn!("failed to update HLC with incoming event timestamp: {err}");
            }

            let start = Instant::now();
            let event_kind = inner.kind();

            match inner {
                Event::Coordinator(CoordinatorEvent { event, reply_tx }) => {
                    let status = self.handle_coordinator_event(event, reply_tx).await?;

                    match status {
                        RunStatus::Continue => {}
                        RunStatus::Exit => break,
                    }
                }
                Event::Daemon(event) => {
                    self.handle_inter_daemon_event(event).await?;
                }
                Event::Node {
                    dataflow_id: dataflow,
                    node_id,
                    event,
                } => self.handle_node_event(event, dataflow, node_id).await?,
                Event::Dora(event) => self.handle_dora_event(event).await?,
                Event::DynamicNode(event) => self.handle_dynamic_node_event(event).await?,
                Event::HeartbeatInterval => {
                    if let Some(connection) = &mut self.coordinator_connection {
                        let msg = serde_json::to_vec(&Timestamped {
                            inner: CoordinatorRequest::Event {
                                daemon_id: self.daemon_id.clone(),
                                event: DaemonEvent::Heartbeat,
                            },
                            timestamp: self.clock.new_timestamp(),
                        })?;
                        socket_stream_send(connection, &msg)
                            .await
                            .wrap_err("failed to send watchdog message to dora-coordinator")?;

                        if self.last_coordinator_heartbeat.elapsed() > Duration::from_secs(20) {
                            bail!("lost connection to coordinator")
                        }
                    }
                }
                Event::CtrlC => {
                    tracing::info!("received ctrl-c signal -> stopping all dataflows");
                    for dataflow in self.running.values_mut() {
                        let mut logger = self.logger.for_dataflow(dataflow.id);
                        dataflow
                            .stop_all(
                                &mut self.coordinator_connection,
                                &self.clock,
                                None,
                                &mut logger,
                            )
                            .await?;
                    }
                    self.exit_when_all_finished = true;
                    if self.running.is_empty() {
                        break;
                    }
                }
                Event::SecondCtrlC => {
                    tracing::warn!("received second ctrl-c signal -> exiting immediately");
                    bail!("received second ctrl-c signal");
                }
                Event::DaemonError(err) => {
                    tracing::error!("Daemon error: {err:?}");
                }
                Event::SpawnNodeResult {
                    dataflow_id,
                    node_id,
                    dynamic_node,
                    result,
                } => match result {
                    Ok(running_node) => {
                        if let Some(dataflow) = self.running.get_mut(&dataflow_id) {
                            dataflow.running_nodes.insert(node_id, running_node);
                        } else {
                            tracing::error!("failed to handle SpawnNodeResult: no running dataflow with ID {dataflow_id}");
                        }
                    }
                    Err(error) => {
                        self.dataflow_node_results
                            .entry(dataflow_id)
                            .or_default()
                            .insert(node_id.clone(), Err(error));
                        self.handle_node_stop(dataflow_id, &node_id, dynamic_node)
                            .await?;
                    }
                },
                Event::BuildDataflowResult {
                    build_id,
                    session_id,
                    result,
                } => {
                    let (build_info, result) = match result {
                        Ok(build_info) => (Some(build_info), Ok(())),
                        Err(err) => (None, Err(err)),
                    };
                    if let Some(build_info) = build_info {
                        self.builds.insert(build_id, build_info);
                        if let Some(old_build_id) = self.sessions.insert(session_id, build_id) {
                            self.builds.remove(&old_build_id);
                        }
                    }
                    if let Some(connection) = &mut self.coordinator_connection {
                        let msg = serde_json::to_vec(&Timestamped {
                            inner: CoordinatorRequest::Event {
                                daemon_id: self.daemon_id.clone(),
                                event: DaemonEvent::BuildResult {
                                    build_id,
                                    result: result.map_err(|err| format!("{err:?}")),
                                },
                            },
                            timestamp: self.clock.new_timestamp(),
                        })?;
                        socket_stream_send(connection, &msg).await.wrap_err(
                            "failed to send BuildDataflowResult message to dora-coordinator",
                        )?;
                    }
                }
                Event::SpawnDataflowResult {
                    dataflow_id,
                    result,
                } => {
                    if let Some(connection) = &mut self.coordinator_connection {
                        let msg = serde_json::to_vec(&Timestamped {
                            inner: CoordinatorRequest::Event {
                                daemon_id: self.daemon_id.clone(),
                                event: DaemonEvent::SpawnResult {
                                    dataflow_id,
                                    result: result.map_err(|err| format!("{err:?}")),
                                },
                            },
                            timestamp: self.clock.new_timestamp(),
                        })?;
                        socket_stream_send(connection, &msg).await.wrap_err(
                            "failed to send SpawnDataflowResult message to dora-coordinator",
                        )?;
                    }
                }
                Event::NodeStopped {
                    dataflow_id,
                    node_id,
                } => {
                    if let Some(exit_when_done) = &mut self.exit_when_done {
                        exit_when_done.remove(&(dataflow_id, node_id));
                        if exit_when_done.is_empty() {
                            tracing::info!(
                                "exiting daemon because all required dataflows are finished"
                            );
                            break;
                        }
                    }
                    if self.exit_when_all_finished && self.running.is_empty() {
                        break;
                    }
                }
            }

            let elapsed = start.elapsed();
            if elapsed > Duration::from_millis(100) {
                tracing::warn!(
                    "Daemon took {}ms for handling event: {event_kind}",
                    elapsed.as_millis()
                );
            }
        }

        if let Some(mut connection) = self.coordinator_connection.take() {
            let msg = serde_json::to_vec(&Timestamped {
                inner: CoordinatorRequest::Event {
                    daemon_id: self.daemon_id.clone(),
                    event: DaemonEvent::Exit,
                },
                timestamp: self.clock.new_timestamp(),
            })?;
            socket_stream_send(&mut connection, &msg)
                .await
                .wrap_err("failed to send Exit message to dora-coordinator")?;
        }

        Ok(self.dataflow_node_results)
    }

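    /// Handles a single coordinator request and reports back through `reply_tx`.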
    async fn handle_coordinator_event(
        &mut self,
        event: DaemonCoordinatorEvent,
        reply_tx: Sender<Option<DaemonCoordinatorReply>>,
    ) -> eyre::Result<RunStatus> {
        let status = match event {
            DaemonCoordinatorEvent::Build(BuildDataflowNodes {
                build_id,
                session_id,
                local_working_dir,
                git_sources,
                prev_git_sources,
                dataflow_descriptor,
                nodes_on_machine,
                uv,
            }) => {
                match dataflow_descriptor.communication.remote {
                    dora_core::config::RemoteCommunicationConfig::Tcp => {}
                }

                let base_working_dir = self.base_working_dir(local_working_dir, session_id)?;

                let result = self
                    .build_dataflow(
                        build_id,
                        session_id,
                        base_working_dir,
                        git_sources,
                        prev_git_sources,
                        dataflow_descriptor,
                        nodes_on_machine,
                        uv,
                    )
                    .await;
                let (trigger_result, result_task) = match result {
                    Ok(result_task) => (Ok(()), Some(result_task)),
                    Err(err) => (Err(format!("{err:?}")), None),
                };
                let reply = DaemonCoordinatorReply::TriggerBuildResult(trigger_result);
                let _ = reply_tx.send(Some(reply)).map_err(|_| {
                    error!("could not send `TriggerBuildResult` reply from daemon to coordinator")
                });

                let result_tx = self.events_tx.clone();
                let clock = self.clock.clone();
                if let Some(result_task) = result_task {
                    tokio::spawn(async move {
                        let message = Timestamped {
                            inner: Event::BuildDataflowResult {
                                build_id,
                                session_id,
                                result: result_task.await,
                            },
                            timestamp: clock.new_timestamp(),
                        };
                        let _ = result_tx
                            .send(message)
                            .map_err(|_| {
                                error!(
                                    "could not send `BuildResult` reply from daemon to coordinator"
                                )
                            })
                            .await;
                    });
                }

                RunStatus::Continue
            }
            DaemonCoordinatorEvent::Spawn(SpawnDataflowNodes {
                build_id,
                session_id,
                dataflow_id,
                local_working_dir,
                nodes,
                dataflow_descriptor,
                spawn_nodes,
                uv,
            }) => {
                match dataflow_descriptor.communication.remote {
                    dora_core::config::RemoteCommunicationConfig::Tcp => {}
                }

                let base_working_dir = self.base_working_dir(local_working_dir, session_id)?;

                let result = self
                    .spawn_dataflow(
                        build_id,
                        dataflow_id,
                        base_working_dir,
                        nodes,
                        dataflow_descriptor,
                        spawn_nodes,
                        uv,
                    )
                    .await;
                let (trigger_result, result_task) = match result {
                    Ok(result_task) => (Ok(()), Some(result_task)),
                    Err(err) => (Err(format!("{err:?}")), None),
                };
                let reply = DaemonCoordinatorReply::TriggerSpawnResult(trigger_result);
                let _ = reply_tx.send(Some(reply)).map_err(|_| {
                    error!("could not send `TriggerSpawnResult` reply from daemon to coordinator")
                });

                let result_tx = self.events_tx.clone();
                let clock = self.clock.clone();
                if let Some(result_task) = result_task {
                    tokio::spawn(async move {
                        let message = Timestamped {
                            inner: Event::SpawnDataflowResult {
                                dataflow_id,
                                result: result_task.await,
                            },
                            timestamp: clock.new_timestamp(),
                        };
                        let _ = result_tx
                            .send(message)
                            .map_err(|_| {
                                error!(
                                    "could not send `SpawnResult` reply from daemon to coordinator"
                                )
                            })
                            .await;
                    });
                }

                RunStatus::Continue
            }
            DaemonCoordinatorEvent::AllNodesReady {
                dataflow_id,
                exited_before_subscribe,
            } => {
                let mut logger = self.logger.for_dataflow(dataflow_id);
                logger
                    .log(
                        LogLevel::Debug,
                        None,
                        Some("daemon".into()),
                        format!(
                            "received AllNodesReady (exited_before_subscribe: {exited_before_subscribe:?})"
                        ),
                    )
                    .await;
                match self.running.get_mut(&dataflow_id) {
                    Some(dataflow) => {
                        let ready = exited_before_subscribe.is_empty();
                        dataflow
                            .pending_nodes
                            .handle_external_all_nodes_ready(
                                exited_before_subscribe,
                                &mut dataflow.cascading_error_causes,
                            )
                            .await?;
                        if ready {
                            logger
                                .log(
                                    LogLevel::Info,
                                    None,
                                    Some("daemon".into()),
                                    "coordinator reported that all nodes are ready, starting dataflow",
                                )
                                .await;
                            dataflow.start(&self.events_tx, &self.clock).await?;
                        }
                    }
                    None => {
                        tracing::warn!(
                            "received AllNodesReady for unknown dataflow (ID `{dataflow_id}`)"
                        );
                    }
                }
                let _ = reply_tx.send(None).map_err(|_| {
                    error!("could not send `AllNodesReady` reply from daemon to coordinator")
                });
                RunStatus::Continue
            }
            DaemonCoordinatorEvent::Logs {
                dataflow_id,
                node_id,
            } => {
                match self.working_dir.get(&dataflow_id) {
                    Some(working_dir) => {
                        let working_dir = working_dir.clone();
                        tokio::spawn(async move {
                            let logs = async {
                                let mut file =
                                    File::open(log::log_path(&working_dir, &dataflow_id, &node_id))
                                        .await
                                        .wrap_err(format!(
                                            "Could not open log file: {:#?}",
                                            log::log_path(&working_dir, &dataflow_id, &node_id)
                                        ))?;

                                let mut contents = vec![];
                                file.read_to_end(&mut contents)
                                    .await
                                    .wrap_err("Could not read content of log file")?;
                                Result::<Vec<u8>, eyre::Report>::Ok(contents)
                            }
                            .await
                            .map_err(|err| format!("{err:?}"));
                            let _ = reply_tx
                                .send(Some(DaemonCoordinatorReply::Logs(logs)))
                                .map_err(|_| {
                                    error!("could not send logs reply from daemon to coordinator")
                                });
                        });
                    }
                    None => {
                        tracing::warn!("received Logs for unknown dataflow (ID `{dataflow_id}`)");
                        let _ = reply_tx.send(None).map_err(|_| {
                            error!("could not send `Logs` reply from daemon to coordinator")
                        });
                    }
                }
                RunStatus::Continue
            }
            DaemonCoordinatorEvent::ReloadDataflow {
                dataflow_id,
                node_id,
                operator_id,
            } => {
                let result = self.send_reload(dataflow_id, node_id, operator_id).await;
                let reply =
                    DaemonCoordinatorReply::ReloadResult(result.map_err(|err| format!("{err:?}")));
                let _ = reply_tx
                    .send(Some(reply))
                    .map_err(|_| error!("could not send reload reply from daemon to coordinator"));
                RunStatus::Continue
            }
            DaemonCoordinatorEvent::StopDataflow {
                dataflow_id,
                grace_duration,
            } => {
                let mut logger = self.logger.for_dataflow(dataflow_id);
                let dataflow = self
                    .running
                    .get_mut(&dataflow_id)
                    .wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"));
                let (reply, future) = match dataflow {
                    Ok(dataflow) => {
                        let future = dataflow.stop_all(
                            &mut self.coordinator_connection,
                            &self.clock,
                            grace_duration,
                            &mut logger,
                        );
                        (Ok(()), Some(future))
                    }
                    Err(err) => (Err(err.to_string()), None),
                };

                let _ = reply_tx
                    .send(Some(DaemonCoordinatorReply::StopResult(reply)))
                    .map_err(|_| error!("could not send stop reply from daemon to coordinator"));

                if let Some(future) = future {
                    future.await?;
                }

                RunStatus::Continue
            }
            DaemonCoordinatorEvent::Destroy => {
                tracing::info!("received destroy command -> exiting");
                let (notify_tx, notify_rx) = oneshot::channel();
                let reply = DaemonCoordinatorReply::DestroyResult {
                    result: Ok(()),
                    notify: Some(notify_tx),
                };
                let _ = reply_tx
                    .send(Some(reply))
                    .map_err(|_| error!("could not send destroy reply from daemon to coordinator"));
                if notify_rx.await.is_err() {
                    tracing::warn!("no confirmation received for DestroyReply");
                }
                RunStatus::Exit
            }
            DaemonCoordinatorEvent::Heartbeat => {
                self.last_coordinator_heartbeat = Instant::now();
                let _ = reply_tx.send(None);
                RunStatus::Continue
            }
        };
        Ok(status)
    }

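    /// Forwards outputs (or output-closed notifications) received from remote
    /// daemons to the local receiver nodes.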
    async fn handle_inter_daemon_event(&mut self, event: InterDaemonEvent) -> eyre::Result<()> {
        match event {
            InterDaemonEvent::Output {
                dataflow_id,
                node_id,
                output_id,
                metadata,
                data,
            } => {
                let inner = async {
                    let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
                        format!("send out failed: no running dataflow with ID `{dataflow_id}`")
                    })?;
                    send_output_to_local_receivers(
                        node_id.clone(),
                        output_id.clone(),
                        dataflow,
                        &metadata,
                        data.map(DataMessage::Vec),
                        &self.clock,
                    )
                    .await?;
                    Result::<_, eyre::Report>::Ok(())
                };
                if let Err(err) = inner
                    .await
                    .wrap_err("failed to forward remote output to local receivers")
                {
                    let mut logger = self.logger.for_dataflow(dataflow_id).for_node(node_id);
                    logger
                        .log(LogLevel::Warn, Some("daemon".into()), format!("{err:?}"))
                        .await;
                }
                Ok(())
            }
            InterDaemonEvent::OutputClosed {
                dataflow_id,
                node_id,
                output_id,
            } => {
                let output_id = OutputId(node_id.clone(), output_id);
                let mut logger = self
                    .logger
                    .for_dataflow(dataflow_id)
                    .for_node(node_id.clone());
                logger
                    .log(
                        LogLevel::Debug,
                        Some("daemon".into()),
                        format!("received OutputClosed event for output {output_id:?}"),
                    )
                    .await;

                let inner = async {
                    let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
                        format!("send out failed: no running dataflow with ID `{dataflow_id}`")
                    })?;

                    if let Some(inputs) = dataflow.mappings.get(&output_id).cloned() {
                        for (receiver_id, input_id) in &inputs {
                            close_input(dataflow, receiver_id, input_id, &self.clock);
                        }
                    }
                    Result::<(), eyre::Report>::Ok(())
                };
                if let Err(err) = inner
                    .await
                    .wrap_err("failed to handle InputsClosed event sent by coordinator")
                {
                    logger
                        .log(LogLevel::Warn, Some("daemon".into()), format!("{err:?}"))
                        .await;
                }
                Ok(())
            }
        }
    }

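    /// Starts building all nodes of the dataflow that are assigned to this
    /// machine; returns a future that resolves to the collected `BuildInfo`.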
    #[allow(clippy::too_many_arguments)]
    async fn build_dataflow(
        &mut self,
        build_id: BuildId,
        session_id: SessionId,
        base_working_dir: PathBuf,
        git_sources: BTreeMap<NodeId, GitSource>,
        prev_git_sources: BTreeMap<NodeId, GitSource>,
        dataflow_descriptor: Descriptor,
        local_nodes: BTreeSet<NodeId>,
        uv: bool,
    ) -> eyre::Result<impl Future<Output = eyre::Result<BuildInfo>>> {
        let builder = build::Builder {
            session_id,
            base_working_dir,
            uv,
        };
        self.git_manager.clear_planned_builds(session_id);

        let nodes = dataflow_descriptor.resolve_aliases_and_set_defaults()?;

        let mut tasks = Vec::new();

        for node in nodes.into_values().filter(|n| local_nodes.contains(&n.id)) {
            let dynamic_node = node.kind.dynamic();

            let node_id = node.id.clone();
            let mut logger = self.logger.for_node_build(build_id, node_id.clone());
            logger.log(LogLevel::Info, "building").await;
            let git_source = git_sources.get(&node_id).cloned();
            let prev_git_source = prev_git_sources.get(&node_id).cloned();
            let prev_git = prev_git_source.map(|prev_source| PrevGitSource {
                still_needed_for_this_build: git_sources.values().any(|s| s == &prev_source),
                git_source: prev_source,
            });

            let logger_cloned = logger
                .try_clone_impl()
                .await
                .wrap_err("failed to clone logger")?;

            let mut builder = builder.clone();
            if let Some(node_working_dir) =
                node.deploy.as_ref().and_then(|d| d.working_dir.as_deref())
            {
                builder.base_working_dir = builder.base_working_dir.join(node_working_dir);
            }

            match builder
                .build_node(
                    node,
                    git_source,
                    prev_git,
                    logger_cloned,
                    &mut self.git_manager,
                )
                .await
                .wrap_err_with(|| format!("failed to build node `{node_id}`"))
            {
                Ok(result) => {
                    tasks.push(NodeBuildTask {
                        node_id,
                        task: result,
                        dynamic_node,
                    });
                }
                Err(err) => {
                    logger.log(LogLevel::Error, format!("{err:?}")).await;
                    return Err(err);
                }
            }
        }

        let task = async move {
            let mut info = BuildInfo {
                node_working_dirs: Default::default(),
            };
            for task in tasks {
                let NodeBuildTask {
                    node_id,
                    dynamic_node,
                    task,
                } = task;
                let node = task
                    .await
                    .with_context(|| format!("failed to build node `{node_id}`"))?;
                info.node_working_dirs
                    .insert(node_id, node.node_working_dir);
            }
            Ok(info)
        };

        Ok(task)
    }

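    /// Registers the dataflow, wires up local input mappings and zenoh
    /// subscriptions for remote outputs, and prepares all local nodes for
    /// spawning; returns a future that performs the actual spawn.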
    #[allow(clippy::too_many_arguments)]
    async fn spawn_dataflow(
        &mut self,
        build_id: Option<BuildId>,
        dataflow_id: DataflowId,
        base_working_dir: PathBuf,
        nodes: BTreeMap<NodeId, ResolvedNode>,
        dataflow_descriptor: Descriptor,
        spawn_nodes: BTreeSet<NodeId>,
        uv: bool,
    ) -> eyre::Result<impl Future<Output = eyre::Result<()>>> {
        let mut logger = self
            .logger
            .for_dataflow(dataflow_id)
            .try_clone()
            .await
            .context("failed to clone logger")?;
        let dataflow =
            RunningDataflow::new(dataflow_id, self.daemon_id.clone(), &dataflow_descriptor);
        let dataflow = match self.running.entry(dataflow_id) {
            std::collections::hash_map::Entry::Vacant(entry) => {
                self.working_dir
                    .insert(dataflow_id, base_working_dir.clone());
                entry.insert(dataflow)
            }
            std::collections::hash_map::Entry::Occupied(_) => {
                bail!("there is already a running dataflow with ID `{dataflow_id}`")
            }
        };

        let mut stopped = Vec::new();

        let node_working_dirs = build_id
            .and_then(|build_id| self.builds.get(&build_id))
            .map(|info| info.node_working_dirs.clone())
            .unwrap_or_default();

        for node in nodes.values() {
            let local = spawn_nodes.contains(&node.id);

            let inputs = node_inputs(node);
            for (input_id, input) in inputs {
                if local {
                    dataflow
                        .open_inputs
                        .entry(node.id.clone())
                        .or_default()
                        .insert(input_id.clone());
                    match input.mapping {
                        InputMapping::User(mapping) => {
                            dataflow
                                .mappings
                                .entry(OutputId(mapping.source, mapping.output))
                                .or_default()
                                .insert((node.id.clone(), input_id));
                        }
                        InputMapping::Timer { interval } => {
                            dataflow
                                .timers
                                .entry(interval)
                                .or_default()
                                .insert((node.id.clone(), input_id));
                        }
                    }
                } else if let InputMapping::User(mapping) = input.mapping {
                    dataflow
                        .open_external_mappings
                        .insert(OutputId(mapping.source, mapping.output));
                }
            }
        }

        let spawner = Spawner {
            dataflow_id,
            daemon_tx: self.events_tx.clone(),
            dataflow_descriptor,
            clock: self.clock.clone(),
            uv,
        };

        let mut tasks = Vec::new();

        for node in nodes.into_values() {
            let mut logger = logger.reborrow().for_node(node.id.clone());
            let local = spawn_nodes.contains(&node.id);
            if local {
                let dynamic_node = node.kind.dynamic();
                if dynamic_node {
                    dataflow.dynamic_nodes.insert(node.id.clone());
                } else {
                    dataflow.pending_nodes.insert(node.id.clone());
                }

                let node_id = node.id.clone();
                let node_stderr_most_recent = dataflow
                    .node_stderr_most_recent
                    .entry(node.id.clone())
                    .or_insert_with(|| Arc::new(ArrayQueue::new(STDERR_LOG_LINES)))
                    .clone();
                logger
                    .log(LogLevel::Info, Some("daemon".into()), "spawning")
                    .await;
                let node_working_dir = node_working_dirs
                    .get(&node_id)
                    .cloned()
                    .or_else(|| {
                        node.deploy
                            .as_ref()
                            .and_then(|d| d.working_dir.as_ref().map(|d| base_working_dir.join(d)))
                    })
                    .unwrap_or(base_working_dir.clone())
                    .clone();
                match spawner
                    .clone()
                    .spawn_node(node, node_working_dir, node_stderr_most_recent, &mut logger)
                    .await
                    .wrap_err_with(|| format!("failed to spawn node `{node_id}`"))
                {
                    Ok(result) => {
                        tasks.push(NodeBuildTask {
                            node_id,
                            task: result,
                            dynamic_node,
                        });
                    }
                    Err(err) => {
                        logger
                            .log(LogLevel::Error, Some("daemon".into()), format!("{err:?}"))
                            .await;
                        self.dataflow_node_results
                            .entry(dataflow_id)
                            .or_default()
                            .insert(
                                node_id.clone(),
                                Err(NodeError {
                                    timestamp: self.clock.new_timestamp(),
                                    cause: NodeErrorCause::FailedToSpawn(format!("{err:?}")),
                                    exit_status: NodeExitStatus::Unknown,
                                }),
                            );
                        stopped.push((node_id.clone(), dynamic_node));
                    }
                }
            } else {
                dataflow.pending_nodes.set_external_nodes(true);

                for output_id in dataflow.mappings.keys().filter(|o| o.0 == node.id) {
                    let tx = self
                        .remote_daemon_events_tx
                        .clone()
                        .wrap_err("no remote_daemon_events_tx channel")?;
                    let mut finished_rx = dataflow.finished_tx.subscribe();
                    let subscribe_topic = dataflow.output_publish_topic(output_id);
                    tracing::debug!("declaring subscriber on {subscribe_topic}");
                    let subscriber = self
                        .zenoh_session
                        .declare_subscriber(subscribe_topic)
                        .await
                        .map_err(|e| eyre!(e))
                        .wrap_err_with(|| format!("failed to subscribe to {output_id:?}"))?;
                    tokio::spawn(async move {
                        let mut finished = pin!(finished_rx.recv());
                        loop {
                            let finished_or_next =
                                futures::future::select(finished, subscriber.recv_async());
                            match finished_or_next.await {
                                future::Either::Left((finished, _)) => {
                                    match finished {
                                        Err(broadcast::error::RecvError::Closed) => {
                                            tracing::debug!("dataflow finished, breaking from zenoh subscribe task");
                                            break;
                                        }
                                        other => {
                                            tracing::warn!("unexpected return value of dataflow finished_rx channel: {other:?}");
                                            break;
                                        }
                                    }
                                }
                                future::Either::Right((sample, f)) => {
                                    finished = f;
                                    let event = sample.map_err(|e| eyre!(e)).and_then(|s| {
                                        Timestamped::deserialize_inter_daemon_event(
                                            &s.payload().to_bytes(),
                                        )
                                    });
                                    if tx.send_async(event).await.is_err() {
                                        break;
                                    }
                                }
                            }
                        }
                    });
                }
            }
        }
        for (node_id, dynamic) in stopped {
            self.handle_node_stop(dataflow_id, &node_id, dynamic)
                .await?;
        }

        let spawn_result = Self::spawn_prepared_nodes(
            dataflow_id,
            logger,
            tasks,
            self.events_tx.clone(),
            self.clock.clone(),
        );

        Ok(spawn_result)
    }

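    /// Awaits the prepare tasks and spawns the resulting nodes, reporting a
    /// `SpawnNodeResult` event for each one; a single prepare failure marks all
    /// remaining nodes as cascading errors.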
    async fn spawn_prepared_nodes(
        dataflow_id: Uuid,
        mut logger: DataflowLogger<'_>,
        tasks: Vec<NodeBuildTask<impl Future<Output = eyre::Result<spawn::PreparedNode>>>>,
        events_tx: mpsc::Sender<Timestamped<Event>>,
        clock: Arc<HLC>,
    ) -> eyre::Result<()> {
        let node_result = |node_id, dynamic_node, result| Timestamped {
            inner: Event::SpawnNodeResult {
                dataflow_id,
                node_id,
                dynamic_node,
                result,
            },
            timestamp: clock.new_timestamp(),
        };
        let mut failed_to_prepare = None;
        let mut prepared_nodes = Vec::new();
        for task in tasks {
            let NodeBuildTask {
                node_id,
                dynamic_node,
                task,
            } = task;
            match task.await {
                Ok(node) => prepared_nodes.push(node),
                Err(err) => {
                    if failed_to_prepare.is_none() {
                        failed_to_prepare = Some(node_id.clone());
                    }
                    let node_err: NodeError = NodeError {
                        timestamp: clock.new_timestamp(),
                        cause: NodeErrorCause::FailedToSpawn(format!(
                            "preparing for spawn failed: {err:?}"
                        )),
                        exit_status: NodeExitStatus::Unknown,
                    };
                    let send_result = events_tx
                        .send(node_result(node_id, dynamic_node, Err(node_err)))
                        .await;
                    if send_result.is_err() {
                        tracing::error!("failed to send SpawnNodeResult to main daemon task")
                    }
                }
            }
        }

        if let Some(failed_node) = failed_to_prepare {
            for node in prepared_nodes {
                let err = NodeError {
                    timestamp: clock.new_timestamp(),
                    cause: NodeErrorCause::Cascading {
                        caused_by_node: failed_node.clone(),
                    },
                    exit_status: NodeExitStatus::Unknown,
                };
                let send_result = events_tx
                    .send(node_result(
                        node.node_id().clone(),
                        node.dynamic(),
                        Err(err),
                    ))
                    .await;
                if send_result.is_err() {
                    tracing::error!("failed to send SpawnNodeResult to main daemon task")
                }
            }
            Err(eyre!("failed to prepare node {failed_node}"))
        } else {
            let mut spawn_result = Ok(());

            logger
                .log(
                    LogLevel::Info,
                    None,
                    Some("dora daemon".into()),
                    "finished building nodes, spawning...",
                )
                .await;

            for node in prepared_nodes {
                let node_id = node.node_id().clone();
                let dynamic_node = node.dynamic();
                let mut logger = logger.reborrow().for_node(node_id.clone());
                let result = node.spawn(&mut logger).await;
                let node_spawn_result = match result {
                    Ok(node) => Ok(node),
                    Err(err) => {
                        let node_err = NodeError {
                            timestamp: clock.new_timestamp(),
                            cause: NodeErrorCause::FailedToSpawn(format!("spawn failed: {err:?}")),
                            exit_status: NodeExitStatus::Unknown,
                        };
                        if spawn_result.is_ok() {
                            spawn_result = Err(err.wrap_err(format!("failed to spawn {node_id}")));
                        }
                        Err(node_err)
                    }
                };
                let send_result = events_tx
                    .send(node_result(node_id, dynamic_node, node_spawn_result))
                    .await;
                if send_result.is_err() {
                    tracing::error!("failed to send SpawnNodeResult to main daemon task")
                }
            }
            spawn_result
        }
    }

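    /// Resolves the `NodeConfig` for a dynamic node, which requires exactly one
    /// running dataflow containing a node with the given ID.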
    async fn handle_dynamic_node_event(
        &mut self,
        event: DynamicNodeEventWrapper,
    ) -> eyre::Result<()> {
        match event {
            DynamicNodeEventWrapper {
                event: DynamicNodeEvent::NodeConfig { node_id },
                reply_tx,
            } => {
                let number_node_id = self
                    .running
                    .iter()
                    .filter(|(_id, dataflow)| dataflow.running_nodes.contains_key(&node_id))
                    .count();

                let node_config = match number_node_id {
                    2.. => Err(format!(
                        "multiple dataflows contain dynamic node id {node_id}. \
                        Please only have one running dataflow with the specified \
                        node id if you want to use dynamic nodes",
                    )),
                    1 => self
                        .running
                        .iter()
                        .filter(|(_id, dataflow)| dataflow.running_nodes.contains_key(&node_id))
                        .map(|(id, dataflow)| -> Result<NodeConfig> {
                            let node_config = dataflow
                                .running_nodes
                                .get(&node_id)
                                .with_context(|| {
                                    format!("no node with ID `{node_id}` within the given dataflow")
                                })?
                                .node_config
                                .clone();
                            if !node_config.dynamic {
                                bail!("node with ID `{node_id}` in {id} is not dynamic");
                            }
                            Ok(node_config)
                        })
                        .next()
                        .ok_or_else(|| eyre!("no node with ID `{node_id}`"))
                        .and_then(|r| r)
                        .map_err(|err| {
                            format!(
                                "failed to get dynamic node config within given dataflow: {err}"
                            )
                        }),
                    0 => Err(format!("no node with ID `{node_id}`")),
                };

                let reply = DaemonReply::NodeConfig {
                    result: node_config,
                };
                let _ = reply_tx.send(Some(reply)).map_err(|_| {
                    error!("could not send node info reply from daemon to coordinator")
                });
                Ok(())
            }
        }
    }

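    /// Handles requests sent by local nodes (subscribe, send-out, drop tokens,
    /// and related lifecycle events).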
    async fn handle_node_event(
        &mut self,
        event: DaemonNodeEvent,
        dataflow_id: DataflowId,
        node_id: NodeId,
    ) -> eyre::Result<()> {
        match event {
            DaemonNodeEvent::Subscribe {
                event_sender,
                reply_sender,
            } => {
                let mut logger = self.logger.for_dataflow(dataflow_id);
                logger
                    .log(
                        LogLevel::Info,
                        Some(node_id.clone()),
                        Some("daemon".into()),
                        "node is ready",
                    )
                    .await;

                let dataflow = self.running.get_mut(&dataflow_id).ok_or_else(|| {
                    format!("subscribe failed: no running dataflow with ID `{dataflow_id}`")
                });

                match dataflow {
                    Err(err) => {
                        let _ = reply_sender.send(DaemonReply::Result(Err(err)));
                    }
                    Ok(dataflow) => {
                        Self::subscribe(dataflow, node_id.clone(), event_sender, &self.clock).await;

                        let status = dataflow
                            .pending_nodes
                            .handle_node_subscription(
                                node_id.clone(),
                                reply_sender,
                                &mut self.coordinator_connection,
                                &self.clock,
                                &mut dataflow.cascading_error_causes,
                                &mut logger,
                            )
                            .await?;
                        match status {
                            DataflowStatus::AllNodesReady => {
                                logger
                                    .log(
                                        LogLevel::Info,
                                        None,
                                        Some("daemon".into()),
                                        "all nodes are ready, starting dataflow",
                                    )
                                    .await;
                                dataflow.start(&self.events_tx, &self.clock).await?;
                            }
                            DataflowStatus::Pending => {}
                        }
                    }
                }
            }
            DaemonNodeEvent::SubscribeDrop {
                event_sender,
                reply_sender,
            } => {
                let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
                    format!("failed to subscribe: no running dataflow with ID `{dataflow_id}`")
                });
                let result = match dataflow {
                    Ok(dataflow) => {
                        dataflow.drop_channels.insert(node_id, event_sender);
                        Ok(())
                    }
                    Err(err) => Err(err.to_string()),
                };
                let _ = reply_sender.send(DaemonReply::Result(result));
            }
            DaemonNodeEvent::CloseOutputs {
                outputs,
                reply_sender,
            } => {
                let inner = async {
                    self.send_output_closed_events(dataflow_id, node_id, outputs)
                        .await
                };

                let reply = inner.await.map_err(|err| format!("{err:?}"));
                let _ = reply_sender.send(DaemonReply::Result(reply));
            }
            DaemonNodeEvent::OutputsDone { reply_sender } => {
                let result = self.handle_outputs_done(dataflow_id, &node_id).await;

                let _ = reply_sender.send(DaemonReply::Result(
                    result.map_err(|err| format!("{err:?}")),
                ));
            }
            DaemonNodeEvent::SendOut {
                output_id,
                metadata,
                data,
            } => self
                .send_out(dataflow_id, node_id, output_id, metadata, data)
                .await
                .context("failed to send out")?,
            DaemonNodeEvent::ReportDrop { tokens } => {
                let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
                    format!(
                        "failed to handle drop tokens: \
                        no running dataflow with ID `{dataflow_id}`"
                    )
                });

                match dataflow {
                    Ok(dataflow) => {
                        for token in tokens {
                            match dataflow.pending_drop_tokens.get_mut(&token) {
                                Some(info) => {
                                    if info.pending_nodes.remove(&node_id) {
                                        dataflow.check_drop_token(token, &self.clock).await?;
                                    } else {
                                        tracing::warn!(
                                            "node `{node_id}` is not pending for drop token `{token:?}`"
                                        );
                                    }
                                }
                                None => tracing::warn!("unknown drop token `{token:?}`"),
                            }
                        }
                    }
                    Err(err) => tracing::warn!("{err:?}"),
                }
            }
            DaemonNodeEvent::EventStreamDropped { reply_sender } => {
                let inner = async {
                    let dataflow = self
                        .running
                        .get_mut(&dataflow_id)
                        .wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"))?;
                    dataflow.subscribe_channels.remove(&node_id);
                    Result::<_, eyre::Error>::Ok(())
                };

                let reply = inner.await.map_err(|err| format!("{err:?}"));
                let _ = reply_sender.send(DaemonReply::Result(reply));
            }
        }
        Ok(())
    }

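    /// Forwards a reload request to the given node, if it is subscribed.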
    async fn send_reload(
        &mut self,
        dataflow_id: Uuid,
        node_id: NodeId,
        operator_id: Option<OperatorId>,
    ) -> Result<(), eyre::ErrReport> {
        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
            format!("Reload failed: no running dataflow with ID `{dataflow_id}`")
        })?;
        if let Some(channel) = dataflow.subscribe_channels.get(&node_id) {
            match send_with_timestamp(channel, NodeEvent::Reload { operator_id }, &self.clock) {
                Ok(()) => {}
                Err(_) => {
                    dataflow.subscribe_channels.remove(&node_id);
                }
            }
        }
        Ok(())
    }

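    /// Delivers a node output to all local receivers and, if there are remote
    /// receivers, publishes it via zenoh.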
    async fn send_out(
        &mut self,
        dataflow_id: Uuid,
        node_id: NodeId,
        output_id: DataId,
        metadata: dora_message::metadata::Metadata,
        data: Option<DataMessage>,
    ) -> Result<(), eyre::ErrReport> {
        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
            format!("send out failed: no running dataflow with ID `{dataflow_id}`")
        })?;
        let data_bytes = send_output_to_local_receivers(
            node_id.clone(),
            output_id.clone(),
            dataflow,
            &metadata,
            data,
            &self.clock,
        )
        .await?;

        let output_id = OutputId(node_id, output_id);
        let remote_receivers = dataflow.open_external_mappings.contains(&output_id)
            || dataflow.publish_all_messages_to_zenoh;
        if remote_receivers {
            let event = InterDaemonEvent::Output {
                dataflow_id,
                node_id: output_id.0.clone(),
                output_id: output_id.1.clone(),
                metadata,
                data: data_bytes,
            };
            self.send_to_remote_receivers(dataflow_id, &output_id, event)
                .await?;
        }

        Ok(())
    }

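    /// Publishes an inter-daemon event on the dataflow's zenoh topic, creating
    /// the publisher on first use.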
    async fn send_to_remote_receivers(
        &mut self,
        dataflow_id: Uuid,
        output_id: &OutputId,
        event: InterDaemonEvent,
    ) -> Result<(), eyre::Error> {
        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
            format!("send out failed: no running dataflow with ID `{dataflow_id}`")
        })?;

        let publisher = match dataflow.publishers.get(output_id) {
            Some(publisher) => publisher,
            None => {
                let publish_topic = dataflow.output_publish_topic(output_id);
                tracing::debug!("declaring publisher on {publish_topic}");
                let publisher = self
                    .zenoh_session
                    .declare_publisher(publish_topic)
                    .await
                    .map_err(|e| eyre!(e))
                    .context("failed to create zenoh publisher")?;
                dataflow.publishers.insert(output_id.clone(), publisher);
                dataflow.publishers.get(output_id).unwrap()
            }
        };

        let serialized_event = Timestamped {
            inner: event,
            timestamp: self.clock.new_timestamp(),
        }
        .serialize();
        publisher
            .put(serialized_event)
            .await
            .map_err(|e| eyre!(e))
            .context("zenoh put failed")?;
        Ok(())
    }

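    /// Closes the given outputs for local receivers and notifies remote daemons
    /// about externally mapped ones.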
    async fn send_output_closed_events(
        &mut self,
        dataflow_id: DataflowId,
        node_id: NodeId,
        outputs: Vec<DataId>,
    ) -> eyre::Result<()> {
        let dataflow = self
            .running
            .get_mut(&dataflow_id)
            .wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"))?;
        let local_node_inputs: BTreeSet<_> = dataflow
            .mappings
            .iter()
            .filter(|(k, _)| k.0 == node_id && outputs.contains(&k.1))
            .flat_map(|(_, v)| v)
            .cloned()
            .collect();
        for (receiver_id, input_id) in &local_node_inputs {
            close_input(dataflow, receiver_id, input_id, &self.clock);
        }

        let mut closed = Vec::new();
        for output_id in &dataflow.open_external_mappings {
            if output_id.0 == node_id && outputs.contains(&output_id.1) {
                closed.push(output_id.clone());
            }
        }

        for output_id in closed {
            let event = InterDaemonEvent::OutputClosed {
                dataflow_id,
                node_id: output_id.0.clone(),
                output_id: output_id.1.clone(),
            };
            self.send_to_remote_receivers(dataflow_id, &output_id, event)
                .await?;
        }

        Ok(())
    }

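    /// Registers a node's event channel, immediately replaying input-closed and
    /// stop events that occurred before the subscription.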
    async fn subscribe(
        dataflow: &mut RunningDataflow,
        node_id: NodeId,
        event_sender: UnboundedSender<Timestamped<NodeEvent>>,
        clock: &HLC,
    ) {
        let closed_inputs = dataflow
            .mappings
            .values()
            .flatten()
            .filter(|(node, _)| node == &node_id)
            .map(|(_, input)| input)
            .filter(|input| {
                dataflow
                    .open_inputs
                    .get(&node_id)
                    .map(|open_inputs| !open_inputs.contains(*input))
                    .unwrap_or(true)
            });
        for input_id in closed_inputs {
            let _ = send_with_timestamp(
                &event_sender,
                NodeEvent::InputClosed {
                    id: input_id.clone(),
                },
                clock,
            );
        }
        if dataflow.open_inputs(&node_id).is_empty() {
            let _ = send_with_timestamp(&event_sender, NodeEvent::AllInputsClosed, clock);
        }

        if dataflow.stop_sent {
            let _ = send_with_timestamp(&event_sender, NodeEvent::Stop, clock);
        }

        dataflow.subscribe_channels.insert(node_id, event_sender);
    }

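    /// Marks all outputs of the node as closed and removes its drop channel.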
    #[tracing::instrument(skip(self), level = "trace")]
    async fn handle_outputs_done(
        &mut self,
        dataflow_id: DataflowId,
        node_id: &NodeId,
    ) -> eyre::Result<()> {
        let dataflow = self
            .running
            .get_mut(&dataflow_id)
            .ok_or_else(|| eyre!("no running dataflow with ID `{dataflow_id}`"))?;

        let outputs = dataflow
            .mappings
            .keys()
            .filter(|m| &m.0 == node_id)
            .map(|m| &m.1)
            .cloned()
            .collect();
        self.send_output_closed_events(dataflow_id, node_id.clone(), outputs)
            .await?;

        let dataflow = self
            .running
            .get_mut(&dataflow_id)
            .ok_or_else(|| eyre!("no running dataflow with ID `{dataflow_id}`"))?;
        dataflow.drop_channels.remove(node_id);
        Ok(())
    }

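    /// Handles a stopped node and emits a `NodeStopped` event; when the last
    /// non-dynamic node of a dataflow stops, the dataflow result is reported to
    /// the coordinator and the dataflow is removed.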
    async fn handle_node_stop(
        &mut self,
        dataflow_id: Uuid,
        node_id: &NodeId,
        dynamic_node: bool,
    ) -> eyre::Result<()> {
        let result = self
            .handle_node_stop_inner(dataflow_id, node_id, dynamic_node)
            .await;
        let _ = self
            .events_tx
            .send(Timestamped {
                inner: Event::NodeStopped {
                    dataflow_id,
                    node_id: node_id.clone(),
                },
                timestamp: self.clock.new_timestamp(),
            })
            .await;
        result
    }

    async fn handle_node_stop_inner(
        &mut self,
        dataflow_id: Uuid,
        node_id: &NodeId,
        dynamic_node: bool,
    ) -> eyre::Result<()> {
        let mut logger = self.logger.for_dataflow(dataflow_id);
        let dataflow = match self.running.get_mut(&dataflow_id) {
            Some(dataflow) => dataflow,
            None if dynamic_node => {
                tracing::debug!(
                    "dynamic node {dataflow_id}/{node_id} stopped after dataflow was done"
                );
                return Ok(());
            }
            None => eyre::bail!(
                "failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`"
            ),
        };

        dataflow
            .pending_nodes
            .handle_node_stop(
                node_id,
                &mut self.coordinator_connection,
                &self.clock,
                &mut dataflow.cascading_error_causes,
                &mut logger,
            )
            .await?;

        self.handle_outputs_done(dataflow_id, node_id).await?;

        let mut logger = self.logger.for_dataflow(dataflow_id);
        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
            format!("failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`")
        })?;
        if let Some(mut pid) = dataflow.running_nodes.remove(node_id).and_then(|n| n.pid) {
            pid.mark_as_stopped()
        }
        if !dataflow.pending_nodes.local_nodes_pending()
            && dataflow
                .running_nodes
                .iter()
                .all(|(_id, n)| n.node_config.dynamic)
        {
            let result = DataflowDaemonResult {
                timestamp: self.clock.new_timestamp(),
                node_results: self
                    .dataflow_node_results
                    .get(&dataflow.id)
                    .context("failed to get dataflow node results")?
                    .clone(),
            };

            self.git_manager
                .clones_in_use
                .values_mut()
                .for_each(|dataflows| {
                    dataflows.remove(&dataflow_id);
                });

            logger
                .log(
                    LogLevel::Info,
                    None,
                    Some("daemon".into()),
                    format!("dataflow finished on machine `{}`", self.daemon_id),
                )
                .await;
            if let Some(connection) = &mut self.coordinator_connection {
                let msg = serde_json::to_vec(&Timestamped {
                    inner: CoordinatorRequest::Event {
                        daemon_id: self.daemon_id.clone(),
                        event: DaemonEvent::AllNodesFinished {
                            dataflow_id,
                            result,
                        },
                    },
                    timestamp: self.clock.new_timestamp(),
                })?;
                socket_stream_send(connection, &msg)
                    .await
                    .wrap_err("failed to report dataflow finish to dora-coordinator")?;
            }
            self.running.remove(&dataflow_id);
        }

        Ok(())
    }

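    /// Handles daemon-internal events: timer ticks, log-message inputs, and
    /// node exit results.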
1953 async fn handle_dora_event(&mut self, event: DoraEvent) -> eyre::Result<()> {
1954 match event {
1955 DoraEvent::Timer {
1956 dataflow_id,
1957 interval,
1958 metadata,
1959 } => {
1960 let Some(dataflow) = self.running.get_mut(&dataflow_id) else {
1961 tracing::warn!("Timer event for unknown dataflow `{dataflow_id}`");
1962 return Ok(());
1963 };
1964
1965 let Some(subscribers) = dataflow.timers.get(&interval) else {
1966 return Ok(());
1967 };
1968
1969 let mut closed = Vec::new();
1970 for (receiver_id, input_id) in subscribers {
1971 let Some(channel) = dataflow.subscribe_channels.get(receiver_id) else {
1972 continue;
1973 };
1974
1975 let send_result = send_with_timestamp(
1976 channel,
1977 NodeEvent::Input {
1978 id: input_id.clone(),
1979 metadata: metadata.clone(),
1980 data: None,
1981 },
1982 &self.clock,
1983 );
1984 match send_result {
1985 Ok(()) => {}
1986 Err(_) => {
1987 closed.push(receiver_id);
1988 }
1989 }
1990 }
1991 for id in closed {
1992 dataflow.subscribe_channels.remove(id);
1993 }
1994 }
1995 DoraEvent::Logs {
1996 dataflow_id,
1997 output_id,
1998 message,
1999 metadata,
2000 } => {
2001 let Some(dataflow) = self.running.get_mut(&dataflow_id) else {
2002 tracing::warn!("Logs event for unknown dataflow `{dataflow_id}`");
2003 return Ok(());
2004 };
2005
2006 let Some(subscribers) = dataflow.mappings.get(&output_id) else {
2007 tracing::warn!(
2008 "No subscribers found for {:?} in {:?}",
2009 output_id,
2010 dataflow.mappings
2011 );
2012 return Ok(());
2013 };
2014
2015 let mut closed = Vec::new();
2016 for (receiver_id, input_id) in subscribers {
2017 let Some(channel) = dataflow.subscribe_channels.get(receiver_id) else {
2018 tracing::warn!("No subscriber channel found for {:?}", output_id);
2019 continue;
2020 };
2021
2022 let send_result = send_with_timestamp(
2023 channel,
2024 NodeEvent::Input {
2025 id: input_id.clone(),
2026 metadata: metadata.clone(),
2027 data: Some(message.clone()),
2028 },
2029 &self.clock,
2030 );
2031 match send_result {
2032 Ok(()) => {}
2033 Err(_) => {
2034 closed.push(receiver_id);
2035 }
2036 }
2037 }
2038 for id in closed {
2039 dataflow.subscribe_channels.remove(id);
2040 }
2041 }
2042 DoraEvent::SpawnedNodeResult {
2043 dataflow_id,
2044 node_id,
2045 dynamic_node,
2046 exit_status,
2047 } => {
2048 let mut logger = self
2049 .logger
2050 .for_dataflow(dataflow_id)
2051 .for_node(node_id.clone());
2052 logger
2053 .log(
2054 LogLevel::Debug,
2055 Some("daemon".into()),
2056 format!("handling node stop with exit status {exit_status:?}"),
2057 )
2058 .await;
2059
2060 let node_result = match exit_status {
2061 NodeExitStatus::Success => Ok(()),
2062 exit_status => {
2063 let dataflow = self.running.get(&dataflow_id);
2064 let caused_by_node = dataflow
2065 .and_then(|dataflow| {
2066 dataflow.cascading_error_causes.error_caused_by(&node_id)
2067 })
2068 .cloned();
2069 let grace_duration_kill = dataflow
2070 .map(|d| d.grace_duration_kills.contains(&node_id))
2071 .unwrap_or_default();
2072
2073 let cause = match caused_by_node {
2074 Some(caused_by_node) => {
2075 logger
2076 .log(
2077 LogLevel::Info,
2078 Some("daemon".into()),
2079 format!("marking `{node_id}` as cascading error caused by `{caused_by_node}`")
2080 )
2081 .await;
2082
2083 NodeErrorCause::Cascading { caused_by_node }
2084 }
2085 None if grace_duration_kill => NodeErrorCause::GraceDuration,
2086 None => {
2087 let cause = dataflow
2088 .and_then(|d| d.node_stderr_most_recent.get(&node_id))
2089 .map(|queue| {
2090 let mut s = if queue.is_full() {
2091 "[...]".into()
2092 } else {
2093 String::new()
2094 };
2095 while let Some(line) = queue.pop() {
2096 s += &line;
2097 }
2098 s
2099 })
2100 .unwrap_or_default();
2101
2102 NodeErrorCause::Other { stderr: cause }
2103 }
2104 };
2105 Err(NodeError {
2106 timestamp: self.clock.new_timestamp(),
2107 cause,
2108 exit_status,
2109 })
2110 }
2111 };
2112
2113 logger
2114 .log(
2115 if node_result.is_ok() {
2116 LogLevel::Info
2117 } else {
2118 LogLevel::Error
2119 },
2120 Some("daemon".into()),
2121 match &node_result {
2122 Ok(()) => format!("{node_id} finished successfully"),
2123 Err(err) => format!("{err}"),
2124 },
2125 )
2126 .await;
2127
2128 self.dataflow_node_results
2129 .entry(dataflow_id)
2130 .or_default()
2131 .insert(node_id.clone(), node_result);
2132
2133 self.handle_node_stop(dataflow_id, &node_id, dynamic_node)
2134 .await?;
2135 }
2136 }
2137 Ok(())
2138 }
2139
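    /// Resolves the base working directory for a dataflow.
    ///
    /// An explicitly provided `local_working_dir` is used as-is and must
    /// already exist. Otherwise the daemon falls back to a per-session
    /// directory under its own working directory; for illustration, a session
    /// ends up in `<daemon cwd>/_work/<session uuid>`.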
    fn base_working_dir(
        &self,
        local_working_dir: Option<PathBuf>,
        session_id: SessionId,
    ) -> eyre::Result<PathBuf> {
        match local_working_dir {
            Some(working_dir) => {
                if working_dir.exists() {
                    Ok(working_dir)
                } else {
                    bail!(
                        "working directory does not exist: {}",
                        working_dir.display(),
                    )
                }
            }
            None => {
                let daemon_working_dir =
                    current_dir().context("failed to get daemon working dir")?;
                Ok(daemon_working_dir
                    .join("_work")
                    .join(session_id.uuid().to_string()))
            }
        }
    }
}

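/// Registers this daemon with the coordinator and merges all incoming event
/// sources into a single stream: coordinator events, inter-daemon events, and
/// dynamic-node events arriving via the local TCP listener. Receive errors on
/// the inter-daemon channel are surfaced as [`Event::DaemonError`] items
/// instead of terminating the stream.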
async fn set_up_event_stream(
    coordinator_addr: SocketAddr,
    machine_id: &Option<String>,
    clock: &Arc<HLC>,
    remote_daemon_events_rx: flume::Receiver<eyre::Result<Timestamped<InterDaemonEvent>>>,
    local_listen_port: u16,
) -> eyre::Result<(DaemonId, impl Stream<Item = Timestamped<Event>> + Unpin)> {
    let clock_cloned = clock.clone();
    let remote_daemon_events = remote_daemon_events_rx.into_stream().map(move |e| match e {
        Ok(e) => Timestamped {
            inner: Event::Daemon(e.inner),
            timestamp: e.timestamp,
        },
        Err(err) => Timestamped {
            inner: Event::DaemonError(err),
            timestamp: clock_cloned.new_timestamp(),
        },
    });
    let (daemon_id, coordinator_events) =
        coordinator::register(coordinator_addr, machine_id.clone(), clock)
            .await
            .wrap_err("failed to connect to dora-coordinator")?;
    let coordinator_events = coordinator_events.map(
        |Timestamped {
             inner: event,
             timestamp,
         }| Timestamped {
            inner: Event::Coordinator(event),
            timestamp,
        },
    );
    let (events_tx, events_rx) = flume::bounded(10);
    let _listen_port =
        local_listener::spawn_listener_loop((LOCALHOST, local_listen_port).into(), events_tx)
            .await?;
    let dynamic_node_events = events_rx.into_stream().map(|e| Timestamped {
        inner: Event::DynamicNode(e.inner),
        timestamp: e.timestamp,
    });
    let incoming = (
        coordinator_events,
        remote_daemon_events,
        dynamic_node_events,
    )
        .merge();
    Ok((daemon_id, incoming))
}

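/// Delivers a node output to all local subscribers registered for it.
///
/// Receivers whose channels turned out to be closed are pruned. For
/// shared-memory messages the payload is copied out of the mapped region and
/// the message's drop token is tracked, so the owning node can be notified
/// once every receiver has dropped its copy. Returns the raw bytes so the
/// caller can additionally forward them to remote daemons.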
async fn send_output_to_local_receivers(
    node_id: NodeId,
    output_id: DataId,
    dataflow: &mut RunningDataflow,
    metadata: &metadata::Metadata,
    data: Option<DataMessage>,
    clock: &HLC,
) -> Result<Option<AVec<u8, ConstAlign<128>>>, eyre::ErrReport> {
    let timestamp = metadata.timestamp();
    let empty_set = BTreeSet::new();
    let output_id = OutputId(node_id, output_id);
    let local_receivers = dataflow.mappings.get(&output_id).unwrap_or(&empty_set);
    let OutputId(node_id, _) = output_id;
    let mut closed = Vec::new();
    for (receiver_id, input_id) in local_receivers {
        if let Some(channel) = dataflow.subscribe_channels.get(receiver_id) {
            let item = NodeEvent::Input {
                id: input_id.clone(),
                metadata: metadata.clone(),
                data: data.clone(),
            };
            match channel.send(Timestamped {
                inner: item,
                timestamp,
            }) {
                Ok(()) => {
                    if let Some(token) = data.as_ref().and_then(|d| d.drop_token()) {
                        dataflow
                            .pending_drop_tokens
                            .entry(token)
                            .or_insert_with(|| DropTokenInformation {
                                owner: node_id.clone(),
                                pending_nodes: Default::default(),
                            })
                            .pending_nodes
                            .insert(receiver_id.clone());
                    }
                }
                Err(_) => {
                    closed.push(receiver_id);
                }
            }
        }
    }
    for id in closed {
        dataflow.subscribe_channels.remove(id);
    }
    let (data_bytes, drop_token) = match data {
        None => (None, None),
        Some(DataMessage::SharedMemory {
            shared_memory_id,
            len,
            drop_token,
        }) => {
            let memory = ShmemConf::new()
                .os_id(shared_memory_id)
                .open()
                .wrap_err("failed to map shared memory output")?;
            let data = Some(AVec::from_slice(1, &unsafe { memory.as_slice() }[..len]));
            (data, Some(drop_token))
        }
        Some(DataMessage::Vec(v)) => (Some(v), None),
    };
    if let Some(token) = drop_token {
        dataflow
            .pending_drop_tokens
            .entry(token)
            .or_insert_with(|| DropTokenInformation {
                owner: node_id.clone(),
                pending_nodes: Default::default(),
            });
        dataflow.check_drop_token(token, clock).await?;
    }
    Ok(data_bytes)
}

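/// Returns a node's inputs; for runtime nodes, the inputs of all operators
/// are flattened into a single `operator_id/input_id` map.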
fn node_inputs(node: &ResolvedNode) -> BTreeMap<DataId, Input> {
    match &node.kind {
        CoreNodeKind::Custom(n) => n.run_config.inputs.clone(),
        CoreNodeKind::Runtime(n) => runtime_node_inputs(n),
    }
}

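/// Marks `input_id` of node `receiver_id` as closed and notifies the node
/// with an `InputClosed` event, followed by `AllInputsClosed` once no open
/// inputs remain.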
fn close_input(
    dataflow: &mut RunningDataflow,
    receiver_id: &NodeId,
    input_id: &DataId,
    clock: &HLC,
) {
    if let Some(open_inputs) = dataflow.open_inputs.get_mut(receiver_id) {
        if !open_inputs.remove(input_id) {
            return;
        }
    }
    if let Some(channel) = dataflow.subscribe_channels.get(receiver_id) {
        let _ = send_with_timestamp(
            channel,
            NodeEvent::InputClosed {
                id: input_id.clone(),
            },
            clock,
        );

        if dataflow.open_inputs(receiver_id).is_empty() {
            let _ = send_with_timestamp(channel, NodeEvent::AllInputsClosed, clock);
        }
    }
}

#[derive(Debug)]
pub struct RunningNode {
    pid: Option<ProcessId>,
    node_config: NodeConfig,
}

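/// OS process ID of a spawned node, or `None` once the process is known to
/// have stopped. The `Drop` impl kills the process if it is still running,
/// so a dropped `RunningNode` cannot leak its child process.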
#[derive(Debug)]
struct ProcessId(Option<u32>);

impl ProcessId {
    pub fn new(process_id: u32) -> Self {
        Self(Some(process_id))
    }

    pub fn mark_as_stopped(&mut self) {
        self.0 = None;
    }

    pub fn kill(&mut self) -> bool {
        if let Some(pid) = self.0 {
            let pid = Pid::from(pid as usize);
            let mut system = sysinfo::System::new();
            system.refresh_process(pid);

            if let Some(process) = system.process(pid) {
                process.kill();
                self.mark_as_stopped();
                return true;
            }
        }

        false
    }
}

impl Drop for ProcessId {
    fn drop(&mut self) {
        if let Some(pid) = self.0 {
            if self.kill() {
                warn!("process {pid} was killed on drop because it was still running")
            }
        }
    }
}

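/// Runtime state the daemon keeps for a single running dataflow: per-node
/// event and drop channels, output-to-input mappings, timer subscriptions,
/// open inputs, shared-memory drop-token bookkeeping, cascading-error and
/// grace-period-kill tracking, per-node ring buffers of recent stderr lines
/// (capped at `STDERR_LOG_LINES`), and zenoh publishers for forwarding
/// outputs between daemons.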
pub struct RunningDataflow {
    id: Uuid,
    pending_nodes: PendingNodes,

    subscribe_channels: HashMap<NodeId, UnboundedSender<Timestamped<NodeEvent>>>,
    drop_channels: HashMap<NodeId, UnboundedSender<Timestamped<NodeDropEvent>>>,
    mappings: HashMap<OutputId, BTreeSet<InputId>>,
    timers: BTreeMap<Duration, BTreeSet<InputId>>,
    open_inputs: BTreeMap<NodeId, BTreeSet<DataId>>,
    running_nodes: BTreeMap<NodeId, RunningNode>,

    dynamic_nodes: BTreeSet<NodeId>,

    open_external_mappings: BTreeSet<OutputId>,

    pending_drop_tokens: HashMap<DropToken, DropTokenInformation>,

    _timer_handles: Vec<futures::future::RemoteHandle<()>>,
    stop_sent: bool,

    empty_set: BTreeSet<DataId>,

    cascading_error_causes: CascadingErrorCauses,
    grace_duration_kills: Arc<crossbeam_skiplist::SkipSet<NodeId>>,

    node_stderr_most_recent: BTreeMap<NodeId, Arc<ArrayQueue<String>>>,

    publishers: BTreeMap<OutputId, zenoh::pubsub::Publisher<'static>>,

    finished_tx: broadcast::Sender<()>,

    publish_all_messages_to_zenoh: bool,
}

impl RunningDataflow {
    fn new(
        dataflow_id: Uuid,
        daemon_id: DaemonId,
        dataflow_descriptor: &Descriptor,
    ) -> RunningDataflow {
        let (finished_tx, _) = broadcast::channel(1);
        Self {
            id: dataflow_id,
            pending_nodes: PendingNodes::new(dataflow_id, daemon_id),
            subscribe_channels: HashMap::new(),
            drop_channels: HashMap::new(),
            mappings: HashMap::new(),
            timers: BTreeMap::new(),
            open_inputs: BTreeMap::new(),
            running_nodes: BTreeMap::new(),
            dynamic_nodes: BTreeSet::new(),
            open_external_mappings: Default::default(),
            pending_drop_tokens: HashMap::new(),
            _timer_handles: Vec::new(),
            stop_sent: false,
            empty_set: BTreeSet::new(),
            cascading_error_causes: Default::default(),
            grace_duration_kills: Default::default(),
            node_stderr_most_recent: BTreeMap::new(),
            publishers: Default::default(),
            finished_tx,
            publish_all_messages_to_zenoh: dataflow_descriptor.debug.publish_all_messages_to_zenoh,
        }
    }

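    /// Spawns one background task per distinct timer interval. Each task
    /// ticks at its interval and enqueues a `DoraEvent::Timer` for this
    /// dataflow, exiting once the daemon's event channel closes. The tasks
    /// are tied to the dataflow via `RemoteHandle`s, so they are cancelled
    /// when the dataflow is dropped.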
    async fn start(
        &mut self,
        events_tx: &mpsc::Sender<Timestamped<Event>>,
        clock: &Arc<HLC>,
    ) -> eyre::Result<()> {
        for interval in self.timers.keys().copied() {
            let events_tx = events_tx.clone();
            let dataflow_id = self.id;
            let clock = clock.clone();
            let task = async move {
                let mut interval_stream = tokio::time::interval(interval);
                let hlc = HLC::default();
                loop {
                    interval_stream.tick().await;

                    let span = tracing::span!(tracing::Level::TRACE, "tick");
                    // Bind the guard to a named variable: `let _ = span.enter()`
                    // would drop it immediately and never enter the span. The
                    // guard is not `Send`, so it must be dropped again before
                    // the `.await` below.
                    let entered = span.enter();

                    let mut parameters = BTreeMap::new();
                    parameters.insert(
                        "open_telemetry_context".to_string(),
                        #[cfg(feature = "telemetry")]
                        Parameter::String(serialize_context(&span.context())),
                        #[cfg(not(feature = "telemetry"))]
                        Parameter::String("".into()),
                    );

                    let metadata = metadata::Metadata::from_parameters(
                        hlc.new_timestamp(),
                        empty_type_info(),
                        parameters,
                    );
                    drop(entered);

                    let event = Timestamped {
                        inner: DoraEvent::Timer {
                            dataflow_id,
                            interval,
                            metadata,
                        }
                        .into(),
                        timestamp: clock.new_timestamp(),
                    };
                    if events_tx.send(event).await.is_err() {
                        break;
                    }
                }
            };
            let (task, handle) = task.remote_handle();
            tokio::spawn(task);
            self._timer_handles.push(handle);
        }

        Ok(())
    }

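    /// Sends `Stop` to all subscribed nodes and schedules a forced kill for
    /// any node process still running after the grace period (15s unless
    /// overridden). Nodes killed this way are recorded in
    /// `grace_duration_kills` so their exit is attributed to the timeout
    /// rather than to a crash.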
    async fn stop_all(
        &mut self,
        coordinator_connection: &mut Option<TcpStream>,
        clock: &HLC,
        grace_duration: Option<Duration>,
        logger: &mut DataflowLogger<'_>,
    ) -> eyre::Result<()> {
        self.pending_nodes
            .handle_dataflow_stop(
                coordinator_connection,
                clock,
                &mut self.cascading_error_causes,
                &self.dynamic_nodes,
                logger,
            )
            .await?;

        for (_node_id, channel) in self.subscribe_channels.drain() {
            let _ = send_with_timestamp(&channel, NodeEvent::Stop, clock);
        }

        let running_processes: Vec<_> = self
            .running_nodes
            .iter_mut()
            .map(|(id, n)| (id.clone(), n.pid.take()))
            .collect();
        let grace_duration_kills = self.grace_duration_kills.clone();
        tokio::spawn(async move {
            let duration = grace_duration.unwrap_or(Duration::from_secs(15));
            tokio::time::sleep(duration).await;

            for (node, pid) in running_processes {
                if let Some(mut pid) = pid {
                    if pid.kill() {
                        grace_duration_kills.insert(node.clone());
                        warn!(
                            "{node} was killed due to not stopping within the {:#?} grace period",
                            duration
                        )
                    }
                }
            }
        });
        self.stop_sent = true;
        Ok(())
    }

    fn open_inputs(&self, node_id: &NodeId) -> &BTreeSet<DataId> {
        self.open_inputs.get(node_id).unwrap_or(&self.empty_set)
    }

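    /// If no receiver still holds the data behind `token`, removes the token
    /// and notifies the owning node via its drop channel that the output may
    /// be freed. Notification failures are logged as warnings rather than
    /// propagated.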
    async fn check_drop_token(&mut self, token: DropToken, clock: &HLC) -> eyre::Result<()> {
        match self.pending_drop_tokens.entry(token) {
            std::collections::hash_map::Entry::Occupied(entry) => {
                if entry.get().pending_nodes.is_empty() {
                    let (drop_token, info) = entry.remove_entry();
                    let result = match self.drop_channels.get_mut(&info.owner) {
                        Some(channel) => send_with_timestamp(
                            channel,
                            NodeDropEvent::OutputDropped { drop_token },
                            clock,
                        )
                        .wrap_err("send failed"),
                        None => Err(eyre!("no drop channel for node `{}`", &info.owner)),
                    };
                    if let Err(err) = result.wrap_err_with(|| {
                        format!(
                            "failed to report drop token `{drop_token:?}` to owner `{}`",
                            &info.owner
                        )
                    }) {
                        tracing::warn!("{err:?}");
                    }
                }
            }
            std::collections::hash_map::Entry::Vacant(_) => {
                tracing::warn!("check_drop_token called with already closed token")
            }
        }

        Ok(())
    }

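    /// Builds the zenoh key expression an output is published on, of the form
    /// `dora/{network_id}/{dataflow_id}/output/{node_id}/{output_id}`, where
    /// the network id is currently hardcoded to `default`.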
    fn output_publish_topic(&self, output_id: &OutputId) -> String {
        let network_id = "default";
        let dataflow_id = self.id;
        let OutputId(node_id, output_id) = output_id;
        format!("dora/{network_id}/{dataflow_id}/output/{node_id}/{output_id}")
    }
}

fn empty_type_info() -> ArrowTypeInfo {
    ArrowTypeInfo {
        data_type: DataType::Null,
        len: 0,
        null_count: 0,
        validity: None,
        offset: 0,
        buffer_offsets: Vec::new(),
        child_data: Vec::new(),
    }
}

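/// An output identified by the producing node and its output id; `InputId`
/// analogously pairs a receiving node with one of its input ids.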
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct OutputId(NodeId, DataId);
type InputId = (NodeId, DataId);

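/// Bookkeeping for a shared-memory drop token: the node owning the memory
/// region, plus the receiving nodes that have not yet reported the token as
/// dropped.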
struct DropTokenInformation {
    owner: NodeId,
    pending_nodes: BTreeSet<NodeId>,
}

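/// Top-level event type driving the daemon's main loop, merging node
/// requests, coordinator and inter-daemon messages, dynamic-node events,
/// internal `DoraEvent`s, and ctrl-c signals into one stream.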
#[derive(Debug)]
pub enum Event {
    Node {
        dataflow_id: DataflowId,
        node_id: NodeId,
        event: DaemonNodeEvent,
    },
    Coordinator(CoordinatorEvent),
    Daemon(InterDaemonEvent),
    Dora(DoraEvent),
    DynamicNode(DynamicNodeEventWrapper),
    HeartbeatInterval,
    CtrlC,
    SecondCtrlC,
    DaemonError(eyre::Report),
    SpawnNodeResult {
        dataflow_id: DataflowId,
        node_id: NodeId,
        dynamic_node: bool,
        result: Result<RunningNode, NodeError>,
    },
    BuildDataflowResult {
        build_id: BuildId,
        session_id: SessionId,
        result: eyre::Result<BuildInfo>,
    },
    SpawnDataflowResult {
        dataflow_id: Uuid,
        result: eyre::Result<()>,
    },
    NodeStopped {
        dataflow_id: Uuid,
        node_id: NodeId,
    },
}

impl From<DoraEvent> for Event {
    fn from(event: DoraEvent) -> Self {
        Event::Dora(event)
    }
}

impl Event {
    pub fn kind(&self) -> &'static str {
        match self {
            Event::Node { .. } => "Node",
            Event::Coordinator(_) => "Coordinator",
            Event::Daemon(_) => "Daemon",
            Event::Dora(_) => "Dora",
            Event::DynamicNode(_) => "DynamicNode",
            Event::HeartbeatInterval => "HeartbeatInterval",
            Event::CtrlC => "CtrlC",
            Event::SecondCtrlC => "SecondCtrlC",
            Event::DaemonError(_) => "DaemonError",
            Event::SpawnNodeResult { .. } => "SpawnNodeResult",
            Event::BuildDataflowResult { .. } => "BuildDataflowResult",
            Event::SpawnDataflowResult { .. } => "SpawnDataflowResult",
            Event::NodeStopped { .. } => "NodeStopped",
        }
    }
}

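/// Requests sent by a running node to its daemon (subscribing to events,
/// sending outputs, closing outputs, reporting drop tokens, ...). Variants
/// with a `reply_sender` expect a `DaemonReply` in response.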
#[derive(Debug)]
pub enum DaemonNodeEvent {
    OutputsDone {
        reply_sender: oneshot::Sender<DaemonReply>,
    },
    Subscribe {
        event_sender: UnboundedSender<Timestamped<NodeEvent>>,
        reply_sender: oneshot::Sender<DaemonReply>,
    },
    SubscribeDrop {
        event_sender: UnboundedSender<Timestamped<NodeDropEvent>>,
        reply_sender: oneshot::Sender<DaemonReply>,
    },
    CloseOutputs {
        outputs: Vec<dora_core::config::DataId>,
        reply_sender: oneshot::Sender<DaemonReply>,
    },
    SendOut {
        output_id: DataId,
        metadata: metadata::Metadata,
        data: Option<DataMessage>,
    },
    ReportDrop {
        tokens: Vec<DropToken>,
    },
    EventStreamDropped {
        reply_sender: oneshot::Sender<DaemonReply>,
    },
}

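/// Daemon-internal events: timer ticks, log-forwarding messages, and the
/// exit status of spawned node processes.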
#[derive(Debug)]
pub enum DoraEvent {
    Timer {
        dataflow_id: DataflowId,
        interval: Duration,
        metadata: metadata::Metadata,
    },
    Logs {
        dataflow_id: DataflowId,
        output_id: OutputId,
        message: DataMessage,
        metadata: metadata::Metadata,
    },
    SpawnedNodeResult {
        dataflow_id: DataflowId,
        node_id: NodeId,
        dynamic_node: bool,
        exit_status: NodeExitStatus,
    },
}

#[must_use]
enum RunStatus {
    Continue,
    Exit,
}

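/// Sends `event` over `sender`, stamping it with a fresh timestamp from the
/// given HLC clock.
///
/// A minimal usage sketch (not a doc-test; channel and clock are
/// illustrative):
///
/// ```ignore
/// let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
/// let clock = uhlc::HLC::default();
/// let _ = send_with_timestamp(&tx, NodeEvent::Stop, &clock);
/// // rx now yields Timestamped { inner: NodeEvent::Stop, .. }
/// ```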
fn send_with_timestamp<T>(
    sender: &UnboundedSender<Timestamped<T>>,
    event: T,
    clock: &HLC,
) -> Result<(), mpsc::error::SendError<Timestamped<T>>> {
    sender.send(Timestamped {
        inner: event,
        timestamp: clock.new_timestamp(),
    })
}

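/// Installs a ctrl-c handler that escalates on repeated signals: the first
/// ctrl-c yields [`Event::CtrlC`] (graceful stop), the second
/// [`Event::SecondCtrlC`], and a third aborts the daemon process immediately.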
fn set_up_ctrlc_handler(
    clock: Arc<HLC>,
) -> eyre::Result<tokio::sync::mpsc::Receiver<Timestamped<Event>>> {
    let (ctrlc_tx, ctrlc_rx) = mpsc::channel(1);

    let mut ctrlc_sent = 0;
    ctrlc::set_handler(move || {
        let event = match ctrlc_sent {
            0 => Event::CtrlC,
            1 => Event::SecondCtrlC,
            _ => {
                tracing::warn!("received third ctrl-c signal -> aborting immediately");
                std::process::abort();
            }
        };
        if ctrlc_tx
            .blocking_send(Timestamped {
                inner: event,
                timestamp: clock.new_timestamp(),
            })
            .is_err()
        {
            tracing::error!("failed to forward ctrl-c event to the daemon event loop");
        }

        ctrlc_sent += 1;
    })
    .wrap_err("failed to set ctrl-c handler")?;

    Ok(ctrlc_rx)
}

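/// Tracks which node's failure caused other nodes to fail, so follow-on
/// failures can be reported as cascading errors instead of independent ones.
/// Only the first reported cause per affected node is kept.
///
/// A usage sketch (not a doc-test; the node ids and the `From<String>`
/// conversion are illustrative):
///
/// ```ignore
/// let mut causes = CascadingErrorCauses::default();
/// let camera = NodeId::from("camera".to_owned());
/// let detector = NodeId::from("detector".to_owned());
/// causes.report_cascading_error(camera.clone(), detector.clone());
/// assert_eq!(causes.error_caused_by(&detector), Some(&camera));
/// assert!(causes.experienced_cascading_error(&detector));
/// ```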
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct CascadingErrorCauses {
    caused_by: BTreeMap<NodeId, NodeId>,
}

impl CascadingErrorCauses {
    pub fn experienced_cascading_error(&self, node: &NodeId) -> bool {
        self.caused_by.contains_key(node)
    }

    pub fn error_caused_by(&self, node: &NodeId) -> Option<&NodeId> {
        self.caused_by.get(node)
    }

    pub fn report_cascading_error(&mut self, causing_node: NodeId, affected_node: NodeId) {
        self.caused_by.entry(affected_node).or_insert(causing_node);
    }
}

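/// Flattens the inputs of all operators of a runtime node into one map,
/// namespacing each key as `operator_id/input_id` (an operator `detector`
/// with input `image` contributes the key `detector/image`).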
fn runtime_node_inputs(n: &RuntimeNode) -> BTreeMap<DataId, Input> {
    n.operators
        .iter()
        .flat_map(|operator| {
            operator.config.inputs.iter().map(|(input_id, mapping)| {
                (
                    DataId::from(format!("{}/{input_id}", operator.id)),
                    mapping.clone(),
                )
            })
        })
        .collect()
}

fn runtime_node_outputs(n: &RuntimeNode) -> BTreeSet<DataId> {
    n.operators
        .iter()
        .flat_map(|operator| {
            operator
                .config
                .outputs
                .iter()
                .map(|output_id| DataId::from(format!("{}/{output_id}", operator.id)))
        })
        .collect()
}

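/// Unified view over custom and runtime node kinds: their effective run
/// configuration, and whether the node is *dynamic*, i.e. a local custom
/// node whose path is `DYNAMIC_SOURCE` and which is started externally
/// before attaching to the daemon at runtime.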
trait CoreNodeKindExt {
    fn run_config(&self) -> NodeRunConfig;
    fn dynamic(&self) -> bool;
}

impl CoreNodeKindExt for CoreNodeKind {
    fn run_config(&self) -> NodeRunConfig {
        match self {
            CoreNodeKind::Runtime(n) => NodeRunConfig {
                inputs: runtime_node_inputs(n),
                outputs: runtime_node_outputs(n),
            },
            CoreNodeKind::Custom(n) => n.run_config.clone(),
        }
    }

    fn dynamic(&self) -> bool {
        match self {
            CoreNodeKind::Runtime(_n) => false,
            CoreNodeKind::Custom(n) => {
                matches!(&n.source, NodeSource::Local) && n.path == DYNAMIC_SOURCE
            }
        }
    }
}